Synchronization component applications of AQS: CountDownLatch, Cyclicbarrier and Semaphore

Synchronization: concurrent threads may need to wait for notifications or exchange messages at a key point. This mechanism is called thread synchronization.

Several of their underlying implementations are AQS, which are concurrent tool classes provided in the jdk contract. Note that they are tool classes, the same as the tool classes we usually write or use. Their purpose is to help us deal with concurrent process control or solve some problems we encounter when multithreading (such as multithreading waiting for notification, simulating concurrency, etc.)

CountDownLatch

You can let threads wait for other threads to complete the operation before doing it themselves.

Similar to join, it allows the main thread to suspend the sub thread to execute first; It is also more flexible than join. A join can only notify the main thread when the sub thread is fully executed. countDownLatch allows the sub thread to display the notification to the main thread at any time. After receiving the notification, the main thread can execute with the sub thread; The implementation function can be similar to that of FutureTask. FutureTask can return the status of the sub thread processing tasks. The main thread continuously judges whether the sub thread status is done through the while loop. This method takes up cpu resources because it needs to be busy to cycle and constantly check that all sub threads are processed. The implementation mechanism of countDownLatch is that the main thread is "suspended" after it is suspended, The cpu is released and the main thread is notified after all sub threads are completed. This method causes context switching between threads and consumes performance.

Usage:

public class CountDownLatchTest {
    public static void main(String[] args) {
        CountDownLatch taskLock = new CountDownLatch(3);
       Thread thread1 = new Thread(()->{
               String name = Thread.currentThread().getName();
               System.out.println(name+":Processing business");
               taskLock.countDown();
               System.out.println(name+":After the call, the resource is released, and the synchronization status remains:"+taskLock.getCount());
       },"Thread-1");
        Thread thread2 = new Thread(()->{
                String name = Thread.currentThread().getName();
                System.out.println(name+":Processing business");
                taskLock.countDown();
                System.out.println(name+":After the call, the resource is released, and the synchronization status remains:"+taskLock.getCount());
        },"Thread-2");
        Thread thread3 = new Thread(()->{
                String name = Thread.currentThread().getName();
                System.out.println(name+":Processing business");
                taskLock.countDown();
                System.out.println(name+":After the call, the resource is released, and the synchronization status remains:"+taskLock.getCount());
        },"Thread-3");

        System.out.println("----------The main thread starts and multiple threads are started to process business------------");
        thread1.start();
        thread2.start();
        thread3.start();
        System.out.println("When the main thread encounters a synchronization state, it needs to synchronize and enter the waiting state,Suspended——————");
//After calling the await method, the main thread is blocked. When the child thread countdown, the synchronization state is reduced by 1 until it is reduced to 0, and the main thread continues to execute
//      taskLock.await(2000, TimeUnit.MILLISECONDS);
        taskLock.await();
        System.out.println("The main thread is notified that it can continue"+taskLock.getCount());

    }
}
-----------------The main thread starts and multiple threads are started to process business------------
When the main thread encounters a synchronization state, it needs to synchronize and enter the waiting state,Suspended——————
Thread-3:Processing business
Thread-3:Release the resource after calling, and the synchronization status remains: 2
Thread-2:Processing business
Thread-2:Release the resource after calling, and the synchronization status remains: 1
Thread-1:Processing business
 The main thread is notified that it can continue 0
Thread-1:Release resources after calling, synchronization status remaining: 0

1. When countDownLatch is created, the number of synchronization states is passed in (assign a value to state), and several sub threads are set;

2. After calling await, the main thread is blocked until the state value is 0;

3. When the child thread calls countDown once, it will reduce the state by 1 until it is reduced to 0 and wake up the main thread.

The bottom layer of countDown latch uses AQS to subtract the synchronization state variables. The two main methods are await and countDown, which are similar to the monitor's await and notify. For thread blocking at the bottom, the park and unpark of LockSupport are used to schedule multiple threads through countDown latch to realize multi-threaded synchronization.

CyclicBarrier

Synchronization barrier: let a group of threads execute together after reaching a certain point.

