Thread pool from design idea to source code interpretation

Let's talk about thread pool today, from design idea to source code analysis.

preface

Dear friends, the Spring Festival is over. Here is a late new year's call for a spring festival holiday. I hope readers and friends can have a harvest. More likes, comments and collections!

First knowledge of thread pool

We know that the creation and destruction of threads need to be mapped to the operating system, so the cost is relatively high. In order to avoid frequent creation and destruction of threads and facilitate thread management, thread pool came into being.

Thread pool advantage

  • "Reduce resource consumption": the thread pool usually maintains some threads (the number is "corePoolSize"), which are reused to perform different tasks and will not be destroyed after the task is completed. When there are a large number of tasks to be processed, through the reuse of thread resources, the frequent creation and destruction of threads are avoided, so as to reduce the consumption of system resources.
  • "Improve response speed": since the thread pool maintains a number of "live" threads, when a task arrives, it is not necessary to create another thread, but these threads directly execute the task, thus reducing the waiting time of the task.
  • "Improve thread manageability": thread pool can be used to uniformly allocate, tune and monitor threads.

Thread pool design idea

There is a saying that art comes from life, and so does programming language. Many design ideas can be mapped to daily life, such as object-oriented thought, encapsulation, inheritance, and so on. Today we will talk about thread pool, which can also find the corresponding entity factory in the real world.

Imagine the production process of a factory:

Thread pool design idea

There is a fixed group of workers in the factory, called formal workers, who complete the orders received by the factory. When orders increase and regular workers are already too busy, the factory will temporarily pile up production raw materials in the warehouse and process them when there are idle workers (because workers will not actively process production tasks in the warehouse when they are idle, so the dispatcher needs to dispatch them in real time). After the warehouse is full, what if the orders are still increasing? The factory can only temporarily recruit a group of workers to cope with the production peak, and these workers will be dismissed after the peak, so they are called temporary workers. At that time, after the temporary workers were fully recruited (limited by the station limit, there was an upper limit on the number of temporary workers), the subsequent orders could only be reluctantly rejected.

We do the following mapping:

  • Factory - thread pool
  • Order - task (Runnable)
  • Regular worker - core thread
  • Temporary worker - normal thread
  • Warehouse - task queue
  • Dispatcher - getTask()

getTask() is a method to schedule tasks in the task queue to idle threads. It is described in detail in interpreting the thread pool

After mapping, a thread pool is formed. The flow chart is as follows. Are they similar?

Thread pool flowchart

In this way, the working principle or process of thread pool can be well understood and refined into a diagram:

How thread pools work

Deep thread pool

Then, the question comes. How does the thread pool implement this working mechanism? From the Java thread pool Executor framework, we can see that the real implementation class of thread pool is ThreadPoolExecutor, so we will focus on this class next.

Thread pool working mechanism

Construction method

To study a class, start with its construction method. ThreadPoolExecutor provides four parameterized construction methods:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Explain the parameters involved in the construction method:

  • "corePoolSize" (required): number of core threads. That is, the number of threads that remain alive in the pool, even if they are idle. However, when the allowCoreThreadTimeOut parameter is set to true, the core thread will also be recycled if it is idle for more than a period of time.
  • "maximumPoolSize" (required): the maximum number of threads allowed in the pool. When all core threads are busy and the task queue is full, the thread pool will temporarily add threads until the number of bus processes reaches the maximum poolsize.
  • "keepAliveTime" (required): thread idle timeout. When a non core thread is idle for more than this time, the thread will be recycled. When the allowCoreThreadTimeOut parameter is set to true, the core thread will also be recycled.
  • Unit (required): the time unit of the keepAliveTime parameter. Yes: timeunit Days, timeunit Hours, timeunit Minutes, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, timeunit Microseconds, timeunit Nanoseconds (nanoseconds)
  • "workQueue" (required): task queue, implemented by blocking queue. When all the core threads are busy, the subsequent Runnable submitted by the execute method will be stored in the task queue and wait to be processed by the thread.
  • "threadFactory" (optional): thread factory. Specifies how threads are created by the thread pool.
  • "Handler" (optional): reject policy. When the number of threads in the thread pool reaches maximumPoolSize and the workQueue is full, subsequent submitted tasks will be rejected. The handler can specify how to reject tasks.

