Thread pool practice

1, Background of thread pool

The creation of Java threads is very expensive and requires the cooperation of JVM and OS (operating system) to complete a lot of work:

(1) A large number of memory blocks must be allocated and initialized for the thread stack, including at least 1MB of stack memory.

(2) A system call is required to create and register local threads in the OS (operating system). The frequent creation and destruction of threads in Java highly concurrent applications is very inefficient and is not allowed by the programming specification. How to reduce the creation cost of Java threads? Thread pool must be used.

Thread pool mainly solves the following two problems:

(1) Improve performance: the thread pool can be independently responsible for the creation, maintenance and allocation of threads. When executing a large number of asynchronous tasks, you do not need to create your own threads, but hand over the tasks to the thread pool for scheduling. The thread pool can use idle threads to execute asynchronous tasks as much as possible, and reuse the created threads to the greatest extent, which improves the performance significantly.

(2) Thread management: each Java thread pool will maintain some basic thread statistics, such as the number of completed tasks, idle time, etc., so as to effectively manage threads and efficiently schedule the received asynchronous tasks.

2, Thread pool architecture of JUC (java.util.concurrent)

How thread pools are created

No matter how many packages are made on the thread pool, the following construction methods should be called finally

   /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Thread creation scheduling process

Queue of thread pool

Compared with ordinary queues, blocking queue in Java has an important feature: when the blocking queue is empty, it will block the element acquisition operation of the current thread. Specifically, when a thread obtains elements from an empty blocking queue, the thread will be blocked until there are elements in the blocking queue; When there are elements in the queue, the blocked thread will wake up automatically (the wake-up process does not require user program intervention). The java thread pool uses the asynchronous tasks temporarily received by the BlockingQueue instance. BlockingQueue is a super interface of the JUC package. The commonly used implementation classes are:

(1) ArrayBlockingQueue: it is a bounded blocking queue (bounded queue) implemented by an array. The elements in the queue are sorted by FIFO. The size of ArrayBlockingQueue must be set when it is created. When the number of received tasks exceeds the number of corePoolSize, the tasks will be cached in the blocking queue. The number of task caches can only be the size set when it is created. If the blocking queue is full, threads will be created for new tasks until the total number of threads in the thread pool is greater than maximumPoolSize.

(2) LinkedBlockingQueue: it is a blocking queue based on linked list. Tasks are sorted by FIFO. The capacity (bounded queue) can be set. If the capacity is not set, integer is used by default Max_ Value as capacity (unbounded queue). The throughput of this queue is higher than that of ArrayBlockingQueue. If the capacity of the LinkedBlockingQueue (unbounded queue) is not set, when the number of tasks received exceeds the corePoolSize, new tasks can be cached into the blocking queue indefinitely until the resources are exhausted. There are two factory methods for creating thread pools, executors Newsinglethreadexecutor and executors Newfixedthreadpool uses this queue and has no capacity set (unbounded queue).

(3) PriorityBlockingQueue: an unbounded queue with priority.

(4) DelayQueue: This is an unbounded blocking delay queue. The underlying implementation is based on PriorityBlockingQueue. Each element in the queue has an expiration time. When the element is obtained from the queue (element out of the queue), only the expired element will be out of the queue, and the element at the head of the queue is the element with the fastest expiration. Quick factory method executors The thread pool created by newscheduledthreadpool uses this queue.

(5) SynchronousQueue: (synchronous queue) is a blocking queue that does not store elements. Each insertion operation must wait until the removal operation is called by another thread. Otherwise, the insertion operation is always blocked, and its throughput is usually higher than that of LinkedBlockingQueue. Quick factory method executors The thread pool created by newcachedthreadpool uses this queue. Compared with the previous queue, this queue is special. It does not save the submitted task, but directly creates a new thread to execute the new task

Thread pool reject policy

When the task cache queue of the thread pool is a bounded queue (queue with capacity limit), if the queue is full, the task submitted to the thread pool will be rejected. Generally speaking, there are two situations when a task is rejected: (1) the thread pool has been closed. (2) The work queue is full and maximumPoolSize is full.

(1) When AbortPolicy uses this policy, if the thread pool queue is full, the new task will be rejected and a RejectedExecutionException exception will be thrown. This policy is the default reject policy of thread pool.

(2) DiscardPolicy this policy is the Silent version of AbortPolicy. If the thread pool queue is full, the new task will be directly discarded and no exception will be thrown.

(3) DiscardOldestPolicy discards the oldest task policy, that is, if the queue is full, the task that enters the queue first will be discarded to make room from the queue and then try to join the queue. Because the queue is in at the end of the queue and out at the head of the queue, and the team head element is the oldest, every time you remove the team head element and then try to join the team.

(4) CallerRunsPolicy the caller executes the policy. When a new task is added to the thread pool, if the addition fails, the task submitting thread will execute the task itself and will not use the thread in the thread pool to execute the new task. Among the above four built-in policies, the default rejection policy of the thread pool is AbortPolicy. If the submitted task is rejected, the thread pool will throw a RejectedExecutionException exception. This exception is a non checked exception (runtime exception), which is easy to forget to catch. If you are concerned about the event that the task is rejected, you need to catch the RejectedExecutionException exception when submitting the task.

(5) Custom policy if the above rejection policies do not meet the requirements, you can customize a rejection policy and implement the rejectedExecution method of RejectedExecutionHandler interface.

Thread pool state transition

Determines the number of threads in the thread pool

(1) Because the CPU utilization of IO intensive tasks is low, resulting in a lot of spare time for threads, it is usually necessary to open threads with twice the number of CPU cores. When the IO thread is idle, you can enable other threads to continue to use the CPU to improve the CPU utilization.

(2) CPU intensive tasks are also called computing intensive tasks, which are characterized by the consumption of CPU resources due to a large number of calculations, such as calculating pi and HD decoding of video. Although CPU intensive tasks can also be completed in parallel, the more parallel tasks, the more time spent on task switching, and the lower the efficiency of CPU executing tasks. Therefore, to make the most efficient use of CPU, the number of parallel execution of CPU intensive tasks should be equal to the number of cores of CPU.

(3) Hybrid task

Optimal number of threads = (ratio of thread waiting time to thread cpu time + 1) * number of cpu cores

Keywords: Java Back-end

Added by bivaughn on Wed, 09 Feb 2022 02:36:41 +0200