JUC learning - in depth analysis of thread pool executor (supplementary)

Next blog https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501

6. Worker class

The following is an official comment on the Worker class:

/**
  * Class Worker mainly maintains interrupt control state for
  * threads running tasks, along with other minor bookkeeping.
  * This class opportunistically extends AbstractQueuedSynchronizer
  * to simplify acquiring and releasing a lock surrounding each
  * task execution.  This protects against interrupts that are
  * intended to wake up a worker thread waiting for a task from
  * instead interrupting a task being run.  We implement a simple
  * non-reentrant mutual exclusion lock rather than use
  * ReentrantLock because we do not want worker tasks to be able to
  * reacquire the lock when they invoke pool control methods like
  * setCorePoolSize.  Additionally, to suppress interrupts until
  * the thread actually starts running tasks, we initialize lock
  * state to a negative value, and clear it upon start (in
  * runWorker).
  */

From the comments, we can see that the Worker class is an internal class, which not only implements Runnable, but also inherits AbstractQueuedSynchronizer (hereinafter referred to as AQS), so it is both an executable task and a lock effect. It can also be seen from the following code:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

The Worker class mainly maintains the interrupt control status of the running task thread and other secondary records.

This class inherits the AbstractQueuedSynchronizer class to simplify the process of obtaining and releasing locks that act on each task execution code. This prevents interrupting a running task, which will only interrupt the thread waiting to get the task from the task queue.

We implemented a simple non reentrant mutex instead of using reentrant lock, because we don't want the work task to be able to re acquire the lock when calling pool control methods such as setCorePoolSize. In addition, in order to prohibit interrupts before the thread actually starts running the task, we initialize the lock state to a negative value and clear it at startup (in runWorker).

Let's take a look at some of its codes:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker held threads
    Runnable firstTask;//The initialized task can be null
}

The Worker thread implements the Runnable interface and holds a thread, an initialized task, and a firstTask. Thread is a thread created through ThreadFactory when calling the construction method, which can be used to execute tasks; firstTask uses it to save the first task passed in. This task can be null or empty. If this value is not empty, the thread will execute this task immediately at the beginning of startup, which corresponds to the situation when the core thread is created; If this value is null, you need to create a thread to execute the tasks in the workQueue, that is, the creation of non core threads.

The model of Worker executing tasks is as follows:

The addWorker method was introduced in the last blog. You need to see it[ https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501]