Similar to a barrier, it blocks threads and releases them at the same time after they arrive, and its synchronization state can be recycled. Its application can simulate concurrency, or let a thread wait for multiple threads to execute, and then comprehensively process the results of each thread.

Usage:

public class CyclicBarrierTest {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Thread(()->{
            System.out.println("Initialized---3 Threads---Let's do it together");
    }));

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,3,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));
            threadPoolExecutor.submit(()->{
                try {
                    System.out.println("I am 1=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("I am 1 processing logic=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.submit(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("I'm 2=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("I am 2 processing logic=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.submit(()->{
                try {
                    Thread.sleep(2000);
                    System.out.println("I'm 3=="+System.currentTimeMillis());
                    cyclicBarrier.await();
                    System.out.println("I am 3 processing logic=="+System.currentTimeMillis());
                } catch (Exception e) {}
            });
            threadPoolExecutor.shutdown();
    }
}
I am 1==1640917282738
 I'm 2==1640917283741
 I'm 3==1640917284740
 Initialized---3 Threads---Let's do it together
 I am 3 processing logic==1640917284740
 I am 1 processing logic==1640917284740
 I am 2 processing logic==1640917284740

1. When the CyclicBarrier is created, the number of synchronization states is passed in (assign a value to state), and several sub threads are set;

2. The child thread is blocked after calling await;

3. When a child thread adjusts await once, it subtracts state by 1. All threads are not awakened until the state value is 0

The AQS used by the underlying implementation of the CyclicBarrier subtracts the initial synchronization state variables, which is realized by using the reentrant lock and the waiting condition. When locking to ensure that there is only one modified synchronization state for multiple threads, then judge whether the modified synchronization state is equal to 0. If it is equal to 0, wake up all threads waiting on the condition, which is not 0, The current thread is suspended on condition.

Semaphore

Semaphore: it also implements the scheduling between threads

It adopts the form of increasing synchronous state to realize scheduling. The synchronization status of initialization settings is 0. One group of threads let him increase and one group of threads let him decrease. Always judge around 0. Semaphores have two strategies: fair and unfair.

Usage:

public class SemaphoreTest3 {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(0);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,3,0L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(3));
        threadPoolExecutor.submit(()->{
            try {
                System.out.println("I am 1=I'll give you a semaphore="+System.currentTimeMillis());
                semaphore.release(1);
            } catch (Exception e) {}
        });
        threadPoolExecutor.submit(()->{
            try {
                Thread.sleep(1000);
                System.out.println("I'm 2==I'll give you two semaphores="+System.currentTimeMillis());
                semaphore.release(2);
            } catch (Exception e) {}
        });
        threadPoolExecutor.submit(()->{
            try {
                Thread.sleep(2000);
                System.out.println("I'm 3=I'll give you a semaphore="+System.currentTimeMillis());
                semaphore.release(1);
            } catch (Exception e) {}
        });
        threadPoolExecutor.shutdown();
        try {
            System.out.println("The main thread is suspended as it walks. I need four semaphores");
            semaphore.acquire(4);//The synchronization status is - 4
            System.out.println("The main thread is awakened and can continue again");
        } catch (InterruptedException e) {}
    }
}
The main thread is suspended as it walks. I need four semaphores
 I am 1=I'll give you a semaphore=1640921214160
 I'm 2==I'll give you two semaphores=1640921215162
 I'm 3=I'll give you a semaphore=1640921216161
 The main thread is awakened and can continue again

Calling acquire is blocking. The condition of wake-up is the number of parameters to be passed in. Calling release is to wake up the thread, but the condition is to reach the value of acquire.

1. When the semaphore is initially synchronized, the synchronization state is set to 0,

2. Call acquire to pass in the value, block the thread, and change the synchronization status value to 0 minus the newly passed value (negative number)

3. Call release to pass in the value, which is added to the synchronization status. When the synchronization status is 0 again, the thread will wake up.

Keywords: Java Back-end

Added by jredsmyth on Mon, 03 Jan 2022 14:21:49 +0200