JDK source code -- CountDownLatch (AQS subclass)

  • CountDownLatch is used to synchronize one or more threads, forcing them to wait for a set of operations performed by other threads to complete. A typical use is to divide a program into n mutually independent solvable tasks and create a CountDownLatch with a value of n. When each task is completed, countDown will be called, waiting for the task whose problem is solved to call await to suspend itself until the end of counting.

Initialization

public CountDownLatch(int count) { //Parameter is the number of threads waiting to complete
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count); //Set AQS status to count
}

Task hang up

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg); //Whether the status has been set to 0
                if (r >= 0) { //The status has been set to 0. All pending tasks can be woken up
                    setHeadAndPropagate(node, r); //All waiting threads in the bounded blocking queue wake up
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //Suspend thread and return whether thread is interrupted
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

Release status on task completion

public void countDown() {
    sync.releaseShared(1);
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

View the number of tasks waiting to be completed

public long getCount() {
    return sync.getCount();
}

int getCount() {
   return getState();
}

Added by hoppyite on Mon, 02 Dec 2019 00:16:12 +0200