Put it together and have a look:

Factory and thread pool

Task queue

To use ThreadPoolExecutor, you need to specify a task waiting queue that implements the BlockingQueue interface. In the API document of ThreadPoolExecutor thread pool, three kinds of waiting queues are recommended: synchronous queue, LinkedBlockingQueue and ArrayBlockingQueue;

  1. SynchronousQueue: synchronization queue. This is a blocking queue without any internal capacity. The element of any insertion operation must wait for the relative delete / read operation, otherwise the thread performing the insertion operation must wait all the time, and vice versa.
  2. "LinkedBlockingQueue": unbounded queue (strictly speaking, it is not unbounded, and the upper limit is Integer.MAX_VALUE), which is based on the linked list structure. After using unbounded queue, when the core threads are busy, subsequent tasks can join the queue indefinitely, so the number of threads in the thread pool will not exceed the number of core threads. This queue can improve thread pool throughput, but at the expense of memory space, and even lead to memory overflow. In addition, the capacity can be specified when using it, so it is a bounded queue.
  3. "ArrayBlockingQueue": bounded queue, implemented based on array. When the thread pool is initialized, the capacity of the queue is specified and cannot be adjusted later. This bounded queue helps to prevent resource depletion, but may be more difficult to adjust and control.

In addition, Java provides four other queues:

  1. "PriorityBlockingQueue": an unbounded blocking queue that supports priority sorting. The elements stored in PriorityBlockingQueue must implement the Comparable interface so that they can be sorted by implementing the compareTo() method. The element with the highest priority will always be at the head of the queue; PriorityBlockingQueue does not guarantee the sorting of elements with the same priority, nor does it guarantee that the elements in the current queue other than the elements with the highest priority are in the correct sorting position at any time.
  2. "DelayQueue": delay queue. Based on binary heap, it has the characteristics of unbounded queue, blocking queue and priority queue. The objects stored in the DelayQueue delay queue must be class objects that implement the Delayed interface. The task is extracted from the queue through the execution delay. The task cannot be retrieved before the time is up. For more information, see DelayQueue: Interviewer: talk about the principle and usage of blocking delay queue in Java.
  3. "Linked blocking deque": double ended queue. Based on the linked list implementation, you can insert / take out elements from the tail and insert / take out elements from the head.
  4. "LinkedTransferQueue": an unbounded blocking queue composed of a linked list structure. When the queue is special, a preemption mode is adopted, which means that when the consumer thread fetches elements, if the queue is not empty, the data will be directly fetched. If the queue is empty, a node (node element is null) will be generated to join the queue, and then the consumer thread will be waiting on this node. When the producer thread joins the queue, it will find a node with null element, The producer thread will not join the queue. It will directly fill the element into the node, wake up the waiting thread of the node, and the awakened consumer thread will take away the element.

Reject policy

Thread pool has an important mechanism: reject policy. When the thread pool workQueue is full and a new thread pool cannot be created, subsequent tasks will be rejected. The rejection policy needs to implement the RejectedExecutionHandler interface, but the Executors framework has implemented four rejection policies for us:

  1. AbortPolicy (default): discards the task and throws RejectedExecutionException.
  2. "CallerRunsPolicy": the run method that directly runs the task is not handled by the thread of the thread pool, but by the calling thread of the task.
  3. "DiscardPolicy": directly discard the task without throwing any exceptions.
  4. "DiscardOldestPolicy": forcibly take out the waiting task currently at the head of the waiting queue, and then try to submit the currently rejected task to the thread pool for execution.

The thread factory specifies the method of creating threads. This parameter is not required. The Executors class has kindly provided us with a default thread factory:

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

Thread pool status

Thread pools have five states:

volatile int runState;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

runState indicates the state of the current thread pool. It is a volatile variable to ensure the visibility between threads.

The following static final variables represent several possible values of runState, including the following states:

  • "RUNNING": after creating a thread pool, the thread pool is in RUNNING status at the beginning;
  • "SHUTDOWN": if the shutdown() method is called, the thread pool is in the SHUTDOWN state. At this time, the thread pool cannot accept new tasks, and it will wait for all tasks to be executed;
  • "STOP": if the shutdown now() method is called, the thread pool is in the STOP state. At this time, the thread pool cannot accept new tasks and will try to terminate the executing tasks;
  • "TERMINATED": when the thread pool is in SHUTDOWN or STOP state, all working threads have been destroyed, the task cache queue has been emptied or the execution is completed, the thread pool is set to TERMINATED state.

