Use and principle of thread pool


To create a thread from a thread pool:

 // A pool of multiple threads. The parameter is the number of threads
        ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
        // Single threaded thread pool
        ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
        // Scalable thread pool expansion mechanism? When the number of tasks processed is different, the number of thread pools is also different
        ExecutorService threadPool3 = Executors.newCachedThreadPool();

The underlying layer calls the construction method of ThreadPoolExecutor:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize
    Number of core threads (number of resident threads)
  • maximumPoolSize
    Maximum number of threads
  • keepAliveTime , TimeUnit unit,
    The survival time of a non core thread when it is idle. Beyond this time, the non core thread will be recycled
  • BlockingQueue workQueue

Work queue

  • ThreadFactory threadFactory
    Thread factory
  • RejectedExecutionHandler handler
    Reject policy:

How thread pool works

As shown in the figure above, the number of core threads in the thread pool is 2, the number of non core threads is 3, and the size of the blocking queue is 3;
Assuming that two tasks are coming, they will be processed by the core thread. In the process of processing, three more tasks (task 3, task 4 and task 5) will come. At this time, task 3, task 4 and task 5 will be added to the blocking queue for waiting. If three more tasks come during the waiting process, at this time, The three tasks are preferentially processed by non core threads in the thread pool (for example, when the dining hall's aunt sees that there are too many people in the queue, she opens another window. As a result, the later people get the meal first than the people waiting in the queue). If another task 9 comes during the processing, all five threads in the thread pool are working, and the blocking queue is full. If the thread pool cannot process task 9, the rejection policy will be executed.
When task 6 comes, why not take task 4 from the blocking queue and add task 6 to the blocking queue? In fact, it's not necessary at all. Instead of taking it out of the queue and adding it, it's better to directly execute the task 6. Blocking the tasks in the queue deserves to be cut in. It can only be said that it's not as clever as coming early~~

Four rejection strategies:

  1. Direct throw anomaly
  2. Where did you come from
  3. Discard the one with the longest waiting time
  4. No handling, no throwing of exceptions, loss allowed

Will the above three methods be used to create thread pools in actual development? The answer is No. Because of these three methods, the blocking queue is unbounded, which is clearly explained in the Alibaba Development Manual:

Custom thread pool

package com.lchtest.juc.threadpool;

import java.util.concurrent.*;

/**
 * Custom thread pool
 */
public class ThreadPoolDemo2 {
    public static void main(String[] args) {
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(3);
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2L,
                TimeUnit.SECONDS,
                blockingQueue,
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        try {
            for (int i = 1; i <= 8; i++){
                int temp =i;
                threadPool.execute(() ->{
                    System.out.println(Thread.currentThread().getName() + "Handle the business" + temp);
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

The above code defines a thread pool. The number of core threads is 2, the maximum number of threads is 5, and the size of the blocking queue is 3,
After testing, when the number of for cycles is less than or equal to 5, only the core thread pool will be used to process tasks. At this time, the core thread can process two tasks at the same time, and three tasks are thrown into the blocking queue to wait. After the core thread processes the business, it will take out two more tasks from the queue for execution, and finally execute five tasks

When the number of for cycles is 6-8, yes, non core threads and core threads will be enabled to process business together:

At this time, there are 8 services in total, the maximum number of threads is 5, and the blocking queue is also full. At this time, it is necessary to enable non core threads to process services at the same time, and there are three tasks waiting to be executed in the blocking queue; So what if it's nine tasks? At this time, the 9th task will execute the rejection policy. Because the set rejection policy is AbortPolicy, an exception will be thrown.

Summarize the implementation process of process pool I:

  1. When the thread pool was first created, there was no thread in it. The task queue is passed in as a parameter. However, even if there are tasks in the queue, the thread pool will not execute them immediately.
  2. When calling the execute() method to add a task, the thread pool will make the following judgment:
    a) If the number of running threads is less than corePoolSize, create a thread to run the task immediately;
    b) If the number of running threads is greater than or equal to corePoolSize, put the task into the queue;
    c) If the queue is full at this time and the number of running threads is less than or equal to maximumPoolSize, you still need to
    Create a non core thread to run the task immediately;
    d) If the queue is full and the number of running threads is greater than maximumPoolSize, the thread pool
    An exception RejectExecutionException is thrown.
  3. When a thread completes a task, it will remove a task from the queue for execution.
  4. When a thread has nothing to do for more than a certain time (keepAliveTime), the thread pool will judge that if the number of currently running threads is greater than corePoolSize, the thread will be stopped. Therefore, after all tasks of the thread pool are completed, it will eventually shrink to the size of corePoolSize.

Keywords: Java

Added by t3hub3rz on Thu, 23 Dec 2021 23:59:56 +0200