JDK source code series: the thread coordination tools CountDownLatch and CyclicBarrier

Introduction

I have always believed that a program is a logical description of the real world, and in the real world many things can only be completed through the coordination and cooperation of several parties. For example, a platform cannot be delivered by one person alone; it takes the cooperation of different roles such as R&D, testing, product and project management to reach the final delivery. So how is this kind of coordination described in the world of programs? Today, let's talk about how Doug Lea, the Java master, implements task coordination through CountDownLatch and CyclicBarrier in the java.util.concurrent package.

CountDownLatch

I believe we all know that an important feature of good code is that the names of classes and variables are self-explanatory, that is, you can roughly tell what business semantics a class or variable expresses just from its name. Take CountDownLatch as an example. Its name vividly describes what it does. Count stands for the counter, Down stands for decrementing the counter, and Latch stands for the bolt that is opened once the counter has been decremented to zero. Taken together, the literal meaning of CountDownLatch is: open the latch after the counter has counted down. After reading the rest of this section, you will certainly feel that the name is very apt.

From the class name, then, we can guess that its function is to control threads by decrementing a counter. Let's see whether the official description says the same.

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *...
*/

The general meaning of the above comment is that CountDownLatch is a thread synchronizer that allows one or more threads to block and wait until the operations in other threads have completed. A CountDownLatch is initialized with a count. Threads that call await block until the count reaches 0 through calls to countDown. When the count reaches 0, all blocked threads are released, and any later call to await returns immediately. In addition, it is a one-shot synchronizer: the count cannot be reset.

From the official JDK description, we can identify three core features of CountDownLatch:

1. It is a thread synchronizer, used to coordinate when threads are allowed to proceed;

2. It is essentially a counter that acts as a starting gun for the waiting threads;

3. It is one-shot: once the count has reached 0, it cannot be reset or reused.

Now that we know what CountDownLatch is, let's look at its usage scenarios and the kinds of problems it can help us solve in code.

Usage scenario

As described above, CountDownLatch is like the starting gun fired by the referee at a track and field event. When all the contestants are ready, the starting gun goes off and all the athletes start running. In a Java multithreading scenario, CountDownLatch is the thread coordinator: as long as its counter has not been reduced to 0, the waiting threads stay blocked. Suppose there is the following business scenario. In a monitoring and alarm platform, you need to query the alarm information from the alarm service and the work order information from the work order service, and then analyze which alarms have not been converted into work orders. The old system did it as shown in the following simplified pseudo code:

List<Alarm> alarmList = alarmService.getAlarm();
List<WorkOrder> workOrderList = workOrderService.getWorkOrder();
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

Can you see what could be optimized in this pseudo code? Let's analyze it together. This code may work well enough when the amount of data is small, but once the volume of alarm and work order data grows, either query may become slow, and the analysis task then hits a performance bottleneck. So how should we optimize it? From both the business logic and the code, we can see that fetching the alarm information and fetching the work order information do not depend on each other, yet in the code above they run one after the other. Therefore, to improve performance, we can consider running them in parallel.

The modified and optimized pseudo code is as follows:

Executor executor = Executors.newFixedThreadPool(2);
executor.execute(()-> { alarmList = alarmService.getAlarm(); });
executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); });

List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

By using a thread pool, the alarm information and the work order information can be fetched at the same time, instead of fetching the work orders only after the alarm query has finished, which is more efficient. However, this implementation still has a problem. Because the queries run asynchronously in pool threads, the calling thread does not know when they have finished, so it cannot tell when it is safe to start the analysis. This is where CountDownLatch comes in handy. It lets the calling thread wait until both queries are done; once that condition is met, the latch opens and the subsequent logic runs. It is like a company team-building trip: everyone agrees to gather at the company gate at 8:30 in the morning, and the driver does not set off until all the participants have arrived.

The pseudo code after using CountDownLatch is as follows:

Executor executor = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executor.execute(()-> {
    alarmList = alarmService.getAlarm();
    latch.countDown();
});
executor.execute(()-> {
    workOrderList = workOrderService.getWorkOrder();
    latch.countDown();
});
latch.await();
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
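
The pseudo code above does not compile as written, because a lambda cannot assign to local variables of the enclosing method. The following is a minimal runnable sketch of the same idea, not the original system's code. The two services are replaced by hypothetical stubs (getAlarm and getWorkOrder returning lists of alarm IDs), results are passed out of the lambdas through AtomicReference holders, and analysis is assumed to mean "alarms without a matching work order".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class AlarmLatchDemo {
    // Hypothetical stand-ins for alarmService.getAlarm() and workOrderService.getWorkOrder().
    static List<String> getAlarm() { return List.of("A1", "A2", "A3"); }
    static List<String> getWorkOrder() { return List.of("A2"); }

    // Assumed meaning of analysis(): alarms that have no matching work order.
    static List<String> analysis(List<String> alarms, List<String> workOrders) {
        List<String> result = new ArrayList<>(alarms);
        result.removeAll(workOrders);
        return result;
    }

    static List<String> findNotTransferred() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2);
        AtomicReference<List<String>> alarmList = new AtomicReference<>();
        AtomicReference<List<String>> workOrderList = new AtomicReference<>();
        try {
            executor.execute(() -> {
                try {
                    alarmList.set(getAlarm());
                } finally {
                    latch.countDown(); // always count down, or await() would block forever
                }
            });
            executor.execute(() -> {
                try {
                    workOrderList.set(getWorkOrder());
                } finally {
                    latch.countDown();
                }
            });
            latch.await(); // blocks until both queries have counted down
            return analysis(alarmList.get(), workOrderList.get());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(findNotTransferred()); // [A1, A3]
    }
}
```

Calling countDown in a finally block keeps the waiting thread from hanging if a query throws. Real code would also have to decide what to do with a missing result in that case, which this sketch does not handle.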

Underlying implementation principle

Initialization

Before using CountDownLatch, we first have to initialize it. Initialization does two things: it creates an instance of the internal Sync class, which is a subclass of AQS, and it sets the state variable of AQS to count. state is the core variable of AQS (AQS is the foundation on which the concurrent package is built, and we will analyze it in the next article).

The constructor creates an instance of the internal class Sync. Sync extends AQS and overrides its shared-mode acquire and release hooks, tryAcquireShared and tryReleaseShared. CountDownLatch blocks and releases threads by calling AQS methods through the Sync object. The code of the Sync internal class is as follows. tryAcquireShared overrides the AQS template method used to acquire in shared mode. In CountDownLatch, it decides whether the acquisition succeeds simply by checking whether state is 0. If state is 0, the acquisition succeeds and the thread does not block. Otherwise the acquisition fails and the thread blocks.

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
        //Try to acquire in shared mode: succeeds (returns 1) only when state is 0
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        //Try to release in shared mode: decrements state, returns true only on the transition to 0
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
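
The effect of the Sync code can be observed through the public API. The sketch below uses only CountDownLatch itself. The class and method names (LatchCountDemo, observeCounts, rejectsNegativeCount) are made up for illustration. It shows that the count starts at the constructor argument, drops by one per countDown call, and stays at 0 afterwards, because tryReleaseShared returns false once state is already 0.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

class LatchCountDemo {
    // Returns the count observed after construction and after each countDown() call.
    static long[] observeCounts() {
        CountDownLatch latch = new CountDownLatch(2);
        long initial = latch.getCount();    // state set by the Sync constructor
        latch.countDown();
        long afterOne = latch.getCount();
        latch.countDown();
        long afterTwo = latch.getCount();   // reached 0: the latch is now open
        latch.countDown();                  // no effect: the count never goes below 0
        long afterThree = latch.getCount();
        return new long[] {initial, afterOne, afterTwo, afterThree};
    }

    // The constructor rejects a negative count.
    static boolean rejectsNegativeCount() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeCounts())); // [2, 1, 0, 0]
        System.out.println(rejectsNegativeCount());           // true
    }
}
```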

Counter decrement

As described in the scenario above, each thread calls countDown after finishing its own work to signal that it is done. countDown also checks whether the count has reached 0; if it has, all waiting threads must be woken up. As the following code shows, it simply calls the releaseShared method inherited from the parent class AQS.

public void countDown() {
        sync.releaseShared(1);
    }

releaseShared calls tryReleaseShared, which attempts the release by decrementing the count. If this call brings the count down to 0, doReleaseShared is invoked to release all waiting threads.

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
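
To make "release all threads" concrete, here is a small sketch in which several threads wait on one latch and a single countDown lets every one of them through. The class name ReleaseAllDemo and the method releaseWaiters are hypothetical. Note that a waiter that happens to call await after the count has already reached 0 simply returns immediately, so the result is the same regardless of thread timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class ReleaseAllDemo {
    // Starts the given number of waiting threads and returns how many got past await().
    static int releaseWaiters(int waiters) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger released = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                try {
                    latch.await();              // blocks while the count is above 0
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }
        latch.countDown();                      // count 1 -> 0: every waiter is released
        for (Thread t : threads) {
            t.join();
        }
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(releaseWaiters(3)); // 3
    }
}
```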

See the following figure for the general code execution logic:

Blocking thread

The function of await is to block the current thread until the count has been reduced to 0. It actually calls the acquireSharedInterruptibly method on the inner Sync object, which is a method defined in AQS, the parent class of Sync.

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly is the AQS method for acquiring in shared mode while responding to interruption. tryAcquireShared has been described above: it tries to acquire in shared mode. If the acquisition fails, the thread is added to the AQS synchronization queue to wait, which is what we call thread blocking.

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
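
Two consequences of this code can be checked from the outside. First, the interrupt flag is tested before the count, so a thread whose interrupt flag is already set gets an InterruptedException from await straight away. Second, CountDownLatch also offers a timed await(long, TimeUnit), which returns false if the count is still above 0 when the timeout elapses. The sketch below illustrates both. The class and method names and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitBehaviorDemo {
    // Timed variant: true if the count reached 0, false if the timeout elapsed first.
    static boolean timedAwait(CountDownLatch latch) throws InterruptedException {
        return latch.await(50, TimeUnit.MILLISECONDS);
    }

    // await() checks the interrupt flag before it looks at the count.
    static boolean throwsWhenInterrupted() {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.currentThread().interrupt();     // set the flag on the current thread
        try {
            latch.await();
            return false;
        } catch (InterruptedException expected) {
            return true;                         // the flag was cleared by the check
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println(timedAwait(latch));       // false: nobody counted down
        latch.countDown();
        System.out.println(timedAwait(latch));       // true: returns immediately
        System.out.println(throwsWhenInterrupted()); // true
    }
}
```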

CyclicBarrier

Let's first understand CyclicBarrier from its literal meaning. Cyclic means cyclic or reusable, and Barrier means fence or barrier, so the literal meaning is a reusable fence. Following the same routine as before, let's look at the JDK description before going into CyclicBarrier.

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *...
 **/

From the JDK description, we can see that CyclicBarrier is also a thread synchronizer, used to coordinate the execution of a group of threads. When the specified number of threads have reached the barrier, the barrier opens and the blocked threads are released. At first glance it seems to do almost the same thing as CountDownLatch, but there is a difference: CyclicBarrier is reusable, while CountDownLatch is one-shot. Let's take a look at the core attributes of CyclicBarrier.

//Lock guarding entry to the barrier
private final ReentrantLock lock = new ReentrantLock();
//Condition that threads wait on until the barrier trips
private final Condition trip = lock.newCondition();
//Number of parties (threads) that must arrive before the barrier trips
private final int parties;
//Task to run when the barrier trips, before the waiting threads are released
private final Runnable barrierCommand;
//Current generation
private Generation generation = new Generation();
//Number of parties still being waited for; reset to parties on each new generation
private int count;

The source code of CyclicBarrier is related to that of CountDownLatch, but takes a different route. CountDownLatch uses the shared mode of AQS directly, while CyclicBarrier is implemented with a ReentrantLock and a Condition.

CyclicBarrier internally maintains the variables parties and count. parties is the number of threads that must arrive in each Generation before the barrier trips, and count is the internal counter. On initialization, count is equal to parties. Each call to await decrements count by 1, which is similar to countDown above.

Usage scenario

Let's continue with the business scenario above. We solved the thread coordination problem of querying alarm information and work order information with CountDownLatch, but a new problem appears. Alarm information and work order information are generated continuously, whereas a CountDownLatch can coordinate threads only once. If later alarm and work order information also needs to be queried and analyzed, CountDownLatch cannot help. In other words, when a group of threads needs to wait for each other repeatedly, round after round, before the subsequent business operation runs, we need CyclicBarrier.
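
As a rough illustration of this scenario, the sketch below runs the two queries in their own threads for several rounds and performs the analysis in the barrier action, which runs once per round after both threads have arrived and before either is released. It is only a sketch under assumptions: the services are hypothetical stubs (getAlarm and getWorkOrder, which derive fake IDs from the round number), and the analysis is assumed to be "alarms without a matching work order".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class AlarmBarrierDemo {
    private List<String> alarmList;       // written by the alarm thread each round
    private List<String> workOrderList;   // written by the work order thread each round
    private final List<List<String>> results = new ArrayList<>();

    // Hypothetical stubs: round n has alarms "n-1" and "n-2" and a work order for "n-1".
    static List<String> getAlarm(int round) { return List.of(round + "-1", round + "-2"); }
    static List<String> getWorkOrder(int round) { return List.of(round + "-1"); }

    List<List<String>> run(int rounds) throws InterruptedException {
        // The barrier action sees both lists of the current round.
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            List<String> pending = new ArrayList<>(alarmList);
            pending.removeAll(workOrderList);
            results.add(pending);
        });
        Thread alarms = new Thread(() -> loop(rounds, barrier, true));
        Thread orders = new Thread(() -> loop(rounds, barrier, false));
        alarms.start();
        orders.start();
        alarms.join();
        orders.join();
        return results;
    }

    private void loop(int rounds, CyclicBarrier barrier, boolean fetchAlarms) {
        try {
            for (int round = 1; round <= rounds; round++) {
                if (fetchAlarms) {
                    alarmList = getAlarm(round);
                } else {
                    workOrderList = getWorkOrder(round);
                }
                barrier.await();  // wait for the other query, then start the next round
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // the other party failed or was interrupted; stop this loop as well
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new AlarmBarrierDemo().run(3)); // [[1-2], [2-2], [3-2]]
    }
}
```

Because the barrier action runs before any waiting thread is released, neither thread can overwrite its list for the next round while the analysis of the current round is still reading it.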

Underlying implementation principle

Initialization

CyclicBarrier has two constructors. One takes both the number of threads to be coordinated in each round and the task to run when the barrier trips. The other takes only the number of threads and sets no task.

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

public CyclicBarrier(int parties) {
        this(parties, null);
    }

Blocking waiting

For CyclicBarrier, the core implementation of the await method is the dowait method. The specific code is as follows:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            //If count has reached 0, run the barrier action, wake all waiting threads and start the next generation
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //count is not 0: loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

We can see that dowait decrements count and then checks whether it has reached 0. When count reaches 0, the task supplied at initialization, if there is one, is run first. After the task completes, nextGeneration is called, which wakes up all waiting threads, resets the counter to parties and starts the next round of thread coordination. Threads that arrive while count is still above 0 wait on the trip condition inside the loop until the generation changes, the barrier is broken, they are interrupted, or the wait times out.
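
The behaviour of dowait can be observed through the value returned by await, which is the index computed by --count: parties - 1 for the first thread to arrive and 0 for the last. The following sketch, with the hypothetical class name BarrierGenerationDemo, runs a group of threads through several generations of one barrier. It records every returned index and counts how often the barrier action runs. Which thread gets which index depends on timing, so the indices are sorted before being compared.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierGenerationDemo {
    final AtomicInteger actionRuns = new AtomicInteger();
    final List<Integer> indices = Collections.synchronizedList(new ArrayList<>());

    void run(int parties, int generations) throws InterruptedException {
        // The barrier action runs once each time the barrier trips.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parties; i++) {
            Thread t = new Thread(() -> {
                try {
                    for (int g = 0; g < generations; g++) {
                        indices.add(barrier.await()); // arrival index within this generation
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or timed out; give up
                }
            });
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    List<Integer> sortedIndices() {
        List<Integer> copy = new ArrayList<>(indices);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) throws InterruptedException {
        BarrierGenerationDemo demo = new BarrierGenerationDemo();
        demo.run(3, 2);
        System.out.println(demo.actionRuns.get());  // 2: one run per generation
        System.out.println(demo.sortedIndices());   // [0, 0, 1, 1, 2, 2]
    }
}
```

Each generation hands out the full set of indices again, which shows that count was reset to parties without any explicit call from the user code.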

Summary

This article introduced the thread coordination tools CountDownLatch and CyclicBarrier from the perspective of usage scenarios and underlying implementation. Although both coordinate threads, they differ. CountDownLatch is better suited to scenarios where one or more threads wait for several other threads to finish, while CyclicBarrier is suited to a group of threads waiting for each other. In addition, CountDownLatch is one-shot, while the counter of CyclicBarrier is reset automatically each time the barrier trips, so the barrier can be reused.

Keywords: Java Back-end

Added by nick2005 on Mon, 21 Feb 2022 02:15:23 +0200