Java thread pool: ThreadPoolExecutor usage in detail

The Alibaba Java Development Manual states that thread resources must be provided through a thread pool and that applications must not create threads explicitly. On the one hand, this makes thread creation more standardized and keeps the number of threads under control; on the other hand, the details of thread management are handed over to the thread pool, which reduces resource overhead. The manual also states that thread pools should not be created with Executors but with ThreadPoolExecutor directly. Although the Executors class in the JDK provides factory methods such as newFixedThreadPool(), newSingleThreadExecutor() and newCachedThreadPool(), they all have limitations and are not flexible enough; moreover, these methods are themselves implemented on top of ThreadPoolExecutor. Using ThreadPoolExecutor directly makes the running rules of the thread pool explicit, lets you create a pool that fits your own business scenario, and avoids the risk of resource exhaustion.
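To make the limitations of the factory methods concrete, the following sketch inspects the pools they return. It assumes the OpenJDK behaviour in which newFixedThreadPool() and newCachedThreadPool() return a plain ThreadPoolExecutor, so the cast is an assumption rather than a documented guarantee; the class name ExecutorsDefaults is only illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class ExecutorsDefaults {

    // Summarizes the limits of a pool created by an Executors factory method.
    static String describe(ExecutorService service) {
        // Assumption: the factories used below return a plain ThreadPoolExecutor (true in OpenJDK).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        String summary = "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", queue=" + pool.getQueue().getClass().getSimpleName()
                + ", remainingCapacity=" + pool.getQueue().remainingCapacity();
        pool.shutdown();
        return summary;
    }

    public static void main(String[] args) {
        System.out.println("fixed : " + describe(Executors.newFixedThreadPool(2)));
        System.out.println("cached: " + describe(Executors.newCachedThreadPool()));
    }
}
```

The fixed pool reports a queue with Integer.MAX_VALUE remaining capacity, and the cached pool reports a maximum of Integer.MAX_VALUE threads. These two unbounded settings are where the risk of resource exhaustion comes from.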

Let's give a detailed overview of how to use ThreadPoolExecutor.

First, look at the constructor of ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

The meaning of constructor parameters is as follows:

corePoolSize: the number of core threads in the thread pool. While fewer than corePoolSize threads exist, a newly submitted task starts a new thread; otherwise the task is placed in the workQueue task queue;

maximumPoolSize: the maximum number of threads in the thread pool. Whether the pool ever grows to this size depends on the type of workQueue task queue you use;

keepAliveTime: when the number of threads in the pool exceeds corePoolSize, how long an excess thread may stay idle before it is destroyed;

unit: the time unit of keepAliveTime;

workQueue: the task queue, which holds tasks that have been submitted to the thread pool but not yet executed. It is generally one of four kinds: direct submission queue, bounded task queue, unbounded task queue and priority task queue;

threadFactory: the thread factory used to create threads. The default is usually sufficient;

handler: the rejection policy, that is, how to reject a task when there are too many tasks to handle;
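The parameter list above can be tried out with a small sketch that passes every argument explicitly and checks the validation rules visible in the constructor source. The class and method names (ConstructorParameters, newPool, isRejected) and the concrete values are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ConstructorParameters {

    // Builds a pool with every constructor argument spelled out.
    static ThreadPoolExecutor newPool(int core, int max) {
        return new ThreadPoolExecutor(
                core,                                  // corePoolSize
                max,                                   // maximumPoolSize
                30, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100), // workQueue (bounded)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    // Returns true if the constructor refuses the given pool sizes.
    static boolean isRejected(int core, int max) {
        try {
            newPool(core, max).shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2, 4);
        System.out.println("keepAlive(ms) = " + pool.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("max < core rejected: " + isRejected(4, 2));
        pool.shutdown();
    }
}
```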

Next, we will learn more about the important parameters:

1, workQueue task queue

As described above, it is generally divided into direct submission queue, bounded task queue, unbounded task queue and priority task queue;

1. Direct submission queue: set the queue to a SynchronousQueue. SynchronousQueue is a special BlockingQueue with no capacity: each insert operation blocks until another thread performs a corresponding remove operation, and conversely each remove operation waits for a corresponding insert operation.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize is set to 2 and the rejection policy is AbortPolicy, which throws an exception directly
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

The output result is:

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

You can see that when the task queue is a SynchronousQueue and a task is submitted while maximumPoolSize threads already exist and none of them is waiting for work, the rejection policy is executed immediately and an exception is thrown.

With a SynchronousQueue, submitted tasks are not stored; they are always handed over for execution immediately. If no idle thread is available and the number of threads is less than maximumPoolSize, the pool tries to create a new thread. If the number of threads has already reached maximumPoolSize, the rejection policy of the handler you set is executed. Tasks submitted this way are therefore never cached but are executed at once. In this case you need an accurate estimate of your program's concurrency before you can choose a suitable maximumPoolSize; otherwise the rejection policy is triggered easily;
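The handoff behaviour described above can be observed on the queue itself, without a thread pool. The sketch below is a minimal illustration; the class name HandoffDemo and the string values are made up. It relies on the fact that a non-blocking offer() on a SynchronousQueue succeeds only if another thread is already waiting to receive the element.

```java
import java.util.concurrent.SynchronousQueue;

final class HandoffDemo {

    // Returns {result of offer() without a consumer, value received by the consumer}.
    static Object[] run() throws InterruptedException {
        final SynchronousQueue<String> queue = new SynchronousQueue<String>();

        // No thread is waiting in take(), so a non-blocking offer fails at once.
        boolean offeredWithoutConsumer = queue.offer("lost");

        final String[] received = new String[1];
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    received[0] = queue.take(); // blocks until a producer arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        queue.put("handed over"); // blocks until the consumer takes the element
        consumer.join();
        return new Object[] { offeredWithoutConsumer, received[0] };
    }

    public static void main(String[] args) throws InterruptedException {
        Object[] result = run();
        System.out.println("offer without consumer: " + result[0]);
        System.out.println("consumer received: " + result[1]);
    }
}
```

ThreadPoolExecutor.execute() uses the non-blocking offer() when it queues a task, which is why a pool backed by a SynchronousQueue falls through to creating a thread or rejecting the task whenever no worker is idle.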

2. Bounded task queue: bounded task queue can be implemented using ArrayBlockingQueue, as shown below

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

With a bounded ArrayBlockingQueue, when a new task is submitted the thread pool creates a new thread for it until the number of threads reaches corePoolSize; after that, new tasks are added to the waiting queue. If the waiting queue is full, that is, the capacity given when the ArrayBlockingQueue was created is used up, the pool continues to create threads until the number of threads reaches maximumPoolSize. If a task arrives when the queue is full and maximumPoolSize threads already exist, the rejection policy is executed. In this case, the upper limit on the number of threads depends directly on the state of the bounded task queue: if the queue's capacity is large or the queue never fills up, the number of threads stays at or below corePoolSize; once the queue is full, maximumPoolSize becomes the upper limit.
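The order of events described above (core threads first, then the queue, then extra threads, then rejection) can be traced with a small sketch. The tasks block on a latch so the result does not depend on timing; the class name BoundedQueueGrowth and the sizes (1 core thread, 2 maximum threads, queue capacity 2) are illustrative choices, not values from the examples above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedQueueGrowth {

    // Submits five blocked tasks and records "threads/queued" after each submission.
    static List<String> run() {
        final CountDownLatch release = new CountDownLatch(1);
        // The five-argument constructor uses the default thread factory and AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        Runnable blocked = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        List<String> states = new ArrayList<String>();
        for (int i = 1; i <= 5; i++) {
            try {
                pool.execute(blocked);
                states.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                states.add("rejected");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return states;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1/0, 1/1, 1/2, 2/2, rejected]
    }
}
```

The second thread appears only with the fourth task, after the queue has filled up, and the fifth task is rejected because both limits are reached.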

3. Unbounded task queue: an unbounded task queue can be implemented using LinkedBlockingQueue, as shown below

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

With an unbounded task queue, new tasks can be added to the queue without limit, and the maximum number of threads the pool creates is corePoolSize. In other words, maximumPoolSize has no effect in this case: even if many unexecuted tasks are waiting in the queue, no more threads are created once the pool has corePoolSize threads, and any task submitted later goes directly into the queue to wait. When using this kind of queue, you must pay attention to the balance between task submission and task processing; otherwise the queue keeps growing because tasks cannot be processed in time, until resources are finally exhausted.
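A short sketch can confirm that maximumPoolSize is never reached with an unbounded queue. As before, the tasks block on a latch so the numbers are deterministic; the class name UnboundedQueueDemo and the sizes are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class UnboundedQueueDemo {

    // Returns {pool size, queued tasks} after submitting the given number of blocked tasks.
    static int[] run(int tasks) {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await(); // hold the single worker until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        int[] result = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] result = run(10);
        // Only the core thread exists; the other nine tasks wait in the queue.
        System.out.println("threads=" + result[0] + ", queued=" + result[1]);
    }
}
```

If an upper limit is wanted, LinkedBlockingQueue also has a constructor that takes a capacity, which turns it into a bounded queue.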

4. Priority task queue: priority task queue is implemented through PriorityBlockingQueue. Let's demonstrate it through an example

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //Priority task queue
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //Compare the current task with another task: the task with the larger priority value is ordered first, so the larger the value, the higher the priority
    public int compareTo(ThreadTask o) {
         return Integer.compare(o.priority, this.priority);
    }
    
    public void run() {
        try {
            //Let the thread block and make subsequent tasks enter the cache queue
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            //Restore the interrupt flag so that the worker thread can see the interruption
            Thread.currentThread().interrupt();
        }
    
    }
}

Let's take a look at the output of the run

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

You can see that, except for the first task, which is executed directly by a newly created thread, all other tasks are put into the priority task queue and reordered by priority. The number of threads in the pool stays at corePoolSize, that is, there is only one thread.

The run shows that PriorityBlockingQueue is in fact a special unbounded queue: no matter how many tasks are added, the number of threads created by the pool never exceeds corePoolSize. The difference is that other queues generally process tasks first in, first out, whereas PriorityBlockingQueue lets you define your own ordering so that tasks are executed by priority.
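The priority ordering can also be checked without relying on sleep(). The sketch below is one possible way to do it, under the assumption that larger values mean higher priority: a first task holds the only worker on a latch while five more tasks are queued, and the execution order is recorded once the latch opens. The names PriorityOrderDemo and PrioritizedTask are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityOrderDemo {

    // A task whose natural ordering puts larger priority values first.
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        public int compareTo(PrioritizedTask other) {
            return Integer.compare(other.priority, this.priority);
        }

        public void run() {
            body.run();
        }
    }

    // Returns the order in which the queued priorities were executed.
    static List<Integer> run() throws InterruptedException {
        final CountDownLatch gate = new CountDownLatch(1);
        final List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());

        // The first task occupies the only worker, so the following tasks must queue up.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (int p = 1; p <= 5; p++) {
            final int priority = p;
            pool.execute(new PrioritizedTask(priority, () -> order.add(priority)));
        }
        gate.countDown();  // release the worker; it now drains the queue by priority
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [5, 4, 3, 2, 1]
    }
}
```

Tasks are passed to execute() here, so the queue stores the Comparable task objects themselves, which is what a PriorityBlockingQueue without a comparator needs.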

2, Reject policy

Generally, when we create a thread pool, we choose a bounded task queue to keep resources from being exhausted. In that case, once the task queue is full and the number of threads in the pool has reached the maximum you set, the pool needs a reasonable rejection policy, specified through the RejectedExecutionHandler parameter of ThreadPoolExecutor, to handle the "overload". ThreadPoolExecutor provides the following built-in rejection policies:

1. AbortPolicy: this policy throws a RejectedExecutionException directly, which interrupts the normal flow of the submitting code unless the exception is handled. It is the default policy;

2. CallerRunsPolicy: if the thread pool cannot accept the task, this policy runs the rejected task directly in the caller's thread, that is, the thread that called execute();

3. DiscardOldestPolicy: this policy discards the oldest task in the task queue, that is, the task that was added first and would be executed next, and then tries to submit the new task again;

4. DiscardPolicy: this policy silently discards the task that cannot be handled. With this policy, of course, the business scenario must tolerate the loss of tasks;

The built-in policies above all implement the RejectedExecutionHandler interface. You can also implement the RejectedExecutionHandler interface yourself to define a custom rejection policy. Let's see the example code below:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //Custom reject policy
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+" Reject policy implemented");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //Let the thread block and make subsequent tasks enter the cache queue
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            //Restore the interrupt flag so that the worker thread can see the interruption
            Thread.currentThread().interrupt();
        }
    
    }
}

Output results:

com.hhxx.test.ThreadTask@33909752 Reject policy implemented
com.hhxx.test.ThreadTask@55f96302 Reject policy implemented
com.hhxx.test.ThreadTask@3d4eac69 Reject policy implemented
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

You can see that because each task sleeps and therefore takes some time to finish, the pool fills up: it can hold at most 2 running tasks plus 5 queued tasks, so 3 of the 10 submitted tasks are rejected and handled by the custom rejection policy;
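For comparison with the custom handler, the built-in CallerRunsPolicy can be observed in a similar way. The following sketch uses a pool with one thread and a queue of capacity 1, fills both with blocked tasks, and records which thread runs the task that no longer fits. The class name CallerRunsDemo and the sizes are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {

    // Returns the name of the thread that ran the task the pool could not accept.
    static String run() throws InterruptedException {
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicReference<String> overflowThread = new AtomicReference<String>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocked); // runs on the single worker thread
        pool.execute(blocked); // waits in the queue

        // Worker busy and queue full: CallerRunsPolicy runs this task on the submitting thread.
        pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return overflowThread.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("overflow task ran on: " + run());
    }
}
```

Because the submitting thread is busy running the overflow task, it cannot submit further tasks in the meantime, so this policy also slows down the producer.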

3, ThreadFactory custom thread creation

Threads in the thread pool are created through the ThreadFactory of the ThreadPoolExecutor. By customizing the ThreadFactory, you can apply special settings to the threads created in the pool as needed, such as names and priorities. In the following code, we use a ThreadFactory to log and name each thread created in the pool

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //Custom thread factory
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("Thread "+r.hashCode()+" create");
                //Thread naming
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //The name of the output execution thread
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}

Let's look at the output

Thread 118352462 create
Thread 1550089733 create
Thread 865113938 create
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
Thread 1442407170 create
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

You can see that we have recorded, output and named the creation of each thread in the thread pool.
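Names based on hashCode() are hard to read in logs, so a common alternative is a reusable factory that numbers its threads. The sketch below is one possible form; the class name NamedThreadFactory, the prefix "order-pool" and the choice of handler are all assumptions made for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name threads prefix-1, prefix-2, ... so they are easy to find in logs and thread dumps.
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);
        // Report exceptions that escape a task instead of losing them silently.
        t.setUncaughtExceptionHandler((thread, error) ->
                System.err.println(thread.getName() + " failed: " + error));
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new NamedThreadFactory("order-pool");
        System.out.println(factory.newThread(() -> { }).getName()); // order-pool-1
        System.out.println(factory.newThread(() -> { }).getName()); // order-pool-2
    }
}
```

An instance of this factory can be passed as the threadFactory argument of the ThreadPoolExecutor constructor in place of the anonymous class.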

4, ThreadPoolExecutor extension

ThreadPoolExecutor can be extended mainly through three protected hook methods: beforeExecute(), afterExecute() and terminated().

1. beforeExecute: called before each task in the thread pool runs

2. afterExecute: called after each task in the thread pool has run

3. terminated: called after the thread pool has exited

By overriding these three methods, we can monitor the start and end time of each task or add other functionality. Let's implement this in code

package com.ywx.utils;

import java.util.concurrent.*;


public class ThreadPoolExecutorTest {
    private static ExecutorService pool;

    public static void main(String[] args) throws InterruptedException {

        //Override the hook methods of ThreadPoolExecutor
        pool = new ThreadPoolExecutor(2, 8, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        System.out.println("Thread " + r.hashCode() + " create");
                        //Thread naming
                        Thread th = new Thread(r, "threadPool" + r.hashCode());
                        return th;
                    }
                }, new ThreadPoolExecutor.CallerRunsPolicy()) {

            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("Ready to execute:" + ((ThreadTask) r).getTaskName());
            }
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("Execution completed:" + ((ThreadTask) r).getTaskName());
            }

            protected void terminated() {
                System.out.println("Thread pool exit");
            }
        };

        long start = System.currentTimeMillis();
        // Submit 10 tasks
        for (int i = 0; i < 10; i++) {
            pool.execute(new ThreadTask("Ticket number[" + i+"]"));
        }
        // Note: this measures only the time taken to submit the tasks, not the time taken to run them
        long end = System.currentTimeMillis();
        long su = end - start;
        System.out.println("time consuming:" + su + "millisecond");
        pool.shutdown();
    }
}

class ThreadTask implements Runnable {
    private String taskName;

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public ThreadTask(String name) {
        this.setTaskName(name);
    }

    public void run() {
        //The name of the output execution thread
        System.out.println("TaskName:" + this.getTaskName() + "By thread[ ThreadName:" + Thread.currentThread().getName()+"]Grab");
    }
}

Let's look at the output

Thread 2133927002 create
Thread 1836019240 create
Thread 325040804 create
Ready to execute:Ticket number[0]
Ready to execute:Ticket number[1]
TaskName:Ticket number[1]By thread[ ThreadName:threadPool1836019240]Grab
Execution completed:Ticket number[1]
Ready to execute:Ticket number[7]
TaskName:Ticket number[0]By thread[ ThreadName:threadPool2133927002]Grab
Thread 1173230247 create
Execution completed:Ticket number[0]
TaskName:Ticket number[7]By thread[ ThreadName:threadPool325040804]Grab
Execution completed:Ticket number[7]
Ready to execute:Ticket number[4]
TaskName:Ticket number[4]By thread[ ThreadName:threadPool325040804]Grab
Execution completed:Ticket number[4]
Ready to execute:Ticket number[5]
TaskName:Ticket number[5]By thread[ ThreadName:threadPool325040804]Grab
Ready to execute:Ticket number[2]
Execution completed:Ticket number[5]
Ready to execute:Ticket number[3]
Ready to execute:Ticket number[6]
TaskName:Ticket number[2]By thread[ ThreadName:threadPool1836019240]Grab
TaskName:Ticket number[6]By thread[ ThreadName:threadPool325040804]Grab
Execution completed:Ticket number[6]
TaskName:Ticket number[3]By thread[ ThreadName:threadPool2133927002]Grab
Ready to execute:Ticket number[9]
Ready to execute:Ticket number[8]
Execution completed:Ticket number[2]
TaskName:Ticket number[8]By thread[ ThreadName:threadPool1173230247]Grab
Execution completed:Ticket number[8]
TaskName:Ticket number[9]By thread[ ThreadName:threadPool325040804]Grab
Execution completed:Ticket number[9]
time consuming:1millisecond
Execution completed:Ticket number[3]
Thread pool exit

You can see that by overriding beforeExecute(), afterExecute() and terminated(), we monitor the tasks running in the thread pool and print information before and after each one executes. In addition, the shutdown() method closes the thread pool safely: after it is called, the pool no longer accepts new tasks, but it does not exit immediately; it exits only after all tasks already submitted to it have been processed.
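The hooks can also be used for the timing mentioned earlier. The sketch below is a minimal version under a few assumptions: the class name TimingPool and its counters are hypothetical, and a ThreadLocal carries the start time from beforeExecute() to afterExecute(), which works because both hooks run on the worker thread that executes the task. It also shows awaitTermination(), which lets the caller wait for the exit that shutdown() only initiates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class TimingPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<Long>();
    final AtomicInteger finishedTasks = new AtomicInteger();
    final AtomicLong totalNanos = new AtomicLong();

    TimingPool(int core, int max, int queueCapacity) {
        super(core, max, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // remember the start time on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        totalNanos.addAndGet(System.nanoTime() - startNanos.get());
        finishedTasks.incrementAndGet();
        super.afterExecute(r, t);
    }

    // Submits empty tasks, shuts the pool down and reports whether it terminated in time.
    static boolean runAndShutdown(TimingPool pool, int tasks) throws InterruptedException {
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown(); // no new tasks are accepted; tasks already queued still run
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        TimingPool pool = new TimingPool(2, 4, 10);
        boolean terminated = runAndShutdown(pool, 5);
        System.out.println("terminated=" + terminated + ", finished=" + pool.finishedTasks.get()
                + ", total time(ns)=" + pool.totalNanos.get());
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            System.out.println("submission after shutdown was rejected");
        }
    }
}
```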

5, Number of threads in thread pool

There is no hard rule for the number of threads in a thread pool. Choose it according to the actual workload; as long as it is set neither too large nor too small, it is usually not a problem. The following formula can be used as a reference

/**
 * Ncpu = number of CPUs
 * Ucpu = target CPU utilization, 0 <= Ucpu <= 1
 * W/C = ratio of task wait time to task compute time
 */
Nthreads = Ncpu * Ucpu * (1 + W/C)
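As a worked example of the formula, the sketch below computes a thread count from the three inputs. The class name PoolSizing, the rounding to the nearest integer and the lower bound of one thread are assumptions added here, not part of the formula itself; the wait/compute ratio has to be measured or estimated for your own tasks.

```java
final class PoolSizing {

    // Nthreads = Ncpu * Ucpu * (1 + W/C), rounded to the nearest integer and never below 1.
    static int threads(int cpus, double targetUtilization, double waitToComputeRatio) {
        if (cpus <= 0 || targetUtilization < 0 || targetUtilization > 1 || waitToComputeRatio < 0) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        long rounded = Math.round(cpus * targetUtilization * (1 + waitToComputeRatio));
        return (int) Math.max(1, rounded);
    }

    public static void main(String[] args) {
        // 8 CPUs, 50% target utilization, tasks wait 3 times as long as they compute:
        // 8 * 0.5 * (1 + 3) = 16 threads
        System.out.println(threads(8, 0.5, 3.0));

        // The same estimate for the machine this code runs on
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println(threads(cpus, 0.5, 3.0));
    }
}
```

For purely CPU-bound tasks W/C is close to 0 and the result approaches the number of CPUs; the more time tasks spend waiting on I/O, the larger the suggested pool.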

Keywords: Java

Added by _theworks on Mon, 03 Jan 2022 12:13:40 +0200