003. Thread pools: usage and implementation principles

1. Why use a thread pool? Are more threads always better?

  • A thread is an object in Java, but it is also an operating system resource. Creating and destroying a thread takes time. If creation time + destruction time > task execution time, starting a new thread for each task is not cost-effective.

  • Java objects occupy heap memory, while operating system threads occupy system memory. Each thread needs its own stack, which is allocated from system memory; on 64-bit HotSpot JVMs the default stack size is typically 1 MB (adjustable with -Xss). Too many threads therefore consume a lot of memory.

  • The operating system has to switch thread contexts frequently (every thread wants to run), which hurts performance.

  • A thread pool makes it easy to control the number of threads.
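
As a rough illustration of the points above, the following sketch runs 100 small tasks on a fixed pool of 4 threads and counts how many distinct threads actually did the work. The class name ThreadReuseDemo and the helper distinctThreadsUsed are made up for this example. The count can never exceed the pool size, so the creation and destruction cost is paid at most 4 times instead of 100 times.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    // Runs taskCount trivial tasks on a pool of poolSize threads and
    // returns how many distinct threads executed them.
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each task only records the name of the thread that runs it
                pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // accepted tasks still run to completion
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct threads used: " + distinctThreadsUsed(4, 100));
    }
}
```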

2. Thread pool principle - concept

  • Thread pool manager: creates and manages the thread pool, including creating it, destroying it, and adding new tasks.
  • Worker thread: a thread in the pool. It waits when there is no task and executes tasks in a loop.
  • Task interface: the interface each task must implement so that worker threads can schedule and run it. It mainly specifies the task's entry point, the cleanup work after the task finishes, and the task's execution status.
  • Task queue: stores tasks that have not been processed yet, providing a buffering mechanism.
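
The four roles above can be sketched in a few dozen lines. This is a deliberately simplified teaching model, not how ThreadPoolExecutor is implemented: MiniPool (a made-up class) plays the manager, its threads are the workers, Runnable stands in for the task interface, and a LinkedBlockingQueue is the task queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical teaching class: plays the "thread pool manager" role.
public class MiniPool {
    // Task queue: buffers tasks that no worker has picked up yet
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    public MiniPool(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(this::workLoop, "mini-pool-worker-" + i);
            workers.add(worker);
            worker.start();
        }
    }

    // Worker thread: waits on the queue when idle and runs tasks in a loop
    private void workLoop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Runnable task = taskQueue.take(); // blocks while the queue is empty
                try {
                    task.run(); // Runnable is the task interface in this sketch
                } catch (RuntimeException e) {
                    System.err.println("Task failed: " + e);
                }
            }
        } catch (InterruptedException e) {
            // shutdown() interrupted this worker; leave the loop and exit
        }
    }

    public void submit(Runnable task) {
        taskQueue.add(task);
    }

    // Stops all workers; tasks still waiting in the queue are dropped
    public void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    // Runs taskCount counter increments on workerCount workers and returns the final count
    static int runDemo(int workerCount, int taskCount) {
        MiniPool pool = new MiniPool(workerCount);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.submit(() -> {
                    counter.incrementAndGet();
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks completed: " + runDemo(3, 10));
    }
}
```

The sketch leaves out everything that makes a production pool hard: growing and shrinking the number of workers, bounded queues, rejection, and orderly shutdown.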

3. Thread pool API - interface definition and implementation class

Type | Name | Description
Interface | Executor | Top-level interface; defines the execute method for running a task
Interface | ExecutorService | Extends Executor; adds Callable and Future support plus shutdown methods
Interface | ScheduledExecutorService | Extends ExecutorService; adds methods for delayed and periodic tasks
Implementation class | ThreadPoolExecutor | The basic, standard thread pool implementation
Implementation class | ScheduledThreadPoolExecutor | Extends ThreadPoolExecutor; implements the delayed and periodic task methods of ScheduledExecutorService
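
The relationships in the table can be checked with instanceof. In the sketch below, ExecutorHierarchyDemo and its methods are made-up names; the JDK types are real. A bare Executor is represented by the method reference Runnable::run, which simply runs each task on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorHierarchyDemo {

    // Reports the most specific of the three interfaces that the executor implements
    static String describe(Executor executor) {
        if (executor instanceof ScheduledExecutorService) {
            return "ScheduledExecutorService";
        }
        if (executor instanceof ExecutorService) {
            return "ExecutorService";
        }
        return "Executor";
    }

    // Describes a bare Executor, a ThreadPoolExecutor and a ScheduledThreadPoolExecutor
    static List<String> describeStandardExecutors() {
        Executor direct = Runnable::run; // only has execute(Runnable)
        ThreadPoolExecutor plain = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
        try {
            List<String> result = new ArrayList<>();
            result.add(describe(direct));
            result.add(describe(plain));
            result.add(describe(scheduled));
            return result;
        } finally {
            plain.shutdown();
            scheduled.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeStandardExecutors());
    }
}
```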

4. Thread pool API - method definition

ExecutorService

ScheduledExecutorService
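
As a small sketch of how a few of these methods are typically used (ExecutorApiDemo and its helpers are made-up names): invokeAll and Future.get come from ExecutorService, schedule comes from ScheduledExecutorService, and shutdown ends each pool. The pool sizes and the delay are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ExecutorApiDemo {

    // ExecutorService: invokeAll runs a batch of Callables and returns one Future per task
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            int sum = 0;
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                sum += future.get(); // already complete when invokeAll returns
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // ScheduledExecutorService: schedule runs a Callable once after the given delay
    static String delayedGreeting(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            Callable<String> task = () -> "hello";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of squares 1..5: " + sumOfSquares(5));
        System.out.println("Delayed result: " + delayedGreeting(100));
    }
}
```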

5. Thread pool API - Executors tool class

  • You can instantiate a thread pool yourself or use Executors, the factory class for thread pools. Its common methods are as follows:
Method | Description
newFixedThreadPool(int nThreads) | Creates a fixed-size thread pool with an unbounded task queue. Core pool size = maximum pool size.
newCachedThreadPool() | Creates a cached thread pool with no upper bound on its size. Its task queue is a SynchronousQueue. When a task is submitted, an idle thread runs it if one exists; otherwise a new thread is created to run it. Threads that stay idle for more than 60 seconds are destroyed and released, so the number of threads follows the number of tasks. Suitable for short-lived asynchronous tasks. Core pool size = 0, maximum pool size = Integer.MAX_VALUE.
newSingleThreadExecutor() | Creates a pool with a single thread working off an unbounded task queue. Tasks are guaranteed to run one at a time, in the order they were submitted. If the only thread dies because of a task exception, a new thread is created to continue with the remaining tasks. The difference from newFixedThreadPool(1) is that the pool size is hard-coded in newSingleThreadExecutor and cannot be changed afterwards.
newScheduledThreadPool(int corePoolSize) | Creates a thread pool that can run tasks after a delay or periodically. The core pool size is given by the parameter, and the maximum pool size = Integer.MAX_VALUE.
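
The parameters listed above can be read back from the pools that the factory methods return. The sketch below (ExecutorsFactoryDemo and summarize are made-up names) depends on an implementation detail: in OpenJDK the fixed, cached, and scheduled factories return ThreadPoolExecutor instances, while the Javadoc only promises an ExecutorService, so treat the instanceof check as an assumption about the JDK in use. newSingleThreadExecutor returns a wrapper instead, which is why its size cannot be reconfigured.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecutorsFactoryDemo {

    // Summarizes a pool's configuration, or reports that it is hidden behind a wrapper.
    // The pool is shut down afterwards because it is only needed for inspection.
    static String summarize(ExecutorService service) {
        try {
            if (!(service instanceof ThreadPoolExecutor)) {
                return "wrapped (not reconfigurable)";
            }
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " queue=" + pool.getQueue().getClass().getSimpleName();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed(3):     " + summarize(Executors.newFixedThreadPool(3)));
        System.out.println("cached:       " + summarize(Executors.newCachedThreadPool()));
        System.out.println("single:       " + summarize(Executors.newSingleThreadExecutor()));
        System.out.println("scheduled(2): " + summarize(Executors.newScheduledThreadPool(2)));
    }
}
```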

6. Thread pool principle - task execution process

  1. Has the core pool size been reached? If not, create a worker thread to execute the task.
  2. Is the work queue full? If not, store the newly submitted task in the work queue.
  3. Has the maximum pool size been reached? If not, create a new worker thread to execute the task.
  4. Otherwise, hand the task over to the rejection policy.
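
The four steps can be observed on a deliberately tiny pool. In the sketch below (ExecuteFlowDemo is a made-up name; the sizes 2, 4, and 2 are arbitrary), every task blocks on a latch so that no worker becomes free while tasks are being submitted. Of the 7 tasks, the first 2 start core threads, the next 2 are queued, the next 2 start extra threads, and the last one reaches the rejection handler, so the method is expected to return {4, 2, 1}.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecuteFlowDemo {

    // Returns {pool size, queued tasks, rejected tasks} after submitting 7 blocking tasks
    static int[] run() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        // Core pool size 2, maximum pool size 4, queue capacity 2, counting rejection handler
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                (task, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < 7; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the snapshot is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Snapshot taken while all workers are still blocked
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected.get()};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();     // queued tasks still run before the pool terminates
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```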

7. Number of threads

How to determine the appropriate number of threads?

  • CPU-bound tasks: 1-2 times the number of CPU cores.

  • IO-bound tasks: these need more threads than CPU-bound tasks, and the right number depends on how long the tasks spend blocked on IO. For example, the default maximum number of threads in Tomcat is 200. You can also let the pool grow and shrink automatically between a minimum and a maximum number of threads as needed.
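
One commonly cited rule of thumb, which this article does not state itself, estimates the thread count as cores × (1 + wait time / compute time). The sketch below encodes that heuristic under the assumption that the average wait and compute times per task are known; PoolSizing, suggestedThreads, and the 90 ms / 10 ms figures are hypothetical. Treat the result as a starting point for load testing rather than a final answer.

```java
public class PoolSizing {

    // Heuristic: threads = cores * (1 + waitTime / computeTime), rounded up.
    // waitMillis is the average time a task spends blocked (for example on IO),
    // computeMillis is the average time it spends on the CPU.
    static int suggestedThreads(int cores, double waitMillis, double computeMillis) {
        if (cores < 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("cores >= 1, wait >= 0 and compute > 0 are required");
        }
        return (int) Math.ceil(cores * (1.0 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound work: no waiting, so roughly one thread per core
        System.out.println("CPU-bound: " + suggestedThreads(cores, 0, 10));
        // IO-bound work: hypothetical 90 ms blocked for every 10 ms of computation
        System.out.println("IO-bound:  " + suggestedThreads(cores, 90, 10));
    }
}
```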

8. Code demonstration

package com.study.hc.thread.chapter1.thread;

import java.util.List;
import java.util.concurrent.*;

/**
 * Use of thread pool
 */
public class Demo9 {

    /**
     * Submits 15 tasks that each take 3 seconds, then prints the state of the thread pool
     * @param threadPoolExecutor the pool under test; different configurations give different results
     * @throws Exception if the calling thread is interrupted while sleeping
     */
    private void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
        // Submit 15 tasks that each take 3 seconds and observe how tasks beyond the pool's capacity are handled
        for (int i = 0; i < 15; i ++) {
            int n = i;
            threadPoolExecutor.submit(() -> {
                try {
                    System.out.println("Start execution:" + n);
                    Thread.sleep(3000);
                    System.out.println("End of execution:" + n);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            System.out.println("Task submitted successfully:" + n);
        }
        // Check the number of threads and the number of queued tasks
        Thread.sleep(500);
        System.out.println("Current number of threads in the pool: " + threadPoolExecutor.getPoolSize());
        System.out.println("Current number of queued tasks: " + threadPoolExecutor.getQueue().size());
        // Wait 15 seconds and check again (in theory, threads beyond the core pool size have been destroyed by now)
        Thread.sleep(15000);
        System.out.println("Current number of threads in the pool: " + threadPoolExecutor.getPoolSize());
        System.out.println("Current number of queued tasks: " + threadPoolExecutor.getQueue().size());
    }

    /**
     * 1. Thread pool: core pool size 5, maximum pool size 10, unbounded queue, keep-alive time of 5 seconds for threads beyond the core size. No rejection policy is passed, so the default applies
     * @throws Exception
     */
    private void threadPoolExecutorTest1() throws Exception {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
                5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        testCommon(threadPoolExecutor);
        // Expected result: the pool has 5 threads and the remaining 10 tasks wait in the queue. Because the queue is unbounded it never fills up, so the maximum of 10 threads is never reached
    }

    /**
     * 2. Thread pool: core pool size 5, maximum pool size 10, queue capacity 3, keep-alive time of 5 seconds for threads beyond the core size, with a custom rejection policy
     * @throws Exception
     */
    private void threadPoolExecutorTest2() throws Exception {
        // Create a thread pool with 5 core threads, at most 10 threads, and a queue capacity of 3, so it can hold at most 13 tasks
        // The default policy, java.util.concurrent.ThreadPoolExecutor.AbortPolicy, throws RejectedExecutionException; the custom handler here only prints a message
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), (r, executor) -> System.out.println("A task has been rejected"));
        testCommon(threadPoolExecutor);
        // Expected results:
        // 1. 5 tasks are assigned directly to core threads and start executing
        // 2. 3 tasks enter the waiting queue
        // 3. The queue is full, so 5 extra threads are created to execute tasks (they are destroyed after 5 idle seconds)
        // 4. The queue and the pool are both full; the 2 remaining tasks are rejected
        // 5. Once the tasks finish and 5 seconds pass with no new work, the 5 extra threads are destroyed
    }

    /**
     * 3. Thread pool: core pool size 5, maximum pool size 5, unbounded queue, keep-alive time of 0 seconds for threads beyond the core size
     * @throws Exception
     */
    private void threadPoolExecutorTest3() throws Exception {
        // Same configuration as Executors.newFixedThreadPool(5)
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        testCommon(threadPoolExecutor);
        // Expected result: the pool has 5 threads; the remaining tasks enter the queue and wait to be executed
    }

    /**
     * 4. Thread pool: core pool size 0, maximum pool size Integer.MAX_VALUE, SynchronousQueue, keep-alive time of 60 seconds for threads beyond the core size
     * @throws Exception
     */
    private void threadPoolExecutorTest4() throws Exception {
        /*
        SynchronousQueue is not a real queue, because it keeps no storage for elements.
        Unlike other queues, it maintains a set of threads waiting to add or remove elements.
        With a SynchronousQueue as the work queue, when client code submits a task
        and no idle worker thread is waiting to take a task from the queue,
        the corresponding offer call fails (the task is not placed in the work queue).
        The ThreadPoolExecutor then creates a new worker thread to run the task that could not be queued
        (assuming the pool has not yet reached its maximum size).
         */

        // Same configuration as Executors.newCachedThreadPool()
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>());
        testCommon(threadPoolExecutor);
        // Expected results:
        // 1. The pool grows to 15 threads, one per task; nothing waits in the queue because SynchronousQueue has no capacity
        // 2. After 60 idle seconds with no task to execute, all threads are destroyed and the pool size returns to 0
        Thread.sleep(60000);
        System.out.println("Pool size after another 60 seconds: " + threadPoolExecutor.getPoolSize());
    }

    /**
     * 5. Scheduled thread pool: a one-shot task that runs once, 3 seconds after submission
     * Core pool size 5, maximum pool size Integer.MAX_VALUE, DelayedWorkQueue, keep-alive time of 0 seconds for threads beyond the core size
     */
    private void threadPoolExecutorTest5() {
        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        scheduledThreadPoolExecutor.schedule(() -> {
            System.out.println("The task is executed at:" + System.currentTimeMillis());
        }, 3, TimeUnit.SECONDS);
        System.out.println("Scheduled task submitted at: " + System.currentTimeMillis() + ", current pool size: " + scheduledThreadPoolExecutor.getPoolSize());
        // Expected result: the task is executed once, 3 seconds after submission
    }

    /**
     * 6. Scheduled thread pool: periodic tasks
     * Core pool size 5, maximum pool size Integer.MAX_VALUE, DelayedWorkQueue, keep-alive time of 0 seconds for threads beyond the core size
     */
    private void threadPoolExecutorTest6() {
        ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
        // The thread pool offers two kinds of periodic scheduling
        // Test scenario: each submitted task takes 3 seconds to complete
        // Effect 1 (scheduleAtFixedRate): the first run starts 2 seconds after submission, then a run is due every 1 second. If the previous run has not finished, the next run waits for it and starts immediately afterwards
        // So in this code the task runs once every 3 seconds (each run takes 3 seconds, longer than the 1-second period, so the next run starts as soon as the previous one ends)
        threadPoolExecutor.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task-1 Executed at:" + System.currentTimeMillis());
        }, 2000, 1000, TimeUnit.MILLISECONDS);

        // Effect 2 (scheduleWithFixedDelay): the first run starts 2 seconds after submission; after each run finishes, the pool waits 1 second before starting the next run
        // So in this code the task runs once every 4 seconds (3 seconds of execution + 1 second of delay)
        threadPoolExecutor.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task-2 Executed at:" + System.currentTimeMillis());
        }, 2000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * 7. Shutting down a thread pool: core pool size 5, maximum pool size 10, queue capacity 3, keep-alive time of 5 seconds for threads beyond the core size, with a custom rejection policy
     */
    private void threadPoolExecutorTest7() throws InterruptedException {
        // Create a thread pool with 5 core threads, at most 10 threads, and a queue capacity of 3, so it can hold at most 13 tasks
        // The default policy, java.util.concurrent.ThreadPoolExecutor.AbortPolicy, throws RejectedExecutionException; the custom handler here only prints a message
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), (r, executor) -> {
                    System.out.println("A task has been rejected");
                });
        // Test: submit 15 tasks that each take 3 seconds and observe how the 2 tasks beyond capacity are handled
        for (int i = 0; i < 15; i ++) {
            int n = i;
            threadPoolExecutor.submit(() -> {
                try {
                    System.out.println("Start execution:" + n);
                    Thread.sleep(3000);
                    System.out.println("end of execution: " + n);
                } catch (InterruptedException e) {
                    System.out.println("Exception:" + e.getMessage());
                }
            });
            System.out.println("Task submitted successfully:" + n);
        }

        // Shut down the thread pool after 1 second
        Thread.sleep(1000);
        threadPoolExecutor.shutdown();
        // Submitting another task after shutdown is rejected
        threadPoolExecutor.submit(() -> {
            System.out.println("Append a task");
        });

        // Results analysis:
        // 1. 10 tasks start executing, 3 tasks enter the waiting queue, and 2 tasks are rejected
        // 2. After shutdown() is called, no new tasks are accepted, but the 13 accepted tasks run to completion
        // 3. A task submitted after the thread pool has been shut down is rejected
    }

    /**
     * 8. Shutting down a thread pool immediately: core pool size 5, maximum pool size 10, queue capacity 3, keep-alive time of 5 seconds for threads beyond the core size, with a custom rejection policy
     * @throws InterruptedException
     */
    private void threadPoolExecutorTest8() throws InterruptedException {
        // Create a thread pool with 5 core threads, at most 10 threads, and a queue capacity of 3, so it can hold at most 13 tasks
        // The default policy, java.util.concurrent.ThreadPoolExecutor.AbortPolicy, throws RejectedExecutionException; the custom handler here only prints a message
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), (r, executor) -> {
                    System.out.println("A task has been rejected");
                });
        // Test: submit 15 tasks that each take 3 seconds and observe how the 2 tasks beyond capacity are handled
        for (int i = 0; i < 15; i++) {
            int n = i;
            threadPoolExecutor.submit(() -> {
                try {
                    System.out.println("Start execution:" + n);
                    Thread.sleep(3000);
                    System.out.println("end of execution: " + n);
                } catch (InterruptedException e) {
                    System.out.println("Exception:" + e.getMessage());
                }
            });
            System.out.println("Task submitted successfully:" + n);
        }

        // Shut down the thread pool immediately after 1 second
        Thread.sleep(1000);
        List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
        // Submitting another task after shutdown is rejected
        threadPoolExecutor.submit(() -> {
            System.out.println("Append a task");
        });
        System.out.println("Number of queued tasks that never started: " + shutdownNow.size());

        // Results analysis:
        // 1. 10 tasks start executing, 3 tasks enter the waiting queue, and 2 tasks are rejected
        // 2. After shutdownNow() is called, the 3 queued tasks are returned without being executed, and the 10 running tasks are interrupted

    }

    public static void main(String[] args) throws Exception {
//        new Demo9().threadPoolExecutorTest1();
//        new Demo9().threadPoolExecutorTest2();
//        new Demo9().threadPoolExecutorTest3();
//        new Demo9().threadPoolExecutorTest4();
//        new Demo9().threadPoolExecutorTest5();
//        new Demo9().threadPoolExecutorTest6();
//        new Demo9().threadPoolExecutorTest7();
        new Demo9().threadPoolExecutorTest8();
    }
}

Added by Miichael on Tue, 17 Dec 2019 13:53:25 +0200