Multithreading tutorial ThreadPoolExecutor

Multithreading tutorial (32) ThreadPoolExecutor

1. Thread pool status

ThreadPoolExecutor uses the upper 3 bits of an int to hold the thread pool status and the lower 29 bits to hold the number of threads.

Status name | High 3 bits | Accepts new tasks | Processes queued tasks | Explanation
RUNNING | 111 | Y | Y |
SHUTDOWN | 000 | N | Y | New tasks are not accepted, but the remaining tasks in the blocking queue are processed
STOP | 001 | N | N | Interrupts the tasks in progress and discards the tasks in the blocking queue
TIDYING | 010 | - | - | All tasks have finished and the number of active threads is 0; the pool is about to terminate
TERMINATED | 011 | - | - | End state

Compared as signed int values, TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING, because the highest bit is the sign bit and RUNNING (111) is therefore negative.
This information is stored in a single atomic variable, ctl. Combining the thread pool state and the number of threads into one value means both can be updated with one CAS atomic operation.

// c is the old value; ctlOf returns the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));
// rs is the upper 3 bits (thread pool status), wc is the lower 29 bits (number of threads); ctlOf merges them
private static int ctlOf(int rs, int wc) { return rs | wc; }
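
To make the bit layout concrete, here is a minimal standalone sketch of the packing and unpacking. The class name CtlDemo is made up for illustration; the constants and helper methods are written here to mirror the layout described above (3 state bits, 29 count bits) and are not a copy of the pool's internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: packs a pool state and a worker count into one int.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // mask with the low 29 bits set

    // States live in the upper 3 bits
    static final int RUNNING = -1 << COUNT_BITS;         // 111
    static final int SHUTDOWN = 0 << COUNT_BITS;         // 000
    static final int STOP = 1 << COUNT_BITS;             // 001
    static final int TIDYING = 2 << COUNT_BITS;          // 010
    static final int TERMINATED = 3 << COUNT_BITS;       // 011

    static int ctlOf(int rs, int wc) { return rs | wc; }
    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 5));
        int c = ctl.get();
        // One CAS changes the state and keeps the worker count
        boolean swapped = ctl.compareAndSet(c, ctlOf(SHUTDOWN, workerCountOf(c)));
        System.out.println(swapped);                              // true
        System.out.println(runStateOf(ctl.get()) == SHUTDOWN);    // true
        System.out.println(workerCountOf(ctl.get()));             // 5
        System.out.println(RUNNING < SHUTDOWN);                   // true: RUNNING is negative
    }
}
```

Because state and count share one int, a single compareAndSet either updates both or neither, so no lock is needed for this transition.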

2. Construction method

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

  • corePoolSize: number of core threads (the maximum number of threads kept alive in the pool)
  • maximumPoolSize: maximum number of threads
  • keepAliveTime: idle lifetime, applies to emergency threads (threads beyond corePoolSize)
  • unit: time unit of keepAliveTime, applies to emergency threads
  • workQueue: blocking queue
  • threadFactory: thread factory, lets you give threads meaningful names when they are created
  • handler: rejection policy

Operation mode


  • There are no threads in the thread pool at first. When a task is submitted to the thread pool, the thread pool will create a new thread to execute the task.

  • When the number of threads reaches corePoolSize and no thread is idle, newly submitted tasks are added to workQueue and wait there until a thread becomes idle.

  • If a bounded queue is used and the number of tasks exceeds the queue capacity, up to maximumPoolSize - corePoolSize additional threads are created as emergency threads.

  • If the number of threads has reached maximumPoolSize and new tasks still arrive, the rejection policy is executed. The JDK provides four rejection policy implementations, and other well-known frameworks provide their own.

    • AbortPolicy throws a RejectedExecutionException to the caller; this is the default policy
    • CallerRunsPolicy lets the caller run the task
    • DiscardPolicy discards the task
    • DiscardOldestPolicy discards the earliest task in the queue and replaces it with this task
    • Dubbo's implementation logs the event and dumps the thread stack information before throwing RejectedExecutionException, which makes the problem easier to locate
    • Netty's implementation creates a new thread to run the task
    • ActiveMQ's implementation tries to put the task into the queue with a timeout (60s), similar to our previously customized rejection policy
    • PinPoint's implementation uses a chain of rejection policies and tries each policy in the chain one by one
  • After the peak has passed, an emergency thread beyond corePoolSize that has had no task for a period of time is ended to save resources. This period is controlled by keepAliveTime and unit.
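
The sequence described above (core thread, then queue, then emergency thread, then rejection) can be observed with a deliberately tiny pool. This is a sketch under assumed settings: the class name PoolGrowthDemo and the sizes (1 core thread, 2 threads at most, queue capacity 1) are chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows in which order a small pool uses its resources.
class PoolGrowthDemo {
    // Returns {pool size, queued tasks, rejected tasks} after four blocking tasks are submitted.
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                   // corePoolSize, maximumPoolSize
                1, TimeUnit.SECONDS,                    // keepAliveTime for the emergency thread
                new ArrayBlockingQueue<>(1),            // bounded queue with capacity 1
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());  // the default policy, written out explicitly
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();                        // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker);      // 1st task: starts the core thread
            pool.execute(blocker);      // 2nd task: core thread is busy, so it waits in the queue
            pool.execute(blocker);      // 3rd task: queue is full, so an emergency thread is created
            try {
                pool.execute(blocker);  // 4th task: threads and queue are exhausted, so it is rejected
            } catch (RejectedExecutionException e) {
                rejected++;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();        // let the blocked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run();
        System.out.println("threads=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
        // prints: threads=2 queued=1 rejected=1
    }
}
```

The tasks block on a latch so that the counts do not depend on timing. Swapping AbortPolicy for CallerRunsPolicy would make the fourth task run on the submitting thread instead of being rejected.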

Based on this constructor, the JDK Executors class provides a number of factory methods that create thread pools for various purposes

3. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Characteristics

  • Number of core threads == maximum number of threads (no emergency threads are created), so no timeout is needed

  • The blocking queue is unbounded and can hold any number of tasks

Evaluation: suitable for tasks whose volume is known and that are relatively time-consuming
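
As a small illustration of these characteristics, the sketch below submits more tasks than there are threads and records which threads ran them. The class name FixedPoolDemo and the helper threadNamesUsed are hypothetical names introduced for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a fixed pool never uses more than nThreads threads.
class FixedPoolDemo {
    // Runs nTasks short tasks on a fixed pool and returns the names of the threads that ran them.
    static Set<String> threadNamesUsed(int nThreads, int nTasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < nTasks; i++) {
            // Tasks beyond nThreads wait in the unbounded queue until a thread is free
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // tasks already submitted still run
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // However many tasks are submitted, at most 2 distinct thread names appear
        System.out.println(threadNamesUsed(2, 10));
    }
}
```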

4. newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Characteristics

  • The number of core threads is 0,

  • The maximum number of threads is Integer.MAX_VALUE, and the idle lifetime of an emergency thread is 60s, which means

    • All threads are emergency threads (they can be reclaimed after 60s of idleness)
    • Emergency threads can be created without limit
  • The queue is a SynchronousQueue, which has no capacity: a task cannot be put in unless a thread is there to take it (cash on delivery)

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
    try {
        log.debug("putting {} ", 1);
        integers.put(1);
        log.debug("{} putted...", 1);
        log.debug("putting...{} ", 2);
        integers.put(2);
        log.debug("{} putted...", 2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t1").start();
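// sleep(1): helper that pauses the current thread for 1 second (compare the timestamps in the output)
sleep(1);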
new Thread(() -> {
    try {
        log.debug("taking {}", 1);
        integers.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t2").start();
sleep(1);
new Thread(() -> {
    try {
        log.debug("taking {}", 2);
        integers.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t3").start();

output

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

Evaluation: the number of threads in the pool keeps growing with the number of tasks and has no upper limit. Threads that have finished their tasks and stayed idle for 1 minute are released. It suits workloads with many tasks where each task runs for a short time
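
The link between the SynchronousQueue and the unlimited thread growth can be sketched as follows. The class name CachedPoolDemo and both helper methods are hypothetical; the cast to ThreadPoolExecutor relies on newCachedThreadPool returning a plain ThreadPoolExecutor, as in the factory source shown earlier.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: why a cached pool creates a thread for every busy task.
class CachedPoolDemo {
    // offer() on a SynchronousQueue fails immediately when no thread is waiting to take.
    static boolean offerWithoutTaker() {
        return new SynchronousQueue<Integer>().offer(1);
    }

    // Submits n tasks that all block, then reports how many threads the pool created.
    static int poolSizeWithBusyTasks(int n) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < n; i++) {
                pool.execute(() -> {
                    try {
                        release.await();             // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWithoutTaker());        // false
        System.out.println(poolSizeWithBusyTasks(5));   // 5
    }
}
```

Since no idle worker is waiting on the queue while all tasks are blocked, each submission fails to hand off its task and the pool starts a new thread instead.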

5. newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Usage scenario:

Multiple tasks need to be executed one after another. The number of threads is fixed at 1; when there is more than one task, the extra tasks are placed in an unbounded queue. After the tasks are completed, the only thread is not released.

Differences:

  • If you create a single thread yourself to run tasks serially and a task fails and terminates that thread, there is no remedy, whereas the thread pool creates a new thread to keep the pool working

  • Executors.newSingleThreadExecutor(): the number of threads is always 1 and cannot be modified

    • FinalizableDelegatedExecutorService applies the decorator pattern and exposes only the ExecutorService interface, so the methods specific to ThreadPoolExecutor cannot be called
  • Executors.newFixedThreadPool(1): the number of threads is initially 1 and can be modified later

    • The object exposed is a ThreadPoolExecutor, which can be changed after a cast by calling setCorePoolSize and similar methods.
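
The difference in what the two factory methods expose can be checked directly. In this sketch the class name SingleVsFixedDemo and the helper tryResize are hypothetical; it only relies on the single-thread executor being a wrapper that is not itself a ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class: only the fixed pool can be resized after creation.
class SingleVsFixedDemo {
    // Tries to grow the pool to two threads.
    // Returns the new core size, or -1 if the pool does not expose ThreadPoolExecutor.
    static int tryResize(ExecutorService pool) {
        try {
            if (!(pool instanceof ThreadPoolExecutor)) {
                return -1;                      // wrapped pool: configuration methods are hidden
            }
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            tpe.setMaximumPoolSize(2);          // raise the maximum first so core <= max always holds
            tpe.setCorePoolSize(2);
            return tpe.getCorePoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryResize(Executors.newFixedThreadPool(1)));      // 2
        System.out.println(tryResize(Executors.newSingleThreadExecutor()));  // -1
    }
}
```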

6. Submit tasks

// Execute a task
void execute(Runnable command);
// Submit a task and obtain its result through the returned Future
<T> Future<T> submit(Callable<T> task);
// Submit all tasks in tasks
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
// Submit all tasks in tasks, with a timeout
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;
// Submit all tasks in tasks; the result of whichever task completes successfully first is returned, and the other tasks are cancelled
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
// Same as above, with a timeout
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
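
A short usage sketch of submit, invokeAll and invokeAny follows. The class name SubmitDemo and the task values "a", "b" and "c" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: the three common ways to hand Callables to a pool.
class SubmitDemo {
    static List<String> run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> out = new ArrayList<>();

            // submit: one task, result fetched through the Future
            Future<String> f = pool.submit(() -> "a");
            out.add(f.get());

            // invokeAll: blocks until all tasks are done; futures come back in submission order
            List<Callable<String>> tasks = Arrays.asList(() -> "b", () -> "c");
            for (Future<String> r : pool.invokeAll(tasks)) {
                out.add(r.get());
            }

            // invokeAny: returns the result of one task that completed successfully
            out.add(pool.invokeAny(tasks));
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // The first three entries are a, b, c; the last one is either b or c
        System.out.println(run());
    }
}
```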

7. Close thread pool

shutdown
/*
The thread pool state changes to SHUTDOWN
- No new tasks will be received
- But the submitted tasks will be completed
- This method does not block the execution of the calling thread
*/
void shutdown();
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Modify thread pool status
        advanceRunState(SHUTDOWN);
        // Only idle threads are interrupted
        interruptIdleWorkers();
        onShutdown(); // Extension point ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate (if no threads are running, the pool terminates immediately; if some are still running, this call does not wait for them)
    tryTerminate();
}
shutdownNow
/*
The thread pool status changes to STOP
- No new tasks will be received
- The tasks still in the queue are returned to the caller
- Executing tasks are interrupted via interrupt()
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Modify thread pool status
        advanceRunState(STOP);
        // Interrupt all worker threads
        interruptWorkers();
        // Get the remaining tasks in the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to end
    tryTerminate();
    return tasks;
}
Other methods
// If the thread pool is not in the RUNNING state, this method returns true
boolean isShutdown();
// Is the thread pool status TERMINATED
boolean isTerminated();
// After shutdown is called, the calling thread does not wait for all tasks to finish, so if it needs to do something after the thread pool is TERMINATED,
// it can use this method to wait
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
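
These methods are often combined into a two-phase shutdown. The following is one possible sketch rather than a prescribed recipe; the class name ShutdownDemo and the helper shutdownGracefully are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: try an orderly shutdown first, force it only if that takes too long.
class ShutdownDemo {
    // Returns true if the pool reached TERMINATED within the allowed time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit)
            throws InterruptedException {
        pool.shutdown();                                   // stop accepting tasks, let submitted ones finish
        if (pool.awaitTermination(timeout, unit)) {
            return true;
        }
        List<Runnable> neverStarted = pool.shutdownNow();  // interrupt workers and drain the queue
        System.out.println("tasks that never started: " + neverStarted.size());
        return pool.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("working"));
        boolean done = shutdownGracefully(pool, 1, TimeUnit.SECONDS);
        System.out.println(done + " " + pool.isShutdown() + " " + pool.isTerminated());
    }
}
```

Tasks that ignore interruption can still keep the pool alive after shutdownNow, which is why the helper reports its result instead of assuming success.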

8. Task scheduling thread pool

Before the task scheduling thread pool was added, java.util.Timer could be used to implement timed tasks. The advantage of Timer is that it is simple and easy to use. However, all tasks are scheduled by the same thread, so they run serially: only one task can run at a time, and a delay or exception in one task affects the tasks after it.

public static void main(String[] args) {
    Timer timer = new Timer();
    TimerTask task1 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 1");
            sleep(2);
        }
    };
    TimerTask task2 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 2");
        }
    };
    // Add two tasks with the timer, hoping that both run after 1s
    // However, the timer has only one thread, which runs the queued tasks in sequence, so the delay of "task 1" holds back "task 2"
    log.debug("start...");
    timer.schedule(task1, 1000);
    timer.schedule(task2, 1000);
}

output

20:46:09.444 c.TestTimer [main] - start... 
20:46:10.447 c.TestTimer [Timer-0] - task 1 
20:46:12.448 c.TestTimer [Timer-0] - task 2

Rewritten with ScheduledExecutorService:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// Add two tasks and expect both to run after 1s
executor.schedule(() -> {
    System.out.println("Task 1, execution time:" + new Date());
    try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
    System.out.println("Task 2, execution time:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);

output

Task 1, execution time: Thu Jan 03 12:45:17 CST 2019 
Task 2, execution time: Thu Jan 03 12:45:17 CST 2019
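
Besides delays, the text above notes that with Timer an exception in one task affects the tasks that follow. The sketch below shows how a ScheduledExecutorService behaves in the same situation, even with a single thread. The class name ScheduledIsolationDemo, the delays and the messages are made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a failing scheduled task does not stop later tasks.
class ScheduledIsolationDemo {
    // Schedules a failing task and then a normal one on one thread; returns the second task's result.
    static String run() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            Runnable failing = () -> { throw new IllegalStateException("task 1 failed"); };
            ScheduledFuture<?> first = executor.schedule(failing, 50, TimeUnit.MILLISECONDS);
            ScheduledFuture<String> second =
                    executor.schedule(() -> "task 2 ran", 100, TimeUnit.MILLISECONDS);
            try {
                first.get();                    // the exception is stored in the future
            } catch (ExecutionException e) {
                System.out.println("task 1 threw: " + e.getCause().getMessage());
            }
            return second.get();                // still runs on the same worker thread
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```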

scheduleAtFixedRate example:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

Output:

21:45:43.167 c.TestTimer [main] - start... 
21:45:44.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:45.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:46.215 c.TestTimer [pool-1-thread-1] - running... 
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate example (task execution time exceeds the interval):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
    log.debug("running...");
    sleep(2);
}, 1, 1, TimeUnit.SECONDS);

Output analysis: the initial delay is 1s. After that, because the task execution time exceeds the interval, the interval is stretched to 2s

21:44:30.311 c.TestTimer [main] - start... 
21:44:31.360 c.TestTimer [pool-1-thread-1] - running... 
21:44:33.361 c.TestTimer [pool-1-thread-1] - running... 
21:44:35.362 c.TestTimer [pool-1-thread-1] - running... 
21:44:37.362 c.TestTimer [pool-1-thread-1] - running...

scheduleWithFixedDelay example:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
    log.debug("running...");
    sleep(2);
}, 1, 1, TimeUnit.SECONDS);

Output analysis: the initial delay is 1s. For scheduleWithFixedDelay the interval is counted from the end of the previous task: end of the previous task, then the delay, then the start of the next task. The interval between starts is therefore 3s

21:40:55.078 c.TestTimer [main] - start... 
21:40:56.140 c.TestTimer [pool-1-thread-1] - running... 
21:40:59.143 c.TestTimer [pool-1-thread-1] - running... 
21:41:02.145 c.TestTimer [pool-1-thread-1] - running... 
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...
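
The start times in the two outputs above can be reproduced with simple arithmetic. The following is a simplified model for a pool with one thread, not the scheduler's own code; the class name ScheduleModelDemo and both method names are hypothetical, and all times are in whole seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: models when runs start under the two scheduling modes.
class ScheduleModelDemo {
    // Fixed rate: runs are due at initialDelay + k * period, but a run cannot start
    // before the previous run on the single thread has finished.
    static List<Long> fixedRateStarts(long initialDelay, long period, long taskDuration, int n) {
        List<Long> starts = new ArrayList<>();
        long due = initialDelay;    // when the next run is due
        long free = 0;              // when the thread becomes free
        for (int i = 0; i < n; i++) {
            long start = Math.max(due, free);
            starts.add(start);
            free = start + taskDuration;
            due += period;
        }
        return starts;
    }

    // Fixed delay: the next run starts `delay` after the previous run has ended.
    static List<Long> fixedDelayStarts(long initialDelay, long delay, long taskDuration, int n) {
        List<Long> starts = new ArrayList<>();
        long start = initialDelay;
        for (int i = 0; i < n; i++) {
            starts.add(start);
            start = start + taskDuration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Same parameters as the examples: initial delay 1s, interval 1s, task takes 2s
        System.out.println(fixedRateStarts(1, 1, 2, 4));   // [1, 3, 5, 7]
        System.out.println(fixedDelayStarts(1, 1, 2, 4));  // [1, 4, 7, 10]
    }
}
```

The modeled gaps of 2s and 3s between starts match the timestamps in the two logs.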

Evaluation: the number of threads in the pool is fixed. When there are more tasks than threads, the extra tasks are placed in an unbounded queue. After the tasks have run, the threads are not released. It is used for tasks that are delayed or repeated

9. Correctly handling exceptions thrown by tasks

Method 1: actively catch exceptions

ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
    try {
        log.debug("task1");
        int i = 1 / 0;
    } catch (Exception e) {
        log.error("error:", e);
    }
});

output

21:59:04.558 c.TestTimer [pool-1-thread-1] - task1 
21:59:04.562 c.TestTimer [pool-1-thread-1] - error: 
java.lang.ArithmeticException: / by zero 
 at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28) 
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)

Method 2: use Future

ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
    log.debug("task1");
    int i = 1 / 0;
    return true;
});
log.debug("result:{}", f.get());

output

21:54:58.208 c.TestTimer [pool-1-thread-1] - task1 
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.ArithmeticException: / by zero 
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
 at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
 at cn.itcast.n8.TestTimer.main(TestTimer.java:31) 
Caused by: java.lang.ArithmeticException: / by zero 
 at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28) 
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)
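
With Method 2 the exception only surfaces when get() is called, wrapped in an ExecutionException. A minimal sketch of unwrapping it is shown below; the class name FutureExceptionDemo and the helper causeOfFailure are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: recovers the original exception from a failed task.
class FutureExceptionDemo {
    // Runs a failing task and returns the exception it threw, or null if it succeeded.
    static Throwable causeOfFailure() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Boolean> f = pool.submit(() -> {
                int divisor = 0;
                return 1 / divisor > 0;     // throws ArithmeticException in the worker thread
            });
            try {
                f.get();                    // nothing is reported until get() is called
                return null;
            } catch (ExecutionException e) {
                return e.getCause();        // the exception thrown inside the task
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(causeOfFailure());
        // prints: java.lang.ArithmeticException: / by zero
    }
}
```

If get() is never called on the future, the failure of a submitted task goes unnoticed, which is the reason for catching exceptions inside the task in Method 1.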

Keywords: Java Cache

Added by sheffrem on Thu, 03 Mar 2022 12:48:27 +0200