Java thread pools and how they work, from the basics to the internals

Advantages of thread pool

The main job of a thread pool is to control the number of running threads. Tasks are added to a queue and are started once a thread is available to run them. If the number of tasks exceeds the maximum number of threads, the excess tasks wait in the queue until a thread finishes its current task and takes the next one from the queue.

Its main characteristics are thread reuse, control of the maximum concurrency, and thread management.

First, reduce resource consumption. Reusing threads that have already been created reduces the cost of thread creation and destruction.
Second, improve response speed. When a task arrives, it can be executed immediately without waiting for a thread to be created.
Third, improve the manageability of threads. Threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces the stability of the system. A thread pool allows unified allocation, tuning and monitoring.
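
Thread reuse can be observed directly. The following is a minimal sketch, not part of the original article: the class name ReuseDemo and the helper workerNames are made up for illustration. It runs twenty tasks on a pool of two threads and collects the names of the threads that executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class ReuseDemo {
    // Runs taskCount tasks on a pool of poolSize threads and returns the
    // names of the threads that actually executed them.
    static Set<String> workerNames(int poolSize, int taskCount) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; queued tasks still run
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = workerNames(2, 20);
        // At most two distinct names appear, however many tasks were run.
        System.out.println(names.size() + " thread(s) ran 20 tasks: " + names);
    }
}
```

However many tasks are submitted, the set never holds more names than the pool has threads, which is the reuse described above.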

Architecture diagram

The thread pool in Java is implemented through the Executor framework

Type and use

The Executors class provides several kinds of thread pool; the four most common ones are covered here.

1.newFixedThreadPool

Creates a fixed-size thread pool that limits the maximum number of concurrent threads. Tasks beyond that limit wait in the queue. The threads in the pool stay alive until the pool is explicitly shut down.

In a pool created by newFixedThreadPool, corePoolSize and maximumPoolSize are equal, and the blocking queue is a LinkedBlockingQueue.

Example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 1; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();//Close thread pool
        }
    }
}


2.newSingleThreadExecutor

Creates a thread pool with a single worker thread. Because only that one thread executes tasks, all tasks run one at a time in the order in which they were submitted.

newSingleThreadExecutor sets corePoolSize and maximumPoolSize to 1 and uses a LinkedBlockingQueue as the blocking queue.

Example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();//Close thread pool
        }
    }
}
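
The ordering guarantee can be checked with a small sketch. The class name SingleThreadOrderDemo and the helper runInOrder are illustrative assumptions, not part of the original example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class SingleThreadOrderDemo {
    // Submits the numbers 0..count-1 and returns them in the order in which
    // the single worker thread actually processed them.
    static List<Integer> runInOrder(int count) throws InterruptedException {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < count; i++) {
                final int id = i;
                pool.execute(() -> seen.add(id));
            }
        } finally {
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```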


3.newCachedThreadPool

Creates a cacheable thread pool. If the pool holds more threads than the current workload needs, idle threads are reclaimed. If no idle thread is available for a new task, a new thread is created.

newCachedThreadPool sets corePoolSize to 0 and maximumPoolSize to Integer.MAX_VALUE, and uses a SynchronousQueue as the blocking queue. In other words, each submitted task is handed to an idle thread if there is one, otherwise a new thread is created to run it, and a thread that stays idle for more than 60 seconds is destroyed.

Example

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        try {
            for (int i = 1; i < 10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();//Close thread pool
        }
    }
}


4.newScheduledThreadPool

Creates a thread pool that supports scheduling: tasks can be run after a delay or periodically.

Example

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService threadScheduledPool = Executors.newScheduledThreadPool(5);
        try {
            threadScheduledPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Executed after a 1 s delay");
                }
            },1, TimeUnit.SECONDS);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadScheduledPool.shutdown();//Close thread pool
        }
    }
}
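
The example above covers delayed execution only. For periodic execution, a minimal sketch using scheduleAtFixedRate might look like this; the class name PeriodicDemo, the helper runPeriodically and the 50 ms period are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, used only for illustration.
class PeriodicDemo {
    // Runs a task every periodMillis until it has run `times` times,
    // then shuts the pool down. Returns the number of runs observed.
    static int runPeriodically(int times, long periodMillis) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            pool.scheduleAtFixedRate(() -> {
                if (done.getCount() > 0) { // ignore extra ticks after the target
                    runs.incrementAndGet();
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow(); // a periodic task never finishes on its own
        }
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran " + runPeriodically(3, 50) + " times");
    }
}
```

Unlike a one-shot schedule call, a periodic task keeps being rescheduled, so the pool has to be shut down (or the returned future cancelled) to stop it.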


Source code analysis

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

}

The source code shows that all four thread pools are built on the ThreadPoolExecutor class. Inside ThreadPoolExecutor, every constructor eventually delegates to the one that takes seven parameters.
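
The values passed by the factory methods can also be read back at runtime. The sketch below is illustrative only: the class name InspectPools and the helper describe are made up, and the casts to ThreadPoolExecutor rely on the factory implementations quoted above rather than on the declared ExecutorService return type.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class InspectPools {
    // Summarises the settings a ThreadPoolExecutor was constructed with.
    static String describe(ThreadPoolExecutor p) {
        return "core=" + p.getCorePoolSize()
                + " max=" + p.getMaximumPoolSize()
                + " keepAlive=" + p.getKeepAliveTime(TimeUnit.SECONDS) + "s"
                + " queue=" + p.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        // Assumption: these factories return a plain ThreadPoolExecutor,
        // as in the JDK source shown above.
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            System.out.println("fixed:  " + describe(fixed));
            System.out.println("cached: " + describe(cached));
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }
}
```

The pool returned by newSingleThreadExecutor is wrapped in a delegating class, so the same cast does not work for it.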

Analysis of seven parameters

1.corePoolSize

The number of resident core threads in the thread pool. The pool keeps at least this many threads alive: even when they are idle they are not destroyed, unless allowCoreThreadTimeOut is set.
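
The effect of allowCoreThreadTimeOut can be sketched as follows. The class name CoreTimeoutDemo, the helper idlePoolSize and the timings (50 ms keep-alive, 500 ms wait) are assumptions chosen for the demonstration; because the check is timing-based, the printed values are what one would normally expect rather than a strict guarantee.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class CoreTimeoutDemo {
    // Starts both core threads of a 2-thread pool, waits well past the
    // keep-alive time and returns how many threads the pool still holds.
    static int idlePoolSize(boolean allowCoreTimeout) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { }); // creates core thread 1
            pool.execute(() -> { }); // creates core thread 2
            Thread.sleep(500);       // ten times the keep-alive time
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("default:            " + idlePoolSize(false)); // normally 2
        System.out.println("core timeout on:    " + idlePoolSize(true));  // normally 0
    }
}
```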

2.maximumPoolSize

The maximum number of threads the pool can hold at the same time; it must be greater than or equal to 1 and not less than corePoolSize. Once all core threads are busy, newly submitted tasks are placed in the work queue. If the work queue is full, the pool creates a new non-core thread, which runs the newly submitted task directly. The pool does not create threads without limit: the upper bound is maximumPoolSize.

3.keepAliveTime

If a thread is idle and the current number of threads is greater than corePoolSize, the idle thread is destroyed after a specified time. That time is set by keepAliveTime.

4.unit

The time unit of keepAliveTime.

5.workQueue

The task queue, which holds tasks that have been submitted but not yet executed. Once all core threads are busy, a newly submitted task enters the work queue and is taken out when a thread becomes free.

Four commonly used work queues in the JDK are:

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. SynchronousQueue
  4. PriorityBlockingQueue

If you are not familiar with blocking queues, see the blog post "Talk about blocking queue".
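
The difference that matters most for a thread pool is whether the queue is bounded, because ThreadPoolExecutor adds tasks with the non-blocking offer method and treats a refused offer as "queue full". A minimal sketch of how the four queues respond to offer (the class name QueueDemo and the helper accepted are illustrative assumptions):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, used only for illustration.
class QueueDemo {
    // Offers `attempts` items without blocking and returns how many the
    // queue accepted.
    static int accepted(BlockingQueue<Integer> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(accepted(new ArrayBlockingQueue<>(2), 5));   // 2: bounded by its capacity
        System.out.println(accepted(new LinkedBlockingQueue<>(), 5));   // 5: unbounded by default
        System.out.println(accepted(new LinkedBlockingQueue<>(3), 5));  // 3: optional capacity bound
        System.out.println(accepted(new SynchronousQueue<>(), 5));      // 0: no consumer is waiting
        System.out.println(accepted(new PriorityBlockingQueue<>(), 5)); // 5: unbounded, priority ordered
    }
}
```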

6.threadFactory

The thread factory that creates the worker threads in the pool. It can be used to set thread names, daemon status and so on; the default factory is usually sufficient.
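
A custom factory is mostly useful for giving worker threads recognisable names. The following is a sketch under assumptions: the class NamedThreadFactory and the prefix "order-worker" are invented for illustration and are not JDK classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory, used only for illustration.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Name each worker "<prefix>-<n>" so it is easy to spot in logs
        // and thread dumps.
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2, new NamedThreadFactory("order-worker"));
        try {
            // prints "order-worker-1 invoked"
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " invoked"));
        } finally {
            pool.shutdown();
        }
    }
}
```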

7.handler

The rejection policy, which decides what happens to a new task when the work queue is full and the number of worker threads has reached the maximum for the pool (maximumPoolSize).

Underlying principle

The four main steps are as follows:

  1. After the thread pool is created, it waits for tasks to be submitted.
  2. When execute() is called to add a task, the thread pool makes the following decisions:
    1. If the number of running threads is less than corePoolSize, create a thread to run the task immediately;
    2. If the number of running threads is greater than or equal to corePoolSize, put the task into the queue;
    3. If the queue is full and the number of running threads is less than maximumPoolSize, create a non-core thread to run the task immediately;
    4. If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the thread pool applies its rejection (saturation) policy.
  3. When a thread completes a task, it takes the next task from the queue and executes it.
  4. When a thread has had nothing to do for longer than keepAliveTime and the current number of running threads is greater than corePoolSize, the thread is stopped.
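
The decisions in step 2 can be traced with a deliberately tiny pool. In this sketch (class name ExecuteStepsDemo and helper trace are illustrative assumptions) the pool has one core thread, at most two threads and a queue of one slot, and every task blocks until released so that the pool state is stable when it is read.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class ExecuteStepsDemo {
    // Submits four blocking tasks and records the pool state after each one.
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    log.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0   (core thread created)
        // task2: threads=1 queued=1   (core busy, task queued)
        // task3: threads=2 queued=1   (queue full, non-core thread created)
        // task4: rejected             (queue full, maximumPoolSize reached)
    }
}
```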

Rejection policies

The JDK provides four rejection policies:

  1. CallerRunsPolicy: the run method of the rejected task is executed directly in the calling thread. If the thread pool has already been shut down, the task is discarded.
  2. AbortPolicy: the task is discarded and a RejectedExecutionException is thrown.
  3. DiscardPolicy: the task is silently discarded and nothing else is done.
  4. DiscardOldestPolicy: the oldest task in the queue is discarded, and the pool then tries again to submit the rejected task. It suits workloads where newer tasks matter more than older ones.
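
Besides the four built-in policies, any class that implements RejectedExecutionHandler can be passed as the handler. The sketch below is an illustration only: the class CountingRejectionHandler and the helper rejectedOutOf are made up, and a real handler would more likely log, retry or persist the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler, used only for illustration.
class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Count the rejection and drop the task.
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }

    // Submits `tasks` blocking tasks to a pool that can hold only two
    // (one running, one queued) and returns how many were rejected.
    static int rejectedOutOf(int tasks) {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println(rejectedOutOf(5) + " of 5 tasks rejected"); // 3 of 5
    }
}
```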

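Creating a thread pool by hand

Although the JDK provides ready-made thread pools through Executors, they should not be used directly in production. The Alibaba Java development manual prohibits it.

The reason is simple. newFixedThreadPool and newSingleThreadExecutor use a LinkedBlockingQueue that is effectively unbounded (its capacity is Integer.MAX_VALUE), so tasks can pile up until an OutOfMemoryError occurs. newCachedThreadPool and newScheduledThreadPool allow up to Integer.MAX_VALUE threads, which carries the same risk. Create the pool with the ThreadPoolExecutor constructor and a bounded queue instead.

Example 1 (using AbortPolicy)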

import java.util.concurrent.*;

public class ThreadPool {
    public static void main(String[] args) {
        int corePoolSize = 2;   //Number of core threads 2
        int maximumPoolSize = 5;    //Maximum threads 5
        long keepAliveTime = 1L;    //Idle time 1s
        TimeUnit unit = TimeUnit.SECONDS;   //Unit second
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);   //Blocking queue, 3 Max
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); //Thread factory, using default
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();    //Reject strategy, throw exception
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

        try {
            for (int i = 0; i < 9 ; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

Result

When the number of submitted tasks is raised from 8 to 9, a RejectedExecutionException can be thrown. The pool can therefore hold at most maximumPoolSize + queue capacity tasks at the same time (here 5 + 3 = 8); a task submitted beyond that limit is rejected with an exception.
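
Because the tasks in Example 1 finish almost instantly, the rejection does not necessarily occur on every run. The following sketch makes the limit reproducible by keeping every task blocked until the experiment is over; the class name CapacityDemo and the helper acceptedBeforeRejection are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class CapacityDemo {
    // Submits blocking tasks until one is rejected and returns how many
    // the pool accepted before that.
    static int acceptedBeforeRejection(int core, int max, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // never finishes during the loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            return accepted;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Same settings as Example 1: 5 threads + 3 queue slots.
        System.out.println(acceptedBeforeRejection(2, 5, 3)); // prints 8
    }
}
```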

Example 2 (using CallerRunsPolicy)

import java.util.concurrent.*;

public class ThreadPool {
    public static void main(String[] args) {
        int corePoolSize = 2;   //Number of core threads 2
        int maximumPoolSize = 5;    //Maximum threads 5
        long keepAliveTime = 1L;    //Idle time 1s
        TimeUnit unit = TimeUnit.SECONDS;   //Unit second
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);   //Blocking queue, 3 Max
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); //Thread factory, using default
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();    //Caller mode, fallback caller
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        try {
            for (int i = 0; i < 9 ; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

Result

The output contains an extra line from the main thread: once the pool and its queue are full, the rejected task falls back to the calling thread, which runs it itself.
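
The fallback can be made reproducible by saturating a very small pool with blocking tasks first. In this sketch the class name CallerRunsDemo and the helper threadOfOverflowTask are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, used only for illustration.
class CallerRunsDemo {
    // Saturates a pool of one thread and one queue slot, then submits one
    // more task and returns the name of the thread that ran it.
    static String threadOfOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> runner = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the queue
            // The pool is saturated, so this task runs in the calling thread
            // before execute() returns.
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on " + threadOfOverflowTask()); // ... ran on main
    }
}
```

Running the task in the caller also slows down the submitting thread, which acts as a simple form of back-pressure.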

Example 3 (using DiscardOldestPolicy)

import java.util.concurrent.*;

public class ThreadPool {
    public static void main(String[] args) {
        int corePoolSize = 2;   //Number of core threads 2
        int maximumPoolSize = 5;    //Maximum threads 5
        long keepAliveTime = 1L;    //Idle time 1s
        TimeUnit unit = TimeUnit.SECONDS;   //Unit second
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);   //Blocking queue, 3 Max
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); //Thread factory, using default
        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy(); 
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        try {
            for (int i = 0; i < 10 ; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

Result

Ten tasks were submitted, but only 8 lines were output. Under this policy the pool discards the task at the head of the queue (the one that has waited longest) and then enqueues the newest task.

Example 4 (using DiscardPolicy)

import java.util.concurrent.*;

public class ThreadPool {
    public static void main(String[] args) {
        int corePoolSize = 2;   //Number of core threads 2
        int maximumPoolSize = 5;    //Maximum threads 5
        long keepAliveTime = 1L;    //Idle time 1s
        TimeUnit unit = TimeUnit.SECONDS;   //Unit second
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(3);   //Blocking queue, 3 Max
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); //Thread factory, using default
        RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
        ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        try {
            for (int i = 0; i < 10 ; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName()+" invoked");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
    }
}

Result

Ten tasks were submitted, but only 8 lines were output. Although the result looks similar to that of DiscardOldestPolicy, this policy silently discards the newly submitted tasks and keeps the ones already queued.
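
Since both policies print the same number of lines, the difference is easier to see when the tasks are labelled. This sketch (class name DiscardDemo and helper executed are illustrative assumptions) saturates a pool of one thread and one queue slot with tasks A and B and then submits C.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class DiscardDemo {
    // Returns the labels of the tasks that actually ran under the given policy.
    static List<String> executed(RejectedExecutionHandler handler) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), handler);
        try {
            pool.execute(() -> {              // A: holds the only worker until released
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ran.add("A");
            });
            pool.execute(() -> ran.add("B")); // B: waits in the queue
            pool.execute(() -> ran.add("C")); // C: pool is saturated, the policy decides
        } finally {
            release.countDown();
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(executed(new ThreadPoolExecutor.DiscardOldestPolicy())); // [A, C]
        System.out.println(executed(new ThreadPoolExecutor.DiscardPolicy()));       // [A, B]
    }
}
```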

Reasonable configuration of thread pool parameters

CPU intensive

Aside: getting the number of available processors in Java

// Get the number of processors available to the JVM
System.out.println(Runtime.getRuntime().availableProcessors());

CPU intensive means that the task requires a lot of computation with little or no blocking, so the CPU runs at full load the whole time.
CPU intensive tasks can only be accelerated by multithreading on real multi-core CPUs.

Configure CPU intensive tasks with as few threads as possible:
General formula: thread pool size = number of CPU cores + 1

IO intensive

IO intensive means that the task performs a lot of IO and therefore spends much of its time blocked.
Running IO intensive tasks on a single thread wastes a lot of CPU capacity on waiting.
Using multiple threads for IO intensive tasks can therefore speed up the program considerably, even on a single-core CPU, because the gain comes mainly from putting the otherwise wasted blocking time to use.

Reference 1

For IO intensive work most threads are blocked most of the time, so more threads need to be configured:
Reference formula: number of CPU cores / (1 - blocking coefficient), where the blocking coefficient is between 0.8 and 0.9
For example, with an 8-core CPU: 8 / (1 - 0.9) = 80 threads

Reference 2

Since the threads of IO intensive tasks are not executing all the time, configure more threads than cores, for example number of CPU cores * 2.
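
The sizing rules from both sections can be collected into a small helper. This is a sketch of the rules of thumb quoted above, not a tuning recommendation; the class name PoolSizing and its method names are invented for illustration.

```java
// Hypothetical helper class, used only for illustration.
class PoolSizing {
    // CPU intensive rule: number of cores + 1.
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // IO intensive rule (reference 1): cores / (1 - blocking coefficient).
    static int ioBound(int cores, double blockingCoefficient) {
        if (blockingCoefficient < 0 || blockingCoefficient >= 1) {
            throw new IllegalArgumentException("blocking coefficient must be in [0, 1)");
        }
        // Round to absorb floating-point error in 1 - coefficient.
        return (int) Math.round(cores / (1 - blockingCoefficient));
    }

    // IO intensive rule (reference 2): cores * 2.
    static int ioBoundSimple(int cores) {
        return cores * 2;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores:              " + cores);
        System.out.println("CPU intensive:      " + cpuBound(cores));
        System.out.println("IO intensive (0.9): " + ioBound(cores, 0.9));
        System.out.println("IO intensive (x2):  " + ioBoundSimple(cores));
    }
}
```

For an 8-core machine the three rules give 9, 80 and 16 threads respectively, so the two IO intensive references differ widely and the result is best treated as a starting point for measurement.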


Keywords: Java JDK less

Added by mcgruff on Tue, 10 Mar 2020 12:27:09 +0200