Initialization & capacity adjustment & shutdown

"1. Thread initialization"

By default, after the thread pool is created, there are no threads in the thread pool, and the threads will not be created until the task is submitted.

In practice, if you need to create threads immediately after the thread pool is created, you can do it in the following two ways:

  • "prestartCoreThread()": boolean prestartCoreThread(), initializing a core thread
  • "prestartAllCoreThreads()": int prestartAllCoreThreads(), initializes all core threads and returns the number of initialized threads
public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //Note that the parameter passed in is null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//Note that the parameter passed in is null
        ++n;
    return n;
}

"2. Thread pool shutdown"

ThreadPoolExecutor provides two methods for closing the thread pool:

  • "shutdown()": the thread pool will not be terminated immediately, but will not be terminated until all tasks in the task cache queue have been executed, but will never accept new tasks
  • "shutdownNow()": immediately terminate the thread pool, try to interrupt the executing task, empty the task cache queue, and return the unexecuted task

"3. Thread pool capacity adjustment"

ThreadPoolExecutor provides a method to dynamically adjust the size of thread pool capacity:

  • "setCorePoolSize": sets the size of the core pool
  • "setMaximumPoolSize": sets the maximum number and size of threads that can be created in the thread pool

When the above parameters increase from small to large, ThreadPoolExecutor assigns a thread value, and may immediately create a new thread to execute the task.

Use thread pool

ThreadPoolExecutor

Using ThreadPoolExecutor through construction method is the most direct way to use thread pool. Here is an example:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyTest {
 public static void main(String[] args) {
  //Create thread pool
  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(5));
  //Submit task to thread pool
  for (int i = 0; i < threadPool.getCorePoolSize(); i++) {
   threadPool.execute(new Runnable() {
    @Override
    public void run() {
     for (int x = 0; x < 2; x++) {
      System.out.println(Thread.currentThread().getName() + ":" + x);
      try {
       Thread.sleep(2000);
      } catch (InterruptedException e) {
       e.printStackTrace();
      }
     }
    }
   });
  }

  //Close thread pool
  threadPool.shutdown(); //Set the status of the thread pool to SHUTDOWN, and then interrupt all threads that are not executing tasks
  // threadPool.shutdownNow(); / / set the status of the thread pool to STOP, then try to STOP all threads executing or pausing tasks, and return the list of tasks waiting to be executed. This method should be used with caution, which may cause uncontrollable consequences
 }
}

Operation results:

pool-1-thread-2:0
pool-1-thread-1:0
pool-1-thread-3:0
pool-1-thread-2:1
pool-1-thread-3:1
pool-1-thread-1:1

Executors encapsulate thread pools

In addition, Executors encapsulates four common functions: thread pools (still so intimate):

「1,FixedThreadPool」

Fixed capacity thread pool. Its characteristic is that the maximum number of threads is the number of core threads, which means that the thread pool can only create core threads, and keepAliveTime is 0, that is, the thread will be recycled immediately after executing the task. No capacity is specified for the task queue, which means the default value integer MAX_ VALUE. It is suitable for scenarios where concurrent threads need to be controlled.

//Use default thread factory
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
//Custom thread factory required
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

Use example:

// 1. Create a thread pool object and set the number of core threads and maximum threads to 5
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 2. Create Runnable (task)
Runnable task =new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->function");
  }
};
// 3. Submit task to thread pool
fixedThreadPool.execute(task);

「2, SingleThreadExecutor」

Single thread pool. The feature is that there is only one thread (core thread) in the thread pool. The thread is recycled immediately after executing the task, and the bounded blocking queue is used (the capacity is not specified, and the default value is Integer.MAX_VALUE)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
//In order to save space, the source code of custom thread factory is omitted

Use example:

// 1. Create a single thread pool
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. Create Runnable (task)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->function");
  }
};
// 3. Submit task to thread pool
singleThreadExecutor.execute(task);

「3, ScheduledThreadPool」

