Java thread pool ThreadPoolExecutor

What is a thread pool

  1. In order to avoid frequent thread creation and destruction, we can reuse the created threads.
  2. There are always several active threads in the thread pool. When the thread needs to be used, get a free thread. When the work is completed, it is not urgent to turn off the thread, but return the thread to the thread pool for easy use by others
  3. In short, after using the thread pool, starting a thread becomes finding an idle thread in the thread pool, and destroying a thread becomes returning the thread to the thread pool

characteristic

  1. Controls the maximum number of parallel threads
  2. Thread reuse
  3. Management thread

working principle


For example, if CorePool equals 10 and maxnumPoolSize=50, the blocking queue uses ArrayBlockingQueue(50)

  1. When submitting a thread to the thread pool, first judge whether the working thread exceeds the corepool, where the corepool is 10
  2. If not, a new xianc (wrapped by the worker) will be created through ThreadFactory, and then the task will continue
  3. If it exceeds, it will be added to the blocking queue for queuing (here, the length of ArrayBlockingQueue used for blocking queue is 50)
  4. If the queue is full (that is, it exceeds 50), it will judge whether the current working thread is less than maxnumPoolSize (where maxnumPoolSize is 50). If it is less than 50, it will continue to create threads (packaged as worker objects)
  5. If maxnumPoolSize is also exceeded, the reject policy will be executed.

ThreadPoolExecutor constructor

    public ThreadPoolExecutor(int corePoolSize, // Number of core threads
                              int maximumPoolSize, // Maximum number of threads
                              long keepAliveTime, // When the number of threads is greater than the number of core threads, the maximum lifetime of idle threads is keepAliveTime
                              TimeUnit unit, // The time unit of keepAliveTime
                              BlockingQueue<Runnable> workQueue, // Task queue to be executed
                              ThreadFactory threadFactory, // Factory class for creating new threads
                              RejectedExecutionHandler handler) { // The reject policy implementation class used when execution cannot be performed due to reaching thread boundaries and queue capacity

    }

  • corePoolSize (required): number of core threads. That is, the number of threads that remain alive in the pool, even if they are idle. However, when the allowCoreThreadTimeOut (required) parameter is set to true, the core thread will also be recycled if it is idle for more than a period of time.
  • maximumPoolSize (required): maximum number of threads. When the core thread is full and the blocking queue is full, the thread pool will temporarily create additional threads until the number of threads reaches the upper limit of the maximumPoolSize value
  • keepAliveTime (required): when the number of threads is greater than the number of core threads, the maximum lifetime of an idle thread is keepAliveTime, and the thread will be recycled. When the allowCoreThreadTimeOut parameter is set to true, the core thread will also be recycled.
  • Unit (required): time unit
  • workQueue (required): task queue, implemented by blocking queue
  • threadFactory (optional): thread factory. Specifies how threads are created by the thread pool.
  • Handler (optional): reject policy. When the number of threads in the thread pool reaches maximumPoolSize and the workQueue is full, subsequent submitted tasks will be rejected. The handler can specify how to reject tasks.

Blocking task queue

To use ThreadPoolExecutor, you need to specify a task waiting queue that implements the BlockingQueue interface. In the API document of ThreadPoolExecutor thread pool, three kinds of waiting queues are recommended: synchronous queue, LinkedBlockingQueue and ArrayBlockingQueue;

  1. SynchronousQueue: a queue that contains only one element. The thread that inserts an element into the queue is blocked until another thread gets the element stored in the queue from the queue. Similarly, if a thread attempts to get an element and no element currently exists, the thread will be blocked until the thread inserts the element into the queue.
  2. LinkedBlockingQueue: unbounded queue (strictly speaking, it is not unbounded, and the upper limit is Integer.MAX_VALUE), which is based on the linked list structure. After using unbounded queue, when the core threads are busy, subsequent tasks can join the queue indefinitely, so the number of threads in the thread pool will not exceed the number of core threads. This queue can improve thread pool throughput, but at the expense of memory space, and even lead to memory overflow. In addition, the capacity can be specified when using it, so it is a bounded queue.
  3. ArrayBlockingQueue: bounded queue, implemented based on array. When the thread pool is initialized, the capacity of the queue is specified and cannot be adjusted later. This bounded queue helps to prevent resource depletion, but may be more difficult to adjust and control.

