JDK Concurrent package component Overview - ThreadPoolExecutor

Contents

ThreadPoolExecutor thread pool

Stop thread pool

Task execution

#execute(Runnable command)

Worker thread (Worker)

ExecutorCompletionService

FutureTask

ThreadPoolExecutor thread pool

A thread pool makes the creation and destruction of threads controllable and orderly, and lets thread contexts be reused. Operating system resources are limited: CPU time-slice scheduling has to switch between thread contexts, and with too many threads you can watch resources such as CPU sy% and memory being exhausted in top. Creating and destroying threads also costs time, and when threads occupy too much memory, JVM GC pauses get worse.

When creating, you need to specify the following parameters:

corePoolSize: number of core threads
maximumPoolSize: maximum number of threads
keepAliveTime + unit: how long temporary threads beyond the core count are kept alive
BlockingQueue: blocking queue that guarantees concurrent read/write safety
ThreadFactory: factory for creating Worker threads
RejectedExecutionHandler: rejection policy applied when the queue is full and no more workers can be added

Be careful when using:

  1. Decide the number of threads and how long temporary threads are kept alive.  
    1. Threads ≈ number of CPU cores * (1 + wait_time / cpu_time). CPU-intensive: number of CPU cores; IO-intensive: roughly 2 * number of CPU cores. The core count can be read with Runtime.getRuntime().availableProcessors().
    2. The operating system limits thread stack size and the number of threads that can be created, and too many threads also consume CPU. ( ulimit -a :  max user processes , stack size ;  JVM : -Xss)
  2. Decide the bound of the blocking queue. An unbounded queue (e.g. LinkedBlockingQueue) will keep consuming memory when a large backlog of tasks builds up.
  3. Decide the rejection policy. In critical scenarios, making sure data is not lost is very important (a construction sketch follows this list).

  4. The execution order of tasks may differ from their submission order! Task A may be queued first while task B triggers the creation of a temporary thread because the queue is full, so B is processed first. If there were an ordering dependency, the Worker processing task B would have to block and wait. Therefore tasks should avoid ordering dependencies as much as possible and should not run for too long, so that subsequent tasks are not blocked for long.
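As a hedged illustration of the points above, here is a minimal construction sketch. The pool sizes, queue capacity, thread-name prefix, and choice of CallerRunsPolicy are assumptions made for the example, not values from the original text:

import java.util.concurrent.*;

public class PoolConfigSketch {
    public static ThreadPoolExecutor newBoundedPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cores,                          // corePoolSize: assumed ~ number of CPU cores
                cores * 2,                      // maximumPoolSize: assumed 2 * cores (IO-intensive guess)
                60L, TimeUnit.SECONDS,          // keepAliveTime + unit for temporary threads
                new ArrayBlockingQueue<>(1000),             // bounded queue caps the backlog in memory
                new ThreadFactory() {                       // named workers make troubleshooting easier
                    private int n = 0;
                    public synchronized Thread newThread(Runnable r) {
                        return new Thread(r, "biz-pool-" + n++);
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy()   // rejection policy: run in the caller, so tasks are not silently dropped
        );
    }
}

CallerRunsPolicy is only one option; AbortPolicy, DiscardPolicy, DiscardOldestPolicy, or a custom RejectedExecutionHandler that persists the task may fit better depending on how important it is not to lose data.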

The Executors tool class provides, but is not limited to, the following methods for creating thread pools:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

Stop thread pool

  1. #shutdown(): sets the state to SHUTDOWN. Workers are not forced to stop: the tasks already in the blocking queue continue to be processed.
  2. #shutdownNow(): sets the state to STOP and removes the tasks remaining in the blocking queue (they are returned to the caller).

In both states, the #execute method refuses newly submitted tasks. shutdown() interrupts only idle Workers, while shutdownNow() calls Thread#interrupt() on every Worker; in either case the interrupt is only observed where a thread is blocked (see the shutdown sketch below).
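A minimal shutdown sketch, following the two-phase pattern shown in the ExecutorService javadoc (the 30-second timeouts are illustrative assumptions):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();                      // SHUTDOWN: refuse new tasks, keep draining the queue
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();           // STOP: interrupt workers, discard queued tasks
                if (!pool.awaitTermination(30, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}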

Task execution

There are two ways to submit a task:

  1. void ThreadPoolExecutor#execute(Runnable command)  
  2. Future<?> AbstractExecutorService#submit: provided by the abstract parent class. The argument can be a Runnable or a Callable; it is wrapped in a FutureTask, handed to #execute, and the FutureTask is returned (see the usage example below).
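A small usage sketch contrasting the two submission styles (the fixed pool size and task bodies are assumptions for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 1. execute: fire-and-forget, no handle to the result
        pool.execute(() -> System.out.println("ran via execute"));

        // 2. submit: the Callable is wrapped in a FutureTask and a Future is returned
        Future<Integer> future = pool.submit(() -> 1 + 1);
        System.out.println("submit result = " + future.get()); // blocks until the task completes

        pool.shutdown();
    }
}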

#execute(Runnable command)

When a new task is submitted to the thread pool, the internal execution priority is:

  1. Create a core thread (if the core threads are full) → add to the blocking queue (if the queue is full) → create a temporary thread up to the maximum (if that too is full) → apply the rejection policy. When a Worker is created along this path, 'firstTask' is set to the submitted Runnable.
  2. When the number of core threads is 0, the task is first added to the blocking queue; the pool then checks whether [the number of threads in the current pool is 0] and, if so, creates a temporary Worker thread that takes tasks from the blocking queue for execution. In this case 'firstTask' is null.

The relevant source is ThreadPoolExecutor#execute(Runnable command); an annotated copy follows.
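For reference, the JDK 8 implementation of #execute, with comments added here to map the code to the two cases above:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. Below corePoolSize: start a core Worker with this task as its firstTask
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. Core threads are full (or corePoolSize is 0): try to enqueue the task
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);                  // pool was shut down concurrently
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);           // no live Worker: start one with firstTask == null
    }
    // 3. Queue is full: try to add a temporary Worker up to maximumPoolSize, otherwise reject
    else if (!addWorker(command, false))
        reject(command);
}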

Worker thread (Worker)

Worker#run() -> ThreadPoolExecutor#runWorker(Worker w): the Worker first runs the initially assigned firstTask, then keeps taking tasks from the blocking queue for processing.

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted.  
                // This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt(); // Check again that interrupt is initiated in the STOP state to prevent concurrency
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  1. If it is determined that [the number of active threads exceeds the number of core threads] (or allowCoreThreadTimeOut is set), the Worker blocks on workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) for at most 'keepAliveTime'; otherwise it blocks indefinitely on take(). See ThreadPoolExecutor#getTask(), abridged below.
  2. If no task is obtained before the timeout, the current Worker is removed from the pool's Worker list and the Worker count is decremented by one. See ThreadPoolExecutor#processWorkerExit.
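The relevant part of ThreadPoolExecutor#getTask() (JDK 8 source, abridged; comments added):

private Runnable getTask() {
    boolean timedOut = false;   // did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // SHUTDOWN with an empty queue, or STOP: this Worker should exit
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // may this Worker be culled after keepAliveTime?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;    // returning null makes runWorker exit via processWorkerExit
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;    // waited keepAliveTime without receiving a task
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}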

Worker execution exceptions

If the task throws, ThreadPoolExecutor#runWorker leaves "completedAbruptly" as true and enters ThreadPoolExecutor#processWorkerExit, which:

  1. first decrements the Worker count with CAS;
  2. removes the Worker from the worker list;
  3. if the number of threads is below the required minimum, adds a new thread to the pool. Even if corePoolSize is 0, at least one live Worker is kept as long as the blocking queue is not empty (see the source below).

The 'core' argument passed to addWorker here is false. The pool's notion of which threads are "core" does not depend on this flag; it merely selects maximumPoolSize (rather than corePoolSize) as the bound when deciding whether the replacement thread may be added.


ExecutorCompletionService

Note at construction time: the supplied BlockingQueue must be able to hold all submitted tasks, since every completed future is added to it (the default constructor uses an unbounded LinkedBlockingQueue).

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue; // FutureTask execution completion queue

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    public Future<V> poll() {
        return completionQueue.poll();
    }
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
}

The ExecutorCompletionService#submit method wraps the Callable or Runnable in a FutureTask and then in a QueueingFuture; when the FutureTask finishes executing, QueueingFuture#done() puts it on the completion queue completionQueue.

ExecutorCompletionService#take() blocks until a finished FutureTask is available on completionQueue and returns it (see the usage sketch below).
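A usage sketch: results are consumed in completion order rather than submission order (the pool size, task count, and sleep durations are assumptions for the example):

import java.util.concurrent.*;

public class CompletionServiceSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);

        // Submit three tasks with different durations
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            cs.submit(() -> {
                Thread.sleep(n * 100L);   // simulate work
                return n;
            });
        }

        // take() returns futures as they finish, fastest first
        for (int i = 0; i < 3; i++) {
            System.out.println("finished: " + cs.take().get());
        }
        pool.shutdown();
    }
}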

FutureTask

Atomicity is ensured with Unsafe CAS operations on:

  1. the volatile field "state", which records the Callable's execution state; it is NEW when the task is first created;
  2. insertion and removal of nodes in the waiting-thread linked list "waiters".

FutureTask#run():

  1. First, CAS-binds "runner" to the current thread.

  2. Executes callable.call():

    1. Normal completion: the result is stored into outcome, and "state" moves from NEW to COMPLETING and then to NORMAL (see FutureTask#set after this list);

    2. Exception thrown: the Throwable is stored into outcome, and "state" is set to EXCEPTIONAL;

  3. After execution completes, finishCompletion() checks in a loop whether threads are parked on the "waiters" list; if so, it CASes the list empty and wakes the node threads one by one with LockSupport#unpark.
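The state transitions above are visible in FutureTask#set and FutureTask#setException (JDK 8 source, comments added):

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;                                          // store the result
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);      // final state: NORMAL
        finishCompletion();                                   // wake up waiters
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;                                          // store the Throwable
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state: EXCEPTIONAL
        finishCompletion();
    }
}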

Then FutureTask#get():

It checks the current "state":

  1. If there is no result yet, the current thread is wrapped in a WaitNode, inserted into the singly linked list "waiters" with CAS, and parked with LockSupport#park (a usage example follows this list).

  2. If there is a result:

    1. if "state" is NORMAL, the normal result is returned directly;

    2. otherwise the stored Throwable is wrapped in an ExecutionException and thrown.
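A small sketch of the blocking behaviour described above (the sleep duration and thread name are illustrative assumptions):

import java.util.concurrent.FutureTask;

public class FutureTaskSketch {
    public static void main(String[] args) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            Thread.sleep(200L);             // simulate work inside callable.call()
            return "done";
        });

        new Thread(task, "runner").start(); // run() CASes 'runner' to this thread, then calls call()

        // get() sees state == NEW, enqueues a WaitNode on 'waiters' and parks
        // until finishCompletion() unparks it, then returns the outcome.
        System.out.println(task.get());
    }
}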

Main logic source code:

// After callable.call() completes, wake up the threads waiting for the result
 private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) { // If the loop determines that there is a node in the waiting chain, it goes to cas to empty it
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
                for (;;) {
           // Wake up one by one
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
// Park the calling thread while waiting for the result in get()
  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s; 
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q); // insert the new node at the head of the waiters list
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

How is concurrency safety ensured?  

  • The head of the waiting list, "waiters", is volatile.
  • In get(), a waiting thread's node is inserted at the head of the list; after the Callable completes, finishCompletion() loops and, if there are still parked waiters on the list, CASes the head to null. On both paths processing only continues after UNSAFE#compareAndSwapObject succeeds on the head of the waiting list, so if the two race, one CAS must fail, and that side re-reads the volatile "waiters" and tries again.
  • awaitDone checks the FutureTask's state first; if execution has already finished, a thread calling get() simply returns "state" without queuing.

Unfinished, to be continued...
