AbstractQueuedSynchronizer (AQS) source code detailed analysis - CountDownLatch source code analysis

1. CountDownLatch introduction

CountDownLatch is a simple synchronizer. It lets one or more threads wait until a set of operations in other threads has completed before they continue.

Its typical use resembles Thread.join(): the main task waits for other threads to finish their work before it proceeds.
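
To make the comparison with join() concrete, here is a minimal sketch that waits for three workers in both ways. The class name JoinVersusLatch and its helper methods are made up for this illustration. The difference it tries to show: join() waits for the worker threads to terminate, while the latch only waits for a number of countDown() calls, regardless of which thread makes them.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the JDK or of the original article
class JoinVersusLatch {

  /** Waits with join(): returns only after every worker thread has terminated. */
  static int waitWithJoin(int workerCount) throws InterruptedException {
    Thread[] workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++) {
      workers[i] = new Thread(() -> { /* simulated work */ });
      workers[i].start();
    }
    int terminated = 0;
    for (Thread worker : workers) {
      worker.join();
      terminated++;
    }
    return terminated;
  }

  /** Waits with a latch: returns once countDown() has been called workerCount times. */
  static long waitWithLatch(int workerCount) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(workerCount);
    for (int i = 0; i < workerCount; i++) {
      // The worker could keep running after countDown(); the latch does not care
      new Thread(latch::countDown).start();
    }
    latch.await();
    return latch.getCount();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("threads joined: " + waitWithJoin(3));
    System.out.println("latch count after await(): " + waitWithLatch(3));
  }
}
```

Because the latch is decoupled from thread termination, it also works with pooled threads that never exit, which is the situation in Case 1 below.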

2. Introductory case analysis

Case 1:

  • For students like me, CountDownLatch rarely comes up in day-to-day development, and some have never used it at all. In concurrent code, however, this class is very common, so here are a few cases that show what it is for:
  • In this first case, CountDownLatch makes the main thread wait until all worker threads have finished before it continues
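import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author wcc
 * @date 2022/2/15 19:09
 */
public class CountDownLatchTest01 {

  private static final int TASK_COUNT = 8;
  private static final int THREAD_CORE_SIZE = 10;

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);

    // ExecutorService (rather than Executor) so that the pool can be shut down at the end
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_CORE_SIZE);

    for (int i = 0; i < TASK_COUNT; i++) {
      executor.execute(new WorkerRunnable(i, countDownLatch));
    }

    System.out.println("The main thread is waiting for all subtasks to complete...");
    long mainWaitStartTimeMillis = System.currentTimeMillis();
    // Blocks until countDown() has been called TASK_COUNT times
    countDownLatch.await();

    long mainWaitEndTimeMillis = System.currentTimeMillis();
    System.out.println("Main thread waiting time: " + (mainWaitEndTimeMillis - mainWaitStartTimeMillis));

    // Without shutdown() the pool's non-daemon threads keep the JVM alive
    executor.shutdown();
  }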

  static class WorkerRunnable implements Runnable{

    private int taskId;
    private CountDownLatch latch;

    @Override
    public void run() {
      doWorker();
    }

    public void doWorker(){
      System.out.println("task ID: "+ taskId + ",Task in progress...");
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      }catch (Exception e){
        e.printStackTrace();
      }finally {
        latch.countDown();
      }
    }

    public WorkerRunnable(int taskId, CountDownLatch latch) {
      this.taskId = taskId;
      this.latch = latch;
    }
  }
}

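Running it prints one "Task in progress..." line per task and then the main thread's waiting time, which is roughly 500 ms because the 8 tasks run in parallel on the 10-thread pool.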

Case 2:

  • Waiting can also go in both directions. In this case the main thread uses one CountDownLatch to make all worker threads start at the same time, then blocks on a second CountDownLatch until every worker thread has finished.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author wcc
 * @date 2022/2/15 19:09
 */
public class CountDownLatchTest02 {

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch startSignal = new CountDownLatch(1);
    CountDownLatch doneSignal = new CountDownLatch(10);


    for (int i = 0; i < 10; i++) {
      new Thread(new Worker(startSignal, doneSignal, i)).start();
    }

    // Sleep for 500 ms so that, in practice, all worker threads have started and are blocked at startSignal.await()
    TimeUnit.MILLISECONDS.sleep(500);

    // startSignal was created with a count of 1, so a single countDown() call opens the gate
    // and every worker thread blocked in startSignal.await() passes through at the same time
    System.out.println("The subtask has started");
    startSignal.countDown();

    System.out.println("Wait for the subtask to end...");
    long startTime = System.currentTimeMillis();
    // Wait for all subtasks to end
    doneSignal.await();

    long endTime = System.currentTimeMillis();
    System.out.println("All subtasks have ended, time consuming:" + (endTime - startTime));
  }

  static class Worker implements Runnable{

    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private int id;

    @Override
    public void run() {

      try {
        // To make all threads start the task at the same time, every thread blocks here first
        // Once everyone is ready, the main thread opens the gate
        startSignal.await();
        System.out.println("Subtask-" + id + ",Opening time:" + System.currentTimeMillis());
        doWorker();
      }catch (Exception e){
        e.printStackTrace();
      }finally {
        doneSignal.countDown();
      }
    }

    public void doWorker() throws InterruptedException{
      TimeUnit.SECONDS.sleep(5);

    }

    public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
      this.id = id;
    }
  }
}

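Running it prints ten "Subtask-N" lines with nearly identical opening times, followed by a total time of roughly 5000 ms.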

In the code above, startSignal.await() acts as a gate: it holds every worker thread inside its run() method until the main thread calls startSignal.countDown(). That call opens the gate, and all worker threads then continue executing run() at the same time.

Case 3:

import java.util.concurrent.CountDownLatch;

/**
 * @author wcc
 * @date 2022/2/16 14:14
 */
public class CountDownLatchTest03 {

  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(2);

    Thread t1 = new Thread(() -> {
      try {
        // Sleep for 5 seconds to simulate 5 seconds of work
        Thread.sleep(5000);
      }catch (InterruptedException e){
        Thread.currentThread().interrupt();
      }
      // The work is done: call countDown()
      latch.countDown();
    }, "t1");

    Thread t2 = new Thread(() -> {
      try {
        // Sleep for 10 seconds to simulate 10 seconds of work
        Thread.sleep(10000);
      }catch (InterruptedException e){
        Thread.currentThread().interrupt();
      }
      // The work is done: call countDown()
      latch.countDown();
    }, "t2");

    t1.start();
    t2.start();

    Thread t3 = new Thread(() -> {
      try {
        // Block until state has dropped to 0
        latch.await();
        System.out.println("Thread t3 returned from await()");
      }catch (InterruptedException e){
        System.out.println("Thread t3 was interrupted in await()");
        Thread.currentThread().interrupt();
      }
    }, "t3");

    Thread t4 = new Thread(() -> {
      try {
        // Block until state has dropped to 0
        latch.await();
        System.out.println("Thread t4 returned from await()");
      }catch (InterruptedException e){
        System.out.println("Thread t4 was interrupted in await()");
        Thread.currentThread().interrupt();
      }
    }, "t4");

    t3.start();
    t4.start();
  }
}

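After about 10 seconds, once both t1 and t2 have called countDown(), t3 and t4 both return from await() and print their messages, in no guaranteed order.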

3. Source code analysis

3.1 Sync inner class

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    // Takes the initial count
    Sync(int count) {
        // Store the count in the AQS state field via setState()
        setState(count);
    }

    // Returns the remaining count
    int getCount() {
        return getState();
    }

    // Attempt to acquire in shared mode
    protected int tryAcquireShared(int acquires) {
        // Returns 1 when state == 0, so the caller passes straight through
        // Returns -1 when state != 0, so the caller always has to queue
        return (getState() == 0) ? 1 : -1;
    }

    /**
     * Attempt to release in shared mode.
     * Decrements the AQS state by 1 on every call and returns true only for the call that moves state from 1 to 0.
     */
    protected boolean tryReleaseShared(int releases) {
        // Retry loop: repeat until the CAS on state succeeds
        for (;;) {
            // Read the current state
            int c = getState();
            // state is already 0: the latch has been opened and the wake-up has been triggered,
            // so there is nothing left to release; return false
            if (c == 0)
                return false;
            // Here state > 0, so compute the decremented value
            int nextc = c-1;
            // The CAS succeeds only if no other thread has changed state since it was read above
            if (compareAndSetState(c, nextc))
                // nextc == 0 means this countDown() call is the one that must trigger the wake-up
                return nextc == 0;
        }
    }
}

The Sync inner class overrides tryAcquireShared(int acquires) and tryReleaseShared(int releases) and stores the count in the AQS state variable. Note that neither method uses its parameter.
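
To see that these two overrides are all a latch needs, the following sketch builds a tiny latch directly on AbstractQueuedSynchronizer. SimpleLatch is a made-up teaching class that mirrors the structure shown above; it is not the JDK implementation and leaves out features such as timed waits.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical teaching class that mirrors CountDownLatch's structure
class SimpleLatch {

  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    Sync(int count) {
      setState(count); // the count lives in the AQS state field
    }

    int getCount() {
      return getState();
    }

    @Override
    protected int tryAcquireShared(int ignored) {
      // Succeed only once the count has reached 0
      return getState() == 0 ? 1 : -1;
    }

    @Override
    protected boolean tryReleaseShared(int ignored) {
      for (;;) {
        int c = getState();
        if (c == 0) {
          return false; // already open, nothing to release
        }
        int nextc = c - 1;
        if (compareAndSetState(c, nextc)) {
          return nextc == 0; // true only for the call that opens the latch
        }
      }
    }
  }

  private final Sync sync;

  SimpleLatch(int count) {
    if (count < 0) {
      throw new IllegalArgumentException("count < 0");
    }
    this.sync = new Sync(count);
  }

  void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }

  void countDown() {
    sync.releaseShared(1);
  }

  int getCount() {
    return sync.getCount();
  }

  public static void main(String[] args) throws InterruptedException {
    SimpleLatch latch = new SimpleLatch(2);
    new Thread(latch::countDown).start();
    new Thread(latch::countDown).start();
    latch.await(); // returns once both countDown() calls have happened
    System.out.println("count after await(): " + latch.getCount());
  }
}
```

Everything else, including queuing, parking and waking threads, is inherited from AQS, which is why the rest of this article mostly reads AQS code.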

3.2 Constructor

// The constructor takes a count, that is, the number of countDown() calls needed to open the latch
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");

    this.sync = new Sync(count);
}
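
A short check of the constructor's contract, as a sketch: a negative count is rejected, while 0 is accepted and simply yields a latch that is already open. LatchConstructorDemo and isRejected are names invented for this example.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class for the constructor's argument check
class LatchConstructorDemo {

  /** Returns true if constructing a latch with the given count throws IllegalArgumentException. */
  static boolean isRejected(int count) {
    try {
      new CountDownLatch(count);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("count -1 rejected: " + isRejected(-1));
    System.out.println("count 0 rejected: " + isRejected(0));
    System.out.println("initial count: " + new CountDownLatch(3).getCount());
  }
}
```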

3.3 await() method

The await() method waits for other threads to finish. It first tries to acquire in shared mode; if that fails, the thread enters the AQS wait queue and is parked until it is woken.

From the Sync source above we know that tryAcquireShared() returns -1 whenever state is not 0. In other words, until the count has been reduced to 0, every thread that calls await() has to queue.

public void await() throws InterruptedException {
    // Delegates to the acquireSharedInterruptibly() method of AQS
    sync.acquireSharedInterruptibly(1);
}
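
Two observable properties of await() can be checked without any worker threads, as sketched below: on a latch whose count is already 0 it returns at once, and if the calling thread's interrupt flag is already set it throws InterruptedException instead of blocking. AwaitBehaviorDemo and its helper method are invented for this illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class for the two fast paths of await()
class AwaitBehaviorDemo {

  /** Calls await() with the interrupt flag already set; returns true if it throws. */
  static boolean awaitThrowsWhenInterrupted(CountDownLatch latch) {
    Thread.currentThread().interrupt(); // set the flag before calling await()
    try {
      latch.await();
      return false;
    } catch (InterruptedException e) {
      return true;
    } finally {
      Thread.interrupted(); // make sure no interrupt flag leaks to the caller
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new CountDownLatch(0).await(); // count is already 0: returns without blocking
    System.out.println("await() on a zero-count latch returned");
    System.out.println("interrupted await() threw: "
        + awaitThrowsWhenInterrupted(new CountDownLatch(1)));
  }
}
```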

The acquireSharedInterruptibly() method in AQS, together with the methods it calls:

// In AQS: acquires in shared mode and aborts if the thread is interrupted
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  // The thread calling await() is already interrupted: throw the exception immediately
  if (Thread.interrupted())
   throw new InterruptedException();
  // tryAcquireShared(arg) < 0: state is greater than 0, so the thread is enqueued and parked until it is woken
  // Otherwise state == 0: the threads doing the work have already opened the latch,
  // so a thread that calls latch.await() now is not blocked
  if (tryAcquireShared(arg) < 0)
   doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
      throws InterruptedException {
    // Wrap the thread that called latch.await() in a SHARED node and append it to the AQS wait queue
    final Node node = addWaiter(Node.SHARED);
    // failed stays true unless the acquire completes normally
    // If the loop is left through an exception (for example after an interrupt), the finally block cancels the node
    boolean failed = true;
    try {
      // Retry loop
      for (;;) {
        // Get the predecessor of the current node
        final Node p = node.predecessor();
        // The predecessor is head, so the current node is head.next
        if (p == head) {
          // Only head.next is entitled to try to acquire
          int r = tryAcquireShared(arg);
          if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
          }
        }
        // shouldParkAfterFailedAcquire(): skips cancelled predecessors and sets the predecessor's waitStatus to -1 (SIGNAL)
        // It returns true once the predecessor is already SIGNAL; otherwise it returns false and the loop retries
        if (shouldParkAfterFailedAcquire(p, node) &&
            // parkAndCheckInterrupt(): parks the current thread and, once it wakes, returns whether it was interrupted
            parkAndCheckInterrupt())
          throw new InterruptedException();
      }
    } finally {
      // The acquire did not complete (the thread was interrupted), so take the node out of the competition
      if (failed)
        cancelAcquire(node);
    }
  }

/**
   * AQS setHeadAndPropagate: makes the current node the new head and propagates the wake-up to its successors, one after another
   * @param node the node that has just acquired
   * @param propagate the return value of tryAcquireShared; for CountDownLatch it is 1 (state == 0) whenever this method is called
   */
  private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // Make the current node the new head; setHead() also sets its thread and prev to null
    setHead(node);

    // For CountDownLatch, propagate == 1 always holds when setHeadAndPropagate is called
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
      // Get the successor of the current node
      Node s = node.next;
      // Condition 1: s == null, for example because node is currently the tail; doReleaseShared() handles this case
      // Condition 2: s != null, in which case the successor must be waiting in SHARED mode
      if (s == null || s.isShared())
        // For CountDownLatch every waiter is SHARED, so doReleaseShared() is called in practically all cases
        doReleaseShared();
    }
  }
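
The visible effect of this propagation is that a single countDown() releases every waiting thread, not just the first one in the queue. The sketch below checks that from the outside; PropagationDemo and releaseAll are names invented for this example, and the short sleep only makes it likely, not certain, that the waiters are parked before the latch opens.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one countDown() wakes all waiters
class PropagationDemo {

  /** Parks the given number of waiters on a count-1 latch, opens it once, and returns how many waiters passed. */
  static int releaseAll(int waiters) throws InterruptedException {
    CountDownLatch gate = new CountDownLatch(1);
    AtomicInteger released = new AtomicInteger();
    Thread[] threads = new Thread[waiters];
    for (int i = 0; i < waiters; i++) {
      threads[i] = new Thread(() -> {
        try {
          gate.await();
          released.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      threads[i].start();
    }
    // Best effort: give the waiters time to enqueue and park (not needed for correctness)
    Thread.sleep(100);
    gate.countDown(); // a single release
    for (Thread t : threads) {
      t.join();
    }
    return released.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("waiters released by one countDown(): " + releaseAll(5));
  }
}
```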

3.4 countDown() method

The countDown() method releases in shared mode, that is, it decrements the count by 1.

From the Sync source above we know that tryReleaseShared() decrements the count by 1 on each call and returns true when the count reaches 0. At that point the waiting threads are woken up.

Note that doReleaseShared() is the method that wakes the waiting threads. It was analyzed in the previous chapter.

public void countDown() {
    sync.releaseShared(1);
}

// In AQS: releases in shared mode
public final boolean releaseShared(int arg) {
    // True only for the latch.countDown() call that moves state from 1 to 0; that call has to wake the threads blocked in await()
    if (tryReleaseShared(arg)) {
        // Of all threads calling countDown(), exactly one enters this block and calls doReleaseShared() to wake the blocked threads
        doReleaseShared();
        return true;
    }
    return false;
}

  /**
   * Which paths call the doReleaseShared method?
   * 1. latch.countDown() -> state reaches 0 -> doReleaseShared() wakes the thread of head.next in the wait queue
   * 2. Awakened thread -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared()
   */
  // doReleaseShared() method of AQS
  private void doReleaseShared() {
    for (;;) {
      // Read the current head node of AQS
      Node h = head;
      // Condition 1: h != null means the wait queue has been initialized
      //   h == null happens when no thread has called await() since the latch was created,
      //   yet a countDown() call has already brought state to 0 and triggered the wake-up logic
      // Condition 2: h != tail means the queue holds at least one node besides head
      //   h == tail (head and tail are the same node object) happens in two situations:
      //   1. During a normal wake-up chain, the thread that has just become head was the tail node
      //   2. The first await() call runs concurrently with the countDown() call that triggers the wake-up
      //      The queue is empty when that first await() arrives, so the thread creates the empty head node
      //      and then loops again to enqueue its own node
      //      Before its node is linked in, the queue contains only the newly created empty head
      //      If another thread calls countDown() in that window and moves state from 1 to 0, it finds no node to wake
      //      This is harmless: once the await() thread has finished enqueuing, it returns to doAcquireSharedInterruptibly(),
      //      sees that its predecessor is head, acquires successfully because state == 0, and makes itself head without parking
      if (h != null && h != tail) {
        // Inside this branch the head is guaranteed to have a successor

        // Get the wait status of the head node
        int ws = h.waitStatus;
        // SIGNAL on the head node means its successor still has to be woken
        if (ws == Node.SIGNAL) {
          // Before waking the successor, reset the head's status to 0
          // Why use a CAS here?
          // Several threads can run doReleaseShared() at once: after a thread wakes a successor, the successor makes itself head
          // and calls doReleaseShared() too, while the waking thread sees that head changed, stays in the loop, and works on the new head as well
          // The CAS makes sure that only one of them wakes a given successor
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
            continue;            // loop to recheck cases
          // Wake the successor
          unparkSuccessor(h);
        }

        // The head's status is not SIGNAL: if it is 0, mark it PROPAGATE so that the release keeps propagating
        else if (ws == 0 &&
            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;                // loop on failed CAS
      }
      // h == head holds (the loop ends) when:
      // 1. The successor that was just woken has not yet made itself head in setHeadAndPropagate()
      //    The current thread exits here. Will the wake-up chain break at this point?
      //    No, because the awakened thread will call doReleaseShared() itself from setHeadAndPropagate()
      // 2. head == null
      //    No thread has called await() since the latch was created, and a countDown() call triggered the wake-up logic
      // 3. head == tail
      //    The first await() call ran concurrently with the countDown() call that triggered the wake-up

      // h == head does not hold (the loop continues) when:
      // The awakened thread was quick: it already returned to doAcquireSharedInterruptibly() and made itself the new head
      // The thread that woke it (acting for the old head) then sees h != head,
      // stays in doReleaseShared(), and helps wake the successor of the new head
      if (h == head)                   // loop if head changed
        break;
    }
  }

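CountDownLatch.countDown() execution flow: countDown() -> releaseShared(1) -> tryReleaseShared(1) -> if state has reached 0: doReleaseShared() -> unparkSuccessor(head)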
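
One consequence of the `c == 0` check in tryReleaseShared() is that the count never goes below 0: extra countDown() calls on an open latch do nothing. A minimal sketch, with CountDownAtZeroDemo and countAfter as names invented for this example:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: countDown() on an open latch is a no-op
class CountDownAtZeroDemo {

  /** Creates a latch with the given initial count, calls countDown() the given number of times, and returns the count. */
  static long countAfter(int initial, int calls) {
    CountDownLatch latch = new CountDownLatch(initial);
    for (int i = 0; i < calls; i++) {
      latch.countDown();
    }
    return latch.getCount();
  }

  public static void main(String[] args) {
    System.out.println("2 initial, 1 call:  " + countAfter(2, 1));
    System.out.println("2 initial, 2 calls: " + countAfter(2, 2));
    System.out.println("2 initial, 5 calls: " + countAfter(2, 5));
  }
}
```

This also means a CountDownLatch cannot be reset: once the count has reached 0 it stays there.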


Summary:

  • CountDownLatch lets one or more threads wait until operations in other threads have completed before they continue
  • CountDownLatch is implemented on top of the shared mode of AQS
  • A count must be passed in when a CountDownLatch is created; it is stored in the AQS state
  • Every call to countDown() reduces the count by 1, until it reaches 0
  • Every call to await() tries to acquire in shared mode, which succeeds only if the state value in AQS is 0; otherwise the calling thread joins the wait queue
  • When the count (that is, the value of state) reaches 0, the threads that called await() and are parked in the AQS wait queue are woken up

Keywords: Java CountDownLatch

Added by tulleh on Thu, 17 Feb 2022 05:10:11 +0200