Concurrent tool classes in Java
Several useful concurrency tool classes are provided in the concurrency package of JDK. CountDownLatch, CyclicBarrier and Semaphore tool classes provide a means to control concurrent processes, while exchange tool classes provide a means to exchange data between processes.
1. CountDownLatch waiting for multithreading to complete
CountDownLatch allows one or more threads to wait for other threads to complete operations.
If there is such a demand: we need to analyze the data of multiple sheets in an Excel. At this time, we can consider using multiple threads. Each thread analyzes the data in a sheet. After all sheets are analyzed, the program needs to prompt that the analysis is completed.
In this requirement, the simplest way to realize that the main thread waits for all threads to complete the sheet parsing operation is to use the join() method.
public class JoinCountDownLatchTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish " ); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish ."); } }
Join is used to make the current executing thread wait for the execution of the join thread to end. Its implementation principle is to constantly check whether the join thread is alive. If the join thread is alive, let the current thread wait forever. Where wait(0) means to wait forever.
while (isAlive()) { wait(0); }
Until the join thread terminates, the thread's this The notifyAll() method will be called. The notifyAll() method is implemented in the JVM, so it can't be seen in the JDK.
In jdk1 The CountDownLatch provided in the concurrent package after 5 can also realize the function of join, and has more functions than join. The code is as follows:
public class CountDownLatchTest { static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println("Log information of magic leather:---->"+ 1); c.countDown(); System.out.println("Log information of magic leather:---->"+ 2); c.countDown(); } }).start(); c.await(); System.out.println("Log information of magic leather:---->"+ 3); } }
The constructor of CountDownLatch receives an int type parameter as a counter. If you want to wait for N points to complete, enter N here.
When we call the CountDown method of CountDownLatch, N will be reduced by one, and the await method of CountDownLatch will block the current thread until N becomes 0
Since the countDown method can be used anywhere, the n points mentioned here can be n threads or N execution steps in one thread. For multithreading, you only need to pass the reference of CountDownLatch to the thread.
If a thread that parses a sheet processes slowly, it is impossible for us to keep the main thread waiting, so we can use another await method with a specified time - await(long time, TimeUnit unit). After waiting for a specific time, this method will no longer block the current thread. join has a similar method.
be careful:
When the counter is greater than or equal to 0, but equal to 0, the counter is zero. When await method is called, the current thread will not be blocked.
CountDownLatch cannot reinitialize or modify the value of the internal counter of the CountDownLatch object.
One thread calls the countDown method happen before, and the other thread calls the await method.
2. Synchronization barrier CyclicBarrier
CyclicBarrier literally means a recyclable barrier. What it needs to do is to block a group of threads when they reach a barrier (also known as synchronization point). The barrier will not open until the last thread reaches the barrier, and all threads intercepted by the barrier will continue to run.
(1) introduction to CyclicBarrier
The default construction method of CyclicBarrier is CyclicBarrier(int parties). Its parameter indicates the number of threads intercepted by the barrier. Each thread calls the await method to tell CyclicBarrier that I have reached the screen, and then the current thread is blocked.
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { try { c.await(); }catch (Exception e){ } System.out.println(1); } }).start(); try { c.await(); }catch (Exception e){ } System.out.println(2); } }
Because the scheduling of the main thread and sub thread is determined by the CPU, both threads may execute, so two kinds of outputs will be generated.
If new CyclicBarrier(2) is changed to new CyclicBarrier(3), the main thread and sub thread will wait forever, because no third thread executes the await() method, that is, no third thread reaches the barrier, so the two threads that previously reached the barrier will not continue to execute.
CyclicBarrier also provides a more advanced constructor CyclicBarrier(int parties, Runnable barrierAction), which is used to give priority to the execution of barrierAction when the thread reaches the barrier, so as to facilitate the processing of more complex business scenarios.
public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2,new A()); public static void main(String[] args){ new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(2); } static class A implements Runnable{ @Override public void run() { System.out.println(3); } } }
(2) Application scenario of CyclicBarrier
CyclicBarrier can be applied to the scenario of multi-threaded computing data and finally merging computing results.
For example, an Excel is used to maintain all the bank flow of the user, and each sheet saves each bank flow of an account for nearly a year. Now it is necessary to count the user's daily average bank flow. First, use multi threads to process the daily average bank flow in each sheet, and finally, use the calculation results of these threads to calculate the daily average bank flow of the whole Excel.
public class BankWaterService implements Runnable{ /* Create four barriers and execute the run method of the current class after processing */ private CyclicBarrier c = new CyclicBarrier(4,this); /* Suppose there are only four sheet s, so start four threads */ private Executor executor = Executors.newFixedThreadPool(4); /* Save the calculated silver flow results of each sheet */ private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<String,Integer>(); private void count(){ for (int i=0;i<4;i++){ executor.execute(new Runnable() { @Override public void run() { // Calculate the silver flow data of the current sheet, and the calculation code is omitted sheetBankWaterCount.put(Thread.currentThread().getName(),1); // Silver flow calculation is completed and a barrier is inserted try { c.await(); } catch (Exception e) { e.printStackTrace(); } } }); } } @Override public void run() { int result = 0; // Summarize the calculated results of each sheet for (Entry<String,Integer> sheet: sheetBankWaterCount.entrySet()) { result+=sheet.getValue(); } // Output results sheetBankWaterCount.put("result",result); System.out.println(result); } public static void main(String[] args){ BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } }
Use the thread pool to create four threads and calculate the data in each sheet respectively. The calculation result of each sheet is 1, and then the BankWaterService thread summarizes the results calculated by the four sheets.
(3). The difference between CyclicBarrier and CountDownLatch
The counter of CountDownLatch can only be used once, while the counter of CyclicBarrier can be reset with the reset() method, so CyclicBarrier can handle more complex business scenarios. For example, if a calculation error occurs, you can reset the counter and let the thread execute again.
CyclicBarrier also provides other useful methods, such as getNumberWaiting method, which can obtain the number of threads blocked by CyclicBarrier. The isbroken () method is used to know whether the blocked thread is interrupted.
public class CyclicBarrierTest3 { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args){ Thread thread = new Thread(new Runnable() { @Override public void run() { try { c.await(); }catch (Exception e){ } } }); thread.start(); thread.interrupt(); try { c.await(); }catch (Exception e){ System.out.println(c.isBroken()); } } }
3. Semaphore for controlling the number of concurrent threads
Semaphore is used to control the number of threads accessing specific resources at the same time. It ensures the rational use of public resources by coordinating each thread.
(1) Application scenario
Semaphore can be used for flow control, especially in application scenarios with limited public resources, such as database connection.
For example, there is a need to read the data of tens of thousands of files. Because they are IO intensive tasks, we can start dozens of threads to read them concurrently. However, if we need to store them in the database after reading the memory, and there are only 10 connections to the database, we must control only 10 threads to obtain the database connection and save the data at the same time, Otherwise, an error will be reported and the database connection cannot be obtained. At this time, Semaphore can be used for flow control.
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args){ for (int i=0;i<THREAD_COUNT;i++){ threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (Exception e) { e.printStackTrace(); } } }); } threadPool.shutdown(); } }
Although there are 30 threads executing in the code, only 10 concurrent executions are allowed. Semaphore's construction method Semaphore(int permits) receives an integer number indicating the number of licenses available. Semaphore(10) indicates that 10 threads are allowed to obtain licenses, that is, the maximum number of concurrency is 10
The usage of Semaphore is also very simple. First, thread uses the acquire() method of Semaphore to get a license, then use release() method to return the license. You can also try to acquire a license using the tryAcquire() method.
(2) Other methods
Semaphore also provides some other methods, as follows:
a. Int availablepermissions(): returns the number of currently available licenses in this semaphore.
b. int getQueueLength(): returns the number of threads waiting to obtain licenses.
c. boolean hasQueueThreads(): whether there are threads waiting to obtain licenses.
d. void reducePermits(int reduction): reducing licenses is a protected method.
e. Collection getQueuedThreads(): returns the collection of all threads waiting to obtain licenses. It is a protected method.
3. Exchange for data exchange between threads
Exchange is a tool class for inter thread cooperation. Exchange is used for inter thread data exchange.
It provides a synchronization point where two threads can exchange data with each other.
The two threads exchange data through the exchange method. If the first thread executes the exchange() method first, it will wait until the second thread also executes the exchange method. When both threads reach the synchronization point, the two threads can exchange data and pass the data generated by this thread to each other.
Exchange can be used in genetic algorithm. In the genetic algorithm, two people need to be selected as mating objects. At this time, the data of the two people will be exchanged and two mating results will be obtained by using the crossover rule.
Exchange can also be used for proofreading. For example, we need to manually enter the paper bank flow into the E-bank flow. In order to avoid errors, we use two people from AB post to enter. After entering into excel, the system needs to load the two excel and proofread the two Excel data to see whether the entries are consistent.
public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args){ threadPool.execute(new Runnable() { @Override public void run() { try { String A = "Bank flow A"; // A. enter bank daily data exgr.exchange(A); }catch (Exception e){ } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "Bank flow B"; // B entered bank journal String A = exgr.exchange("B"); System.out.println("A and B Whether the data are consistent:"+ B.equals(A)+",A Enter:"+A+",B Enter:"+B ); }catch (Exception e){ } } }); threadPool.shutdown(); } }
If one of the two threads does not execute the exchange() method, it will wait all the time. If you are worried about special situations and avoid waiting all the time, you can use exchange(V x,long timeout,TimeUnit unit) to set the maximum waiting time.