The Worker Class in ThreadPoolExecutor

The Worker class implements Runnable and also extends AbstractQueuedSynchronizer, so a Worker is both an executable task and a lock. Worker mainly maintains the interrupt state of its thread along with some bookkeeping, such as the number of tasks it has completed. Extending AbstractQueuedSynchronizer simplifies acquiring and releasing the lock around each task execution. Holding the Worker lock prevents the worker thread from being interrupted while it is running a task; interruption is only meant to wake up threads that are waiting to fetch a task from workQueue.

How to prevent interruption?

Worker implements a simple non-reentrant mutex. A worker thread takes the lock before it runs a task. Any other thread that wants to interrupt that worker has to acquire the same lock first, and it cannot interrupt the worker if the lock is held. When the worker thread finishes the task, it releases the lock and calls getTask to fetch the next task from workQueue. Interruption can therefore only take effect while the worker is waiting for a task inside getTask. Even then, the worker does not stop immediately: it checks workQueue again, and if the queue is not empty it goes on fetching and executing tasks. Only when the queue is empty does it stop. In other words, interruption is a way to stop idle threads, that is, threads blocked in getTask because the task queue is empty. (shutdownNow() is the exception: it interrupts workers without taking this lock.) The rest of this article analyzes the whole process in detail.
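
The locking rule described above can be sketched in isolation. This is a minimal illustration and not the JDK source: WorkerLock, interruptIfIdle and demo are hypothetical names, and Thread.sleep stands in for the blocking wait on workQueue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class InterruptIfIdleDemo {
    /** Hypothetical stand-in for Worker's lock: state 0 = free, 1 = held. */
    static final class WorkerLock extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int unused) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int unused) { setState(0); return true; }
        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** Interrupts target only if its lock is free, i.e. it is not running a task. */
    static boolean interruptIfIdle(WorkerLock lock, Thread target) {
        if (!lock.tryLock())
            return false;               // busy: leave the running task alone
        try {
            target.interrupt();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Returns { interrupted while busy?, interrupted while idle? }. */
    static boolean[] demo() {
        WorkerLock lock = new WorkerLock();
        CountDownLatch running = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        CountDownLatch idle = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            lock.lock();                // "task starts"
            running.countDown();
            try { finish.await(); } catch (InterruptedException ignored) { }
            lock.unlock();              // "task done"
            idle.countDown();
            // Idle wait, standing in for a blocking workQueue.take()
            try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException expected) { }
        });
        worker.start();
        try {
            running.await();
            boolean whileBusy = interruptIfIdle(lock, worker);
            finish.countDown();
            idle.await();
            boolean whileIdle = interruptIfIdle(lock, worker);
            worker.join();              // returns because the interrupt ended the idle wait
            return new boolean[] { whileBusy, whileIdle };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("interrupted while busy: " + r[0]); // false
        System.out.println("interrupted while idle: " + r[1]); // true
    }
}
```

The interrupting side uses tryLock rather than lock, so it never waits for a busy worker; it simply skips it.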

Why is Worker designed to be non-reentrant?

To answer this, we need to know which operations may interrupt worker threads. At present, they are mainly the following:

setCorePoolSize();

setMaximumPoolSize();

setKeepAliveTime();

allowCoreThreadTimeOut();

shutdown();

tryTerminate();

If the lock were reentrant, a task that calls a pool control method such as setCorePoolSize from inside a worker thread could re-acquire the lock its own worker already holds, and the control method would then interrupt the very worker that is running the task. The JDK does not want the lock to be acquirable again when a pool control method such as setCorePoolSize is called, which is why Worker uses its own non-reentrant lock.
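
The difference is easy to observe on a single thread. The following sketch compares java.util.concurrent.locks.ReentrantLock with a Worker-style mutex; NonReentrantLock and the two helper methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    /** Hypothetical Worker-style mutex: state 0 = free, 1 = held, no owner check. */
    static final class NonReentrantLock extends AbstractQueuedSynchronizer {
        @Override protected boolean tryAcquire(int unused) { return compareAndSetState(0, 1); }
        @Override protected boolean tryRelease(int unused) { setState(0); return true; }
        void lock()       { acquire(1); }
        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    /** The owning thread tries to take a ReentrantLock a second time. */
    static boolean reentrantSecondTryLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            boolean again = lock.tryLock();  // succeeds: hold count goes to 2
            if (again)
                lock.unlock();
            return again;
        } finally {
            lock.unlock();
        }
    }

    /** The owning thread tries to take the Worker-style mutex a second time. */
    static boolean nonReentrantSecondTryLock() {
        NonReentrantLock lock = new NonReentrantLock();
        lock.lock();
        try {
            return lock.tryLock();           // fails: state is already 1
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("ReentrantLock re-acquired by owner: " + reentrantSecondTryLock());        // true
        System.out.println("Worker-style lock re-acquired by owner: " + nonReentrantSecondTryLock()); // false
    }
}
```

With the Worker-style lock, a tryLock issued from inside a running task fails, so a tryLock-guarded interrupt would skip the worker that is executing that task.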

The source code of Worker is as follows:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized; serialVersionUID is provided only to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. */
        final Thread thread;
        /** Initial task to run; may be null. */
        Runnable firstTask;
        /** Per-thread completed task counter; volatile so that other threads see updates. */
        volatile long completedTasks;

        /**
         * Creates a Worker with the given first task and a thread obtained from the ThreadFactory.
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates the main run loop to the outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

Core function runWorker

runWorker flow chart:

runWorker repeatedly takes tasks from the work queue and executes them. At the same time, it manages the interrupt state of the thread. The source code is as follows:

   final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true; // stays true if the worker dies because of an exception
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Main steps:

1 Start with the initial task. If firstTask is null, call getTask to fetch tasks from the queue for as long as the thread pool is running. If getTask returns null, the worker exits because the pool state or its configuration parameters changed. If instead an exception thrown by external code makes the worker exit, completedAbruptly stays true, and processWorkerExit creates a replacement worker (as long as the pool has not reached STOP).

2 The worker is locked before the task is executed, which prevents other pool operations from interrupting the current worker while the task is running. clearInterruptsForTaskRun is then called to manage the thread's interrupt state. First, the source code:

   private void clearInterruptsForTaskRun() {
        if (runStateLessThan(ctl.get(), STOP) &&
            Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))
            Thread.currentThread().interrupt();
    }

This call is important. While the pool state is below STOP, Thread.interrupted() is called. If the worker's interrupt flag was set during getTask, the call returns true and, as a side effect, clears the flag, so a second call would return false. The pool state is then checked again: if it is now STOP or above, Thread.currentThread().interrupt() is called to restore the thread's interrupt state. The method therefore does two things:

<1>: While the thread pool is still running, if the worker was interrupted by some other operation, the interrupt state is cleared.

<2>: After clearing the interrupt state, the pool state is checked again. If it is now greater than or equal to STOP, the thread's interrupt state is restored, so that the next call to getTask returns null and the worker exits normally.
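
The clear-then-recheck pattern relies on the fact that Thread.interrupted() both reads and clears the flag, while isInterrupted() only reads it. Here is a small sketch of that behaviour, with the pool state reduced to a single hypothetical flag named stopping (an assumption for illustration; the real method reads ctl):

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptFlagDemo {
    /** Hypothetical stand-in for "pool state >= STOP". */
    static final AtomicBoolean stopping = new AtomicBoolean(false);

    /** Same shape as clearInterruptsForTaskRun, with the pool state reduced to one flag. */
    static void clearInterruptUnlessStopping() {
        if (!stopping.get() && Thread.interrupted() && stopping.get())
            Thread.currentThread().interrupt();
    }

    /** Sets the interrupt flag, runs the check, and reports whether the flag survived. */
    static boolean flagSurvives(boolean poolStopping) {
        stopping.set(poolStopping);
        Thread.currentThread().interrupt();
        clearInterruptUnlessStopping();
        return Thread.interrupted();    // read the flag and clear it again
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        System.out.println(Thread.currentThread().isInterrupted()); // true, flag untouched
        System.out.println(Thread.interrupted());                   // true, flag cleared
        System.out.println(Thread.interrupted());                   // false
        System.out.println(flagSurvives(false));                    // false: cleared before the task
        System.out.println(flagSurvives(true));                     // true: left set while stopping
    }
}
```

The second stopping.get() in the condition only matters when the state changes between the two reads, which this single-threaded sketch does not exercise.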

3 beforeExecute is called before each task is executed, and it may throw an exception. In that case the thread dies without the task ever being executed: the exception is not caught, so it propagates out of the loop with completedAbruptly == true.

4 If beforeExecute completes normally, the task is run, and any exception it throws is collected and passed to afterExecute. RuntimeException, Error, and arbitrary Throwable are handled separately. Because a Throwable cannot be rethrown from Runnable.run, it is wrapped in an Error (which ends up in the thread's UncaughtExceptionHandler) before being thrown. Any exception thrown here causes the thread to die, and completedAbruptly remains true.

5 After the task completes, afterExecute is called. It may also throw an exception, which likewise causes the thread to die.
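
The hook order in steps 3 to 5 can be observed from outside by overriding beforeExecute and afterExecute, both of which are protected methods of ThreadPoolExecutor. The sketch below is only an illustration: HookOrderDemo, the event strings and the silent uncaught-exception handler are choices made for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookOrderDemo {
    /** Runs one failing task and returns the hook events in the order they fired. */
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // Swallow the uncaught exception so the demo does not print a stack trace.
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), quiet) {
            @Override protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                events.add("before");
            }
            @Override protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                events.add("after:" + (t == null ? "none" : t.getClass().getSimpleName()));
                done.countDown();
            }
        };
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [before, after:IllegalStateException]
    }
}
```

Note that this applies to tasks passed to execute. With submit, the exception is captured in the returned Future, so afterExecute receives null for the Throwable.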

Getting tasks

During the run of runWorker, getTask() is constantly called to fetch tasks from the task queue for execution.

The getTask method flow chart is as follows:

The source code is as follows:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        /**
         * Outer loop:
         * checks the thread pool state and whether the work queue is empty
         */
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Return null if the pool is stopping (shutdownNow()) or was shut down (shutdown()) and workQueue is empty
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?
            /**
              * Inner loop:
              * checks the number of worker threads and whether waiting for a task may time out
              */
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

When the task queue is empty, getTask() either blocks or waits with a timeout, depending on the current thread pool configuration. It returns null in any of the following cases:

1 The number of worker threads exceeds maximumPoolSize

2 The thread pool has stopped

3 The thread pool has been shut down and the task queue is empty

4 The worker thread timed out waiting for a task, and allowCoreThreadTimeOut || workerCount > corePoolSize is true.
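
The timeout case (the last item above) can be seen through the public API. This is a rough sketch that depends on timing: the 50 ms keep-alive, the one-second observation window and the name poolSizeAfterIdle are arbitrary choices for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleTimeoutDemo {
    /** Runs one task, lets the pool sit idle for up to a second, and returns the pool size. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        pool.execute(() -> { });
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
        try {
            // Stop early once the idle worker has timed out and been removed.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
                Thread.sleep(10);
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout off: " + poolSizeAfterIdle(false)); // worker blocks in take()
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));  // worker polls, then exits
    }
}
```

With allowCoreThreadTimeOut(false) the single core worker waits in workQueue.take() and stays alive; with true it uses the timed poll, getTask returns null after the keep-alive elapses, and the pool size drops to 0.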

Worker thread exit

In runWorker, when getTask returns null or an exception is thrown, processWorkerExit is called to handle the exit of the worker thread.

The processWorkerExit method flow chart is as follows:

Look at the source code below:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
         /**
           * If the worker exited abruptly, decrement the worker count here.
           * Otherwise the count was already decremented in getTask().
           */
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();//Locking thread pool
        try {
            completedTaskCount += w.completedTasks;//Accumulate the number of tasks this worker completed
            workers.remove(w);//Remove worker threads
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//Attempt to terminate thread pool

        int c = ctl.get();
        //State is RUNNING or SHUTDOWN, i.e. tryTerminate() did not terminate the thread pool
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //If tasks are still waiting in the queue, at least one worker thread must remain.
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                /**
                  * If allowCoreThreadTimeOut is false, keep at least corePoolSize worker threads in the thread pool
                  */
                if (workerCountOf(c) >= min)
                    return; 
            }
            //Add a worker thread without firstTask
            addWorker(null, false);
        }
    }

processWorkerExit is invoked only from worker threads. It cleans up and does the bookkeeping for a dying thread, and it may end up terminating the thread pool. The implementations of tryTerminate and addWorker are not covered in detail here.
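
The replacement behaviour for an abruptly terminated worker is visible from outside the pool as well. Here is a small sketch, assuming a pool with a single worker; WorkerReplacementDemo and replacedAfterFailure are hypothetical names, and the thread factory only exists to keep the uncaught exception off the console.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class WorkerReplacementDemo {
    /** Returns true if the task after a failing one ran on a different thread. */
    static boolean replacedAfterFailure() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), quiet);
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // The first task kills its worker thread by throwing.
        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("boom");
        });
        // The second task can only run on a worker created after that failure.
        pool.execute(() -> {
            second.set(Thread.currentThread());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("second task did not run");
            return first.get() != second.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + replacedAfterFailure()); // true
    }
}
```

The pool keeps serving tasks after the failure because a new worker takes over for the one that died.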

Keywords: JDK less

Added by Idri on Fri, 05 Jul 2019 01:26:23 +0300