Timed thread pool. Specify the number of core threads. The number of ordinary threads is unlimited. The thread will be recycled immediately after executing the task. The task queue is a delay blocking queue. This is a special thread pool, which is suitable for "executing timed or periodic tasks".

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//Inherited ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
        implements ScheduledExecutorService {
    //Constructor, omitting the constructor of the custom thread factory
 public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
           new DelayedWorkQueue());
 }
 
 //Delay task execution
 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        ...
    }
 //Perform tasks regularly
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {...}
}

Use example:

// 1. Create timed thread pool
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. Create Runnable (task)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->function");
  }
};
// 3. Submit task to thread pool
scheduledThreadPool.schedule(task, 2, TimeUnit.SECONDS); //Execute task after 2s delay
scheduledThreadPool.scheduleAtFixedRate(task,50,2000,TimeUnit.MILLISECONDS);//Perform tasks every 2000ms after a delay of 50ms

「4,CachedThreadPool」

Cache thread pool. There is no core thread, and the number of ordinary threads is integer MAX_ Value (which can be understood as infinite). The thread is recovered after it is idle for 60s. The task queue uses the synchronization queue with no capacity. It is applicable to the scenario of "large amount of tasks but low time-consuming".

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Use example:

// 1. Create cache thread pool
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. Create Runnable (task)
Runnable task = new Runnable(){
  public void run() {
     System.out.println(Thread.currentThread().getName() + "--->function");
  }
};
// 3. Submit task to thread pool
cachedThreadPool.execute(task);

Interpretation of thread pool

OK, I believe the previous content is easy and pleasant to read, so you will enter the deep water area from here. If you can understand the later content thoroughly, you will really master the knowledge of thread pool.

We know that the execute() method of ThreadPoolExecutor is used to submit tasks to the Thread pool, but the processing of Thread tasks is quite complex, involving six methods of ThreadPoolExecutor, Worker and Thread:

Submit task to thread pool

execute()

In the ThreadPoolExecutor class, the entrance of the task submission is execute(Runnable command) method (submit() method also calls execute ()). This method is actually trying to do one thing: after various checks, addWorker(Runnable command,boolean core) method is called to create a thread for the thread pool and perform the task. There are two results of execute():

Parameter Description:

  1. "Runnable command": tasks to be executed

"Execution process:"

1. Through CTL Get() gets the current number of threads in the thread pool. If the number of threads is less than corePoolSize, call the {addworker (common, true) method to create a new thread to execute the task. Otherwise, execute step 2;

2. If step 1 fails, it means that no new thread can be created. Then consider putting the task into the blocking queue and waiting for the thread executing the task to process it. Based on this, judge whether the thread pool is in Running status (only the thread pool in Running status can accept new tasks). If the task is added to the task queue successfully, go to step 3; if it fails, go to step 4;

3. When you come to this step, you need to explain that the task has been added to the task queue. At this time, you need to verify the status of the thread pool again. There are the following situations:

  • The thread pool is no longer in Running status. You need to remove the task from the task queue. If the removal is successful, the task will be rejected
  • If the thread pool is in Running status, judge whether the working thread of the thread pool is 0. If yes, call addworker (common, true) to add a thread without an initial task (this thread will get and execute the current task that has been added to the task queue). Otherwise, enter step 4;
  • The thread pool is not in Running status, but it failed to remove the task from the task queue (it may have been obtained by a thread?), Go to step 4;

4. Expand the thread pool to maximumPoolSize and call the addworker (common, false) method to create a new thread to execute the task. If it fails, the task will be rejected.

Flow chart:

Create a new thread to execute the task

"Detailed reading of source code:"

/**
 * Perform a given task at some time in the future. Tasks can be executed in a new thread or in an existing pool thread.
 * If the task cannot be submitted for execution because this actuator is closed or has reached its capacity, the current {@ code {RejectedExecutionHandler} processes the task.
 * 
 * @param command the task to execute  Task command to be executed
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 
     * 1. If fewer threads are running than corePoolSize, it attempts to start a new thread with the given command as the first task.
     *
     * 2. If a task can be queued successfully, we still need to carefully check two points. First, should we add a thread
     * (Since the last check, some existing threads have died). Second, the thread pool state has changed to non running state. Therefore, we recheck the status. If the check fails, the listed tasks will be removed. If the check passes and the number of threads in the thread pool is 0, a new thread will be started.
     * 
     * 3. If the task cannot be added to the task queue, expand the thread pool to the limit capacity and try to create a new thread. If it fails, reject the task.
     */
    int c = ctl.get();
   
    //Step 1: judge whether the number of threads in the thread pool is less than the thread pool size
    if (workerCountOf(c) < corePoolSize) {
        //Add a worker thread and add a task. If it is successful, it returns. Otherwise, proceed to step 2
        //true means to use coreSize as the boundary constraint; otherwise, use maximumPoolSize
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //Step 2: if workerCountOf(c) < corepoolsize or addWorker fails, go to step 2
    //Verify whether the thread pool is Running and whether the task is successfully put into the workQueue (blocking queue)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //Verify again. If the thread pool is not Running and the task is successfully removed from the task queue, the task will be rejected
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //If the number of worker threads in the thread pool is 0, create a new thread for an empty task
        else if (workerCountOf(recheck) == 0)
            //If the thread pool is not Running, it cannot be added
            addWorker(null, false);
    }
    //Step 3: if the thread pool is not Running or the task listing fails, try to expand maxPoolSize and add worker again. If it fails, reject the task
    else if (!addWorker(command, false))
        reject(command);
}

