Thread pool analysis

Thread pool

Reference articles:

  • Implementation principle of Java thread pool - Meituan technical team
  • Implementation principle and source code analysis of Java thread pool

Preface

1. What is a thread pool

A thread pool is a tool that manages threads based on the idea of pooling, and it often appears in multi-threaded servers.
Too many threads bring extra overhead, including the cost of creating and destroying them and the cost of scheduling them, and they also degrade the overall performance of the machine. A thread pool maintains a set of threads that wait for a supervisor to hand them tasks that can be executed concurrently. On the one hand, this avoids the cost of creating and destroying a thread for every task; on the other hand, it avoids the excessive scheduling caused by an ever-growing number of threads and keeps the CPU cores fully utilized.

2. Advantages of thread pool

  • Reduce resource consumption: threads created once are reused through pooling, which reduces the cost of repeatedly creating and destroying threads.
  • Improve response speed: when a task arrives, it can be executed immediately without waiting for a thread to be created.
  • Improve the manageability of threads: threads are a scarce resource. Creating them without limit not only consumes system resources but also leads to unbalanced scheduling and reduced system stability because of their unreasonable distribution. With a thread pool, threads can be allocated, tuned, and monitored in a unified way.
  • Provide more, and more powerful, features: the thread pool is extensible, allowing developers to add functionality to it. For example, the delayed/periodic thread pool ScheduledThreadPoolExecutor allows tasks to be delayed or executed periodically.

The core and overall implementation of the thread pool

Overall design

The core implementation class of the thread pool in Java is ThreadPoolExecutor. This chapter analyzes the core design and implementation of the Java thread pool based on the JDK 1.8 source code. Let's first look at the UML class diagram of ThreadPoolExecutor.

(Figure: UML class diagram of ThreadPoolExecutor)

  • Executor: the top-level interface implemented by ThreadPoolExecutor. Executor embodies one idea: decoupling task submission from task execution. Users do not need to care about how threads are created or how they are scheduled to run tasks; they only provide a Runnable containing the task's logic and hand it to the executor, and the Executor framework takes care of arranging threads and executing the task (see the sketch after this list).
  • ExecutorService: (1) extends the ability to execute tasks, adding methods that return a Future for one task or a batch of asynchronous tasks; (2) provides methods to control the thread pool, such as stopping it.
  • AbstractExecutorService: an upper-level abstract class that wires the task-execution steps together, so that lower-level implementations only need to focus on a single method that executes a task.
  • ThreadPoolExecutor: the implementation class and the most complex part. On the one hand it maintains its own life cycle; on the other hand it manages threads and tasks at the same time, combining the two so that submitted tasks are executed in parallel.
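
A minimal usage sketch of this decoupling (the pool size and the tasks are arbitrary illustrations, not taken from the original):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        // The caller only describes *what* to run; the executor decides *how* and *on which thread*.
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Runnable: fire-and-forget submission
        executor.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));

        // Callable: ExecutorService additionally returns a Future for the asynchronous result
        Future<Integer> result = executor.submit(() -> 1 + 1);
        System.out.println("result = " + result.get());

        executor.shutdown(); // stop accepting new tasks, let queued ones finish
    }
}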

        

Internally, the thread pool builds a producer-consumer model that decouples threads from tasks instead of binding them directly, which allows tasks to be buffered and threads to be reused. The operation of the thread pool falls into two parts: task management and thread management. The task-management part acts as the producer: after a task is submitted, the thread pool decides its subsequent path: (1) directly ask a thread to execute it; (2) buffer it in the queue to wait for a thread; or (3) reject it. The thread-management part acts as the consumer: threads are maintained uniformly inside the pool and allocated according to task demand. When a thread finishes a task, it keeps fetching new tasks to execute; when it can no longer obtain a task, it is recycled.

Execution flow chart

(Figure: task execution flow of ThreadPoolExecutor)

  1. After a task is submitted, the thread pool first checks whether the number of core threads has been reached. If not, it creates a core thread to process the task; otherwise it moves to the next step.
  2. The thread pool then checks whether the task queue (a blocking queue) is full. If not, it adds the task to the queue; otherwise it moves to the next step.
  3. The thread pool then checks whether the maximum number of threads has been reached. If not, it creates a non-core thread to process the task; otherwise it executes the saturation (rejection) policy, which by default throws an exception. The sketch below walks through these three steps with a deliberately tiny pool.
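
A minimal sketch that makes the three steps observable (the sizes and the sleep are arbitrary; the rejection of the fourth task assumes the earlier tasks are still running when it is submitted):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FlowDemo {
    public static void main(String[] args) {
        // core = 1, max = 2, queue capacity = 1  ->  at most 3 tasks can be held at once
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        Runnable slow = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };

        pool.execute(slow); // task 1: creates the core thread
        pool.execute(slow); // task 2: core thread busy -> goes into the queue
        pool.execute(slow); // task 3: queue full -> creates a non-core thread (up to max = 2)
        try {
            pool.execute(slow); // task 4: queue full and max reached -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("task 4 rejected");
        }
        pool.shutdown();
    }
}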

Construction method

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
            ...
    }
  • corePoolSize: the number of core threads. By default the thread pool starts empty, and threads are created only when tasks are submitted. If the number of currently running threads is less than corePoolSize, a new thread is created to process the task; if it is greater than or equal to corePoolSize, no new core thread is created. You can call prestartAllCoreThreads() to create and start all core threads in advance so that they wait for tasks.
  • maximumPoolSize: the maximum number of threads the pool is allowed to create. If the task queue is full and the current number of threads is less than maximumPoolSize, the thread pool will still create new threads to process tasks.
  • keepAliveTime: the idle timeout for non-core threads. If allowCoreThreadTimeOut is set to true, keepAliveTime also applies to core threads.
  • unit: the time unit of keepAliveTime.
  • workQueue: the task queue. When the current number of threads is greater than or equal to the number of core threads, tasks are added to this queue. It is of type BlockingQueue, i.e. a blocking queue.
  • threadFactory: the thread factory, used to create threads, set their names, and so on.
  • handler: the saturation policy, i.e. the policy applied when both the task queue and the thread count are full. The default is AbortPolicy, which means the new task cannot be processed and a RejectedExecutionException is thrown. There are three other built-in policies:
    (1) CallerRunsPolicy: runs the rejected task directly in the thread that called execute
    (2) DiscardPolicy: silently discards the rejected task
    (3) DiscardOldestPolicy: discards the oldest task at the head of the queue and then retries execute; if the executor has been shut down, the task is simply discarded
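
A sketch that passes all seven parameters explicitly (the sizes, thread-name prefix, and policy are arbitrary illustrative choices):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolConfig {
    public static ThreadPoolExecutor newWorkerPool() {
        // A naming ThreadFactory makes thread dumps and logs easier to read
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override public Thread newThread(Runnable r) {
                return new Thread(r, "worker-pool-" + seq.getAndIncrement());
            }
        };

        return new ThreadPoolExecutor(
                4,                                          // corePoolSize
                8,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime + unit
                new ArrayBlockingQueue<>(100),              // workQueue (bounded)
                namedFactory,                               // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler (saturation policy)
    }
}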

Life cycle management

Running state of the thread pool. The field ctl, of type AtomicInteger, controls both the running state of the thread pool and the number of effective threads in it. It encodes two pieces of information: the running state of the pool (runState) and the number of effective threads (workerCount). The upper 3 bits store runState and the lower 29 bits store workerCount, and the two do not interfere with each other.

    //The low 29 bits represent the number of threads in the thread pool, and the high 3 bits represent the running state of the thread pool
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//The initial value of get() is 0b11100000000000000000000000000000 (RUNNING with 0 workers)

ThreadPoolExecutor also provides several internal helper methods to compute the current running state and thread count from ctl:

private static int runStateOf(int c)     { return c & ~CAPACITY; } //Calculate current running status
private static int workerCountOf(int c)  { return c & CAPACITY; }  //Calculate the current number of threads
private static int ctlOf(int rs, int wc) { return rs | wc; }   //ctl generation by status and number of threads
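
In the JDK 8 source, the constants these helpers rely on, together with the isRunning check used by execute() below, are defined as follows:

private static final int COUNT_BITS = Integer.SIZE - 3;       // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set: the maximum worker count

private static boolean isRunning(int c) {
    return c < SHUTDOWN;   // only RUNNING is negative, so "less than SHUTDOWN" means RUNNING
}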

Five states of thread pool

// runState is stored in the high-order bits
    //The thread pool in this state will receive new tasks and process the tasks in the blocking queue;
    private static final int RUNNING    = -1 << COUNT_BITS;//0b11100000000000000000000000000000
    //The thread pool in this state will not receive new tasks, but will process the tasks in the blocking queue;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    //The thread pool in this state will not accept new tasks, will not process tasks in the blocking queue, and will interrupt running tasks;
    private static final int STOP       =  1 << COUNT_BITS;//0b00100000000000000000000000000000
    //All tasks have terminated and workerCount is 0; the thread that reaches this state will run the terminated() hook
    private static final int TIDYING    =  2 << COUNT_BITS;//0b01000000000000000000000000000000
    //The terminated() hook has finished executing
    private static final int TERMINATED =  3 << COUNT_BITS;//0b01100000000000000000000000000000
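
Because the five states increase numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, with RUNNING the only negative value), states can be compared directly. The JDK defines two small helpers for this; they appear later in processWorkerExit and runWorker:

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}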

Task execution mechanism

Submit task

The entry point of task scheduling is the execute() method. Execution process:
1. First check the running state of the thread pool. If it is not RUNNING, reject the task directly; the thread pool must ensure that tasks are executed only while it is RUNNING.
2. If workerCount < corePoolSize, create and start a thread to execute the newly submitted task.
3. If workerCount >= corePoolSize and the blocking queue in the thread pool is not full, add the task to the blocking queue.
4. If workerCount >= corePoolSize && workerCount < maximumPoolSize and the blocking queue in the thread pool is full, create and start a thread to execute the newly submitted task.
5. If workerCount >= maximumPoolSize and the blocking queue in the thread pool is full, handle the task according to the rejection policy. The default behavior is to throw an exception directly.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //workerCountOf gets the current number of threads in the thread pool; If it is less than corePoolSize, execute addWorker to create a new thread and execute the command task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //step 2
        //The thread pool is in the RUNNING state and successfully puts the submitted task into the blocking queue
        if (isRunning(c) && workQueue.offer(command)) {
            //double check
            int recheck = ctl.get();
            //If the thread pool is not RUNNING and the task is successfully removed from the blocking queue, call reject to handle the task
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //If the thread pool is in the running state but there are no threads, a thread is created
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //step 3
        // If it fails to create a new thread into the thread pool, reject the task
        else if (!addWorker(command, false))
            reject(command);
    }

Task buffer

The task-buffer module is the core of the thread pool's ability to manage tasks. The essence of a thread pool is the management of tasks and threads, and the key idea is to decouple the two so that they are not directly tied together; only then can the subsequent allocation be done. The thread pool realizes this producer-consumer model with a blocking queue: the queue caches submitted tasks, and worker threads fetch tasks from it.

A blocking queue is a queue that supports two additional operations: when the queue is empty, a thread trying to take an element waits until the queue becomes non-empty; when the queue is full, a thread trying to store an element waits until space becomes available. Blocking queues are commonly used in producer-consumer scenarios, where producers are the threads that add elements and consumers are the threads that take them; the queue is the container producers put elements into and consumers take elements from. The sketch below shows these two blocking operations in isolation.
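
A minimal sketch of the two blocking operations using ArrayBlockingQueue (the capacity and the items are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        // Consumer: take() blocks while the queue is empty
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String task = queue.take();          // waits until an element is available
                    System.out.println("consumed " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // exit quietly when interrupted
            }
        });
        consumer.start();

        // Producer: put() blocks while the queue is full
        for (int i = 0; i < 5; i++) {
            queue.put("task-" + i);                      // waits if the queue already holds 2 elements
        }

        Thread.sleep(500);
        consumer.interrupt();
    }
}
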
Common blocking queues

(Table: common BlockingQueue implementations)

Task acquisition

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //If the pool is SHUTDOWN with an empty queue, or at least STOP, this worker should exit
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            //Blocking take or timed poll, depending on timed
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

The key to keepAliveTime lies in this small piece of code:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();

If allowCoreThreadTimeOut is false, a core thread is not destroyed even when it stays idle; if it is true, a thread that remains idle for longer than keepAliveTime is destroyed.
If the thread is allowed to wait idle indefinitely (timed == false), it calls workQueue.take(): when the blocking queue is empty, the current thread is suspended and waits; as soon as a task is added to the queue, the thread is woken up, take returns the task, and the task is executed.
If the thread is not allowed to stay idle indefinitely (timed == true), it calls workQueue.poll(keepAliveTime, ...): if no task arrives within keepAliveTime, poll returns null, and the worker will subsequently exit.
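
Both knobs can also be set on an existing pool; a minimal sketch (imports from java.util.concurrent omitted, the pool and the 30-second value are arbitrary):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

pool.setKeepAliveTime(30, TimeUnit.SECONDS); // new idle timeout for non-core threads
pool.allowCoreThreadTimeOut(true);           // core threads are now also reclaimed after 30s of idling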

Task rejection policy

The task-rejection module is the protective part of the thread pool. The thread pool has a maximum capacity: when the task queue is full and the number of threads has reached maximumPoolSize, new tasks have to be rejected, and the rejection policy decides how they are handled, which protects the thread pool.

Policy interface:

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
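
This interface can also be implemented directly instead of using one of the four built-in policies described below; a minimal sketch with a hypothetical log-and-drop handler (the handler name and behavior are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // Hypothetical handler: log the rejection and drop the task (a DiscardPolicy that is at least visible)
        RejectedExecutionHandler logAndDrop = (r, executor) ->
                System.err.println("Rejected " + r + "; pool size = " + executor.getPoolSize()
                        + ", queue size = " + executor.getQueue().size());

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                logAndDrop);

        // The handler can also be swapped at runtime, e.g. for one of the built-in policies:
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.shutdown();
    }
}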

The four built-in rejection policies

(Figure: the four built-in RejectedExecutionHandler implementations: AbortPolicy, CallerRunsPolicy, DiscardPolicy, DiscardOldestPolicy)

Worker thread management

Worker

A Worker implements the Runnable interface and holds two important fields: thread and firstTask. thread is the thread created through the ThreadFactory when the Worker is constructed, and it is the thread that executes tasks; firstTask stores the first task passed in, and it may be null. If firstTask is not null, the thread executes this task immediately when it starts, which is what happens when a thread is created for a newly submitted task; if it is null, the thread is created only to take tasks from the workQueue and execute them. An abridged version of the Worker class is shown below.
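
Abridged from the JDK 8 source (the AQS-based lock methods are omitted):

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

    final Thread thread;            // the thread this worker runs in; created by the ThreadFactory
    Runnable firstTask;             // initial task to run, possibly null
    volatile long completedTasks;   // per-thread completed-task counter

    Worker(Runnable firstTask) {
        setState(-1);               // inhibit interrupts until runWorker is entered
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);            // delegate the main loop to the outer class
    }

    // lock(), tryLock(), unlock(), isLocked() ... implemented on top of AQS (omitted)
}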

The thread pool needs to manage the life cycle of its threads and reclaim threads that have been idle for a long time. It uses a HashSet (the workers field) to hold references to the worker threads, so their life cycle can be controlled by adding and removing references; the key question then is how to tell whether a thread is currently running a task.

Worker inherits from AQS and uses it to implement an exclusive, non-reentrant lock. A ReentrantLock is deliberately not used: the non-reentrancy is exactly what lets the lock state reflect whether the thread is currently executing a task.

1. Once the lock method acquires the exclusive lock, it indicates that the current thread is executing a task.
2. If a task is being executed, the thread should not be interrupted.
3. If the thread is not holding the exclusive lock, i.e. it is idle and not processing a task, it can be interrupted.
4. When the thread pool executes the shutdown or tryTerminate method, it calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers uses tryLock to check whether a thread is idle; only idle threads can be safely interrupted and reclaimed, as the source below shows.
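
For reference, the interruptIdleWorkers implementation in the JDK 8 source (doc comments removed):

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // tryLock succeeds only if the worker is NOT running a task, i.e. it is idle
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}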

Add thread addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS-increment the worker count
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // acquire the thread pool's reentrant lock (mainLock)
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //add work thread to a hashSet
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //Start the thread and execute the task
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker thread recycling

The actual destruction of threads relies on the JVM's garbage collection; the thread pool's job is to keep references to the right number of threads, according to the pool's current state, so that those threads are not reclaimed. When the thread pool decides that a thread should be recycled, it simply drops its reference to it. After a Worker is created, it keeps polling for tasks and executing them. A core thread may wait for a task indefinitely, while a non-core thread must obtain a task within a limited time. When the Worker cannot get a task, i.e. getTask returns null, the loop ends and the Worker actively removes its own reference from the pool.

try {
  while (task != null || (task = getTask()) != null) {
    //Perform tasks
  }
} finally {
  processWorkerExit(w, completedAbruptly);//When the task cannot be obtained, take the initiative to recycle yourself
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

Execute task runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Other methods: setCorePoolSize

 public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            //Interrupt idle workers so the pool can shrink toward the new core size
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
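
These parameters can be adjusted on a running pool; a minimal sketch (imports omitted, the new values are arbitrary):

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// Shrinking corePoolSize interrupts idle workers; growing it may prestart workers for queued tasks
pool.setCorePoolSize(8);
pool.setMaximumPoolSize(16); // must remain >= corePoolSize, otherwise an IllegalArgumentException is thrown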

Additional knowledge points

Thread group

ThreadGroup exists to make thread management easier. Some properties can be set uniformly for a whole group, such as setDaemon, the handling of uncaught exceptions, and a unified security policy; thread groups also make it convenient to obtain information about their threads.
Each ThreadGroup can contain a set of child threads and a set of child thread groups, so within a process thread groups form a tree. The root of the tree is normally the system thread group; the main thread group sits under the system thread group, and by default an application's own first-level thread groups are created under the main thread group.
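
A small sketch of the ThreadGroup API (the group name and thread body are arbitrary):

public class ThreadGroupDemo {
    public static void main(String[] args) {
        ThreadGroup group = new ThreadGroup("workers");
        group.setMaxPriority(Thread.NORM_PRIORITY);   // cap the priority of threads created in this group
        // group.setDaemon(true) would destroy the group automatically once its last thread exits

        Thread t = new Thread(group, () ->
                System.out.println("running in group " + Thread.currentThread().getThreadGroup().getName()),
                "worker-1");
        t.start();

        // Thread groups form a tree: every group except the root has a parent
        System.out.println("parent of 'workers': " + group.getParent().getName()); // typically "main"
        System.out.println("active threads (approximate): " + group.activeCount());
    }
}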
