A Detailed Guide to Java Thread Pool Usage

Preface

The Java thread pool is the most widely used concurrency framework: almost any program that needs to run tasks asynchronously or concurrently can use one. Used properly, a thread pool brings three benefits:

(1) Lower resource consumption. Reusing threads that have already been created avoids the cost of repeatedly creating and destroying them.

(2) Faster response. When a task arrives, it can be executed immediately by an existing thread instead of waiting for a new thread to be created.

(3) Better thread manageability. Threads are a scarce resource: creating them without limit not only consumes system resources but also reduces system stability. A thread pool lets threads be allocated, tuned and monitored in a unified way.

1. How the thread pool works

The processing flow of the thread pool is shown in the figure above.

In the thread pool, the ctl field represents the pool's current state. ctl, the main pool control state, is an AtomicInteger that packs two conceptual fields: workerCount, the number of effective threads, and runState, which indicates whether the pool is running, shutting down, and so on. The high 3 bits of ctl hold runState and the low 29 bits hold workerCount, so workerCount is limited to (2^29) - 1 (about 500 million) threads rather than (2^31) - 1 (about 2 billion). workerCount is the number of workers that have been permitted to start and not permitted to stop. It may transiently differ from the actual number of live threads, for example when a ThreadFactory fails to create a thread when asked, or when exiting threads are still performing bookkeeping before terminating. The user-visible pool size is reported as the current size of the workers set.

runState provides the main lifecycle control. Its values are shown in the following table:

runState | Meaning
RUNNING | Accepts new tasks and processes queued tasks.
SHUTDOWN | Does not accept new tasks, but processes queued tasks.
STOP | Does not accept new tasks, does not process queued tasks, and interrupts in-progress tasks.
TIDYING | All tasks have terminated and workerCount is zero. The thread transitioning to TIDYING runs the terminated() hook method.
TERMINATED | The terminated() method has completed.
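The bit layout can be reproduced outside the JDK. The following is a standalone sketch modelled on the private constants and helpers in ThreadPoolExecutor (recent JDKs call the mask COUNT_MASK). The class name CtlLayout is hypothetical. The real members are private, so application code cannot call them.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone model of how ThreadPoolExecutor packs runState and workerCount into one int.
final class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;          // low 29 bits: workerCount
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;     // (2^29) - 1 = 536870911

    // runState lives in the high 3 bits; the values are ordered, RUNNING being negative
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)  { return c & COUNT_MASK; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Because the states are ordered, "still running" is a single comparison
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        ctl.incrementAndGet();                               // add a worker: only the low bits change
        int c = ctl.get();
        System.out.println(workerCountOf(c));                // 1
        System.out.println(runStateOf(c) == RUNNING);        // true
        System.out.println(COUNT_MASK);                      // 536870911
    }
}
```

Keeping both fields in one AtomicInteger means a single compareAndSet can update the state and the count together. It also means "is the pool still running?" reduces to comparing one int.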

runState increases monotonically over time, though it need not pass through every state. Threads waiting in awaitTermination() return when the state reaches TERMINATED. The transitions are:

RUNNING -> SHUTDOWN: when shutdown() is called (possibly implicitly in finalize())

(RUNNING or SHUTDOWN) -> STOP: when shutdownNow() is called

SHUTDOWN -> TIDYING: when both the queue and the pool are empty

STOP -> TIDYING: when the pool is empty

TIDYING -> TERMINATED: when the terminated() hook method has completed

To run custom logic when the pool enters the TIDYING state, developers can override the terminated() hook method.
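Here is a minimal sketch of overriding the hook, assuming a fixed-size pool with a bounded queue. The class TerminationAwarePool and its helper methods are hypothetical. terminated(), shutdown() and awaitTermination() are the real ThreadPoolExecutor methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical pool subclass that runs custom logic when the pool reaches TIDYING.
final class TerminationAwarePool extends ThreadPoolExecutor {
    private final CountDownLatch hookCalled = new CountDownLatch(1);

    TerminationAwarePool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
    }

    @Override
    protected void terminated() {
        try {
            // Release resources, flush metrics, etc. Here we only record the call.
            hookCalled.countDown();
        } finally {
            super.terminated(); // base version is a no-op, but chaining keeps subclasses composable
        }
    }

    boolean hookRan() {
        return hookCalled.getCount() == 0;
    }

    static boolean demo() {
        TerminationAwarePool pool = new TerminationAwarePool(2);
        pool.execute(() -> { /* some work */ });
        pool.shutdown();
        try {
            // Returns once the state is TERMINATED, which is set after terminated() completes
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.isTerminated() && pool.hookRan();
    }
}
```

awaitTermination() only returns once the state is TERMINATED, and that state is set after terminated() returns. demo() should therefore observe the hook as having run.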

With reference to the figure above, the execution process of ThreadPoolExecutor is as follows. A task submitted with execute() is handled in one of four ways:

(1) If fewer than corePoolSize threads are running, a new thread is created to execute the task (this step acquires the global lock).

(2) If corePoolSize or more threads are running, the task is added to the BlockingQueue.

(3) If the task cannot be added to the queue (the queue is full), a new thread is created to process the task (this step also acquires the global lock).

(4) If creating a new thread would make the number of running threads exceed maximumPoolSize, the task is rejected and RejectedExecutionHandler.rejectedExecution() is called.
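The four cases can be observed with a deliberately tiny pool. This sketch uses corePoolSize = 1, maximumPoolSize = 2, a queue of capacity 1 and the default AbortPolicy. The class name ExecuteFlowDemo is hypothetical. Every task blocks on a latch so the pool state stays fixed while it is inspected.

```java
import java.util.concurrent.*;

// Walks through the four branches of execute() with a tiny pool.
final class ExecuteFlowDemo {
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocking); // (1) below corePoolSize: start a core thread
            log.append("pool=").append(pool.getPoolSize()).append(';');
            pool.execute(blocking); // (2) core thread busy: the task is queued
            log.append("queued=").append(pool.getQueue().size()).append(';');
            pool.execute(blocking); // (3) queue full: start a non-core thread
            log.append("pool=").append(pool.getPoolSize()).append(';');
            try {
                pool.execute(blocking); // (4) queue full and maximumPoolSize reached
            } catch (RejectedExecutionException e) {
                log.append("rejected");  // AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log.toString();
    }
}
```

run() should return "pool=1;queued=1;pool=2;rejected", one entry per step above.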

Source code analysis:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  // (1) Fewer than corePoolSize threads: start a new core thread with this task
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  // (2) Pool is running: try to add the task to the work queue
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // Re-check: if the pool was shut down meanwhile, take the task back and reject it
    if (! isRunning(recheck) && remove(command))
      reject(command);
    // If no worker is alive, start one so the queued task still gets executed
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  // (3) Queue is full: try to start a non-core thread (up to maximumPoolSize)
  else if (!addWorker(command, false))
    // (4) No thread could be added: apply the rejection policy
    reject(command);
}

When the thread pool creates a thread, it wraps the thread in a Worker. After a Worker finishes its task, it loops, repeatedly fetching tasks from the work queue and executing them.
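The Worker loop can be illustrated with a heavily simplified model. This is not the JDK's runWorker()/getTask() code. MiniWorkerLoop and runAll() are hypothetical. Every worker here behaves like a non-core thread that exits after keepAliveMillis of idleness, and task exceptions are not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the Worker loop (not the JDK source).
final class MiniWorkerLoop {
    /** Queues taskCount tasks, lets workerCount threads drain them, returns how many ran. */
    static int runAll(int taskCount, int workerCount, long keepAliveMillis) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            workQueue.add(() -> completed.incrementAndGet());
        }
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                try {
                    Runnable task;
                    // Like runWorker(): keep fetching tasks until getTask() yields null.
                    // Here null means "idle for keepAliveMillis", as for a non-core thread;
                    // real core threads block in take() unless allowCoreThreadTimeOut is set.
                    while ((task = workQueue.poll(keepAliveMillis, TimeUnit.MILLISECONDS)) != null) {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // treat interruption as a request to exit
                }
            }, "mini-worker-" + i);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return completed.get();
    }
}
```

Reusing a few long-lived threads to drain a shared queue is what gives the pool its lower resource consumption and faster response. For example, runAll(1000, 4, 100) runs 1000 tasks on only 4 threads.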

2. Creating and using thread pools

Creating a thread pool

Before creating a thread pool, you need to understand its core parameters:

corePoolSize (number of core threads): when a task is submitted and fewer than corePoolSize threads exist, the pool creates a new thread to execute it, even if other idle core threads could execute the new task. It stops doing so once the number of threads reaches corePoolSize.

runnableTaskQueue (task queue): the blocking queue that holds tasks waiting to be executed. Common choices are:

ArrayBlockingQueue: a bounded blocking queue backed by an array, which orders elements FIFO.

LinkedBlockingQueue: a blocking queue backed by a linked list, which orders elements FIFO. If no capacity is given, its capacity is Integer.MAX_VALUE.

SynchronousQueue: a blocking queue that stores no elements. Each insert operation must wait for a corresponding remove operation by another thread; otherwise the insert stays blocked.

PriorityBlockingQueue: an unbounded blocking queue that orders elements by priority.

maximumPoolSize (maximum pool size): the maximum number of threads the pool may create. When the queue is full and the pool has fewer than maximumPoolSize threads, the pool creates new threads to execute tasks. This parameter has no effect when an unbounded queue is used.

RejectedExecutionHandler (rejection policy): when both the task queue and the pool are full, the pool is saturated, and a rejection policy must handle newly submitted tasks. The JDK provides four built-in rejection policies:

AbortPolicy: throws a RejectedExecutionException directly (the default policy)

CallerRunsPolicy: uses the thread of the caller to execute the task

DiscardOldestPolicy: discards the oldest task in the queue (the one at its head), then retries executing the current task

DiscardPolicy: silently discards the task without processing it

You can implement the RejectedExecutionHandler interface and customize the processing policy according to the application scenario.

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
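As an example of a custom policy, here is a sketch of a handler that counts rejected tasks and then drops them (the count could feed a metric, for example). CountingDiscardPolicy and its demo() method are hypothetical. Only the RejectedExecutionHandler interface and the six-argument ThreadPoolExecutor constructor come from the JDK.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical policy: record each rejection, then discard the task.
final class CountingDiscardPolicy implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A real service might log, alert, or persist the task here
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }

    /** Saturates a 1-thread, 1-slot pool and returns how many tasks were rejected. */
    static int demo() {
        CountingDiscardPolicy handler = new CountingDiscardPolicy();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // runs on the only thread
            pool.execute(blocking); // fills the queue
            pool.execute(blocking); // saturated: passed to the handler
            pool.execute(blocking); // saturated again
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejectedCount();
    }
}
```

Counting before discarding keeps the DiscardPolicy behaviour while making dropped work visible. demo() should report 2 rejections.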

keepAliveTime (thread keep-alive time): how long a worker thread beyond corePoolSize stays alive after becoming idle.

TimeUnit (unit of keepAliveTime): the options are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS and NANOSECONDS.

ThreadFactory: a thread factory can give the created threads meaningful names, which makes thread dumps and logs much easier to read.
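A thread factory only has to implement newThread(Runnable). The following NamedThreadFactory is a hypothetical minimal version. Spring's CustomizableThreadFactory, used later in this article, provides the same naming behaviour ready-made.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory producing threads named "<prefix>1", "<prefix>2", ...
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + counter.getAndIncrement());
        // Like Executors.defaultThreadFactory(): non-daemon, normal priority
        t.setDaemon(false);
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
```

Pass an instance as the threadFactory constructor argument. Threads then appear in thread dumps as, for example, "order-pool-1" instead of "pool-1-thread-1".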

There are two main ways to create a thread pool: through the Executors factory class, or by constructing a custom ThreadPoolExecutor directly. The Alibaba Java Development Manual forbids creating thread pools with Executors and requires ThreadPoolExecutor to be used instead. This makes the pool's behavior explicit and avoids the risk of resource exhaustion.

Creating pools with the Executors factory class

Create a single-threaded pool

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

Create a thread pool with a fixed number of threads

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

Both methods above store tasks in a LinkedBlockingQueue created without a capacity, so the queue is effectively unbounded (Integer.MAX_VALUE). In real workloads a large backlog of requests can accumulate and cause an OOM.

Create a cached thread pool

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

The maximum number of threads this pool may create is Integer.MAX_VALUE. If a very large number of threads is created, the CPU becomes heavily loaded and an OOM can occur.
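The unbounded settings behind these OOM risks can be checked directly. This sketch casts the returned ExecutorService to ThreadPoolExecutor, which works because these two factory methods return that class. The declared return type is ExecutorService, so production code should not depend on the cast. ExecutorsRiskCheck is hypothetical.

```java
import java.util.concurrent.*;

// Inspects pools from two Executors factory methods to expose their unbounded settings.
final class ExecutorsRiskCheck {
    static int fixedPoolQueueCapacity() {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        try {
            // An empty LinkedBlockingQueue without a capacity can take Integer.MAX_VALUE tasks
            return fixed.getQueue().remainingCapacity();
        } finally {
            fixed.shutdown();
        }
    }

    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            // Up to Integer.MAX_VALUE threads may be created
            return cached.getMaximumPoolSize();
        } finally {
            cached.shutdown();
        }
    }
}
```

Both methods should return Integer.MAX_VALUE. This is the unbounded queue and the unbounded thread count described above.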

Create a custom thread pool

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
}
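The five-argument constructor above falls back to Executors.defaultThreadFactory() and the default AbortPolicy. The following sketch uses the full seven-argument constructor with a bounded queue and CallerRunsPolicy. The class BoundedPoolDemo and the sizes chosen are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.*;

final class BoundedPoolDemo {
    /** A bounded pool built with all seven constructor arguments. */
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                        // corePoolSize
                4,                                        // maximumPoolSize
                60L, TimeUnit.SECONDS,                    // keepAliveTime for idle non-core threads
                new ArrayBlockingQueue<>(100),            // bounded: at most 100 waiting tasks
                Executors.defaultThreadFactory(),         // swap in a naming factory in real code
                new ThreadPoolExecutor.CallerRunsPolicy() // saturated: run on the submitting thread
        );
    }

    /** Saturates a 1-thread, 1-slot pool and reports which thread ran the overflow task. */
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocking);                                           // occupies the only thread
            pool.execute(blocking);                                           // fills the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // runs in the caller
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }
}
```

Running the overflow task on the submitting thread slows the producer down instead of dropping work. The Spring example later in this article also uses CallerRunsPolicy.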

Submitting tasks to the thread pool

Two methods can be used to submit tasks to the thread pool: execute() and submit().

The execute() method submits tasks that do not need a return value, so there is no way to tell whether the thread pool executed the task successfully. execute() takes a Runnable instance.

public static void main(String [] args){
  ...
  threadPool.execute(new Runnable() {
    public void run() {
      //do something...
    }
  });
  ...
}

The submit() method submits tasks that need a return value. The pool returns a Future object, which can be used to determine whether the task executed successfully. The return value is obtained through Future's get() method, which blocks the current thread until the task completes. get(long timeout, TimeUnit unit) blocks the current thread for at most the given time; if the task has not completed by then, it throws a TimeoutException.

public static void main(String [] args){
  ...
  Future<Object> future = threadPool.submit(handleTask);
  try{
    Object res = future.get();
  }catch(InterruptedException e){
    //Handle the interruption (e.g. restore the interrupt flag)
  }catch(ExecutionException e){
    //Handle the exception thrown by the task
  }finally{
    threadPool.shutdown();
  }
  ...
}
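Here is a runnable sketch of submit() with a bounded wait. FutureTimeoutDemo and fetch() are hypothetical, and Thread.sleep() stands in for slow work. When get(timeout, unit) times out, the task keeps running unless it is cancelled, so the sketch calls cancel(true).

```java
import java.util.concurrent.*;

final class FutureTimeoutDemo {
    /** Submits a task that takes taskMillis and waits at most waitMillis for its result. */
    static String fetch(ExecutorService pool, long taskMillis, long waitMillis) {
        Future<String> future = pool.submit(() -> {
            Thread.sleep(taskMillis);           // stands in for slow work
            return "done";
        });
        try {
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);                // stop waiting and interrupt the task
            return "timeout";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();   // exception thrown inside the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for callers
            return "interrupted";
        }
    }
}
```

With a two-thread pool, fetch(pool, 10, 2000) should return "done" and fetch(pool, 2000, 50) should return "timeout".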

Shutting down the thread pool

You can shut down a thread pool by calling its shutdown() or shutdownNow() method. Both work by iterating over the pool's worker threads and calling interrupt() on them one by one. A task that does not respond to interruption may therefore never terminate.

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(SHUTDOWN);
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
}

public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    interruptWorkers();
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

The difference between shutdown() and shutdownNow() is as follows. shutdownNow() first sets the pool's state to STOP, then tries to stop all threads that are executing or paused on tasks, and returns the list of tasks still waiting to be executed. shutdown() only sets the state to SHUTDOWN and then interrupts the threads that are not executing a task.

//shutdownNow()
private void interruptWorkers() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers)
      w.interruptIfStarted();
  } finally {
    mainLock.unlock();
  }
}
...
void interruptIfStarted() {
  Thread t;
  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
    try {
      t.interrupt();
    } catch (SecurityException ignore) {
    }
  }
}
//shutdown()
private void interruptIdleWorkers() {
  interruptIdleWorkers(false);
}
...
private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}
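The difference can be observed with a single-thread pool whose worker is busy while two more tasks wait in the queue. ShutdownDemo is a hypothetical sketch. Its first task sleeps, so it does respond to interruption.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {
    /** Returns {tasks handed back by shutdownNow(), tasks that actually ran}. */
    static int[] run(boolean useShutdownNow) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(200);                 // a task that responds to interrupt()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.incrementAndGet();
        });
        pool.execute(() -> ran.incrementAndGet()); // waits in the queue
        pool.execute(() -> ran.incrementAndGet()); // waits in the queue
        int returned = 0;
        try {
            started.await();                       // make sure the first task is running
            if (useShutdownNow) {
                // STOP: interrupt the running task, hand back the queued ones unexecuted
                returned = pool.shutdownNow().size();
            } else {
                // SHUTDOWN: reject new tasks, but still run the queued ones
                pool.shutdown();
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {returned, ran.get()};
    }
}
```

run(true) should report 2 tasks handed back and 1 task run (the interrupted one). run(false) should report 0 and 3.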

3. Recommended thread pool parameter settings

The core difficulty in using thread pools is that their parameters are hard to configure. On the one hand, the pool's internal mechanism is not widely understood, so a reasonable configuration depends heavily on the developer's personal experience and knowledge. On the other hand, how a pool behaves depends strongly on the type of task: IO-intensive and CPU-intensive tasks behave very differently. As a result, the industry lacks mature, proven strategies for developers to refer to.

(1) Simple estimate based on task type:

In the following, N denotes the number of CPU cores.

For purely computational (CPU-bound) tasks, more threads do not improve performance, because CPU capacity is the scarce resource. Extra threads only add context-switching overhead. The recommended number of threads is therefore N or N + 1. Why + 1? If one of the N threads is paused or exits unexpectedly, the extra thread keeps the CPU from sitting idle.

For IO-intensive applications, a common setting is 2N + 1. A more general formula is: number of threads = number of CPU cores × target CPU utilization × (1 + average waiting time / average working time).
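The formulas can be written as small helpers. PoolSizing is hypothetical, and the example inputs (80% utilization, 90 ms waiting, 10 ms working per task) are assumptions. The result is a starting point for load testing, not a final value.

```java
// Rule-of-thumb sizing helpers (heuristics only).
final class PoolSizing {
    /** CPU-bound work: N + 1 threads. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** Common IO-bound rule of thumb: 2N + 1 threads. */
    static int ioBoundRuleOfThumb(int cores) {
        return 2 * cores + 1;
    }

    /**
     * threads = N * U * (1 + W / C), rounded to the nearest whole thread (at least 1).
     * U: target CPU utilization in (0, 1]; W: average waiting time; C: average working time.
     */
    static int byWaitRatio(int cores, double targetUtilization, double waitMillis, double workMillis) {
        double threads = cores * targetUtilization * (1 + waitMillis / workMillis);
        return (int) Math.max(1L, Math.round(threads));
    }

    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + n
                + ", cpu-bound=" + cpuBound(n)
                + ", io-bound (2N+1)=" + ioBoundRuleOfThumb(n)
                + ", io-bound (U=0.8, W=90ms, C=10ms)=" + byWaitRatio(n, 0.8, 90, 10));
    }
}
```

With 8 cores, full utilization and a 9:1 wait-to-work ratio, byWaitRatio gives 8 × 1 × (1 + 9) = 80 threads. The more time a task spends waiting, the more threads the same CPUs can keep busy.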

(2) Estimate for the ideal case, based on task volume:

1) Default values (these match the defaults of Spring's ThreadPoolTaskExecutor)

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()

2) How to set the values. They depend on the following quantities:
  • tasks: the number of tasks per second, assumed to be 500~1000
  • taskCost: the time each task takes, assumed to be 0.1 s
  • responsetime: the maximum response time the system allows, assumed to be 1 s

  • Calculation
    • corePoolSize = how many threads are needed per second?

      • threadcount = tasks / (1 / taskCost) = tasks × taskCost = (500~1000) × 0.1 = 50~100 threads, so corePoolSize should be greater than 50.
      • By the 80/20 rule, if the task rate stays below 800 per second 80% of the time, corePoolSize can be set to 800 × 0.1 = 80.
    • queueCapacity = (corePoolSize / taskCost) × responsetime

      • This gives queueCapacity = 80 / 0.1 × 1 = 800, meaning a task can wait in the queue for up to 1 s; beyond that, new threads have to be started to execute tasks.
      • Do not set it to Integer.MAX_VALUE. The queue would then grow very large while the number of threads stays at corePoolSize. When the task rate surges, no new threads can be started, and the response time rises sharply.
    • maxPoolSize = (max(tasks) - queueCapacity) / (1 / taskCost)

      • This gives (1000 - 800) / 10 = 20 threads on top of the core threads (20 + 50 with the minimum corePoolSize of 50 estimated above). ThreadPoolExecutor requires maximumPoolSize to be at least corePoolSize, so the figure is the number of extra threads, not an absolute maximum.
      • In words: (maximum tasks per second - queue capacity) / tasks each thread processes per second = threads needed beyond the core threads
    • rejectedExecutionHandler: depends on the situation. If the tasks are unimportant they can be discarded; if they are important, some buffering mechanism should be used to process them.

    • keepAliveTime and allowCoreThreadTimeout: the defaults are usually sufficient
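The arithmetic above can be replayed in code. TaskRateSizing is a hypothetical helper that uses whole milliseconds to avoid floating-point rounding. All inputs are the assumptions listed above. extraThreads() follows the reading that the maxPoolSize figure is added on top of the core threads.

```java
// Replays the task-volume estimate with integer milliseconds.
final class TaskRateSizing {
    /** tasks / (1 / taskCost): threads needed to keep up with tasksPerSecond. */
    static int threadsFor(int tasksPerSecond, int taskCostMillis) {
        return tasksPerSecond * taskCostMillis / 1000;
    }

    /** (corePoolSize / taskCost) * responseTime: tasks that can wait without missing the deadline. */
    static int queueCapacity(int corePoolSize, int taskCostMillis, int responseTimeMillis) {
        return corePoolSize * responseTimeMillis / taskCostMillis;
    }

    /** (max(tasks) - queueCapacity) / (1 / taskCost): threads needed beyond the core. */
    static int extraThreads(int maxTasksPerSecond, int queueCapacity, int taskCostMillis) {
        return Math.max(0, (maxTasksPerSecond - queueCapacity) * taskCostMillis / 1000);
    }

    public static void main(String[] args) {
        int taskCost = 100, responseTime = 1000;                 // 0.1 s per task, 1 s max response
        int core = threadsFor(800, taskCost);                    // 80/20 rule: size for 800 tasks/s
        int queue = queueCapacity(core, taskCost, responseTime);
        int extra = extraThreads(1000, queue, taskCost);
        System.out.println("core=" + core + ", queue=" + queue + ", extra=" + extra);
        // prints core=80, queue=800, extra=20
    }
}
```

Under this reading, the pool would be constructed with corePoolSize = core and maximumPoolSize = core + extra, which keeps maximumPoolSize at least corePoolSize as ThreadPoolExecutor requires.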

The values above are ideal ones; the actual settings should depend on the machine's performance. If the machine's CPU is already fully loaded before the maximum number of threads is reached, you need to upgrade the hardware or optimize the code to reduce taskCost.

(This is only a simple estimate for the ideal case and can serve as a reference when setting thread pool parameters.)

4. Recommended usage scenarios

Scenario 1

Secondary business logic that has no direct data dependency on the main business flow can be processed by an asynchronous thread pool. Create the pool when the project is initialized, then submit the secondary tasks to it for execution. This shortens the response time of the main flow.

/**
*  Create a thread pool that is used through the @Async annotation
*/
@Bean
public ThreadPoolTaskExecutor asyncExecutorPool() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  /**Configure the number of core threads*/
  executor.setCorePoolSize(20);
  /**Configure the maximum number of threads*/
  executor.setMaxPoolSize(100);
  /**Configure the queue capacity*/
  executor.setQueueCapacity(500);
  /**On shutdown, wait for submitted tasks to complete instead of interrupting them*/
  executor.setWaitForTasksToCompleteOnShutdown(true);
  /**Maximum time to wait on shutdown (default 0, i.e. do not wait); after 60 seconds the pool is shut down regardless*/
  executor.setAwaitTerminationSeconds(60);
  /**Configure the name prefix of threads in the thread pool*/
  executor.setThreadNamePrefix("test-async-thread-");
  /**Rejection policy: how to handle new tasks when the pool has reached max size (CALLER_RUNS: the task runs in the caller's thread instead of a pool thread)*/
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  /**Initialize the executor*/
  executor.initialize();

  return executor;
}

// Use the @Async annotation (asynchronous execution must be enabled with @EnableAsync)
@Async("asyncExecutorPool")
public void processTask1() {
  //doSomething
}

Do not create raw threads directly in business code! Use a thread pool instead:

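import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct in Spring Boot 3+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
*  Create thread pool for direct use
*/
public class ThreadPoolExecutorTest{

  private ThreadPoolExecutor executor;

  @PostConstruct
  public void init() {
    /** Thread pool initialization: 30 core threads, at most 60, bounded queue of 200 */
    ThreadPoolExecutor threadPoolExecutor =
      new ThreadPoolExecutor(30, 60, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
    threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    ThreadFactory threadFactory = new CustomizableThreadFactory("test-creates-thread-");
    threadPoolExecutor.setThreadFactory(threadFactory);
    this.executor = threadPoolExecutor;
  }
  ...
  public void processTask(){
    Future<?> future = executor.submit(() -> {
      //doSomething...
    });
  }

}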

Scenario 2

When tasks must be executed in a specified order (FIFO, LIFO or priority), it is recommended to use single-threaded pools.

import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public class SingleExecutorTest{

  private HashMap<Long,ThreadPoolExecutor> executorMap = new HashMap<>();
  ...
  public void init() {
    /** Thread pool initialization: 5 single-thread pools */
    for (int i = 0; i < 5; i++) {
      /**Task queue capacity: 1000*/
      ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(1, 1, 0,
                               TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
      /**Rejection policy: discard silently without throwing exceptions*/
      threadPoolExecutor.setRejectedExecutionHandler(
        new ThreadPoolExecutor.DiscardPolicy());
      ThreadFactory threadFactory = new CustomizableThreadFactory("testSingle-"+ i +"-");
      threadPoolExecutor.setThreadFactory(threadFactory);
      executorMap.put(Long.valueOf(i),threadPoolExecutor);
    }
  }
  ...
  /** Tasks that need to be performed in sequence */
  public void processTask(){
    ...
    /** Pick the single-thread pool for this id, so tasks with the same id run in order */
    ThreadPoolExecutor executor = executorMap.get(Long.valueOf(id % 5));
    /** Submit tasks to the thread pool */
    executor.submit(() -> {
      //doSomething...
    });
  }
}
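The same idea can be packaged so that the key-to-pool mapping lives in one place. KeyedSerialExecutor is a hypothetical sketch. Tasks with the same key always go to the same single-thread pool, so they run one at a time in submission (FIFO) order, while different keys run in parallel. Unlike the example above, it keeps the default AbortPolicy so that overflow is visible instead of silently dropped. It also uses Math.floorMod so that negative keys still map to a valid pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

final class KeyedSerialExecutor {
    private final List<ThreadPoolExecutor> lanes = new ArrayList<>();

    KeyedSerialExecutor(int laneCount, int queueCapacity) {
        for (int i = 0; i < laneCount; i++) {
            // One thread per lane plus a bounded FIFO queue; default AbortPolicy on overflow
            lanes.add(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(queueCapacity)));
        }
    }

    /** Tasks with the same key run one at a time, in the order they were submitted. */
    Future<?> submit(long key, Runnable task) {
        int lane = (int) Math.floorMod(key, (long) lanes.size()); // non-negative even for negative keys
        return lanes.get(lane).submit(task);
    }

    void shutdown() {
        lanes.forEach(ThreadPoolExecutor::shutdown);
    }
}
```

For example, submitting tasks 0 to 99 under the same key should record them in exactly that order, because only one thread ever serves that key.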

Summary

This article explains how thread pools work and how to create them, along with recommended parameter settings and common usage scenarios. In development, create and use thread pools according to the needs of the business, so as to reduce resource consumption and improve response speed.


Original link: https://juejin.cn/post/7067324722811240479
 

Keywords: Multithreading Programmer thread pool

Added by wilbur on Wed, 23 Feb 2022 13:50:37 +0200