ThreadPoolExecutor thread pool
A thread pool gives controllable, orderly creation and destruction of threads and reuses thread context. Operating system resources are limited: too many threads force the CPU's time-slice scheduler to keep switching thread contexts (in `top` this shows up as a high `sy` share of Cpu% and exhausted memory), and creating and destroying threads also costs time. When too much memory is occupied, JVM GC pauses follow.
When creating, you need to specify the following parameters:
| Parameter | Description |
| --- | --- |
| corePoolSize | Number of core threads |
| maximumPoolSize | Maximum number of threads |
| keepAliveTime + unit | How long a temporary thread beyond the core count is kept alive |
| BlockingQueue | Blocking queue that guarantees safe concurrent reads and writes |
| ThreadFactory | Factory used to create Worker threads |
| RejectedExecutionHandler | Rejection policy applied when the queue is full and no more Workers can be added |
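A minimal construction sketch; the sizes, queue capacity, thread name and rejection policy below are illustrative assumptions, not recommendations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4,                                   // corePoolSize
                8,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime + unit for temporary threads
                new ArrayBlockingQueue<>(100),       // bounded BlockingQueue
                r -> new Thread(r, "biz-worker"),    // ThreadFactory (illustrative naming)
                new ThreadPoolExecutor.CallerRunsPolicy()); // RejectedExecutionHandler

        pool.execute(() ->
                System.out.println("task running in " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```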
Be careful when using:
- Determine the number of threads and how long temporary threads are kept alive.
- A common starting point: number of CPU cores * (1 + wait_time / cpu_time). CPU intensive: roughly the number of CPU cores; IO intensive: roughly 2 * the number of CPU cores. The core count comes from Runtime.getRuntime().availableProcessors() (see the sizing sketch after this list).
- The operating system limits thread stack size and the number of threads that can be created, and too many threads also consume CPU resources (ulimit -a: max user processes, stack size; JVM: -Xss).
- Determine the bound of the blocking queue used. If it is unbounded (e.g. LinkedBlockingQueue), a large backlog of tasks will consume a lot of memory.
- Determine the rejection policy. In critical scenarios, making sure data is not lost is very important!
- The order in which tasks execute may differ from the order in which they were submitted! Task A may enter the blocking queue first, while Task B, submitted when the queue is already full, gets a newly created temporary thread and is processed first. If there is an order dependency, the Worker processing Task B has to block and wait. So avoid order dependencies between tasks as much as possible, and keep individual tasks short so that later tasks are not blocked for too long.
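A small sketch of the sizing formula above; the wait/CPU times are placeholder values that would have to be measured for a real workload:

```java
public class PoolSizing {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // Hypothetical measurements: 80 ms waiting on IO for every 20 ms of CPU work
        double waitTime = 80.0;
        double cpuTime = 20.0;

        int ioBoundSize = (int) (cores * (1 + waitTime / cpuTime)); // cores * (1 + W/C)
        int cpuBoundSize = cores;                                   // CPU intensive: about the core count

        System.out.println("cores=" + cores
                + ", IO-bound pool=" + ioBoundSize
                + ", CPU-bound pool=" + cpuBoundSize);
    }
}
```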
The Executors utility class provides, among others, the following factory methods for creating thread pools:
```java
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    // ...
}
```
Stop thread pool
- #shutdown(): sets the state to SHUTDOWN; tasks already in the blocking queue are still processed. A Worker that is not blocked does not react to the interrupt signal and keeps draining the queue.
- #shutdownNow(): sets the state to STOP and drains the remaining tasks from the blocking queue (they are returned to the caller).
In both states, the execute method rejects newly submitted tasks. Both methods traverse the Workers and call Thread#interrupt(); the interrupt only takes effect while a thread is blocked.
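A common shutdown pattern built on these two methods (a sketch only; the 30-second grace period is an arbitrary choice): stop accepting tasks, give the queue a bounded time to drain, then force-stop.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class PoolShutdown {
    // Stop accepting new tasks, give queued tasks some time, then interrupt what is left.
    static void stopQuietly(ExecutorService pool) {
        pool.shutdown();                 // state -> SHUTDOWN, queued tasks keep running
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                List<Runnable> dropped = pool.shutdownNow(); // state -> STOP, drain the queue
                System.out.println(dropped.size() + " queued tasks were never started");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```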
Task execution
There are two ways to submit a task
- void ThreadPoolExecutor#execute(Runnable command)
- Future<?> AbstractExecutorService#submit: provided by the abstract parent class. The argument may be a Runnable or a Callable; it is wrapped in a FutureTask, #execute is called on it, and the FutureTask is returned (a short usage sketch follows).
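A short usage sketch contrasting the two submission paths; the pool size and task bodies are placeholders:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVsExecute {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // execute(): fire-and-forget, no handle to the result
        pool.execute(() -> System.out.println("ran via execute()"));

        // submit(): the Callable is wrapped in a FutureTask; get() blocks for the result
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("submit() result = " + future.get());

        pool.shutdown();
    }
}
```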
#execute(Runnable command)
When a new task is submitted to the thread pool, the internal execution priority is:
- Try to create a core thread; if the core threads are full, add the task to the blocking queue; if the queue is full, create a temporary thread up to maximumPoolSize; if that is also full, apply the rejection policy. When a Worker is created directly for a task, 'firstTask' is the submitted Runnable.
- When corePoolSize is 0 and the task has been added to the blocking queue, if the pool currently has no threads at all, a temporary Worker is created (with 'firstTask' empty) to take tasks from the blocking queue; a toy sketch of this priority order follows.
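A toy model of this priority order only (not the real ThreadPoolExecutor, which also re-checks the pool state and worker count after enqueueing and actually starts threads); the capacities and helper fields are assumptions:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Single-threaded toy: illustrates execute()'s decision order, nothing more.
class ExecuteFlowSketch {
    final int corePoolSize = 2;
    final int maximumPoolSize = 4;
    final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(8);
    int workerCount = 0; // stand-in for the worker count packed into ctl

    void execute(Runnable command) {
        if (workerCount < corePoolSize && addWorker(command, true))
            return;                              // 1. start a core Worker, firstTask = command
        if (workQueue.offer(command)) {
            if (workerCount == 0)
                addWorker(null, false);          // 2. queued; make sure at least one Worker exists
            return;
        }
        if (!addWorker(command, false))          // 3. try to start a temporary Worker
            reject(command);                     // 4. pool and queue are full: rejection policy
    }

    boolean addWorker(Runnable firstTask, boolean core) {
        int bound = core ? corePoolSize : maximumPoolSize;
        if (workerCount >= bound)
            return false;
        workerCount++;                           // a real Worker thread would be started here
        return true;
    }

    void reject(Runnable command) {
        throw new IllegalStateException("rejected: " + command);
    }

    public static void main(String[] args) {
        ExecuteFlowSketch pool = new ExecuteFlowSketch();
        try {
            for (int i = 1; i <= 13; i++)
                pool.execute(() -> {});          // 2 core + 8 queued + 2 temporary, the 13th is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("workers=" + pool.workerCount + ", queued=" + pool.workQueue.size());
    }
}
```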
Worker (worker thread)
Worker#run() -> ThreadPoolExecutor#runWorker(Worker w): the initially assigned firstTask is executed first, after which tasks are taken from the blocking queue for processing.
```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This requires
            // a recheck in second case to deal with shutdownNow race
            // while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // re-check that the interrupt is issued in the STOP state to guard against a concurrent shutdownNow
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```
- If the number of threads in the pool exceeds the number of core threads, the Worker uses workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), blocking for at most keepAliveTime; otherwise it uses take() and blocks indefinitely. See ThreadPoolExecutor#getTask() (and the runnable sketch after this list).
- When no task is obtained before the timeout, the current Worker is removed from the thread pool's worker set and the worker count is decremented by one. See ThreadPoolExecutor#processWorkerExit.
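A runnable sketch of this timeout behaviour; the pool sizes, sleeps and the SynchronousQueue choice are arbitrary. Temporary Workers above corePoolSize exit once poll(keepAliveTime) returns nothing:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 2L, TimeUnit.SECONDS, new SynchronousQueue<>());

        for (int i = 0; i < 3; i++)
            pool.execute(() -> sleep(500));      // forces up to 3 Workers to be created

        System.out.println("pool size while busy: " + pool.getPoolSize()); // expected 3
        Thread.sleep(4_000);                      // longer than keepAliveTime
        System.out.println("pool size after idle: " + pool.getPoolSize()); // expected 1 (core)

        pool.shutdown();
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```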
When a Worker's task throws an exception
In ThreadPoolExecutor#runWorker, "completedAbruptly" stays true and ThreadPoolExecutor#processWorkerExit is entered:
- First, the worker count is decremented by one with CAS;
- the Worker is removed from the worker set;
- if the number of threads is below the number of core threads, a new thread is added to the pool. Even with corePoolSize set to 0, one active Worker is still kept while the queue has tasks.
The "core" argument passed to addWorker here is false, but the pool's notion of core threads does not depend on that flag; it only determines whether corePoolSize or maximumPoolSize is used as the bound, so a replacement thread can still be added when the thread count exceeds corePoolSize.
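One practical consequence: a task submitted via execute() that throws kills its Worker, and the pool replaces it as described above. A sketch of observing this through the afterExecute hook (pool sizes and the sleep are arbitrary):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoggingPool extends ThreadPoolExecutor {
    public LoggingPool(int core, int max) {
        super(core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // Reached when a task run via execute() threw; the Worker exits abruptly
            // and processWorkerExit() adds a replacement thread.
            System.err.println("task failed: " + t);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LoggingPool pool = new LoggingPool(1, 1);
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        Thread.sleep(200); // the JVM's default uncaught-exception handler also prints the stack trace
        System.out.println("pool size after failure: " + pool.getPoolSize()); // replacement Worker -> 1
        pool.shutdown();
    }
}
```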
ExecutorCompletionService
When constructing it, note that the BlockingQueue needs to be large enough to hold the number of submitted tasks (the default constructor uses an unbounded LinkedBlockingQueue).
```java
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue; // queue of completed FutureTasks

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
    // ...
}
```
ExecutorCompletionService#submit wraps the Callable or Runnable in a FutureTask and then in a QueueingFuture; when the FutureTask finishes executing, it is placed on the completion queue completionQueue.
ExecutorCompletionService#take() blocks until it can return a finished FutureTask from completionQueue (see the usage sketch below).
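A usage sketch that consumes results in completion order rather than submission order; the task durations are arbitrary:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);

        int[] delays = {300, 100, 200};
        for (int d : delays) {
            ecs.submit(() -> {
                Thread.sleep(d);
                return d + " ms task";
            });
        }

        // take() hands back the futures as they complete: 100, 200, 300
        for (int i = 0; i < delays.length; i++) {
            System.out.println(ecs.take().get());
        }
        pool.shutdown();
    }
}
```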
FutureTask
Unsafe CAS operations ensure atomicity of:
- changes to the internal volatile "state" field, which records the Callable's execution state (it is NEW when the task is first created);
- insertion, removal and traversal of the "waiters" chain of waiting thread nodes.
FutureTask#run():
- First, CAS sets "runner" to the current thread.
- Then callable.call() is executed:
  - normal completion: the result is stored in outcome, and "state" moves from NEW through COMPLETING to NORMAL;
  - an exception occurs: the Throwable is stored in outcome, and "state" is set to EXCEPTIONAL.
- After execution completes, if the loop finds threads parked on the "waiters" chain, the chain is CASed to null and the node threads are woken one by one with LockSupport#unpark.
Then, in FutureTask#get():
It checks the current "state":
- If there is no result yet, the current thread is wrapped in a WaitNode, inserted at the head of the one-way "waiters" chain via CAS, and suspended with LockSupport#park.
- If there is a result:
  - if "state" is NORMAL, the result is returned directly;
  - if it is an exceptional result, the Throwable is wrapped in an ExecutionException and thrown.
Main logic source code:
```java
// After callable.call() finishes, wake up the waiting threads
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // If there are nodes on the waiting chain, CAS the head to null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // Wake up the nodes one by one
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null; // to reduce footprint
}

// get() parks the calling thread while waiting for the result
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // insert at the head
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
```
How is concurrency safety ensured?
- The head of the waiting chain, "waiters", is declared volatile.
- In get(), the waiting thread's node is inserted at the head of the chain; after the Callable completes, finishCompletion() loops and, if it finds suspended waiting threads on the chain, CASes the head to null. In both paths, processing only continues after UNSAFE#compareAndSwapObject succeeds on the head of the waiting chain, so if the two race, one CAS must fail; that side re-reads the volatile "waiters" and tries again.
- In the awaitDone method, the FutureTask's execution status is checked first; if execution has already finished, a thread calling get() simply returns "state" directly.
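A small usage sketch of these get() paths, first a timeout while the task is still running, then the normal result; the sleep and timeout values are arbitrary:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(500);          // simulate some work
            return 42;
        });
        new Thread(task, "futuretask-runner").start();

        try {
            // "state" is still NEW here, so the caller is parked via awaitDone() and times out
            task.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("not done yet, waiting without a timeout...");
        }

        // Blocks until finishCompletion() unparks the waiter, then returns the NORMAL outcome
        System.out.println("result = " + task.get());
    }
}
```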
Unfinished, to be continued...