In addition, Java provides four other queues:

  1. PriorityBlockingQueue: an unbounded blocking queue that supports prioritization
  2. DelayQueue: delay queue. Based on binary heap, it has the characteristics of unbounded queue, blocking queue and priority queue.
  3. LinkedBlockingDeque: double ended queue. Based on the linked list implementation, you can insert / take out elements from the tail and insert / take out elements from the head.
  4. LinkedTransferQueue: an unbounded blocking queue composed of a linked list structure. When the queue is special, a preemption mode is adopted, which means that when the consumer thread fetches elements, if the queue is not empty, the data will be directly fetched. If the queue is empty, a node (node element is null) will be generated to join the queue, and then the consumer thread will be waiting on this node. When the producer thread joins the queue, it will find a node with null element, The producer thread will not join the queue. It will directly fill the element into the node, wake up the waiting thread of the node, and the awakened consumer thread will take away the element.

Reject policy

  1. AbortPolicy (default): discards the task and throws a RejectedExecutionException exception.
  2. CallerRunsPolicy:
  3. DiscardPolicy: directly discard the task without throwing any exceptions.
  4. DiscardOldestPolicy: forcibly take out the waiting task currently at the head of the waiting queue, and then try to submit the currently rejected task to the thread pool for execution.

Thread pool status

Executors encapsulate thread pools

  1. FixedThreadPool: creates a fixed capacity thread pool. The feature is that the number of core threads is equal to the maximum number of threads, which means that only core threads can be created in the thread pool. keepAliveTime is 0, that is, the thread will be recycled immediately after executing the task. No capacity is specified for the task queue, which means the default value integer.max is used_ VALUE. It is suitable for scenarios where concurrent threads need to be controlled.
  2. Singlethreadexecution: single thread pool. The feature is that there is only one thread (core thread) in the thread pool. The thread is recycled immediately after executing the task, and the bounded blocking queue is used (the capacity is not specified, and the default value is Integer.MAX_VALUE)
  3. ScheduledThreadPool: timed thread pool. Specify the number of core threads. The number of ordinary threads is unlimited. The thread will be recycled immediately after executing the task. The task queue is a delay blocking queue. This is a special thread pool, which is suitable for executing timed or periodic tasks.
  4. CachedThreadPool: cache thread pool. There is no core thread, and the number of ordinary threads is Integer.MAX_VALUE (which can be understood as infinite). The thread is recovered after it is idle for 60s. The task queue uses the synchronization queue with no capacity. It is suitable for scenes with large amount of tasks but low time-consuming.

Source code interpretation

execute

    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        /** ctl Record workCount and runState */
        int c = ctl.get();

        /** case1: If the number of threads in the thread pool is less than the number of core threads, a thread is created and executed*/
        if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): gets the number of currently active threads
            /**
             * Create a new thread in the thread pool
             * command: Runnable thread to execute
             * true: When adding a new thread, whether the [number of currently active threads] is < corepoolsize
             * false: When adding a new thread, whether the [number of currently active threads] is < maximumpoolsize
             */
            if (addWorker(command, true)) {
                // If the new thread is successfully added, it returns directly.
                return;
            }
            // If adding a new thread fails, retrieve [number of currently active threads]
            c = ctl.get();
        }

        /** Step 2: if the current thread pool is running and the task is added to the queue successfully
         * (That is: case2: if workcount > = corepoolsize, create a thread, add a thread task to the workQueue, and wait for execution)*/
        // BlockingQueue < runnable > workqueue and Runnable command
        if (isRunning(c) && workQueue.offer(command)) { // Add the command to the workQueue queue.
            // Retrieve ctl
            int recheck = ctl.get();
            // check again whether the current thread pool is in the running state. If it is not in the running state, remove the command just added to the workQueue and call the reject policy
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) { // If the number of currently active threads is 0, execute the addWork method
                /**
                 * null: Only create threads, but don't start them
                 * false: When adding a thread, judge according to the maximumPoolSize
                 *
                 * If (workercountof (recheck) > 0, it will be returned directly. The command in the queue will be out of the queue and executed later
                 */
                addWorker(null, false);
            }
        }

        /**
         * Step 3: if one of the following two conditions is met, enter the judgment statement in step 3
         *  case1: The thread pool is not running, i.e. isRunning(c)==false
         *  case2: workCount >= corePoolSize And failed to add workQueue queue. That is: workQueue.offer(command)==false
         *
         * Since the second parameter passes false, if workcount < maximumpoolsize, an execution thread will be created; otherwise, enter the method body to execute reject(command)
         */
        else if (!addWorker(command, false)) {
            // Execute thread creation failed reject policy
            reject(command);
        }
    }

Basic process:

  1. Obtain workcount and runState according to ctl
  2. If the ctl is less than the number of core threads, addwork will be called to add a worker thread and add a task
  3. If the ctl is greater than the number of core threads, it will be judged that the current thread pool is running and the task is successfully added to the queue
  4. After the insertion is successful, judge whether the number of working threads is 0 again. If so, execute the addWork method, otherwise return directly
  5. Finally, when the thread pool is not running and the task fails to be added to the queue, the reject policy is executed

