Concurrent programming - the specific implementation of ThreadPoolExecutor

Concurrent programming

Executor thread pool principle and source code analysis

ThreadPoolExecutor default thread pool

Creation of thread pool

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

Parameter interpretation

corePoolSize

The number of core threads in the thread pool. When a task is submitted and fewer than corePoolSize threads exist, a new thread is created to execute it, even if other threads are idle, until the number of threads equals corePoolSize.

maximumPoolSize

The maximum number of threads allowed in the thread pool. If the blocking queue is full and tasks keep being submitted, the thread pool creates additional threads to execute the newly submitted tasks until the number of threads equals maximumPoolSize.

keepAliveTime

The maximum time that non-core threads in the thread pool may stay idle. When the pool holds more than corePoolSize threads and no new task is submitted, a non-core thread is not destroyed immediately; it is destroyed only after it has been idle for keepAliveTime.

unit

Unit of keepAliveTime

workQueue

The queue that holds tasks waiting to be executed. Tasks must implement the Runnable interface. The JDK provides the following blocking queues:

  1. ArrayBlockingQueue: a bounded blocking queue backed by an array; tasks are ordered FIFO;
  2. LinkedBlockingQueue: a blocking queue backed by a linked list; tasks are ordered FIFO. Its throughput is typically higher than that of ArrayBlockingQueue;
  3. SynchronousQueue: a blocking queue that does not store elements. Each insert operation must wait until another thread calls the corresponding remove operation; until then, the insert stays blocked. Its throughput is usually higher than that of LinkedBlockingQueue;
  4. PriorityBlockingQueue: an unbounded blocking queue with priority ordering.

threadFactory

The thread factory used to create threads. The default is Executors.defaultThreadFactory(). Threads created by the default ThreadFactory all have NORM_PRIORITY priority and are non-daemon threads. The factory also sets the name of each thread.

handler

The rejection policy. If a task is submitted while the blocking queue is full, the number of threads equals maximumPoolSize, and no thread is idle, the thread pool applies the rejection policy. The thread pool provides four policies:

  1. AbortPolicy: the default policy; it throws a RejectedExecutionException;
  2. CallerRunsPolicy: runs the task on the thread that submitted it;
  3. DiscardOldestPolicy: discards the oldest task in the blocking queue (the one submitted first) and then retries the new task;
  4. DiscardPolicy: silently discards the newly submitted task.

These four policies are nested classes of ThreadPoolExecutor. You can also implement the RejectedExecutionHandler interface yourself to fit the application scenario, for example to log tasks that cannot be processed or to store them persistently.
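
To make the seven constructor parameters concrete, here is a minimal sketch. The class name PoolCreationSketch, the "demo-pool" thread name prefix, and the recording handler are assumptions chosen for illustration; they are not part of the JDK. The pool has a single thread and a SynchronousQueue, so a second task submitted while the first is still running goes straight to the custom rejection handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCreationSketch {

    // Hypothetical factory: names threads "<prefix>-N" and marks them as daemons.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + seq.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // Returns { name of the worker thread, number of rejected tasks }.
    static String[] run() {
        List<Runnable> rejected = new CopyOnWriteArrayList<>();
        // Custom saturation policy: record the task instead of throwing.
        RejectedExecutionHandler recordingHandler = (task, executor) -> rejected.add(task);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                          // corePoolSize
                1,                          // maximumPoolSize
                30L, TimeUnit.SECONDS,      // keepAliveTime and its unit
                new SynchronousQueue<>(),   // workQueue that stores no elements
                namedFactory("demo-pool"),  // threadFactory
                recordingHandler);          // handler

        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        String[] workerName = new String[1];
        try {
            pool.execute(() -> {
                workerName[0] = Thread.currentThread().getName();
                started.countDown();
                awaitQuietly(release);      // keep the only thread busy
            });
            started.await();
            // The only thread is busy and the queue cannot hold the task,
            // so this submission is passed to the rejection handler.
            pool.execute(() -> { });
            return new String[] { workerName[0], String.valueOf(rejected.size()) };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("worker=" + result[0] + ", rejected=" + result[1]);
    }
}
```

A handler like this one suits the logging or persistence scenario mentioned above; a production version would probably also need to decide what to do with the recorded tasks later.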

Thread pool monitoring

// Approximate total number of tasks scheduled so far (completed, running, and queued)
public long getTaskCount()

// Approximate number of tasks that have completed execution
public long getCompletedTaskCount()

// Current number of threads in the thread pool
public int getPoolSize()

// Approximate number of threads that are actively executing tasks
public int getActiveCount()
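
As a rough illustration of these getters, the sketch below runs a few trivial tasks and samples the counters after the pool has terminated. The class name MonitoringSketch is an assumption; getLargestPoolSize() is a further JDK getter that is not listed above.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MonitoringSketch {

    // Returns { taskCount, completedTaskCount, largestPoolSize, poolSize },
    // sampled after the pool has terminated.
    static long[] runAndSample(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {
            pool.getTaskCount(),
            pool.getCompletedTaskCount(),
            pool.getLargestPoolSize(),
            pool.getPoolSize()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runAndSample(5)));
    }
}
```

While the pool is still running, the task counters are only approximations, because tasks and threads change state during the computation; sampling after termination gives stable values.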

Thread pool principle

Source code analysis

execute method
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl packs runState (high 3 bits) and workerCount (low 29 bits) into one int
    int c = ctl.get();
    // workerCountOf reads the low 29 bits, i.e. the current number of threads.
    // If it is below corePoolSize, create a new thread and run the task on it.
    if (workerCountOf(c) < corePoolSize) {
        // The second argument of addWorker selects the thread-count limit:
        // true checks against corePoolSize, false against maximumPoolSize.
        if (addWorker(command, true))
            return;
        // addWorker failed; re-read ctl
        c = ctl.get();
    }
    // If the pool is RUNNING and the task was added to the queue successfully
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-read ctl
        int recheck = ctl.get();
        // Re-check the pool state: if it is no longer RUNNING, remove the task
        // that was just queued and hand it to the rejection handler.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If there are no worker threads, add one:
        //   1. null: the new thread has no first task and takes work from the queue;
        //   2. false: the thread-count limit is maximumPoolSize.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Reaching this branch means one of two things:
    //   1. the pool is no longer RUNNING;
    //   2. the pool is RUNNING, but workerCount >= corePoolSize and workQueue is full.
    // addWorker is called with false, so the limit is maximumPoolSize;
    // if it fails, the task is rejected.
    else if (!addWorker(command, false))
        reject(command);
}

Simply put, if the pool stays in the RUNNING state, the execute() method proceeds as follows:

  1. If workerCount < corePoolSize, create and start a thread to execute the newly submitted task;
  2. If workerCount >= corePoolSize and the blocking queue is not full, add the task to the blocking queue;
  3. If workerCount >= corePoolSize && workerCount < maximumPoolSize, and the blocking queue is full, create and start a thread to execute the newly submitted task;
  4. If workerCount >= maximumPoolSize and the blocking queue is full, handle the task according to the rejection policy. The default is to throw an exception.
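
The four steps can be observed from the outside with a small experiment. This is only a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, and a hypothetical class name ExecuteFlowSketch): every task blocks on a latch, so the pool size and queue length after each submission reflect the branch that execute() took.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowSketch {

    // Submits four blocking tasks and records "poolSize/queueSize" after each
    // submission, or "rejected" when the default AbortPolicy throws.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the thread until the end
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    steps.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        // Task 1 starts a core thread, task 2 is queued, task 3 starts a
        // non-core thread because the queue is full, task 4 is rejected.
        System.out.println(trace()); // prints [1/0, 1/1, 2/1, rejected]
    }
}
```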

Note the call addWorker(null, false): it creates a thread without passing it a task. The task has already been added to workQueue, so the worker takes it from workQueue when it runs. Calling addWorker(null, false) when workerCountOf(recheck) == 0 therefore guarantees that a thread pool in the RUNNING state has at least one thread to execute the queued tasks.
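
One way to reach this branch is a pool whose corePoolSize is 0 and whose queue is unbounded: the first check in execute() never passes, the task is queued, and only the workerCountOf(recheck) == 0 check creates a thread. The sketch below assumes those settings and a hypothetical class name ZeroCoreSketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ZeroCoreSketch {

    // Returns the largest pool size observed, or -1 if the task did not run in time.
    static int runWithZeroCore() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 4, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch done = new CountDownLatch(1);
        try {
            // workerCount (0) is not below corePoolSize (0), so the task is queued;
            // the pool then adds one worker without a first task to drain the queue.
            pool.execute(done::countDown);
            boolean ran = done.await(10, TimeUnit.SECONDS);
            return ran ? pool.getLargestPoolSize() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + runWithZeroCore());
    }
}
```

Without that extra addWorker call, the queued task would have no thread to run it.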
[Figure: execution process of the execute method]

addWorker method

The addWorker method creates a new worker thread in the thread pool and starts it. The firstTask parameter specifies the first task that the new thread executes. If the core parameter is true, the method checks that the current number of threads is less than corePoolSize before adding a thread; if it is false, it checks against maximumPoolSize instead. The code is as follows:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Get the pool's run state
        int rs = runStateOf(c);

        // rs >= SHUTDOWN means the pool no longer accepts new tasks.
        // In that case return false unless all three conditions hold:
        //   1. rs == SHUTDOWN: no new submissions are accepted, but tasks
        //      already in the blocking queue are still executed;
        //   2. firstTask is null;
        //   3. the blocking queue is not empty.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // wc is the current number of threads
            int wc = workerCountOf(c);
            // Return false if wc >= CAPACITY (the largest value the low 29 bits
            // of ctl can hold, i.e. 29 one-bits), or if wc has reached the limit
            // selected by core: corePoolSize if true, maximumPoolSize if false.
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Try to increment workerCount with CAS; on success, leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // If the run state no longer equals rs, it has changed; restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a Worker that holds firstTask
        w = new Worker(firstTask);
        // Each Worker creates its own thread through the ThreadFactory
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN means RUNNING.
                // Add the worker if the pool is RUNNING, or if it is SHUTDOWN and
                // firstTask is null: in SHUTDOWN no new tasks are accepted, but
                // the tasks left in workQueue are still executed.
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize records the largest number of threads the pool has had
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
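
The comments above refer to the low 29 bits of ctl. The following sketch reproduces that packing outside the JDK, as an illustration rather than the real class: the run state occupies the high 3 bits and the worker count the low 29 bits. The class name CtlSketch and the helper methods tryIncrement and addWorkers are hypothetical; tryIncrement mimics the shape of the inner CAS loop in addWorker and ignores run-state changes.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // Run states live in the high 3 bits. RUNNING is negative, so comparisons
    // such as rs >= SHUTDOWN separate RUNNING from every later state.
    static final int RUNNING  = -1 << COUNT_BITS;
    static final int SHUTDOWN =  0 << COUNT_BITS;
    static final int STOP     =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Re-read ctl and retry until the CAS succeeds or the limit is reached.
    static boolean tryIncrement(AtomicInteger ctl, int limit) {
        for (;;) {
            int c = ctl.get();
            if (workerCountOf(c) >= limit)
                return false;
            if (ctl.compareAndSet(c, c + 1))
                return true;
        }
    }

    // Attempts several increments and returns the resulting worker count.
    static int addWorkers(int limit, int attempts) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        for (int i = 0; i < attempts; i++) {
            tryIncrement(ctl, limit);
        }
        // The run state is untouched by the increments.
        if (runStateOf(ctl.get()) != RUNNING)
            throw new IllegalStateException("run state changed unexpectedly");
        return workerCountOf(ctl.get());
    }

    public static void main(String[] args) {
        System.out.println("worker count: " + addWorkers(2, 5));
    }
}
```

Keeping both values in one int lets a single compare-and-set update the worker count while detecting a concurrent state change, which is why addWorker re-reads ctl after a failed CAS.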

Each thread in the thread pool is wrapped in a Worker object; ThreadPoolExecutor maintains a set of Worker objects.
The Worker class extends AbstractQueuedSynchronizer (AQS) and implements the Runnable interface. Note its firstTask and thread fields:

  1. firstTask holds the task passed in;
  2. thread is the thread created through the ThreadFactory when the constructor is called; it is the thread that processes tasks.

When the constructor is called, it stores the incoming task and creates a new thread with getThreadFactory().newThread(this). The argument to newThread is this because Worker implements the Runnable interface, so the Worker itself is what the thread runs. When that thread is started, it therefore calls the run method of the Worker class.
Worker extends AQS to implement an exclusive lock. Why not use ReentrantLock? The tryAcquire method does not allow re-entry, whereas ReentrantLock does:

  1. Once the lock method has acquired the exclusive lock, the current thread is executing a task;
  2. While a task is executing, the thread should not be interrupted;
  3. If the lock is not held, the thread is idle and has no task to process, so it can be interrupted;
  4. When the thread pool runs the shutdown method or the tryTerminate method, it calls interruptIdleWorkers to interrupt idle threads. interruptIdleWorkers uses the tryLock method to determine whether each worker thread is idle;
  5. The lock is non-reentrant because a task must not be able to reacquire the lock when it calls a pool control method such as setCorePoolSize. With a reentrant lock such as ReentrantLock, a task that calls setCorePoolSize would reacquire its own worker's lock, and the running thread would be interrupted.

Therefore, Worker extends AQS so that the pool can determine whether a thread is idle and may be interrupted.

In addition, the constructor executes setState(-1), which sets the state variable to -1. Why?
The default state in AQS is 0. A Worker that has just been created and has not started executing tasks should not be interrupted. Take a look at the tryAcquire method:

protected boolean tryAcquire(int unused) {
    // CAS from 0 to 1; a thread that already holds the lock cannot re-enter
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

The tryAcquire method succeeds only if state is 0. setState(-1) therefore prevents the thread from being interrupted before it starts executing tasks. For the same reason, the runWorker method first calls the Worker's unlock method, which sets state to 0.
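
The difference between the two locks can be checked with a small stand-in for the locking part of Worker. SimpleWorkerLock and WorkerLockSketch are hypothetical names; the class copies only the tryAcquire logic shown above plus a matching tryRelease, and is not the JDK's Worker.

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

public class WorkerLockSketch {

    // Simplified, hypothetical stand-in for the lock inside Worker.
    static final class SimpleWorkerLock extends AbstractQueuedSynchronizer {
        SimpleWorkerLock() {
            setState(-1); // cannot be acquired until the first unlock()
        }

        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryLock() { return tryAcquire(1); }
        void unlock()     { release(1); }
    }

    // Returns { tryLock before the first unlock, first tryLock, second tryLock }.
    static boolean[] workerLockResults() {
        SimpleWorkerLock lock = new SimpleWorkerLock();
        boolean beforeFirstUnlock = lock.tryLock(); // state is -1, so the CAS from 0 fails
        lock.unlock();                              // state becomes 0, as in runWorker
        boolean first = lock.tryLock();
        boolean second = lock.tryLock();            // same thread, but no re-entry
        lock.unlock();
        return new boolean[] { beforeFirstUnlock, first, second };
    }

    // Returns whether the same thread can acquire a ReentrantLock twice.
    static boolean reentrantLockReenters() {
        ReentrantLock lock = new ReentrantLock();
        boolean first = lock.tryLock();
        boolean second = lock.tryLock();
        if (second) lock.unlock();
        if (first) lock.unlock();
        return first && second;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(workerLockResults()));
        System.out.println(reentrantLockReenters());
    }
}
```

Because the second tryLock fails even for the owning thread, a failed tryLock is a reliable sign that the worker is busy, which is what interruptIdleWorkers relies on.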

runWorker method

The run method of the Worker class calls the runWorker method to execute tasks. The code of the runWorker method is as follows:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // Whether the loop exited because of an exception
        boolean completedAbruptly = true;
        try {
            // If there is no first task, fetch one through getTask
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If the pool is stopping, ensure the current thread is interrupted;
                // otherwise, ensure the current thread is not interrupted.
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

Execution process of the runWorker method

  1. The while loop keeps obtaining tasks through the getTask() method;
  2. The getTask() method takes the task from the blocking queue;
  3. If the thread pool is stopping, ensure that the current thread is interrupted; otherwise ensure that it is not interrupted;
  4. Call task.run() to execute the actual task logic;
  5. If getTask() returns null, the loop exits and the processWorkerExit() method is executed;
  6. When the runWorker method returns, the run method of the Worker returns as well, and the thread ends.

The beforeExecute and afterExecute methods are empty in the ThreadPoolExecutor class and are left for subclasses to override.
The completedAbruptly variable indicates whether the worker exited because an exception occurred during task execution. The processWorkerExit method checks its value.
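
As a sketch of how a subclass might use the two hooks, the example below counts invocations of beforeExecute and afterExecute. The names HookSketch and CountingPool are hypothetical; a real subclass might record timings or log the Throwable passed to afterExecute instead.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HookSketch {

    // Hypothetical subclass that counts how often each hook is called.
    static final class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicInteger after = new AtomicInteger();

        CountingPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet();   // runs on the worker thread, before task.run()
        }

        @Override
        protected void afterExecute(Runnable r, Throwable thrown) {
            super.afterExecute(r, thrown);
            after.incrementAndGet();    // runs in the finally block after task.run()
        }
    }

    // Returns { beforeExecute calls, afterExecute calls } once the pool has terminated.
    static int[] runTasks(int tasks) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { /* trivial task */ });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { pool.before.get(), pool.after.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(3);
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```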

getTask method

The getTask method takes a task from the blocking queue. The code is as follows:

private Runnable getTask() {
    // timedOut records whether the last fetch from the blocking queue timed out
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // If rs >= SHUTDOWN (the pool is not RUNNING), check further:
        //   1. rs >= STOP: the pool is stopping; or
        //   2. the blocking queue is empty.
        // If either holds, decrement workerCount and return null.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed decides whether this fetch is subject to the keepAliveTime timeout.
        // allowCoreThreadTimeOut defaults to false, so core threads do not time out.
        // wc > corePoolSize means the pool has more threads than the core size,
        // and those extra threads are subject to the timeout.
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // wc > maximumPoolSize can occur because setMaximumPoolSize may run
        // concurrently with this method.
        // timed && timedOut means this fetch is subject to the timeout and the
        // previous fetch from the blocking queue timed out.
        // In either case, if there is more than one worker or the blocking queue
        // is empty, try to decrement workerCount and return null; if the CAS
        // fails, loop and retry.
        // wc == 1 means the current thread is the only worker in the pool.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timed is true, wait with the blocking queue's poll method, which
            // returns null if no task arrives within keepAliveTime.
            // Otherwise use take, which blocks until the queue is not empty.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r == null means the poll timed out; set timedOut to true
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting for a task: reset timedOut and retry the loop
            timedOut = false;
        }
    }
}

The important part here is the second if statement, which controls the effective number of threads in the thread pool. As the analysis above shows, when execute runs and the number of threads is at least corePoolSize but less than maximumPoolSize and workQueue is full, more worker threads can be added. If a worker then times out without obtaining a task, that is, timedOut is true, workQueue stayed empty for the whole wait, which means the pool has more threads than it currently needs. The threads beyond corePoolSize can be destroyed so that the number of threads stays at corePoolSize.
When is such a thread destroyed? When getTask returns null, the while loop in runWorker exits and processWorkerExit is executed. Once runWorker, and with it the run method of the Worker, returns, the thread terminates and is reclaimed by the JVM.
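
The timed poll path can be observed by letting idle threads expire. The sketch below assumes a keepAliveTime of 50 ms and enables allowCoreThreadTimeOut so that even core threads take the timed branch; the class name KeepAliveSketch and the 10-second deadline are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KeepAliveSketch {

    // Returns the pool size once the idle threads have timed out, or the last
    // observed size if that does not happen before the deadline.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // With this flag, timed is true for every worker in getTask.
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            // Each worker's poll(keepAliveTime) eventually returns null, getTask
            // returns null, and the worker leaves the pool.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

With allowCoreThreadTimeOut left at its default of false, the same experiment would leave the two core threads blocked in take() and the pool size would stay at 2.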

processWorkerExit method
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly == true means the worker exited because of an exception,
    // so workerCount has not been decremented yet and must be decremented here.
    // Otherwise getTask() has already decremented workerCount.
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Accumulate the number of tasks this worker completed
        completedTaskCount += w.completedTasks;
        // Remove the worker from workers, i.e. remove a worker thread from the pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Try to terminate the pool, depending on its state
    tryTerminate();

    int c = ctl.get();
    // If the pool is RUNNING or SHUTDOWN:
    //   - if the worker ended abruptly, add a replacement worker directly;
    //   - if allowCoreThreadTimeOut is true and the queue still holds tasks,
    //     keep at least one worker;
    //   - if allowCoreThreadTimeOut is false, keep workerCount at no less
    //     than corePoolSize.
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
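
The replacement of a worker that ended abruptly can be seen from the outside. In this sketch (hypothetical class name WorkerReplacementSketch) a single-thread pool runs a task that throws and then a second task; the second task ends up on a different Thread object. The quiet uncaught-exception handler is an assumption added only to keep the simulated failure from printing a stack trace.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerReplacementSketch {

    // Returns true if the task submitted after a failure runs on a different thread.
    static boolean failedWorkerIsReplaced() {
        // Silence the default stack trace printed when a worker thread dies.
        ThreadFactory quietFactory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quietFactory);
        Thread[] seen = new Thread[2];
        CountDownLatch firstRan = new CountDownLatch(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                seen[0] = Thread.currentThread();
                firstRan.countDown();
                // The exception propagates out of runWorker, so the worker exits
                // with completedAbruptly == true.
                throw new IllegalStateException("simulated task failure");
            });
            firstRan.await();
            pool.execute(() -> {
                seen[1] = Thread.currentThread();
                secondRan.countDown();
            });
            if (!secondRan.await(10, TimeUnit.SECONDS)) {
                return false;
            }
            return seen[0] != seen[1];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("replaced: " + failedWorkerIsReplaced());
    }
}
```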

Once processWorkerExit has run, the worker thread is destroyed. That is the whole life cycle of a worker thread: starting from the execute method, the Worker uses the ThreadFactory to create a new worker thread; runWorker obtains tasks through getTask and executes them; when getTask returns null, processWorkerExit runs and the thread ends.

[Figure: life cycle of a worker thread]

In this section, we explained how to create a ThreadPoolExecutor and examined how it works by reading its source code. In the next section, we will look at another implementation of Executor: ScheduledThreadPoolExecutor, the scheduled thread pool.

Keywords: Java Back-end Concurrent Programming thread pool

Added by syed on Fri, 24 Dec 2021 09:16:20 +0200