ThreadPoolExecutor Source Analysis: the execute() Method

Preface

The previous two articles laid the groundwork for analyzing the thread pool source code:

Bit Operations in the Java ThreadPoolExecutor ThreadPool

ThreadPoolExecutor Source Analysis Foundation - How ThreadPool works

A quick recap of what those articles covered (you need to know it before reading on):

  • The thread pool states: RUNNING, SHUTDOWN, and so on
  • How the thread pool switches between states
  • The core parameters of the thread pool, such as corePoolSize and maximumPoolSize
  • How the thread pool works: core threads -> blocking queue -> non-core threads -> rejection policy
  • How to read the pool state and the number of threads from the high and low bits of the AtomicInteger ctl variable
  • The role of the Worker object

With the above knowledge, it is much easier to analyze the specific source code.
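As a refresher on the ctl encoding recapped above, here is a minimal sketch. The constants and the four helper methods follow the JDK 8 ThreadPoolExecutor source; the class name CtlDemo and its main method are hypothetical and exist only for illustration.

```java
// Sketch of how ctl packs the pool state (high 3 bits) and the thread count (low 29 bits).
// CtlDemo is a hypothetical class; the constants and helpers mirror the JDK 8 source.
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count
    static boolean isRunning(int c)  { return c < SHUTDOWN; }  // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));      // 111 followed by the count bits
        System.out.println(workerCountOf(c));               // 5
        System.out.println(runStateOf(c) == RUNNING);       // true
        System.out.println(isRunning(ctlOf(SHUTDOWN, 5)));  // false
    }
}
```

Because RUNNING is the only negative state value, a plain integer comparison such as c < SHUTDOWN is enough to test for it.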

Main Text

Body Outline

  • workers property of ThreadPoolExecutor
  • execute method
  • addWorker method
  • runWorker method
  • processWorkerExit method

ThreadPoolExecutor#workers property

The thread pool keeps a workers set that records all worker threads. It may only be accessed by a thread that holds mainLock.

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();


// Tracks the largest size the workers set has ever reached; accessed only while holding mainLock
private int largestPoolSize;
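The "accessed only when holding mainLock" rule can be sketched as follows. GuardedWorkers is a hypothetical stand-in, not the JDK class: it stores strings instead of Worker objects, but it uses the same lock/try/finally pattern and the same largestPoolSize bookkeeping that appears later in addWorker.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the pool's bookkeeping: a plain HashSet is not
// thread-safe, so every access happens between lock() and unlock().
final class GuardedWorkers {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<String> workers = new HashSet<>();
    private int largestPoolSize; // high-water mark of workers.size()

    void add(String worker) {
        mainLock.lock();
        try {
            workers.add(worker);
            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock(); // always released, even if add() throws
        }
    }

    void remove(String worker) {
        mainLock.lock();
        try {
            workers.remove(worker);
        } finally {
            mainLock.unlock();
        }
    }

    int size() {
        mainLock.lock();
        try {
            return workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    int largest() {
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedWorkers g = new GuardedWorkers();
        g.add("w1");
        g.add("w2");
        g.remove("w1");
        System.out.println(g.size() + " worker now, largest was " + g.largest());
    }
}
```

Note that largestPoolSize never shrinks: it records the peak size, not the current one.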

ThreadPoolExecutor#execute(Runnable command) method

The previous article, ThreadPoolExecutor Source Analysis Foundation - How ThreadPool works, gave a rough analysis of how the execute method runs. Here it is again, starting with the helpers it uses:

  • ctl.get() returns the ctl value; as mentioned earlier, its high 3 bits hold the state of the thread pool and the remaining low bits hold the number of threads
  • workerCountOf returns the number of threads in the pool by extracting the low bits of ctl.get()
  • addWorker can be understood for now as creating a thread to run a specific task; it is described in more detail below
  • isRunning(c) checks whether the thread pool is in the RUNNING state
  • reject rejects the newly submitted task

public void execute(Runnable command) {
    // Parameter check
    if (command == null)
        throw new NullPointerException();
        
    // The value of c can tell the state of the thread pool and the number of threads in the thread pool
    int c = ctl.get();
    // 1. If the number of threads in the thread pool is less than the number of core threads
    if (workerCountOf(c) < corePoolSize) {
        // Add a core thread: command is the task to run, and true marks it as a core thread
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
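    // 2. If the thread pool is in the RUNNING state (only in this state can new tasks be accepted)
    // and the number of threads has reached corePoolSize (or adding a core thread failed), add the task to the blocking queue
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-read ctl: the pool may have shut down, or its last worker may have exited, since the first check
        int recheck = ctl.get();
        // No longer RUNNING: take the task back out of the queue and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // Still RUNNING but no worker is alive: add one so the queued task gets executed
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }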
    // 3. If the task could not be queued (the queue is full or the pool is not RUNNING), try to create a non-core thread (core == false)
    else if (!addWorker(command, false))
        // If that fails too, the rejection policy is executed
        reject(command);
}

To summarize:

  1. If the number of threads in the pool is less than corePoolSize, add a core thread to run the command
  2. Otherwise, if the pool is in the RUNNING state, add the task to the blocking queue
  3. If the task cannot be queued (for example, the queue is full), create a non-core thread
  4. If creating the non-core thread fails, execute the rejection policy
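The four steps can be observed on a real pool. The sketch below assumes a deliberately tiny configuration (corePoolSize 1, maximumPoolSize 2, a queue of capacity 1, the default AbortPolicy) and submits four tasks that all block on a latch, so no thread becomes free in between. ExecuteFlowDemo and its run method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one core thread, one queue slot, one non-core thread, then rejection.
final class ExecuteFlowDemo {
    // Submits four blocking tasks and records what the pool did with each one.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": poolSize=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // task1: poolSize=1 queued=0   (step 1: core thread)
        // task2: poolSize=1 queued=1   (step 2: queued)
        // task3: poolSize=2 queued=1   (step 3: non-core thread)
        // task4: rejected              (step 4: rejection policy)
    }
}
```

Note that task3 starts running before task2: the non-core thread takes task3 as its firstTask, while task2 stays in the queue until a thread becomes free.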

ThreadPoolExecutor#addWorker(Runnable, boolean)

For the time being, the effect of this method can be understood as:

Create and start a thread

The method has two parameters:

  • Runnable firstTask: if this parameter is not null, the thread created by addWorker executes firstTask first

  • boolean core: selects the limit that the thread count is checked against

    If core == true, the number of threads in the pool is compared with corePoolSize; otherwise it is compared with maximumPoolSize. See the source code analysis below for details.

The execute method above already shows three ways to call addWorker():

  • addWorker(firstTask, true)

  • addWorker(firstTask, false)

  • addWorker(null, false)

The addWorker method is divided into two parts as follows:

Part One

This part is a nested infinite loop.

Inside the loop, the state of the thread pool is read to decide whether the pool may create a new thread, either to run a new task or to run tasks from the blocking queue.

Final results of this section:

Either return false or go to the second part

There are several cases of returning false:

  • Thread pool state > SHUTDOWN: the pool neither accepts new tasks nor processes queued tasks
  • Thread pool state == SHUTDOWN and firstTask == null: the pool can still process queued tasks, but the queue is empty
  • Thread pool state == SHUTDOWN and firstTask != null: the pool no longer accepts new tasks
  • The number of threads has reached CAPACITY, or has reached corePoolSize (core == true) or maximumPoolSize (core == false)
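
The SHUTDOWN cases above can be checked against a real pool. This is a sketch assuming a single-thread pool with an unbounded queue; ShutdownDemo and its run method are hypothetical names. After shutdown() a new task is rejected, while the task that was already queued still runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of the SHUTDOWN state: no new tasks, but queued tasks are drained.
final class ShutdownDemo {
    // Returns { rejectedAfterShutdown, queuedTaskRan }.
    static boolean[] run() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedTaskRan = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        boolean rejected = false;
        try {
            pool.execute(() -> {                          // occupies the only thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> queuedTaskRan.set(true));  // waits in the queue
            pool.shutdown();                              // RUNNING -> SHUTDOWN
            try {
                pool.execute(() -> { });                  // firstTask != null in SHUTDOWN
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);   // queued task runs before termination
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
        }
        return new boolean[] { rejected, queuedTaskRan.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("rejected after shutdown: " + r[0]);
        System.out.println("queued task still ran:   " + r[1]);
    }
}
```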
Part Two

If you enter the second part successfully, it means that the thread pool can create new threads

This part creates a Worker object with w = new Worker(firstTask), adds it to the workers set, and then starts the worker's thread, which executes the run method of the Worker object.

private boolean addWorker(Runnable firstTask, boolean core) {
    // A label, similar to goto; see break retry and continue retry below for how it is used
    retry:
    // Part One
    for (;;) {
        int c = ctl.get();
        // Get thread pool state
        int rs = runStateOf(c);
        
        // Returning false means a new worker cannot be added
        // To avoid returning false, that is, for the if condition below to be false, one of the following two conditions must hold:
        // 1. rs < SHUTDOWN, which means rs == RUNNING (as mentioned earlier, RUNNING means new tasks are accepted)
        // 2. rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
        //    2.1 rs == SHUTDOWN means no new tasks are accepted, but tasks in the blocking queue are still executed
        //    2.2 firstTask == null means the new thread is only meant to execute tasks from the blocking queue
        //    2.3 the blocking queue must not be empty
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // Get the number of threads in the thread pool
            int wc = workerCountOf(c);
            // Return false if the number of threads has reached the maximum capacity (CAPACITY)
            // If core == true, check whether the number of threads has reached corePoolSize
            // If core == false, check whether the number of threads has reached maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
                
            // Use CAS to increment the thread count by 1; on success, break out of both loops and enter Part Two
            if (compareAndIncrementWorkerCount(c))
                break retry;
                
            // CAS failed: re-read ctl and check the state of the thread pool
            c = ctl.get();
            if (runStateOf(c) != rs)
                // The state changed: restart from the outer loop
                continue retry;
            // Otherwise only the thread count changed: retry the inner loop
        }
    }

    // Part Two
    // Reaching this point means the thread pool may create a new thread, and the thread count has already been incremented
    // Two flags whose names speak for themselves
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    
    try {
        // The previous article described the Worker object in more detail
        // Wrap the task to be executed (note that firstTask may be null)
        w = new Worker(firstTask);
        
        // Get the thread hosting the task
        final Thread t = w.thread;
        
        if (t != null) {
            // Acquire the lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Check the state of the thread pool again
                int rs = runStateOf(ctl.get());
                // Proceed if the thread pool state is RUNNING,
                // or if it is SHUTDOWN and firstTask == null
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // The thread obtained from the factory must not have been started already
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                        
                    // Add to workers collection
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Added Successfully
                    workerAdded = true;
                }
            } finally {
                // Release lock
                mainLock.unlock();
            }
            
            // Start Thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread failed to start, roll back: remove the Worker from the workers set and decrement the thread count
        if (! workerStarted)
            addWorkerFailed(w);
    }
    
    return workerStarted;
}
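
The inner loop of Part One is a classic CAS retry loop: read the value, check the limit, try to swap in value + 1, and start over if another thread got there first. Here is a minimal sketch of just that idea, using a plain AtomicInteger instead of the packed ctl; BoundedCounter and tryIncrement are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "increment unless the limit is reached" without a lock.
final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Returns true if a slot was reserved, false if the limit was already reached.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;               // same role as wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                // same role as break retry
            // CAS lost a race with another thread: re-read and try again
        }
    }

    int get() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedCounter counter = new BoundedCounter(3);
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                if (counter.tryIncrement())
                    successes.incrementAndGet();
            });
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        System.out.println(successes.get() + " of 8 threads reserved a slot"); // 3 of 8
    }
}
```

Reserving the slot first and creating the thread afterwards is why addWorker needs addWorkerFailed: if the thread never starts, the reserved count has to be given back.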

Worker#run

The run method simply calls the runWorker method:

public void run() {
    runWorker(this); // this is the current Worker object
}
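
Why does starting w.thread end up in Worker#run? Because the Worker is itself a Runnable and hands itself to the thread factory when it is constructed. A stripped-down sketch of that wiring is shown below; MiniWorker is a hypothetical class that keeps only the firstTask and thread fields and leaves out the locking the real Worker inherits from AbstractQueuedSynchronizer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical, simplified Worker: a Runnable that owns the thread that runs it.
final class MiniWorker implements Runnable {
    final Thread thread;        // thread whose target is this worker
    private Runnable firstTask; // may be null

    MiniWorker(Runnable firstTask, ThreadFactory factory) {
        this.firstTask = firstTask;
        this.thread = factory.newThread(this); // passing `this` is the key step
    }

    @Override
    public void run() {
        // The real Worker#run delegates to runWorker(this); this sketch only runs firstTask.
        Runnable task = firstTask;
        firstTask = null;
        if (task != null)
            task.run();
    }

    // Returns true if the task was executed on the worker's own thread.
    static boolean ranOnWorkerThread() {
        Thread[] seen = new Thread[1];
        MiniWorker w = new MiniWorker(() -> seen[0] = Thread.currentThread(),
                                      Executors.defaultThreadFactory());
        w.thread.start(); // the thread calls w.run(), which runs the task
        try {
            w.thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0] == w.thread;
    }

    public static void main(String[] args) {
        System.out.println(ranOnWorkerThread()); // true
    }
}
```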

ThreadPoolExecutor#runWorker

In the second part of the addWorker source code, the Worker object is first created through new Worker(firstTask). At that point:

  • If the firstTask passed in is not null, it is the first task the thread executes after it starts
  • If the firstTask passed in is null, then, as mentioned above, the thread is only expected to execute tasks from the blocking queue

addWorker then takes the Worker's thread and starts it, which ends up executing the runWorker method.

Brief summary:

The runWorker method first executes the firstTask passed in through addWorker; if firstTask == null, it executes tasks taken from the blocking queue.

The runWorker loop ends when there is no firstTask left and getTask() returns null.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Note that firstTask is moved into the local variable task and then cleared
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
    		
        // 1. task != null: execute the task directly
        // 2. task == null: either firstTask was null when addWorker was called, or the previous task has finished,
        //    so take the next task from the blocking queue through getTask
        while (task != null || (task = getTask()) != null) {
            w.lock();
          
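            // If the pool is stopping (state >= STOP), make sure the thread is interrupted;
            // otherwise make sure any pending interrupt is cleared before the task runs
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();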
                
            try {
                // Do some preparatory work
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Reached when getTask() returns null (no more work for this thread) or a task threw an exception; clean up the worker
        processWorkerExit(w, completedAbruptly);
    }
}
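
The beforeExecute and afterExecute calls in runWorker are protected hook methods of ThreadPoolExecutor that do nothing by default and are meant to be overridden. A minimal sketch, assuming a single-thread pool that just records the order of events; HookedPool and its demo method are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass that records when the runWorker hooks fire.
final class HookedPool extends ThreadPoolExecutor {
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is the exception thrown by task.run(), or null if the task finished normally
        events.add(t == null ? "after" : "after with " + t.getClass().getSimpleName());
    }

    // Runs one task and returns the recorded events in order.
    static List<String> demo() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [before, task, after]
    }
}
```

Both hooks run on the worker thread itself, between w.lock() and w.unlock(), so they see exactly the task that thread is about to run or has just run.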

ThreadPoolExecutor#processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Count the number of tasks completed by the thread pool
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
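    // Try to move the thread pool to the TERMINATED state (this only takes effect when the pool is shutting down and nothing is left to run)
    tryTerminate();

    // If the pool is still RUNNING or SHUTDOWN, add a replacement thread when the worker died abruptly
    // or when fewer threads remain than the minimum required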
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
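
The replacement decision at the end of processWorkerExit can be read as a small pure function. The sketch below copies the min computation from the code above and assumes the run state is already known to be below STOP; ReplacementRule and needsReplacement are hypothetical names.

```java
// Hypothetical helper mirroring the tail of processWorkerExit.
// Assumption: the pool state is RUNNING or SHUTDOWN (runStateLessThan(c, STOP) is true).
final class ReplacementRule {
    static boolean needsReplacement(boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut,
                                    int corePoolSize,
                                    boolean queueEmpty,
                                    int workerCount) {
        if (completedAbruptly)
            return true; // a task threw: always replace the dead worker
        // Minimum number of threads that should stay alive
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty)
            min = 1;     // someone has to drain the queue
        return workerCount < min;
    }

    public static void main(String[] args) {
        // Worker died from an exception: replaced regardless of the counts
        System.out.println(needsReplacement(true, false, 2, true, 5));   // true
        // Normal exit, still at corePoolSize: no replacement
        System.out.println(needsReplacement(false, false, 2, true, 2));  // false
        // Normal exit, dropped below corePoolSize: replacement
        System.out.println(needsReplacement(false, false, 2, true, 1));  // true
        // Core threads may time out, queue not empty, no thread left: replacement
        System.out.println(needsReplacement(false, true, 2, false, 0));  // true
    }
}
```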

Summary

Before analyzing the ThreadPoolExecutor source code, you need to understand the following:

  • The state of the thread pool and the number of threads are packed into a single int (ctl): the top 3 bits represent the state of the thread pool, and the remaining Integer.SIZE - 3 (29) low bits represent the number of threads

  • The state and the number of threads are extracted from ctl with bit operations

    For example:

    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;

    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }

  • Several states of the thread pool and how to switch between them

  • The role of the thread pool's Worker object

    Worker w = new Worker(firstTask);
    Thread t = w.thread;
    t.start();

    When the thread starts, it executes the Worker#run method.
    The run method then calls ThreadPoolExecutor#runWorker.
    If firstTask != null, runWorker first executes the run method of firstTask.
    If firstTask == null, runWorker goes straight to executing tasks from the blocking queue.
    


Added by Zack on Thu, 14 May 2020 19:13:18 +0300