[Java rocker] the difference between CountdownLatch and CyclicBarrier, usage scenarios and specific implementation

CountdownLatch and CyclicBarrier are both tools for thread synchronization, but the specific implementation and use are different. Let's take a look at the different use cases first

CountdownLatch usage scenario

As the name suggests, CountdownLatch can be used as a counter. For example, a thread needs to wait until several other threads have executed a certain time node before it can continue to execute. Let's simulate a scenario. There are ten people in a company. The guard can't rest until ten people come to work. The code is implemented as follows

public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            //Only final variables can be used in lambda
            final int times = i;
            new Thread(() -> {
                try {
                    System.out.println("Child thread" + Thread.currentThread().getName() + "We're on our way");
                    Thread.sleep(1000 * times);
                    System.out.println("Child thread" + Thread.currentThread().getName() + "Here we are");
                    //Call the countDown method of latch to make the counter - 1
                    latch.countDown();
                    System.out.println("Child thread" + Thread.currentThread().getName() + "start-up");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }


        try {
            System.out.println("The guard is waiting for the staff to go to work...");
            //The main thread is blocked waiting for the counter to return to zero
            latch.await();
            System.out.println("The staff are here,The guard went to rest");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

The results after operation are as follows:

Child thread Thread-0 We're on our way
 Child thread Thread-2 We're on our way
 Child thread Thread-0 Here we are
 Child thread Thread-0 start-up
 Child thread Thread-1 We're on our way
 The guard is waiting for the staff to go to work...
Child thread Thread-4 We're on our way
 Child thread Thread-9 We're on our way
 Child thread Thread-5 We're on our way
 Child thread Thread-6 We're on our way
 Child thread Thread-7 We're on our way
 Child thread Thread-8 We're on our way
 Child thread Thread-3 We're on our way
 Child thread Thread-1 Here we are
 Child thread Thread-1 start-up
 Child thread Thread-2 Here we are
 Child thread Thread-2 start-up
 Child thread Thread-3 Here we are
 Child thread Thread-3 start-up
 Child thread Thread-4 Here we are
 Child thread Thread-4 start-up
 Child thread Thread-5 Here we are
 Child thread Thread-5 start-up
 Child thread Thread-6 Here we are
 Child thread Thread-6 start-up
 Child thread Thread-7 Here we are
 Child thread Thread-7 start-up
 Child thread Thread-8 Here we are
 Child thread Thread-8 start-up
 Child thread Thread-9 Here we are
 Child thread Thread-9 start-up
 The staff are here,The guard went to rest

You can see that the child thread does not call latch If countdown is blocked, it will continue to do the work. It just notifies the counter - 1, that is, the scenario described above is completed. Only after all processes have reached a certain node will the blocked process be executed If we want multiple threads to run at the same time, we need to use CyclicBarrier

CyclicBarrier usage scenario

Let's re simulate a new scene. Let's use the running scene that has been said to be bad. Ten athletes prepare for the competition. The referee can't say to start until all athletes are ready. Then all athletes run together. The code is implemented as follows

public static void main(String[] args) {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
            System.out.println("Everyone is ready. The referee begins");
        });
        for (int i = 0; i < 10; i++) {
            //Only final variables can be used in lambda
            final int times = i;
            new Thread(() -> {
                try {
                    System.out.println("Child thread" + Thread.currentThread().getName() + "Preparing");
                    Thread.sleep(1000 * times);
                    System.out.println("Child thread" + Thread.currentThread().getName() + "be ready");
                    cyclicBarrier.await();
                    System.out.println("Child thread" + Thread.currentThread().getName() + "Started running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

The results are as follows:

Child thread Thread-0 Preparing
 Child thread Thread-2 Preparing
 Child thread Thread-1 Preparing
 Child thread Thread-3 Preparing
 Child thread Thread-4 Preparing
 Child thread Thread-0 be ready
 Child thread Thread-5 Preparing
 Child thread Thread-6 Preparing
 Child thread Thread-7 Preparing
 Child thread Thread-8 Preparing
 Child thread Thread-9 Preparing
 Child thread Thread-1 be ready
 Child thread Thread-2 be ready
 Child thread Thread-3 be ready
 Child thread Thread-4 be ready
 Child thread Thread-5 be ready
 Child thread Thread-6 be ready
 Child thread Thread-7 be ready
 Child thread Thread-8 be ready
 Child thread Thread-9 be ready
 Everyone is ready. The referee begins
 Child thread Thread-9 Started running
 Child thread Thread-0 Started running
 Child thread Thread-2 Started running
 Child thread Thread-1 Started running
 Child thread Thread-7 Started running
 Child thread Thread-6 Started running
 Child thread Thread-5 Started running
 Child thread Thread-4 Started running
 Child thread Thread-3 Started running
 Child thread Thread-8 Started running

We can see that all threads are blocked before other threads are ready, until all threads are ready to continue. When we create the CyclicBarrier object, we introduce a method. When calling the await method of CyclicBarrier, the current thread will be blocked until all the threads call the await method to call the CyclicBarrier. Then let all blocked threads run together

After we have finished the application scenario, let's take a look at the specific implementation of the two tools

CountdownLatch underlying implementation

Let's first look at the construction method of CountdownLatch

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

First, ensure that the count must be greater than zero, and then initialize a Sync object to see what the Sync object is

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

Sync is a static internal class of CountdownLatch. It inherits AbstractQueuedSynchronizer (that is, AQS), provides a tool to implement blocking locks and a series of synchronizer depending on FIFO waiting queue. The abstract class is called setState method in Sync construction method. It can be regarded as initializing a mark to record the number of current counters.

Let's look at the two core methods of CountdownLatch, await and countdown. Let's look at await first

public void await() throws InterruptedException {
        //It can be regarded as blocking threads
        sync.acquireSharedInterruptibly(1);
    }

await calls AQS methods and can be regarded as blocking threads. The specific implementation is expanded in the chapter of analyzing AQS. Let's take a look at the countdown method

public void countDown() {
        sync.releaseShared(1);
    }

Call a method of sync. Let's take a look at the implementation of this method

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

Let's look at the tryrereleaseshared method

protected boolean tryReleaseShared(int releases) {
            for (;;) {
                //Get tag bit
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                //Update tag bit by cas
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

It can be seen that when calling tryrereleaseshared, the tag bit is actually - 1 and returns whether the tag bit is 0. If the tag bit is 0, the called dorreleaseshared can be regarded as releasing the blocked thread, so the whole process is complete

Underlying implementation of CyclicBarrier

The old rule is to look at the construction method first

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

Here, two objects are passed in, and the stored values are simply recorded. Let's directly check the key await methods

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

Let's look at the implementation of dowait

/** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** Omit some codes**/
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            //Judge whether it is interrupted
            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //The counter - 1 is the count assigned in the constructor
            int index = --count;
            if (index == 0) {  // tripped
            //If all threads are executed, that is, when count=0
                boolean ranAction = false;
                try {
                    //Execute the incoming method
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //Wake up all threads
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //If the count does not reach 0, the current thread will be blocked
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

As can be seen from the code, CyclicBarrier uses the condition method of Lock to block and wake up threads, similar to object Wait() and notifyAll() block when count is not 0, and wake up all threads when count=0

summary

1. CountdownLatch is suitable for all threads to notify the method after passing a certain point, while CyclicBarrier is suitable for all threads to execute at the same point at the same time
2. CountdownLatch uses the shared lock inherited from AQS to notify threads, CAS to –, and CyclicBarrier uses the Condition of ReentrantLock to block and notify threads

Keywords: Java Back-end

Added by matt_wood87 on Fri, 31 Dec 2021 01:49:39 +0200