1, Semaphore
Semaphore definition
Semaphore literally means semaphore. Its function is to control the number of threads accessing specific resources. The bottom layer depends on the State of AQS. It is a tool class commonly used in production.
Common methods of Semaphore
Construction method
public Semaphore(int permits) public Semaphore(int permits, boolean fair)
- permits indicates the number of threads allowed
- fair indicates fairness. If this is set to true, the next thread to execute will be the thread that waits the longest
common method
public void acquire() throws InterruptedException public void release() tryAcquire(long timeout, TimeUnit unit)
- acquire() indicates blocking and
- release() means to release the license
Basic use
Usage scenario:
Resource access and service flow restriction (in Hystrix, the flow restriction is based on semaphore).
Code implementation:
package com.jihu.test.semaphore; import java.util.concurrent.Semaphore; public class SemaphoreRunner { public static void main(String[] args) { Semaphore semaphore = new Semaphore(2); for (int i = 0; i < 5; i++) { new Thread(new Task(semaphore, "xiao yan+" + i)).start(); } } static class Task extends Thread { Semaphore semaphore; public Task(Semaphore semaphore, String tname) { this.semaphore = semaphore; this.setName(tname); } public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":aquire() at time:" + System.currentTimeMillis()); Thread.sleep(1000); semaphore.release(); System.out.println(Thread.currentThread().getName() + ":aquire() at time:" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
result:
Thread-1:aquire() at time:1622590044923 Thread-3:aquire() at time:1622590044924 Thread-3:aquire() at time:1622590045931 Thread-5:aquire() at time:1622590045931 Thread-7:aquire() at time:1622590045932 Thread-1:aquire() at time:1622590045932 Thread-9:aquire() at time:1622590046931 Thread-5:aquire() at time:1622590046931 Thread-7:aquire() at time:1622590046932 Thread-9:aquire() at time:1622590047932 Process finished with exit code 0
It can be seen from the print results that only two threads execute acquire() at a time. Only after the thread performs the release() method can other threads execute acquire().
2, CountDownLatch
definition
CountDownLatch enables a thread to wait for other threads to finish their work before executing. For example, the main thread of an application wants to execute after the thread responsible for starting the framework service has started all the framework services. Wait for all threads in the thread pool to finish executing before ending.
Usage scenario:
Zookeeper distributed lock, Jmeter simulation, high concurrency, etc.
working principle
CountDownLatch is implemented through a counter. The initial value of the counter is the number of threads. Every time a thread completes its task, the value of the counter will be reduced by 1. When the counter value reaches 0, it indicates that all threads have completed the task, and then the thread waiting on the lock can resume executing the task.
Common API
CountDownLatch.countDown() // The number of threads in the wait counter is decremented by 1 CountDownLatch.await(); // Wait for all threads in the counter to complete execution ============== Source code ============== public void countDown() { sync.releaseShared(1); }
Application scenario example
For example, accompany your daughter-in-law to see a doctor.
There are many people lining up in the hospital. If you are alone, you should see the doctor first, and then go to the queue to pay for the medicine. Now we are dual core and can do these two things at the same time (multithreading). Suppose it takes 3 seconds to see the doctor and 5 seconds to queue up to pay for the medicine. If we do it at the same time, we can finish it in five seconds, and then go home together (back to the main thread).
The code is as follows:
/** * Doctor mission * */ public class SeeDoctorTask implements Runnable { private CountDownLatch countDownLatch; public SeeDoctorTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("Start seeing a doctor"); Thread.sleep(3000); System.out.println("After seeing the doctor, get ready to leave the ward"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } }
/** * Queued tasks */ public class QueueTask implements Runnable { private CountDownLatch countDownLatch; public QueueTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { System.out.println("Began to queue up at the hospital pharmacy to buy medicine...."); Thread.sleep(5000); System.out.println("If the queue is successful, you can start paying for medicine"); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (countDownLatch != null) countDownLatch.countDown(); } } }
/** * When it's her turn to see a doctor * I began to line up to pay. */ public class CountDownLaunchRunner { public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new SeeDoctorTask(countDownLatch)).start(); new Thread(new QueueTask(countDownLatch)).start(); //Wait until the execution of two tasks in the thread pool is completed, otherwise it will continue countDownLatch.await(); long time = System.currentTimeMillis() - now; System.out.println("over,get home cost:" + time); } }
3, CyclicBarrier
Fence barrier, so that a group of threads will be blocked when they reach a barrier (also known as synchronization point). The barrier will not open until the last thread reaches the barrier, and all threads intercepted by the barrier will continue to run.
The default construction method of CyclicBarrier is CyclicBarrier (int parties). Its parameter indicates the number of threads intercepted by the barrier. Each thread calls the await method to tell CyclicBarrier that I have reached the barrier, and then the current thread is blocked.
Common methods
cyclicBarrier.await();
Application scenario
It can be used in the scenario of multi-threaded calculation of data and finally merging the calculation results. For example, an Excel is used to save all the user's Bank flow, and each sheet is used to save each bank flow of an account for nearly a year. Now it is necessary to count the user's daily average bank flow. First, use multiple threads to process the bank flow in each sheet, and then get the daily average bank flow of each sheet. Finally, use the calculation results of these threads with barrierAction, Calculate the daily average bank flow of the whole Excel. Example code:
public class CyclicBarrierRunner implements Runnable { private CyclicBarrier cyclicBarrier; private int index; public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) { this.cyclicBarrier = cyclicBarrier; this.index = index; } public void run() { try { System.out.println("index: " + index); index--; cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() { public void run() { System.out.println("All agents reached the barrier and were ready to start a secret mission"); } }); for (int i = 0; i < 10; i++) { new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start(); } cyclicBarrier.await(); System.out.println("All reach the barrier...."); } }
result:
index: 0 index: 1 index: 2 index: 3 index: 4 index: 5 index: 6 index: 7 index: 8 index: 9 All agents reached the barrier and were ready to start a secret mission All reach the barrier.... Process finished with exit code 0
4, Executors
It is mainly used to create thread pool. It acts as an agent for the creation of thread pool, which makes your creation entry parameters simple.
Related to thread pool can be found in this article: Java Concurrent ThreadPoolExecutor principle source code explanation- https://blog.csdn.net/qq_43631716/article/details/114788744
Important methods
- newCachedThreadPool: create a cacheable thread pool. If the length of the thread pool exceeds the processing needs, you can flexibly recycle idle threads. If there is no recyclable thread, you can create a new thread.
- newFixedThreadPool: create a fixed length thread pool to control the maximum concurrent number of threads. The exceeded threads will wait in the queue.
- newScheduledThreadPool: create a fixed length thread pool to support scheduled and periodic task execution.
- Newsinglethreadexecution creates a singleton thread pool, which only uses a unique worker thread to execute tasks, ensuring that all tasks are executed in the specified order (FIFO, LIFO, priority).
Disadvantages (not recommended by Alibaba):
The thread pool created by Executors uses unbounded queues, which will bring many disadvantages. The most important thing is that it can save tasks indefinitely, so it is likely to cause OOM exceptions. At the same time, in some types of thread pools, the use of unbounded queue will also lead to the invalidation of parameters such as maximumpoolsize, keepAliveTime and handler
CachedThreadPool: this maximum creates an integer MAX_ Value is a thread, which will consume a lot of CPU resources.
5, Exchange
When one thread runs to the exchange() method, it will block. When another thread runs to exchange(), they exchange data and then execute the following program.
There are few application scenarios. You can understand them.
package com.jihu.test.exchanger; import java.util.concurrent.Exchanger; public class ExchangerTest { public static void main(String[] args) { final Exchanger<Integer> exchanger = new Exchanger<Integer>(); for (int i = 0; i < 10; i++) { final Integer num = i; new Thread() { public void run() { System.out.println("I am a thread: Thread_" + this.getName() + "My data is:" + num); try { Integer exchangeNum = exchanger.exchange(num); Thread.sleep(1000); System.out.println("I am a thread: Thread_" + this.getName() + "My original data is:" + num + " , The data after exchange is:" + exchangeNum); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } } }