This article is shared from the Huawei Cloud community post "Handwritten thread pool: compare and learn the implementation principle of the ThreadPoolExecutor thread pool!", author: Xiao Fu Ge.
Xie Feiji, a quick note: last time I got burned by threads. Surely I can't fall into the same pit twice!
Xie Feiji: Go ahead and ask, I'm ready!!!
Interviewer: Well, how is the thread pool state designed and stored?
Xie Feiji: That one... Next, next!
Interviewer: Why does the Worker class extend AQS instead of using ReentrantLock?
Xie Feiji: I...!
Interviewer: Please briefly describe the execution flow of execute!
Xie Feiji: Bye!
1, Thread pool explanation
1. Let's look at an example first
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threadPoolExecutor.execute(() -> {
    System.out.println("Hi Thread pool!");
});
threadPoolExecutor.shutdown();

// Executors.newFixedThreadPool(10);
// Executors.newCachedThreadPool();
// Executors.newScheduledThreadPool(10);
// Executors.newSingleThreadExecutor();
This is an example of creating a thread pool. I believe you have used it many times.
The core purpose of a thread pool is to make good use of resources by reusing threads. Pooling avoids the performance overhead of repeatedly creating and destroying threads.
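To make the reuse concrete, here is a minimal sketch, assuming a fixed pool of 2 threads and 100 short tasks. The class name ReuseSketch and the method distinctThreads are hypothetical names for this illustration. It records which threads actually ran the tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// ReuseSketch is a hypothetical name used only for this illustration.
final class ReuseSketch {

    /** Runs `tasks` short tasks on a pool of `threads` threads and counts the distinct threads used. */
    static int distinctThreads(int threads, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            // Each task only records the name of the thread that runs it.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(distinctThreads(2, 100)); // prints 2
    }
}
```

All 100 tasks are served by the same 2 threads, so no thread is created or destroyed per task.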
Then, let's analyze the structure of this pool through practice and see how it handles threads.
2. Write a thread pool
2.1 implementation process
To better understand and analyze the source code of the thread pool, let's first write a very simple thread pool based on the core idea.
In fact, the core logic of a piece of functional code is often not very complex, but many auxiliary branches have to be added to keep the core process running smoothly. As I often say, toilet paper is made that big to protect your hands!
As shown in figure 21-1, the implementation of this handwritten thread pool is very simple and reflects only the core process, including:
- There are n threads running all the time, which corresponds to the pool size allowed when we create the thread pool.
- Tasks are submitted to the thread pool to run.
- If all threads in the pool are busy, the task is put into the queue.
- Finally, when a thread becomes idle, it takes a task from the queue and runs it.
2.2 implementation code
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolTrader implements Executor {

    // Number of worker threads currently in the pool
    private final AtomicInteger ctl = new AtomicInteger(0);

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

    // Tasks that cannot be run immediately wait here
    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    @Override
    public void execute(Runnable command) {
        int c = ctl.get();
        // Fewer threads than corePoolSize: start a new worker for this task
        if (c < corePoolSize) {
            if (!addWorker(command)) {
                reject();
            }
            return;
        }
        // Otherwise queue the task; if the queue is full, try one more worker
        if (!workQueue.offer(command)) {
            if (!addWorker(command)) {
                reject();
            }
        }
    }

    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;

        Worker worker = new Worker(firstTask);
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    private final class Worker implements Runnable {

        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                // Run the first task, then keep taking tasks from the queue
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                ctl.decrementAndGet();
            }
        }

        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue.size: " + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void reject() {
        throw new RuntimeException("Error! ctl.count: " + ctl.get() + " workQueue.size: " + workQueue.size());
    }

    public static void main(String[] args) {
        ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            threadPoolTrader.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task No.: " + finalI);
            });
        }
    }

}

// test result
Task No.: 1
Task No.: 0
workQueue.size: 8
workQueue.size: 8
Task No.: 3
workQueue.size: 6
Task No.: 2
workQueue.size: 5
Task No.: 5
workQueue.size: 4
Task No.: 4
workQueue.size: 3
Task No.: 7
workQueue.size: 2
Task No.: 6
workQueue.size: 1
Task No.: 8
Task No.: 9
workQueue.size: 0
workQueue.size: 0
As shown above, this thread pool implementation is very simple, yet the test results already reflect the core idea of pooling. The main logic includes:
- ctl records the number of threads in the thread pool.
- corePoolSize and maximumPoolSize limit the thread pool capacity.
- workQueue is the thread pool queue: tasks that cannot be run immediately are put into this queue.
- execute submits a task. It is the general interface method (from Executor). Its main job is to decide whether the submitted task is handed to a new Worker, queued, or rejected.
- addWorker creates a Worker and starts its thread. The Worker also contains the getTask() method, which keeps taking unexecuted tasks from the queue.
Well, that is the whole of this simple thread pool. But if you think about it carefully, you will find that much is missing. For example: what about the state of the thread pool (it can't run forever)? What about locking (won't there be concurrency problems)? What about the strategy after a task is rejected? None of these are handled in the main process, and it is precisely because they are absent that the code above is easy to understand.
Next, we analyze the source code of the thread pool. With our simple thread pool as a reference, it will be easier to understand!
3. Thread pool source code analysis
3.1 thread pool class diagram
Figure 21-2, the thread pool class relationship diagram, shows the implementation and inheritance relationships among the classes around the core class ThreadPoolExecutor.
- The interfaces Executor and ExecutorService define the basic methods of a thread pool, in particular execute(Runnable command), which submits a task to the pool.
- The abstract class AbstractExecutorService implements the general interface methods.
- ThreadPoolExecutor is the core class of the whole thread pool. All other classes and interfaces provide their functions around this class.
- Worker is the worker class: it wraps the thread that ultimately executes the tasks.
- RejectedExecutionHandler is the rejection policy interface, with four implementation classes: AbortPolicy (rejects by throwing an exception), DiscardPolicy (silently discards the task), DiscardOldestPolicy (discards the oldest task waiting in the queue, then retries), and CallerRunsPolicy (whoever submits the task runs it).
- Executors is the factory class for the commonly used thread pool configurations, including newFixedThreadPool, newCachedThreadPool, newScheduledThreadPool and newSingleThreadExecutor.
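As an illustration of one of the rejection policies listed above, the following sketch assumes a pool with 1 thread and a queue of capacity 1, so that the third submission is rejected and handled by CallerRunsPolicy. The class name CallerRunsSketch and its helper methods are hypothetical; the ThreadPoolExecutor calls are from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// CallerRunsSketch is a hypothetical name used only for this illustration.
final class CallerRunsSketch {

    /** Returns true if the rejected task was executed by the submitting thread. */
    static boolean rejectedTaskRanOnCaller() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> awaitQuietly(release));             // occupies the only worker thread
            pool.execute(() -> { });                               // fills the queue
            pool.execute(() -> ranOn.set(Thread.currentThread())); // rejected: runs in the caller
        } finally {
            release.countDown();
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return ranOn.get() == Thread.currentThread();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectedTaskRanOnCaller()); // prints true
    }
}
```

With the default AbortPolicy, the third execute call would throw a RejectedExecutionException instead.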
3.2 high 3 bits and low 29 bits
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
ThreadPoolExecutor uses ctl, an AtomicInteger, to record both the thread pool state and the number of threads in the pool. The two values are packed into a single int by splitting its bits: the upper 3 bits record the state and the lower 29 bits store the number of threads. The initial value is the RUNNING state with 0 threads.
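The packing and unpacking can be tried out in isolation. The following is a standalone sketch: the constants are copied from the snippet above, the three helper methods mirror the private helpers of the same names in the JDK 8 source, and the class name CtlSketch is hypothetical.

```java
// CtlSketch is a hypothetical name; the constants mirror the ones shown above.
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set to 1

    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    // Keep only the high 3 bits: the run state.
    static int runStateOf(int c)     { return c & ~CAPACITY; }

    // Keep only the low 29 bits: the worker count.
    static int workerCountOf(int c)  { return c & CAPACITY; }

    // Combine a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // prints true
        System.out.println(workerCountOf(c));         // prints 5
        System.out.println(RUNNING < SHUTDOWN);       // prints true
    }
}
```

Because RUNNING is negative and the other states grow from 0 upward, the states are ordered numerically, which is why the source can use comparisons such as rs >= SHUTDOWN.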
3.3 thread pool status
Figure 22-4 shows the state transitions in the thread pool, which has the following states:
- RUNNING: the running state. Accepts new tasks and processes tasks in the queue.
- SHUTDOWN: the shutdown state (the shutdown() method was called). Does not accept new tasks, but still processes tasks in the queue.
- STOP: the stop state (the shutdownNow() method was called). Does not accept new tasks, does not process tasks in the queue, and interrupts tasks that are being processed.
- TIDYING: all tasks have terminated and workerCount is 0. When the thread pool enters this state, it calls the terminated() method and then moves to the TERMINATED state.
- TERMINATED: the terminated state, which is the state after the call to terminated() has completed.
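The states themselves are private, but some of the transitions can be observed through public methods. The sketch below assumes a small pool with one trivial task. The class name PoolStateSketch is hypothetical, while isShutdown, awaitTermination and isTerminated are public ThreadPoolExecutor methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// PoolStateSketch is a hypothetical name used only for this illustration.
final class PoolStateSketch {

    static String observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(4));
        StringBuilder log = new StringBuilder();

        pool.execute(() -> { });
        log.append("before=").append(pool.isShutdown());  // still RUNNING

        pool.shutdown();                                   // RUNNING -> SHUTDOWN
        log.append(" after=").append(pool.isShutdown());

        try {
            pool.execute(() -> { });                       // new tasks are no longer accepted
            log.append(" accepted");
        } catch (RejectedExecutionException e) {
            log.append(" rejected");
        }

        // Wait until the pool has passed through TIDYING and reached TERMINATED.
        boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
        log.append(" terminated=").append(done && pool.isTerminated());
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints before=false after=true rejected terminated=true
    }
}
```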
3.4 submit task (execute)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
When reading this part of the source code, you can refer to our own thread pool. The ultimate goal is the same: a submitted task is either executed right away, put into the queue, or handled by the rejection policy.
- ctl.get() returns the value that records both the pool state and the number of threads. Use workerCountOf() to extract the current number of threads; it computes c & CAPACITY.
- If the current number of threads is less than the core size corePoolSize, addWorker(command, true) creates a new core thread to run the task.
- If the core threads are full, isRunning(c) checks whether the thread pool is running. If it is, the task that cannot be run immediately is put into the work queue.
- After the task is queued, the pool state is checked again. If the pool is no longer running and the task is successfully removed from the queue, the rejection policy is applied. Otherwise, if the number of threads is 0, a new thread is added.
- Finally, if the task could not be queued, addWorker is tried again. This time the second argument is false, so the limit checked is maximumPoolSize rather than corePoolSize. If that also fails, the rejection policy is applied.
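The decision order described above can be observed from the outside. This is a minimal sketch, assuming corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, with tasks that block until released so that nothing finishes in between. The class name ExecuteOrderSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// ExecuteOrderSketch is a hypothetical name used only for this illustration.
final class ExecuteOrderSketch {

    /** Logs "poolSize/queueSize" after each submission, then the fate of a fourth task. */
    static String observe() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker); // fewer threads than corePoolSize: new core thread
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker); // core thread busy: task goes to the queue
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            pool.execute(blocker); // queue full: new non-core thread
            log.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
            try {
                pool.execute(blocker); // queue full and maximumPoolSize reached
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observe()); // prints 1/0 1/1 2/1 rejected
    }
}
```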
3.5 add execution task (addWorker)
private boolean addWorker(Runnable firstTask, boolean core)
The first part is to increase the number of threads
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
The second part is to create and start the thread
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
The process of adding a worker can be divided into two parts. The first part of the code records the number of threads, and the second part creates and starts the worker thread while holding an exclusive lock. If you ignore the lock and CAS operations, this code is basically the same as the thread pool we wrote at the beginning.
- if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) checks whether the pool state is one of SHUTDOWN, STOP, TIDYING and TERMINATED. If so, false is returned, with one exception: when the state is SHUTDOWN, the incoming task is null and the queue is not empty, a worker may still be added to drain the queue.
- compareAndIncrementWorkerCount is a CAS operation that increases the number of threads. On success it breaks out of the labeled loop.
- runStateOf(c) != rs re-checks the pool state after a failed CAS. If the state has changed, the outer loop restarts; otherwise only the inner loop is retried.
- After the thread count is recorded successfully, the method enters the locked phase, creates the worker thread and records it in workers. Finally, if the thread was not started successfully, addWorkerFailed is called to remove the worker and roll back the count.
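The CAS retry pattern of the inner loop can be isolated in a few lines. The sketch below is a simplification, not the JDK code: it only models a bounded counter (no pool state, no labeled outer loop), and the names CasCountSketch, tryIncrement and race are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// CasCountSketch is a hypothetical, simplified model of the worker-count CAS loop.
final class CasCountSketch {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    CasCountSketch(int limit) {
        this.limit = limit;
    }

    /** Increments the counter unless the limit is reached; retries when a CAS loses a race. */
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // like wc >= maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                   // like compareAndIncrementWorkerCount(c)
            // CAS failed because another thread changed the count: re-read and retry
        }
    }

    int get() {
        return count.get();
    }

    /** Lets several threads compete; returns {successful increments, final count}. */
    static int[] race(int limit, int threads, int attemptsPerThread) throws InterruptedException {
        CasCountSketch counter = new CasCountSketch(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement())
                        successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers)
            t.join();
        return new int[] {successes.get(), counter.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = race(10, 8, 1000);
        System.out.println(r[0] + " " + r[1]); // prints 10 10
    }
}
```

However many threads compete, the counter never exceeds the limit, and no lock is needed for the count itself.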
3.6 execution thread (runWorker)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Allow interrupt
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                    // (the catch blocks that record the exception in thrown are omitted here)
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
In fact, with the handwritten thread pool as a foundation, you can basically understand what the thread pool is doing here. The core point is that task.run() runs the task. The additional processing is as follows:
- beforeExecute and afterExecute are called before and after each task runs, for example to collect statistics.
- In addition, the lock operations here use the non-reentrant exclusive lock that Worker implements itself by extending AQS.
- processWorkerExit: if you are interested, you can dig into this and similar methods. When a thread exits, it removes the Worker and accumulates its completed task count, which is also quite interesting.
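To see what "non-reentrant" means in practice, here is a minimal sketch of an AQS-based exclusive lock written in the same spirit as Worker, compared with ReentrantLock. The class name NonReentrantLockSketch and the method compare are hypothetical; the AQS methods used (compareAndSetState, setState, acquire, release) are from the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// NonReentrantLockSketch is a hypothetical name; state 0 = unlocked, 1 = locked.
final class NonReentrantLockSketch extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int unused) {
        // Succeeds only if the lock is free, even when the owner itself asks again.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock()       { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** Returns {second tryLock on this lock, second tryLock on a ReentrantLock}, same thread. */
    static boolean[] compare() {
        NonReentrantLockSketch mine = new NonReentrantLockSketch();
        mine.lock();
        boolean secondOnAqs = mine.tryLock();            // owner cannot re-acquire
        mine.unlock();

        ReentrantLock reentrant = new ReentrantLock();
        reentrant.lock();
        boolean secondOnReentrant = reentrant.tryLock(); // owner re-acquires
        reentrant.unlock();
        reentrant.unlock();

        return new boolean[] {secondOnAqs, secondOnReentrant};
    }

    public static void main(String[] args) {
        boolean[] r = compare();
        System.out.println(r[0] + " " + r[1]); // prints false true
    }
}
```

As far as the JDK source comments explain it, this is the reason Worker does not use ReentrantLock: the pool uses tryLock on a Worker to tell whether it is idle before interrupting it, and a task that calls a pool control method such as setCorePoolSize from inside its own worker must not be able to re-acquire that worker's lock.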
3.7 queue acquisition task (getTask)
If you have started reading the source code, you will have seen the loop while (task != null || (task = getTask()) != null) in the runWorker method. This is the same approach as in our handwritten thread pool. Its core purpose is to take tasks from the queue.
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
- The getTask method takes the tasks waiting to be executed from the blocking queue, one at a time.
- if (rs >= SHUTDOWN ... checks whether the thread pool has been shut down. If the state is STOP or later, or the state is SHUTDOWN and the queue is empty, the worker count is decremented and null is returned.
- wc = workerCountOf(c), wc > corePoolSize: if the number of worker threads exceeds the core size corePoolSize, this worker is subject to a timeout. If it gets no task before the timeout expires, the worker count is decremented and null is returned, so the thread beyond corePoolSize is destroyed.
- timed is true when allowCoreThreadTimeOut is set or wc > corePoolSize. When timed is true, the timeout is enforced through the poll method of the blocking queue.
- If no task is obtained within keepAliveTime, poll returns null and timedOut is set. If timed is false, take() blocks until a task arrives.
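The effect of the timed poll can be watched through getPoolSize(). The sketch below assumes corePoolSize = 1, maximumPoolSize = 3, a keepAliveTime of 50 ms and a SynchronousQueue, so that every submission beyond the first creates a non-core thread. The class name KeepAliveSketch and the polling helper waitForPoolSize are hypothetical, and the result depends on timing (the helper waits up to 5 seconds).

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// KeepAliveSketch is a hypothetical name used only for this illustration.
final class KeepAliveSketch {

    /** Returns {peak pool size, size after keepAliveTime, size after allowCoreThreadTimeOut}. */
    static int[] observe() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getPoolSize();

        release.countDown();
        // Idle threads above corePoolSize time out in poll() and exit.
        int afterKeepAlive = waitForPoolSize(pool, 1);

        // Now the core thread is also "timed" and exits after keepAliveTime.
        pool.allowCoreThreadTimeOut(true);
        int afterCoreTimeout = waitForPoolSize(pool, 0);

        pool.shutdown();
        return new int[] {peak, afterKeepAlive, afterCoreTimeout};
    }

    // Polls until the pool reaches the expected size or 5 seconds have passed.
    private static int waitForPoolSize(ThreadPoolExecutor pool, int expected) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() != expected && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        return pool.getPoolSize();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(observe())); // expected: [3, 1, 0]
    }
}
```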
2, Summary
- This chapter does not cover every aspect of the thread pool, otherwise the content would become bloated. We started from a handwritten thread pool and gradually analyzed how the same logic is implemented in the Java thread pool. Almost all of the knowledge involved has been introduced before, including queues, CAS, AQS, reentrant locks and exclusive locks. These topics are all linked, so it is best to have some foundation; otherwise it will be hard to follow.
- Beyond what this chapter covers, we have not discussed the thread destruction process, how to choose among the four thread pool factory methods, or how to configure pools for CPU-intensive and IO-intensive tasks. Spring also provides its own thread pool support. These topics are all very close to day-to-day practice.