addWorker()

addWorker(Runnable firstTask, boolean core) method, as the name suggests, adds a worker thread with a task to the thread pool.

Parameter Description:

  1. "Runnable firstTask": the task that the newly created thread should run first (if not, it is empty).
  2. "boolean core": this parameter determines the constraint condition of thread pool capacity, that is, what is the limit value of the current number of threads. If the parameter is true, corePollSize is used as the constraint value; otherwise, maximumPoolSize is used.

"Execution process:"

1. The outer loop determines whether the state of the thread pool can add new worker threads. This layer of verification is based on the following two principles:

  • When the thread pool is in Running status, you can accept new tasks or process tasks
  • When the thread pool is closed, only the worker with empty tasks can be added to process the tasks in the task queue, and cannot accept new tasks

2. The inner loop adds a worker thread to the thread pool and returns the result of whether the addition is successful.

  • First, check whether the number of threads has exceeded the limit. If yes, return false. Otherwise, go to the next step
  • Make the number of working threads + 1 through CAS. If successful, go to step 3. If failed, verify whether the thread pool is running again. If yes, continue the inner loop, and if not, return to the outer loop

3. Number of core threads + 1 successful subsequent operations: add to the worker thread collection and start the worker thread

  • First, after obtaining the lock, verify the thread pool status again (see code comments for specific verification rules). If it passes, go to the next step. If it fails, adding a thread fails
  • After the thread pool status verification passes, check whether the thread has been started. If yes, an exception will be thrown. Otherwise, try to add the thread to the thread pool
  • Check whether the thread is started successfully. If successful, return true. If failed, enter the addWorkerFailed method

Flow chart:

Add a worker thread with tasks to the thread pool

"Detailed reading of source code:"

