Java Review - Concurrent Programming_ CountDownLatch Principle & source code analysis

Article catalog

Pre

Daily blog - CountDownLatch usage scenario analysis and source code analysis

In daily development, we often encounter the scenario that multiple threads need to be opened in the main thread to execute tasks in parallel, and the main thread needs to wait for all sub threads to execute before summarizing.

Before the emergence of CountDownLatch, the join() method of thread is generally used to achieve this, but the join method is not flexible enough to meet the needs of different scenarios. Therefore, the JDK development team provides the class CountDownLatch, which will be more elegant to use CountDownLatch

Small Demo

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author Small craftsman
 * @version 1.0
 * @description: TODO
 * @date 2021/12/19 10:46
 * @mark: show me the code , change the world
 */
public class CountDownLatchTest {

    // Create a CountDownLatch instance
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " Simulate business operation");

            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " Business operation Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // The execution of the child thread ends, minus 1
                countDownLatch.countDown();
            }


        });


        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " Simulate business operation");

            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + " Business operation Over");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // The execution of the child thread ends, minus 1
                countDownLatch.countDown();
            }



        });

        // Wait for the child thread to return after execution
        countDownLatch.await();
        System.out.println( "Sub thread business operation Over,The main thread continues to work");

        executorService.shutdown();
    }
}

In the above code,

  • An instance of CountDownLatch is created. Because there are two child threads, the parameter passed to the constructor is 2.
  • The main thread calls countdownlatch Await () method will be blocked after.
  • After Zi Xiancheng finished, he called countDownLatch.. The countdown () method decrements the counter inside countdownlatch by 1
  • After all the child threads have completed execution and called the countDown () method, the counter will change to 0, and the await () method of the main thread will return.

CountDownLatch VS join method

  • After calling the join () method of a child thread, the thread will be blocked until the child thread runs
  • CountDownLatch uses a counter to allow the child thread to finish running or decrement the count during running, that is, CountDownLatch can let the await method return at any time when the child thread runs, without necessarily waiting until the thread ends
  • In addition, when using the thread pool to manage threads, we usually add Runable directly to the thread pool. At this time, there is no way to call the thread join method, that is, countDownLatch gives us more flexible control over thread synchronization than the join method

Class diagram relationship

As can be seen from the class diagram, CountDownLatch is implemented using AQS.

Through the following constructor, the counter value is actually assigned to the AQS state variable state, that is, the AQS state value is used to represent the counter value.

 /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
 Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

Core method & source code analysis

Next, analyze several important methods in CountDownLatch to see how they call AQS to realize functions.

void await()

When the thread calls the await method of the CountDownLatch object, the current thread will be blocked and will not return until one of the following occurs

  • When all threads call the countDown method of the CountDownLatch object, that is, when the counter value is 0
  • If other threads call the interrupt () method of the current thread to interrupt the current thread, the current thread will throw an InterruptedException exception and return
   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await() method delegates sync to call the acquiresharedinterruptible method of AQS

	// Method of responding to interruption when AQS obtains shared resources
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
         // Response interrupt 
        if (Thread.interrupted())
            throw new InterruptedException();
       // Check whether the current counter is 0. If it is 0, it will be returned directly. Otherwise, it will enter the AQS queue and wait  
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
  • As can be seen from the above code, the feature of this method is that the thread can be interrupted when acquiring resources, and the acquired resources are shared resources.
  • Acquiressharedinterruptible first determines whether the current thread has been interrupted. If so, an exception will be thrown. Otherwise, call the tryAcquireShared method implemented by sync to check whether the current state value (counter value) is 0. If yes, the await() method of the current thread will return directly. Otherwise, call the doacquiressharedinruptible method of AQS to block the current thread.
  • In addition, it can be seen that the arg parameter passed by tryAcquireShared here is not used. The tryAcquireShared method is called only to check whether the current state value is 0, and CAS is not called to reduce the current state value by 1.

boolean await(long timeout, TimeUnit unit)

When the thread calls this method of the CountDownLatch object, the current thread will be blocked and will not return until one of the following occurs

  • When all threads call the countDown method of the CountDownLatch object, that is, when the counter value is 0, true will be returned
  • The set timeout time has expired, and false is returned due to timeout
  • Other threads call the interrupt () method of the current thread to interrupt the current thread. The current thread will throw an InterruptedException exception and return
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

void countDown()

After the thread calls this method, the counter value decreases. If the counter value is 0 after the decrease, all threads blocked by calling the await method will be awakened, otherwise nothing will be done

Let's take a look at how the countDown() method calls the AQS method.

 /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * If the current count equals zero then nothing happens.
     */
    public void countDown() {
    	// Delegate to call releaseShared of AQS
        sync.releaseShared(1);
    }

AQS method

  /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
    	// Call tryrereleaseshared of syn implementation
        if (tryReleaseShared(arg)) {
        	// How AQS releases resources
            doReleaseShared();
            return true;
        }
        return false;
    }

In the above code, releaseShared first calls the tryrereleaseshared method of AQS implemented by sync. The code is as follows

       protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // The loop performs CAS until the current thread successfully bends the CAS and updates the state together with the counter value (state value)
            for (;;) {
                int c = getState();
                // 1 if the status value is 0, it returns directly
                if (c == 0)
                    return false;
               // 2 use CAS to decrease the counter by 1     
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

Above code

  • First, get the current status value (counter value).
  • The code (1) judges that if the current status value is 0, it will directly return false, so the countDown () method will directly return false
  • Otherwise, execute the code (2) Use CAS to decrease the counter value by 1. If CAS fails, the loop will retry. Otherwise, if the current counter value is 0, it will return true, indicating that it is the countdown method called by the last thread. In addition to decreasing the counter value by 1, the thread also needs to wake up the thread blocked by calling the await method of CountDownLatch. Specifically, call the doReleaseShared method of AQS to stimulate Live blocked thread
  • Code (1) here seems redundant, but it is not. Code (1) is added to prevent other threads from calling the countDown method when the counter value is 0. If there is no code (1), the status value may become negative.

long getCount()

Get the value of the current counter, that is, the state value of AQS. This method is generally used during testing

  /**
     * Returns the current count.
     *
     * This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

Internally, the getState method of AQS is called to obtain the value of state (the current value of the counter)

Summary

CountDownLatch is implemented using AQS. Use the AQS state variable state to store the counter value.

First, set the status value (counter value) when initializing CountDownLatch. When multiple threads call the countdown method, it is actually the status value of atomically decreasing AQS.

When the thread calls the await method, the current thread will be put into the AQS blocking queue and wait for the counter to be 0 before returning. Other threads call the countdown method to decrement the counter value by 1. When the counter value becomes 0, the current thread also calls the doReleaseShared method of AQS to activate the thread blocked by calling the await() method.

Added by pinxue on Sun, 02 Jan 2022 10:18:03 +0200