CountDownLatch source code reading

brief introduction

CountDownLatch is a thread synchronization tool provided by JUC. Its main function is to coordinate the synchronization between multiple threads, or to realize the communication between threads

CountDown, a number, can only be counted down. Latch, latch. Just look at the name to see how this CountDownLatch is used. Ha ha. CountDownLatch is equivalent to a counter. The initial value of the counter is set through the parameters of the construction method. The thread calling the await method of the CountDownLatch instance will wait until the counter changes to 0 before being awakened and continue to execute downward. So how does the counter become 0?
If other threads call the countDown method of the CountDownLatch instance, the count value will be reduced by 1. When it is reduced to 0, it will let those threads that block waiting due to calling the await method continue to execute. This enables synchronization between these threads

Author: Wine lie Source: https://www.cnblogs.com/frankiedyz/p/15730573.html
Copyright: the copyright of this article belongs to the author and the blog park
Reprint: reprint is welcome, but this statement must be retained without the consent of the author; The original connection must be given in the article; Otherwise, legal responsibility must be investigated

Usage scenario

The direct introduction may be too abstract and difficult for beginners to understand. The best way is to introduce it through an actual scene

There is a general scenario: the main thread starts multiple sub threads to execute multiple sub tasks in parallel, waits for all sub threads to complete execution, and the main thread collects and counts the execution results of sub threads

Scenario example

For example, the main thread waits for A and B to transfer money to it, collects all the money and puts it into the bank to earn interest. The example code is as follows:

public class TestCountDownLatch {

