1. CountDownLatch
CountDownLatch is a synchronization utility that lets one or more threads wait until other threads have finished their work.
CountDownLatch is implemented with a counter whose initial value is the number of tasks (typically threads) to wait for. Each time a thread completes its task, it calls countDown(), which decrements the counter by 1. When the counter reaches 0, all tasks have completed and the threads waiting on the latch resume execution.
Consider a sports meet with five athletes and one referee. Before the race starts, all the athletes wait for the referee to fire the starting gun and then start running. After every athlete has finished running, the referee calls all the athletes together.
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SportsMeeting {
    public static void main(String[] args) throws InterruptedException {
        // Opened once by the referee: the starting gun
        CountDownLatch begin = new CountDownLatch(1);
        // Counted down once by each athlete at the finish line
        CountDownLatch end = new CountDownLatch(5);
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("Athlete " + no + " is ready");
                        // Block until the referee fires the gun
                        begin.await();
                        System.out.println("Athlete " + no + " is running!");
                        Thread.sleep(new Random().nextInt(2000));
                        System.out.println("Athlete " + no + " runs to the end!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Report the finish even if the run was interrupted
                        end.countDown();
                    }
                }
            };
            poolExecutor.execute(runnable);
        }
        // The referee gets ready to fire the gun
        Thread.sleep(5000);
        System.out.println("---------Start the game by shooting!---------");
        begin.countDown();
        // Wait until all five athletes have finished
        end.await();
        System.out.println("---------At the end of the competition, please gather all the participants!---------");
        poolExecutor.shutdown();
    }
}
Athlete 5 is ready
Athlete 4 is ready
Athlete 1 is ready
Athlete 2 is ready
Athlete 3 is ready
---------Start the game by shooting!---------
Athlete 4 is running!
Athlete 2 is running!
Athlete 1 is running!
Athlete 5 is running!
Athlete 3 is running!
Athlete 5 runs to the end!
Athlete 1 runs to the end!
Athlete 2 runs to the end!
Athlete 4 runs to the end!
Athlete 3 runs to the end!
---------At the end of the competition, please gather all the participants!---------
ps: The initial count must not be negative (a negative value throws IllegalArgumentException). When the counter is already 0, calling await does not block the current thread. A CountDownLatch is one-shot: its internal counter cannot be reset or otherwise modified once it has been created, only counted down.
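The counter rules in the note above can be checked with a small sketch. The class name LatchCountDemo and its method names are made up for illustration; only CountDownLatch and its countDown, getCount and await methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the article's example
class LatchCountDemo {

    // Counts down more often than the initial count and returns what is left.
    static long countAfterExtraCountDowns() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();
        latch.countDown(); // the count is already 0, so this call has no effect
        return latch.getCount();
    }

    // Returns true if await on a latch whose count is 0 returns without timing out.
    static boolean zeroLatchDoesNotBlock() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(0);
        return latch.await(1, TimeUnit.SECONDS);
    }

    // Returns true if the constructor rejects a negative count.
    static boolean negativeCountRejected() {
        try {
            new CountDownLatch(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("count after extra countDown calls: " + countAfterExtraCountDowns());
        System.out.println("await on a zero latch returns at once: " + zeroLatchDoesNotBlock());
        System.out.println("negative count rejected: " + negativeCountRejected());
    }
}
```

Because the count never goes back up, a latch that has opened stays open; a new round of waiting needs a new CountDownLatch instance.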
2. CyclicBarrier
CyclicBarrier literally means a reusable (cyclic) barrier. It blocks a group of threads when they reach the barrier (also known as a synchronization point). The barrier opens only when the last thread arrives, and at that moment all the threads blocked at the barrier continue to run.
CyclicBarrier has two constructors:
public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
The second constructor differs in that, once all threads have reached the barrier, barrierAction is executed first (by the last thread to arrive) before the waiting threads are released. This makes it easier to handle more complex business scenarios.
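To see when and where barrierAction runs, here is a minimal sketch. The class BarrierActionDemo and the thread names "early" and "late" are invented for illustration. It relies on the documented behaviour that the barrier action is performed by the last thread entering the barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the article's example
class BarrierActionDemo {

    // Returns the name of the thread that executed the barrier action.
    static String actionThreadName() throws InterruptedException {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CyclicBarrier barrier = new CyclicBarrier(2,
                () -> ranOn.set(Thread.currentThread().getName()));

        Runnable arrive = () -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread early = new Thread(arrive, "early");
        Thread late = new Thread(arrive, "late");

        early.start();
        // Wait until "early" is parked at the barrier so that "late" really arrives last.
        while (barrier.getNumberWaiting() < 1) {
            Thread.sleep(10);
        }
        late.start();

        early.join();
        late.join();
        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier action ran on: " + actionThreadName());
    }
}
```

Since the action runs on one of the participating threads, a slow action delays every thread waiting at the barrier.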
CountDownLatch was introduced earlier. The main differences between the two are:
- CountDownLatch is one-shot; CyclicBarrier is reusable.
- The threads involved in a CountDownLatch play different roles: some count down, while others wait for the countdown to finish. It is mainly oriented toward events.
  The threads involved in a CyclicBarrier all play the same role, waiting for one another, so it is mainly oriented toward threads.
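The reuse difference can be sketched as follows. ReuseDemo and its method names are hypothetical. The sketch passes the same CyclicBarrier through several rounds and counts how often it trips, then shows that a CountDownLatch stays at 0 once it has opened.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's example
class ReuseDemo {

    // Three threads meet at one barrier `rounds` times; returns how often the barrier tripped.
    static int barrierTrips(int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it doubles as a trip counter.
        CyclicBarrier barrier = new CyclicBarrier(3, trips::incrementAndGet);
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is used in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    // Opens a latch and returns its count afterwards; there is no method to raise it again.
    static long latchCountAfterOpen() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier trips in 3 rounds: " + barrierTrips(3));
        System.out.println("latch count after opening: " + latchCountAfterOpen());
    }
}
```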
Now consider a scenario in which a group of 15 students needs to travel by car to a tourist attraction. A car can take only 5 students at a time, so they must travel in batches, and each batch must wait until all 5 of its students have arrived before it can leave.
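import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Ride {
    public static void main(String[] args) {
        // The barrier action runs each time 5 students have arrived
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("---------Here's the wave. Let's go by car!---------");
            }
        });
        // 5 core threads; the other 10 tasks wait in the queue, so students arrive in batches of 5
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 15; i++) {
            final int no = i + 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("Classmate " + no + " is going to the designated location");
                        Thread.sleep(new Random().nextInt(4000));
                        System.out.println("Classmate " + no + " arrives at the designated location and is waiting for someone else");
                        // Wait here until the whole batch of 5 has arrived
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
            poolExecutor.execute(runnable);
        }
        poolExecutor.shutdown();
    }
}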
Classmate 3 is going to the designated location
Classmate 1 is going to the designated location
Classmate 5 is going to the designated location
Classmate 4 is going to the designated location
Classmate 2 is going to the designated location
Classmate 5 arrives at the designated location and is waiting for someone else
Classmate 3 arrives at the designated location and is waiting for someone else
Classmate 1 arrives at the designated location and is waiting for someone else
Classmate 4 arrives at the designated location and is waiting for someone else
Classmate 2 arrives at the designated location and is waiting for someone else
---------Here's the wave. Let's go by car!---------
Classmate 6 is going to the designated location
Classmate 7 is going to the designated location
Classmate 10 is going to the designated location
Classmate 9 is going to the designated location
Classmate 8 is going to the designated location
Classmate 8 arrives at the designated location and is waiting for someone else
Classmate 10 arrives at the designated location and is waiting for someone else
Classmate 6 arrives at the designated location and is waiting for someone else
Classmate 9 arrives at the designated location and is waiting for someone else
Classmate 7 arrives at the designated location and is waiting for someone else
---------Here's the wave. Let's go by car!---------
Classmate 11 is going to the designated location
Classmate 12 is going to the designated location
Classmate 14 is going to the designated location
Classmate 13 is going to the designated location
Classmate 15 is going to the designated location
Classmate 11 arrives at the designated location and is waiting for someone else
Classmate 13 arrives at the designated location and is waiting for someone else
Classmate 14 arrives at the designated location and is waiting for someone else
Classmate 12 arrives at the designated location and is waiting for someone else
Classmate 15 arrives at the designated location and is waiting for someone else
---------Here's the wave. Let's go by car!---------
3. Semaphore
A Semaphore is used to control the number of threads that access a specific resource at the same time. It coordinates threads by issuing each one a permit (a "pass"), which ensures that shared resources are used sensibly.
Common scenarios for Semaphore:
Semaphore is mainly used for flow control, for example with database connections. The number of database connections in use at the same time is limited and cannot exceed a fixed maximum. Once that maximum is reached, later threads must queue until earlier threads release their connections before they can obtain one.
Another example is a traffic light on a highway: only 100 vehicles may pass while the light is green, and no vehicles may pass while it is red.
A parking lot is a third example. The lot has a limited number of spaces, which determines how many cars it can hold at the same time. When the lot is full, a new car can enter only after a car inside has left.
Semaphore schematic:
To explain: a Semaphore has an initial capacity, which is the number of permits it can hand out. Each call to acquire reduces the number of available permits by 1, and each call to release increases it by 1. When demand exceeds the capacity, that is, when no permits are left, the excess threads wait in a queue until a permit is released. The same thread may acquire permits more than once; each acquisition consumes another permit.
The "traffic" that a Semaphore controls is threads, since threads are the primary subject of the concurrency utilities.
Consider a scenario in which you need to read data from tens of thousands of files and write it to a database, but only 10 database connections are available. You must therefore ensure that at most 10 threads hold a connection and operate on the database at the same time.
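The permit arithmetic described above can be traced with availablePermits. This is a minimal single-threaded sketch; PermitCountDemo is a hypothetical class name, and tryAcquire is used in place of acquire at the point where no permits remain, so that the example reports the shortage instead of blocking.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the article's example
class PermitCountDemo {

    // Records the number of available permits after each step.
    static String permitTrace() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);
        StringBuilder trace = new StringBuilder();

        trace.append(semaphore.availablePermits()).append(' '); // initial capacity: 3

        semaphore.acquire();
        semaphore.acquire();
        trace.append(semaphore.availablePermits()).append(' '); // two permits taken: 1

        semaphore.acquire();
        trace.append(semaphore.availablePermits()).append(' '); // all permits taken: 0

        // acquire() would block here; tryAcquire() returns false immediately instead
        trace.append(semaphore.tryAcquire()).append(' ');

        semaphore.release();
        trace.append(semaphore.availablePermits()); // one permit returned: 1

        return trace.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(permitTrace()); // prints "3 1 0 false 1"
    }
}
```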
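import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SavaMassiveData {
    private static final int ThreadCount = 30;
    // Sized so that each of the 30 tasks gets a thread; with 10 core / 12 max threads
    // and a 10-slot queue, later submissions could be rejected
    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(ThreadCount, ThreadCount, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    // 10 permits, one per database connection
    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < ThreadCount; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        try {
                            System.out.println("save data");
                        } finally {
                            // Always return the permit, even if saving fails
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        poolExecutor.shutdown();
    }
}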
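One way to convince yourself that the semaphore really caps concurrency is to measure the peak number of threads holding a permit at once. The sketch below is an illustration under assumptions: PeakConcurrencyDemo, peakHolders and the 20 ms sleep standing in for the database write are all invented here, and a fixed thread pool from Executors replaces the hand-built ThreadPoolExecutor for brevity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the article's example
class PeakConcurrencyDemo {

    // Runs `tasks` threads against `permits` permits and returns the largest
    // number of threads that held a permit at the same time.
    static int peakHolders(int permits, int tasks) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger current = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit was obtained, so nothing to release
                }
                try {
                    int now = current.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // stand-in for the database write (assumption)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    current.decrementAndGet();
                    semaphore.release();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrent holders: " + peakHolders(10, 30));
    }
}
```

With 10 permits and 30 threads the reported peak should never exceed 10, however the threads are scheduled.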
Other Semaphore methods
- drainPermits: Acquires and returns all permits that are immediately available. Internally this amounts to using CAS to set the permit count to zero.
- reducePermits: A protected method that shrinks the number of available permits by the given amount. Like nonfairTryAcquireShared, it uses CAS to subtract from the permit count, but it never blocks or fails for lack of permits, so the count can become negative.
- isFair: Reports whether competition for the Semaphore's permits is fair or unfair. The internal implementations are FairSync and NonfairSync.
- hasQueuedThreads: Reports whether any threads are currently queued waiting to acquire a permit.
- getQueuedThreads: A protected method that returns a collection of the threads that may be waiting to acquire a permit.
- getQueueLength: Returns an estimate of the number of threads queued and waiting to acquire a permit.
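The public methods from this list can be exercised together in a short sketch. SemaphoreInspectDemo is a hypothetical class name. The protected methods reducePermits and getQueuedThreads are left out because they are only reachable from a subclass.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the article's example
class SemaphoreInspectDemo {

    // Returns "drained available fair queued stillQueued" as observed during the run.
    static String inspect() throws InterruptedException {
        Semaphore semaphore = new Semaphore(3, true); // true selects the fair implementation

        int drained = semaphore.drainPermits();      // takes all 3 permits at once
        int available = semaphore.availablePermits(); // nothing left

        // With no permits left, this thread has to queue inside acquire().
        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        while (!semaphore.hasQueuedThreads()) {
            Thread.sleep(10);
        }
        int queued = semaphore.getQueueLength(); // an estimate; one waiter here

        semaphore.release(); // hand one permit back so the waiter can proceed
        waiter.join();
        boolean stillQueued = semaphore.hasQueuedThreads();

        return drained + " " + available + " " + semaphore.isFair() + " " + queued + " " + stillQueued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(inspect());
    }
}
```

These inspection methods are intended for monitoring and debugging rather than for synchronization decisions, since the queue can change while they are being evaluated.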