addWorker (Runnable firstTask, boolean core) method

Parameter Description:

  1. Runnable firstTask: the task that the newly created thread should run first (empty if not).
  2. boolean core: this parameter determines the constraint condition of thread pool capacity, that is, the limit value of the current number of threads. If the parameter is true, corePollSize is used as the constraint value, otherwise maximumPoolSize is used.

private boolean addWorker(Runnable firstTask, boolean core) {
    // Outer loop: judge thread pool status
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 
         * 1.The thread pool is in non Running status (in Running status, you can add core threads or accept tasks)
         * 2.The thread is in shutdown state and firstTask is empty and the queue is not empty
         * 3.If condition 1 is satisfied and condition 2 is not satisfied, false is returned
         * 4.Interpretation of condition 2: when the thread pool is in the shutdown state and the task queue is not empty, you can add a thread of empty tasks to process the tasks in the queue
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

		// Inner loop: add the core thread to the thread pool and return the result of whether the addition is successful
        for (;;) {
            int wc = workerCountOf(c);
            // Verify whether the number of existing threads in the thread pool exceeds the limit:
            // 1. Maximum thread pool CAPACITY 
            // 2.corePoolSize or maximumPoolSize (depending on the input core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) 
                return false;
            // Make the number of working threads + 1 through CAS operation to jump out of the outer loop
            if (compareAndIncrementWorkerCount(c)) 
                break retry;
            // Thread + 1 failed, reread ctl
            c = ctl.get();   // Re-read ctl
            // If the thread pool state is no longer running at this time, the outer loop is restarted
            if (runStateOf(c) != rs)
                continue retry;
            // Other CAS failed because the number of working threads changed. Continue the inner loop and try CAS to increase the number of threads by 1
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    /**
     * Number of core threads + 1 successful subsequent operations: add to the worker thread collection and start the worker thread
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // The following code needs to be locked: thread pool master lock
            mainLock.lock(); 
            try {
                // When the thread factory fails to create a thread or closes before acquiring a lock, it exits
                int c = ctl.get();
                int rs = runStateOf(c);

				// Verify again whether the thread pool is running or thread pool shutdown, but the thread task is empty
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // If the thread has been started, an illegal thread state exception is thrown
                    // Why does this state exist? Unresolved
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); //Join thread pool
                    int s = workers.size();
                    // If the current number of worker threads exceeds the maximum number of threads that have ever occurred in the thread pool, refresh the latter value
                    if (s > largestPoolSize)
                        largestPoolSize = s; 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();  // Release lock
            }
            if (workerAdded) { // The worker thread was added successfully. Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //If the thread fails to start, it enters addWorkerFailed
        if (! workerStarted) 
            addWorkerFailed(w);
    }
    return workerStarted;
}

unscramble

1. The outer loop determines whether a worker thread can be added in the thread pool. This layer verification is based on the following two principles:

  1. When the thread pool is in Running status, you can accept new tasks or process tasks
  2. When the thread pool is in Stop status, only the worker of empty tasks can be added to process the tasks in the task queue, and new tasks cannot be accepted

2. The inner loop adds a worker thread to the thread pool and returns the result of whether the addition is successful.

  1. Then check whether the number of threads has exceeded the limit. If yes, return false. Otherwise, go to the next step
  2. Make the number of working threads + 1 through CAS. If successful, go to step 3. If failed, verify whether the thread pool is running again. If yes, continue the inner loop, and if not, return to the outer loop

3. Number of core threads + 1 successful subsequent operations: add to the worker thread collection and start the worker thread

  1. First, after obtaining the lock, verify the thread pool status again (see code comments for specific verification rules). If it passes, go to the next step. If it fails, add a thread loss
  2. After the thread pool status verification passes, check whether the thread has been started. If yes, an exception will be thrown. Otherwise, try to add the thread to the thread pool
  3. Check whether the thread is started successfully. If successful, return true. If failed, enter the addWorkerFailed method

addWorkerFailed

    /**
     * Rollback created worker thread
     * 1> Remove the worker from the workers queue (if it exists in the queue).
     * 2> workerCount Minus one.
     * 3> Recheck whether to terminate to prevent the existence of this worker from preventing termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null) {
                workers.remove(w); // Step 1: remove w from the work queue
            }
            // Step 2: workerCount minus 1
            decrementWorkerCount();
            // Step 3:
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * If (state is SHUTDOWN and the thread pool and queue are empty) or (state is STOP and the thread pool and queue are empty), the state is converted to TERMINATED.
     * If it can be terminated, but the workerCount is non-zero, the idle worker is interrupted to ensure that signal propagation is turned off.
     * This method must be called after any operation that may terminate operations - reduce the worker count or delete the task from the queue during closing.
     * The method is non private to allow access from the ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (; ; ) {
            int c = ctl.get();
            /**
             * Only runState=STOP or shutdown (and workQueue is empty) can continue execution
             */
            if (isRunning(c) || // Determine whether c is RUNNING
                    runStateAtLeast(c, TIDYING) || // Determine whether c is TIDYING or TERMINATED
                    (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) { // Judge if c is in SHUTDOWN state and workQueue is not empty
                return;
            }
            // The number of currently active threads is not 0
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // Because ONLY_ONE=true, so interrupt() is executed only for one idle thread
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // Try to set the running state of the current thread pool to TIDYING and the number of active threads to 0
                    try {
                        terminated(); // Empty method
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0)); // Set the running state of the current thread pool to TERMINATED and the number of active threads to 0
                        termination.signalAll(); // Wake up all threads
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

technological process

  1. Remove the worker s
  2. workerCount -1
  3. Recheck whether to terminate to prevent the existence of this worker from preventing termination.

runWorker

1. Judge whether the current task or the task obtained from the task queue is not empty. If both are empty, go to step 2, otherwise go to step 3

2. If the task is empty, set completedAbruptly to false (that is, the thread does not terminate suddenly), and execute the processWorkerExit(w,completedAbruptly) method to enter the thread exit program

3. If the task is not empty, enter the cycle and lock it

4. Judge whether to add an interrupt ID to the thread. If one of the following two conditions is met, add an interrupt ID:

  • Thread pool status > = STOP, i.e. STOP or TERMINATED
  • At first, it is judged that the thread pool status is < STOP. Next, it is found that Thread.interrupted() is true, that is, the thread has been interrupted. Check again whether the thread pool status is > = STOP (to eliminate the effect of the instant shutdown method and make the thread pool STOP or TERMINATED)

5. After executing the preceding method beforeExecute(wt, task) (this method is empty and implemented by subclasses), execute the task.run() method to execute the task (if the execution is unsuccessful, throw the corresponding exception)

6. After executing the post method afterexecute (task, throw) (this method is empty and implemented by subclasses), the number of completed tasks in the thread pool will be + 1 and the lock will be released.

7. Judge the cycle condition again.

processWorkerExit()

Processworkerexit (worker W, Boolean completed abortly) is a method for executing thread exit

Parameter Description:

  1. Worker w: the worker thread to end.
  2. boolean completedAbruptly: whether to complete suddenly (caused by exception). If the worker thread dies due to user exception, the completedAbruptly parameter is true
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * 1.Worker-1 operation
     * 1)If completedAbruptly is true, it indicates that the worker thread has an exception, and the number of working threads will be - 1
     * 2)If completedAbruptly is false, it means that the worker thread has no task to execute, and the worker-1 operation is performed by getTask()
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. Remove the worker thread from the thread set. This process requires locking
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Append the number of tasks completed by the worker to the number of tasks completed by the thread pool
        completedTaskCount += w.completedTasks;
        // Remove the worker from HashSet < worker >
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
	// 3. Judge whether to end the thread pool according to the thread pool status
    tryTerminate();
	
	/**
     * 4.Need to add worker threads
     * The thread pool status is running or shutdown
     * If the current thread terminates suddenly, addWorker()
     * If the current thread does not terminate suddenly, but the number of current threads < the number of threads to maintain, addWorker()
     * Therefore, if the thread pool shutdown() is called until the workQueue is unprecedented, the thread pool will maintain corePoolSize threads, and then gradually destroy these corePoolSize threads
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

technological process

1. If completedAbruptly is true, that is, the worker thread suddenly dies because of an exception, the worker thread - 1 operation is executed.

2. After the main thread obtains the lock, the number of tasks completed by the thread pool is added to the number of tasks completed by w (current worker thread), and the current worker is removed from the worker's set collection.

3. Judge whether to execute tryTerminate() to end the thread pool according to the thread pool status.

4. Whether to add worker threads. If the thread pool has not been completely terminated, a certain number of threads still need to be maintained.

  • If the current thread terminates suddenly, addWorker() is called to create the worker thread
  • If the current thread does not terminate suddenly, but the current number of worker threads is less than the number of threads to be maintained in the thread pool, a worker thread is created. The number of threads to maintain is corePoolSize (depending on whether the member variable allowCoreThreadTimeOut is false) or 1.

https://blog.csdn.net/qq_32828253/article/details/112297856?spm=1001.2014.3001.5502
https://blog.csdn.net/mu_wind/article/details/113806680

Keywords: Java Multithreading

Added by The-Last-Escape on Thu, 25 Nov 2021 22:37:35 +0200