[Java Deep Dive Series] Concurrent programming: technical principles and source code analysis of CyclicBarrier

CyclicBarrier and CountDownLatch

CyclicBarrier and CountDownLatch both live in the java.util.concurrent package. The core points of how they work are as follows:

Working principle analysis of CyclicBarrier

Next, let's analyze how CyclicBarrier works in JDK 1.8.

A brief understanding of CyclicBarrier

What is CyclicBarrier?

  1. Taken literally, CyclicBarrier means a cyclic (reusable) fence. At first glance a fence has little to do with a synchronizer, but a fence holds things back in a row, which does suggest waiting in step;

  2. CyclicBarrier is a synchronization aid that lets multiple threads wait for each other: each thread blocks when it reaches the synchronization point, and the barrier opens only when the last thread arrives;

  3. CyclicBarrier has no static inner classes for fair / unfair locks of its own; instead it uses ReentrantLock and ConditionObject to make the threads wait for each other;

The state field of CyclicBarrier

  1. CyclicBarrier has no state field of its own. It only has parties, the total number of threads, and count, the number of threads that have not yet reached the barrier;

  2. The implementation uses the AQS state variable indirectly, through ReentrantLock (an exclusive lock), whose internal Sync class extends AQS;

  3. Compared with CountDownLatch: threads A, B and C execute at the same time. If A finishes first, it waits at the barrier, and each thread goes on to its own next step only after the slowest of A, B and C has arrived;
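To make the contrast with CountDownLatch concrete, here is a minimal sketch. The class and method names (LatchVsBarrierDemo, runWithLatch, runWithBarrier) are made up for illustration. With a latch only the caller blocks; with a barrier every worker blocks until the last one arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchVsBarrierDemo {

    /** CountDownLatch: the caller waits for the workers; workers never wait for each other. */
    static int runWithLatch(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();              // does not block the worker
            }).start();
        }
        done.await();                          // only the caller blocks here
        return finished.get();
    }

    /** CyclicBarrier: every worker blocks in await() until the last one arrives. */
    static boolean runWithBarrier(int workers) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(workers);
        AtomicInteger arrived = new AtomicInteger();
        AtomicInteger sawEveryone = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    arrived.incrementAndGet();
                    barrier.await();           // blocks until all workers have arrived
                    if (arrived.get() == workers) {
                        sawEveryone.incrementAndGet();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // not expected in this sketch
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // true when every worker observed all arrivals after passing the barrier
        return sawEveryone.get() == workers;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("latch finished: " + runWithLatch(3));
        System.out.println("barrier consistent: " + runWithBarrier(3));
    }
}
```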

Common and important methods

// Creates a barrier for the given number of parties, that is, the number of threads that must arrive before the barrier trips
public CyclicBarrier(int parties)

// Creates a barrier for the given number of parties and runs barrierAction when the barrier trips; the action is run by the last thread to arrive
public CyclicBarrier(int parties, Runnable barrierAction)

// Starts a new generation: wakes up all threads waiting on the trip Condition and resets count to parties
private void nextGeneration()

// Breaks the barrier: sets the broken flag on the current generation, then wakes up all blocked threads
private void breakBarrier()

// Blocks the current thread until another thread wakes it up via trip.signalAll()
public int await()

// Same as await(), with two extra parameters giving the maximum time to wait: timeout is the duration and unit is its time unit
public int await(long timeout, TimeUnit unit)

// The core blocking method. When timed is false (no timeout), nanos is ignored; otherwise it is the maximum wait in nanoseconds
private int dowait(boolean timed, long nanos)

// Returns whether the barrier is broken, that is, whether the mutual waiting between threads has been disrupted
public boolean isBroken()

// Resets the barrier to its initial state, as if the CyclicBarrier had just been created; threads currently waiting receive BrokenBarrierException
public void reset()

// Returns the number of threads currently blocked at the barrier
public int getNumberWaiting()

// Returns the number of parties required to trip the barrier
public int getParties()
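As a small illustration of the query methods listed above, the sketch below takes a snapshot of getParties(), getNumberWaiting() and isBroken() while one thread is waiting and again after the barrier has tripped. The class name BarrierStateDemo and the snapshot format are assumptions made for this example.

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierStateDemo {

    /** Returns "parties/waiting/broken" before and after the barrier trips. */
    static String snapshot() throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception ignored) {
                // not expected in this sketch
            }
        });
        waiter.start();

        // Poll until the helper thread is blocked at the barrier
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }
        String before = barrier.getParties() + "/" + barrier.getNumberWaiting() + "/" + barrier.isBroken();

        barrier.await();   // the calling thread is the second party, so the barrier trips
        waiter.join();
        String after = barrier.getParties() + "/" + barrier.getNumberWaiting() + "/" + barrier.isBroken();
        return before + " -> " + after;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(snapshot()); // 2/1/false -> 2/0/false
    }
}
```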

Design and implementation of pseudo code

Waiting to be released:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

The core method of blocking waiting;

  • trip.await() is called internally, which puts the thread into the Condition wait queue;
  • Once count reaches zero, the nodes in the Condition wait queue are transferred one by one to the AQS (CLH) sync queue;
  • The awakened threads then proceed one by one as each lock.unlock() call in dowait hands the lock to the next;
  • The last thread to arrive runs the callback passed in through the constructor;
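The steps above can be sketched as a stripped-down barrier built on ReentrantLock and Condition. This is not the JDK implementation: SimpleBarrier is a hypothetical class that deliberately leaves out the broken state, interruption handling and timeouts, and keeps only the count, the generation marker and the signalAll() wake-up.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private final Runnable action;
    private int count;                         // parties that have not arrived yet
    private Object generation = new Object();  // replaced each time the barrier trips

    public SimpleBarrier(int parties, Runnable action) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.action = action;
    }

    public int await() throws InterruptedException {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {                  // last arrival: run the action and open the barrier
                if (action != null) action.run();
                trip.signalAll();              // move all waiters to the lock's sync queue
                count = parties;               // reset for the next round
                generation = new Object();
                return 0;
            }
            while (g == generation) {          // guard against spurious wake-ups
                trip.await();                  // releases the lock while waiting
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    /** Runs one round with the given number of threads and returns how often the action ran. */
    static int demo(int parties) throws InterruptedException {
        AtomicInteger actionRuns = new AtomicInteger();
        SimpleBarrier barrier = new SimpleBarrier(parties, actionRuns::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        return actionRuns.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("action ran " + demo(4) + " time(s)");
    }
}
```

Because the sketch has no broken state, a thread interrupted while waiting would leave count decremented; the real dowait method handles that case through breakBarrier().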

A real-life analogy for CyclicBarrier

Take a 100-meter race as an example to illustrate the principle of CyclicBarrier. Scene: ten people run the 100-meter race, and a referee at the finish line keeps count;

  • When the starting gun fires, the ten runners sprint for the finish line in an exciting few seconds;

  • When a runner reaches the finish line, that runner's race is over and the runner waits on the side, and the referee subtracts one from the count;

  • As the runners arrive one after another, the referee's count eventually shows 0 runners still outstanding, which means everyone has reached the finish line;

  • The referee then takes the recorded results and enters them into the computer;

  • That completes the sequence. In thread terms: thread group A waits for the other threads until the counter reaches zero, and then A carries on with other work;

Source code analysis CyclicBarrier

CyclicBarrier constructor

Constructor source code

Creates a barrier for the given number of parties, that is, the number of participating threads. The second constructor additionally accepts a callback: when the barrier trips, the last thread to arrive runs the supplied barrierAction before the waiting threads are released;

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
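A quick way to see both constructor behaviours is the sketch below, where ConstructorDemo and its method names are invented for this example. It checks that a non-positive parties value is rejected, and uses a single-party barrier to show that the barrier action runs on the thread that arrives last.

```java
import java.util.concurrent.CyclicBarrier;

public class ConstructorDemo {

    /** The constructor rejects parties <= 0 with IllegalArgumentException. */
    static boolean rejectsZeroParties() {
        try {
            new CyclicBarrier(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** With a single party, the caller is also the last to arrive, so it runs the action itself. */
    static String actionThreadName() throws Exception {
        String[] ranOn = new String[1];
        CyclicBarrier barrier = new CyclicBarrier(1, () -> ranOn[0] = Thread.currentThread().getName());
        barrier.await();
        return ranOn[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("rejects 0 parties: " + rejectsZeroParties());
        System.out.println("action ran on: " + actionThreadName());
    }
}
```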

await()

  • await() delegates to the core blocking method dowait, which calls trip.await() internally to put the thread into the Condition wait queue. Once count reaches zero, the nodes in the Condition wait queue are transferred one by one to the AQS (CLH) sync queue;

  • The awakened threads are then released one by one as lock.unlock() is called at the end of dowait, and the last thread to arrive runs the callback passed in through the constructor;

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L); // Core blocking method: waits using a ReentrantLock together with a Condition
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
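The return value described in the Javadoc above (getParties() - 1 for the first arrival, 0 for the last) can be observed with a short sketch. ArrivalIndexDemo is a hypothetical class; it collects the value returned by await() in each thread into a sorted set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

public class ArrivalIndexDemo {

    /** Each of the parties threads records the arrival index returned by await(). */
    static Set<Integer> collectIndices(int parties) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        Set<Integer> indices = new ConcurrentSkipListSet<>();
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indices.add(barrier.await());
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return indices;
    }

    public static void main(String[] args) throws InterruptedException {
        // Every index from 0 to parties - 1 is handed out exactly once
        System.out.println(collectIndices(3)); // [0, 1, 2]
    }
}
```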

Again, dowait(false, 0L) is the core blocking method: it performs the blocking wait through the combination of ReentrantLock and Condition.

dowait(boolean, long)

  • dowait is the core method CyclicBarrier uses to implement blocking waits. When await is called, the blocked threads are held in the wait queue of a Condition;
  • When the barrier trips normally, signalAll() is called and the nodes in the Condition wait queue are transferred to the AQS sync queue;
  • The threads then acquire and release the lock one by one, and the CyclicBarrier ends up back in its initial state, ready for the next call;
  • So each time a CyclicBarrier completes a full round it returns to its initial state and can be used again as if it were newly created, which is why it is called a cyclic barrier;
    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock; // The exclusive lock guarding the barrier state
        lock.lock(); // Threads that cannot get the lock block here in the AQS (CLH) sync queue. Later arrivals can still enter the critical section because trip.await() below releases the lock while waiting
        try {
            final Generation g = generation;

            if (g.broken) // If the barrier is already broken, fail fast; waiting threads perform the same if (g.broken) check further down
                throw new BrokenBarrierException();

            if (Thread.interrupted()) { // If the current thread was interrupted before arriving,
                breakBarrier(); // break the barrier: set the broken flag, reset count, and wake up all blocked threads
                throw new InterruptedException();
            }
                throw new InterruptedException();
            }

            int index = --count; // Decrement count, which tracks how many parties have not yet arrived at the barrier
            if (index == 0) {  // count reached 0: every party has arrived, so the barrier trips and a new generation starts
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand; // Barrier action passed in through the constructor
                    if (command != null) // If an action was supplied, the last thread to arrive runs it
                        command.run();
                    ranAction = true;
                    nextGeneration(); // Start the next generation: wake all waiters and restore the initial state so the barrier can be reused
                    return 0;
                } finally {
                    if (!ranAction) // If the barrier action threw an exception, break the barrier so that all parties fail together
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) { // Infinite loop; exits only by return or exception
                try {
                    if (!timed) // No timeout: call trip.await() to block indefinitely
                        trip.await(); // Normally execution stops here: await() parks the thread until it is signalled
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos); // Timed wait; returns the remaining nanoseconds
                } catch (InterruptedException ie) { // Interrupted while blocked
                    if (g == generation && ! g.broken) { // Same generation and not yet broken: break the barrier and rethrow the interrupt
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken) // Checked again after waking up: once the barrier is broken, every waiting thread throws
                    throw new BrokenBarrierException();

                if (g != generation) // The generation has changed, so the barrier tripped normally: return the arrival index
                    return index;

                if (timed && nanos <= 0L) { // Timed wait with no time left (either the value passed in or the remainder returned by awaitNanos): break the barrier
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock(); // Release lock
        }
    }
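The timeout branch at the end of dowait can be exercised with a small sketch. TimedAwaitDemo is a hypothetical class: one helper thread waits without a timeout, the calling thread waits with a 100 ms timeout, and the third party never arrives. The thread that times out is expected to see TimeoutException, while the helper is woken by breakBarrier() and sees BrokenBarrierException.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedAwaitDemo {

    /** Returns "<outcome of the timed caller> / <outcome of the untimed waiter>". */
    static String timeoutBreaksOthers() throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] waiterOutcome = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();                      // untimed wait
                waiterOutcome[0] = "passed";
            } catch (BrokenBarrierException e) {
                waiterOutcome[0] = "BrokenBarrierException";
            } catch (InterruptedException e) {
                waiterOutcome[0] = "InterruptedException";
            }
        });
        waiter.start();

        // Make sure the helper is already blocked at the barrier
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }

        String mine;
        try {
            barrier.await(100, TimeUnit.MILLISECONDS); // timed wait
            mine = "passed";
        } catch (TimeoutException e) {
            mine = "TimeoutException";
        }
        waiter.join();
        return mine + " / " + waiterOutcome[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(timeoutBreaksOthers()); // TimeoutException / BrokenBarrierException
    }
}
```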

breakBarrier()

Break the barrier: set the broken flag, then wake up all blocked threads;

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true; // Mark the current generation as broken
        count = parties; // Restore count to its initial value
        trip.signalAll(); // Wake up all threads waiting on the trip Condition
    }
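The effect of breakBarrier() on the other waiters can be observed from the outside. In the sketch below (BreakBarrierDemo and the thread names "first" and "second" are made up), two threads wait at a three-party barrier and one of them is interrupted: the interrupted thread gets InterruptedException and the other one gets BrokenBarrierException.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class BreakBarrierDemo {

    /** Maps each thread name to the way its await() call ended. */
    static Map<String, String> interruptOneWaiter() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        Map<String, String> outcome = new ConcurrentHashMap<>();
        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            try {
                barrier.await();
                outcome.put(name, "passed");
            } catch (InterruptedException e) {
                outcome.put(name, "InterruptedException");
            } catch (BrokenBarrierException e) {
                outcome.put(name, "BrokenBarrierException");
            }
        };
        Thread first = new Thread(task, "first");
        Thread second = new Thread(task, "second");
        first.start();
        second.start();

        // Wait until both threads are blocked at the barrier
        while (barrier.getNumberWaiting() < 2) {
            Thread.sleep(10);
        }

        first.interrupt();   // the interrupted waiter breaks the barrier for everyone
        first.join();
        second.join();
        return outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> outcome = interruptOneWaiter();
        System.out.println("first:  " + outcome.get("first"));
        System.out.println("second: " + outcome.get("second"));
    }
}
```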

nextGeneration()

Wake up all threads waiting on the Condition, restore count to its initial value, and replace the generation reference with a new Generation, ready for the next round;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
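Because nextGeneration() restores count and installs a fresh Generation, the same barrier instance can be tripped repeatedly. A minimal sketch, with ReuseDemo and runRounds as illustrative names: every thread calls await() once per round, and the barrier action counts how many times the barrier tripped.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class ReuseDemo {

    /** Runs the given number of rounds on one barrier and returns how many times it tripped. */
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it doubles as a trip counter
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, new generation each round
                    }
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("trips: " + runRounds(3, 4)); // trips: 4
    }
}
```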

await() of AQS

A method of the CyclicBarrier member field trip (of type Condition):

  1. The AQS await method matters here because it answers a question: dowait takes the exclusive lock with lock.lock() and then blocks in trip.await(), so how can the second and third threads still get past lock.lock() and enter the critical section?
  2. The answer is that await calls fullyRelease(node) to release the lock held by the current thread, so other threads are not blocked at lock.lock() forever;
  3. The Condition also maintains its own linked list. A thread that calls trip.await() first joins the Condition queue, then releases the exclusive lock, and then calls park to block itself;
  4. When the Condition is signalled, the node is transferred from the Condition queue to the AQS sync queue, where the threads reacquire the lock one by one as each predecessor calls unlock;
	/**
	 * Implements interruptible condition wait.
	 * <ol>
	 * <li> If current thread is interrupted, throw InterruptedException.
	 * <li> Save lock state returned by {@link #getState}.
	 * <li> Invoke {@link #release} with saved state as argument,
	 *      throwing IllegalMonitorStateException if it fails.
	 * <li> Block until signalled or interrupted.
	 * <li> Reacquire by invoking specialized version of
	 *      {@link #acquire} with saved state as argument.
	 * <li> If interrupted while blocked in step 4, throw InterruptedException.
	 * </ol>
	 */
	public final void await() throws InterruptedException {
		if (Thread.interrupted())
			throw new InterruptedException();
		Node node = addConditionWaiter(); // Wrap the current thread in a node and add it to the Condition's own wait queue
		int savedState = fullyRelease(node); // Release the lock held by the current thread. Without this, the second thread's lock.lock()
		// would block until the first thread finished, and the barrier could never work.
		int interruptMode = 0;
		while (!isOnSyncQueue(node)) { // Has the node been transferred to the AQS sync queue?
			LockSupport.park(this); // Not yet: park the thread. This is where the actual blocking happens
			if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
				break;
		}
		// Once on the AQS sync queue, compete for the lock again and restore the saved lock state
		if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
			interruptMode = REINTERRUPT;
		if (node.nextWaiter != null) // clean up if cancelled
			unlinkCancelledWaiters();
		if (interruptMode != 0)
			reportInterruptAfterWait(interruptMode);
	}
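The key point, that Condition.await() gives up the lock while the thread is parked, can be checked directly with ReentrantLock and Condition. In this sketch (ConditionReleaseDemo is a hypothetical class) one thread takes the lock and waits on the condition; the calling thread is then able to take the same lock and signal it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionReleaseDemo {

    /** Returns true if the caller could acquire the lock while another thread waits on the condition. */
    static boolean otherThreadCanLockWhileWaiting() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition trip = lock.newCondition();
        CountDownLatch holdingLock = new CountDownLatch(1);
        boolean[] signalled = {false};              // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();            // tell the caller the lock is held
                while (!signalled[0]) {
                    trip.await();                   // releases the lock while parked
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        holdingLock.await();

        // Succeeds only because await() released the lock held by the waiter
        boolean acquired = lock.tryLock(2, TimeUnit.SECONDS);
        if (acquired) {
            try {
                signalled[0] = true;
                trip.signal();                      // moves the waiter to the lock's sync queue
            } finally {
                lock.unlock();                      // now the waiter can reacquire and return
            }
            waiter.join();
        }
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("lock acquired while other thread waits: " + otherThreadCanLockWhileWaiting());
    }
}
```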

Practical usage of CyclicBarrier

CyclicBarrier provides 2 constructors:

public CyclicBarrier(int parties, Runnable barrierAction) {}
public CyclicBarrier(int parties) {}
  • parties: the number of threads (parties) that must reach the barrier before it trips;
  • barrierAction: the action executed once all of these threads have reached the barrier.

The most important method in CyclicBarrier is await:

// Suspends the current thread until all parties have reached the barrier, after which they all continue;
public int await() throws InterruptedException, BrokenBarrierException { };

// Waits at most the given time. If not all parties have arrived by then, the waiting thread throws TimeoutException, the barrier is broken, and the other waiting threads throw BrokenBarrierException
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        // The barrier action runs once, in the last thread to arrive
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("End of thread group execution");
            }
        });
        for (int i = 0; i < 5; i++) {
            new Thread(new ReadNum(i, cyclicBarrier)).start();
        }
    }

    static class ReadNum implements Runnable {
        private final int id;
        private final CyclicBarrier cyc;

        public ReadNum(int id, CyclicBarrier cyc) {
            this.id = id;
            this.cyc = cyc;
        }

        @Override
        public void run() {
            System.out.println("id:" + id);
            try {
                cyc.await(); // Block until all 5 threads have arrived
                System.out.println("Thread group task " + id + " ends and other tasks continue");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

Output results (the order of the id lines and of the task lines varies between runs):

id:1
id:2
id:4
id:0
id:3
End of thread group execution
Thread group task 3 ends and other tasks continue
Thread group task 1 ends and other tasks continue
Thread group task 4 ends and other tasks continue
Thread group task 0 ends and other tasks continue
Thread group task 2 ends and other tasks continue

Summary

  1. After analyzing CountDownLatch and Semaphore, we have a solid foundation, and CyclicBarrier is much easier to analyze;
  2. Here is a brief summary of the main features of CyclicBarrier:
    • Its purpose is to make a group of threads wait for each other until they have all reached the common barrier point, after which each continues with its own work;
    • It can be reused. After a normal trip the barrier resets itself to a new generation, so the next round can use the same CyclicBarrier; after it has been broken, call reset() before reusing it;
    • All-or-nothing: if any one thread is interrupted or times out, the other waiting threads are woken up and throw BrokenBarrierException;
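The reuse and all-or-nothing points can be tied together in one last sketch. ResetDemo is a hypothetical class: it breaks a two-party barrier with a timeout, shows that a further await() is rejected immediately, then calls reset() and trips the barrier normally.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResetDemo {

    /** Breaks the barrier, resets it, and reports each step in a short log string. */
    static String breakThenReset() throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(2);
        StringBuilder log = new StringBuilder();

        try {
            barrier.await(50, TimeUnit.MILLISECONDS);  // nobody else arrives, so this times out
        } catch (TimeoutException e) {
            log.append("broken=").append(barrier.isBroken());
        }

        try {
            barrier.await();                           // a broken barrier fails fast
        } catch (BrokenBarrierException e) {
            log.append(", await rejected");
        }

        barrier.reset();                               // back to the initial state
        log.append(", after reset broken=").append(barrier.isBroken());

        Thread helper = new Thread(() -> {
            try {
                barrier.await();
            } catch (Exception ignored) {
                // not expected in this sketch
            }
        });
        helper.start();
        barrier.await();                               // trips normally again
        helper.join();
        return log.append(", reusable").toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(breakThenReset());
    }
}
```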

Keywords: Java Back-end

Added by payjo on Tue, 25 Jan 2022 07:36:16 +0200