ThreadPoolExecutor Thread Pool Source Analysis
A word from White Teeth
I haven't posted in a long time. It is not that I stopped learning; I just didn't know what to write. A voice in my head also kept telling me that public accounts are everywhere, that there are more people writing them than reading them, and that there are already too many articles on any given topic. On reflection, though, even if many similar articles exist, I didn't write them. Writing things down myself helps me organize what I know, and if it happens to help you too, all the better. So White Teeth encourages you to produce more output as well: output forces input, and you only find out how much you gain once you have done it.
ThreadPoolExecutor class diagram
As the class diagram shows, ThreadPoolExecutor is an ExecutorService that executes submitted tasks using the threads in its pool.
Common Properties
// ctl is the key variable of the thread pool. It is an AtomicInteger that records
// both the state of the pool and the number of threads in a single int.
// An int has 32 bits: the high 3 bits hold the pool state and the low 29 bits hold
// the number of threads. The initial value is the RUNNING state with 0 threads.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Number of bits used for the thread count: 32 minus the 3 state bits, i.e. 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// Maximum number of threads, i.e. the low 29 bits all set:
// 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
You have to admire the JDK authors for getting so much out of one variable: through bitwise operations, ctl expresses both the state of the thread pool and the number of threads. Here is how.
Common Methods
// Extracts the pool state. ~CAPACITY is 1110 0000 0000 0000 0000 0000 0000 0000,
// so c & ~CAPACITY keeps only the high 3 bits of ctl (the pool state mentioned earlier)
private static int runStateOf(int c) { return c & ~CAPACITY; }

// Extracts the thread count. CAPACITY is 0001 1111 1111 1111 1111 1111 1111 1111,
// so c & CAPACITY keeps only the low 29 bits of ctl (the thread count mentioned earlier)
private static int workerCountOf(int c) { return c & CAPACITY; }

// Builds the value of ctl by OR-ing the pool state (rs) with the thread count (wc)
private static int ctlOf(int rs, int wc) { return rs | wc; }
Clever, isn't it? These methods appear again and again in the source code.
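To make the bit packing concrete, here is a minimal sketch that copies the constants and the three helper methods from the JDK source into a stand-alone class. The class name CtlDemo is only an assumption for illustration; it is not part of the JDK.

```java
class CtlDemo {
    // Copied from the ThreadPoolExecutor source so the sketch runs on its own
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        // Pack the RUNNING state and a thread count of 5 into one int
        int c = ctlOf(RUNNING, 5);
        System.out.println("state is RUNNING: " + (runStateOf(c) == RUNNING));
        System.out.println("thread count: " + workerCountOf(c));
        // Adding 1 to ctl only touches the low bits, so the state is unchanged
        System.out.println("count after c + 1: " + workerCountOf(c + 1));
        System.out.println("state after c + 1 is RUNNING: " + (runStateOf(c + 1) == RUNNING));
    }
}
```

This is also why the pool can update the state and the count together with a single compare-and-set on ctl.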
Thread pool life cycle
// Default state: accepts new tasks and processes tasks in the blocking queue
private static final int RUNNING = -1 << COUNT_BITS;

// Rejects new tasks but still processes tasks in the blocking queue
private static final int SHUTDOWN = 0 << COUNT_BITS;

// Rejects new tasks, does not process tasks in the blocking queue, and interrupts tasks that are running
private static final int STOP = 1 << COUNT_BITS;

// All tasks have terminated and the thread count is 0; the terminated() hook method will be called
private static final int TIDYING = 2 << COUNT_BITS;

// Final state, reached after the terminated() method has completed
private static final int TERMINATED = 3 << COUNT_BITS;
Remarks
If you want to see the binary representation of the above variable, you can view it by the method Integer.toBinaryString(int i)
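As a small sketch of that remark: Integer.toBinaryString drops leading zeros, so the hypothetical helper bits below (an assumption for this example, not a JDK method) pads the result to 32 digits to make the high 3 state bits line up.

```java
class StateBits {
    // Pads the output of Integer.toBinaryString to 32 digits, since it omits leading zeros
    static String bits(int value) {
        String s = Integer.toBinaryString(value);
        StringBuilder sb = new StringBuilder();
        for (int i = s.length(); i < Integer.SIZE; i++) {
            sb.append('0');
        }
        return sb.append(s).toString();
    }

    public static void main(String[] args) {
        int countBits = Integer.SIZE - 3; // same value as COUNT_BITS in the JDK source
        System.out.println("RUNNING    " + bits(-1 << countBits));
        System.out.println("SHUTDOWN   " + bits(0 << countBits));
        System.out.println("STOP       " + bits(1 << countBits));
        System.out.println("TIDYING    " + bits(2 << countBits));
        System.out.println("TERMINATED " + bits(3 << countBits));
    }
}
```

Because RUNNING is negative and the other states grow from 0 upwards, the states are numerically ordered, which is what lets the source compare them with expressions such as rs >= SHUTDOWN.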
State Transition of Thread Pool
RUNNING -> SHUTDOWN: on invocation of shutdown(), perhaps implicitly in finalize() (because finalize() also calls shutdown())
(RUNNING or SHUTDOWN) -> STOP: on invocation of shutdownNow()
SHUTDOWN -> TIDYING: when both the queue and the pool are empty
STOP -> TIDYING: when the pool is empty
TIDYING -> TERMINATED: when the terminated() hook method has completed
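The internal state constants are private, but some transitions can be observed from outside through isShutdown(), isTerminated() and the protected terminated() hook. The following is a minimal sketch under that assumption; the class name LifecycleDemo and the event strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Called while the pool is TIDYING, before it becomes TERMINATED
                events.add("terminated() hook");
                super.terminated();
            }
        };
        events.add("isShutdown=" + pool.isShutdown()); // pool is RUNNING here
        pool.execute(() -> events.add("task ran"));
        pool.shutdown();                               // RUNNING -> SHUTDOWN
        try {
            // Returns once the pool has reached TERMINATED (or the timeout elapses)
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("isTerminated=" + pool.isTerminated());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that the task submitted before shutdown() still runs, which matches the description of the SHUTDOWN state.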
Thread pool constructor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
These are the parameters commonly used when creating a thread pool. They are briefly described below.
corePoolSize: The number of core threads in the thread pool. These threads are kept even if they are idle, unless allowCoreThreadTimeOut is set.
maximumPoolSize: The maximum number of threads in the thread pool.
keepAliveTime: When the number of threads is greater than the number of core threads, the maximum time the surplus idle threads may survive.
unit: The time unit of keepAliveTime.
workQueue: The blocking queue that holds tasks that have not yet been executed.
threadFactory: The factory that creates threads.
handler: The rejection policy, executed when the maximum number of threads has been reached and the task queue is full. The built-in policies are AbortPolicy (throw an exception directly), CallerRunsPolicy (the thread that called execute runs the task itself), DiscardOldestPolicy (drop the oldest pending task in the queue, i.e. the earliest submitted one, and retry) and DiscardPolicy (silently discard the task).
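For illustration, here is a sketch of a call to the seven-argument constructor. The concrete sizes, the queue capacity and the thread name prefix "demo-worker-" are arbitrary assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger();
        // A simple factory that only gives the threads recognizable names
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                30L, TimeUnit.SECONDS,                      // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),               // bounded workQueue
                factory,                                    // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler (rejection policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveSeconds=" + pool.getKeepAliveTime(TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

A bounded queue is used here on purpose: with an unbounded queue the queue never fills up, so the pool would never grow beyond corePoolSize and the rejection policy would never be triggered by saturation.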
Source Code Analysis
submit method
When using a thread pool, we typically call ThreadPoolExecutor.submit(task) to hand the task to the pool. It returns a Future, from which we can later obtain the task's result through Future.get().
public Future<?> submit(Runnable task) {
    // Task is null, throw an exception directly
    if (task == null) throw new NullPointerException();
    // Wrap the task in a RunnableFuture
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // Execute the wrapped task
    execute(ftask);
    return ftask;
}
newTaskFor method
The newTaskFor method wraps the task in a RunnableFuture, whose get method returns the result.
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Runnable runnable, V result) {
    // Adapts the Runnable to a Callable. result is the value returned when the task
    // completes successfully. If no particular result is needed, null is sufficient.
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    // The task and its result are combined through a RunnableAdapter
    return new RunnableAdapter<T>(task, result);
}

// The adapter class RunnableAdapter, a pattern worth borrowing
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
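Seen from the caller's side, the adaptation above behaves roughly as in the following sketch. It uses only public API (submit, Future.get, Executors.callable); the class name SubmitDemo and the result strings are assumptions for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitDemo {
    static Object[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            AtomicInteger runs = new AtomicInteger();
            Runnable task = runs::incrementAndGet;

            // submit(Runnable) wraps the task with a null result
            Future<?> plain = pool.submit(task);
            Object r1 = plain.get();

            // submit(Runnable, T) wraps it with the given result instead
            Future<String> withResult = pool.submit(task, "done");
            String r2 = withResult.get();

            // The same adaptation without a pool: Runnable -> Callable
            String r3 = Executors.callable(task, "adapted").call();

            return new Object[] { r1, r2, r3, runs.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println(r[0] + ", " + r[1] + ", " + r[2] + ", runs=" + r[3]);
    }
}
```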
execute method
The execute method tries to hand the task to the thread pool. The task may run on a newly created thread or on an existing pool thread. If the pool cannot accept the task, the rejection policy is executed. There are two possible reasons: 1. the thread pool has been shut down; 2. the pool is saturated (the queue is full and the number of threads has reached its maximum).
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * The original JDK comment is kept as is.
     *
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // Get ctl, the combined variable holding the pool state and the thread count
    int c = ctl.get();
    // If the thread count is less than the core thread count, create a new core thread to run the task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // If the pool is RUNNING, add the task to the blocking queue
    // (reaching this point means either thread count >= core thread count, or addWorker failed)
    if (isRunning(c) && workQueue.offer(command)) {
        // Read ctl again and do a second check (the pool state may have changed in the meantime)
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, remove the task from the queue and execute the rejection policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If the pool has no threads at all, create a new thread
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // If the queue is full, try to create a new thread; if that fails
    // (the pool is shut down or the maximum thread count is reached), execute the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
Summary
When a task is submitted to the thread pool, the pool first checks whether the number of threads is less than corePoolSize. If so, it creates a new thread to run the task, even if other threads are idle.
Otherwise the task is added to the task queue, where it waits until a thread becomes free and takes it. After queuing, the pool state is checked again: if the pool is no longer running, the task is removed and rejected; if the pool has no threads at all, a new one is started.
If the task queue is full and the number of threads is less than maximumPoolSize, a new thread is created to run the task. If that fails as well, the rejection policy is executed.
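The three steps can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2, a queue of capacity 1 and the default AbortPolicy; every task blocks on a latch so that no thread becomes free during the experiment. The class name ExecuteStepsDemo is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until the latch is released at the end
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        try {
            pool.execute(blocker); // step 1: below corePoolSize, new core thread
            log.add("threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size());
            pool.execute(blocker); // step 2: core thread busy, task is queued
            log.add("threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size());
            pool.execute(blocker); // step 3: queue full, new non-core thread
            log.add("threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size());
            try {
                pool.execute(blocker); // queue full and pool at maximum
                log.add("accepted");
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first task never passes through the queue: it is handed to the new thread directly as its first task, which is why the queue is still empty after step 1.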
addWorker method
The addWorker method first checks, based on the current pool state and the given bound on the thread count (core or maximum), whether a new worker thread may be created. If so, it creates the worker, starts its thread, and the thread runs the task that was passed in.
/**
 * Creates a new worker.
 *
 * @param firstTask the task the new thread should run first; may be null
 * @param core if true, corePoolSize is used as the bound on the thread count;
 *             if false, maximumPoolSize is used
 * @return true if the worker was added and started
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        // Get ctl
        int c = ctl.get();
        // Get the state of the thread pool
        int rs = runStateOf(c);

        // The check below splits into rs >= SHUTDOWN and
        // !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()).
        // Negating the second part gives three alternatives:
        //   rs != SHUTDOWN, which here means STOP, TIDYING or TERMINATED
        //   firstTask != null
        //   workQueue.isEmpty()
        // So no worker is created if the pool is not RUNNING and any one of these holds:
        //   the pool is in the STOP, TIDYING or TERMINATED state
        //   firstTask != null
        //   workQueue.isEmpty()
        // Note: if the pool is SHUTDOWN, firstTask is null and the queue is not empty,
        // creating a worker is still allowed so that the queue can be drained

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // Get the number of worker threads
            int wc = workerCountOf(c);
            // Do not create a worker if the count has reached CAPACITY, or has reached
            // the bound selected by core (corePoolSize or maximumPoolSize)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the worker count via CAS;
            // a failure here means another thread changed ctl concurrently
            if (compareAndIncrementWorkerCount(c))
                // Leave the outer loop and run the actual worker creation logic
                break retry;
            // Because of the concurrent change, re-read ctl and check the state again
            c = ctl.get();  // Re-read ctl
            // If the pool state changed, go back to the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // Checks passed, worker creation starts here
    // Has the worker been started
    boolean workerStarted = false;
    // Has the worker been added
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Wrap the task in a Worker. The Worker creates its thread through the thread factory
        // and passes itself as the thread's target, so starting the thread runs the Worker's run method
        w = new Worker(firstTask);
        // Get the thread held by the worker
        final Thread t = w.thread;
        if (t != null) {
            // Acquire the pool's main lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // The pool is RUNNING, or it is SHUTDOWN and firstTask is null
                // (the worker is only needed to process tasks left in the queue)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // precheck that t is startable
                    // If the thread has already been started, throw an exception
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add the new worker to the worker set
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of workers that ever existed
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Start the thread once the worker has been added; what start triggers is analyzed later
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The thread did not start successfully, roll back the worker creation above
        if (! workerStarted)
            // Rollback: remove the worker from the worker set and decrement the thread count
            addWorkerFailed(w);
    }
    return workerStarted;
}
addWorkerFailed method
Rolls back a failed worker thread creation
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // Remove the worker from the worker set
            workers.remove(w);
        // Decrement the thread count
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
As seen in addWorker, once a newly created worker has been added to the worker set, its thread's start method is called, which eventually runs the Worker's run method. This works because the Worker constructor creates its thread through the thread factory and passes itself as the Runnable, as analyzed below.
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // Unless another factory was supplied, the thread factory here is DefaultThreadFactory
    this.thread = getThreadFactory().newThread(this);
}
The thread factory creates the thread and ultimately calls a Thread constructor, Thread(... Runnable target ...), with the Worker as the target parameter. Calling Thread.start then runs Thread.run, which in turn calls target.run, much like a proxy.
@Override
public void run() {
    if (target != null) {
        // This target is the Runnable that was passed in when the thread was created
        target.run();
    }
}
That is how our Worker's run method gets executed. It in turn calls the runWorker method, so let's look at that next.
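The "worker passes itself to the thread factory" idea can be sketched in isolation. MiniWorker below is a hypothetical, heavily simplified stand-in for the JDK's Worker: it has no locking and no task loop, and only shows how starting the thread ends up in the worker's own run method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

class MiniWorker implements Runnable {
    final Thread thread;
    private Runnable firstTask;

    MiniWorker(Runnable firstTask, ThreadFactory factory) {
        this.firstTask = firstTask;
        // The worker itself becomes the thread's target, as in the Worker constructor
        this.thread = factory.newThread(this);
    }

    @Override
    public void run() {
        // The real Worker delegates to runWorker(this); this sketch runs only the first task
        Runnable task = firstTask;
        firstTask = null;
        if (task != null) {
            task.run();
        }
    }

    // Returns the name of the thread on which the task actually ran
    static String runOn() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        MiniWorker w = new MiniWorker(
                () -> ranOn.set(Thread.currentThread().getName()),
                Executors.defaultThreadFactory());
        w.thread.start(); // Thread.run -> target.run -> MiniWorker.run -> task.run
        try {
            w.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOn());
    }
}
```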
runWorker method
Continuously take tasks from the task queue and execute them
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set the state to 0 so that the worker can be interrupted
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task, then keep taking tasks from the task queue and executing them
        while (task != null || (task = getTask()) != null) {
            // The lock is held while a task runs, so that shutdown() does not
            // interrupt a worker that is in the middle of executing a task
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt

            // Case 1: the pool state is at least STOP, i.e. runStateAtLeast(ctl.get(), STOP) is true.
            // The part after || can be ignored; only !wt.isInterrupted() matters. Because the pool
            // is stopping, the thread must be interrupted, so if it is not yet, wt.interrupt() is called.
            // Case 2: the pool state is below STOP, i.e. runStateAtLeast(ctl.get(), STOP) is false.
            // The thread must not be interrupted, so that the task can run normally. Now the part
            // after || matters: Thread.interrupted() clears the interrupt flag, and the pool state
            // is then checked again because shutdownNow may have been called in the meantime.
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Empty method body; subclasses can override it to do custom work
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Empty method body; subclasses can override it to do custom work
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // Count how many tasks the current worker has completed
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Perform cleanup
        processWorkerExit(w, completedAbruptly);
    }
}
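The two hook methods called around task.run() can be overridden in a subclass. The following sketch records the order of the calls; the class name HooksDemo and the event strings are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HooksDemo {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // t is the exception thrown by the task, or null if it completed normally
                events.add("after, thrown=" + t);
            }
        };
        // execute is used instead of submit: with submit, the FutureTask would
        // capture any exception and afterExecute would always receive null
        pool.execute(() -> events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```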
In the runWorker method, there is a getTask method, which is briefly described below
getTask method
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Decrement the thread count and return null if either of these conditions holds:
        // 1. The pool state is STOP or higher
        // 2. The pool state is SHUTDOWN and the queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Is this worker subject to the idle timeout? As mentioned earlier, if allowCoreThreadTimeOut
        // is false, threads within the core thread count are never timed out. If it is true,
        // they only survive for keepAliveTime while idle.
        // If the thread count is greater than the core thread count, the timeout applies regardless.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // This check corresponds to cases (1) and (4) in the summary of this method.
        // The worker should exit if the thread count exceeds maximumPoolSize, or if the worker
        // is timed and its last poll timed out.
        // In addition, if the task queue is not empty, the current worker must not be the last
        // thread in the pool (wc > 1). If the queue is empty, it is fine for the last thread to exit.
        // The condition has the form (A || B) && C with A = wc > maximumPoolSize,
        // B = timed && timedOut and C = wc > 1 || workQueue.isEmpty()
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Take a task from the queue, either with a timeout (poll) or without one (take)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // An interrupt can arrive here, for example after setMaximumPoolSize has lowered
            // the maximum so that the current thread count exceeds the new value.
            // The surplus threads have to exit, so the loop is re-entered and the
            // wc > maximumPoolSize check above makes the worker return null.
            timedOut = false;
        }
    }
}
Summary of the getTask method
1. Returns a task
2. Returns null, in which case the worker thread must exit and the thread count is decremented. There are four possible reasons for this
(1) The number of threads in the pool is greater than the maximum number of threads (because the maximum can be changed through the setMaximumPoolSize method)
(2) The thread pool is in the STOP state [new tasks are rejected and tasks in the task queue are no longer processed]
(3) The thread pool is in the SHUTDOWN state and the task queue is empty [new tasks are rejected]
(4) The worker timed out while waiting for a task and is subject to the timeout. If the task queue is not empty, the current worker must not be the last thread in the pool. If the task queue is empty, it does not matter if the current worker is the last thread, because nothing is left for it to process.
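Case (4) can be observed from outside with a short keepAliveTime. The sketch below assumes allowCoreThreadTimeOut(true) and a 50 ms keep-alive so that even core threads time out; it polls getPoolSize() for up to 5 seconds rather than relying on exact timing. The class name KeepAliveDemo is made up.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns true if all idle workers exited within the waiting period
    static boolean idleWorkersExit() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Makes the "timed" flag in getTask true for core threads as well
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Each idle worker's poll() times out, getTask returns null and the worker exits
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("idle workers exited: " + idleWorkersExit());
    }
}
```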
There is also a processWorkerExit method in the runWorker method that performs cleanup, which is described briefly below
processWorkerExit method
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If the worker completed abruptly, the thread count has not been adjusted yet
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add this worker's completed tasks to the pool's total and remove the worker from the worker set
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Try to move the pool to the TERMINATED state, analyzed later
    tryTerminate();

    int c = ctl.get();
    // The pool state is below STOP, i.e. RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // A replacement worker is created if the worker exited abruptly, if the thread count is
        // below the core thread count, or if the task queue is not empty but no thread is left
        // (possible when core threads are allowed to time out)
        addWorker(null, false);
    }
}
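The replacement logic at the end of processWorkerExit can be made visible with a task that throws. The sketch below assumes a single-thread pool and a thread factory that numbers its threads and silences the uncaught exception output; the names ReplacementDemo and "w-" are made up. The second task ends up on a different thread than the first, because the first worker dies with the exception.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReplacementDemo {
    // Returns the names of the threads that ran the first and the second task
    static String[] run() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "w-" + seq.incrementAndGet());
            t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
        String[] names = new String[2];
        CountDownLatch done = new CountDownLatch(2);
        try {
            pool.execute(() -> {
                names[0] = Thread.currentThread().getName();
                done.countDown();
                // Escapes task.run(), so runWorker finishes with completedAbruptly == true
                throw new IllegalStateException("boom");
            });
            pool.execute(() -> {
                names[1] = Thread.currentThread().getName();
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("first task on " + names[0] + ", second task on " + names[1]);
    }
}
```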
tryTerminate method
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // The pool cannot be moved towards TERMINATED in any of these three situations:
        // it is still RUNNING, it is already TIDYING or TERMINATED,
        // or it is SHUTDOWN but the queue is not empty
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // Reaching this point means the pool is eligible to terminate. If the thread count
        // is not zero yet, interrupt one idle thread so that the shutdown signal propagates
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Set the pool state to TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // Set the pool state to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // Wake up the threads blocked in awaitTermination on this pool
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
If you have read this far carefully alongside the source code, you should now have a feel for how thread pools work. The last thing left is the pool's shutdown methods.
shutdown method
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Permission check
        checkShutdownAccess();
        // Set the pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to move the pool to the TERMINATED state
    tryTerminate();
}
shutdownNow method
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Permission check
        checkShutdownAccess();
        // Set the pool state to STOP
        advanceRunState(STOP);
        // Interrupt all threads
        interruptWorkers();
        // Move the tasks still in the task queue into the returned list
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to move the pool to the TERMINATED state
    tryTerminate();
    return tasks;
}
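The difference from shutdown() shows up in what happens to running and queued tasks. The following sketch assumes a single-thread pool with one blocking task and two queued tasks; the class name ShutdownNowDemo and the result string are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicBoolean interrupted = new AtomicBoolean();
        CountDownLatch never = new CountDownLatch(1); // never counted down
        // Occupies the only worker until it is interrupted
        pool.execute(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        // These two wait in the queue because the single worker is busy
        pool.execute(() -> { });
        pool.execute(() -> { });

        // STOP: interrupts the running task and hands back the queued ones
        List<Runnable> pending = pool.shutdownNow();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "pending=" + pending.size() + " interrupted=" + interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A task only stops early if it responds to interruption, as the blocking await() call does here; shutdownNow cannot forcibly stop a task that ignores the interrupt.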
Postscript
This article was written while on vacation and is a brief walkthrough of thread pool internals. It may of course contain mistakes, and White Teeth welcomes your criticism and corrections. If you find a problem, you can leave a message on the public account (White Teeth every Day) or add me on WeChat (dingaiminIT) to discuss it.