private boolean addWorker(Runnable firstTask, boolean core) {
    //Outer loop: judge thread pool status
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 
         * 1.The thread pool is in non Running status (in Running status, you can add core threads or accept tasks)
         * 2.The thread is in shutdown state and firstTask is empty and the queue is not empty
         * 3.If condition 1 is satisfied and condition 2 is not satisfied, false is returned
         * 4.Interpretation of condition 2: when the thread pool is in the shutdown state and the task queue is not empty, you can add a thread of empty tasks to process the tasks in the queue
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

  //Inner loop: add the core thread to the thread pool and return the result of whether the addition is successful
        for (;;) {
            int wc = workerCountOf(c);
            //Verify whether the number of existing threads in the thread pool exceeds the limit:
            // 1. Maximum thread pool CAPACITY
            // 2.corePoolSize or maximumPoolSize (depending on the input core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            //Make the number of working threads + 1 through CAS operation to jump out of the outer loop
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            //Thread + 1 failed, reread ctl
            c = ctl.get();   // Re-read ctl
            //If the thread pool state is no longer running at this time, the outer loop is restarted
            if (runStateOf(c) != rs)
                continue retry;
            //Other CAS failed because the number of working threads changed. Continue the inner loop and try to add 1 to the number of CAS threads
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * Number of core threads + 1 successful subsequent operations: add to the worker thread collection and start the worker thread
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //The following code needs to be locked: thread pool master lock
            mainLock.lock(); 
            try {
                //When the thread factory fails to create a thread or closes before acquiring a lock, it exits
                int c = ctl.get();
                int rs = runStateOf(c);

    //Verify again whether the thread pool is running or thread pool shutdown, but the thread task is empty
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //If the thread has been started, an illegal thread state exception is thrown
                    //Why does this state exist? Unresolved
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //Join thread pool
                    int s = workers.size();
                    //If the current number of worker threads exceeds the maximum number of threads that have ever occurred in the thread pool, refresh the latter value
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();  //Release lock
            }
            if (workerAdded) { //The worker thread was added successfully. Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //If the thread fails to start, it enters addWorkerFailed
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker class

Worker class is an internal class, which not only implements Runnable, but also inherits AbstractQueuedSynchronizer (hereinafter referred to as AQS), so it is both an executable task and a lock effect.

The Worker class mainly maintains the interrupt control status of the running task thread and other secondary records. This class appropriately inherits the AbstractQueuedSynchronizer class to simplify the process of obtaining and releasing locks that act on the execution code of each task. This prevents interrupting a running task, which will only interrupt the thread waiting to get the task from the task queue.

We implemented a simple non reentrant mutex instead of using a reentrant lock, because we don't want the work task to regain the lock when calling pool control methods such as setCorePoolSize. In addition, in order to prohibit interrupts before the thread actually starts running the task, we initialize the lock state to a negative value and clear it at startup (in runWorker).

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
 
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; 
     
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
     
    /** Per-thread task counter */
    volatile long completedTasks;
 
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    //Initialized by constructor,
    Worker(Runnable firstTask) {
        //Set synchronization status of AQS
        // State: lock state, - 1 is the initial value, 0 is the unlock state, and 1 is the lock state
        setState(-1); //inhibit interrupts until runWorker: disable interrupts before calling runWorker
       
        this.firstTask = firstTask;
        //The thread factory creates a thread
        this.thread = getThreadFactory().newThread(this); 
    }
 
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this); //runWorker() is the method of ThreadPoolExecutor
    }
 
    // Lock methods
    // The value 0 represents the unlocked state. 0 represents "not locked" status
    // The value 1 represents the locked state. 1 stands for "locked" status
 
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
 
    /**
     * Method of trying to acquire lock
     * Override tryAcquire() of AQS, which is implemented by subclasses
     */
    protected boolean tryAcquire(int unused) {
        //It is judged that the original value is 0 and reset to 1, so when the state is - 1, the lock cannot be obtained.
        //It is 0 - > 1 every time, which ensures the non reentrance of the lock
        if (compareAndSetState(0, 1)) {
            //Set exclusiveOwnerThread = current thread
            setExclusiveOwnerThread(Thread.currentThread()); 
            return true;
        }
        return false;
    }
 
    /**
     * Attempt to release the lock
     * Not state-1, but set to 0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }
 
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
 
    /**
     * Interrupt (if running)
     * shutdownNow The loop executes on the worker thread
     * There is no need to obtain the worker lock, and it can be interrupted even when the worker is running
     */
    void interruptIfStarted() {
        Thread t;
        //If state > = 0, t= Null and t not interrupted
        //state==-1 when new Worker(), indicating that interrupt is not allowed
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

runWorker()

It can be said that runWorker(Worker w) is a real method for processing tasks in the thread pool. The previous execute() and addWorker() are preparing and paving the way for this method.

Parameter Description:

  1. "Worker w": the encapsulated worker carries many elements of the working thread, including Runnable (pending tasks), lock (locks), and completedTasks (record the number of completed tasks in the thread pool)

"Execution process:"

1. Judge whether the current task or the task obtained from the task queue is not empty. If both are empty, go to step 2, otherwise go to step 3

2. If the task is empty, set completedAbruptly to false (that is, the thread does not terminate suddenly), and execute the processWorkerExit(w,completedAbruptly) method to enter the thread exit program

3. If the task is not empty, enter the cycle and lock it

4. Judge whether to add an interrupt ID to the thread. If one of the following two conditions is met, add an interrupt ID:

  • Thread pool status > = STOP, i.e. STOP or TERMINATED
  • First, judge the thread pool status < STOP, and then check and find thread Interrupted() is true, that is, the thread has been interrupted. Check whether the thread pool status is > = STOP again (to eliminate the instantaneous shutdown method from taking effect and make the thread pool STOP or TERMINATED)

5. Execute the pre method before execute (WT, task) (this method is empty and implemented by subclasses) and then execute task The run () method executes the task (if the execution is unsuccessful, the corresponding exception will be thrown)

6. After executing the post method {afterexecute (task, throw) (this method is empty and implemented by subclasses), the number of completed tasks in the thread pool will be + 1 and the lock will be released.

7. Judge the cycle condition again.

Flow chart:

Thread pool flowchart

"Detailed reading of source code:"

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // allow interrupts
    //new Worker() is state==-1. Here is to call the tryRelease() method of Worker class and set state to 0. In interruptIfStarted(), only state > = 0 is allowed to call interrupt
    w.unlock(); 
            
    //The reason why the thread exits. true is caused by the task, and false is caused by the normal exit of the thread
    boolean completedAbruptly = true; 
    try {
        //If the current task and the task obtained from the task queue are empty, the cycle will be stopped
        while (task != null || (task = getTask()) != null) {
            //Locking prevents a running worker from being terminated during shutdown(), rather than dealing with concurrency
            w.lock(); 
             
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * Judgment 1: ensure that only when the thread is in the stop state and the wt is not interrupted, the wt will be set with the interrupt flag
             * Condition 1: thread pool status > = STOP, that is, STOP or TERMINATED
             * Condition 2: at first, judge that the thread pool status is < STOP, and then check and find thread Interrupted() is true, that is, the thread has been interrupted. Check whether the thread pool status > = STOP again (to eliminate the instantaneous shutdown method from taking effect and make the thread pool STOP or TERMINATED),
             * If either condition 1 or condition 2 is satisfied and wt is not in the interrupt state, interrupt wt, otherwise enter the next step
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); //The current thread calls interrupt()
             
            try {
                //Before execution (empty method, implemented by subclass override)
                beforeExecute(wt, task);
                 
                Throwable thrown = null;
                try {
                    task.run();
                } 
                catch (RuntimeException x) {
                    thrown = x; throw x;
                } 
                catch (Error x) {
                    thrown = x; throw x;
                } 
                catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } 
                finally {
                    //After execution (empty method, implemented by subclass override)
                    afterExecute(task, thrown); 
                }
            } 
            finally {
                task = null; 
                w.completedTasks++; //Number of completed tasks + 1
                w.unlock(); //Release lock
            }
        }
        // 
        completedAbruptly = false;
    } 
    finally {
        //Handle worker exit
        processWorkerExit(w, completedAbruptly);
    }
}

