1, Introduction
It is best to be familiar with CountDownLatch before studying CyclicBarrier. Both utility classes make threads wait, but their focus is different. CountDownLatch can only be used once: when its counter reaches 0, it wakes up all waiting threads in the synchronization queue and cannot be used again. The CyclicBarrier counter can be reset, so the barrier can be reused, which makes it more suitable for complex business scenarios. For example, if a calculation goes wrong, you can reset the counter and have the threads execute again.
CyclicBarrier: a cyclic barrier (also translated as "loop fence"), through which a group of threads wait until they have all reached a certain state (the barrier point) and then continue together. It is called cyclic because, once all waiting threads have been released, the CyclicBarrier can be reused.
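To make the "cyclic" part concrete, here is a minimal sketch in which the same barrier object is tripped in two consecutive rounds. The class name ReuseDemo and the helper run() are illustrative names, not part of the original article.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ReuseDemo {
    // Sends `parties` threads through one barrier `rounds` times and
    // returns how many times the barrier tripped.
    static int run(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action counts each trip of the barrier.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Barrier tripped " + run(3, 2) + " times"); // prints "Barrier tripped 2 times"
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be restored once it has reached 0.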
The approximate working principle diagram is as follows:
CyclicBarrier provides two construction methods:
// parties is the number of threads the barrier intercepts. Each thread that
// calls await() is blocked until the last thread arrives, which then wakes
// up the threads blocked before it.
public CyclicBarrier(int parties)

// barrierAction is executed first when the barrier is tripped, that is, by
// the last thread to arrive and before the blocked threads are released.
// This is convenient for more complex business scenarios.
public CyclicBarrier(int parties, Runnable barrierAction)
The most important method in CyclicBarrier is await()
// Once the specified number of threads have all called await(), they stop
// blocking: the last thread to arrive wakes up the threads blocked before it.
// BrokenBarrierException means the barrier is broken, for example because
// one of the threads was interrupted or timed out while in await().
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
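The relationship between a timeout and a broken barrier can be checked with a small experiment. The following is a sketch only; the class name TimeoutDemo and the returned summary string are made up for illustration. One thread waits with a timeout for a second party that never arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {
    // Returns a short summary of what happened, for easy inspection.
    static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        StringBuilder summary = new StringBuilder();
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            summary.append("timeout,"); // the timed-out thread itself gets TimeoutException
        } catch (BrokenBarrierException e) {
            summary.append("unexpected-broken,");
        }
        summary.append("broken=").append(barrier.isBroken()).append(',');
        try {
            barrier.await(); // fails immediately: the barrier is already broken
        } catch (BrokenBarrierException e) {
            summary.append("second-await-rejected");
        }
        return summary.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "timeout,broken=true,second-await-rejected"
    }
}
```

The thread whose wait timed out receives TimeoutException, while any other thread that is waiting or arrives afterwards receives BrokenBarrierException until the barrier is reset.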
In addition to the above, the following methods are provided:
// Returns the number of threads required to trip the barrier
public int getParties()
// Returns whether the barrier is in a broken state
public boolean isBroken()
// Resets the barrier to its initial state; threads currently waiting receive a BrokenBarrierException
public void reset()
// Returns the number of threads currently waiting at the barrier
public int getNumberWaiting()
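These inspection methods are easiest to understand by watching them around a reset(). The sketch below uses an illustrative class name, ResetDemo, and a made-up summary format: one thread waits at a two-party barrier, and the barrier is then reset underneath it.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

class ResetDemo {
    static String run() throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean waiterSawBroken = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                waiterSawBroken.set(true); // reset() breaks the generation being waited on
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        // Wait until the worker is actually parked at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(1);
        }
        int waitingBefore = barrier.getNumberWaiting();
        barrier.reset();
        waiter.join();
        return "parties=" + barrier.getParties()
                + " waitingBefore=" + waitingBefore
                + " waiterSawBroken=" + waiterSawBroken.get()
                + " brokenAfterReset=" + barrier.isBroken()
                + " waitingAfterReset=" + barrier.getNumberWaiting();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

After reset() the barrier starts a fresh generation, so isBroken() reports false again even though the thread that was waiting was released with an exception.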
2, Usage scenario
2.1 consolidated calculation results
Several threads compute data concurrently, and the results are merged at the end. The example below simulates three threads that each query the price of a product, after which the average price is calculated.
The example uses the CyclicBarrier(int parties, Runnable barrierAction) constructor. After each thread has queried a price, it calls await() and blocks. When the last thread calls await(), it executes the logic that calculates the average price and then wakes up the blocked threads.
// Stores the price found by each worker thread, keyed by thread name
ConcurrentHashMap<String, Integer> priceMap = new ConcurrentHashMap<>();
ExecutorService executorService = Executors.newFixedThreadPool(3);
// The barrier action runs once, in the last thread to arrive
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
    Collection<Integer> priceCollect = priceMap.values();
    long avePrice = priceCollect.stream().reduce(Integer::sum).get() / 3;
    System.out.println("Average price of goods: " + avePrice);
});
for (int i = 0; i < 3; i++) {
    executorService.execute(() -> {
        // Simulated query of commodity price
        int price = (int) (Math.random() * 100);
        priceMap.put(Thread.currentThread().getName(), price);
        System.out.println(Thread.currentThread().getName() + " price is " + price);
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    });
}
// Let the pool terminate once the three tasks have finished
executorService.shutdown();
2.2 full passenger departure
Because the counter of CyclicBarrier is reset and the barrier can be reused, it supports scenarios such as "depart when the vehicle is full".
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("Ready to start"));
    for (int i = 0; i < 10; i++) {
        executorService.submit(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
                System.out.println("Passengers on board: " + (cyclicBarrier.getNumberWaiting() + 1));
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        });
    }
    // Let the pool terminate once all ten tasks have finished
    executorService.shutdown();
}
A typical console output is as follows (a count can occasionally repeat, because getNumberWaiting() is read before await() is called):
Passengers on board: 1
Passengers on board: 2
Passengers on board: 3
Passengers on board: 4
Passengers on board: 5
Ready to start
Passengers on board: 1
Passengers on board: 2
Passengers on board: 3
Passengers on board: 4
Passengers on board: 5
Ready to start
3, Source code analysis
Having read the CountDownLatch source code, we know that it is implemented with a shared lock, whereas CyclicBarrier is built on an exclusive ReentrantLock and a Condition queue. The implementation of CyclicBarrier is more complex than that of CountDownLatch, because it also involves transferring threads from the Condition queue to the synchronization queue, and this part of the source code is somewhat intricate.
3.1 construction method
Take the multi parameter construction method as an example:
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
This constructor takes an additional parameter of type Runnable. When all threads have reached the barrier, the last thread to arrive calls its run() method. Both parties and count record the number of threads required to trip the barrier, but parties is a fixed copy that is used when the counter is reset, while count records how many threads have yet to arrive. count is decremented by one each time a thread calls await().
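The roles of barrierAction and the countdown can be observed from outside the class. In the sketch below (ArrivalDemo is an illustrative name, not from the original article), a worker thread arrives first and the calling thread arrives last. The value returned by await() is the arrival index, which is the decremented count at the moment that thread arrived.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ArrivalDemo {
    static String run() throws InterruptedException, BrokenBarrierException {
        AtomicReference<Thread> actionThread = new AtomicReference<>();
        AtomicInteger firstIndex = new AtomicInteger(-1);
        // The barrier action records which thread executed it.
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> actionThread.set(Thread.currentThread()));
        Thread worker = new Thread(() -> {
            try {
                firstIndex.set(barrier.await()); // arrives first
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        // Make sure the worker is already waiting before this thread arrives.
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(1);
        }
        int lastIndex = barrier.await(); // arrives last and trips the barrier
        worker.join();
        return "actionRanInLastArriver=" + (actionThread.get() == Thread.currentThread())
                + " firstIndex=" + firstIndex.get()
                + " lastIndex=" + lastIndex;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "actionRanInLastArriver=true firstIndex=1 lastIndex=0"
    }
}
```

With two parties, the first arrival sees count go from 2 to 1 and the last arrival sees it go from 1 to 0, which matches the returned indices.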
3.2 thread blocking and wakeup
Thread blocking and wake-up in CyclicBarrier are both implemented in the await() method, which also handles the transfer of threads from the condition queue to the synchronization queue. The rest of this section walks through the source code of this method in detail.
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
The core method is dowait(). Its complete source code is shown below and is then analyzed in three parts.
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
3.2.1 function of lock
In the dowait() method, the lock operation is performed first. What is the purpose of this step?
final ReentrantLock lock = this.lock;
lock.lock();
Among the fields of the CyclicBarrier class, two are of particular interest: the exclusive lock and the Condition object that owns the condition queue.
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
The dowait() method calls the await() and signalAll() methods of Condition, and both of these may only be called while the associated lock is held. This is the same reason that wait() and notify()/notifyAll() must be used inside a synchronized block.
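The requirement to hold the lock can be verified directly. The following sketch (ConditionLockDemo and its method names are illustrative) calls the Condition methods of a ReentrantLock with and without holding the lock.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionLockDemo {
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition cond = lock.newCondition();

    // Calling await() without holding the lock is rejected.
    static boolean awaitWithoutLockFails() throws InterruptedException {
        try {
            cond.await(1, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calling signalAll() without holding the lock is rejected as well.
    static boolean signalWithoutLockFails() {
        try {
            cond.signalAll();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // With the lock held, both calls are legal (the timed wait simply times out).
    static boolean worksWithLock() throws InterruptedException {
        lock.lock();
        try {
            cond.signalAll();
            cond.await(1, TimeUnit.MILLISECONDS);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitWithoutLockFails());  // true
        System.out.println(signalWithoutLockFails()); // true
        System.out.println(worksWithLock());          // true
    }
}
```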
3.2.2 thread blocking
In the dowait() method, count is decremented by one, and the result determines whether the thread blocks or wakes up the others.
int index = --count;
If the result is not 0, the thread enters the for (;;) loop, in which the await() method of Condition is called.
for (;;) {
    try {
        if (!timed)
            trip.await();
        else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
        if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
        } else {
            Thread.currentThread().interrupt();
        }
    }
}
In AbstractQueuedSynchronizer, the inner class ConditionObject implements the Condition interface and its await() method.
This method is a little confusing at first. It begins by calling addConditionWaiter() to wrap the current thread in a node and append it to the condition queue. Recall that dowait() starts with lock(), so any thread that calls CyclicBarrier's await() while another thread holds the lock is blocked in lock(). fullyRelease() is therefore called here to release the lock, which wakes up a thread that was blocked because its lock() attempt failed.
Next, isOnSyncQueue() checks whether the node just added to the condition queue is on the synchronization queue (at this point it is not). Since it is not, LockSupport.park() is called to block the thread.
After the last thread calls await(), the nodes on the condition queue are transferred to the synchronization queue and their threads are woken up, so that they can leave the loop.
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
Let's take a detailed look at several core methods introduced above
3.2.2.1 add node to condition queue
This method is simple. Node here is the same class as the Node used by ReentrantLock. When a Node is used in the condition queue, mainly its nextWaiter and waitStatus fields are used.
As described in the earlier ReentrantLock article, the synchronization queue is implemented as a doubly linked list, whereas the condition queue is implemented as a singly linked list.
The method first checks the tail of the queue: if the state of the last waiter is no longer CONDITION (-2), the cancelled nodes are unlinked from the condition queue. It then wraps the current thread in a new condition-queue node. Unlike the synchronization queue, the condition queue has no dummy head node with a null thread: its head (firstWaiter) is the first waiting node, and its tail (lastWaiter) is the last waiting node.
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
The structure diagram of condition queue is as follows:
3.2.2.2 release lock
fullyRelease() is also relatively simple. It first calls getState() to obtain the value of state. Because the lock is exclusive and dowait() acquired it exactly once, the value is 1. It then calls release(), which is also simple: it calls tryRelease() to set state to 0 (a plain write is sufficient here, because only the thread that owns the lock can release it) and then calls unparkSuccessor() to wake up a thread that was blocked because its lock() attempt failed. The source code of release() and tryRelease() was covered in the earlier ReentrantLock article and is not repeated here.
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
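The effect of saving and restoring the state can be seen from the outside with a plain ReentrantLock. This is a sketch under stated assumptions: FullyReleaseDemo and its variable names are illustrative, and, unlike dowait(), the waiting thread deliberately acquires the lock twice so that the saved state is 2 rather than 1.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class FullyReleaseDemo {
    static String run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicBoolean signalled = new AtomicBoolean(false);
        AtomicInteger holdsAfterWake = new AtomicInteger(-1);
        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // hold count is now 2
            try {
                while (!signalled.get()) {
                    cond.awaitUninterruptibly(); // gives up both holds while waiting
                }
                holdsAfterWake.set(lock.getHoldCount()); // holds are restored on return
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();
        boolean acquiredWhileWaiting = false;
        while (!acquiredWhileWaiting) {
            lock.lock(); // succeeds although the waiter locked twice and never unlocked
            try {
                if (lock.hasWaiters(cond)) {
                    acquiredWhileWaiting = true;
                    signalled.set(true);
                    cond.signalAll();
                }
            } finally {
                lock.unlock();
            }
            if (!acquiredWhileWaiting) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return "acquiredWhileWaiting=" + acquiredWhileWaiting
                + " holdsAfterWake=" + holdsAfterWake.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "acquiredWhileWaiting=true holdsAfterWake=2"
    }
}
```

The other thread can take the lock while the waiter is parked, and the waiter gets its original hold count back once it has been signalled and has reacquired the lock.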
3.2.3 thread transfer and wake-up
As described above, once a thread has been wrapped in a node and added to the condition queue, it is blocked. When is it woken up?
It has to wait until the last thread calls await(). At that point count has reached 0, which means that the threads that were waiting can cross the barrier together with the current thread.
The core logic is the following part of the dowait() method:
int index = --count;
if (index == 0) {  // tripped
    boolean ranAction = false;
    try {
        final Runnable command = barrierCommand;
        if (command != null)
            command.run();
        ranAction = true;
        nextGeneration();
        return 0;
    } finally {
        if (!ranAction)
            breakBarrier();
    }
}
This code shows clearly that when the last thread arrives, it first executes the run() method of the Runnable passed to the constructor.
It then calls nextGeneration(). As the name suggests, the current round of waiting is over and the next round can begin. The thread wake-up is also implemented in this method.
private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}
First, the signalAll() method of Condition is called to wake up the threads, and then the counter is reset. This is where the role of parties as a copy becomes clear. The following focuses on the signalAll() method, which is also implemented in ConditionObject.
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
This chain of method calls is relatively simple. doSignalAll() starts at the head node of the condition queue and calls transferForSignal() for each node in turn, and the main work is done in that method.
In transferForSignal(), the state of the node is first changed via CAS from CONDITION to 0 (the initial state), so that the node can be added to the synchronization queue. The enq() method that follows is familiar from the ReentrantLock article: it appends the node to the synchronization queue and returns the predecessor of the node.
Finally, the state of the predecessor is changed to SIGNAL via CAS, indicating that it must wake up the thread of the node that follows it.
Note that the signalAll() method of Condition normally does not wake up the threads itself. It only transfers the nodes from the condition queue to the synchronization queue and sets the state of each predecessor to SIGNAL, so that the node can be woken up later. Only if the predecessor has been cancelled or the CAS fails is the thread unparked directly.
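The transfer described above can be observed with the monitoring methods of ReentrantLock. The following sketch (SignalTransferDemo is an illustrative name) takes a snapshot immediately after signalAll(), while the signalling thread still holds the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    static String run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        AtomicBoolean signalled = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled.get()) {
                    cond.awaitUninterruptibly();
                }
                resumed.set(true); // reached only after the lock is reacquired
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        String snapshot = null;
        while (snapshot == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    signalled.set(true);
                    cond.signalAll();
                    // Still holding the lock: inspect where the waiter is now.
                    snapshot = "onConditionQueue=" + lock.hasWaiters(cond)
                            + " onSyncQueue=" + lock.hasQueuedThread(waiter)
                            + " resumedWhileLocked=" + resumed.get();
                }
            } finally {
                lock.unlock(); // releasing the lock is what lets the waiter continue
            }
            if (snapshot == null) {
                Thread.sleep(1);
            }
        }
        waiter.join();
        return snapshot + " resumedAfterUnlock=" + resumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
        // prints "onConditionQueue=false onSyncQueue=true resumedWhileLocked=false resumedAfterUnlock=true"
    }
}
```

Right after signalAll() the waiter has left the condition queue and sits in the synchronization queue, but it has not continued past its wait, because the signalling thread still owns the lock.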
At this time, the structure of the synchronization queue corresponding to the CyclicBarrier is roughly as follows:
When the last thread calls await(), it completes all of the operations above and finally calls lock.unlock() in the finally block of dowait(). Only at this point are the blocked threads in the synchronization queue really woken up.
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
Note that this only wakes up the thread of the first waiting node in the synchronization queue, and it does nothing beyond waking it. The awakened thread resumes where it was blocked, in the await() method of Condition. It was parked inside the while loop; now that it is awake and its node is on the synchronization queue, the loop ends. It then calls acquireQueued(), which reacquires the lock and removes the node of the awakened thread from the synchronization queue, and finally the thread returns to the dowait() method of CyclicBarrier.
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
When the first thread in the synchronization queue returns to the dowait() method, it exits at the code below. Before exiting, it also calls lock.unlock() in the finally block, so the threads in the synchronization queue are woken up one after another.
if (g != generation) return index;
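To tie the pieces together, the following is a heavily simplified sketch of the same idea: a lock, a condition, a counter with a fixed copy, and a generation marker. MiniBarrier is a hypothetical teaching class, not the JDK implementation. It deliberately omits the barrier action, timeouts, and the broken state, so an interrupted waiter simply leaves with an exception.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;                // fixed copy, used to reset count
    private int count;                        // threads still to arrive
    private Object generation = new Object(); // replaced each time the barrier trips

    MiniBarrier(int parties) {
        this.parties = parties;
        this.count = parties;
    }

    int await() throws InterruptedException {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {      // last arrival trips the barrier
                trip.signalAll();  // move waiters to the lock's sync queue
                count = parties;   // reset for the next round
                generation = new Object();
                return 0;
            }
            while (g == generation) {
                trip.await();      // releases the lock while waiting
            }
            return index;
        } finally {
            lock.unlock();         // lets the next queued thread proceed
        }
    }

    // Runs 3 threads through 2 rounds and returns the sum of all arrival indices.
    static int demo() throws InterruptedException {
        final int parties = 3;
        final int rounds = 2;
        MiniBarrier barrier = new MiniBarrier(parties);
        AtomicInteger indexSum = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        indexSum.addAndGet(barrier.await());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return indexSum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 6: indices 0 + 1 + 2 in each of 2 rounds
    }
}
```

Comparing the generation reference, rather than checking count, is what lets a thread from the previous round tell that its barrier has already tripped even if faster threads have started the next round.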