Use of JUC thread pool and source code analysis

Thread pool creation and use

Creation of thread pool

The Executors utility class provides factory methods for several kinds of thread pool, mainly the following:

  • public static ExecutorService newFixedThreadPool(int nThreads)
  • public static ExecutorService newCachedThreadPool()
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  • public static ExecutorService newSingleThreadExecutor()
  • public static ScheduledExecutorService newSingleThreadScheduledExecutor()

Method names end in Pool when the pool may hold more than one thread, and in Executor when it is limited to a single thread.

The pools returned by Executors are backed mainly by two implementation classes: ThreadPoolExecutor and ScheduledThreadPoolExecutor.

newFixedThreadPool

Once the number of threads reaches corePoolSize, no new threads are created: the work queue is unbounded, so tasks never overflow it, and corePoolSize equals maximumPoolSize.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize = maximumPoolSize
                                  0L, TimeUnit.MILLISECONDS,
                                  // Unbounded queue: tasks never overflow it, so the pool would not grow beyond corePoolSize even if maximumPoolSize were larger
                                  new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor

It is similar to newFixedThreadPool, except that both corePoolSize and maximumPoolSize are set to 1.

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, // corePoolSize = maximumPoolSize = 1
                                0L, TimeUnit.MILLISECONDS,
                                // Unbounded queue
                                new LinkedBlockingQueue<Runnable>()));
}
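newCachedThreadPool

There are no core threads, and the pool may grow up to Integer.MAX_VALUE threads. The SynchronousQueue holds no tasks, so each submitted task is either handed to an idle thread or run on a newly created one. Threads that stay idle for 60 seconds are terminated.
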
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
newScheduledThreadPool

Creates a thread pool with the given number of core threads for running delayed or periodic tasks.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          // Delay queue
          new DelayedWorkQueue());
}

The core methods of ScheduledExecutorService are as follows:

// Execute the Runnable task once, after the given delay
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

// Execute the Callable task once, after the given delay
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

// Fixed-rate task
// The period is measured from the start of the previous execution. When the period elapses, the next execution starts immediately if the previous one has finished; otherwise it starts as soon as the previous one finishes
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

// Fixed-delay task
// The delay is measured from the end of the previous execution; the next execution starts once the delay has elapsed
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

Once the delay or period has elapsed, the task is eligible to run, but it actually runs only when a worker thread is available. Until then it waits for a thread, as in an ordinary thread pool.
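To make the two periodic methods concrete, here is a minimal sketch that runs a trivial task a few times with either scheduling mode and then cancels it. The class name ScheduleDemo and the helper runPeriodically are hypothetical names chosen for this illustration; only the ScheduledExecutorService calls come from the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {

    // Hypothetical helper: runs a periodic task until it has executed `runs` times,
    // then cancels it. Returns true if all runs happened within 5 seconds.
    static boolean runPeriodically(int runs, long periodMillis, boolean fixedRate) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        Runnable task = remaining::countDown;
        try {
            ScheduledFuture<?> handle;
            if (fixedRate) {
                // Period measured from the start of the previous execution
                handle = scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
            } else {
                // Delay measured from the end of the previous execution
                handle = scheduler.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
            }
            boolean finished = remaining.await(5, TimeUnit.SECONDS);
            // A periodic task never completes on its own; it must be cancelled
            handle.cancel(false);
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed rate finished:  " + runPeriodically(3, 50, true));
        System.out.println("fixed delay finished: " + runPeriodically(3, 50, false));
    }
}
```

With a task this short the two modes behave almost identically. The difference shows up when the task takes longer than the period: a fixed-rate task then runs back to back, while a fixed-delay task still waits the full delay after each run.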

Use of thread pool

Wait for all task threads to complete execution

References:

ExecutorService ends gracefully after waiting for the thread to complete

How to wait for all threads to finish, using ExecutorService?

Method 1: shutdown() / shutdownNow() + awaitTermination()
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);

while(...) {
    taskExecutor.execute(new MyTask());
}

// The thread pool stops accepting new tasks; tasks already submitted still run
taskExecutor.shutdown();

try {
    // Wait for all tasks to complete
    taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    // ...
}
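Waiting with Long.MAX_VALUE blocks forever if a task never finishes. A common alternative, sketched below along the lines of the usage example in the ExecutorService documentation, waits for a bounded time and then falls back to shutdownNow(). The class name ShutdownDemo and both helper methods are hypothetical names for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    // Hypothetical helper: shut the pool down in two phases.
    // Returns true if the pool terminated within the allowed time.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt running tasks, drop queued ones
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits `tasks` trivial tasks and returns how many had run once the pool terminated
    static int runAll(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runAll(20));
    }
}
```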
Method 2: invokeAll() + shutdown() / shutdownNow() + awaitTermination()

One way to run a batch of tasks is the invokeAll() method, which returns a list of Future objects once all tasks have completed or the timeout has expired.

Note that the returned Future objects are in the same order as the Callable objects in the list provided:

ExecutorService taskExecutor = Executors.newFixedThreadPool(10);

// your tasks
List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));

// invokeAll() returns when all tasks are complete
List<Future<String>> futures = taskExecutor.invokeAll(callables);

taskExecutor.shutdown();

try {
    // Wait for all tasks to complete
    taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    // ...
}
Method 3: use CountDownLatch
ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);

for (int i = 0; i < 2; i++) {
    taskExecutor.submit(() -> {
        try {
            // Business logic
        } finally {
            // Count down even if the business logic throws, so await() cannot hang
            latch.countDown();
        }
    });
}

try {
    // Wait until both tasks have counted the latch down to zero
    latch.await();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
Method 4: use ExecutorCompletionService

Another way to run multiple threads is to use ExecutorCompletionService, which uses the provided ExecutorService to perform tasks.

One difference from invokeAll is the order in which the Futures are returned. ExecutorCompletionService uses a queue that stores results in completion order, while invokeAll returns a list in the same order as the iterator of the given task list:

CompletionService<String> service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);

List<Callable<String>> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));

for (Callable<String> callable : callables) {
      service.submit(callable);
}
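The snippet above only submits the tasks. To actually wait for them, results are taken from the completion service one at a time. The following self-contained sketch shows this; the class CompletionOrderDemo and the delayed() helper are hypothetical stand-ins for the DelayedCallable and WORKER_THREAD_POOL used above, which are not defined in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Hypothetical stand-in for DelayedCallable: sleeps, then returns its name
    static Callable<String> delayed(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Submits the slow task first and returns the names in the order the tasks finished
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            service.submit(delayed("slow", 500));
            service.submit(delayed("fast", 50));
            List<String> order = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // take() blocks until the next task completes, whichever it is
                order.add(service.take().get());
            }
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

Once take() has been called as many times as tasks were submitted, all tasks are known to have finished.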
Method 5: use Java 8's CompletableFuture
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, taskExecutor))
                               .toArray(CompletableFuture[]::new);
// Wait for all tasks to complete
CompletableFuture.allOf(futures).join();    
taskExecutor.shutdown();

Configuration and optimization of thread pool

Configuration mainly means choosing the constructor parameters of ThreadPoolExecutor.

corePoolSize

Each thread needs its own stack memory. On recent 64-bit JVMs the default stack size is 1024KB. If the server receives a large number of requests, or the handleRequest method executes slowly, the server may crash because it creates too many threads. For example, 1000 parallel requests create 1000 threads, which need about 1GB of memory for thread stacks alone. In addition, the objects each thread creates while running are allocated on the heap. If the situation worsens, the JVM heap is exhausted and garbage collection runs constantly, which eventually leads to out-of-memory errors.

These threads consume not only memory but also other limited resources, such as file handles and database connections. Uncontrolled thread creation can therefore cause other kinds of errors and crashes as well. An important way to avoid resource exhaustion is to avoid unbounded data structures.

Incidentally, the memory cost of thread stacks can be tuned with the -Xss switch. A smaller stack reduces the per-thread overhead but may cause stack overflow errors. For typical applications the default 1024KB is more than enough, and 256KB or 512KB may be more appropriate. The minimum allowed for Java is 160KB.

CPU-intensive tasks should be configured with as few threads as possible, for example a pool of CPU cores + 1 threads, to reduce thread switching.

I/O-intensive tasks spend much of their time waiting rather than executing, so configure more threads, for example CPU cores * 2.

There is a formula for estimating the optimal number of threads for I/O tasks:

$$N_{threads} = N_{CPU} \times U_{CPU} \times (1 + W/C)$$

where:

* $N_{CPU}$ is the number of processor cores

* $U_{CPU}$ is the target CPU utilization (a value between 0 and 1)

* $W/C$ is the ratio of waiting time to computation time

Since the number of threads depends on the type of application, it may take a lot of performance testing to get the best results.
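As a worked example of the sizing formula, the sketch below computes a starting pool size from the core count, the target utilization, and measured wait and compute times. The class PoolSizing and the method optimalThreads are hypothetical names, and the result is only a starting point for the performance testing mentioned above.

```java
class PoolSizing {

    // Hypothetical helper implementing: threads = cpus * utilization * (1 + wait / compute)
    static int optimalThreads(int cpus, double targetUtilization,
                              double waitTime, double computeTime) {
        if (cpus <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double threads = cpus * targetUtilization * (1 + waitTime / computeTime);
        // Always keep at least one thread
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Example: each task waits 90 ms on I/O for every 10 ms of computation
        System.out.println("suggested threads: " + optimalThreads(cpus, 1.0, 90, 10));
    }
}
```

For instance, with 8 cores, full utilization, and tasks that wait 90 ms for every 10 ms of computation, the formula gives 8 * 1 * (1 + 9) = 80 threads.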

The thread pool can be monitored through the metrics it exposes. The following methods are useful when monitoring a pool:

  • getTaskCount: the approximate total number of executed and unexecuted tasks in the thread pool (the completedTasks of all workers, plus the tasks currently being executed, plus the number of elements in the blocking queue);
  • getCompletedTaskCount: the number of tasks completed by the thread pool (the completedTasks of all workers), which is less than or equal to getTaskCount;
  • getLargestPoolSize: the largest number of threads that have ever been in the pool at the same time. This shows whether the pool has ever been full, that is, whether it reached maximumPoolSize;
  • getPoolSize: the current number of threads in the thread pool (the number of elements in workers);
  • getActiveCount: the number of threads currently executing tasks (workers holding their exclusive lock).

These methods allow the thread pool to be monitored. ThreadPoolExecutor also provides several empty hook methods, such as beforeExecute, afterExecute, and terminated. A subclass of ThreadPoolExecutor can override them to add operations before or after each task runs, for example to measure task execution time.
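As an illustration of the hook methods, here is a minimal sketch of a pool that measures how long its tasks take. The class name TimingThreadPool and its accessor methods are assumptions made for this example; only beforeExecute and afterExecute are part of ThreadPoolExecutor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TimingThreadPool extends ThreadPoolExecutor {
    // beforeExecute and afterExecute run on the worker thread that executes the task,
    // so a ThreadLocal can carry the start time between them
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong finishedTasks = new AtomicLong();

    TimingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finishedTasks.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    long finishedTasks() { return finishedTasks.get(); }

    long totalNanos() { return totalNanos.get(); }

    // Runs `tasks` empty tasks and returns how many the hooks observed
    static long runAndCount(int tasks) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finishedTasks();
    }

    public static void main(String[] args) {
        System.out.println("tasks observed: " + runAndCount(10));
    }
}
```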

workQueue
maximumPoolSize
threadFactory

By default, the DefaultThreadFactory returned by Executors.defaultThreadFactory() is used; a custom factory can be supplied instead.
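A typical reason to customize the factory is to give threads meaningful names so they are easy to find in thread dumps. A minimal sketch follows; NamedThreadFactory is a hypothetical class name, and the naming scheme is only one possible choice.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Threads are named prefix-1, prefix-2, ...
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        // Non-daemon, so the JVM waits for these threads before exiting
        t.setDaemon(false);
        return t;
    }
}
```

An instance can be passed to any ThreadPoolExecutor constructor that takes a ThreadFactory, or to the Executors factory methods that accept one, such as Executors.newFixedThreadPool(int, ThreadFactory).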

RejectedExecutionHandler

In addition to the four built-in rejection policies (AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy), you can also implement a custom rejection policy.
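A custom policy only needs to implement RejectedExecutionHandler. The sketch below counts rejected tasks instead of throwing; CountingRejectionHandler and its demo() method are hypothetical names for this illustration. The demo uses one thread and a queue of capacity one, so the third task submitted while the first is still running has nowhere to go.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingRejectionHandler implements RejectedExecutionHandler {
    private final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A real handler might log, record a metric, or retry later
        rejected.incrementAndGet();
    }

    int rejectedCount() {
        return rejected.get();
    }

    // Returns the number of tasks rejected by a pool with 1 thread and queue capacity 1
    static int demo() {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // runs on the single worker thread
        pool.execute(blocker); // waits in the queue
        pool.execute(blocker); // queue full and maximumPoolSize reached: rejected
        release.countDown();
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + demo());
    }
}
```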

AbstractExecutorService source code

The AbstractExecutorService abstract class implements the ExecutorService interface.

FutureTask source code reference: Future source code analysis

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}


public abstract class AbstractExecutorService implements ExecutorService {

    // Wraps a Runnable, together with the value to return as its result, in a FutureTask
    // newTaskFor applies the adapter pattern: either a Runnable plus a result value, or a Callable, is adapted into a FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    // Use the callable object to construct a FutureTask object
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // Submit the task to the thread pool and return a FutureTask object
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // Add task to thread pool for execution
        execute(ftask);
        return ftask;
    }

    // ditto
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    // ditto
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * the main mechanics of invokeAny.
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    // Batch execution of multiple tasks
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // Submit each task to the thread pool in turn
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                // Record the task's future in futures
                futures.add(f);
                execute(f);
            }
            // Iterate over the futures
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                // If the task has not completed yet
                if (!f.isDone()) {
                    try {
                        // Block until the task completes
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // Reaching this point means all tasks have completed
            done = true;
            // Return the futures
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}
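The behaviour of invokeAny and invokeAll in the source above can be observed with a small experiment. In this sketch one task fails and one succeeds: invokeAny skips the failure and returns the successful result, while invokeAll returns both futures in submission order and the failure surfaces only when get() is called. The class InvokeDemo and its methods are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeDemo {

    static List<Callable<String>> tasks() {
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        Callable<String> succeeding = () -> "ok";
        return Arrays.asList(failing, succeeding);
    }

    // invokeAny returns the result of a task that completed without throwing
    static String firstSuccessful() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return pool.invokeAny(tasks());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // invokeAll waits for every task; each future reports its own outcome
    static List<String> allOutcomes() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> outcomes = new ArrayList<>();
        try {
            for (Future<String> f : pool.invokeAll(tasks())) {
                try {
                    outcomes.add(f.get());
                } catch (ExecutionException e) {
                    outcomes.add("failed: " + e.getCause().getMessage());
                }
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessful());
        System.out.println(allOutcomes());
    }
}
```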

ThreadPoolExecutor source code

The core thread pools, whether created by newFixedThreadPool(), newCachedThreadPool(), or newSingleThreadExecutor(), appear to have completely different characteristics, but internally they are all implemented by ThreadPoolExecutor.
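Before reading the fields, it may help to see in isolation how ThreadPoolExecutor packs the run state and the worker count into a single int. The sketch below copies the constants and the three helper functions from the source listing that follows into a standalone class (CtlBits is a hypothetical name) so the values can be printed and checked.

```java
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // lower 29 bits set

    // Run states live in the upper 3 bits
    static final int RUNNING    = -1 << COUNT_BITS; // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS; // 000
    static final int STOP       =  1 << COUNT_BITS; // 001
    static final int TIDYING    =  2 << COUNT_BITS; // 010
    static final int TERMINATED =  3 << COUNT_BITS; // 011

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(ctl));
        System.out.println(workerCountOf(ctl));          // prints 5
        System.out.println(runStateOf(ctl) == RUNNING);  // prints true
        System.out.println(RUNNING < SHUTDOWN);          // prints true
    }
}
```

Because RUNNING is negative and the other states increase from 0, the states are ordered numerically, which is why the source can compare ctl values directly in runStateLessThan, runStateAtLeast, and isRunning.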

ThreadPoolExecutor fields
public class ThreadPoolExecutor extends AbstractExecutorService {

    // The ctl field stores both the current state of the thread pool and the number of threads
    // The upper 3 bits store the run state of the thread pool (runState)
    // The lower 29 bits store the number of effective threads in the thread pool (workerCount)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // In Java an int occupies 32 bits, so COUNT_BITS is 32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // CAPACITY is the upper limit of workerCount, the theoretical maximum number of threads in a ThreadPoolExecutor
    // It is computed by shifting 1 left by 29 bits (00000000 00000000 00000000 00000001 --> 001 00000 00000000 00000000 00000000) and then subtracting 1, which gives 000 11111 11111111 11111111 11111111. The upper 3 bits hold runState, so the theoretical maximum of workerCount is 29 one-bits, that is, 536870911
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits

    // RUNNING: accepts new tasks and processes queued tasks
    // In two's complement, -1 is 32 one-bits. Shifted left by 29 bits it becomes 111 00000 00000000 00000000 00000000: the lower 29 bits are all 0 and the upper 3 bits are all 1. This is the RUNNING state, that is, -536870912
    private static final int RUNNING    = -1 << COUNT_BITS;

    // SHUTDOWN: does not accept new tasks, but processes queued tasks
    // When the thread pool is in the RUNNING state, calling shutdown() moves it into this state (finalize() also calls shutdown() and so enters this state)
    // 0 is 32 zero-bits and stays 0 however far it is shifted, i.e. 000 00000 00000000 00000000 00000000: the lower 29 bits and the upper 3 bits are all 0. This is the SHUTDOWN state, that is, 0
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    // STOP: does not accept new tasks, does not process queued tasks, and interrupts tasks in progress
    // When the thread pool is in the RUNNING or SHUTDOWN state, calling shutdownNow() moves it into this state
    // 1 is 31 zero-bits followed by a single 1. Shifted left by 29 bits it becomes 001 00000 00000000 00000000 00000000: the lower 29 bits are all 0 and the upper 3 bits are 001. This is the STOP state, that is, 536870912
    private static final int STOP       =  1 << COUNT_BITS;

    // TIDYING: all tasks have ended and workerCount is 0. After entering this state the thread pool calls terminated() and then moves to the TERMINATED state
    // 2 is 30 zero-bits followed by 10. Shifted left by 29 bits it becomes 010 00000 00000000 00000000 00000000: the lower 29 bits are all 0 and the upper 3 bits are 010. This is the TIDYING state, that is, 1073741824
    private static final int TIDYING    =  2 << COUNT_BITS;

    // TERMINATED: entered after terminated() has run; the default terminated() method does nothing
    // 3 is 30 zero-bits followed by 11. Shifted left by 29 bits it becomes 011 00000 00000000 00000000 00000000: the lower 29 bits are all 0 and the upper 3 bits are 011. This is the TERMINATED state, that is, 1610612736
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl

    // The parameter c is the value of ctl: the upper 3 bits are the thread pool's runState and the lower 29 bits are workerCount, the number of effective threads in the pool

    // runState: the running state of the thread pool, held in the upper 3 bits of ctl. There are five states: RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED
    // ~ is bitwise NOT. CAPACITY has the upper 3 bits 0 and the lower 29 bits 1, so ~CAPACITY has the upper 3 bits 1 and the lower 29 bits 0. ANDing it with c keeps the upper 3 bits and clears the lower 29 bits, which yields runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

    // Gets the number of effective threads in the thread pool
    // workerCount: held in the lower 29 bits of ctl
    // ANDs c with CAPACITY, i.e. 000 11111 11111111 11111111 11111111. The upper 3 bits of c are ANDed with 000 and so are discarded, while the lower 29 bits are ANDed with 29 one-bits and keep their value. This extracts workerCount from ctl
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    // Builds the value of ctl
    // rs is the runState, an int with a value in the upper 3 bits and 0 in the lower 29 bits; wc is the workerCount, an int with 0 in the upper 3 bits and a value in the lower 29 bits. ORing them combines the upper 3 bits of rs with the lower 29 bits of wc. The initial ctl is built from RUNNING and 0
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * workQueue is the queue that holds tasks and hands them off to worker threads
     */
    private final BlockingQueue<Runnable> workQueue;

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * workers is the set of all worker threads in the thread pool
     * It may be accessed only while holding mainLock
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    // Record the maximum number of elements in the workers collection
    private int largestPoolSize;

    /**
     * The number of tasks completed by the thread pool
     */
    private long completedTaskCount;

    /**
     * Factory used to create new threads
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler invoked when a task is rejected
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * The timeout (nanoseconds) for idle threads to wait for work, that is, the survival time of idle threads 
     */
    private volatile long keepAliveTime;

    /**
     * Whether core threads are allowed to time out
     * The default is false. If false, core threads stay alive even when idle; if true, an idle core thread waits for work for at most keepAliveTime and is then terminated
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * The core pool size, the minimum number of worker threads kept alive. While the number of workers is below corePoolSize, a new thread is started for each submitted task, regardless of whether there are idle threads in the pool
     */
    private volatile int corePoolSize;

    /**
     * The maximum number of threads in the thread pool
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /* The context to be used when executing the finalizer, or null. */
    private final AccessControlContext acc;

    // Public constructors and methods
    // The following is the construction method

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
            null :
        AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Invokes {@code shutdown} when this executor is no longer
     * referenced and it has no threads.
     */
    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory(ThreadFactory)
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return {@code false}
     * if all core threads have already been started.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

    /**
     * Same as prestartCoreThread except arranges that at least one
     * thread is started even if corePoolSize is 0.
     */
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    /**
     * Returns true if this pool allows core threads to time out and
     * terminate if no tasks arrive within the keepAlive time, being
     * replaced if needed when new tasks arrive. When true, the same
     * keep-alive policy applying to non-core threads applies also to
     * core threads. When false (the default), core threads are never
     * terminated due to lack of incoming tasks.
     *
     * @return {@code true} if core threads are allowed to time out,
     *         else {@code false}
     *
     * @since 1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting {@code true}. This method
     * should in general be called before the pool is actively used.
     *
     * @param value {@code true} if should time out, else {@code false}
     * @throws IllegalArgumentException if value is {@code true}
     *         and the current keep-alive time is not greater than zero
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated.  If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated.  This overrides any value set in the constructor.
     *
     * @param time the time to wait.  A time value of zero will cause
     *        excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the {@code time} argument
     * @throws IllegalArgumentException if {@code time} less than zero or
     *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
     * @see #getKeepAliveTime(TimeUnit)
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * that threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     * @see #setKeepAliveTime(long, TimeUnit)
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* User-level queue utilities */

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use.  Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p>This method may be useful as one part of a cancellation
     * scheme.  It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return {@code true} if the task was removed
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state and estimated worker and
     * task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));
        return super.toString() +
            "[" + rs +
            ", pool size = " + nworkers +
            ", active threads = " + nactive +
            ", queued tasks = " + workQueue.size() +
            ", completed tasks = " + ncompleted +
            "]";
    }
}
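The setters and statistics methods above can be exercised from ordinary client code. The following is a minimal sketch, assuming a pool with 2 core threads and a maximum of 4; the class name PoolTuningSketch and its method names are hypothetical and exist only for this illustration. The maximum is raised before the core size because some later JDK versions reject a core size that exceeds the current maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class PoolTuningSketch {

    /** Returns {threads started by prestart, pool size afterwards, core size after resize}. */
    static int[] prestartAndResize() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            // Start both core threads eagerly instead of waiting for tasks to arrive
            int started = pool.prestartAllCoreThreads();
            int size = pool.getPoolSize();
            // Raise the ceiling first, then the core size
            pool.setMaximumPoolSize(8);
            pool.setCorePoolSize(3);
            return new int[] {started, size, pool.getCorePoolSize()};
        } finally {
            pool.shutdownNow();
        }
    }

    /** allowCoreThreadTimeOut(true) is refused while the keep-alive time is zero. */
    static boolean zeroKeepAliveRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] r = prestartAndResize();
        System.out.println("started=" + r[0] + ", poolSize=" + r[1] + ", core=" + r[2]);
        System.out.println("zero keep-alive rejected: " + zeroKeepAliveRejected());
    }
}
```

Raising the core size does not start a thread here because the queue is empty: setCorePoolSize only prestarts as many workers as there are queued tasks.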
ThreadFactory
public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
public class Executors {

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    /**
     * Default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
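        static final AtomicInteger poolNumber = new AtomicInteger(1);// Sequence number for the next factory (pool), shared by all instances
        final ThreadGroup group;// Thread group for the new threads
        final AtomicInteger threadNumber = new AtomicInteger(1);// Sequence number for the next thread created by this factory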
        final String namePrefix;

        /*
         * Create default thread factory
         */
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
        }

        /*
         * Create a new thread
         */
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  // The name of the new thread
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // Make sure the new thread is a user (non-daemon) thread, even if its creator is a daemon
            if (t.isDaemon())
                t.setDaemon(false);

            // Set the priority of all threads to NORM_PRIORITY
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);

            return t;
        }
    }

}
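DefaultThreadFactory produces names such as pool-1-thread-1, which say little in a thread dump. A custom factory can be passed to the ThreadPoolExecutor constructor or to setThreadFactory instead. The sketch below is one possible shape; the class NamedThreadFactory, its prefix parameter and its daemon flag are assumptions made for illustration and are not part of the JDK.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that mirrors DefaultThreadFactory but uses a caller-chosen prefix.
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    // Sequence number of the next thread created by this factory
    private final AtomicInteger seq = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + seq.getAndIncrement());
        // Set both attributes explicitly so they do not depend on the creating thread
        t.setDaemon(daemon);
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory f = new NamedThreadFactory("io", true);
        System.out.println(f.newThread(() -> { }).getName());
        System.out.println(f.newThread(() -> { }).getName());
    }
}
```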
Worker
// Extends AQS to implement a simple, non-reentrant exclusive lock
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // The thread that runs this worker's tasks
    final Thread thread;

    /** Initial task to run.  Possibly null. */
    // The first task to run; later tasks are taken from the work queue
    Runnable firstTask;

    /** Per-thread task counter */
    // Number of tasks this worker's thread has completed
    volatile long completedTasks;

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
    Worker(Runnable firstTask) {
        // Set the AQS state to -1 so that the worker cannot be interrupted before runWorker starts.
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // Worker implements Runnable, so "this" is handed to the factory as the thread's target.
        // Starting the thread therefore runs Worker.run(), which delegates to the pool's runWorker(this).
        // runWorker(this) runs firstTask first and then keeps taking tasks from the blocking queue,
        // which is how one thread is reused for many tasks.
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // Try exclusive lock
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Release exclusive lock
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    // Interrupt started thread
    void interruptIfStarted() {
        Thread t;
        // getState() >= 0 means runWorker has started (the initial state is -1),
        // the thread must not be null,
        // and it must not already be interrupted (there is no need to interrupt it twice)
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // Interrupt the thread
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
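The locking part of Worker can be isolated to show two properties that the pool relies on: the initial state of -1 blocks both locking and interruption until the first unlock, and the lock is not reentrant. The following is a stripped-down sketch; the class WorkerLockSketch and the method interruptible() are hypothetical names, and everything unrelated to locking has been left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reduction of Worker to its AQS-based lock.
final class WorkerLockSketch extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1L;

    WorkerLockSketch() {
        setState(-1); // same trick as Worker: not lockable until the first unlock()
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // Only the transition 0 -> 1 succeeds, so a second acquire by the owner fails too
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    boolean tryLock() { return tryAcquire(1); }
    void unlock()     { release(1); }

    /** Mirrors the getState() >= 0 test in interruptIfStarted. */
    boolean interruptible() { return getState() >= 0; }

    public static void main(String[] args) {
        WorkerLockSketch w = new WorkerLockSketch();
        System.out.println("before unlock: tryLock=" + w.tryLock());
        w.unlock(); // runWorker does this once before entering its task loop
        System.out.println("after unlock:  tryLock=" + w.tryLock());
        System.out.println("reentrant:     tryLock=" + w.tryLock());
    }
}
```

Because the lock is not reentrant, a successful tryLock() from another thread is a reliable sign that the worker is not currently running a task, which is what interruptIdleWorkers depends on.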
execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // Read ctl, which packs the run state and the worker count
    int c = ctl.get();
    // 1. Fewer workers than corePoolSize: try to start a new core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        // Return if a new core worker was started with command as its first task
        if (addWorker(command, true))
            return;
        // addWorker failed (the state or worker count changed concurrently), so re-read ctl
        c = ctl.get();
    }
    // 2. The pool is still RUNNING: try to put the task into the blocking queue
    // (reached when all core threads are in use or addWorker above failed)
    if (isRunning(c) && workQueue.offer(command)) {
        // The task is queued; re-read ctl and check the state again
        int recheck = ctl.get();
        // If the pool is no longer RUNNING, take the task back out of the queue.
        // If remove succeeds, reject the task; if it fails, a worker has already taken it
        if (!isRunning(recheck) && remove(command))
            // Hand the removed task to the rejection handler
            reject(command);
        // Otherwise, if no worker threads are left, start one
        else if (workerCountOf(recheck) == 0)
            // addWorker(null, false) creates a thread without a first task. The task is
            // already in workQueue (this branch is reached when the pool is still RUNNING
            // or remove(command) failed), so the new worker fetches it through getTask().
            // This guarantees that a queued task always has a thread to execute it.
            addWorker(null, false);
    }
    // 3. The pool is not RUNNING or the queue is full: try to start a non-core thread
    else if (!addWorker(command, false))
        // The pool is shut down or already at maximumPoolSize: reject the task
        reject(command);
}
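The three branches of execute can be observed from outside with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue capacity of 1, and uses tasks that block on a latch so that no worker becomes free in between; the class ExecutePathsSketch is a hypothetical name used only here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class ExecutePathsSketch {

    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);            // step 1: starts the core thread
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);            // step 2: core is busy, task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);            // step 3: queue is full, non-core thread starts
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);        // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] s = run();
        System.out.println(s[0] + " " + s[1] + " " + s[2] + " " + s[3]);
    }
}
```

Note the order: the queue is tried before any non-core thread is created, so with an unbounded queue the third step is never reached.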
reject
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
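Since reject only delegates to the configured handler, the outcome of a rejection depends entirely on the policy. As a small illustration, the sketch below saturates a one-thread pool and submits a second task under ThreadPoolExecutor.CallerRunsPolicy, which runs the rejected task on the submitting thread. The class RejectionSketch and its method name are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
final class RejectionSketch {

    /** Returns the name of the thread that ran the task the pool could not accept. */
    static String whoRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),                  // no capacity: offer fails while the worker is busy
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            pool.execute(() -> {
                try {
                    release.await();                       // occupy the only worker
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is saturated, so reject() hands this task to CallerRunsPolicy
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + whoRanOverflowTask());
    }
}
```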
addWorker
// firstTask: the first task for the new worker (may be null)
// core: if true, the bound is corePoolSize; otherwise it is maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // rs >= SHUTDOWN means the pool no longer accepts new tasks.
        // The one exception: in the SHUTDOWN state with tasks still in the blocking queue,
        // a worker without a first task (firstTask == null) may be added to drain the queue, that is:
        // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // Inner loop: reserve a slot by incrementing workerCount with CAS.
        // On CAS failure, re-read ctl; if the run state changed, restart the outer loop,
        // otherwise retry the increment
        for (;;) {
            // Number of worker threads in the pool
            int wc = workerCountOf(c);
            // Fail if the count has reached CAPACITY or the applicable bound
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Atomically increment workerCount; on success, leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the run state differs from the one read at the top of the outer loop,
            // restart the outer loop so that the state check runs again
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    
    try {
        // Create the worker; its thread comes from the ThreadFactory
        w = new Worker(firstTask);
        // The thread that will run this worker
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Acquire mainLock, then recheck the pool state
            // before adding the worker and starting its thread
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN means the pool is RUNNING.
                // In the SHUTDOWN state with firstTask == null, a worker is added only to drain the blocking queue
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // The thread must not have been started yet; start() may not be called twice
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Add the worker to the workers HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of threads the pool has ever held;
                    // update it if the current size exceeds it
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // Release mainLock
                mainLock.unlock();
            }
            // The worker was added to the workers set
            if (workerAdded) {
                // Start the thread
                t.start();
                // The thread was started
                workerStarted = true;
            }
        }
    } finally {
        // The worker was not added, or it was added but its thread failed to start
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
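The first half of addWorker is easier to follow with the layout of ctl in mind: the run state occupies the top 3 bits of one int and the worker count the remaining 29 bits. The sketch below re-creates that packing and a simplified version of the CAS retry loop. The class CtlSketch and the method tryReserveSlot are hypothetical, the constants follow the JDK 8 definitions, and the queue check for the SHUTDOWN case is intentionally omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical re-creation of the ctl packing used by ThreadPoolExecutor.
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // largest representable worker count
    static final int RUNNING    = -1 << COUNT_BITS;      // negative, so RUNNING < SHUTDOWN
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    /** Simplified slot reservation: fails when shut down or when the bound is reached. */
    boolean tryReserveSlot(int bound) {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) >= SHUTDOWN)
                return false;
            if (workerCountOf(c) >= Math.min(bound, CAPACITY))
                return false;
            // The count sits in the low bits, so adding 1 increments it without touching the state
            if (ctl.compareAndSet(c, c + 1))
                return true;
            // CAS lost a race: loop, re-read ctl and re-check both conditions
        }
    }

    public static void main(String[] args) {
        CtlSketch s = new CtlSketch();
        System.out.println(s.tryReserveSlot(2) + " " + s.tryReserveSlot(2) + " " + s.tryReserveSlot(2));
        System.out.println("workers=" + workerCountOf(s.ctl.get()));
    }
}
```

Keeping both values in one atomic int is what lets addWorker check the state and reserve a slot in a single compare-and-set, without holding mainLock.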
addWorkerFailed
/**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // Acquire mainLock
    mainLock.lock();
    try {
        // Roll back the failed worker:
        // 1. If the worker is not null, remove it from the workers set
        if (w != null)
            workers.remove(w);
        // 2. Decrement workerCount
        decrementWorkerCount();
        // 3. Check whether the pool can now terminate
        tryTerminate();
    } finally {
        // Release mainLock
        mainLock.unlock();
    }
}
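One way to trigger this rollback path from user code is a ThreadFactory that returns null, which the ThreadFactory contract allows when thread creation is refused. In the sketch below, a hypothetical class named FactoryFailureSketch submits one task to such a pool: both addWorker attempts made by execute fail and are rolled back, so the task stays in the queue and the pool reports no threads.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class FactoryFailureSketch {

    /** Returns {pool size, queue size} after submitting one task to a pool that cannot create threads. */
    static int[] run() {
        ThreadFactory refusing = r -> null; // every request to create a thread is refused
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), refusing);
        try {
            // addWorker(command, true) fails and is rolled back, the task is queued,
            // then addWorker(null, false) fails and is rolled back as well
            pool.execute(() -> { });
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            pool.shutdownNow(); // drains the stranded task so the pool can terminate
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + ", queued=" + r[1]);
    }
}
```

The worker count is back at zero after each failure, which is the effect of decrementWorkerCount in addWorkerFailed.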
tryTerminate
// Moves the pool to TIDYING and then TERMINATED if its current state allows it
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();

        // Return without doing anything when the pool is:

        // 1. Still RUNNING, so it must not be terminated
        if (isRunning(c) ||
            // 2. Already TIDYING or TERMINATED: another thread is finishing the termination
            runStateAtLeast(c, TIDYING) ||
            // 3. In SHUTDOWN (after shutdown()) with tasks still queued, which must be processed first
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 4. If workers remain, interrupt one idle worker and return
        if (workerCountOf(c) != 0) { // Eligible to terminate
            /*
             * shutdown() calls interruptIdleWorkers(), which interrupts only the workers
             * that are idle at that moment, waking those blocked on the task queue.
             * A worker that is running a task at that moment is not interrupted. If it
             * then finishes the task and calls getTask() while workQueue is empty, it
             * blocks in workQueue.take() and has missed the interrupt from shutdown().
             * Without further action it could stay blocked forever.
             * Therefore every exiting worker calls tryTerminate(), which interrupts one
             * more idle worker, so the shutdown signal is propagated until all workers exit.
             */
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // Execution continues past this point, and the pool is terminated, only in these cases:
        // 1. SHUTDOWN state: new tasks are no longer accepted, the task queue is empty, and no workers remain
        // 2. STOP state (shutdownNow() was called) and no workers remain

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 5. Try to set the state to TIDYING with a CAS. On success, call terminated()
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // The terminated method does nothing by default and is left to the subclass implementation
                    terminated();
                } finally {
                    // Set the status to TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
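The transition to TIDYING and the call to terminated() can be observed from outside with a small subclass. The following is a minimal sketch, not part of the JDK source: the class names TerminatedHookDemo and CountingPool and the method runAndCountTerminatedCalls are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TerminatedHookDemo {
    // Hypothetical subclass that counts how often terminated() is invoked.
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger terminatedCalls = new AtomicInteger();

        CountingPool() {
            super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void terminated() {
            // Invoked by tryTerminate() after the CAS to TIDYING succeeded.
            terminatedCalls.incrementAndGet();
            super.terminated();
        }
    }

    // Runs a few no-op tasks, shuts the pool down and reports the number of terminated() calls.
    static int runAndCountTerminatedCalls() {
        CountingPool pool = new CountingPool();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            // Returns true once tryTerminate() has set the state to TERMINATED.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.terminatedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("terminated() calls: " + runAndCountTerminatedCalls());
    }
}
```

Because only one thread can win the CAS from the current state to TIDYING, the hook should run exactly once per pool, no matter how many workers call tryTerminate() on their way out.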
interruptWorkers
// Interrupt all started threads
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
Methods related to interruptIdleWorkers
// Interrupt idle worker threads. If onlyOne is true, only the first worker in the set is considered
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // Skip the worker if its thread is already interrupted. Otherwise try to acquire the Worker lock.
            // If the lock can be acquired, the worker is idle (not running a task) and can be interrupted.
            // If not, the worker is running a task and is not interrupted here
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // If onlyOne is true, stop after trying the first worker
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private static final boolean ONLY_ONE = true;
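The difference between interruptIdleWorkers() and interruptWorkers() shows up when a task is running while the pool is shut down. The sketch below is an illustration under assumptions: InterruptDemo and busyTaskInterrupted are hypothetical names, and the "work" is simulated by waiting on a latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    // Returns true if a task that was running during shutdown observed an interrupt.
    static boolean busyTaskInterrupted(boolean useShutdownNow) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                release.await(); // simulated work; the worker holds its Worker lock here
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            if (Thread.currentThread().isInterrupted()) {
                interrupted.set(true);
            }
        });

        try {
            started.await(); // the task is now running
            if (useShutdownNow) {
                pool.shutdownNow();   // interruptWorkers(): interrupts every started worker
            } else {
                pool.shutdown();      // interruptIdleWorkers(): skips the busy worker
                release.countDown();  // let the task finish normally
            }
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdown():    interrupted = " + busyTaskInterrupted(false));
        System.out.println("shutdownNow(): interrupted = " + busyTaskInterrupted(true));
    }
}
```

With shutdown() the busy worker holds its lock, so tryLock() fails and the task is left alone. With shutdownNow() the task is interrupted even though it is running.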
runWorker
final void runWorker(Worker w) {
    // Get the current thread (equivalent to w's thread)
    Thread wt = Thread.currentThread();
    // Get the first task assigned to this Worker, if any
    Runnable task = w.firstTask;
    w.firstTask = null;
    // unlock() is implemented as release(1).
    // new Worker() sets the AQS state to -1. Calling unlock() here sets the state to 0, which allows the worker to be interrupted
    w.unlock(); // allow interrupts
    // Used to mark whether the thread exits the loop normally or abnormally
    boolean completedAbruptly = true;
    try {
        // If task is not null, the worker was just created and runs its firstTask.
        // If task is null, the worker takes a task from the queue. When the queue is empty, the thread blocks here
        // (getTask() calls the queue's blocking take() or timed poll()). Tasks are fetched from the blocking queue
        // and run on the same thread instead of creating a new thread per task, which is the advantage of a thread pool.
        // After a task has run, task is set to null in the finally block, so once firstTask has been executed,
        // all further tasks come from the blocking queue
        while (task != null || (task = getTask()) != null) {
            // The worker obtains the exclusive lock and is ready to execute the task
            w.lock();

            /*
             * If runStateAtLeast(ctl.get(), STOP) is true, the state is >= STOP. If the thread
             * is not yet interrupted, it must be interrupted.
             * If it is false, Thread.interrupted() returns and clears the interrupt flag. If the
             * flag was set, the state is checked again, because shutdownNow() may have run
             * concurrently. If the state is now >= STOP, wt.isInterrupted() is false (the flag
             * was just cleared), so the thread is interrupted again.
             */
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // Hook called before the task runs. Empty by default; subclasses override it to add their own logic
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Execute tasks, that is, tasks submitted to the thread pool, and catch exceptions
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // Runnable.run() cannot throw checked exceptions, so any other
                    // Throwable is wrapped in an Error and rethrown
                    thrown = x; 
                    throw new Error(x);
                } finally {
                    // Hook called after the task has run. Empty by default
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // This line is not reached if task.run() throws an exception.
        // In that case completedAbruptly stays true, indicating that the worker exited abnormally
        completedAbruptly = false;
    } finally {
        // Clean up the exiting worker and replace it if necessary
        processWorkerExit(w, completedAbruptly);
    }
}
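The thrown variable that runWorker passes to afterExecute behaves differently for execute() and submit(), because submit() wraps the task in a FutureTask that captures the exception itself. The following sketch records what afterExecute receives in both cases. AfterExecuteDemo, RecordingPool and thrownSeenByAfterExecute are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AfterExecuteDemo {
    // Hypothetical subclass that records the Throwable passed to afterExecute.
    static class RecordingPool extends ThreadPoolExecutor {
        final List<String> seen = Collections.synchronizedList(new ArrayList<String>());

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            seen.add(t == null ? "null" : t.getClass().getSimpleName());
        }
    }

    static List<String> thrownSeenByAfterExecute() {
        RecordingPool pool = new RecordingPool();
        Runnable failing = () -> { throw new IllegalStateException("simulated failure"); };

        pool.execute(failing); // exception propagates out of task.run() in runWorker
        pool.submit(failing);  // exception is captured inside the FutureTask

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<String>(pool.seen);
    }

    public static void main(String[] args) {
        System.out.println(thrownSeenByAfterExecute());
    }
}
```

For the task passed to execute(), afterExecute sees the IllegalStateException and the worker thread dies, so the default uncaught-exception handler prints a stack trace to stderr. For the submitted task, afterExecute sees null, and the failure is only visible through Future.get().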
getTask
// Fetches the next task from the pool's blocking queue.
// workerCount is decremented before null is returned (returning null makes the worker exit normally)
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        // Gets the current thread pool status
        int rs = runStateOf(c);

        // rs >= STOP: the pool neither accepts new tasks nor processes the tasks in the blocking queue
        // or
        // rs >= SHUTDOWN and the blocking queue is empty: the pool accepts no new tasks and has no queued tasks left
        // In either case workerCount is decremented and null is returned, so the worker stops processing tasks
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Implementation: do {} while (! compareAndDecrementWorkerCount(ctl.get()));
            // Retries the CAS until workerCount has been decremented
            decrementWorkerCount();
            // The current thread needs to be destroyed
            return null;
        }

        // Gets the number of worker threads in the current thread pool
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed determines whether waiting for a task is subject to a timeout.
        // allowCoreThreadTimeOut is false by default, so core threads do not time out.
        // wc > corePoolSize means the pool has more threads than corePoolSize. The threads beyond the core size are subject to the timeout
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc > maximumPoolSize: the worker must exit (setMaximumPoolSize may have lowered maximumPoolSize)
        // timed && timedOut: the worker has been idle past the timeout and must exit
        // (either way wc exceeds a limit or the worker is eligible to time out, so removing it does not harm the pool)
        if ((wc > maximumPoolSize || (timed && timedOut))
            // and either another worker remains (wc > 1) or the blocking queue is empty
            && (wc > 1 || workQueue.isEmpty())) {
            // Decrement workerCount with a CAS. If the CAS fails, retry the loop
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Depending on timed, either wait for a task with a timeout or block until a task arrives
            Runnable r = timed
                // Timed wait: if no task arrives within keepAliveTime, poll() returns null and the Worker will exit (once getTask() returns null, runWorker leaves its while loop and processWorkerExit is executed)
                // keepAliveTime is the maximum idle time of a thread, so it serves as the timeout for fetching a task
                ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
                // Blocking wait: keep blocking until a task comes in
                : workQueue.take();
            if (r != null)
                return r;
            // poll() timed out. Go through the loop again so the exit conditions are re-evaluated
            timedOut = true;
        } catch (InterruptedException retry) {
            // If the current thread is interrupted when obtaining the task, set timedOut to false and return to loop retry
            timedOut = false;
        }
    }
}
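The effect of the timed flag in getTask() can be checked from outside by watching getPoolSize() on an idle pool. The sketch below makes some assumptions: KeepAliveDemo and poolSizeAfterIdle are hypothetical names, and the keepAliveTime of 50 ms and the polling interval are arbitrary values chosen for a quick run.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Runs one task, leaves the pool idle for up to maxWaitMillis and returns the remaining pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout, long maxWaitMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // true:  timed == true in getTask(), so the core worker uses poll(keepAliveTime)
        // false: timed == false for the single core worker, so it blocks in take()
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.submit(() -> { }).get(); // starts the single worker and waits for the task
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout allowed: " + poolSizeAfterIdle(true, 5000));
        System.out.println("core timeout off:     " + poolSizeAfterIdle(false, 300));
    }
}
```

With allowCoreThreadTimeOut(true) the idle worker should time out in poll() and exit, so the pool size drops to 0. With the default setting the core worker stays blocked in take() and the pool size remains 1.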
processWorkerExit
// The logic of processWorkerExit is similar to that of addWorkerFailed.
// In addition, processWorkerExit adds the number of tasks completed by the worker to the pool's total of completed tasks
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If completedAbruptly is true, the task threw an exception and workerCount has not been adjusted yet, so decrement it here.
    // Otherwise the worker exited because getTask() returned null, and getTask() has already decremented workerCount
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Count the number of tasks completed
        completedTaskCount += w.completedTasks;
        // Removing from workers means that a worker thread is removed from the thread pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Judge whether to end the thread pool according to the thread pool status
    tryTerminate();

    int c = ctl.get();
    
    // getTask() removes workers when there are too many.
    // Here a worker is added when there are too few.
    // This only applies while the state is RUNNING or SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // Normal exit: the worker left because getTask() returned null
        if (!completedAbruptly) {
            // Minimum number of workers to keep. If allowCoreThreadTimeOut is true, core threads may exit, so the minimum can be 0. Otherwise corePoolSize threads are kept
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && !workQueue.isEmpty())
                // allowCoreThreadTimeOut is true but tasks are still waiting in the queue,
                // so at least one worker is kept to process them
                min = 1;
            // If at least min workers remain, no replacement is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Reached on abnormal exit, or when fewer than min workers remain:
        // add a worker without a first task to the pool so that queued tasks are processed
        addWorker(null, false);
    }
}
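The replacement path at the end of processWorkerExit can be made visible by letting a task fail in a single-threaded pool and checking which thread runs the next task. This is only a sketch: WorkerReplacementDemo and replacedAfterFailure are hypothetical names, and the failure is simulated with an IllegalStateException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    // Returns true if the task submitted after a failing task ran on a different thread.
    static boolean replacedAfterFailure() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        AtomicReference<Thread> first = new AtomicReference<Thread>();
        AtomicReference<Thread> second = new AtomicReference<Thread>();

        pool.execute(() -> {
            first.set(Thread.currentThread());
            // Leaves runWorker with completedAbruptly == true
            throw new IllegalStateException("simulated task failure");
        });
        pool.execute(() -> second.set(Thread.currentThread()));

        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return first.get() != null && second.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + replacedAfterFailure());
    }
}
```

The first worker never returns to its loop after the exception, so the second task can only run on a new thread. The stack trace printed to stderr comes from the dying worker thread and is expected here.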
Hook methods
/* Extension hooks */

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

/**
     * Method invoked when the Executor has terminated.  Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     */
protected void terminated() { }
shutdown and shutdownNow related methods
/*
     * Methods for controlling interrupts to worker threads.
     */

/**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     */
private void checkShutdownAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkPermission(shutdownPerm);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                security.checkAccess(w.thread);
        } finally {
            mainLock.unlock();
        }
    }
}

void onShutdown() {
}

final boolean isRunningOrShutdown(boolean shutdownOK) {
    int rs = runStateOf(ctl.get());
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}

/**
     * Removes the tasks that have not been executed from the blocking queue and returns them.
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // drainTo moves all available elements out of the BlockingQueue in one call (a maximum number can also be given).
    // Transferring in bulk is more efficient because the queue lock does not have to be acquired and released once per element
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        // Copy the queue into an array and remove, one by one, the elements that drainTo left behind
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the caller has permission to shut down the thread pool
        checkShutdownAccess();
        // Advance the thread pool state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt all idle workers; this delegates to interruptIdleWorkers(false)
        interruptIdleWorkers();
        // Hook method, empty by default. It is package-private and overridden by ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Check whether the thread pool can be closed
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check whether the caller has permission to shut down the thread pool
        checkShutdownAccess();
        // Promote the thread pool state to STOP
        advanceRunState(STOP);
        // Interrupt all worker threads, whether idle or not
        interruptWorkers();
        // Take out the tasks that are not executed in the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

public boolean isShutdown() {
    return ! isRunning(ctl.get());
}

public boolean isTerminating() {
    int c = ctl.get();
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}

public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}

// Blocks until the thread pool state becomes TERMINATED or the timeout elapses. The wait happens on a Condition of mainLock, which is released while waiting. It is generally called after shutdown() or shutdownNow()
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // Returns true if the thread pool status is TERMINATED
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            // If the timeout occurs and the status is not TERMINATED, false is returned
            if (nanos <= 0)
                return false;
            // Timeout wait
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
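How shutdownNow(), drainQueue() and awaitTermination() work together can be seen with one running task and a few queued ones. The sketch below is an illustration only: ShutdownNowDemo and pendingAfterShutdownNow are hypothetical names, and the long sleep stands in for real work.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowDemo {
    // Returns how many queued tasks shutdownNow() handed back without running them.
    static int pendingAfterShutdownNow() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // simulated long task, ended early by the interrupt
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): just finish
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { }); // these wait in the queue behind the long task
        }

        try {
            started.await();                          // the long task is running
            List<Runnable> pending = pool.shutdownNow(); // STOP, interrupt workers, drain queue
            if (!pool.awaitTermination(5, TimeUnit.SECONDS) || !pool.isTerminated()) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return pending.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks never started: " + pendingAfterShutdownNow());
    }
}
```

The three queued tasks are returned to the caller instead of being run, and the running task ends only because it responds to the interrupt. A task that ignores interruption would keep awaitTermination() waiting until its timeout.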
RejectedExecutionHandler
public interface RejectedExecutionHandler {
    // r is the rejected task, and executor is the thread pool that rejected it
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

/* Predefined RejectedExecutionHandlers */

/**
     * Runs the rejected task directly in the caller's thread.
     * The task is not really discarded, but the throughput of the submitting thread is likely to drop sharply.
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code CallerRunsPolicy}.
         */
    public CallerRunsPolicy() { }

    /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

/**
     * Throws an exception directly.
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
         * Creates an {@code AbortPolicy}.
         */
    public AbortPolicy() { }

    /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Throw RejectedExecutionException exception
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
     * Silently discards the task that cannot be processed.
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardPolicy}.
         */
    public DiscardPolicy() { }

    /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Discard the task without processing
    }
}

/**
     * Discards the oldest task waiting in the queue, then retries the rejected task.
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
    public DiscardOldestPolicy() { }

    /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // Take the first element from the blocking queue in the thread pool (discard the longest pending task in the queue)
            e.getQueue().poll();
            // Put the requested task r into the thread pool again for execution
            e.execute(r);
        }
    }
}
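The four predefined handlers can be compared side by side on a pool that is deliberately saturated. The following sketch assumes a pool with one thread and a queue of capacity 1. RejectionPolicyDemo and run are hypothetical names, and the labels "A", "B@caller", "B@pool" and "rejected" are only markers for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Saturates a 1-thread pool with a 1-slot queue, submits tasks A and B, and logs what happened.
    static List<String> run(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        try {
            pool.execute(() -> {           // occupies the only worker
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            pool.execute(() -> log.add("A")); // fills the only queue slot
            try {
                // Queue full and maximumPoolSize reached: the handler decides what happens to B
                pool.execute(() -> log.add(Thread.currentThread() == caller ? "B@caller" : "B@pool"));
            } catch (RejectedExecutionException e) {
                log.add("rejected");
            }
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
        return new ArrayList<String>(log);
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy:         " + run(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy:    " + run(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardPolicy:       " + run(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("DiscardOldestPolicy: " + run(new ThreadPoolExecutor.DiscardOldestPolicy()));
    }
}
```

In this setup AbortPolicy reports the rejection and A still runs, CallerRunsPolicy runs B on the submitting thread before A, DiscardPolicy drops B silently, and DiscardOldestPolicy drops the queued A so that B takes its place.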

ScheduledThreadPoolExecutor source code

The source code of ScheduledThreadPoolExecutor will be analyzed later.

Keywords: thread pool

Added by Jenling on Thu, 02 Dec 2021 06:32:38 +0200