「5,getTask()」

As can be seen from the function call diagram, in the implementation of ThreadPoolExecutor class, the Runnable getTask() method serves the void runWorker(Worker w) method, and its function is to obtain the task (Runnable) in the task queue (workQueue).

Parameter Description: no parameter

"Execution process":

  1. Set timedOut (whether the last task timed out) to false (the method is executed for the first time, no last time, naturally false), and enter an infinite loop
  2. If the thread pool is in the shutdown state and the task queue is empty (the shutdown state of the thread pool can process tasks in the task queue and will not accept new tasks, which is the key) or the thread pool is in the STOP or TERMINATED state, it means that the thread pool does not need to obtain tasks anymore. The current number of working threads is - 1 and returns null. Otherwise, go to step 3
  3. If the number of thread pools exceeds the limit or the time exceeds the limit and (the task queue is empty or the current number of threads > 1), proceed to step 4, otherwise proceed to step 5.
  4. Remove the worker thread, return null if successful, and enter the next cycle if unsuccessful.
  5. Try to get the task with poll() or take() (which depends on the value of timed). If the task is not empty, return the task. If it is empty, set timeOut to true to enter the next cycle. If an exception occurs in the process of obtaining a task, set {timeOut to false and enter the next cycle.

"Flow chart":

Thread pool flowchart

"Detailed reading of source code:"

private Runnable getTask() {
    //Whether the latest poll timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * Condition 1: thread pool status SHUTDOWN, STOP, TERMINATED
         * Condition 2: thread pool STOP, TERMINATED status or workQueue is empty
         * If both condition 1 and condition 2 are true, then workerCount-1 and null will be returned
         * Note: condition 2 is that the thread pool in SHUTDOWN state will not accept the task, but will still process the task
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /**
         * If either of the following two conditions is met, set the blocking time limit for the worker thread currently trying to obtain the task (timeout will be destroyed? Not sure). Otherwise, the thread can remain active all the time
         * 1.allowCoreThreadTimeOut: Whether the current thread waits for a task with keepAliveTime as the timeout
         * 2.The current number of threads has exceeded the number of core threads
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
        //If both conditions are true, make the number of working threads - 1 through CAS, that is, eliminate the working threads
        //Condition 1: the number of worker threads is greater than maximumPoolSize, or (the worker thread blocking time is limited and the last pull task in the task queue timed out)
        //Condition 2: WC > 1 or task queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //Remove the worker thread, return null if successful, and enter the next cycle if unsuccessful
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

     //After execution, it indicates that the task has been verified and started to be obtained
        try {
            //If the worker thread blocking time is limited, use poll(), otherwise use take()
            //poll() sets the blocking time, while take() has no time limit until the result is obtained
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            //If r is not empty, the Runnable is returned
            if (r != null)
                return r;
            //If the Runable cannot be obtained, set the timeout of the most recently obtained task to true
            timedOut = true;
        } catch (InterruptedException retry) {
            //In response to the interrupt, set the timeout status of the most recently obtained task to false before entering the next cycle
            timedOut = false;
        }
    }
}

processWorkerExit()

Processworkerexit (worker W, Boolean completed abortly) is a method for executing thread exit

Parameter Description:

  1. "Worker w": the worker thread to end.
  2. boolean completedAbruptly: whether to complete suddenly (caused by exceptions). If the worker thread dies due to user exceptions, the completedAbruptly parameter is true.

"Execution process:"

1. If {completedAbruptly} is} true, that is, the worker thread suddenly dies because of an exception, the worker thread - 1 operation is executed.

2. After the main thread obtains the lock, the number of tasks completed by the thread pool is added to the number of tasks completed by w (current worker thread), and the current worker is removed from the worker's set collection.

3. Judge whether to execute tryTerminate() to end the thread pool according to the thread pool status.

4. Whether to add worker threads. If the thread pool has not been completely terminated, a certain number of threads still need to be maintained.

  • If the current thread terminates suddenly, addWorker() is called to create the worker thread
  • If the current thread does not terminate suddenly, but the current number of worker threads is less than the number of threads to be maintained in the thread pool, a worker thread is created. The number of threads to maintain is corePoolSize (depending on whether the member variable allowCoreThreadTimeOut is false) or 1.
  • Detailed reading of source code:
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1.Worker-1 operation
     * 1)If completedAbruptly {is true, it indicates that the worker thread has an exception, and the number of working threads will be - 1
     * 2)If completedAbruptly {is false, it indicates that the worker thread has no tasks to execute, and the worker-1 operation is performed by getTask()
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. Remove the worker thread from the thread set collection, which requires locking
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //Append the number of tasks completed by the worker to the number of tasks completed by the thread pool
        completedTaskCount += w.completedTasks;
        //Remove the worker from HashSet < worker >
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
 // 3. Judge whether to end the thread pool according to the thread pool status
    tryTerminate();
 
 /**
     * 4.Need to add worker threads
     * The thread pool status is running or shutdown
     * If the current thread terminates suddenly, addWorker()
     * If the current thread does not terminate suddenly, but the number of current threads < the number of threads to maintain, addWorker()
     * Therefore, if the thread pool shutdown() is called until the workQueue is unprecedented, the thread pool will maintain corePoolSize threads, and then gradually destroy these corePoolSize threads
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

Well, that's all about the Java thread pool. How did you get after reading it? If you think it's helpful, just give it a compliment. I wish you a new year, a promotion and a raise!

Keywords: Java thread pool

Added by Trey395 on Mon, 17 Jan 2022 14:57:39 +0200