    // This must be an atomic class. It is necessary to ensure that the modification of money is an atomic operation in order to ensure thread safety
    // You can't just set money to volatile int
    private static final AtomicInteger money = new AtomicInteger(0);

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);

        Thread threadA = new Thread(() -> {
            try {
                System.out.println("A Transfer 30 yuan to me");
                Thread.sleep(1000);
                
                // Thread A and thread B must use CAS to modify money, otherwise an error may occur
                int origin = money.get();
                while (!money.compareAndSet(origin, origin + 30)) {
                    origin = money.get();
                    continue;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            System.out.println("A Transfer completed");
        });

        Thread threadB = new Thread(() -> {
            try {
                System.out.println("B Transfer 70 yuan to me");
                Thread.sleep(1000);
                int origin = money.get();
                while (!money.compareAndSet(origin, origin + 30)) {
                    origin = money.get();
                    continue;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            System.out.println("B Transfer completed");
        });

        System.out.println("etc. A and B Transfer to me...");
        threadA.start();
        threadB.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // System.out.println(latch.getCount());
        System.out.println("After the transfer is completed, deposit the money in the bank...");

        // There is no need to modify money by CAS, because only the main thread can modify money
        int origin = money.get();
        money.set(origin * 2);
        System.out.println("Remove the money and add the principal and profit together" + money.get() + "element");
    }
}

The command line output is as follows:

Click to view the code
etc. A and B Transfer to me...
A Transfer 30 yuan to me
B Transfer 70 yuan to me
A Transfer completed
B Transfer completed
 After the transfer is completed, deposit the money in the bank...
After removing the money, the principal plus profit is 120 yuan

Process finished with exit code 0

Thanks to the synchronization function of CountDownLatch, when the execution of the above code ends, the value of money must be 120, not 0, 60 or 140. Because the main thread will be consistent await until thread A and thread B both execute latch Countdown() will continue to execute

Implementation principle

The implementation principle of CountDownLatch is actually AbstractQueuedSynchronizer (AQS)

CountDownLatch has an internal class sync, which implements some hook methods defined by AQS class. CountDownLatch implements all functions through sync class instance sync. The method calling CountDownLatch will be delegated to the sync domain for execution

// All CountDown functions are delegated to this Sync class object
private final Sync sync;

Therefore, to understand CountDownLatch, you must understand AQS and sync classes. Next, let's analyze the source code with me to see what this Sync has done

Author: Wine lie Source: https://www.cnblogs.com/frankiedyz/p/15730573.html
Copyright: the copyright of this article belongs to the author and the blog park
Reprint: reprint is welcome, but this statement must be retained without the consent of the author; The original connection must be given in the article; Otherwise, legal responsibility must be investigated

Source code analysis

Construction method

The count parameter can be passed in the constructor of CountDownLatch, indicating that the countDown method must be called count times before the thread calling await can continue to execute downward. Its source code is as follows:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

In fact, a sync class object is initialized and injected into the sync domain of CountDownLatch. Sync is constructed as follows:

Sync(int count) {
    setState(count);
}

The Sync constructor actually sets the state in AQS to count

Important conclusion: AQS state indicates the current count value of CountDownLatch

await

The await method is an instance method. The thread calling it blocks and waits until the count value of the CountDownLatch object drops to 0. If the count value is already 0 when await is called, it will not be blocked

The await method responds to interrupts:

  • If a thread has been interrupted before calling the await method, an interrupt exception will be thrown directly during the call
  • If a thread calls the await method to block the waiting process and receives an interrupt signal, it will throw an interrupt exception

Having said so much, let's take a look at the source code of await:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

It can be seen that this method is actually delegated to sync class object sync to execute. Acquiresharedinterruptible here has been implemented by AQS, the parent class of sync class. The source code is as follows:

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

This code is in The most detailed AbstractQueuedSynchronizer(AQS) source code analysis in the whole network There is a detailed introduction in the series, but it does not involve specific application classes (such as CountDownLatch), but has been analyzed from a strategic perspective. Here, we just use CountDownLatch to better understand it

As can be seen from the acquiresearedinterruptible source code, if the thread has been set to interrupt state before calling await, then the InterruptedException exception will be thrown directly

Next, the method will call the hook method tryAcquireShared. The Sync class provides a specific implementation of the method. The source code is as follows:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Although AQS does not provide a specific implementation for tryAcquireShared, it specifies the meaning of the return value:

  • Negative number: indicates that the acquisition fails, and the thread needs to be added to the synchronization queue to block and wait
  • 0: indicates that the acquisition of shared resources is successful, but subsequent acquisition of shared resources will not be successful
  • Positive number: indicates that the acquisition of shared resources is successful, and subsequent acquisition may also be successful

Let's analyze the tryAcquireShared method implemented by the Sync class:

  • If the state is not 0, that is, the count value of the CountDownLatch object has not been reduced to 0, then - 1 will be returned. The doacquiresharedinterruptible method will continue to be executed, and the thread calling await will be added to the synchronization queue to block waiting
  • If the state is 0, that is, the count value of the CountDownLatch object has been reduced to 0, 1 will be returned, and await will be returned directly without blocking

Note: see for detailed analysis of doacquireeshared interruptible The most detailed AbstractQueuedSynchronizer(AQS) source code analysis in the whole network series

countDown

The countDown method is also an instance method. Calling it will reduce the count value of the CountDownLatch object by 1. If it is exactly reduced to 0, all threads blocked by calling await will wake up. Its source code is as follows:

public void countDown() {
    sync.releaseShared(1);
}

This method is actually delegated to sync for execution. The releaseShared method here provides a specific implementation in AQS, the parent class of sync class. Its source code is as follows:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

This method first calls the hook method tryrereleaseshared, which provides a specific implementation in the Sync class. The source code is as follows:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Although AQS does not provide a specific implementation for tryrereleaseshared, it specifies the meaning of the return value:

  • true: This release of resources may wake up a blocked waiting thread
  • false: otherwise

Let's analyze the tryrereleaseshared method implemented by the Sync class:
All the code of this method is contained in a for loop, which is to deal with CAS failure. Modify the state of CAS in the loop, that is, subtract 1 from the count value of CountDownLatch
If the count value of CountDownLatch decreases by 1 and becomes 0, it returns true. Then the releaseShared method will continue to call the doReleaseShared method to wake up subsequent threads in the synchronization queue
If it is not 0, it returns false and nothing happens~

Note: the purpose of the doReleaseShared method is to wake up the first thread of the queue and ensure the state propagation. See for the detailed explanation of this method The most detailed AbstractQueuedSynchronizer(AQS) source code analysis in the whole network series

getCount

The getCount method returns the current count value of the CountDownLatch object. The source code is as follows:

public long getCount() {
    return sync.getCount();
}

In fact, it is delegated to the getCount method of the sync object for execution. Its source code is as follows:

int getCount() {
    return getState();
}

In fact, it is very simple to call the getState method of AQS and return the current state, that is, the count value of CountDownLatch~

Author: Wine lie Source: https://www.cnblogs.com/frankiedyz/p/15730573.html
Copyright: the copyright of this article belongs to the author and the blog park
Reprint: reprint is welcome, but this statement must be retained without the consent of the author; The original connection must be given in the article; Otherwise, legal responsibility must be investigated

Differences between CountDownLatch and join methods

Of course, in the above scenario, we can also use Thread's object method join to achieve this, calling the join method of all child threads in the main thread, and executing the result collection and statistical task. If the above example is implemented using the join method, the code is as follows:

public class TestJoin {
    private static final AtomicInteger money = new AtomicInteger(0);

    public static void main(String[] args) {
        Thread threadA = new Thread(() -> {
            try {
                System.out.println("A Transfer 30 yuan to me");
                Thread.sleep(1000);

                int origin = money.get();
                while (!money.compareAndSet(origin, origin + 30)) {
                    origin = money.get();
                    continue;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A Transfer completed");
        });

        Thread threadB = new Thread(() -> {
            try {
                System.out.println("B Transfer 70 yuan to me");
                Thread.sleep(1000);
                int origin = money.get();
                while (!money.compareAndSet(origin, origin + 30)) {
                    origin = money.get();
                    continue;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B Transfer completed");
        });

        System.out.println("etc. A and B Transfer to me...");
        threadA.start();
        threadB.start();
        try {
            threadA.join();
            threadB.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("After the transfer is completed, deposit the money in the bank...");

        int origin = money.get();
        money.set(origin * 2);
        System.out.println("Remove the money and add the principal and profit together" + money.get() + "element");
    }
}

The command line output is as follows:

Click to view the code
etc. A and B Transfer to me...
A Transfer 30 yuan to me
B Transfer 70 yuan to me
B Transfer completed
A Transfer completed
 After the transfer is completed, deposit the money in the bank...
After removing the money, the principal plus profit is 120 yuan

Process finished with exit code 0

However, unlike CountDownLatch with AQS, the execution principle of the join method is to constantly check whether the calling thread has completed execution. If not, let the current thread wait. Otherwise, notifyAll will be called to wake up the current thread

From the implementation principle, we can see that their differences mainly lie in two points:

  • The join method is not as flexible as CountDownLatch: when using the join method, you must wait for the calling thread to finish executing, and then you can't continue executing. The countDown method of CountDownLatch can be placed in the middle of the run method of the calling thread, so that the calling thread can wake up other await threads without ending execution
  • The thread calling the join method will always consume CPU resources and will not block and hang, i.e. "busy", while the thread calling the await method of CountDownLatch will be blocked and hung to give up the CPU execution right. The CPU resources can be occupied only after the conditions are appropriate and scheduled by the thread

Keywords: Java Concurrent Programming

Added by hmb3801 on Tue, 04 Jan 2022 19:10:31 +0200