Let's look at a piece of code and think about two questions
Take these two questions to see the source code, get twice the result with half the effort!
- Why is it the fourth error?
- Why does the third execute first and the second execute later?
Construction method analysis
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- In the construction method, only parameter verification is done without any logic processing!
- ThreadPoolExecutor has seven parameters, which are described below.
- corePoolSize: the number of core threads, which is also the number of resident threads in the thread pool. Note: there are no threads by default when the thread pool is initialized. When a task comes, it starts to create a thread to execute the task
- maximumPoolSize: the maximum number of threads. Some non core threads may be added to the number of core threads. A simple understanding is a temporary thread.
- keepAliveTime: the lifetime of a non core thread. If the idle time of the non core thread exceeds this time, the non core thread will be recycled. If corePoolSize = maximumPoolSize, there is no non core thread, so there is no recycling!
- Unit: the time unit of keepAliveTime.
- workQueue: the queue used to save tasks. When the number of tasks exceeds the maximumPoolSize, new tasks will be stored in this queue.
- threadFactory: the factory class for creating threads. Executors. Factory is used by default Defaultthreadfactory() can also be created using ThreadFactoryBuilder of guava library.
- handler: the saturation policy when the thread pool cannot continue to receive tasks (the queue is full and the number of threads reaches maximunPoolSize). The values include AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy and DiscardPolicy. The simple understanding is how to handle when the maximumPoolSize is full and the queue is full!
Thread pool execution: analysis of execute() method
public void execute(Runnable command) {
// If the task is empty, throw a null pointer exception directly!
if (command == null)
throw new NullPointerException();
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
int c = ctl.get();
// Judge whether the current number of threads is less than corePoolSize. The number of core threads is processed here, and the corresponding code above is thread 1.
if (workerCountOf(c) < corePoolSize) {
// Create a new thread with the addWord method using the parameter entry task
if (addWorker(command, true))
// If the thread can be created successfully, it indicates that the execution has been completed! Direct return
return;
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
c = ctl.get();
}
// If the above task does not create a thread and the status is running, the current task will be placed in the queue
if (isRunning(c) && workQueue.offer(command)) {
// Successfully put into the queue. Get the current thread pool data again
int recheck = ctl.get();
// check again. If the status changes to non running after the task is added to the queue (it may be that the thread pool is shut down at this point of execution), you need to remove the current task.
if (! isRunning(recheck) && remove(command))
// If it is not running and the removal is successful, execute the rejection policy.
reject(command);
// Judge whether the number of valid threads in the current worker thread pool is 0
else if (workerCountOf(recheck) == 0)
// Add workers, empty tasks, non core threads! To ensure that the thread pool is running, there must be a task executing
addWorker(null, false);
}
// The core thread is exhausted and cannot be added to the queue. Then increase the number of non core threads!
else if (!addWorker(command, false))
// Failure to increase the number of non core threads indicates that the queue is full and the threads are full. Call reject method
reject(command);
}
- Seeing this can actually explain why the fourth error is reported! The first is placed in the core thread, the second is placed in the queue, the third is in the non core thread, and the fourth calls the reject method.
What are the status of the thread pool?
// runState is stored in the high-order bits
// The top three bits indicate the thread pool status
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
The addWorker() method is called many times during execution
private boolean addWorker(Runnable firstTask, boolean core) {
// A loop ID used to jump out of a loop. java does not recommend it.
retry:
for (;;) {
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
int c = ctl.get();
// Status acquisition of thread pool
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// Check that the queue is empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// Get the number of valid threads in the current worker pool
int wc = workerCountOf(c);
// The number of valid threads in the current worker thread pool is greater than the maximum number of threads (a maximum value, the maximum value of the last 29 bits)
// Or when the number of effective threads in the worker thread pool is greater than the maximum number of core (total) threads, execution cannot continue!
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// Increase the number of threads by one through CAS (CSA will be described in detail later). Successfully jumped out of the loop!
if (compareAndIncrementWorkerCount(c))
break retry;
// Query loading: thread pool data
c = ctl.get(); // Re-read ctl
// The state of the newly acquired thread pool is inconsistent with the initial state. Start the cycle again!
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
// Other CAS failed due to workerCount change; Retry internal loop
}
}
// The above code is mainly to cas increase the number of threads by one
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// An internal class: create threads through threadfactory!
w = new Worker(firstTask);
// Get your Worker thread
final Thread t = w.thread;
// When the thread exists
if (t != null) {
// Acquire lock
final ReentrantLock mainLock = this.mainLock;
// Lock
mainLock.lock();
try {
// Recheck while holding lock. Recheck while holding the lock.
// Back out on ThreadFactory failure or if ThreadFactory fails or
// shut down before lock acquired. Close before obtaining the lock.
// Gets the status of the thread pool
int rs = runStateOf(ctl.get());
// Less than shutdown is the running status
// firstTask is empty when SHUTDOWN. Is to process tasks from the queue! Then you can put it in the collection
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// Precheck that is startable
// What happens is that the thread has not start ed yet
if (t.isAlive())
throw new IllegalThreadStateException();
// workers is a HashSet to which the worker object is added
workers.add(w);
// Gets the length of the current HashSet
int s = workers.size();
// The length is relatively large. Record maximum threads
if (s > largestPoolSize)
largestPoolSize = s;
// Update status: indicates that the thread can be started
workerAdded = true;
}
} finally {
// Release the lock!
mainLock.unlock();
}
// The above creation is successful, and the identification thread can be started
if (workerAdded) {
// Thread start
t.start();
// Indicates that the thread has started
workerStarted = true;
}
}
} finally {
// Thread startup failed, processing failed logic
if (! workerStarted)
// Failure fallback removes w threads from wokers minus 1 and attempts to end the thread pool
addWorkerFailed(w);
}
return workerStarted;
}
Learn more about the Worker class just used
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// Running woker thread
final Thread thread;
/** Initial task to run. Possibly null. */
// Incoming tasks
Runnable firstTask;
/** Per-thread task counter */
// For monitoring the number of tasks completed
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// Disable thread interrupt
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
Running logic of threads in thread pool
- In the addWorker method of the above code, there is t.start(); Such a line, this line opens the real thread
- This line of code opens a new thread and calls the Worker's run() method.
- Then the Worker's run() method calls runWorker(this);
Thread execution: runWorker(this)
final void runWorker(Worker w) {
// Gets the current thread
Thread wt = Thread.currentThread();
// Get current task
Runnable task = w.firstTask;
// Set the current task to null
w.firstTask = null;
// Unlock operation, corresponding to setState(-1) initialized by Worker class;
w.unlock(); // allow interrupts
// Is the loop exited because of an exception
boolean completedAbruptly = true;
try {
// There are tasks currently, which can be executed directly
// There is no task at present. Get task execution from the queue!
// Here you can see the answer to the second question, why do you execute 3 first and then 2
while (task != null || (task = getTask()) != null) {
// Lock
w.lock();
// If pool is stopping, ensure thread is interrupted; If the pool is stopping, ensure that the thread is interrupted;
// if not, ensure thread is not interrupted. If not, make sure the thread is not interrupted.
// This requires a check in second case to deal with Shutdown now race while clearing interrupt. It should be considered that the shutdown now method may also be executed during the execution of the if statement, and the shutdown now method will set the status to STOP
// Thread. The function of the interrupted () method is to test whether the current thread is interrupted (check the interrupt flag), return a boolean and clear the interrupt state. The interrupt state has been cleared during the second call, and a false will be returned.
// The current thread is interrupted when it is greater than or equal to stop, or in thread Interrupted () can be satisfied, and it is not interrupted
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// Interrupt current thread
wt.interrupt();
try {
// Before executing the task, the subclass can implement this hook method to expand the thread pool
beforeExecute(wt, task);
Throwable thrown = null;
try {
// Perform tasks
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// After executing the task, the subclass can implement this hook method to expand the thread pool
afterExecute(task, thrown);
}
} finally {
// Set task to null, and it will be obtained from the queue in the next cycle
task = null;
// Number of completed tasks + 1
w.completedTasks++;
// Unlock
w.unlock();
}
}
// Flag does not exit because of an exception
completedAbruptly = false;
} finally {
// Processing logic for task execution completion
processWorkerExit(w, completedAbruptly);
}
}
How to get a task from a queue: getTask()
private Runnable getTask() {
// Defines the timeout flag: whether the task timed out the last time it was fetched from the blocking queue
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
int c = ctl.get();
// Gets the status of the thread pool
int rs = runStateOf(c);
// Check if queue empty only if necessary. Check that the queue is empty only if necessary.
// The current status is greater than or equal to SHUTDOWN and (greater than or equal to STOP or the queue is empty)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// The thread has stopped, and the current workerCount is reduced by one. To offset the addition of one to the method of compareAndIncrementWorkerCount(c) in addWorker
decrementWorkerCount();
return null;
}
// Get the number of valid threads in the current worker pool
int wc = workerCountOf(c);
// Are workers subject to culling? Will workers be eliminated?
// If false (the default), the core thread remains active even when idle. If true, the core thread uses keepAliveTime timeout to wait for work.
// WC > corepoolsize, indicating that the number of threads in the current thread pool is greater than the number of core threads; For these threads that exceed the number of core threads, timeout control is required
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// The number of valid threads in the current worker pool is greater than the maximum number of threads required
// Timed & & timedout if true, it means that the current operation needs timeout control, and the last time the task was obtained from the blocking queue timed out
// The number of valid threads is greater than 1, or the blocking queue is empty
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// Then try subtracting workerCount by one. To offset the addition of one to the method compareAndIncrementWorkerCount(c) in addWorker
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// If it is true, the timeout is controlled through the poll method of blocking the queue. If the task is not obtained within the keepAliveTime time, null is returned;
// Otherwise, through the take method, if the queue is empty at this time, the take method will block until the queue is not empty.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// Get the data from the queue and return it directly
if (r != null)
return r;
// If no data is obtained, it indicates that it has timed out, and timedOut is set to true
timedOut = true;
} catch (InterruptedException retry) {
// If the current thread is interrupted when obtaining the task, set timedOut to false and return to loop retry
timedOut = false;
}
}
}
What to do after task execution: processWorkerExit()
- It is mainly used for thread cleaning
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If completedAbruptly is true, an error occurred
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// Careful subtraction operation
decrementWorkerCount();
// Acquisition theory
final ReentrantLock mainLock = this.mainLock;
// Lock
mainLock.lock();
try {
// Number of completed tasks per worker thread
completedTaskCount += w.completedTasks;
// Remove current work task
workers.remove(w);
} finally {
// Unlock
mainLock.unlock();
}
// Judge whether to end the thread pool according to the thread pool status
tryTerminate();
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
int c = ctl.get();
// The current status is less than the STOP status
if (runStateLessThan(c, STOP)) {
// Not an abnormal interrupt
if (!completedAbruptly) {
// If false (the default), the core thread remains active even when idle. If true, the core thread uses keepAliveTime timeout to wait for work.
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// Keep the logic of at least one Worker
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// The current number of jobs is greater than or equal to the number of core threads
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// Continue to execute new tasks, empty tasks, get from the queue!
addWorker(null, false);
}
}
When will the thread pool end: tryTerminate();
- This method needs to be triggered
final void tryTerminate() {
for (;;) {
// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
int c = ctl.get();
// RUNNING cannot be stopped because it is still RUNNING;
// TIDYING or TERMINATED, because there are no running threads in the thread pool; Other restrictions are being groomed or have stopped.
// HUTDOWN and the waiting queue is not empty. At this time, the task in the workQueue should be executed.
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// If the number of threads is not 0
if (workerCountOf(c) != 0) { // Eligible to terminate
// Interrupt an idle worker thread
interruptIdleWorkers(ONLY_ONE);
return;
}
// Acquire lock
final ReentrantLock mainLock = this.mainLock;
// Lock
mainLock.lock();
try {
// Trying to set the status to TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// Set the status to TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// Wake up called the thread awaitTermination() waiting for the thread pool to terminate
termination.signalAll();
}
return;
}
} finally {
// Unlock
mainLock.unlock();
}
// else retry on failed CAS retry
}
}
Conclusion
- Get more valuable articles and let's become architects together!
- Paying attention to the official account enables you to have a deeper understanding of MySQL and concurrent programming.
- This official account is not advertising!!! Update daily!!!