Interviewer: Tell me about thread pools (Part 2)

Review of the previous article

In the previous article, we followed the thread pool's inheritance hierarchy and analyzed the submit, invokeAll and invokeAny methods of its abstract parent class, AbstractExecutorService. In this article, we focus on the concrete implementation, ThreadPoolExecutor, and use source code analysis to understand how its seven parameters are used.

Usage scenario

Let's first review typical business code. Below, we submit 10 tasks to be processed in parallel, then stop the thread pool from accepting new tasks, and finally wait for the thread pool to terminate.

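// Requires java.util.concurrent.ThreadPoolExecutor, LinkedBlockingQueue and TimeUnit
public static void main(String[] args) throws InterruptedException {
        // Create the thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // Submit 10 tasks to be processed in parallel
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                // Simulated business code (the sleep duration is illustrative)
                try {
                       Thread.sleep(1000);
                       System.out.println("End of task");
                } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                }
            });
        }
        // Stop accepting new tasks
        executor.shutdown();
        // Wait for the thread pool to terminate (the timeout is illustrative)
        executor.awaitTermination(1, TimeUnit.MINUTES);
}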


Constructors

ThreadPoolExecutor has 4 overloaded constructors; the shorter ones supply default arguments and delegate to the full one. This design is worth borrowing. Only two important constructors are shown below.

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             // Default thread factory: the threads it returns have normal priority and are non-daemon
             Executors.defaultThreadFactory(),
             // Default rejection policy (AbortPolicy): throw RejectedExecutionException when a task is rejected
             defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
              // Judge parameter boundary
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
              // Capture the access control context if a security manager is installed; this is beyond the scope of this article
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
              // Set the number of core threads
        this.corePoolSize = corePoolSize;
              // Set the maximum number of threads
        this.maximumPoolSize = maximumPoolSize;
              // Set up work queues
        this.workQueue = workQueue;
              // Set thread idle time
        this.keepAliveTime = unit.toNanos(keepAliveTime);
              // Set thread factory
        this.threadFactory = threadFactory;
              // Set rejection policy
        this.handler = handler;
}
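
The overloading idea used by these constructors can be sketched in a small class of our own. This is only an illustration: PoolSettings, its fields and DEFAULT_QUEUE_CAPACITY are hypothetical names that do not exist in the JDK. The short constructor fills in a default and delegates, so validation and assignment live in exactly one place.

```java
// Hypothetical settings class, used only to illustrate constructor delegation.
class PoolSettings {
    // Assumed default, chosen arbitrarily for this sketch
    static final int DEFAULT_QUEUE_CAPACITY = 100;

    final int coreSize;
    final int maxSize;
    final int queueCapacity;

    // Short overload: supplies the default and delegates to the full constructor
    PoolSettings(int coreSize, int maxSize) {
        this(coreSize, maxSize, DEFAULT_QUEUE_CAPACITY);
    }

    // Full constructor: the only place that checks boundaries and assigns fields
    PoolSettings(int coreSize, int maxSize, int queueCapacity) {
        if (coreSize < 0 || maxSize <= 0 || maxSize < coreSize || queueCapacity <= 0)
            throw new IllegalArgumentException();
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.queueCapacity = queueCapacity;
    }
}
```

Callers that do not care about the queue capacity write new PoolSettings(2, 4), while invalid combinations such as new PoolSettings(5, 2) fail fast in the full constructor.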

execute method

public void execute(Runnable command) {
        // Reject null tasks
        if (command == null)
            throw new NullPointerException();
        /*
         * ctl stores both the number of worker threads and the thread pool state.
         * How one variable can hold two values is explained later.
         */
        int c = ctl.get();
        // Check whether the current number of worker threads is less than the number of core threads
        if (workerCountOf(c) < corePoolSize) {
            // Try to add a core worker thread; if it is created successfully, we are done
            if (addWorker(command, true))
                return;
            /*
             * Creation failed. execute is thread safe and may be called by many threads,
             * so the pool state or worker count may have changed in the meantime.
             * Read ctl again to get the latest value.
             */
            c = ctl.get();
        }
        /*
         * Reaching here means the number of worker threads is at least the number of core threads,
         * or creating the worker thread failed (for example, the thread pool is not RUNNING).
         * If the thread pool is running, try to add the task to the task queue.
         */
        if (isRunning(c) && workQueue.offer(command)) {
            // Recheck the thread pool state
            int recheck = ctl.get();
            // If the pool is no longer running and the task can be removed from the queue, reject the task
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // If the number of worker threads is 0 (for example, they were destroyed after idling), add a worker thread
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         * Reaching here means the thread pool is not running or the task could not be queued.
         * Try to add a non-core worker thread; if that fails, reject the task.
         */
        else if (!addWorker(command, false))
            reject(command);
}

Summary: from the execute method we can conclude that the thread pool first creates core worker threads while the number of workers is below the number of core threads; once that limit is reached, it queues tasks; it creates non-core worker threads only when the task queue cannot accept a task; and it rejects a task when the thread pool is not running, or when the task queue is full and the number of workers has reached the maximum.

Recall the question raised in the previous article: we create a thread pool with 10 core threads, 20 maximum threads and an unbounded task queue, and 30 tasks arrive at the same time.

Question 1: what is the number of threads in the thread pool?

Question 2: if I change the task queue to a queue with a size of 20, how many requests can I receive at most now?

Having read the source code, we can now answer both questions.

  • Question 1: according to the source code, the first 10 tasks each create a core worker thread. Because the task queue is unbounded, the remaining 20 tasks go straight into the task queue and wait for the core worker threads to consume them. The pool therefore holds 10 threads.
  • Question 2: if the task queue is changed to a queue with a capacity of 20, the pool can accept at most (maximum threads + queue capacity) = 20 + 20 = 40 requests.
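
Both answers can be checked with a small experiment. The sketch below is not from the original article: the class name PoolSizingDemo and the helper probe are made up for illustration. Every task blocks on a latch so that no worker becomes free while the counts are read, and the default rejection policy is assumed.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {
    // Submits `tasks` blocking tasks to a 10/20 pool and returns
    // {pool size, queued tasks, rejected tasks}.
    static int[] probe(BlockingQueue<Runnable> queue, int tasks) throws InterruptedException {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the counts are read
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // thrown by the default AbortPolicy
            }
        }
        int[] result = {executor.getPoolSize(), executor.getQueue().size(), rejected};
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Question 1: unbounded queue, 30 tasks -> prints [10, 20, 0]
        System.out.println(Arrays.toString(probe(new LinkedBlockingQueue<>(), 30)));
        // Question 2: queue capacity 20, 41 tasks -> prints [20, 20, 1]
        System.out.println(Arrays.toString(probe(new ArrayBlockingQueue<>(20), 41)));
    }
}
```

In the second run the 41st task is the first one rejected, which matches the limit of 40 accepted requests.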

When reading the execute method, we treated the ctl attribute and addWorker as black boxes, judging what they do only from the author's comments and the method names. We know that execute is a thread-safe method that can be called from different threads, yet it contains no locking code. You are probably curious how the underlying methods achieve this.


ctl attribute

    // An AtomicInteger: reads and updates of ctl are atomic operations
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 32 - 3 = 29 bits are used for the worker count
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // Maximum worker count: 2^29 - 1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // The high 3 bits of the int store the thread pool state
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Clear the low 29 bits to get the high 3 bits, i.e. the thread pool state
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Clear the high 3 bits to get the number of worker threads
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine a run state (rs) and a worker count (wc) with a bitwise OR
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // Check whether c is less than s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // Check whether c is greater than or equal to s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // Only RUNNING is less than SHUTDOWN, so this comparison identifies the running state
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    // Try to increment ctl by 1 using CAS
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // Try to decrement ctl by 1 using CAS
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

From the ctl source code above, we can see that the author splits one int into two parts: the high 3 bits hold the current state of the thread pool, and the low 29 bits hold the current number of worker threads. This is where an interviewer can follow up by asking how large the number of threads can be: the worker count is capped at CAPACITY, which is 2^29 - 1 = 536,870,911.
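
To make the packing concrete, the sketch below copies the constants and helper methods into a standalone class and packs a state and a worker count into one int. The class name CtlBits is made up for this illustration; the constants mirror the ones in the source.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;      // high 3 bits: 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;      // high 3 bits: 000
    static final int STOP       =  1 << COUNT_BITS;      // high 3 bits: 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                      // RUNNING pool with 5 workers
        System.out.println(workerCountOf(c));           // 5
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c + 1));       // 6: adding 1 only touches the count
        System.out.println(CAPACITY);                   // 536870911
        System.out.println(RUNNING < SHUTDOWN);         // true: RUNNING is the only negative state
    }
}
```

Because the count sits in the low bits, compareAndIncrementWorkerCount can simply add 1 to the whole value without disturbing the state bits.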

Some readers may not be familiar with bit operations such as shifts, and so may not follow the logic of this section. That is fine: understanding what each method does in general is enough to keep reading the source code that follows.

Because space in this article is limited, readers who want the details of bit operations are encouraged to look them up separately.

Thread pool status

In the ctl attribute section we saw the following state constants. What do they mean?

  • RUNNING: accept new tasks and process tasks in the task queue.
  • SHUTDOWN: do not accept new tasks, but process tasks in the task queue.
  • STOP: do not accept new tasks, do not process tasks in the task queue, and interrupt tasks in progress.
  • TIDYING: all tasks have ended, the number of worker threads is 0, and the terminated() hook method will be called.
  • TERMINATED: the terminated() hook method has completed.

The thread pool state transitions like this:

  • RUNNING -> SHUTDOWN: the shutdown() method of the thread pool is called.
  • (RUNNING / SHUTDOWN) -> STOP: the shutdownNow() method of the thread pool is called.
  • SHUTDOWN -> TIDYING: both the task queue and the set of worker threads are empty.
  • STOP -> TIDYING: the set of worker threads is empty.
  • TIDYING -> TERMINATED: the terminated() hook method has completed.
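
The transitions above can be observed from the outside through the public API. The following is a minimal sketch; the class name LifecycleDemo and the method observe are illustrative. It overrides the terminated() hook and records what isShutdown(), awaitTermination() and isTerminated() report along the way.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    // Returns {isShutdown before shutdown(), isShutdown after shutdown(),
    //          awaitTermination result, terminated() hook called, isTerminated}.
    static boolean[] observe() throws InterruptedException {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // runs while the pool is in the TIDYING state
            }
        };
        executor.execute(() -> { });
        boolean before = executor.isShutdown();   // still RUNNING
        executor.shutdown();                      // RUNNING -> SHUTDOWN
        boolean after = executor.isShutdown();
        // Returns true once the pool has reached TERMINATED
        boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
        return new boolean[] {before, after, finished, hookCalled.get(), executor.isTerminated()};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(observe()));
    }
}
```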


addWorker method

This method creates a worker thread while keeping the number of threads within the number of core threads or the maximum number of threads. The firstTask parameter is the first task the worker thread executes after it is created. The second parameter, core, indicates whether the new thread counts as a core worker thread. The method returns whether the worker thread was successfully created and started.

private final HashSet<Worker> workers = new HashSet<Worker>();

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
        // Label the outer infinite loop so the inner loop can leave or restart it
        retry:
        for (;;) {
            // Get ctl value
            int c = ctl.get();
            // Get thread pool state
            int rs = runStateOf(c);
            /*
             * Refuse to add a worker when the state is SHUTDOWN, STOP, TIDYING or TERMINATED,
             * with one exception: the state is SHUTDOWN, there is no first task,
             * and the task queue is not empty. In that case a worker is still needed
             * to execute the tasks remaining in the task queue.
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // Inner infinite loop: retry the CAS until it succeeds or the pool state changes
            for (;;) {
                // Get the current number of worker threads
                int wc = workerCountOf(c);
                // Fail if the count has reached CAPACITY, or the limit for this kind of worker:
                // the number of core threads for a core worker, the maximum number of threads otherwise
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // Increment the worker count with CAS; on success, leave the outer loop
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // The CAS failed because ctl changed, so read it again
                c = ctl.get();
                // If the thread pool state changed, restart the outer loop
                if (runStateOf(c) != rs)
                    continue retry;
                // Otherwise only the worker count changed; retry the inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create the worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // This code modifies the workers HashSet, so it takes the reentrant lock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Read the state again while holding the lock
                    int rs = runStateOf(ctl.get());

                    // Proceed if the pool is RUNNING, or if it is SHUTDOWN and the worker
                    // has no first task (it only needs to drain the task queue)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // Pre-check that the thread has not been started already
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // Add the worker to the workers collection
                        workers.add(w);
                        // Record the largest number of worker threads reached
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // Mark the worker as added
                        workerAdded = true;
                    }
                } finally {
                    // Release the reentrant lock
                    mainLock.unlock();
                }
                // If the worker was added successfully, start its thread
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // If the worker thread failed to start, roll back: remove it from the collection
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // Return whether the worker thread was started successfully
        return workerStarted;
}

This method hands our task to a new worker, creates the worker thread when the pool state and the thread limits allow it, and adds the successfully created worker to the workers collection. The CAS retry loop (an infinite loop that repeats until the compare-and-set succeeds) is a pattern we can borrow in our own development.
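
The CAS retry loop can be reused outside the thread pool. As a minimal sketch, here is a counter that never exceeds a limit and takes no lock; BoundedCounter is a made-up class for illustration, not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Returns true if the counter was incremented, false if the limit was already reached.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            // Check the boundary against the value we just read
            if (current >= limit)
                return false;
            // Succeeds only if nobody changed the value since we read it
            if (count.compareAndSet(current, current + 1))
                return true;
            // Another thread won the race: loop, re-read and try again
        }
    }

    int get() {
        return count.get();
    }
}
```

As in addWorker, the boundary check and the increment are made consistent by the compare-and-set: if another thread changes the value in between, the CAS fails and the check is repeated on the fresh value.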


Worker class

In addWorker, the task is passed to the Worker and start() is called on the worker's thread, so the Worker must have its own run method for that thread to execute. It is therefore worth exploring how Worker is written.

According to the inheritance diagram of the Worker class, Worker extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface. We can therefore guess that it implements a locking mechanism and can be executed by a thread.

Worker constructor
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

    Worker(Runnable firstTask) {
            // First set the AQS state to -1, which inhibits interrupts until runWorker starts
            setState(-1);
            // Set the first task that will be performed
            this.firstTask = firstTask;
            /*
             * Create the thread with the thread factory that was passed to the thread pool constructor.
             * Because Worker implements the Runnable interface, it can be passed to the new thread,
             * so calling thread.start() will execute the worker's run() method.
             */
            this.thread = getThreadFactory().newThread(this);
    }
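
The two roles of Worker, a Runnable handed to its own thread and a simple lock built on AQS, can be sketched in a reduced form. MiniWorker below is a hypothetical class written for this illustration; unlike the real Worker it starts in the unlocked state 0 and does not run tasks from a queue.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class MiniWorker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    volatile boolean ran;

    MiniWorker(ThreadFactory factory) {
        // Passing `this` works because MiniWorker is a Runnable
        this.thread = factory.newThread(this);
    }

    @Override
    public void run() {
        ran = true; // executed by `thread` once thread.start() is called
    }

    // AQS state 0 means unlocked, 1 means locked; the lock is not reentrant
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    public static void main(String[] args) throws InterruptedException {
        MiniWorker w = new MiniWorker(Executors.defaultThreadFactory());
        System.out.println(w.tryLock()); // true
        System.out.println(w.tryLock()); // false: already held, not reentrant
        w.unlock();
        w.thread.start();
        w.thread.join();
        System.out.println(w.ran);       // true
    }
}
```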

run method

public void run() {
        // Delegate to the runWorker method of the enclosing thread pool
        runWorker(this);
}

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Release the initial lock state so that the worker can be interrupted
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // Loop while firstTask is not null or a task can be obtained from the task queue
            while (task != null || (task = getTask()) != null) {
                // Mark the worker as busy
                w.lock();
                // Interrupt handling is covered in the next article
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // Hook method, implemented by subclasses
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    // Call the task's run method, record any exception and pass it to the hook method
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // Hook method, implemented by subclasses
                        afterExecute(task, thrown);
                    }
                } finally {
                    // Clear the task
                    task = null;
                    // Completed tasks + 1
                    w.completedTasks++;
                    // Mark the worker as available
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // Handle worker thread exit
            processWorkerExit(w, completedAbruptly);
        }
}

This code shows that a worker keeps obtaining tasks from getTask() in the while loop until getTask() returns null. The getTask method is analyzed next.
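
The beforeExecute and afterExecute hooks called inside runWorker are meant to be overridden. A minimal sketch, assuming we only want to count tasks, could look like this; the class name HookedPool and its counters are made up for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();   // runs in the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();  // runs in the finally block, even if the task threw
        if (t != null)
            failed.incrementAndGet(); // t is the exception recorded in `thrown`
    }
}
```

Both hooks run in the worker thread that executes the task, so the counters here are atomic because several workers may update them at once.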


getTask method

private Runnable getTask() {
        // Whether the last poll() timed out
        boolean timedOut = false;
        // Infinite loop
        for (;;) {
            // Get current state
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check whether the queue is empty only if necessary:
            // the state is SHUTDOWN with an empty task queue, or it is STOP, TIDYING or TERMINATED
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // Number of worker threads - 1
                decrementWorkerCount();
                // Return null; runWorker then leaves its while loop and the worker thread exits
                return null;
            }
            // Get the current number of worker threads
            int wc = workerCountOf(c);

            // Timeout applies when allowCoreThreadTimeOut is true or the current count exceeds the number of core threads
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // The worker count exceeds the maximum, or timeout applies and the last poll timed out;
            // and there is more than one worker or the task queue is empty
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // Try to decrement the worker count with CAS
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // The CAS failed; continue with the next iteration
                continue;
            }

            try {
                // If timeout applies, poll and wait up to keepAliveTime; otherwise call take and block until a task arrives
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // Got a task; return it directly
                if (r != null)
                    return r;
                // No task arrived within the timeout; set the timeout flag
                timedOut = true;
            } catch (InterruptedException retry) {
                // The thread was interrupted; reset the timeout flag and start the next iteration
                timedOut = false;
            }
        }
}

From the getTask method we can see that timeout is enabled when core threads are allowed to time out or when the current number of threads exceeds the number of core threads. This switch decides whether the worker calls poll with a timeout or the indefinitely blocking take on the blocking queue. Once a timed-out worker returns null, the worker's run method leaves its loop and enters the worker destruction process. In this way, the number of threads in the thread pool shrinks dynamically.
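
The shrinking behaviour can be watched with a short experiment. The sketch below is an assumption-laden illustration: KeepAliveDemo and its run method are made-up names, the 50 ms keep-alive time is arbitrary, and a SynchronousQueue is used so that the extra tasks force non-core threads to be created. After the burst, the idle non-core workers time out in poll and exit, while the core workers block in take.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {pool size during the burst, pool size after the workers have idled}.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // hold all four workers busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busy = executor.getPoolSize();
        release.countDown();
        // Poll until the non-core workers have timed out, with a generous deadline
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        while (executor.getPoolSize() > 2 && System.nanoTime() < deadline) {
            Thread.sleep(20);
        }
        int idle = executor.getPoolSize();
        executor.shutdown();
        return new int[] {busy, idle};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = run();
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```

The pool grows to 4 threads during the burst and falls back to the 2 core threads once the extra workers have been idle for longer than the keep-alive time.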


Summary

This article used the execute method as the starting point to show the CAS pattern, the locking pattern, and how the thread pool state is handled.

Many people like to follow every method call to the bottom when reading source code, but reading source code works better as a process of reading without insisting on thorough understanding at first. In practice, the call stack may reach 5-6 levels or more, and reading that way is very inefficient. As you dig deeper, your time and energy are consumed steadily, and in the end you understand only a few logical branches of the source code, which is far from the original goal of reading it.

So I recommend reading only the first 1-2 levels of the call stack and not going deeper at first. Once the overall logic is clear, you can go back and study the specific details.

Keywords: Java

Added by bouncer on Wed, 29 Dec 2021 22:39:25 +0200