Thread pool
Personal blog: www.xiaobeigua.icu
1.2 Thread pool
A thread pool is a common way to use threads efficiently. A fixed number of worker threads is created in the pool in advance. Client code submits tasks to the pool as objects, the pool holds these tasks in a work queue, and the worker threads keep taking tasks from the queue and executing them.
1.2.1 What is a thread pool
You can start a thread with a lambda expression of the form new Thread(() -> { /* task executed by the thread */ }).start(); When the run() method ends, the thread terminates and the Thread object can be reclaimed by GC.
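As a minimal sketch of the form above (the class name LambdaThreadDemo and the helper runOnce are made up for this illustration), the following starts one thread with a lambda and waits for it with join():

```java
class LambdaThreadDemo {
    // Hypothetical helper: runs one task on a new thread and returns what the task wrote
    static String runOnce() {
        StringBuilder log = new StringBuilder();
        // The lambda is the Runnable whose run() method the new thread executes
        Thread t = new Thread(() -> log.append("task ran"));
        t.start();
        try {
            // join() returns once run() has ended, so reading log afterwards is safe
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Once join() returns the thread is finished and nothing reuses it. Paying this creation and destruction cost for every task is what a thread pool avoids.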
In a real production environment, many threads may be needed to support the whole application. When the number of threads becomes very large, they exhaust CPU resources. If threads are not controlled and managed, the performance of the program suffers.
The main costs of threads are:
The cost of creating and starting threads;
The cost of destroying threads;
The cost of scheduling threads;
The number of threads that can actually run at the same time is limited by the number of CPU processors.
A thread pool keeps these costs under control: it reuses a fixed set of worker threads instead of creating and destroying one thread per task.
1.2.2 JDK support for thread pool
The JDK provides the Executor framework, which helps developers use thread pools effectively.
Java has many thread pool implementation classes. Generally, we create a thread pool with the ThreadPoolExecutor implementation class or the Executors utility class shown in the figure above. More thread pool classes are analyzed later.
Example: use the Executors utility class to create and use a thread pool

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /**
     * Basic use of thread pool
     * Author: Xiao Beigua
     */
    public class TestOne {
        public static void main(String[] args) {
            // Create a thread pool with 5 threads using Executors
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
            // Submit 15 tasks to the thread pool
            for (int i = 0; i < 15; i++) {
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Thread " + Thread.currentThread().getId() + " executed the task....");
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // Let the queued tasks finish, then release the worker threads so the JVM can exit
            fixedThreadPool.shutdown();
        }
    }
Result: 15 tasks were submitted to the thread pool at once, but the pool has only 5 threads, so only 5 tasks run at a time; the remaining tasks wait in the task queue.
1.2.3 Underlying implementation of the core thread pools
Look at the source code of newCachedThreadPool(), newSingleThreadExecutor() and newFixedThreadPool() in the Executors utility class:
Note: these methods create thread pools with different behavior

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

As the source shows, each of these methods in the Executors utility class builds its pool from a ThreadPoolExecutor, so they are all thin wrappers around ThreadPoolExecutor
Let's take a look at the constructor of ThreadPoolExecutor:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

Meaning of each parameter:
corePoolSize: the number of core threads in the thread pool
maximumPoolSize: the maximum number of threads in the thread pool
keepAliveTime: when the pool has more than corePoolSize threads, how long an excess idle thread survives before it is destroyed
unit: the time unit of keepAliveTime
workQueue: the task queue, which holds submitted tasks that are waiting to be executed
threadFactory: the thread factory used to create threads
handler: the rejection policy, which decides how to reject tasks when there are too many to process
workQueue description:
workQueue is the queue that holds submitted tasks that have not been executed yet. It is a BlockingQueue object and stores only Runnable tasks. Classified by behavior, the following kinds of blocking queue can be passed to the ThreadPoolExecutor constructor:
1) Direct handoff queue, provided by SynchronousQueue. The queue has no capacity, so tasks submitted to the thread pool are never stored; each new task is handed straight to a thread. If there is no idle thread, the pool tries to create a new one. If the number of threads has already reached maximumPoolSize, the rejection policy is executed.
2) Bounded task queue, provided by ArrayBlockingQueue, whose capacity is specified when it is created. When a task arrives and the number of threads is less than corePoolSize, a new thread is created; once the number of threads has reached corePoolSize, the task joins the waiting queue. If the queue is full and the number of threads is less than maximumPoolSize, a new thread is created to execute the task. If the queue is full and the number of threads has reached maximumPoolSize, the rejection policy is executed.
3) Unbounded task queue, provided by LinkedBlockingQueue. Unlike a bounded queue, an unbounded queue never refuses a task unless system resources are exhausted. When a task arrives and the number of threads is less than corePoolSize, a new thread is created to execute it; once the number of threads has reached corePoolSize, the task is added to the blocking queue, so the pool never grows beyond corePoolSize.
4) Priority task queue, provided by PriorityBlockingQueue. It is a special unbounded queue that orders tasks by priority. ArrayBlockingQueue and LinkedBlockingQueue both hand out tasks first in, first out, whereas with PriorityBlockingQueue tasks are executed in order of priority.
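The order described for the bounded queue (core thread first, then the queue, then an extra thread up to the maximum, then rejection) can be observed with a small sketch. The class name BoundedQueueDemo and the sizes used here (1 core thread, at most 2 threads, queue capacity 1) are assumptions chosen only to keep the run short; the tasks block on a latch so the pool stays busy while the four submissions happen.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Returns {largest pool size, tasks queued while busy, rejected tasks, completed tasks}
    static long[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // No handler is given, so the default AbortPolicy applies
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until all tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // task 1: core thread, task 2: queue, task 3: second thread, task 4: rejected
                pool.execute(blocked);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int queued = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {pool.getLargestPoolSize(), queued, rejected, pool.getCompletedTaskCount()};
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("largest pool size=" + r[0] + " queued=" + r[1]
                + " rejected=" + r[2] + " completed=" + r[3]);
    }
}
```

Replacing the ArrayBlockingQueue with a LinkedBlockingQueue created without a capacity would, under the same assumptions, leave the pool at one thread and reject nothing, because every extra task fits in the queue.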
1.2.4 Rejection policy
The last parameter of the ThreadPoolExecutor constructor specifies the rejection policy. What happens when more tasks are submitted than the pool can carry, that is, when all threads in the pool are busy and the waiting queue is full, so newly submitted tasks cannot be served? The rejection policy handles this case.
The JDK provides four rejection policies:
AbortPolicy: throws a RejectedExecutionException
CallerRunsPolicy: as long as the thread pool has not been shut down, runs the rejected task in the caller's thread
DiscardOldestPolicy: discards the oldest task in the task queue and then tries to submit the new task again
DiscardPolicy: silently discards the task that cannot be processed
The thread pools returned by the static methods of the Executors utility class use AbortPolicy as the default rejection policy. If the built-in policies do not meet your requirements, you can implement the RejectedExecutionHandler interface yourself
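To see one of the built-in policies at work, here is a small sketch of CallerRunsPolicy. The class name CallerRunsDemo and the pool shape (one thread, SynchronousQueue) are assumptions picked so that the second task is certain to be rejected while the first is still running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns true if the rejected task was executed by the submitting thread
    static boolean secondTaskRanInCaller() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // First task occupies the only worker thread until the latch is released
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second task cannot be handed off or given a new thread, so the policy
        // runs it synchronously in the thread that called execute()
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        pool.execute(() -> ranOn.set(Thread.currentThread()));
        release.countDown();
        pool.shutdown();
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran in caller: " + secondTaskRanInCaller());
    }
}
```

Because the caller is busy running the rejected task, it cannot submit more work in the meantime, which slows submission down instead of losing tasks.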
Example: use ThreadPoolExecutor to create a thread pool with a fixed size of 2 and submit 5 tasks to it

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestOne {
        public static void main(String[] args) {
            // Create a thread pool with a fixed size of 2. With new SynchronousQueue<>(),
            // a new task is rejected directly when all threads in the pool are busy
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // r is the rejected task, executor is the current thread pool
                    System.out.println(r + " was rejected by the thread pool");
                }
            });
            // Submit 5 tasks to the thread pool
            for (int i = 0; i < 5; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Thread " + Thread.currentThread().getId() + " executed the task....");
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // Release the worker threads once the accepted tasks are done
            threadPool.shutdown();
        }
    }
Analysis:
The pool has at most 2 threads and uses new SynchronousQueue<>() (direct handoff) as its work queue, so when both threads are busy a new task is rejected immediately. Of the five tasks submitted, only two are executed by the threads in the pool; the other three are rejected
1.2.5 ThreadFactory
Where do the threads in the thread pool come from? The answer is ThreadFactory.
ThreadFactory is an interface with a single method for creating threads: Thread newThread(Runnable r); The thread pool calls this method whenever it needs to create a thread
Example: supply a custom thread factory when creating a thread pool, so that every thread it creates is a daemon thread

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestOne {
        public static void main(String[] args) throws InterruptedException {
            // Define the task
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread " + Thread.currentThread().getId() + " is executing a task....");
                    while (true) {
                        // Simulate a long-running task
                    }
                }
            };
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    // Create a thread that executes the task passed in as r
                    Thread thread = new Thread(r);
                    System.out.println("The thread pool creates a daemon thread " + thread.getId());
                    // Mark the thread as a daemon thread
                    thread.setDaemon(true);
                    return thread;
                }
            });
            // Submit 5 tasks. If more than 5 tasks were submitted to this pool,
            // it would throw an exception by default
            for (int i = 0; i < 5; i++) {
                poolExecutor.execute(r);
            }
            // The main thread sleeps for 4s so that the pool threads get to run their tasks
            Thread.sleep(4000);
        }
    }
1.2.6 Monitoring the thread pool
ThreadPoolExecutor provides a set of methods for monitoring the thread pool, so that its internal state can be observed and the system kept safe
int getActiveCount(): returns the number of threads that are actively executing tasks
long getCompletedTaskCount(): returns the number of tasks the thread pool has completed
int getCorePoolSize(): returns the number of core threads in the thread pool
int getLargestPoolSize(): returns the largest number of threads the pool has ever reached
int getMaximumPoolSize(): returns the maximum capacity of the thread pool
int getPoolSize(): returns the current number of threads in the pool
BlockingQueue<Runnable> getQueue(): returns the blocking queue
long getTaskCount(): returns the total number of tasks the thread pool has received
Example: use the methods above to monitor a thread pool

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestOne {
        public static void main(String[] args) throws InterruptedException {
            // Define the task
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // Simulate performing a task
                    System.out.println(" ");
                }
            };
            // Create the thread pool
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS,
                    new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
            // Submit 5 tasks
            for (int i = 0; i < 5; i++) {
                System.out.println("Submitting task No. " + (i + 1) + " ===============================");
                poolExecutor.execute(runnable);
                System.out.println("Number of core threads in the thread pool: " + poolExecutor.getCorePoolSize());
                System.out.println("Maximum capacity of the thread pool: " + poolExecutor.getMaximumPoolSize());
                System.out.println("Number of active threads in the thread pool: " + poolExecutor.getActiveCount());
                System.out.println("Current thread pool size: " + poolExecutor.getPoolSize());
                System.out.println("Number of tasks completed by the thread pool: " + poolExecutor.getCompletedTaskCount());
                System.out.println("Total number of tasks received by the thread pool: " + poolExecutor.getTaskCount());
                Thread.sleep(2000);
            }
            // Release the core threads so the JVM can exit
            poolExecutor.shutdown();
        }
    }
1.2.7 Optimizing thread pool size
The size of the thread pool affects system performance: a pool that is too large or too small cannot deliver the best performance. The size does not need to be exact, as long as the extremes are avoided. In general, choosing it requires considering the number of CPUs, the memory size and other factors.
A formula for estimating the thread pool size is given in the book:
Thread pool size = number of CPUs * target CPU utilization * (1 + ratio of waiting time to computing time)
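The formula can be turned into a few lines of Java. This is only a sketch: the class PoolSizeEstimate, its estimate method and the sample numbers (target utilization 0.5, waiting time three times the computing time) are assumptions for illustration, and the rounding to at least one thread is a choice made here, not part of the formula.

```java
class PoolSizeEstimate {
    // size = cpus * targetUtilization * (1 + waitTime / computeTime)
    static int estimate(int cpus, double targetUtilization, double waitToComputeRatio) {
        double size = cpus * targetUtilization * (1 + waitToComputeRatio);
        // Round to a whole number of threads and never return less than 1
        return Math.max(1, (int) Math.round(size));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical workload: aim for 50% CPU use, tasks wait 3x as long as they compute
        System.out.println("Estimated pool size: " + estimate(cpus, 0.5, 3.0));
    }
}
```

For example, with 8 CPUs, a target utilization of 0.5 and a ratio of 3, the estimate is 8 * 0.5 * (1 + 3) = 16 threads. A purely computational workload (ratio 0) at full utilization gives a size equal to the number of CPUs.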
1.2.8 Thread pool deadlock
Suppose task A, while running in the thread pool, submits task B to the same pool, and task B is added to the pool's waiting queue. If task A can only finish after it receives the result of task B, the following can happen: every worker thread in the pool is waiting for the result of another task, while those tasks sit in the blocking queue waiting to be executed. No thread is left to process the queued tasks, so the waiting never ends and the pool is deadlocked.
Thread pools are best suited to independent tasks rather than tasks that depend on each other. Tasks that depend on each other can be submitted to different thread pools for execution.
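The situation is easiest to reproduce with a pool of a single thread. In the sketch below (the class name PoolDeadlockDemo is hypothetical), task A submits task B to the same pool and waits for its result. A timeout of 500 ms is used instead of an unbounded get() so the demonstration ends; without the timeout, A would wait forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolDeadlockDemo {
    // Returns true if task A gave up waiting because task B never got a thread
    static boolean outerTimedOut() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> taskA = pool.submit(() -> {
                // Task B goes into the queue; the only worker thread is busy running A
                Future<String> taskB = pool.submit(() -> "B done");
                try {
                    taskB.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true; // B was still waiting in the queue
                }
            });
            return taskA.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task A timed out waiting for task B: " + outerTimedOut());
    }
}
```

Submitting task B to a second pool, as suggested above, gives it a thread of its own, so A's wait can complete.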
1.2.9 Simple exception handling
When tasks are submitted to a ThreadPoolExecutor with submit(), a task may throw an exception without the thread pool reporting anything; the pool appears to swallow the exception. A simple fix is to change submit to execute
Example: use a thread pool to run 4 calculation tasks that each compute x/y

    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class TestOne {
        // Internal task class that calculates the quotient x / y
        static class MyRun implements Runnable {
            private int x;
            private int y;
            private float result;

            public MyRun(int x, int y) {
                this.x = x;
                this.y = y;
            }

            @Override
            public void run() {
                // Integer division: throws ArithmeticException when y is 0
                result = x / y;
                System.out.println("input x=" + x + " y=" + y + " Task result is: " + result);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            // Create the thread pool
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(), new ThreadPoolExecutor.AbortPolicy());
            // Submit 4 tasks
            poolExecutor.submit(new MyRun(0, 5));
            poolExecutor.submit(new MyRun(5, 1));
            poolExecutor.submit(new MyRun(5, 2));
            poolExecutor.submit(new MyRun(5, 0));
            // Release the core threads once the tasks are done
            poolExecutor.shutdown();
        }
    }
Result: there are four tasks, but only three results are printed. One task computes 5 / 0, which is not allowed and throws ArithmeticException: / by zero. Because the task was submitted with submit(), the exception is hidden and we never see it
To see it, we only need to change submit to execute
Result: the exception is thrown and its stack trace is printed
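If submit() has to stay, for example because the return value is needed, the exception is not actually lost: submit() stores it in the returned Future, and calling get() rethrows it wrapped in an ExecutionException. A minimal sketch, with the hypothetical class name SubmitExceptionDemo:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitExceptionDemo {
    // Returns the simple class name of the exception thrown inside the task
    static String causeOfFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        int x = 5;
        int y = 0;
        try {
            // The division fails inside the pool thread; nothing is printed there
            Future<Integer> future = pool.submit(() -> x / y);
            future.get(); // rethrows the task's exception wrapped in ExecutionException
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task failed with: " + causeOfFailure());
    }
}
```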
1.2.10 ForkJoinPool thread pool
"Divide and conquer" is an effective way to process large amounts of data; MapReduce, the well-known cornerstone of big data, uses this idea. Put simply, if there are 1000 data items to process but we can only handle 10 at a time, we can process them in 100 batches of 10 and then combine the 100 partial results into the result for all 1000 items.
A large task is split into several small tasks by calling fork(), and the results of the small tasks are merged into the result of the large task by calling join()
ForkJoinPool is optimized so that submitted tasks and threads do not have to correspond one to one. In most cases one physical thread handles several logical tasks: some threads may process a single task while others process many
The most commonly used method of ForkJoinPool is <T> ForkJoinTask<T> submit(ForkJoinTask<T> task), which submits a ForkJoinTask to the thread pool. A ForkJoinTask supports fork() for splitting off subtasks and join() for waiting for them.
ForkJoinTask has two important subclasses, RecursiveAction and RecursiveTask. The difference is that a RecursiveAction has no return value, while a RecursiveTask returns one
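Since the example in this section uses RecursiveTask, here is a short sketch of the other subclass, RecursiveAction, which works in place and returns nothing. The class DoubleArrayAction, its threshold of 1000 elements and the helper doubled are assumptions made for this illustration; it doubles every element of an array by repeatedly splitting the range in half.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleArrayAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // ranges this small are processed directly
    private final long[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleArrayAction(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            int mid = (from + to) >>> 1;
            // invokeAll forks both halves and returns when both have completed
            invokeAll(new DoubleArrayAction(data, from, mid),
                      new DoubleArrayAction(data, mid, to));
        }
    }

    // Hypothetical helper: builds {0, 1, ..., n-1} and doubles it in the common pool
    static long[] doubled(int n) {
        long[] data = new long[n];
        for (int i = 0; i < n; i++) {
            data[i] = i;
        }
        ForkJoinPool.commonPool().invoke(new DoubleArrayAction(data, 0, n));
        return data;
    }

    public static void main(String[] args) {
        long[] result = doubled(10_000);
        System.out.println("last element: " + result[result.length - 1]);
    }
}
```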
Example: this example is borrowed from another author. The comments are detailed; read through them yourself

    import java.util.ArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;

    public class TestOne {
        // Calculate the sum of a sequence and return the result, so the task extends RecursiveTask
        private static class CountTask extends RecursiveTask<Long> {
            // Threshold of the data scale: a range of fewer than 10000 numbers is summed
            // directly, a larger range has to be decomposed
            private static final int THRESHOLD = 10000;
            // A large task is decomposed into 100 small tasks at a time
            private static final int TASKNUM = 100;
            private long start; // Start value of the sequence
            private long end;   // End value of the sequence

            public CountTask(long start, long end) {
                this.start = start;
                this.end = end;
            }

            // Override compute() of RecursiveTask to calculate the sum of the sequence
            @Override
            protected Long compute() {
                long sum = 0; // Holds the calculated result
                // Decide whether the task has to be decomposed further
                if (end - start < THRESHOLD) {
                    // Below the threshold: calculate directly
                    for (long i = start; i <= end; i++) {
                        sum += i;
                    }
                } else {
                    // The range exceeds the threshold and is decomposed into 100 small tasks.
                    // step is the amount each small task calculates: with start = 0 and
                    // end = 200000, step = 2000, so each task sums about 2000 numbers
                    long step = (end - start) / TASKNUM;
                    // Note: if THRESHOLD is too small, each task computes very little and the
                    // decomposition becomes very deep. Two things may then happen: first, the
                    // number of threads in the system accumulates and performance degrades
                    // seriously; second, too many decompositions and method calls may cause
                    // a stack overflow
                    // Collection that stores the subtasks
                    ArrayList<CountTask> subTaskList = new ArrayList<>();
                    long pos = start; // Start position of each task
                    for (int i = 0; i < TASKNUM && pos <= end; i++) {
                        long lastOne = pos + step; // End position of each task
                        // Adjust the end position of the last task
                        if (lastOne > end) {
                            lastOne = end;
                        }
                        // Create the subtask and add it to the collection
                        CountTask task = new CountTask(pos, lastOne);
                        subTaskList.add(task);
                        // Call fork() to submit the subtask
                        task.fork();
                        // Adjust the start position of the next task
                        pos += step + 1;
                    }
                    // After all subtasks are completed, merge the results
                    for (CountTask task : subTaskList) {
                        sum += task.join(); // join() waits for the subtask and returns its result
                    }
                }
                return sum;
            }
        }

        public static void main(String[] args) {
            // Create the ForkJoinPool thread pool
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            // Create a large task
            CountTask task = new CountTask(0L, 200000L);
            // Submit the large task to the thread pool
            ForkJoinTask<Long> result = forkJoinPool.submit(task);
            try {
                Long res = result.get(); // get() waits for the task and returns its result
                System.out.println("The calculated sequence result is: " + res);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            // Verification
            long s = 0L;
            for (long i = 0; i <= 200000; i++) {
                s += i;
            }
            System.out.println("The verification result is: " + s);
        }
    }