A detailed explanation of how CountDownLatch works

Introduction

Before reading this article, you should already understand how AQS (AbstractQueuedSynchronizer) works, because this article does not explain its internals.

CountDownLatch is a synchronization aid. When several threads are executing tasks, we often need to wait for all of them to finish before running the statements that follow. Previously, this was done by waiting with the Thread.join method. Internally, CountDownLatch is built on AQS. The internal structure of AQS has been described earlier: it has a state field that controls the synchronizer. So how does CountDownLatch know when multiple threads have finished? It uses state as a counter. For example, we initialize the counter to 3 and start three threads. Each time a thread finishes, it reduces state by 1. When state reaches 0, all threads have finished.
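
For contrast, the older Thread.join approach mentioned above might look roughly like the sketch below. The class name JoinSketch and the method runAndJoin are hypothetical names chosen for illustration. Note that the waiting code needs a reference to every Thread object, which is one reason a shared counter can be more convenient.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: waiting for workers with Thread.join instead of a latch.
class JoinSketch {
    // Starts n worker threads, joins each one, and returns how many had finished.
    static int runAndJoin(int n) {
        AtomicInteger finished = new AtomicInteger();
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            workers[i] = new Thread(finished::incrementAndGet);
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // blocks until this particular thread has terminated
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndJoin(3)); // prints 3
    }
}
```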

Source code analysis

We start the source code analysis with an example, and the rest of the article walks through the source code in terms of it. We start three threads, and the main thread must wait for all three to finish before continuing with its own work. The example is as follows:

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // A latch with an initial count of 3, one per worker thread.
        CountDownLatch countDownLatch = new CountDownLatch(3);

        for (int i = 0; i < 3; ++i) {
            new Thread(new Worker(countDownLatch, i)).start();
        }
        System.out.println("Waiting for the three threads to complete");
        // Block until all three threads have called countDown().
        countDownLatch.await();
        System.out.println("All 3 threads are completed");
    }

    // A porter: the task executed by each worker thread.
    static class Worker implements Runnable {
        private final CountDownLatch countDown;
        private final Integer id;

        Worker(CountDownLatch countDown, Integer id) {
            this.countDown = countDown;
            this.id = id;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(500);
                doWork();
                System.out.println("Thread " + id + " has finished its work");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers can still observe it.
                Thread.currentThread().interrupt();
            } finally {
                // Always count down, even on failure, so main() is never left waiting.
                countDown.countDown();
            }
        }

        void doWork() {
            System.out.println("Thread " + id + " starts working");
        }
    }
}

The example illustrates how CountDownLatch works. We start three threads, each executing its own task, and the main thread waits for all three to finish. One run produced the following output (the order in which the worker threads print varies between runs):

Waiting for the three threads to complete
Thread 1 starts working
Thread 0 starts working
Thread 0 has finished its work
Thread 1 has finished its work
Thread 2 starts working
Thread 2 has finished its work
All 3 threads are completed

Here we can think of the three threads as porters loading goods onto a truck. All three must finish the loads assigned to them before the truck can set off; that is, the truck driver has to wait for all three to finish before departing. The driver keeps a small notebook recording how many porters are still working on this trip. Before any thread has started, the notebook reads 3.

When the porters begin to work, each one is busy with his own task. When porter 1 finishes, he reports to the driver that his task is done. The driver lowers the number in his notebook by one, so it now shows that two porters have not yet finished.

Every time a porter finishes the task at hand, he reports to the driver. When all the porters have reported, the number in the driver's notebook has dropped to 0, and the driver can set off, because all three have completed their work.
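
The notebook in this analogy can be watched directly through getCount. The following is a minimal single-threaded sketch; NotebookSketch and trace are hypothetical names used only for illustration, and each loop iteration stands in for one porter reporting to the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration: the latch count plays the role of the driver's notebook.
class NotebookSketch {
    // Records the count before any report and after each porter reports in.
    static List<Long> trace(int porters) {
        CountDownLatch notebook = new CountDownLatch(porters);
        List<Long> counts = new ArrayList<>();
        counts.add(notebook.getCount());
        for (int i = 0; i < porters; i++) {
            notebook.countDown();            // one porter reports that he is done
            counts.add(notebook.getCount()); // the driver's notebook after the report
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(trace(3)); // prints [3, 2, 1, 0]
    }
}
```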

With this example we have a rough idea of how CountDownLatch works. How does the driver (state) keep track of how many porters have and have not finished? CountDownLatch implements the counter with the AQS state field. Let's analyze this in detail through the source code:

public class CountDownLatch {
    /**
     * Synchronization control for CountDownLatch.
     * Uses the AQS state to represent the count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        // Initialize state to the count (the number of countDown() calls to wait for).
        Sync(int count) {
            setState(count);
        }
        // Get the state value.
        int getCount() {
            return getState();
        }
        // Try to acquire in shared mode.
        protected int tryAcquireShared(int acquires) {
            // This is the key point: the acquire succeeds only when state == 0.
            // Otherwise it fails, and AQS puts the current thread into the queue and blocks it.
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement state; the blocked threads are released when it transitions to 0.
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    // The synchronizer object.
    private final Sync sync;

    /**
     * Constructs a CountDownLatch initialized with the given count.
     */
    public CountDownLatch(int count) { 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.
     * If the current count is zero, this method returns immediately. If the current count is greater than zero,
     * the current thread is disabled for thread scheduling purposes and lies dormant until one of two things happens:
     * 1. The count reaches zero.
     * 2. Some other thread interrupts the current thread.
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

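    /**
     * Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted
     * or the specified waiting time elapses. Returns true if the count reached zero, and false if the waiting
     * time elapsed first.
     */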
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count (the state value).
     */
    public long getCount() {
        return sync.getCount();
    }
}

The CountDownLatch source code is quite short. We can see that the implementation is based on AQS: the inner class Sync extends AbstractQueuedSynchronizer and implements tryAcquireShared and tryReleaseShared. From the constructor we can see that it creates an AQS synchronizer object and initializes the state value. If the count passed in is less than 0, an exception is thrown.

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // Initialize the state value of AQS.
    this.sync = new Sync(count);
}
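
The constructor check can be exercised with a small sketch like the one below. ConstructorSketch and isRejected are hypothetical names used only for illustration. A count of 0 is accepted, and in that case await returns immediately.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of the argument check in the CountDownLatch constructor.
class ConstructorSketch {
    // Returns true if constructing a latch with the given count throws IllegalArgumentException.
    static boolean isRejected(int count) {
        try {
            new CountDownLatch(count);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isRejected(-1));                   // prints true
        System.out.println(isRejected(0));                    // prints false
        System.out.println(new CountDownLatch(3).getCount()); // prints 3
    }
}
```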

In the example above, right after initialization the AQS state is 3 and its wait queue is empty.

await method

When await is called, it actually calls the acquireSharedInterruptibly method of AQS, which in turn calls the tryAcquireShared method in Sync. In the example above, state was initialized to 3, and Sync evaluates (getState() == 0) ? 1 : -1. Since state is 3, the condition is false and -1 is returned. When a negative number is returned, the current thread is added to the AQS queue and suspended. It stays there until state is reduced to 0, at which point it is woken up, or until it is interrupted, in which case await throws an InterruptedException.

/**
 * Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted.
 * If the current count is zero, this method returns immediately. If the current count is greater than zero,
 * the current thread is disabled for thread scheduling purposes and lies dormant until one of two things happens:
 * 1. The count reaches zero.
 * 2. Some other thread interrupts the current thread.
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
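
The interruption case can be observed with a sketch like the following. InterruptSketch and awaitIsInterrupted are hypothetical names used only for illustration, and the 5-second join timeout is an arbitrary safety margin so that the sketch cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration: a thread blocked in await() reacts to interruption.
class InterruptSketch {
    // Returns true if the waiting thread saw an InterruptedException.
    static boolean awaitIsInterrupted() {
        CountDownLatch neverOpened = new CountDownLatch(1); // nobody calls countDown()
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                neverOpened.await();
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        // Whether the waiter is already parked or has not yet reached await(),
        // acquireSharedInterruptibly notices the interrupt and throws.
        waiter.interrupt();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitIsInterrupted()); // prints true
    }
}
```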

When a thread calls await, the call is delegated to acquireSharedInterruptibly in AQS. Let's take a look at this method and at doAcquireSharedInterruptibly, which it calls:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Respond to interruption: if the thread is already interrupted, throw immediately.
        if (Thread.interrupted())
            throw new InterruptedException();
        // Call the tryAcquireShared method.
        if (tryAcquireShared(arg) < 0)
            // The acquire failed: add the thread to the wait queue and block it until another thread wakes it up.
            doAcquireSharedInterruptibly(arg);
    }
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireSharedInterruptibly calls the tryAcquireShared method implemented by the inner class Sync of CountDownLatch. tryAcquireShared checks whether state, that is, the counter, has reached zero. Only then can the caller proceed. If the counter is not yet zero, the current thread is placed in the AQS wait queue and suspended, waiting for another thread to wake it up.
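
From the outside, this blocking behaviour can be observed with the timed await overload, which returns false instead of waiting forever. The sketch below is only an illustration: TimedAwaitSketch and opensInTime are hypothetical names, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: await succeeds only once the counter has reached zero.
class TimedAwaitSketch {
    // Creates a latch, calls countDown() the given number of times,
    // then waits up to 50 ms and reports whether the latch opened.
    static boolean opensInTime(int initialCount, int countDowns) {
        CountDownLatch latch = new CountDownLatch(initialCount);
        for (int i = 0; i < countDowns; i++) {
            latch.countDown();
        }
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(opensInTime(3, 2)); // prints false: the count is still 1
        System.out.println(opensInTime(3, 3)); // prints true: the count reached 0
    }
}
```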

countDown method

When a thread has finished its work, it calls countDown on the CountDownLatch. countDown calls the releaseShared method of AQS, which in turn calls tryReleaseShared, implemented in the Sync class. The main job of tryReleaseShared is to reduce the state counter by 1. It first checks whether state is already 0. If so, it does nothing and returns false, which prevents state from becoming negative when other threads call countDown after the count has already reached 0. Otherwise it decrements state with a CAS and returns true only if the new value is 0. releaseShared wakes the threads in the wait queue only when tryReleaseShared returns true. Suppose one of the threads in the example above calls countDown first: state becomes 2, tryReleaseShared returns false, and no thread is woken up, so the main thread stays suspended in the wait queue. The same happens when the second thread calls countDown and state becomes 1. Even if the waiting thread did wake up at this point, it would call tryAcquireShared again, get -1, and be suspended again.


When the last thread finishes and calls countDown, state decreases from 1 to 0. tryReleaseShared returns true, so releaseShared wakes the waiting thread. This time tryAcquireShared returns 1, and await returns successfully.
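
To see which countDown call actually releases the waiters, here is a minimal latch built directly on AQS. It is a simplified sketch, not the JDK class: MiniLatch is a hypothetical name, and unlike the real CountDownLatch its countDown returns the boolean result of releaseShared so that the result can be inspected.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified latch that exposes the result of releaseShared.
class MiniLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // already open: never go negative
                }
                if (compareAndSetState(c, c - 1)) {
                    return c == 1; // true only for the 1 -> 0 transition
                }
            }
        }

        int count() {
            return getState();
        }
    }

    private final Sync sync;

    MiniLatch(int count) {
        this.sync = new Sync(count);
    }

    // releaseShared returns true, and wakes the waiters, only when tryReleaseShared returned true.
    boolean countDown() {
        return sync.releaseShared(1);
    }

    int getCount() {
        return sync.count();
    }

    public static void main(String[] args) {
        MiniLatch latch = new MiniLatch(3);
        System.out.println(latch.countDown()); // prints false (state 3 -> 2)
        System.out.println(latch.countDown()); // prints false (state 2 -> 1)
        System.out.println(latch.countDown()); // prints true  (state 1 -> 0)
        System.out.println(latch.countDown()); // prints false (state stays 0)
    }
}
```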

Summary

CountDownLatch uses the AQS state field as its counter. Constructing a CountDownLatch initializes state. Calling await checks whether the counter is 0; if not, the current thread is suspended and added to the AQS wait queue. Each call to countDown reduces the counter by 1. When the counter reaches 0, the threads in the wait queue are woken up. They check the counter again, find that it is 0, and return from await. A waiting thread leaves the queue only when the counter reaches 0 or when it is interrupted (or, for the timed await, when the waiting time elapses).


Keywords: Java Multithreading Concurrent Programming

Added by lasse48 on Sat, 29 Jan 2022 16:25:17 +0200