Source code analysis of Java thread pool ThreadPoolExecutor

Contents

Member variables

Constructor

Core threads and maximum threads

Thread lifetime

Work queue

Thread factory

Rejection policy

ThreadPoolExecutor lives in the JUC (java.util.concurrent) package and is a familiar name in concurrent development.

Why do thread pools exist? A thread pool is a tool for managing the multiple threads running in a program: it keeps concurrent execution orderly and reduces the overhead of unnecessary thread creation and destruction.

Member variables

/*Since:
1.5
Author:
Doug Lea
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
  	//A 32-bit int that packs the pool run state (high 3 bits) and the worker count (low 29 bits)
		private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; //29
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//low 29 bits all set to 1

    // The five states of the thread pool
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Unpack the run state and the worker count from ctl, or pack them back together
    private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
    private static int workerCountOf(int c)  { return c & COUNT_MASK; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Try to increment the workerCount field of ctl using CAS.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Try to decrement the workerCount field of ctl
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     Reduce the workerCount field of ctl. This is called only when the thread terminates abruptly (see processWorkerExit). Other decrements are executed in getTask.
     */
    private void decrementWorkerCount() {
        ctl.addAndGet(-1);
    }

    /**
    A queue used to hold tasks and hand them off to worker threads. We do not require that workQueue.poll() returning null necessarily means workQueue.isEmpty(), so we rely solely on isEmpty to check whether the queue is empty (which we must do, for example, when deciding whether to transition from SHUTDOWN to TIDYING). This accommodates special-purpose queues such as DelayQueue, for which poll() is allowed to return null even though it may later return non-null when the delay expires.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
    Lock held when accessing the workers set and related bookkeeping. Although we could use some kind of concurrent set, it turns out to be generally preferable to use a lock. Among the reasons is that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially during shutdown. Otherwise exiting threads would concurrently interrupt those that have not yet been interrupted. It also simplifies some related statistics bookkeeping such as largestPoolSize. We also hold mainLock during shutdown and shutdownNow, to keep the workers set stable while separately checking permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
		 Contains a collection of all worker threads in the pool. Accessed only when mainLock is held.
     */
    private final HashSet<Worker> workers = new HashSet<>();

    /**
     * Wait for conditions to support awaitTermination.
     */
    private final Condition termination = mainLock.newCondition();

    /**
     Tracks the maximum pool size reached. Can only be accessed under mainLock
     */
    private int largestPoolSize;

    /**
     Counter for completed tasks. Updated only on termination of worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
    Factory for new threads. All threads are created using this factory (via the method addWorker). All callers must be prepared for addWorker to fail, which may reflect a system or user policy limiting the number of threads. Even though it is not treated as an error, failure to create a thread may result in new tasks being rejected or existing ones remaining stuck in the queue. We go further and preserve the pool invariants even in the face of errors such as OutOfMemoryError that may be thrown while trying to create a thread. Such errors are rather common because a native stack must be allocated in Thread.start, and users will want to perform a clean pool shutdown to clean up. There will likely be enough memory available for the cleanup code to complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     *Handler called when saturated or shut down in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     Timeout (in nanoseconds) for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize threads present or when allowCoreThreadTimeOut is set. Otherwise they wait forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     If false (default), the core thread remains active even when idle. If true, the core thread uses keepAliveTime timeout to wait for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
		The core pool size is the minimum number of worker threads to keep alive (without timing out), unless allowCoreThreadTimeOut is set, in which case the minimum is zero. Since the worker count is actually stored in COUNT_BITS bits, the effective limit is corePoolSize & COUNT_MASK.
     */
    private volatile int corePoolSize;

    /**
		Maximum pool size. Since the worker count is actually stored in COUNT_BITS bits, the effective limit is maximumPoolSize & COUNT_MASK.
     */
    private volatile int maximumPoolSize;

    /**
     * Default reject handler.
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    
}

There are a lot of member variables in the source; I will explain the important parts here. ctl is an AtomicInteger, which, like an Integer, occupies 32 bits in memory. It has to encode both the pool state and the worker count, because the thread pool has five states (RUNNING, SHUTDOWN, STOP, TIDYING and TERMINATED):

  • RUNNING: accept new tasks and handle queued tasks
  • SHUTDOWN: does not accept new tasks, but processes queued tasks

  • STOP: do not accept new tasks, do not process queued tasks, and interrupt ongoing tasks

  • TIDYING: all tasks have terminated, workerCount is zero, and the thread transitioning to TIDYING will run the terminated() hook method

  • TERMINATED: terminated() has completed.

The numerical order among these values matters; it allows ordered comparisons. runState increases monotonically over time but need not hit every state. The transitions are: RUNNING -> SHUTDOWN on calling shutdown(); (RUNNING or SHUTDOWN) -> STOP on calling shutdownNow(); SHUTDOWN -> TIDYING when both the queue and the pool are empty; STOP -> TIDYING when the pool is empty; TIDYING -> TERMINATED when the terminated() hook method has completed. Threads waiting in awaitTermination() return when the state reaches TERMINATED.

The high 3 bits of ctl represent the current thread pool state, and the remaining 29 bits represent the current number of threads, allowing up to 2^29 - 1 (about 536 million) threads.

Because ctl is an AtomicInteger, increments and decrements on it are atomic. mainLock and termination are the lock and condition variable used in pool operations. The parameters that must be specified in the thread pool constructor are described below.
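
To make the bit packing concrete, here is a small standalone sketch that reproduces the arithmetic. The constants and helper methods mirror the ones shown above; the demo class itself and its printed values are mine, not part of the JDK source.

public class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;   // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;        // high 3 bits = 111

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int c)    { return c & ~COUNT_MASK; }
    static int workerCountOf(int c) { return c & COUNT_MASK; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                          // a RUNNING pool with 5 workers
        System.out.println(runStateOf(c) == RUNNING);       // true
        System.out.println(workerCountOf(c));               // 5
        System.out.println(COUNT_MASK);                     // 536870911 = 2^29 - 1
    }
}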

Constructor

    /**
     Create a new ThreadPoolExecutor with the given initial parameters.
      Parameters:
      corePoolSize – The number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
      maximumPoolSize – Maximum number of threads allowed in the pool
      keepAliveTime – When the number of threads is greater than the number of cores, this is the maximum time for redundant idle threads to wait for new tasks before terminating.
      unit – keepAliveTime The time unit of the parameter
      workQueue – A queue used to save tasks before they are executed. This queue will only hold Runnable tasks submitted by the execute method.
      threadFactory – The factory used by the executor to create a new thread
      handler – The handler to use when execution is blocked because the thread bounds and queue capacities have been reached
      Throws:
      IllegalArgumentException – If one of the following holds: corePoolSize < 0, keepAliveTime < 0, maximumPoolSize <= 0, or maximumPoolSize < corePoolSize
      NullPointerException – If workQueue, threadFactory or handler is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor has several constructors; I will only cover the one with the most parameters. Because there are so many parameters, each of them is discussed in its own section below.
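
As a quick illustration, here is a minimal sketch of calling this constructor directly. The concrete sizes, queue choice and policy are arbitrary example values, not recommendations.

import java.util.concurrent.*;

public class PoolCreationDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime and its unit
                new ArrayBlockingQueue<>(100),       // bounded work queue
                Executors.defaultThreadFactory(),    // thread factory
                new ThreadPoolExecutor.AbortPolicy() // rejection policy (the default)
        );

        pool.execute(() -> System.out.println("running in " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}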

Core threads and maximum threads

    /**
    The core pool size is the minimum number of worker threads to keep alive (without timing out), unless allowCoreThreadTimeOut is set, in which case the minimum is zero. Since the worker count is actually stored in COUNT_BITS bits, the effective limit is corePoolSize & COUNT_MASK.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Since the worker count is actually stored in COUNT_BITS bits, the effective limit is maximumPoolSize & COUNT_MASK.
     */
    private volatile int maximumPoolSize;	

Here is how the core thread count and the maximum thread count work together. Assume the core thread count is x and the maximum thread count is y (y >= x must hold). When a task is submitted, the pool decides whether to create a new thread based on the current thread count, following these steps (a runnable sketch follows the list).

  1. Compare the current thread count with the core count x. If it is less than x, a thread is created directly to execute the task; otherwise go to step 2.
  2. Look at the work queue. If the queue still has free capacity, the task is placed into the queue; if the queue is full, go to step 3.
  3. Compare the current thread count with the maximum count y. If it is less than y, the factory creates a non-core thread to execute the task; otherwise go to step 4.
  4. The pool cannot create another thread for the task, so the rejection policy is executed.
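
The sketch below makes the four steps observable from the outside. The pool sizes and queue capacity are chosen purely for illustration: task 1 starts a core thread, task 2 is queued, task 3 starts a non-core thread, and task 4 is rejected by the default AbortPolicy.

import java.util.concurrent.*;

public class SubmissionFlowDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));      // x = 1 core thread, y = 2 max, queue holds 1 task

        Runnable slow = () -> {
            try { Thread.sleep(2000); } catch (InterruptedException ignored) { }
        };

        pool.execute(slow);                        // step 1: below core size, start a core thread
        pool.execute(slow);                        // step 2: core thread busy, task goes into the queue
        pool.execute(slow);                        // step 3: queue full, start a non-core thread
        try {
            pool.execute(slow);                    // step 4: pool and queue both full, rejected
        } catch (RejectedExecutionException e) {
            System.out.println("fourth task rejected: " + e);
        }
        pool.shutdown();
    }
}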

Thread factories, work queues, and rejection policies will be discussed later. Let's talk about how to set the number of core threads and the maximum number of threads.

In Java, we can get the number of logical processors available on the machine like this:

 int CPU_COUNT = Runtime.getRuntime().availableProcessors();

For example, on my 6-core, 12-thread machine this returns 12 (meaning at most 12 threads can truly run at the same time).

  • For CPU-intensive tasks, to improve efficiency you want to reduce switching between thread tasks, so setting the thread count to CPU_COUNT is sufficient. That said, setting it to the number of CPU cores + 1 achieves optimal utilization: even when a compute-intensive thread occasionally blocks due to a page fault or some other stall, the extra thread ensures that CPU clock cycles are not wasted.

  • For IO-intensive tasks, the maximum thread count is generally several times the number of CPU cores, because IO reads and writes are slow compared with the CPU. If we configure too few threads, CPU resources may be wasted; with more threads, while some threads are waiting for IO and do not need the CPU, other threads can use the CPU for other tasks without interfering with each other. This shortens the task queue and makes better use of resources. The recommended setting is: number of threads = number of CPU cores * (1 + IO time / CPU time), as the sizing sketch after this list shows.
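
A tiny sketch of both sizing heuristics. The 80/20 IO-to-CPU split used below is an assumed example ratio, not a measurement.

public class PoolSizing {
    public static void main(String[] args) {
        int cpuCount = Runtime.getRuntime().availableProcessors();

        // CPU-bound: roughly one thread per core, plus one spare
        int cpuBoundThreads = cpuCount + 1;

        // IO-bound: threads = cores * (1 + IO time / CPU time)
        double ioTime = 80, cpuTime = 20;          // assumed: 80 ms waiting on IO vs 20 ms computing
        int ioBoundThreads = (int) (cpuCount * (1 + ioTime / cpuTime));

        System.out.println("CPU-bound pool size: " + cpuBoundThreads);
        System.out.println("IO-bound pool size:  " + ioBoundThreads);
    }
}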

Thread lifetime

 /**
    Timeout (in nanoseconds) for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize threads present or when allowCoreThreadTimeOut is set. Otherwise they wait forever for new work.
     */
    private volatile long keepAliveTime;


/*
    Returns true if this pool allows core threads to time out and terminate if no task arrives within the keepAlive time, being replaced if needed when new tasks arrive. When true, the same keep-alive policy that applies to non-core threads also applies to core threads. When false (the default), core threads never terminate due to a lack of incoming tasks.
    return:
    true if core threads are allowed to time out, otherwise false
    since:
    1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

Generally speaking, keepAliveTime applies to non-core threads: when a non-core thread has waited keepAliveTime (in the given time unit) without being assigned a task, it is destroyed and the current thread count decreases. In addition, if the allowCoreThreadTimeOut member variable is set to true, a core thread is also destroyed if it has not been assigned a task within keepAliveTime.
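
For instance, the sketch below turns on core-thread timeout so that even core threads are reclaimed after one second of idleness. The pool sizes and timings are arbitrary example values.

import java.util.concurrent.*;

public class CoreTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);          // core threads may now time out as well

        pool.execute(() -> System.out.println("task done"));
        Thread.sleep(2000);                         // idle for longer than keepAliveTime
        System.out.println("pool size after idle: " + pool.getPoolSize()); // typically 0
        pool.shutdown();
    }
}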

Work queue

The BlockingQueue interface is used here, which inherits from Queue.

public interface BlockingQueue<E> extends Queue<E> {
	...
}
/**
The implementation classes include:
ArrayBlockingQueue      // bounded queue
DelayQueue              // delay queue
LinkedBlockingDeque     // unbounded queue
PriorityBlockingQueue   // unbounded priority queue
SynchronousQueue        // synchronous hand-off queue
*/

The commonly used work queues are ArrayBlockingQueue, PriorityBlockingQueue, LinkedBlockingDeque and SynchronousQueue. For space reasons I will not analyze their source code, only give a brief description of each (a usage sketch follows the list).

  • ArrayBlockingQueue: a bounded blocking queue whose capacity must be specified. It can be fair or unfair: if fair, waiting threads are served in FIFO order; if unfair, the underlying lock is an unfair lock, which usually gives better throughput.

  • PriorityBlockingQueue: an unbounded priority blocking queue, similar to PriorityQueue. Although it is a blocking queue, because it is unbounded, insertions never actually block.

  • LinkedBlockingDeque: a typical unbounded queue (if no capacity is given, it defaults to Integer.MAX_VALUE).

  • SynchronousQueue: a synchronous hand-off queue with a capacity of 0. A task offered to it is accepted only if another thread is already waiting to take it; otherwise the offer fails, and the pool moves on to checking the maximum thread count and creating new threads.
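
To see how the queue choice changes behavior, the sketch below uses a SynchronousQueue, so every submitted task either hands off directly to an idle worker or triggers creation of a new thread, up to maximumPoolSize. The sizes are chosen for illustration (Executors.newCachedThreadPool uses the same queue with an effectively unlimited maximum).

import java.util.concurrent.*;

public class QueueChoiceDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 30, TimeUnit.SECONDS,
                new SynchronousQueue<>());          // never buffers tasks

        for (int i = 0; i < 4; i++) {               // a burst of 4 tasks grows the pool to 4 threads
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            });
        }
        System.out.println("pool size during burst: " + pool.getPoolSize()); // likely 4
        pool.shutdown();
    }
}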

Thread factory

ThreadFactory is actually just an interface.

public interface ThreadFactory {

    /**
    Constructs a new Thread. Implementations may also initialize the priority, name, daemon status, ThreadGroup, etc.
    Parameters:
    r – a Runnable to be executed by the new thread instance
    Returns:
    the constructed thread, or null if the request to create a thread is rejected
     */
    Thread newThread(Runnable r);
}

By implementing the newThread method you can control the rules for thread creation (thread name, thread group, daemon status, and so on), which makes unified management easier.
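
A minimal sketch of such a factory. The naming scheme is an arbitrary example; only the newThread method is required by the interface.

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    public NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false);                 // keep workers as normal (non-daemon) threads
        return t;
    }
}

Passing, say, new NamedThreadFactory("order-pool") as the threadFactory constructor argument makes every worker show up in thread dumps with a recognizable name.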

Rejection policy

The rejection policy is the response made when the thread pool can neither queue a submitted task nor create a new thread to execute it. There are four built-in kinds (a custom-handler sketch follows the list).

  1. AbortPolicy: the default policy; it throws a RejectedExecutionException.

  2. CallerRunsPolicy: runs the task directly in the calling thread (for example, the main thread).

  3. DiscardOldestPolicy: discards the oldest task at the head of the work queue, then retries executing the new task.

  4. DiscardPolicy: does nothing at all; the task is silently dropped and no exception is thrown.
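
Besides the four built-in policies, you can supply your own handler by implementing RejectedExecutionHandler. Below is a hedged sketch; the log-then-run-in-caller behavior is just one possible choice, not a standard policy.

import java.util.concurrent.*;

public class LoggingCallerRunsHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Log the rejection, then degrade gracefully by running in the caller's thread
        System.err.println("Task rejected by pool, running in caller thread: " + r);
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}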
