Principles of the Java thread pool

Preface

Why use a thread pool

Threads are a limited resource within a process, and repeatedly creating and destroying them is itself expensive. A thread pool keeps a set of threads alive and reuses them, which avoids most of that cost.
Advantages:

  1. Thread reuse: fewer threads are created and destroyed, which improves performance
  2. Faster response: a task can start on an idle thread without waiting for a new one to be created
  3. Threads can be managed and monitored in one place
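To make the reuse point concrete, here is a minimal sketch that runs many short tasks on a small pool and counts how many distinct threads executed them. The class and method names (ReuseDemo, distinctThreads) are made up for illustration and do not come from the original code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Runs `tasks` short tasks on a pool of `poolSize` threads and returns
    // how many distinct threads actually executed them.
    static int distinctThreads(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 100 tasks never need more than 4 threads here
        System.out.println("100 tasks ran on " + distinctThreads(4, 100) + " thread(s)");
    }
}
```

Without a pool, the same workload would create and destroy 100 threads.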

When to use it

  1. There are many tasks and they need to run asynchronously
  2. Each task takes only a short time to process

Principle

Parameters

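The seven parameters of a thread pool:

  1. corePoolSize (core pool size: the number of threads kept in the pool)
  2. maximumPoolSize (maximum number of threads the pool may grow to)
  3. keepAliveTime (how long an idle thread beyond the core size waits for work before it terminates)
  4. unit (time unit of keepAliveTime)
  5. workQueue (queue that holds tasks waiting to be executed)
  6. threadFactory (factory used to create new threads)
  7. handler (rejection policy)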
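As an illustration, the seven parameters map onto the constructor arguments as in the sketch below. The values 2, 3 and 5 match the ones used later in this article; the class name PoolConfig and the thread name prefix "demo-worker-" are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfig {
    // Hypothetical helper that builds a pool with all seven parameters spelled out.
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Custom factory so that threads get recognizable names (assumed naming scheme)
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                    // corePoolSize
                3,                                    // maximumPoolSize
                60L,                                  // keepAliveTime
                TimeUnit.SECONDS,                     // unit
                new ArrayBlockingQueue<>(5),          // workQueue with capacity 5
                factory,                              // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```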

Explanation

In this example the thread pool is configured with core pool size = 2, maximum pool size = 3, queue capacity = 5

  1. If fewer than corePoolSize threads are running, a new task starts a core thread and runs on it immediately
  2. If all core threads are busy, new tasks wait in the queue
  3. If the queue is also full, the pool creates an extra (non-core) thread for the new task, up to maximumPoolSize
  4. If the queue is full and the pool has already reached maximumPoolSize, the new task is handled according to the rejection policy. The default is to throw an exception
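The order in which core threads, the queue and the extra thread are used can be observed from outside the pool. The following sketch (FlowDemo is a made-up name) submits nine tasks that all block on a latch to a pool with core = 2, max = 3 and queue capacity 5, and records "poolSize/queueSize" after each submission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FlowDemo {
    // Returns one entry per submitted task: "poolSize/queueSize", or "rejected".
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        CountDownLatch release = new CountDownLatch(1);
        List<String> steps = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            try {
                // Every task blocks until the latch opens, so no thread becomes free
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                steps.add(pool.getPoolSize() + "/" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("rejected");
            }
        }
        release.countDown(); // let all accepted tasks finish
        pool.shutdown();
        return steps;
    }

    public static void main(String[] args) {
        // prints [1/0, 2/0, 2/1, 2/2, 2/3, 2/4, 2/5, 3/5, rejected]
        System.out.println(trace());
    }
}
```

The third thread appears only once the queue already holds five tasks, and the ninth task is the one that gets rejected.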
Four rejection strategies:

AbortPolicy:
Throws an exception directly
java.util.concurrent.RejectedExecutionException: Task com.company.Thread.MyTask@135fbaa4 rejected from java.util.concurrent.ThreadPoolExecutor@45ee12a7

DiscardPolicy:
Discards the rejected task without throwing an exception.

DiscardOldestPolicy:
Discards the task at the head of the queue (the one that has waited longest) and resubmits the rejected task

// Assumes threadPoolExecutor uses core = 2, max = 3, queue capacity = 5 and DiscardOldestPolicy
for (int i = 0; i < 8; i++) {
    try {
        threadPoolExecutor.execute(new MyTask(i));
    }
    catch (Exception e){
        e.printStackTrace();
    }
}
threadPoolExecutor.execute(new MyTask(8));

#######################

pool-1-thread-3--7:done!
pool-1-thread-2--1:done!
pool-1-thread-1--0:done!
pool-1-thread-2--4:done!
pool-1-thread-3--3:done!
pool-1-thread-1--5:done!
pool-1-thread-2--6:done!
pool-1-thread-3--8:done!

Task 8 is submitted last, and 8 tasks complete in total. Task 2 is missing: it was at the head of the queue when task 8 was rejected, so it was discarded to make room for task 8.

CallerRunsPolicy:
The thread that called execute() runs the rejected task itself
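The four built-in policies can be compared side by side with a deliberately tiny pool. The sketch below is an illustration, not the author's test code: PolicyDemo, run and task are made-up names. It uses one thread and a queue of capacity 1, so that task A occupies the thread, task B fills the queue, and task C triggers the policy. Each task logs its name and whether it ran on the calling thread or on a pool thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PolicyDemo {
    static List<String> run(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(), handler);
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            pool.execute(() -> {                  // A: occupies the only thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.add("A@pool");
            });
            pool.execute(task("B", caller, log)); // B: fills the queue
            pool.execute(task("C", caller, log)); // C: triggers the policy
        } catch (RejectedExecutionException e) {
            log.add("rejected");
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    static Runnable task(String name, Thread caller, List<String> log) {
        return () -> log.add(name + (Thread.currentThread() == caller ? "@caller" : "@pool"));
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));         // [rejected, A@pool, B@pool]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));       // [A@pool, B@pool]
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy())); // [A@pool, C@pool]
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));    // [C@caller, A@pool, B@pool]
    }
}
```

With CallerRunsPolicy, task C finishes before A and B because the submitting thread runs it inside execute(), which also slows down the submitter and acts as a simple form of back-pressure.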

Code analysis

The seven parameters of the constructor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
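The argument checks at the top of the constructor can be exercised directly. In this small sketch, CtorChecks and describe are made-up names; it uses the five-argument constructor, which supplies the default thread factory and handler, and reports which exception, if any, a given combination produces.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CtorChecks {
    // Returns "ok" or the simple name of the exception thrown by the constructor.
    static String describe(int core, int max, BlockingQueue<Runnable> queue) {
        try {
            new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS, queue).shutdown();
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(2, 3, new ArrayBlockingQueue<>(5))); // ok
        System.out.println(describe(3, 2, new ArrayBlockingQueue<>(5))); // IllegalArgumentException (max < core)
        System.out.println(describe(0, 0, new ArrayBlockingQueue<>(5))); // IllegalArgumentException (max <= 0)
        System.out.println(describe(2, 3, null));                        // NullPointerException (no queue)
    }
}
```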

There are four built-in rejection policies to choose from

  1. CallerRunsPolicy
  2. AbortPolicy
  3. DiscardPolicy
  4. DiscardOldestPolicy

Default policy: AbortPolicy
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
...
public static class CallerRunsPolicy implements RejectedExecutionHandler 
...
public static class AbortPolicy implements RejectedExecutionHandler
...
public static class DiscardPolicy implements RejectedExecutionHandler
...
public static class DiscardOldestPolicy implements RejectedExecutionHandler
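...
// ctl packs two values into one int: the run state (high 3 bits)
// and the number of worker threads (low 29 bits)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
...
public void execute(Runnable command) {
    // A null command is not allowed
    if (command == null)
        throw new NullPointerException();
    // Read the current run state and worker count
    int c = ctl.get();
    // Step 1: fewer workers than corePoolSize, so start a new core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // addWorker failed (lost a race, or the pool is shutting down): re-read the state
        c = ctl.get();
    }
    // Step 2: the core threads are busy, so try to put the task in the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool was shut down after the task was queued: remove it and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // No worker is alive: start one so that the queued task gets run
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the queue is full (or the pool is not running): try a non-core thread,
    // and reject the task if that fails, e.g. the pool is already at maximumPoolSize
    else if (!addWorker(command, false))
        reject(command);
}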
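The helpers ctlOf, workerCountOf and isRunning used above are plain bit operations on the ctl value. The sketch below is modelled on the private constants in OpenJDK's ThreadPoolExecutor as I understand them. Treat it as an approximation rather than a copy of the source: details can differ between JDK versions, and CtlSketch is a made-up class name.

```java
class CtlSketch {
    // The low 29 bits hold the worker count, the high 3 bits hold the run state
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // Two of the run states; RUNNING is negative, so it sorts below all other states
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;

    static int runStateOf(int c)       { return c & ~CAPACITY; }
    static int workerCountOf(int c)    { return c & CAPACITY; }
    static int ctlOf(int rs, int wc)   { return rs | wc; }
    static boolean isRunning(int c)    { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 3);
        System.out.println(workerCountOf(c));          // 3
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(isRunning(c));              // true
    }
}
```

Keeping both values in one AtomicInteger lets the pool read or update the run state and the worker count in a single atomic operation.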

Test

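Test code:

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class threadpool {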
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory());
//        ExecutorService e1 = Executors.newCachedThreadPool();
//        ExecutorService e2 = Executors.newFixedThreadPool(10);
//        ExecutorService e3 = Executors.newSingleThreadExecutor();


        for (int i = 0; i < 9; i++) {
            try {
                threadPoolExecutor.execute(new MyTask(i));
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }


        Queue<Runnable> queue = threadPoolExecutor.getQueue();

        for (int i = 0; i < 10; i++) {
            System.out.println("Number of cores:" + threadPoolExecutor.getCorePoolSize());
            System.out.println("Maximum number:" + threadPoolExecutor.getMaximumPoolSize());
            System.out.println("Number of pools:" + threadPoolExecutor.getPoolSize());
            System.out.println("Number of queues:" + queue.size());
            System.out.println("Default policy:" + threadPoolExecutor.getRejectedExecutionHandler());
            Thread.sleep(60000L);
            System.out.println("*************************************");
        }

        threadPoolExecutor.shutdown();
    }
}

class MyTask implements Runnable{
    int i = 0;

    public MyTask(int i){
        this.i = i;
    }

    @Override
    public void run() {
        try{
            Thread.sleep(60000L);
            System.out.println(Thread.currentThread().getName() + "--" + i + ":done!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

As the results below show, once the core threads are busy, the queue is full and the pool has grown to its maximum size, submitting another task triggers the rejection policy and an exception is thrown.
Note that task 7 runs in the first batch on the third thread, while tasks 2 to 6 wait in the queue.
Each time a thread finishes a task, it takes the next task from the queue, until the queue is empty.
Result:

java.util.concurrent.RejectedExecutionException: Task com.company.Thread.MyTask@135fbaa4 rejected from java.util.concurrent.ThreadPoolExecutor@45ee12a7[Running, pool size = 3, active threads = 3, queued tasks = 5, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at com.company.Thread.threadpool.main(threadpool.java:17)
Number of cores: 2
 Maximum number: 3
 Number of pools: 3
 Number of queues: 5
 Default policy: java.util.concurrent.ThreadPoolExecutor$AbortPolicy@2503dbd3
*************************************
pool-1-thread-1--0:done!
pool-1-thread-2--1:done!
pool-1-thread-3--7:done!
Number of cores: 2
 Maximum number: 3
 Number of pools: 3
 Number of queues: 2
 Default policy: java.util.concurrent.ThreadPoolExecutor$AbortPolicy@2503dbd3
pool-1-thread-1--2:done!
*************************************
Number of cores: 2
 Maximum number: 3
 Number of pools: 3
pool-1-thread-3--4:done!
pool-1-thread-2--3:done!
Number of queues: 1
 Default policy: java.util.concurrent.ThreadPoolExecutor$AbortPolicy@2503dbd3
*************************************
Number of cores: 2
 Maximum number: 3
pool-1-thread-1--5:done!
pool-1-thread-3--6:done!
Number of pools: 2
 Number of queues: 0
 Default policy: java.util.concurrent.ThreadPoolExecutor$AbortPolicy@2503dbd3
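In the last block of output the pool size drops from 3 to 2: once the queue is empty, a thread beyond corePoolSize that stays idle for keepAliveTime is terminated. The sketch below reproduces this with a much shorter keep-alive time (100 ms, an arbitrary value chosen for the example). ShrinkDemo and its method name are made up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShrinkDemo {
    // Returns {pool size under load, pool size after the extra thread has timed out}.
    static int[] peakAndFinalPoolSize() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 100L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // 4 blocking tasks: 2 on core threads, 1 in the queue, 1 on the extra thread
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getPoolSize();
        release.countDown();

        // Wait (at most 5 s) for the idle non-core thread to exceed keepAliveTime
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > pool.getCorePoolSize() && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int fin = pool.getPoolSize();
        pool.shutdown();
        return new int[] {peak, fin};
    }

    public static void main(String[] args) {
        int[] sizes = peakAndFinalPoolSize();
        System.out.println("peak = " + sizes[0] + ", after idle = " + sizes[1]);
    }
}
```

By default the core threads are not subject to this timeout, which is why the pool settles at corePoolSize rather than at zero.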

Keywords: Java Back-end

Added by LBmtb on Thu, 03 Feb 2022 09:00:29 +0200