CountDownLatch is a thread synchronization tool provided in the java.util.concurrent (JUC) package. With CountDownLatch, one or more threads can wait until a set of operations being performed in other threads has completed.
CountDownLatch is built on AQS (AbstractQueuedSynchronizer) and contains very little code, so it is easy to analyze. This article only analyzes the implementation of CountDownLatch; the implementation of AQS is described in another article.
The following is the class diagram of CountDownLatch:
The source code of CountDownLatch is as follows:
    public class CountDownLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;

            Sync(int count) {
                setState(count);
            }

            int getCount() {
                return getState();
            }

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }

        private final Sync sync;

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        public void countDown() {
            sync.releaseShared(1);
        }

        public long getCount() {
            return sync.getCount();
        }

        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
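First, instantiation. There is little to analyze here: the constructor takes a number, which can be understood as the total count of operations to wait for, and stores it in the AQS state through setState(). A negative count is rejected with an IllegalArgumentException. Because several threads may wait on this count at the same time, CountDownLatch uses the shared-mode methods of AQS, which are called in the steps that follow.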
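Before going into the internals, it may help to see the count from the caller's side. The following is a minimal sketch, not part of the JDK source; the class name LatchBasicUsage and the helper runWorkers() are hypothetical names chosen for illustration. It assumes one worker thread per unit of the count.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one thread waits until `workers` threads have finished.
class LatchBasicUsage {
    // Starts `workers` threads, waits for all of them, and returns how many finished.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers); // the initial count
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stands in for the real operation
                done.countDown();           // consume one unit of the count
            }).start();
        }
        try {
            done.await();                   // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished = " + runWorkers(3));
    }
}
```

Each worker calls countDown() exactly once, so the waiting thread resumes only after every worker has done its part.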
Second, await() is the waiting function. While the remaining count is greater than 0, the calling thread is blocked. await() calls AQS::acquireSharedInterruptibly() through Sync; the code is as follows:
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
As shown, acquireSharedInterruptibly() first checks the interrupt flag and throws InterruptedException if it is set. It then calls back tryAcquireShared(), which the subclass implements, to attempt a shared-mode acquisition. If the acquisition succeeds, the method returns and the caller continues; if it fails, doAcquireSharedInterruptibly() enqueues the thread and blocks it.
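The outcomes described above can be observed through the public API. The sketch below is illustrative only: LatchAwaitBehavior and its two helper methods are hypothetical names, and the 50 ms timeout is an arbitrary choice. It uses the timed await() so that a failed acquisition shows up as a false return value instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing the success, failure, and interrupt paths of await().
class LatchAwaitBehavior {
    // Returns true if await() threw because the interrupt flag was already set.
    static boolean awaitWhileInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();   // set the flag before calling await()
        try {
            latch.await();
            return false;
        } catch (InterruptedException e) {
            return true;                      // the flag has been cleared by now
        }
    }

    // Returns the result of a timed await on a latch created with `count`.
    static boolean timedAwait(int count, long millis) {
        CountDownLatch latch = new CountDownLatch(count);
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timedAwait(0, 50));       // count is zero: acquisition succeeds
        System.out.println(timedAwait(1, 50));       // count stays at 1: the wait times out
        System.out.println(awaitWhileInterrupted()); // interrupt flag set: exception path
    }
}
```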
When implementing tryAcquireShared(), note that the return value has three cases: a negative value means the acquisition failed; zero means the shared-mode acquisition succeeded but no subsequent shared-mode acquisition can succeed; a positive value means the acquisition succeeded and subsequent shared-mode acquisitions may also succeed. So while the count is greater than 0, the method must return a negative value so that the caller blocks, and once the count is 0 it returns a positive value so that every waiter can pass. CountDownLatch implements tryAcquireShared() exactly this way:
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
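To make the return-value contract concrete, here is a minimal sketch of a different synchronizer built on the same AQS callbacks. OneShotGate is a hypothetical class written for this illustration, loosely modeled on the kind of latch example given in the AQS documentation. It is not how CountDownLatch itself is written, and it assumes a state of 0 means closed and 1 means open.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate, used only to illustrate the tryAcquireShared() contract.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // state 0 = closed: negative result, so AQS queues and blocks the caller.
            // state 1 = open: positive result, so this and later acquires succeed.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the gate
            return true;  // true tells AQS to wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    void open() {
        sync.releaseShared(1);
    }

    // Waits up to `millis` for the gate to open and reports whether it was open.
    boolean awaitOpen(long millis) {
        try {
            return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(millis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        OneShotGate gate = new OneShotGate();
        System.out.println(gate.awaitOpen(20)); // gate closed: the timed wait gives up
        gate.open();
        System.out.println(gate.awaitOpen(20)); // gate open: returns without waiting
    }
}
```

The only difference from CountDownLatch is the state encoding: here a single release opens the gate, whereas CountDownLatch opens only after the count has been decremented to zero.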
Third, countDown() consumes one unit of the count. It calls AQS::releaseShared(), which calls back tryReleaseShared() to decrement the count. releaseShared() returns true only when that decrement brings the count to zero, in which case doReleaseShared() wakes the waiting threads; otherwise it returns false. countDown() ignores this return value.
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
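tryReleaseShared() returns true only when the count has just reached zero. doReleaseShared() then wakes the threads blocked in await(); because tryAcquireShared() now returns a positive value, the wake-up propagates through the queue, so every waiting thread is released, not just one. If the count is already zero, tryReleaseShared() returns false and countDown() has no effect. The decrement is done in a CAS retry loop so that concurrent countDown() calls never lose an update: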
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
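Two consequences of this loop are visible from the outside: the count never drops below zero, and reaching zero releases all waiting threads. The following sketch checks both. LatchReleaseBehavior and its helper methods are hypothetical names, and the 5 second limit is an arbitrary safety timeout for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the behavior of countDown() at and beyond zero.
class LatchReleaseBehavior {
    // Calls countDown() once more than the initial count and returns the final count.
    static long countAfterExtraCountDowns() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown(); // 2 -> 1
        latch.countDown(); // 1 -> 0, waiters would be signalled here
        latch.countDown(); // already 0: tryReleaseShared() returns false, nothing changes
        return latch.getCount();
    }

    // Parks `waiters` threads on one latch and reports whether all of them got through.
    static boolean allWaitersReleased(int waiters) {
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch released = new CountDownLatch(waiters);
        for (int i = 0; i < waiters; i++) {
            new Thread(() -> {
                try {
                    gate.await();         // every thread waits on the same latch
                    released.countDown(); // report that this thread was released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        gate.countDown();                 // a single transition to zero
        try {
            return released.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(countAfterExtraCountDowns());
        System.out.println(allWaitersReleased(4));
    }
}
```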