Let's take a look at the complete code of the Worker class:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // Worker held threads
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // The initialized task can be null
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
    Worker(Runnable firstTask) {
        // Set synchronization status of AQS
        // 		-State: lock state, - 1 is the initial value, 0 is the unlock state, and 1 is the lock state
        setState(-1); // inhibit interrupts until runWorker. Interrupt is prohibited before calling runWorker
        this.firstTask = firstTask;
        // The thread factory creates a thread
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  
      *
      * Delegate the main run loop to an external runWorker
      */
    public void run() {
        runWorker(this);   // runWorker() is the method of ThreadPoolExecutor
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state. 0 represents the "unlocked" state
    // The value 1 represents the locked state. 1 represents the locked state

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
	
    /**
      * Method of trying to acquire lock
      * 	- Override tryAcquire() of AQS
      */
    protected boolean tryAcquire(int unused) {
        // It is judged that the original value is 0 and reset to 1, so when the state is - 1, the lock cannot be obtained.
        // It is 0 - > 1 every time, which ensures the non reentrance of the lock
        if (compareAndSetState(0, 1)) {
            // Set exclusiveOwnerThread = current thread
            // Exclusive Mode 
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

   /**
     * Attempt to release the lock
     *     - Not state-1, but set to 0
     */
    protected boolean tryRelease(int unused) {
    	// Clear currently occupied threads
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

   /**
     * Interrupt (if running)
     * shutdownNow The loop executes on the worker thread
     * There is no need to obtain the worker lock, and it can be interrupted even when the worker is running
     */
    void interruptIfStarted() {
        Thread t;
        // If state > = 0, t!=null, and t is not interrupted
        // state==-1 when new Worker(), indicating that it cannot be interrupted
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

For the description of AQS, see another article by the blogger[ https://blog.csdn.net/qq_43605444/article/details/121705312?spm=1001.2014.3001.5501]

7. runWorker method

  • Method description: it can be said that runWorker(Worker w) is the real method for processing tasks in the thread pool. The previous execute() and addWorker() are preparing and paving the way for this method.
  • Parameter Description:
    1. Worker w: an encapsulated worker that carries many elements of a worker thread, including Runnable (pending tasks), lock (locks), and completedTasks (record the number of completed tasks in the thread pool)
  • The following is a specific code analysis:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // new Worker() is state==-1. Here is to call the tryRelease() method of Worker class and set state to 0. In interruptIfStarted(), only state > = 0 is allowed to call interrupt
    w.unlock(); // allow interrupts
    
    // The reason why the thread exits. true is caused by the task, and false is caused by the normal exit of the thread
    boolean completedAbruptly = true;
    try {
        // If the current task and the task obtained from the task queue are empty, the cycle will be stopped
        while (task != null || (task = getTask()) != null) {
            // Locking prevents a running worker from being terminated during shutdown(), rather than dealing with concurrency
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
           /**
             * Judgment 1: ensure that only when the thread is in the stop state and the wt is not interrupted, the wt will be set with the interrupt flag
             * Condition 1: thread pool status > = STOP, that is, STOP or TERMINATED
             * Condition 2: first, judge that the thread pool status is < stop, and then check that Thread.interrupted() is true,
             * That is, the thread has been interrupted. Check again whether the thread pool status is > = stop (to eliminate the instant that the shutdown method takes effect,
             * Make the thread pool STOP or TERMINATED),
             * If either condition 1 or condition 2 is satisfied and wt is not in the interrupt state, interrupt wt, otherwise enter the next step
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // The current thread calls interrupt()
                wt.interrupt();
            try {
                // Before execution (empty method, implemented by subclass override)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // After execution (empty method, implemented by subclass override)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // Number of completed tasks + 1
                w.completedTasks++;
                // Release lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Handle worker exit
        processWorkerExit(w, completedAbruptly);
    }
}

The execution process of the runWorker method is:

  1. The while loop continuously obtains the task through the getTask() method.
  2. The getTask() method fetches the task from the blocking queue.
  3. If the thread pool is stopping, ensure that the current thread is in interrupt state, otherwise ensure that the current thread is not in interrupt state.
  4. Perform the task.
  5. If the getTask result is null, jump out of the loop and execute the processWorkerExit() method to destroy the thread.

8. getTask method

  • Method description: according to the function call diagram, in the implementation of ThreadPoolExecutor class, the Runnable getTask() method serves the void runWorker(Worker w) method, and its function is to obtain the task (Runnable) in the task queue.
private Runnable getTask() {
    // Whether the latest poll timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       /**
         * Condition 1: thread pool status SHUTDOWN, STOP, TERMINATED
         * Condition 2: thread pool STOP, TERMINATED status or workQueue is empty
         * If both condition 1 and condition 2 are true, then workerCount-1 and null will be returned
         * Note: condition 2 is that the thread pool in SHUTDOWN state will not accept the task, but will still process the task
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
       /**
         * If either of the following two conditions is met, the blocking time limit is set for the worker thread currently trying to obtain the task
         *(The timeout will be destroyed (I'm not sure about this), otherwise the thread can remain active all the time
         * 1.allowCoreThreadTimeOut: Whether the current thread waits for a task with keepAliveTime as the timeout
         * 2.The current number of threads has exceeded the number of core threads
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // If both conditions are true, make the number of working threads - 1 through CAS, that is, eliminate the working threads
        // Condition 1: the number of worker threads is greater than maximumPoolSize, or (the worker thread blocking time is limited and the last pull task in the task queue timed out)
        // Condition 2: WC > 1 or task queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Remove the worker thread, return null if successful, and enter the next cycle if unsuccessful
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // After execution, it indicates that the task has been verified and started to be obtained
        try {
            // If the worker thread blocking time is limited, use poll(), otherwise use take()
            // poll() sets the blocking time, while take() has no time limit until the result is obtained
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            // If r is not empty, the Runnable is returned
            if (r != null)
                return r;
            // If the Runable cannot be obtained, set the timeout of the most recently obtained task to true
            timedOut = true;
        } catch (InterruptedException retry) {
            // In response to the interrupt, set the timeout status of the most recently obtained task to false before entering the next cycle
            timedOut = false;
        }
    }
}

9. processWorkerExit method

  • Method description: processworkerexit (worker W, Boolean completed abortly) is a method to execute thread exit
  • Parameter Description:
    1. Worker w: the worker thread to end.
    2. boolean completedAbruptly: whether to complete suddenly (caused by exception). If the worker thread dies due to user exception, the completedAbruptly parameter is true.
  • Let's take a look at the source code of processWorkerExit:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
   /**
     * 1.Worker-1 operation
     * 1)If completedAbruptly is true, it indicates that the worker thread has an exception, and the number of working threads will be - 1
     * 2)If completedAbruptly is false, it means that the worker thread has no task to execute, and the worker-1 operation is performed by getTask()
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2. Remove the worker thread from the thread set. This process requires locking
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Append the number of tasks completed by the worker to the number of tasks completed by the thread pool
        completedTaskCount += w.completedTasks;
        // Remove the worker from HashSet < worker >
        workers.remove(w);
    } finally {
        // Release lock
        mainLock.unlock();
    }

    // 3. Judge whether to end the thread pool according to the thread pool status
    tryTerminate();

   /**
     * 4.Need to add worker threads
     * The thread pool status is running or shutdown
     * If the current thread terminates suddenly, addWorker()
     * If the current thread does not terminate suddenly, but the number of current threads < the number of threads to maintain, addWorker()
     * Therefore, if the thread pool shutdown() is called until the workQueue is empty,
     * The thread pool will maintain corePoolSize threads, and then gradually destroy these corePoolSize threads
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

10. Thread initialization

By default, after the thread pool is created, there are no threads in the thread pool, and the threads will not be created until the task is submitted.

In practice, if you need to create threads immediately after the thread pool is created, you can do it in the following two ways:

  • prestartCoreThread(): boolean prestartCoreThread(), initializes a core thread
  • prestartAllCoreThreads(): int prestartAllCoreThreads(), initializes all core threads and returns the number of initialized threads
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);   // Note that the parameter passed in is null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))   // Note that the parameter passed in is null
        ++n;
    return n;
}

Reference article:

1,https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html [meituan technical team blog]

You can take a look at the article on thread pool written by meituan technical team, which contains the practice of thread pool in business.

Keywords: Java Back-end Multithreading thread pool JUC

Added by eabigelow on Mon, 06 Dec 2021 07:05:57 +0200