Recap
In the previous article, we followed the inheritance hierarchy of the thread pool and analyzed the submit, invokeAll and invokeAny methods of its abstract parent class, AbstractExecutorService. In this article, we focus on the concrete implementation, ThreadPoolExecutor. By reading the source code, we will see how its seven constructor parameters are actually used.
Usage scenario
Let's first review how the thread pool is used in typical business code. The example below submits 10 tasks that run in parallel, then stops the pool from accepting new tasks, and finally waits for the pool to terminate.
public static void main(String[] args) throws InterruptedException {
    // Create the thread pool
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    // Submit 10 tasks to be processed in parallel
    for (int i = 0; i < 10; i++) {
        executor.execute(() -> {
            // Simulated business code
            try {
                Thread.sleep(1000);
                System.out.println("End of task");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    // Stop accepting new tasks; tasks already submitted still run
    executor.shutdown();
    // Wait for the thread pool to terminate
    executor.awaitTermination(1, TimeUnit.MINUTES);
}
Constructor
ThreadPoolExecutor has four overloaded constructors. The shorter ones fill in default arguments and delegate to the full one, a design worth borrowing. Only the two most important constructors are shown below.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         // Default thread factory: its threads have normal priority and are non-daemon threads
         Executors.defaultThreadFactory(),
         // Default rejection policy: throw an exception as soon as a task is rejected
         defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // Validate the parameter bounds
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // Security manager context; beyond the scope of this article
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    // Number of core threads
    this.corePoolSize = corePoolSize;
    // Maximum number of threads
    this.maximumPoolSize = maximumPoolSize;
    // Work queue
    this.workQueue = workQueue;
    // Thread idle time, stored in nanoseconds
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    // Thread factory
    this.threadFactory = threadFactory;
    // Rejection policy
    this.handler = handler;
}
execute method
public void execute(Runnable command) {
    // Null check
    if (command == null)
        throw new NullPointerException();
    // ctl holds both the number of worker threads and the state of the thread pool.
    // How one variable can carry two values is explained later.
    int c = ctl.get();
    // Is the current number of worker threads below the number of core threads?
    if (workerCountOf(c) < corePoolSize) {
        // Try to add a core worker thread with this task as its first task; return on success
        if (addWorker(command, true))
            return;
        // addWorker failed, so re-read ctl. execute is thread safe and may be called
        // from many threads at once, so in the meantime the pool may have been shut down
        // or other tasks may have been added. Re-reading keeps our view up to date.
        c = ctl.get();
    }
    // Reaching here means the number of worker threads is at least corePoolSize,
    // or creating the worker thread failed (for example, the pool is not RUNNING).
    // If the pool is running, try to put the task in the task queue.
    if (isRunning(c) && workQueue.offer(command)) {
        // Recheck the thread pool state
        int recheck = ctl.get();
        // If the pool is no longer running and the task can be removed from the queue, reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads, add one. This happens when the workers
        // have been destroyed after their idle time expired.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Reaching here means the pool is not running or the queue refused the task.
    // Try to add a non-core worker thread; if that fails, reject the task.
    else if (!addWorker(command, false))
        reject(command);
}
Summary: from the execute method we can conclude that the thread pool first creates core worker threads while the worker count is below corePoolSize. Once that is reached, it puts tasks in the task queue. It creates non-core worker threads only when the queue refuses a task, and it rejects the task when the pool is not running, or when the queue is full and the worker count has reached maximumPoolSize.
Recall the questions raised in the previous article: we create a thread pool with 10 core threads, 20 maximum threads and an unbounded task queue, and 30 tasks arrive at the same time.
Question 1: what is the number of threads in the thread pool?
Question 2: if I change the task queue to a bounded queue with a capacity of 20, how many tasks can the pool accept at most?
Having read the source code, we can now answer both questions easily.
- Question 1: there are 10 threads. According to the source code, the first 10 tasks each create a core worker thread. Because the task queue is unbounded, offer always succeeds, so the remaining 20 tasks go straight into the queue and wait for the core worker threads to consume them. No non-core thread is ever created.
- Question 2: if the task queue is changed to a queue with a capacity of 20, the pool can hold at most maximum threads + queue capacity = 20 + 20 = 40 tasks at once. Any further task is rejected.
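The two answers can be checked against a real ThreadPoolExecutor. The following is a minimal sketch, not part of the JDK source: the class PoolSizingDemo and its helper methods are hypothetical names introduced for illustration, and each task simply blocks on a latch so that no worker becomes free while we count.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingDemo {

    // Hypothetical helper: submits `tasks` blocking tasks to a pool with 10 core
    // and 20 maximum threads, and returns {accepted tasks, pool size, queue size},
    // measured while every accepted task is still blocked.
    static int[] submitBlockingTasks(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    executor.execute(() -> awaitQuietly(release));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Default policy: thrown when the queue is full and
                    // maximumPoolSize has been reached
                }
            }
            return new int[] {accepted, executor.getPoolSize(), executor.getQueue().size()};
        } finally {
            // Let the blocked tasks finish and shut the pool down
            release.countDown();
            executor.shutdown();
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Question 1: unbounded queue, 30 tasks
        int[] q1 = submitBlockingTasks(new LinkedBlockingQueue<>(), 30);
        System.out.println("accepted=" + q1[0] + " threads=" + q1[1] + " queued=" + q1[2]);
        // Question 2: queue capacity 20, 41 tasks (one more than the pool can hold)
        int[] q2 = submitBlockingTasks(new ArrayBlockingQueue<>(20), 41);
        System.out.println("accepted=" + q2[0] + " threads=" + q2[1] + " queued=" + q2[2]);
    }
}
```

With the unbounded queue, the run should report 10 threads and 20 queued tasks. With the bounded queue, 40 of the 41 tasks should be accepted, using 20 threads and 20 queue slots.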
When reading the execute method, we treated the ctl field and addWorker as black boxes and judged what they do only from the author's comments and the method names. We know that execute is thread safe and can be called from different threads, yet there is no locking anywhere in the code we have read so far. You are probably curious how the underlying methods achieve this.
ctl
// AtomicInteger: reads and updates of the value are atomic operations
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29 bits are used for the worker count
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum worker count: 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// The thread pool state is stored in the high 3 bits of the int
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Clear the low 29 bits, leaving the high 3 bits: the thread pool state
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// Clear the high 3 bits, leaving the low 29 bits: the number of worker threads
private static int workerCountOf(int c)  { return c & CAPACITY; }
// Combine a run state and a worker count with a bitwise OR
private static int ctlOf(int rs, int wc) { return rs | wc; }

// Is c less than s?
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// Is c greater than or equal to s?
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// RUNNING is the only state below SHUTDOWN, so this comparison is sufficient
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// Try to increment the worker count in ctl by 1 using CAS
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// Try to decrement the worker count in ctl by 1 using CAS
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
From the ctl source above, we can see that the author splits one int into two parts: the high 3 bits hold the run state of the thread pool, and the low 29 bits hold the current number of worker threads. This also answers a common interview question, namely how large the thread count can get: at most CAPACITY = 2^29 - 1 = 536,870,911 workers, whatever value is passed as maximumPoolSize.
Some readers may be unfamiliar with bit operations such as shifts and masks and may not follow the logic of this section. That is fine: knowing roughly what each method does is enough to read the rest of the source code.
Space is limited here, so readers who want more detail are encouraged to look up material on bit operations.
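To make the packing concrete, here is a small standalone sketch that copies the constants and the three helper methods from the snippet above into a hypothetical class, CtlPackingDemo, and then packs and unpacks a value. Only the class name and the main method are additions for illustration.

```java
class CtlPackingDemo {
    // Copied from the ThreadPoolExecutor source shown above
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // 2^29 - 1

    static final int RUNNING  = -1 << COUNT_BITS;  // high bits 111
    static final int SHUTDOWN =  0 << COUNT_BITS;  // high bits 000
    static final int STOP     =  1 << COUNT_BITS;  // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        // Pack "RUNNING with 5 workers" into one int
        int c = ctlOf(RUNNING, 5);
        System.out.println("bits:         " + Integer.toBinaryString(c));
        System.out.println("worker count: " + workerCountOf(c));            // 5
        System.out.println("is RUNNING:   " + (runStateOf(c) == RUNNING));  // true
        // RUNNING is negative, so any RUNNING ctl value is below SHUTDOWN (0)
        System.out.println("c < SHUTDOWN: " + (c < SHUTDOWN));              // true
        System.out.println("CAPACITY:     " + CAPACITY);                    // 536870911
    }
}
```

Because the state sits in the high bits, comparing two ctl values as plain ints also compares their states, which is why isRunning, runStateLessThan and runStateAtLeast need no masking.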
Thread pool status
In the ctl section, we saw the following state constants. What do they mean?
- RUNNING: accept new tasks and process tasks in the task queue.
- SHUTDOWN: do not accept new tasks, but still process tasks in the task queue.
- STOP: do not accept new tasks, do not process tasks in the task queue, and interrupt tasks that are in progress.
- TIDYING: all tasks have ended, the number of worker threads is 0, and the terminated() hook method is about to be called.
- TERMINATED: the terminated() hook method has completed.
The state of the thread pool moves through these transitions:
- RUNNING -> SHUTDOWN: the shutdown() method of the thread pool is called.
- (RUNNING / SHUTDOWN) -> STOP: the shutdownNow() method of the thread pool is called.
- SHUTDOWN -> TIDYING: both the task queue and the set of worker threads are empty.
- STOP -> TIDYING: the set of worker threads is empty.
- TIDYING -> TERMINATED: the terminated() hook method has completed.
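The internal state constants are private, but part of the lifecycle can be observed through the public methods isShutdown() and isTerminated() and by overriding the terminated() hook. The sketch below is an illustration under that assumption. LifecycleDemo and ObservablePool are hypothetical names, not JDK classes.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {

    // Hypothetical subclass that records whether the terminated() hook ran
    static class ObservablePool extends ThreadPoolExecutor {
        final AtomicBoolean hookCalled = new AtomicBoolean(false);

        ObservablePool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            super.terminated();
            hookCalled.set(true); // runs while the pool is in the TIDYING state
        }
    }

    // Returns {isShutdown before shutdown(), isShutdown after shutdown(),
    //          awaitTermination result, isTerminated, terminated() hook called}
    static boolean[] observe() {
        ObservablePool pool = new ObservablePool();
        pool.execute(() -> { });
        boolean before = pool.isShutdown();   // RUNNING
        pool.shutdown();                      // RUNNING -> SHUTDOWN
        boolean after = pool.isShutdown();
        boolean finished = false;
        try {
            // Returns once the pool has reached TERMINATED
            finished = pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {before, after, finished, pool.isTerminated(), pool.hookCalled.get()};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("isShutdown before shutdown(): " + r[0]);
        System.out.println("isShutdown after shutdown():  " + r[1]);
        System.out.println("terminated within timeout:    " + r[2]);
        System.out.println("isTerminated:                 " + r[3]);
        System.out.println("terminated() hook called:     " + r[4]);
    }
}
```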
addWorker
This method creates a worker thread while keeping the number of threads within the core pool size or the maximum pool size. firstTask is the first task the new worker runs. The second parameter, core, indicates whether the bound to check is corePoolSize (true) or maximumPoolSize (false). The method returns whether the worker thread was successfully started.
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
    // Label for the outermost loop, which is an infinite loop
    retry:
    for (;;) {
        // Get the ctl value
        int c = ctl.get();
        // Get the thread pool state
        int rs = runStateOf(c);

        // Refuse when the state is SHUTDOWN, STOP, TIDYING or TERMINATED, with one
        // exception: the state is SHUTDOWN, there is no first task, and the task
        // queue is not empty. In that case a worker is still needed to run the
        // tasks left in the queue.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // Infinite loop
        for (;;) {
            // Get the current number of worker threads
            int wc = workerCountOf(c);
            // Fail if the count has reached the absolute capacity. Otherwise compare
            // it with corePoolSize for a core worker, or maximumPoolSize for a non-core one.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS-increment the worker count; on success, leave the outermost loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The CAS failed because ctl changed, so re-read it
            c = ctl.get();
            // If the thread pool state changed, restart the outermost loop;
            // otherwise retry the inner loop
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // The code below modifies the HashSet, so it is guarded by the reentrant lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-read the state while holding the lock
                int rs = runStateOf(ctl.get());

                // Either the pool is RUNNING, or it is SHUTDOWN with no first task:
                // the latter is the case where the workers have been destroyed after
                // their idle time but tasks remain in the task queue
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The thread must not have been started already
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers set
                    workers.add(w);
                    // Record the largest number of worker threads reached so far
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // The worker was added successfully
                    workerAdded = true;
                }
            } finally {
                // Release the reentrant lock
                mainLock.unlock();
            }
            // If the worker was added successfully, start its thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker thread failed to start, roll back: remove it from the set
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // Return whether the worker thread was started
    return workerStarted;
}
In short, addWorker hands our task to a Worker, creates the worker thread when the pool state and the thread count allow it, and adds the successfully created worker to the workers set. The CAS retry loop (read the value, check it, compareAndSet, and retry on failure) is a pattern worth borrowing in our own development.
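The CAS retry loop can be reused outside the thread pool. The following is a minimal sketch of the same idea, assuming a hypothetical BoundedCounter class that hands out at most `limit` slots without any lock. It is a simplified illustration of the pattern, not code taken from ThreadPoolExecutor.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Same shape as the inner loop of addWorker: read, check the bound,
    // try the CAS, and retry with a fresh value if another thread won the race.
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= limit)
                return false;
            if (count.compareAndSet(current, current + 1))
                return true;
            // CAS failed: the value changed under us, so loop and re-read
        }
    }

    int get() {
        return count.get();
    }

    // Hypothetical helper: lets several threads compete for the slots and
    // returns {number of successful increments, final counter value}.
    static int[] race(int limit, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(limit);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement())
                        successes.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] {successes.get(), counter.get()};
    }

    public static void main(String[] args) {
        // 8 threads make 400 attempts in total, but only 100 slots exist
        int[] result = race(100, 8, 50);
        System.out.println("successes=" + result[0] + " count=" + result[1]);
    }
}
```

However the threads interleave, the number of successful increments never exceeds the limit, which is the guarantee addWorker needs for corePoolSize and maximumPoolSize.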
Worker
In addWorker, the task is passed to a Worker and start() is called on the worker's thread. Since that thread was created with the Worker itself as its Runnable (as the constructor below shows), starting it runs the Worker's run method, so it is worth looking at how Worker is written.
The above figure is the inheritance diagram of the Worker class. Worker extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface, so we can reasonably guess that it provides its own locking mechanism and can be executed by a thread.
Worker constructor
final Thread thread;
Runnable firstTask;
volatile long completedTasks;

Worker(Runnable firstTask) {
    // Set the AQS state to -1 to inhibit interrupts until runWorker starts
    setState(-1);
    // The first task this worker will run
    this.firstTask = firstTask;
    // Create the thread with the thread factory that was passed to the thread pool
    // constructor. Because Worker implements Runnable, it can be handed to the new
    // thread, so calling thread.start() will execute the worker's run() method.
    this.thread = getThreadFactory().newThread(this);
}
run method
public void run() {
    runWorker(this);
}

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Reset the state set in the constructor so the worker can now be interrupted
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // Loop while there is a first task or a task can be taken from the task queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // Interrupt handling is covered in the next chapter
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook method for subclasses
                beforeExecute(wt, task);
                Throwable thrown = null;
                // Run the task; any exception is recorded, rethrown, and passed to the hook
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook method for subclasses
                    afterExecute(task, thrown);
                }
            } finally {
                // Clear the task
                task = null;
                // Completed tasks + 1
                w.completedTasks++;
                // Mark the worker as available again
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Run the worker exit logic
        processWorkerExit(w, completedAbruptly);
    }
}
This code shows that a worker keeps fetching tasks from getTask() in a while loop and exits once getTask() returns null. The getTask method is analyzed next.
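The two hook methods called inside runWorker are the documented extension points of ThreadPoolExecutor. As an illustration, the sketch below counts hook invocations in a subclass. HookDemo, CountingPool and QUIET_FACTORY are hypothetical names, and the quiet thread factory is only an assumption made here to keep the simulated failure from printing a stack trace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookDemo {

    // Threads whose uncaught exceptions are silently dropped (demo only)
    static final ThreadFactory QUIET_FACTORY = r -> {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler((thread, error) -> { });
        return t;
    };

    // Hypothetical subclass that counts calls to the two hooks
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();
        final AtomicInteger failures = new AtomicInteger();

        CountingPool() {
            super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), QUIET_FACTORY);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            after.incrementAndGet();
            if (t != null)
                failures.incrementAndGet(); // the task threw an exception
        }
    }

    // Runs 3 normal tasks and 1 failing task; returns {before, after, failures}
    static int[] runTasks() {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < 3; i++)
            pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("simulated failure"); });
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {pool.before.get(), pool.after.get(), pool.failures.get()};
    }

    public static void main(String[] args) {
        int[] r = runTasks();
        System.out.println("before=" + r[0] + " after=" + r[1] + " failures=" + r[2]);
    }
}
```

Note that the failing task is submitted with execute, so the exception reaches afterExecute directly. A task submitted with submit is wrapped in a FutureTask, which captures the exception itself.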
getTask
private Runnable getTask() {
    boolean timedOut = false;

    // Infinite loop
    for (;;) {
        // Get the current state
        int c = ctl.get();
        int rs = runStateOf(c);

        // The state is SHUTDOWN, STOP, TIDYING or TERMINATED, and either it is
        // at least STOP or the task queue is empty
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Number of worker threads - 1
            decrementWorkerCount();
            // Returning null makes runWorker leave its while loop, so the worker thread exits
            return null;
        }

        // Get the current number of worker threads
        int wc = workerCountOf(c);

        // Timeout applies when core threads are allowed to time out,
        // or when the current count exceeds the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // The count exceeds the maximum, or timeout applies and the last poll timed out;
        // and in addition there is more than one worker or the task queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Try to CAS-decrement the worker count
            if (compareAndDecrementWorkerCount(c))
                return null;
            // The CAS failed; go around the loop again
            continue;
        }

        try {
            // If timeout applies, call poll and wait at most keepAliveTime;
            // otherwise call take and block until a task is available
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Got a task: return it
            if (r != null)
                return r;
            // poll returned null, so the wait timed out; set the timeout flag
            timedOut = true;
        } catch (InterruptedException retry) {
            // The thread was interrupted: reset the timeout flag and go around the loop again
            timedOut = false;
        }
    }
}
From getTask we can see that the timeout applies when core threads are allowed to time out or when the current number of threads is greater than the number of core threads. This switch decides whether the worker calls the timed poll method or the indefinitely blocking take method of the blocking queue. Once a timed-out worker makes getTask return null, the worker's run method leaves its loop and enters the worker destruction process. This is how the number of threads in the thread pool shrinks dynamically.
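The effect of the timed switch can be observed from outside with the public allowCoreThreadTimeOut method. The sketch below is a rough demonstration, with KeepAliveDemo and poolSizeAfterIdle as hypothetical names. It relies on polling with a one-second deadline, so the timings are assumptions chosen for the demo rather than guarantees.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Starts 2 core workers, lets them go idle, and returns the pool size
    // observed once the pool is empty or after about one second.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // true: core workers use poll(keepAliveTime) in getTask and may exit
        // false: core workers use take() and block indefinitely
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return pool.getPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout allowed: pool size = " + poolSizeAfterIdle(true));
        System.out.println("core timeout off:     pool size = " + poolSizeAfterIdle(false));
    }
}
```

With the timeout allowed, the idle workers should exit after roughly 50 ms and the pool size should drop to 0. With it disabled, both core workers stay parked in take() and the size stays at 2.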
Summary
This article uses the execute method as the entry point to show the CAS pattern, the locking pattern, and how the thread pool state is handled.
Many people enjoy reading source code, but in practice it often turns into reading without thorough understanding. The call stack can easily go 5-6 levels deep or more, and following every call all the way down is very inefficient: your time and energy drain away, and in the end you understand only a few logical branches of the code, which is far from the original goal of reading it.
So my advice is to read only the first 1-2 layers of the call stack at first and not go any deeper. Once the overall logic is clear, you can go back and study the specific details.