Understanding ThreadPoolExecutor in One Article

Preface

This article is part of the column "100 problems to solve Java concurrency". The column is the author's original work; please cite the source when quoting. Please point out any deficiencies or errors in the comments. Thank you!

For the table of contents and references of this column, see 100 problems to solve Java concurrency.

Main text

Constructor

Let's take a look at the most important constructor of the ThreadPoolExecutor class:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

The parameters have the following meanings:

  1. corePoolSize: the number of core threads, which the pool keeps alive even when they are idle (unless allowCoreThreadTimeOut(true) is set).
  2. maximumPoolSize: the maximum number of threads allowed in the pool.
  3. keepAliveTime: when the pool has more than corePoolSize threads, how long an idle thread beyond the core count survives before it is destroyed.
  4. unit: the time unit of keepAliveTime.
  5. workQueue: the task queue, which holds tasks that have been submitted but not yet executed.
  6. threadFactory: the thread factory used to create threads. The default is usually fine.
  7. handler: the rejection policy, which decides how to reject a task when there are too many tasks to handle.
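To make the seven parameters concrete, here is a minimal sketch that passes every one of them explicitly and reads them back through the executor's getters. The class name ConstructorDemo and the concrete values (2, 4, 30 seconds, a queue of 10) are illustrative choices, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class name; the values below are arbitrary.
class ConstructorDemo {
    static ThreadPoolExecutor build() {
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize: threads kept even when idle
                4,                                    // maximumPoolSize: upper bound on threads
                30L,                                  // keepAliveTime for threads above corePoolSize
                TimeUnit.SECONDS,                     // unit of keepAliveTime
                new ArrayBlockingQueue<Runnable>(10), // workQueue: bounded buffer of pending tasks
                Executors.defaultThreadFactory(),     // threadFactory: the JDK default
                new ThreadPoolExecutor.AbortPolicy()  // handler: throw on overload (also the default)
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = build();
        System.out.println(pool.getCorePoolSize() + " " + pool.getMaximumPoolSize() + " "
                + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s "
                + pool.getQueue().remainingCapacity());   // prints "2 4 30s 10"
        pool.shutdown();
    }
}
```

No threads are started until the first task arrives, so building and shutting down the pool like this is cheap.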

Most of the above parameters are very simple. Only the parameters workQueue and handler need to be described in detail.

workQueue

The workQueue parameter holds tasks that have been submitted but not yet executed. It is a BlockingQueue and only stores Runnable objects.

Classified by behavior, the following kinds of BlockingQueue can be used in the ThreadPoolExecutor constructor.

Queue for direct submission

This behavior is provided by SynchronousQueue.

SynchronousQueue is a special BlockingQueue.

A SynchronousQueue has no capacity. Each insert operation must wait for a corresponding remove operation and, conversely, each remove operation must wait for a corresponding insert operation.
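The hand-off semantics can be observed directly on the queue, outside any thread pool. In this sketch (class and method names are illustrative), offer() fails at once when no consumer is waiting, and succeeds when another thread is blocked in take().

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Illustrative class; not part of the JDK.
class SynchronousQueueDemo {
    // No thread is waiting in take(), so the hand-off cannot happen and offer() returns false.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("task");
    }

    // A consumer blocks in take(); the timed offer() hands the element over directly.
    static String handOff() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();          // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // Wait up to 1 second for the consumer to be ready to receive.
            boolean accepted = q.offer("task", 1, TimeUnit.SECONDS);
            if (!accepted) {
                consumer.interrupt();            // avoid leaving the consumer blocked forever
            }
            consumer.join();                     // join() makes received[0] visible here
            return accepted ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // prints false
        System.out.println(handOff());              // prints task
    }
}
```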

If a SynchronousQueue is used, submitted tasks are never actually stored; each new task is handed directly to a thread for execution. If no idle thread is available, the pool tries to create a new thread. If the number of threads has already reached maximumPoolSize, the rejection policy is executed.

Therefore, a SynchronousQueue usually needs a large maximumPoolSize; otherwise the rejection policy is triggered easily.
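A small experiment shows how quickly a SynchronousQueue pool reaches the rejection policy. This sketch assumes corePoolSize 1, maximumPoolSize 2 and tasks that block on a latch until released (all illustrative); the default AbortPolicy turns each rejection into a RejectedExecutionException that is counted.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; core=1 and max=2 are arbitrary small values.
class DirectHandoffDemo {
    static int rejectedCount(int tasks) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());      // default handler: AbortPolicy
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Every task blocks, so no worker is ever idle to take a hand-off.
                pool.execute(() -> awaitQuietly(gate));
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        gate.countDown();                               // release the running tasks
        pool.shutdown();
        return rejected;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedCount(3)); // prints 1
    }
}
```

The first task starts a core thread, the second finds no idle thread and starts a second (non-core) thread, and the third is rejected because maximumPoolSize is reached.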

Bounded task queue

Bounded task queues can be implemented with the ArrayBlockingQueue class.

The constructor of ArrayBlockingQueue must take a capacity parameter indicating the maximum capacity of the queue:

public ArrayBlockingQueue(int capacity)

With a bounded task queue, when a new task arrives and the pool has fewer than corePoolSize threads, a new thread is created first. Once corePoolSize is reached, the new task is added to the waiting queue.

If the waiting queue is full, a new thread is created to execute the task, provided the total number of threads does not exceed maximumPoolSize.

If maximumPoolSize has already been reached, the rejection policy is executed.

Thus a bounded queue only lets the number of threads grow beyond corePoolSize when the queue is full. In other words, unless the system is very busy, the number of threads stays at corePoolSize.
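The sequence above can be checked with a tiny pool. This sketch assumes corePoolSize 1, maximumPoolSize 2, an ArrayBlockingQueue of capacity 1, and tasks that block until a latch opens (all illustrative); it returns the pool size, queue length and rejection count right after submission.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; the sizes are chosen so every branch is hit with 4 tasks.
class BoundedQueueDemo {
    // Returns {poolSize, queuedTasks, rejected} observed right after submitting `tasks` tasks.
    static int[] snapshot(int tasks) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(gate)); // blocks until the gate opens
            } catch (RejectedExecutionException e) {
                rejected++;                             // default AbortPolicy
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        gate.countDown();
        pool.shutdown();
        return result;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(snapshot(4))); // prints [2, 1, 1]
    }
}
```

Task 1 gets the core thread, task 2 waits in the queue, task 3 finds the queue full and gets a second thread, and task 4 is rejected. Note that task 3 starts running before task 2, which is still waiting in the queue.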

Unbounded task queue

Unbounded task queues can be implemented with the LinkedBlockingQueue class.

Unlike a bounded queue, an unbounded queue never fails to accept a task unless system resources are exhausted.

When a new task arrives and the pool has fewer than corePoolSize threads, a new thread is created to execute it. Once the number of threads reaches corePoolSize, it does not grow any further, so maximumPoolSize has no effect.
From then on, every new task goes into the queue and waits for a thread to become free.
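The following sketch illustrates that maximumPoolSize is effectively ignored with an unbounded queue. It assumes corePoolSize 2, maximumPoolSize 10 and blocking tasks (illustrative values): after five submissions the pool still has only two threads and the other three tasks sit in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; maximumPoolSize=10 is never reached.
class UnboundedQueueDemo {
    // Returns {poolSize, queuedTasks} right after submitting `tasks` blocking tasks.
    static int[] snapshot(int tasks) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());   // no capacity given: unbounded
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> awaitQuietly(gate));     // offer() to the queue never fails
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size()};
        gate.countDown();
        pool.shutdown();                                // queued tasks still run, then threads exit
        return result;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(snapshot(5))); // prints [2, 3]
    }
}
```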
If tasks are submitted much faster than they are processed, the unbounded queue keeps growing until system memory is exhausted.

Priority task queue

Priority task queues are queues with execution priority.

It is implemented by the PriorityBlockingQueue class, which controls the execution order of tasks.

It is a special unbounded queue.

Both the bounded ArrayBlockingQueue and a LinkedBlockingQueue created without a capacity process tasks in first-in, first-out order.

PriorityBlockingQueue instead orders tasks by their own priority. This keeps the throughput of an ordinary pool while adding a quality-of-service guarantee: among waiting tasks, higher-priority ones are always taken from the queue first.
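A minimal sketch of a priority pool follows. It assumes a hypothetical PriorityTask class that implements both Runnable and Comparable, where a smaller number means higher priority. Tasks must be passed to execute() rather than submit(): submit() wraps them in a FutureTask, which is not Comparable, so the priority queue could not order them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityQueueDemo {
    // Hypothetical task type: Comparable so that PriorityBlockingQueue can order it.
    static class PriorityTask implements Runnable, Comparable<PriorityTask> {
        private final int priority;      // in this sketch, smaller value = runs earlier
        private final List<Integer> log;

        PriorityTask(int priority, List<Integer> log) {
            this.priority = priority;
            this.log = log;
        }

        @Override
        public void run() {
            log.add(priority);           // record the actual execution order
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    static List<Integer> executionOrder() {
        List<Integer> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
        // The first task goes straight to a new worker (not the queue) and keeps it busy.
        pool.execute(() -> awaitQuietly(gate));
        // These wait in the priority queue while the only worker is blocked.
        pool.execute(new PriorityTask(5, log));
        pool.execute(new PriorityTask(1, log));
        pool.execute(new PriorityTask(3, log));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(executionOrder()); // prints [1, 3, 5]
    }
}
```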

Executors source code implementation

Please refer to my blog post: Executors source code analysis (JDK8)

newFixedThreadPool()

Recall the implementation of newFixedThreadPool(): it returns a thread pool whose corePoolSize and maximumPoolSize are equal and which uses a LinkedBlockingQueue as its task queue.

Because the number of threads in a fixed-size pool never changes, corePoolSize and maximumPoolSize are set to the same value.

At the same time, it uses an unbounded queue to store tasks that cannot be executed immediately. When tasks are submitted very frequently, the queue may grow rapidly and exhaust system resources.
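The configuration described above can be written out by hand and compared with what the factory method returns. The fixedPool method below follows the JDK 8 source of newFixedThreadPool; the class and method names are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; not part of the JDK.
class FixedPoolDemo {
    // Hand-written equivalent of Executors.newFixedThreadPool(n), following the JDK 8 source.
    static ThreadPoolExecutor fixedPool(int n) {
        return new ThreadPoolExecutor(n, n,              // core == max: the size never changes
                0L, TimeUnit.MILLISECONDS,               // no threads above core, so unused
                new LinkedBlockingQueue<Runnable>());    // unbounded (capacity Integer.MAX_VALUE)
    }

    // Checks that the factory method really builds a pool with this configuration.
    static boolean matchesFactory(int n) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(n);
        try {
            return pool.getCorePoolSize() == n
                    && pool.getMaximumPoolSize() == n
                    && pool.getKeepAliveTime(TimeUnit.MILLISECONDS) == 0L
                    && pool.getQueue() instanceof LinkedBlockingQueue
                    && pool.getQueue().remainingCapacity() == Integer.MAX_VALUE;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(matchesFactory(3)); // prints true
    }
}
```

The remainingCapacity() of Integer.MAX_VALUE is what "unbounded" means in practice: the queue only stops accepting tasks when memory runs out.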

newSingleThreadExecutor()

The single-thread pool returned by newSingleThreadExecutor() is a special case of newFixedThreadPool() that simply sets the number of threads to 1.
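Because there is exactly one thread and the queue is FIFO, tasks run one at a time in submission order. This sketch (names are illustrative) records the order and the thread names observed by the tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative class; not part of the JDK.
class SingleThreadDemo {
    static final class Result {
        final List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
    }

    static Result run(int n) {
        Result result = new Result();
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int index = i;
            single.execute(() -> {
                result.order.add(index);                               // execution order
                result.threadNames.add(Thread.currentThread().getName()); // which thread ran it
            });
        }
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = run(5);
        System.out.println(r.order + " on " + r.threadNames.size() + " thread(s)");
    }
}
```

In the JDK 8 source, the returned executor wraps a ThreadPoolExecutor(1, 1, ...) in a delegating wrapper, so unlike newFixedThreadPool(1) it cannot be cast to ThreadPoolExecutor and resized later.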

newCachedThreadPool()

The newCachedThreadPool() method returns a thread pool with a corePoolSize of 0 and a maximumPoolSize of Integer.MAX_VALUE (effectively unlimited). This means that when there are no tasks, the pool holds no threads. When a task is submitted, it is offered to a SynchronousQueue, which is a direct-handoff queue: if an idle thread is waiting, that thread takes the task; otherwise the offer fails and the pool creates a new thread to run it.

Because corePoolSize is 0, no thread is kept permanently: a thread that has been idle for the keepAliveTime (60 seconds) is reclaimed.

With newCachedThreadPool(), if a large number of tasks are submitted at once and they do not finish quickly, the pool starts roughly one thread per task, which can quickly exhaust system resources.
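Both points can be checked in a short sketch (names illustrative): the configuration returned by newCachedThreadPool(), and the fact that N simultaneously blocked tasks lead to N threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; not part of the JDK.
class CachedPoolDemo {
    static boolean configMatches() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getCorePoolSize() == 0
                    && pool.getMaximumPoolSize() == Integer.MAX_VALUE
                    && pool.getKeepAliveTime(TimeUnit.SECONDS) == 60L
                    && pool.getQueue() instanceof SynchronousQueue;
        } finally {
            pool.shutdown();
        }
    }

    // Submits `tasks` blocking tasks at once and reports how many threads the pool created.
    static int threadsForConcurrentTasks(int tasks) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> awaitQuietly(gate)); // no idle thread, so a new one is created
        }
        int size = pool.getPoolSize();
        gate.countDown();
        pool.shutdown();
        return size;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(configMatches());              // prints true
        System.out.println(threadsForConcurrentTasks(5)); // prints 5
    }
}
```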

Note: when using a custom thread pool, choose an appropriate queue as the task buffer based on the application's actual needs. When thread resources are tight, different queues lead to noticeably different system behavior and performance.

Reject policy

The last parameter of the ThreadPoolExecutor class specifies the rejection policy.

That is, when the number of tasks exceeds the actual carrying capacity of the system, the rejection strategy will be used.

The rejection policy can be seen as a remedy for an overloaded system. It is usually triggered under heavy load: all threads in the pool are busy and cannot serve new tasks, and at the same time the waiting queue is full and cannot accept any more tasks. The handler is also invoked for tasks submitted after the pool has been shut down.

At this point, we need a mechanism to handle the situation sensibly.
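The shutdown case is easy to overlook, so here is a small sketch of it (names illustrative): a task submitted to a pool after shutdown() goes to the rejection handler, which for the default AbortPolicy means a RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Illustrative class; not part of the JDK.
class RejectAfterShutdownDemo {
    static boolean rejectedAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // default handler: AbortPolicy
        pool.shutdown();
        try {
            pool.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            return true;                                        // the handler rejected the task
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedAfterShutdown()); // prints true
    }
}
```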

The JDK provides the following built-in rejection policies.

  1. AbortPolicy: throws a RejectedExecutionException directly, interrupting the normal submission flow. This is the default policy of ThreadPoolExecutor.
  2. CallerRunsPolicy: as long as the thread pool has not been shut down, runs the rejected task directly in the caller's thread. The task is not actually discarded, but the performance of the submitting thread is likely to drop sharply.
  3. DiscardOldestPolicy: discards the oldest pending task, that is, the task at the head of the queue that would be executed next, and then tries to submit the current task again.
  4. DiscardPolicy: silently discards the task that cannot be handled, without any further action. If losing tasks is acceptable, this may be the best option.
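To compare the four policies side by side, this sketch saturates the same tiny pool (one thread, a queue of one) and records what happens to a third task under each handler. The labels "blocker", "queued", "overflow" and "rejected-exception" are illustrative names for the events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative class; not part of the JDK.
class BuiltInPolicies {
    // Overloads a 1-thread pool with a 1-slot queue and returns what happened, in order.
    static List<String> overload(RejectedExecutionHandler handler) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);

        pool.execute(() -> { awaitQuietly(gate); events.add("blocker"); }); // occupies the thread
        pool.execute(() -> events.add("queued"));                           // fills the queue slot
        try {
            pool.execute(() -> events.add("overflow"));                     // handler is invoked
        } catch (RejectedExecutionException e) {
            events.add("rejected-exception");                               // AbortPolicy only
        }

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(overload(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(overload(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println(overload(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println(overload(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

The four lines should read [rejected-exception, blocker, queued], [overflow, blocker, queued] (the caller ran the task itself before the others were released), [blocker, overflow] (the queued task was dropped) and [blocker, queued] (the new task was dropped).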

All of the built-in policies above implement the RejectedExecutionHandler interface. If none of them meets your application's needs, you can implement RejectedExecutionHandler yourself.

Here is an example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectThreadPoolDemo {
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + ":Thread ID:"
					+ Thread.currentThread().getId());
			try {
				Thread.sleep(100); // each task occupies its thread for about 100 ms
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		MyTask task = new MyTask();
		// 5 threads and a SynchronousQueue: there is no buffer, so a task is
		// rejected whenever all 5 threads are busy.
		ExecutorService es = new ThreadPoolExecutor(5, 5,
				0L, TimeUnit.MILLISECONDS,
				new SynchronousQueue<Runnable>(),
				Executors.defaultThreadFactory(),
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r,
							ThreadPoolExecutor executor) {
						// Custom policy: log the rejected task instead of throwing.
						System.out.println(r.toString() + " is discarded");
					}
				});
		// One task every 10 ms, each lasting about 100 ms: roughly half are rejected.
		for (int i = 0; i < 100; i++) {
			es.submit(task);
			Thread.sleep(10);
		}
		es.shutdown(); // let the program exit once running tasks finish
	}
}

ThreadFactory

After all this discussion of thread pools, you may have wondered about a basic question:

Where do the threads in the thread pool come from?

The main function of thread pool is to reuse threads, that is, to avoid frequent creation of threads.

But where did the first threads come from? The answer is ThreadFactory.

ThreadFactory is an interface that has only one method for creating threads.

Thread newThread(Runnable r);

This method is called when the thread pool needs to create a new thread.

A custom ThreadFactory can help us do quite a lot.

For example, we can track when and how many threads the pool creates, customize each thread's name, thread group and priority, and even make all of them daemon threads.

In short, a custom ThreadFactory gives us much more freedom in configuring every thread in the pool.
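As a sketch of the naming use case, the hypothetical NamedThreadFactory below gives threads readable names such as "order-worker-1" instead of the default "pool-N-thread-M", and sets the daemon flag and priority explicitly rather than inheriting them from the creating thread. The class name and the "order-worker" prefix are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class: names threads "<prefix>-1", "<prefix>-2", ...
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger nextId = new AtomicInteger(1); // thread-safe counter

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + nextId.getAndIncrement());
        t.setDaemon(false);                   // explicit, not inherited from the creator
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    // Runs one task on a pool built with this factory and returns the worker thread's name.
    static String workerName(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(1, new NamedThreadFactory(prefix));
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName("order-worker")); // prints order-worker-1
    }
}
```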

The following example uses a custom ThreadFactory that, on the one hand, logs each thread it creates and, on the other hand, makes every thread a daemon thread. As a result, once the main thread exits, the JVM can terminate and the pool's threads are destroyed with it, even though the pool was never shut down.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TFThreadPoolDemo {
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + ":Thread ID:"
					+ Thread.currentThread().getId());
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		MyTask task = new MyTask();
		ExecutorService es = new ThreadPoolExecutor(5, 5,
				0L, TimeUnit.MILLISECONDS,
				new SynchronousQueue<Runnable>(),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread t = new Thread(r);
						t.setDaemon(true);                 // daemon threads do not keep the JVM alive
						System.out.println("create " + t); // log every thread the pool creates
						return t;
					}
				});
		for (int i = 0; i < 5; i++) {
			es.submit(task);
		}
		// es.shutdown() is deliberately not called: when main returns after 2 s,
		// only daemon threads remain, so the JVM exits anyway.
		Thread.sleep(2000);
	}
}

ThreadPoolExecutor core source code (JDK8)

    /**
     * Executes the given task sometime in the future.
     * The task may execute in a new thread or in an existing pooled thread.
     * If the task cannot be submitted for execution, either because this executor
     * has been shut down or because its capacity has been reached, the task is
     * handled by the current RejectedExecutionHandler.
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 1. If fewer than corePoolSize threads are running, try to start a new
         * thread with the given command as its first task. The call to addWorker
         * atomically checks runState and workerCount, and so prevents false alarms
         * that would add threads when it shouldn't, by returning false.
         * 2. If a task can be successfully queued, we still need to double-check
         * whether we should have added a thread (because existing ones died since
         * the last check) or whether the pool shut down since entry into this method.
         * So we recheck the state and, if necessary, roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 3. If we cannot queue the task, we try to add a new thread. If that fails,
         * we know we are shut down or saturated, so we reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

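The workerCountOf(c) function returns the current number of worker threads in the pool. It extracts that count from the ctl field, which packs the pool's run state and its worker count into a single int.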
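The packing can be reproduced in a few lines. The constants and helper methods below are copied for illustration from the JDK 8 ThreadPoolExecutor source (later JDKs rename some of them); CtlDemo itself is an illustrative class name. The high 3 bits hold the run state and the low 29 bits hold the worker count.

```java
// Illustrative copy of the JDK 8 ctl bit layout; not the real class.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits set

    // Run states live in the high 3 bits; RUNNING is negative.
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));               // prints 5
        System.out.println(runStateOf(c) == RUNNING);       // prints true
        System.out.println(isRunning(ctlOf(SHUTDOWN, 5)));  // prints false
    }
}
```

Keeping both values in one int lets the real class update them together with a single compare-and-set on an AtomicInteger.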

When the number of threads is less than corePoolSize, addWorker(command, true) starts a new core thread and hands it the task directly.

Otherwise, the task is placed in the waiting queue with workQueue.offer(). After a successful offer, the state is checked again: if the pool has been shut down in the meantime, the task is removed and rejected; if no worker threads are left, a new one is started with addWorker(null, false) so that the queued task still runs.

If the task cannot enter the waiting queue (for example, a bounded queue is full, or a SynchronousQueue has no idle thread waiting for it), addWorker(command, false) tries to start a new non-core thread for it.

If the pool already has maximumPoolSize threads (or has been shut down), addWorker fails and the rejection policy is executed.

ThreadPoolExecutor core process
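The decision order of execute() can be modeled as a small pure function. This is a simplified, single-threaded sketch and not the real implementation: it assumes no task ever finishes, the pool is never shut down, and it ignores the recheck after enqueueing. A SynchronousQueue with no idle threads behaves like a queue of capacity 0 here. ExecuteFlowModel and Outcome are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of execute(); illustrative names, not JDK code.
class ExecuteFlowModel {
    enum Outcome { NEW_CORE_THREAD, QUEUED, NEW_NON_CORE_THREAD, REJECTED }

    static List<Outcome> simulate(int corePoolSize, int maximumPoolSize,
                                  int queueCapacity, int tasks) {
        List<Outcome> outcomes = new ArrayList<>();
        int workers = 0;
        int queued = 0;
        for (int i = 0; i < tasks; i++) {
            if (workers < corePoolSize) {           // step 1: addWorker(command, true)
                workers++;
                outcomes.add(Outcome.NEW_CORE_THREAD);
            } else if (queued < queueCapacity) {    // step 2: workQueue.offer(command)
                queued++;
                outcomes.add(Outcome.QUEUED);
            } else if (workers < maximumPoolSize) { // step 3: addWorker(command, false)
                workers++;
                outcomes.add(Outcome.NEW_NON_CORE_THREAD);
            } else {                                // reject(command)
                outcomes.add(Outcome.REJECTED);
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(simulate(1, 2, 1, 4));
        // prints [NEW_CORE_THREAD, QUEUED, NEW_NON_CORE_THREAD, REJECTED]
    }
}
```

With core 1, max 2 and a queue of 1, the model reproduces the order described above: core thread first, then the queue, then extra threads, and finally rejection.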

Keywords: Java Concurrent Programming thread pool

Added by UQ13A on Tue, 09 Nov 2021 20:45:51 +0200