Concurrency and multithreading -- source code analysis of CountDownLatch, Semaphore and CyclicBarrier

Overview:

CountDownLatch, Semaphore and CyclicBarrier are common, easy-to-use utility classes for concurrent programming. They save developers from working directly with lower-level APIs: for example, CountDownLatch can replace join() out of the box, which reduces the chance of misusing the low-level API and offers more functionality. CountDownLatch and Semaphore are built directly on AQS (AbstractQueuedSynchronizer). CyclicBarrier is built on ReentrantLock, which is itself implemented with AQS, so in the final analysis all three utility classes rest on AQS. If you are not familiar with AQS, please refer to the following articles; the AQS implementation itself is not covered here.
AbstractQueuedSynchronizer source code (Part 1) – exclusive lock
AbstractQueuedSynchronizer source code (Part 2) – shared lock and Condition queue
Analysis of ReentrantLock source code

CountDownLatch

CountDownLatch is usually described as a latch or a countdown counter. Internally it keeps a counter in the state field of AQS, and the code is very simple. There are two main application scenarios:
1. Like join(): let one or more threads wait until other threads have completed their work before continuing.
2. Hold multiple threads at the same point (the await() call) and release them all together through countDown().

schematic diagram:

Write a demo first, and then check the source implementation.

demo:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch1 = new CountDownLatch(5);
    CountDownLatch latch2 = new CountDownLatch(1);
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+" Start successful");
            try {
                //Scenario 2 above: hold all worker threads here so they run the following code at about the same time
                latch2.await();
                System.out.println(Thread.currentThread().getName()+" Execute code logic");
                latch1.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    }
    System.out.println("Main Start execution");
    Thread.sleep(1000);
    latch2.countDown();
    //Scenario 1 above: the main thread waits for the other threads to finish
    latch1.await();
    System.out.println("Main End execution");
}

Output:

Thread-0 Start successful
Thread-2 Start successful
Thread-1 Start successful
Thread-3 Start successful
Main Start execution
Thread-4 Start successful
Thread-0 Execute code logic
Thread-2 Execute code logic
Thread-1 Execute code logic
Thread-3 Execute code logic
Thread-4 Execute code logic
Main End execution

The demo above covers the two typical scenarios for CountDownLatch. Let's take a look at the important methods.

await():

//① Sync is a static inner class that extends AQS
public CountDownLatch(int count) {
	if (count < 0) throw new IllegalArgumentException("count < 0");
	this.sync = new Sync(count);
}
//②
Sync(int count) {
	setState(count);
}
//③
public void await() throws InterruptedException {
	sync.acquireSharedInterruptibly(1);
}
//④
public final void acquireSharedInterruptibly(int arg)
		throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	//⑤
	if (tryAcquireShared(arg) < 0)
	//⑦
		doAcquireSharedInterruptibly(arg);
}
//⑥
protected int tryAcquireShared(int acquires) {
	return (getState() == 0) ? 1 : -1;
}

The above is the code path behind await(). Let's look at it step by step:

①. The constructor receives the counter value count and creates a Sync.
②. Sync stores count in the state variable of AQS.
③. await() calls acquireSharedInterruptibly(1) to acquire in shared, interruptible mode.
④. acquireSharedInterruptibly() first responds to interruption: if the thread has been interrupted, it throws InterruptedException.
⑤. It then tries to acquire in shared mode through tryAcquireShared().
⑥. tryAcquireShared() returns 1 if the current state is 0 and -1 otherwise.
⑦. If the attempt fails, doAcquireSharedInterruptibly() in AQS is called. The basic steps are: wrap the current thread in a shared node and add it to the tail of the queue, then check whether the node's predecessor is head. If it is, try to acquire in shared mode again; on success, set the node as head and wake up the following nodes. If not, or if the attempt fails, set the predecessor's waitStatus to SIGNAL and block until woken up.

Therefore, the overall idea is: while state > 0, await() puts the calling thread into the queue and blocks it; once state is 0, await() returns immediately.
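
The effect of the state check can be observed from the outside. The following is a minimal sketch, not taken from the JDK source: the class name LatchAwaitDemo and the helper timedAwait are made up for illustration, and it uses the timed overload await(long, TimeUnit) so that the example cannot hang while the count is still above 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class LatchAwaitDemo {
    // Waits at most 50 ms; returns true if the latch was open (state == 0).
    static boolean timedAwait(CountDownLatch latch) {
        try {
            return latch.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        System.out.println(timedAwait(latch)); // false: state is 1, the wait times out
        latch.countDown();
        System.out.println(timedAwait(latch)); // true: state is 0, returns at once
    }
}
```

The second call returns without blocking because tryAcquireShared() already reports success when state is 0.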

countDown():

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //Attempt to release in shared mode (decrement the counter)
    if (tryReleaseShared(arg)) {
        //The counter reached 0: call the AQS method that wakes up the waiting threads, starting from the successor of head
        doReleaseShared();
        return true;
    }
    return false;
}
//Decrement state with CAS in a loop; return true only when this call brings the counter to 0
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

We have now looked at the two key methods of the latch. Built on AQS, the implementation is relatively simple. The demo above introduced the applicable scenarios of CountDownLatch and should give a basic understanding of it.
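
To see how little is needed on top of AQS, the two methods can be put together in a small standalone class. This is only a sketch under stated assumptions: SimpleLatch and its demo() helper are hypothetical names, and the class imitates the structure of CountDownLatch rather than reproducing the JDK code.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration only; it is not the JDK implementation.
class SimpleLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        int count() { return getState(); }

        // Succeeds (returns >= 0) only once the counter has reached zero.
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        // Decrements the counter with CAS; true only on the transition to zero.
        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) return false;
                if (compareAndSetState(c, c - 1)) return c - 1 == 0;
            }
        }
    }

    private final Sync sync;

    SimpleLatch(int count) { sync = new Sync(count); }

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    void countDown() { sync.releaseShared(1); }
    int getCount() { return sync.count(); }

    // Starts two workers that count down, waits for them, returns the final count.
    static int demo() {
        SimpleLatch latch = new SimpleLatch(2);
        for (int i = 0; i < 2; i++) new Thread(latch::countDown).start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("count after await: " + demo()); // count after await: 0
    }
}
```

Because tryReleaseShared() returns false once the state is already 0, extra countDown() calls are harmless: the counter never goes negative.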

Semaphore

A Semaphore is generally used to limit the number of threads that access a resource concurrently. It works like the security check at a railway station during the Spring Festival travel rush: the staff at each lane let only a few people in at a time. A semaphore is created with a fixed number of permits; each acquire takes one or more permits and each release returns them, so the number of permits determines how many threads can pass at once.

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    CountDownLatch countDownLatch = new CountDownLatch(200);
    Semaphore semaphore = new Semaphore(3);
    for (int i = 0; i < 200; i++) {
        executorService.execute(() -> {
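            try {
                semaphore.acquire();
                try {
                    //log is assumed to be an SLF4J-style logger field (e.g. from Lombok's @Slf4j)
                    log.info("Thread:{} do something",Thread.currentThread().getName());
                    Thread.sleep(1000);
                } finally {
                    //Return the permit even if the work above is interrupted
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }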
        });
    }
    countDownLatch.await();
    executorService.shutdown();
    log.info("finish");
}

When you run the code above, the log lines appear three at a time. The semaphore has three permits and each thread acquires one, so three threads can hold a permit at the same time; each sleeps for 1s and then releases its permit. As a result, the log prints in batches of three lines.
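
The limit can also be checked programmatically instead of by reading the log. The sketch below is an illustration only: SemaphoreLimitDemo and maxConcurrent are made-up names, and the task count and sleep time are arbitrary. It records the highest number of tasks that were inside the guarded region at the same moment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only.
class SemaphoreLimitDemo {
    // Runs `tasks` tasks guarded by `permits` permits and returns the highest
    // number of tasks observed inside the guarded region at the same time.
    static int maxConcurrent(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        int now = inside.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(20);
                    } finally {
                        // Leave the region and hand the permit back.
                        inside.decrementAndGet();
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The printed value never exceeds the number of permits (3 here).
        System.out.println("max concurrent = " + maxConcurrent(3, 12));
    }
}
```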

Class definition:

//Sync is a static inner class that extends AQS
private final Sync sync;
//Nonfair by default
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//Fair or nonfair, depending on the flag
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

FairSync and NonfairSync

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
            	//If another thread is queued ahead of this one, return -1 (fail)
                if (hasQueuedPredecessors())
                    return -1;
                //Get the current state value (available permits)
                int available = getState();
                
                int remaining = available - acquires;
                //Return if there are not enough permits left, or if CAS succeeds. remaining >= 0 means the acquire succeeded; a negative value means the thread will be blocked
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

FairSync and NonfairSync are the two subclasses of Sync, corresponding to the fair and nonfair versions. Both constructors simply store permits in the state variable of AQS; the difference lies in tryAcquireShared(). The fair version first checks whether another thread is already waiting in the synchronization queue; if so, it returns -1 to preserve FIFO order. The rest of the code is the same as the nonfair version.
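
Which version a given semaphore uses can be checked through the public isFair() method. A small sketch, where the class SemaphoreFairnessDemo and the helper flavour are hypothetical names chosen for this example:

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, for illustration only.
class SemaphoreFairnessDemo {
    // Reports which Sync subclass the semaphore was constructed with.
    static String flavour(Semaphore s) {
        return s.isFair() ? "FairSync" : "NonfairSync";
    }

    public static void main(String[] args) {
        System.out.println(flavour(new Semaphore(3)));        // NonfairSync
        System.out.println(flavour(new Semaphore(3, true)));  // FairSync
        System.out.println(flavour(new Semaphore(3, false))); // NonfairSync
    }
}
```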

acquire():

	//Acquire 1 permit
	public void acquire() throws InterruptedException {
		sync.acquireSharedInterruptibly(1);
	}
	//Acquire the given number of permits
	public void acquire(int permits) throws InterruptedException {
		if (permits < 0) throw new IllegalArgumentException();
		sync.acquireSharedInterruptibly(permits);
	}
	public final void acquireSharedInterruptibly(int arg)
			throws InterruptedException {
			//Respond to interruption
		if (Thread.interrupted())
			throw new InterruptedException();
		//Try to acquire; the call goes to the FairSync or NonfairSync version. Nonfair is the default; the fair version can be selected through the constructor
		if (tryAcquireShared(arg) < 0)
			//Add the current thread to the tail of the synchronization queue and loop: try to acquire when the predecessor is head, otherwise block, wait to be woken up, and try again
			doAcquireSharedInterruptibly(arg);
	}
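
The arithmetic behind acquiring several permits at once (remaining = available - acquires) can be followed with a short example. This is a sketch with made-up names (SemaphoreBatchDemo, takeOrFail); it uses the non-blocking tryAcquire(int) instead of acquire(int) so that a failed attempt returns immediately rather than parking the thread.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, for illustration only.
class SemaphoreBatchDemo {
    // Tries to take n permits without blocking.
    // Returns the permits left afterwards, or -1 if there were not enough.
    static int takeOrFail(Semaphore s, int n) {
        return s.tryAcquire(n) ? s.availablePermits() : -1;
    }

    public static void main(String[] args) {
        Semaphore s = new Semaphore(3);
        System.out.println(takeOrFail(s, 2));     // 1  (3 - 2 = 1, succeeds)
        System.out.println(takeOrFail(s, 2));     // -1 (1 - 2 < 0, fails)
        s.release(2);                             // state: 1 -> 3
        System.out.println(s.availablePermits()); // 3
    }
}
```

With acquire(2) in place of the second tryAcquire(2), the calling thread would be queued and blocked until another thread released enough permits.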

release():

	//release() and its overloaded method release(int permits)
	public void release() {
		sync.releaseShared(1);
	}
	public void release(int permits) {
		if (permits < 0) throw new IllegalArgumentException();
		sync.releaseShared(permits);
	}
	//Release arg permits
	public final boolean releaseShared(int arg) {
		//Try to return the permits. If that fails, return false; if it succeeds, doReleaseShared() wakes up the waiting nodes
		if (tryReleaseShared(arg)) {
			doReleaseShared();
			return true;
		}
		return false;
	}
	//Attempt to release permits
	protected final boolean tryReleaseShared(int releases) {
		for (;;) {
			int current = getState();
			int next = current + releases;
			//If the integer maximum is exceeded, an exception is thrown
			if (next < current) // overflow
				throw new Error("Maximum permit count exceeded");
			//If CAS successfully sets the current state to next, it returns true
			if (compareAndSetState(current, next))
				return true;
		}
	}
	//After a successful release, wake up the waiting nodes
	private void doReleaseShared() {
	//spin
		for (;;) {
			Node h = head;
			//The queue has at least one waiting node besides head
			if (h != null && h != tail) {
				int ws = h.waitStatus;
				//If the waitStatus of head is SIGNAL
				if (ws == Node.SIGNAL) {
					//Reset SIGNAL to 0 with CAS; if the CAS fails, another thread changed it, so loop and recheck
					if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
						continue;            // loop to recheck cases
					//Wake up the successor of head
					unparkSuccessor(h);
				}
				//If ws is 0 (initial state), mark head as PROPAGATE; loop again if the CAS fails
				else if (ws == 0 &&
						 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
					continue;                // loop on failed CAS
			}
			//If head did not change during this pass, break
			//head may change because AQS calls this method on both acquire and release
			if (h == head)                   // loop if head changed
				break;
		}
	}

Other methods:

	//Query the number of permits currently available
	public int availablePermits() {
		return sync.getPermits();
	}
	
	//Acquire all available permits and return how many were taken
	public int drainPermits() {
		return sync.drainPermits();
	}
	final int drainPermits() {
    	for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
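
Since tryReleaseShared() only adds to state, a release does not have to be preceded by an acquire, and the permit count can grow beyond its initial value. The sketch below illustrates this together with drainPermits(); the class name SemaphoreReleaseDemo and the method run are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical demo class, for illustration only.
class SemaphoreReleaseDemo {
    // Returns {permits after release, permits drained, permits after drain}.
    static int[] run() {
        Semaphore s = new Semaphore(2);
        s.release();                          // no acquire needed: state 2 -> 3
        int afterRelease = s.availablePermits();
        int drained = s.drainPermits();       // takes everything: state 3 -> 0
        int afterDrain = s.availablePermits();
        return new int[] {afterRelease, drained, afterDrain};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [3, 3, 0]
    }
}
```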

This section introduced the basic use and source code of Semaphore. Like CountDownLatch, it is built on AQS, but it uses the state differently and therefore provides different behavior. Let's compare the two for a more intuitive understanding.

Comparison between CountDownLatch and Semaphore:

1. CountDownLatch: uses the shared mode of AQS
Set state: the constructor sets the counter
await(): checks whether state == 0. If true, the thread passes; if false, the calling thread is blocked, so the latch acts like a gate.
countDown(): decrements state by 1 on each call; when it reaches 0, the threads blocked in the synchronization queue are released.

2. Semaphore: uses the shared mode of AQS, in either a fair or a nonfair version
Set state: the constructor sets the number of permits
acquire(int permits): checks whether state - permits >= 0. If true, the permits are acquired; if false, the thread cannot acquire them and blocks in the synchronization queue.
PS: fairness only affects acquire; release behaves the same in both versions
release(int permits): sets state to state + permits. permits must not be negative. The threads blocked in the synchronization queue are then woken up to try again.

For example:

CountDownLatch: it's like a supermarket sale. Many people queue outside in the morning waiting for the doors to open. At 8:00 the doors open (state=0) and everyone rushes in at once.
Semaphore: it's like the security check at a railway station. The staff let one or a few people through at a time; only after they have passed the check are the next one or few let in.

CyclicBarrier

We have learned the basic use of CountDownLatch, but one disadvantage is that it cannot be reused. Once countDown() has reduced the counter to 0, all blocked threads are released and the latch stays open for good. CyclicBarrier also acts as a barrier (the demo below calls it a fence), but it can be used over and over. Its function is to let a group of threads wait for each other until the internal counter drops to 0, and then release all the blocked threads.

For example:

public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("Execute first Runnable command");
        });

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " Before reaching the fence");
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " Through the fence");
            }).start();
        }
    }

Output:

Thread-0 Before reaching the fence
Thread-2 Before reaching the fence
Thread-1 Before reaching the fence
Execute first Runnable command
Thread-0 Through the fence
Thread-2 Through the fence
Thread-1 Through the fence

The counter of the CyclicBarrier is set to 3. Each await() call decrements it; when all three threads have called await(), the counter reaches 0 and the threads continue. If a Runnable command was passed to the constructor, it is executed first, before the waiting threads are released.

Related properties:

public class CyclicBarrier {

    //Generation represents one round (one use) of the barrier
    private static class Generation {
        boolean broken = false;
    }
    //Reentrant lock guarding the barrier
    private final ReentrantLock lock = new ReentrantLock();
    
    //Condition that threads wait on until the barrier trips
    private final Condition trip = lock.newCondition();
    
    //Number of threads that must arrive before the barrier trips
    private final int parties;
    
    //Runnable command to run when the barrier trips
    private final Runnable barrierCommand;
    
    //Current generation
    private Generation generation = new Generation();

    //Counter, initially equal to parties
    private int count;
	
	//Set the parties and the Runnable command
	public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    //Set the parties, that is, the initial value of the counter
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
}

Because a CyclicBarrier can be reused, threads pass the barrier whenever the condition is met, and each pass starts a new Generation. As seen in the demo, the Runnable's run() is executed first when the barrier trips, which is the purpose of barrierCommand.
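
Reuse across generations can be demonstrated by sending the same threads through one barrier several times and counting how often the barrier command runs. A minimal sketch; BarrierReuseDemo and run are hypothetical names, and the number of parties and rounds is arbitrary.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only.
class BarrierReuseDemo {
    // Sends `parties` threads through the same barrier `rounds` times and
    // returns how many times the barrier command ran (once per generation).
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // 4
    }
}
```

A CountDownLatch could not be used this way: after the first round its counter would stay at 0.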

await():

    public int await() throws InterruptedException, BrokenBarrierException {
    	try {
    		return dowait(false, 0L);
    	} catch (TimeoutException toe) {
    		throw new Error(toe); // cannot happen
    	}
    }
    public int await(long timeout, TimeUnit unit)
    	throws InterruptedException,
    		   BrokenBarrierException,
    		   TimeoutException {
    	return dowait(true, unit.toNanos(timeout));
    }

await() is the main method of CyclicBarrier. Each call decrements the counter (--count); when it reaches 0, all waiting threads are woken up through trip.signalAll() and a new Generation begins. Let's look at the implementation of dowait().

dowait():

	private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        //Acquire the lock
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //Get current generation
            final Generation g = generation;

            //Whether this generation is broken. The default is false; if true, a BrokenBarrierException is thrown
            if (g.broken)
                throw new BrokenBarrierException();
            //Whether the thread has been interrupted. If so, break the barrier (set generation.broken to true, reset the counter, call signalAll()) and respond to the interrupt
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //Decrement count by 1
            int index = --count;
            //If index is 0, this is the last thread of the current generation to arrive at the barrier
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //Run the Runnable command first
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //Start the next generation: call signalAll() to wake up the waiting threads, reset the counter, and create a new Generation
                    nextGeneration();
                    return 0;
                } finally {
                    //If the command threw, set generation.broken to true, reset the counter, and wake up the waiting threads
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {
                
                    //If no timeout is set, call await() to wait in the condition queue until woken by signal()/signalAll(); the thread then moves to the synchronization queue to reacquire the lock
                    if (!timed)
                        trip.await();
                    //If a timeout is set, call awaitNanos()
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //Interrupted while still in the current generation and the barrier is not broken: break it and rethrow
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        //The generation has already changed or been broken, so this wait is finishing anyway; just restore the interrupt flag for later code to handle
                        Thread.currentThread().interrupt();
                    }
                }
                //If any thread called breakBarrier(), the woken thread must throw as well
                if (g.broken)
                    throw new BrokenBarrierException();
                //If the generation has changed, the barrier tripped normally: return the arrival index
                if (g != generation)
                    return index;
                
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
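
The value returned by dowait() is the arrival index: parties - 1 for the first thread to arrive and 0 for the last one. The sketch below collects the indexes returned by await(); BarrierIndexDemo and arrivalIndexes are made-up names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, for illustration only.
class BarrierIndexDemo {
    // Lets `parties` threads meet at a barrier once and returns the sorted
    // list of arrival indexes handed back by await().
    static List<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indexes = Collections.synchronizedList(new ArrayList<>());
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    indexes.add(barrier.await());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<>(indexes);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexes(3)); // [0, 1, 2]
    }
}
```

Which thread receives which index depends on scheduling, but every index from 0 to parties - 1 is handed out exactly once per generation.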

Two concepts matter most in this implementation: the counter count and the generation. Why is generation needed?

Because the same CyclicBarrier is used for round after round, a woken thread needs a way to tell whether the round it was waiting in has finished. Comparing its saved generation with the current one answers that question and keeps the barrier working correctly across rounds.

If broken is set to true, the other threads see it when they wake up and throw BrokenBarrierException; the CyclicBarrier then remains unusable until reset() is called.
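
The broken state can be triggered from a single thread by letting a timed await() expire. This is a sketch with hypothetical names (BrokenBarrierDemo, run); it relies on the public isBroken() and reset() methods of CyclicBarrier.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, for illustration only.
class BrokenBarrierDemo {
    // Returns {broken after the timeout, broken after reset()}.
    static boolean[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            // Only one of the two parties ever arrives, so this wait times out.
            barrier.await(10, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Expected: dowait() called breakBarrier() before throwing.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        boolean afterTimeout = barrier.isBroken();
        barrier.reset(); // starts a fresh generation
        boolean afterReset = barrier.isBroken();
        return new boolean[] {afterTimeout, afterReset};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [true, false]
    }
}
```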

Differences between CountDownLatch and CyclicBarrier:

1. A CountDownLatch can only be used once, while a CyclicBarrier can be reused: it starts a new round automatically after each trip and can also be reset with reset().

2. CyclicBarrier makes multiple threads wait for each other until the condition is met and the barrier opens. CountDownLatch can do something similar: set the counter to 1 and block the threads with await(). The difference is that it relies on an external thread to call countDown().

3. When the main thread needs to wait for other threads to finish, either tool can do the job.

4. If reuse is not required, I think CountDownLatch is the more flexible choice, so I recommend it.

Added by vigour on Sun, 09 Jan 2022 04:44:33 +0200