[Java Concurrency] ThreadPoolExecutor in Detail

Preface

A thread pool is a common optimization in concurrent programming. Reusing threads reduces how often threads are created, lowers resource consumption, and improves response time. In Java, we usually create thread pools through the factory methods provided by Executors, but the underlying implementation class is usually ThreadPoolExecutor. Let's analyze the implementation of ThreadPoolExecutor in detail.

Basic use

Let's first look at the basic use of a thread pool. The following code creates a fixed-size thread pool that holds at most five threads. When there are more tasks than threads, the extra tasks are added to the task queue, and idle threads take them from the queue.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Jikai Zhang on 2017/4/17.
 */
public class ThreadPoolDemo {

    static class WorkThread implements Runnable {
        private String command;

        public WorkThread(String command) {
            this.command = command;
        }

        @Override
        public void run() {
            System.out.println("Thread-" + Thread.currentThread().getId() + " start. Command=" + command);
            processCommand();
            System.out.println("Thread-" + Thread.currentThread().getId() + " end.");
        }

        private void processCommand() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {

            Runnable work = new WorkThread("" + i);
            executor.execute(work);
        }
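        executor.shutdown();
        // Wait for the submitted tasks to finish without busy-spinning on isTerminated()
        try {
            executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Finish all threads.");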
    }
}

Overview

Before analyzing the implementation of the thread pool, let's look at its workflow, since the implementation is much easier to follow once the workflow is familiar. A thread pool usually has an associated task queue that buffers tasks. When a thread finishes a task, it takes the next one from the queue. ThreadPoolExecutor uses a blocking queue as its task queue, which blocks threads that request a task while the queue is empty. Below is the overall picture of ThreadPoolExecutor:

(Figure from "The Art of Concurrent Programming in Java")

Let's focus on how ThreadPoolExecutor adds tasks and how it shuts down. The following figure shows the process of adding a task to ThreadPoolExecutor:

Let's first look at the specific process of adding tasks:

  • If the number of threads in the pool is less than corePoolSize, create a new thread directly (regardless of whether the pool has idle threads), assign the task to the new thread, and add the thread to the pool.
  • If the number of threads in the pool is greater than or equal to corePoolSize, add the task to the task queue.
  • If the task queue is full (this applies to bounded queues), check whether the number of threads is less than maximumPoolSize. If so, create a new thread, assign the current task to it, and add it to the pool. Otherwise, apply the rejection policy to the task.
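The three steps above can be observed from the outside with a deliberately tiny pool. The following is a minimal sketch, not part of the JDK source: the class name AdmissionDemo and its observe method are made up for illustration, and the pool parameters (corePoolSize 1, maximumPoolSize 2, a queue with one slot) are chosen only to make each step visible.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AdmissionDemo {
    /** Returns {pool size after task 1, after task 2, after task 3, 1 if task 4 was rejected else 0}. */
    static int[] observe() {
        // corePoolSize = 1, maximumPoolSize = 2, bounded queue holding one task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);       // below corePoolSize: a core thread is created
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);       // core threads are busy: the task goes to the queue
            seen[1] = pool.getPoolSize();
            pool.execute(blocker);       // queue is full: a non-core thread is created
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);   // queue full and pool at maximumPoolSize
            } catch (RejectedExecutionException e) {
                seen[3] = 1;             // the default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

The pool size stays at 1 after the second task because that task is queued rather than given a thread; only when the queue is full does the pool grow toward maximumPoolSize.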

In ThreadPoolExecutor, two values control the size of the thread pool: corePoolSize and maximumPoolSize. corePoolSize is the number of live threads the pool should keep under normal conditions, and maximumPoolSize is the maximum number of threads the pool can hold. While the number of threads does not exceed corePoolSize, the threads in the pool are considered core threads. By default the pool applies no timeout to core threads, so they stay in the pool until it is shut down (ignoring threads that terminate abnormally). Once the number of threads exceeds corePoolSize, the additional threads are considered non-core threads; the pool applies a timeout to them and destroys them after they have been idle for a period of time. Non-core threads mainly absorb bursts of tasks, when the existing threads cannot handle the load in time and need extra help. Once the burst has been processed, the extra threads are redundant (more threads occupy more resources), so they should be destroyed promptly.
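To see a non-core thread being reclaimed, here is a minimal sketch under stated assumptions: the class name KeepAliveDemo is hypothetical, the 100 ms keep-alive time is arbitrary, and a SynchronousQueue is used only because it never holds tasks, which forces the second task onto a new non-core thread.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    /** Returns {pool size while two tasks run, pool size after the idle timeout}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // runs on the core thread
            pool.execute(blocker); // no idle worker to hand off to, so a non-core thread is added
            int busy = pool.getPoolSize();
            release.countDown();   // both workers become idle
            // Poll with a generous deadline until the surplus thread has timed out
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] {busy, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

The pool does not track which thread was created as "core": whichever idle thread times out first is reclaimed, and the pool then stays at corePoolSize.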

ThreadPoolExecutor defines an upper limit of 2^29 - 1 = 536870911 threads (the reason for this number is explained later). Users can also set their own maximum, and ThreadPoolExecutor uses the smaller of the two. When the number of threads equals maximumPoolSize and the task queue is full, the pool is saturated, and the rejection policy is applied to new tasks. The JDK defines four rejection policies:

  • AbortPolicy: throws a RejectedExecutionException directly; this is the default policy
  • CallerRunsPolicy: runs the task on the caller's thread
  • DiscardOldestPolicy: discards the oldest task (the head of the task queue) and then retries submitting the new task
  • DiscardPolicy: silently discards the task
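As an illustration of how a policy is plugged in, the following sketch passes CallerRunsPolicy to the constructor and saturates the pool on purpose. The class name CallerRunsDemo and the pool sizes are assumptions made for the example; the constructor overload and the policy class are the standard ones from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    /** Returns the name of the thread that ran the task submitted to a saturated pool. */
    static String rejectedTaskThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the one-slot queue
            // The pool is saturated, so CallerRunsPolicy runs this task on the calling thread
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("rejected task ran on: " + rejectedTaskThread());
    }
}
```

Running the rejected task on the caller slows the submitter down, which acts as a simple form of back-pressure; with the default AbortPolicy the third execute call would throw instead.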

Now let's look at shutting down a thread pool. There are two ways: shutdown and shutdownNow. After shutdown is called, the pool no longer accepts new tasks but still executes the tasks already in the task queue. After shutdownNow is called, the pool no longer accepts new tasks, does not execute the tasks remaining in the queue, and tries to stop the threads that are executing tasks by interrupting them (a thread may or may not respond to an interrupt, so there is no guarantee that it stops).
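The difference is easiest to see with shutdownNow, which hands back the tasks that never started. Below is a minimal sketch; the class name ShutdownNowDemo is hypothetical, and the running task cooperates with interruption by sleeping, which is an assumption of the demo rather than something the pool can enforce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns {number of queued tasks handed back, 1 if the running task was interrupted else 0}. */
    static int[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // sleep responds to interruption
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { }); // waits in the queue
        pool.execute(() -> { }); // waits in the queue
        try {
            started.await();                          // make sure the first task is running
            List<Runnable> neverRun = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {neverRun.size(), interrupted.get() ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

Had the demo called shutdown instead, the two queued tasks would still have run, but only after the sleeping task finished on its own.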

Implementation

Next, we will analyze the implementation of ThreadPoolExecutor from the source code.

Worker

Each thread in ThreadPoolExecutor is associated with a Worker object, and what the pool actually stores are these Worker objects. The Worker class wraps a thread: besides the thread itself, it stores extra information such as the first task assigned when the thread was created and the number of tasks the thread has completed. Worker implements the Runnable interface and is passed to the Thread constructor when the thread is created, so once the thread starts it executes Worker's run method. Worker also extends AbstractQueuedSynchronizer, which makes it a non-reentrant mutex (called the Worker lock below; non-reentrant means that even the thread holding the lock cannot acquire it a second time), so every thread is in effect associated with a mutex. A thread acquires its Worker lock before executing a task and releases it after the task finishes. The main purpose of the Worker lock is to tell whether a thread is idle (that is, whether its Worker lock can be acquired) during a graceful shutdown, as explained in detail later. Below is the implementation of the Worker class, keeping only the essential parts:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    // Thread associated with this Worker object
    final Thread thread;
    // Initial task to run after the thread is created
    Runnable firstTask;
    // Number of tasks completed by this thread
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // A thread can acquire the Worker lock only when state is 0. Setting state to -1 here
        // means no thread can acquire it yet. shutdown must acquire a thread's Worker lock
        // before interrupting it, and shutdownNow checks state >= 0 before interrupting,
        // so state -1 protects the new thread from being interrupted too early.
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Pass itself in when creating a thread
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    // This method runs after the thread is created
    public void run() {
        runWorker(this);
    }

    // Interrupts the thread only if it has already started; used by the shutdownNow method
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {}
        }
    }
}

We see that state is set to -1 in the Worker constructor, and the comment explains that interrupts are inhibited until the runWorker method executes. This raises two questions: 1. Why wait until runWorker executes? 2. How are interrupts inhibited? For the first question, interrupts are meant for running threads. The Worker constructor only creates the thread; the thread is started later by the start call in addWorker, and the started thread then runs Worker's run method, which calls runWorker. So only once runWorker executes is the thread really running. For the second question, the mechanism targets the shutdown and shutdownNow methods. The shutdown method tries to acquire a thread's Worker lock before interrupting it, and interrupts the thread only if the lock was acquired. The Worker lock can be acquired only when its state is 0, so while state is -1 no thread can get the lock and the thread cannot be interrupted. The shutdownNow method calls the Worker's interruptIfStarted method, which interrupts the thread only when state >= 0, so setting state to -1 also prevents a premature interrupt there. When runWorker executes, it performs an unlock on the Worker passed in, which resets state to 0 and leaves the thread in an interruptible state.
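The state trick can be reproduced in isolation. The following sketch builds a small non-reentrant lock on AbstractQueuedSynchronizer in the same spirit as Worker; the class name WorkerLockSketch and the helper interruptAllowed are invented for this example and do not exist in the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class WorkerLockSketch extends AbstractQueuedSynchronizer {
    WorkerLockSketch() {
        setState(-1); // until unlock() is called, tryLock() fails and interrupts are inhibited
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only the 0 -> 1 transition succeeds, so the lock is non-reentrant
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // always back to 0, even when the state was -1
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }

    void unlock() { release(1); }

    /** Mirrors the state >= 0 check made before interrupting a started worker. */
    boolean interruptAllowed() { return getState() >= 0; }

    public static void main(String[] args) {
        WorkerLockSketch w = new WorkerLockSketch();
        System.out.println("tryLock while state is -1: " + w.tryLock());
        w.unlock(); // what runWorker does first
        System.out.println("interrupt allowed after unlock: " + w.interruptAllowed());
        System.out.println("first tryLock: " + w.tryLock());
        System.out.println("second tryLock by the same thread: " + w.tryLock());
    }
}
```

Because tryAcquire only accepts the transition from 0 to 1, a state of -1 blocks every acquisition attempt, and a state of 1 blocks even the owning thread, which is exactly the non-reentrant behaviour described above.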

State variable

ThreadPoolExecutor defines a variable ctl of type AtomicInteger that stores both the state of the thread pool and the number of threads. The following is its definition:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

The low 29 bits of ctl store the number of threads in the pool (which is why the maximum number of threads is 2^29 - 1), and the high 3 bits store the state of the pool. To extract the two pieces of information, ThreadPoolExecutor defines a constant CAPACITY whose low 29 bits are all 1. A bitwise AND with CAPACITY yields the number of threads, and a bitwise AND with ~CAPACITY yields the pool state. The following is the implementation:

// Number of bits used to store the thread count: 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// Mask for extracting the run state and the thread count: the low 29 bits are 1, the high 3 bits are 0
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// Get the running status of the thread pool
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}

// Get the number of threads
private static int workerCountOf(int c) {
    return c & CAPACITY;
}
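To make the packing concrete, the following standalone sketch copies these helpers into a small class and round-trips a value. The class name CtlSketch is made up; ctlOf combines the two parts with a bitwise OR as in the JDK, and RUNNING is defined as -1 << COUNT_BITS, matching the JDK constant.

```java
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;       // high 3 bits are 111

    // Pack a run state (high 3 bits) and a worker count (low 29 bits) into one int
    static int ctlOf(int runState, int workerCount) {
        return runState | workerCount;
    }

    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits:    " + Integer.toBinaryString(c));
        System.out.println("workerCount: " + workerCountOf(c));
        System.out.println("is RUNNING:  " + (runStateOf(c) == RUNNING));
        System.out.println("CAPACITY:    " + CAPACITY);
    }
}
```

Keeping both values in one int lets the pool read or update the state and the thread count together with a single atomic operation on ctl.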

ThreadPoolExecutor defines five states for a thread pool:

  • RUNNING: normal operation; accepts new tasks and processes tasks in the task queue
  • SHUTDOWN: does not accept new tasks, but processes tasks already in the task queue
  • STOP: does not accept new tasks, does not process tasks already in the task queue, and tries to stop threads that are executing tasks
  • TIDYING: the pool has no threads and no remaining tasks to run; the thread that moves the pool into this state runs the terminated() method
  • TERMINATED: the terminated() method has completed

The following is the definition of these five variables in JDK:

// 11100000000000000000000000000000  -536870912
private static final int RUNNING = -1 << COUNT_BITS;

// 00000000000000000000000000000000  0
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 00100000000000000000000000000000  536870912
private static final int STOP = 1 << COUNT_BITS;

// 01000000000000000000000000000000  1073741824
private static final int TIDYING = 2 << COUNT_BITS;

// 01100000000000000000000000000000  1610612736
private static final int TERMINATED = 3 << COUNT_BITS;

The following are transitions between states:

  • RUNNING -> SHUTDOWN: the shutdown() method is called (perhaps implicitly in finalize())
  • (RUNNING or SHUTDOWN) -> STOP: the shutdownNow() method is called
  • SHUTDOWN -> TIDYING: both the thread pool and the task queue are empty
  • STOP -> TIDYING: the thread pool is empty
  • TIDYING -> TERMINATED: the terminated() method has completed
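The run state itself is private, but part of the lifecycle can be observed through public methods and the terminated() hook. The sketch below is only an illustration: the class name LifecycleDemo is hypothetical, and it walks the RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED path with a single trivial task.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    /** Returns {isShutdown before shutdown(), isShutdown after it, hook was called, isTerminated}. */
    static boolean[] observe() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // runs during the TIDYING state
            }
        };
        pool.execute(() -> { });
        boolean before = pool.isShutdown(); // still RUNNING
        pool.shutdown();                    // RUNNING -> SHUTDOWN
        boolean after = pool.isShutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // returns once TERMINATED is reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {before, after, hookCalled.get(), pool.isTerminated()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```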

Adding tasks

A task can be added to the thread pool through the execute or submit method. submit returns a Future object that can be used to obtain the task's result. The following is the implementation of the submit method:

public Future <?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture <Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

We see that in submit we just wrapped the Runnable object and finally invoked the execute method. Let's look at the implementation of the execute method.

public void execute(Runnable command) {
    // command cannot be null
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // With fewer threads than corePoolSize, a new thread is created to perform the task.
    if (workerCountOf(c) < corePoolSize) {
        // true indicates that the currently added thread is the core thread
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // If the number of threads is greater than or equal to corePoolSize, first try to add the task to the task queue
    // workQueue.offer adds the task to the tail of the queue
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-check status
        int recheck = ctl.get();
        // If the pool is no longer in the RUNNING state, remove the task that was just queued
        // (the removal is protected by the queue's own lock)
        if (!isRunning(recheck) && remove(command)) {
            reject(command);
        } else if (workerCountOf(recheck) == 0) {

            // workerCountOf returns the current number of worker threads. Execution reaches this point in two cases:
            //  1. The pool is in the RUNNING state, but the number of worker threads is 0,
            //      so a new thread must be created
            //  2. Removing the task failed and the number of worker threads is 0,
            //      so a new thread must be created to run the task that could not be removed
            //
            //  Because command was null-checked earlier, the task normally passed to addWorker
            //  is never null. Passing null here tells addWorker to just create a new thread.
            //  addWorker has special handling for a null task
            addWorker(null, false);
        }
    // The following else indicates that the thread pool is not in Running state or that the task queue is full.
    } else if (!addWorker(command, false)) {
        // This shows that the thread pool is not Running state or that the thread pool is saturated.
        reject(command);
    }
}
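Because submit wraps the task in a RunnableFuture before handing it to execute, the outcome of the task, including an exception, ends up in the returned Future. A minimal sketch of that behaviour follows; the class name SubmitDemo and the "boom" message are invented for the example.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    /** Returns {result of a Runnable that succeeds, message of the exception thrown by one that fails}. */
    static Object[] observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> ok = pool.submit(() -> { });
            Runnable failingTask = () -> { throw new IllegalStateException("boom"); };
            Future<?> failed = pool.submit(failingTask);

            Object result = ok.get(); // a Runnable has no result, so this is null
            String cause;
            try {
                failed.get();
                cause = "no exception";
            } catch (ExecutionException e) {
                cause = e.getCause().getMessage(); // the task's exception is wrapped
            }
            return new Object[] {result, cause};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

With execute, the same failing task would instead propagate its exception out of the worker thread, which is the case runWorker and processWorkerExit have to handle.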

We described the process of adding a task to the thread pool earlier; here it is again:

  • If the number of threads in the pool is less than corePoolSize, a new thread is created to execute the current task, and the thread is added to the pool.
  • If the number of threads in the pool is greater than or equal to corePoolSize, the task is first added to the task queue.
  • If the task queue is full, another thread is created; if the pool has already reached maximumPoolSize, the rejection policy is invoked for the task.

The addWorker method creates and starts threads. When the thread pool is not in the RUNNING state and the task passed in is not null, addWorker does not create a thread. Let's look at its implementation:

private boolean addWorker(Runnable firstTask, boolean core) {
    // retry is a label, similar to a goto target. "continue retry" starts the next iteration of the labeled outer loop.
    // "break retry" exits the labeled outer loop
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // We will elaborate on this condition below.
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // The thread count has reached the system-wide limit CAPACITY, or corePoolSize/maximumPoolSize,
            // so no new thread can be added to the pool. The wc >= CAPACITY check guards against
            // corePoolSize or maximumPoolSize being larger than CAPACITY
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
            // Increase the number of threads using CAS and jump out of retry if successful
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }

            c = ctl.get(); // Re-read ctl
            // If the state of the thread pool changes, start again at retry.
            if (runStateOf(c) != rs)
                continue retry;

            // Reaching this point means the CAS failed while the pool state is unchanged, so the inner loop retries the CAS.
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create work
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // t != null means the thread was created successfully
        if (t != null) {
            // The pool stores its Worker objects in a HashSet, which is not thread-safe,
            // so adding to the HashSet must be done while holding the lock.
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1. rs < SHUTDOWN: the pool is in the RUNNING state
                // 2. rs == SHUTDOWN: the pool is shutting down gracefully, and firstTask == null means the thread
                //    is being created to process the tasks remaining in the task queue (null is passed in deliberately)
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // The thread is already alive, which means it was started prematurely.
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start threads
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Here we focus on the conditions for returning false:

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
// Equivalent to
if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))

Let's look at the above conditions in turn:

  • rs >= SHUTDOWN && rs != SHUTDOWN: the pool is in the STOP, TIDYING or TERMINATED state. In these three states the pool has either finished all its tasks or will not execute the remaining ones, so addWorker can return false directly.
  • rs == SHUTDOWN && firstTask != null: if the previous condition does not hold, the pool must be in the SHUTDOWN state. As mentioned for the execute method, passing null means the thread is being created to run the tasks remaining in the queue (there are no worker threads in the pool at that moment), so addWorker should not return. If firstTask != null, the call is not meant to drain the remaining tasks, so addWorker can return false.
  • rs == SHUTDOWN && workQueue.isEmpty(): all tasks in the task queue have been processed, so addWorker can return false without creating a new thread.
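The equivalence of the two forms follows from De Morgan's laws, and it is small enough to check exhaustively. The sketch below is a standalone illustration: the class name AddWorkerGuard is made up, and the run states are replaced by the small integers -1 to 3, which keep the same ordering as the real shifted constants.

```java
class AddWorkerGuard {
    // Simplified stand-ins for the ordered run states (the real constants are shifted left by 29 bits)
    static final int RUNNING = -1, SHUTDOWN = 0, STOP = 1, TIDYING = 2, TERMINATED = 3;

    /** The condition as written in addWorker: true means "return false, add no worker". */
    static boolean original(int rs, boolean firstTaskIsNull, boolean queueIsEmpty) {
        return rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTaskIsNull && !queueIsEmpty);
    }

    /** The expanded form obtained with De Morgan's laws. */
    static boolean expanded(int rs, boolean firstTaskIsNull, boolean queueIsEmpty) {
        return rs >= SHUTDOWN && (rs != SHUTDOWN || !firstTaskIsNull || queueIsEmpty);
    }

    /** Checks every combination of state and flags. */
    static boolean formsAgree() {
        boolean[] flags = {false, true};
        for (int rs = RUNNING; rs <= TERMINATED; rs++) {
            for (boolean firstTaskIsNull : flags) {
                for (boolean queueIsEmpty : flags) {
                    if (original(rs, firstTaskIsNull, queueIsEmpty)
                            != expanded(rs, firstTaskIsNull, queueIsEmpty)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forms agree: " + formsAgree());
        System.out.println("SHUTDOWN, null task, tasks queued -> refuse? "
                + original(SHUTDOWN, true, false));
    }
}
```

The only case at or beyond SHUTDOWN in which a worker may still be added is the one printed last: the pool is in SHUTDOWN, no first task is supplied, and the queue still holds tasks to drain.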

When a thread is created and started successfully, the run method of Worker is executed, which finally calls the runWorker method of ThreadPoolExecutor and passes itself in as a parameter. The following is the implementation of the runWorker method:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set the Worker's state to 0 so that other threads can acquire the lock,
    // which makes it possible to interrupt the current thread
    w.unlock(); // allow interrupts
    // Marks whether the thread leaves the loop normally or because of an exception
    boolean completedAbruptly = true;
    try {
        // A non-null task means the thread was just created with a first task; otherwise take a task from the queue.
        // If the queue is empty, the thread blocks here.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook that runs before the task; empty by default, subclasses can override it with their own logic.
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    // Runnable.run() cannot throw checked exceptions, so any other
                    // Throwable is wrapped in an Error and rethrown
                    throw new Error(x);
                } finally {
                    // Do some processing after the task is executed, default empty function
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

In the above code, the logic of the first if judgment is somewhat difficult to understand. Let's take it out and analyze it.

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
    && !wt.isInterrupted())
    wt.interrupt();

This if block has two functions:

  • If the pool state is less than STOP, that is, RUNNING or SHUTDOWN, make sure the worker thread is not in an interrupted state.
  • If the pool state is greater than or equal to STOP, that is, STOP, TIDYING or TERMINATED, make sure the worker thread is interrupted.

There are many parentheses in the if code above. First, we divide it into two major conditions:

  • runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) &&
  • !wt.isInterrupted()

Let's first look at the second condition, !wt.isInterrupted(): it states that the current thread has not been interrupted, and only then does interrupting it make sense. The first condition can in turn be split into the following two conditions:

  • runStateAtLeast(ctl.get(), STOP) ||
  • Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)

Look at the first of these. It states that the pool is in the STOP state or later, in which case the thread should be interrupted. If it does not hold, the current thread should not be in an interrupted state, so Thread.interrupted() is called; it returns the thread's interrupt status and then clears it (sets it to false). If the returned status is false, the if condition fails and nothing more happens. If it is true, the thread had been interrupted, and we need to determine whether that interrupt came from a concurrent call to shutdownNow (which interrupts the pool's threads and changes the pool state to STOP, as described later). So the pool state is checked again. If the pool is now in the STOP state or later, the interrupt really did come from shutdownNow and the thread must be interrupted again, because Thread.interrupted() has just cleared the flag. If not, the flag stays cleared and the thread is not interrupted.
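The whole argument rests on the difference between Thread.interrupted(), which clears the flag, and isInterrupted(), which only reads it. A minimal sketch of that difference, run on the current thread (the class name InterruptFlagDemo is made up):

```java
import java.util.Arrays;

class InterruptFlagDemo {
    /** Returns the four observations made after interrupting the current thread. */
    static boolean[] observe() {
        Thread self = Thread.currentThread();
        self.interrupt();                          // set the interrupt flag

        boolean first = self.isInterrupted();      // reads the flag without changing it
        boolean second = Thread.interrupted();     // reads the flag and clears it
        boolean third = self.isInterrupted();      // the flag is now cleared
        boolean fourth = Thread.interrupted();     // still cleared

        return new boolean[] {first, second, third, fourth};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

This is why runWorker has to call wt.interrupt() again when the second state check finds the pool stopping: the flag it just tested no longer exists.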

We see that runWorker repeatedly calls getTask to get tasks. Let's look at the implementation of getTask:

/**
 * getTask returns null to indicate that the current thread should be reclaimed
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // rs >= SHUTDOWN: the pool is at least shutting down and no longer accepts new tasks
        //  1. rs >= STOP: tasks no longer need to be processed (even if some remain in the queue)
        //  2. workQueue.isEmpty(): the tasks remaining in the task queue have all been processed
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed determines whether this thread is subject to the idle timeout
        //  1. allowCoreThreadTimeOut: if true, the timeout also applies to core threads
        //  2. wc > corePoolSize: the pool contains non-core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1. wc > maximumPoolSize || (timed && timedOut)
        //     The thread count exceeds maximumPoolSize, or the timeout applies and the last poll timed out
        // 2. wc > 1 || workQueue.isEmpty()
        //     There is more than one worker thread, or the task queue is empty (so no thread has to stay behind for remaining tasks)
        // If both 1 and 2 hold, use CAS to decrement the thread count and return null so that the current thread is reclaimed
        // If the CAS fails, retry
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If the timeout applies, call poll, which waits at most keepAliveTime
            // and returns null if no task arrived in that time. take has no timeout and waits until a task is available.
            // Both poll and take respond to interruption: if the thread is interrupted while waiting,
            // they throw InterruptedException
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            // Execution to this point indicates timeout
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

When getTask returns null, the thread needs to be reclaimed. To summarize, getTask returns null in the following cases:

  • The number of worker threads exceeds maximumPoolSize (generally because maximumPoolSize was lowered through the setMaximumPoolSize method)
  • The thread pool has been stopped (state >= STOP)
  • The thread pool is in the SHUTDOWN state and the task queue is empty
  • The thread timed out while waiting for a task

When we combine runWorker with getTask, the whole process becomes clearer:

  1. The while loop keeps taking tasks from the task queue, and the thread blocks while the queue is empty. If getTask returns null, the current thread should be reclaimed, and the reclaim logic runs.
  2. Once a task is obtained, the pool state is checked and the interrupt status of the current thread is set accordingly.
  3. Pre-processing runs before the task (a hook implemented by the user)
  4. The task is executed
  5. Post-processing runs after the task (a hook implemented by the user)
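Steps 3 to 5 can be made visible by overriding the two hooks. The following is a minimal sketch; the class name HookDemo and the event strings are invented for the example, while beforeExecute and afterExecute are the protected hook methods of ThreadPoolExecutor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    /** Returns the order in which the hooks and the task ran. */
    static List<String> observe() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");       // step 3: pre-processing
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                events.add("after");        // step 5: post-processing
            }
        };
        pool.execute(() -> events.add("run")); // step 4: the task itself
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Both hooks run on the worker thread itself, so anything slow or blocking inside them delays the task and keeps the Worker lock held for longer.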

These two methods are the core of the thread pool; task retrieval and thread blocking both happen in them. The following flow chart shows the path from task submission, through task processing, to thread reclamation:

Next, let's look at the processWorkerExit method, which is mainly used to reclaim threads.

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, the thread left the loop because an exception was thrown.
    // In that case getTask did not get the chance to decrement the thread count, so it is decremented here.
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Update the number of completed tasks and remove worker threads
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Attempt to terminate thread pool
    tryTerminate();

    int c = ctl.get();

    // If the pool state is SHUTDOWN or RUNNING, make sure the pool keeps a minimum number of threads
    // 1. If the thread ended because an exception was thrown, add a replacement thread directly.
    // 2. If the thread ended normally
    //    * If core threads may time out and the task queue still holds tasks,
    //      make sure there is at least 1 thread
    //    * If core threads may not time out, make sure there are at least corePoolSize threads
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
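
The replacement of a worker that died from an exception can be seen in a small experiment. This is a minimal sketch under stated assumptions: WorkerReplacementDemo, the "demo-worker-" thread names and the quiet uncaught-exception handler are all hypothetical and only serve the demo. The first task is submitted with execute and throws; the second task must then run on a different thread, because the first worker has left its loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows that a worker killed by a task exception is replaced.
class WorkerReplacementDemo {

    // Returns the names of the threads that ran the failing task and the follow-up task.
    static List<String> run() {
        final List<String> threadNames = new CopyOnWriteArrayList<>();
        final CountDownLatch bothRan = new CountDownLatch(2);
        final AtomicInteger seq = new AtomicInteger();

        // Name the workers and silence the stack trace of the failing task.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((thread, ex) -> { /* ignored in this demo */ });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);

        pool.execute(() -> {
            threadNames.add(Thread.currentThread().getName());
            bothRan.countDown();
            throw new IllegalStateException("simulated task failure");
        });
        pool.execute(() -> {
            threadNames.add(Thread.currentThread().getName());
            bothRan.countDown();
        });

        try {
            bothRan.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(threadNames);
    }

    public static void main(String[] args) {
        System.out.println(run()); // two different thread names
    }
}
```

Note that this only applies to tasks submitted with execute. With submit, the exception is captured in the returned Future, so the worker thread does not die.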

We see that the tryTerminate method is called in processWorkerExit, which is mainly used to terminate the thread pool. If the thread pool satisfies the termination condition, first set the thread pool state to TIDYING, then execute the terminated method, and finally set the thread pool state to TERMINATED. This method is also called in shutdown and shutdownNow methods.

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // If the following three situations occur, the logic to terminate the thread pool is not executed and returned directly.
        //  1. The current thread pool is in RUNNING state and cannot be stopped.
        //  2. Current thread pool status is TIDYING or TERMINATED, no need to stop
        //  3. The current thread pool status is SHUTDOWN and the task queue is not empty
        if (isRunning(c) || runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
            return;
        // Determine whether the number of worker threads is 0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // If the number of worker threads is not zero, try to interrupt an idle thread in the thread pool.
            // ONLY_ONE means only the first worker in the set is tried: the loop stops after it,
            // whether or not that worker was idle and actually interrupted.
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Try to set the pool state to TIDYING. If the CAS fails, the pool state has changed
            // in the meantime and the loop retries.
            // The transition from TIDYING to TERMINATED happens while holding mainLock.
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Execute terminated method (default empty method)
                    terminated();
                } finally {
                    // Set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

In the tryTerminate method, the thread pool state is set to TIDYING and then TERMINATED if either of the following conditions is met:

  1. The thread pool state is SHUTDOWN, the pool has no worker threads, and the task queue is empty
  2. The thread pool state is STOP and the pool has no worker threads
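
The state transition can be checked through the public API. The following is a minimal sketch, assuming a hypothetical TerminationDemo class: it overrides the terminated hook to record that it ran and compares isTerminated before shutdown and after awaitTermination.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: observes the terminated() hook and the final pool state.
class TerminationDemo {

    static String run() {
        final AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // called by tryTerminate while the state is TIDYING
            }
        };
        pool.execute(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        boolean before = pool.isTerminated(); // pool is still RUNNING
        pool.shutdown();                      // SHUTDOWN: queued and running tasks still finish
        boolean awaited = false;
        try {
            awaited = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "before=" + before + ", awaited=" + awaited
                + ", hook=" + hookCalled.get() + ", after=" + pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(run()); // before=false, awaited=true, hook=true, after=true
    }
}
```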

If the thread pool is in the SHUTDOWN or STOP state but still has worker threads, tryTerminate tries to interrupt one thread in the pool, mainly to prevent the shutdown interrupt signal from being lost (we will discuss this in detail with the shutdown method). Let's look at the interruptIdleWorkers method, which interrupts idle threads.

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w: workers) {
            Thread t = w.thread;
            // First check whether the thread has already been interrupted, then check whether it is idle.
            // If the Worker lock associated with the thread can be acquired, the thread is idle and can be interrupted.
            // Otherwise the thread is running a task and is not interrupted.
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {} finally {
                    w.unlock();
                }
            }
            // If onlyOne is true, just try to interrupt the first thread
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
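
The effect of interrupting idle workers can be observed indirectly. The following is a minimal sketch, assuming a hypothetical IdleInterruptDemo class: it starts three core threads that have no work and therefore wait on the empty task queue, then calls shutdown, which goes through interruptIdleWorkers. If the idle threads were not woken up, the pool could not terminate within the timeout.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: idle workers are woken up so the pool can terminate.
class IdleInterruptDemo {

    static boolean run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Start all core threads without giving them tasks; they wait on the empty queue.
        pool.prestartAllCoreThreads();
        pool.shutdown();
        try {
            // true only if every idle worker left its blocking wait and exited
            return pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated=" + run());
    }
}
```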

Close thread pool

A thread pool can be closed with shutdown or shutdownNow. The difference between them has been mentioned earlier and will not be repeated here. Let's first look at the shutdown method:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the current thread has permission to close the thread pool
        checkShutdownAccess();
        // Set the thread pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads; this ends up calling interruptIdleWorkers(false)
        interruptIdleWorkers();
        // The hook method, which defaults to empty, allows users to do something when the thread pool is closed
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

As we saw earlier, interruptIdleWorkers checks whether a thread is idle before interrupting it, and skips any thread that is not idle. The main purpose of interrupting threads here is to wake up the threads blocked on the task queue. Consider the following situation: a thread is running a task when interruptIdleWorkers executes, so it is not interrupted. When it finishes the task, the task queue is empty and the thread blocks, but shutdown has already completed its interruptIdleWorkers pass (that is, the thread missed the interrupt signal of shutdown). Without an additional step, the thread would stay blocked forever. To prevent this, tryTerminate() also calls interruptIdleWorkers, mainly to make up for the signal lost in shutdown.
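
From the caller's point of view, shutdown behaves as follows: tasks that were already submitted still run to completion, while new submissions are rejected. This is a minimal sketch, assuming a hypothetical GracefulShutdownDemo class and the default rejection policy of Executors.newFixedThreadPool, which throws RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shutdown() lets queued tasks finish but rejects new ones.
class GracefulShutdownDemo {

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static String run() {
        final AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                pause(50);
                completed.incrementAndGet();
            });
        }

        pool.shutdown(); // one task is running or about to run, two are still queued

        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true; // the pool no longer accepts tasks after shutdown
        }

        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "completed=" + completed.get() + ", rejected=" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run()); // completed=3, rejected=true
    }
}
```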

Finally, let's look at the shutdownNow method:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the caller has permission to shut down the thread pool
        checkShutdownAccess();
        // Set the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupt all started worker threads, idle or not
        interruptWorkers();
        // Clear the task queue and return the tasks that never ran
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

Then let's look at the interruptWorkers method:

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Execute interrupts regardless of whether the thread is idle or not
        for (Worker w: workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

From the above code, we can see that interruptWorkers interrupts every thread that has started, whether it is idle or not, so the shutdownNow interrupt signal cannot be lost. Finally, let's look at the drainQueue method. Its main function is to empty the task queue and return the tasks that were still waiting in it.

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // drainTo moves all items from the blocking queue into taskList,
    // emptying the task queue; the operation is thread-safe
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        // Some queues (for example DelayQueue) may fail to drain certain elements.
        // Copy what is left into a Runnable array (new Runnable[0] only selects the array type)
        // and remove the items one by one.
        for (Runnable r: q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
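
The behavior of shutdownNow can be checked from the caller's side. The following is a minimal sketch, assuming a hypothetical ShutdownNowDemo class: one task occupies the only worker, three more tasks wait in the queue, and shutdownNow is expected to interrupt the running task and hand back the three queued ones.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: shutdownNow() interrupts the running task and drains the queue.
class ShutdownNowDemo {

    static String run() {
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // Occupies the single worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        // These three tasks can only wait in the task queue.
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });
        }

        int pending = -1;
        try {
            started.await();
            List<Runnable> neverRan = pool.shutdownNow(); // result of drainQueue()
            pending = neverRan.size();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "pending=" + pending + ", interrupted=" + interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // pending=3, interrupted=true
    }
}
```

A task that ignores interruption (for example a busy loop that never checks the interrupt status) would keep running after shutdownNow, since interruption is only a request.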

Thread pool monitoring

This section is excerpted from Deep Understanding of Java Thread Pool Executor

A thread pool can be monitored through the values it exposes. The following methods are useful when monitoring a thread pool:

  • getTaskCount: The approximate total number of tasks that have ever been scheduled for execution, including completed, running and queued tasks;
  • getCompletedTaskCount: The number of tasks completed by the thread pool, which is less than or equal to taskCount;
  • getLargestPoolSize: The largest number of threads that have ever been in the pool at the same time. If this value equals maximumPoolSize, the thread pool has been full at some point;
  • getPoolSize: The current number of threads in the thread pool;
  • getActiveCount: The approximate number of threads currently executing tasks in the thread pool.

Through these methods, thread pools can be monitored. The ThreadPoolExecutor class also provides several empty methods, such as beforeExecute, afterExecute and terminated. A subclass of ThreadPoolExecutor can override them to add operations before or after task execution, such as measuring the execution time of tasks.
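
Both ideas can be combined in one subclass. The following is a minimal sketch rather than a production monitor: MonitoredPool, totalTaskNanos and snapshot are hypothetical names. The hooks measure how long tasks run, and snapshot reads the monitoring methods listed above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subclass for illustration; the name and fields are not part of the JDK.
class MonitoredPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();

    MonitoredPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs in the worker thread that executes r
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // Same worker thread as beforeExecute, so the thread-local start time is set.
        totalNanos.addAndGet(System.nanoTime() - startNanos.get());
        super.afterExecute(r, t);
    }

    long totalTaskNanos() {
        return totalNanos.get();
    }

    String snapshot() {
        return "taskCount=" + getTaskCount()
                + ", completed=" + getCompletedTaskCount()
                + ", largestPoolSize=" + getLargestPoolSize()
                + ", poolSize=" + getPoolSize()
                + ", active=" + getActiveCount();
    }

    static MonitoredPool runDemo() {
        MonitoredPool pool = new MonitoredPool(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        MonitoredPool pool = runDemo();
        System.out.println(pool.snapshot());
        System.out.println("total task time (ms): " + pool.totalTaskNanos() / 1_000_000);
    }
}
```

In a running system, snapshot would typically be called periodically from a separate thread. Several of these values are approximate while tasks are in flight, so they are best read as trends rather than exact counters.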


Added by jasonok6 on Sun, 07 Jul 2019 04:17:16 +0300