This article walks through how to use the Java thread pool and how its source code works.

Problems with starting one thread per task (new Thread().start()):

  • Threads are created and destroyed frequently, which is expensive
  • When tasks are numerous and arrive at high frequency, the CPU switches context frequently, which wastes CPU resources

These problems raise the question of how to reuse threads, which leads to pooling.
Examples: connection pools, object pools, thread pools

Pool technology

The core idea is resource reuse: an idle thread picks up the next task instead of being destroyed.
What does a thread pool need in order to reuse threads?

  • Create a set of threads up front
  • Keep the threads from ending by having them block on a thread-safe blocking queue, as in the producer-consumer model
  • Wake a thread to execute a task's run method when a task arrives
  • When the queue is full, add consumer threads to expand capacity dynamically
  • If the pool cannot expand any further, provide a rejection policy
    • AbortPolicy: discards the task and throws RejectedExecutionException to the caller. This is the default rejection policy of the thread pool
    • DiscardPolicy: discards the task silently
    • CallerRunsPolicy: calls the task's run method directly in the thread that submitted it
    • DiscardOldestPolicy: removes the task at the head of the queue (the one that has waited longest), then submits the new task again
    • Custom policy: for example, save the rejected task so that it is not lost. This requires implementing RejectedExecutionHandler yourself
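The first three conditions above can be sketched as a tiny hand-written pool. This is only an illustration of the producer-consumer idea, not how ThreadPoolExecutor is implemented; the class TinyPool and its methods are hypothetical names, and the sketch has no dynamic expansion and no rejection policy.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal pool: fixed number of threads, unbounded queue, no rejection policy.
final class TinyPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] threads;

    TinyPool(int size) {
        threads = new Thread[size];
        for (int i = 0; i < size; i++) {
            threads[i] = new Thread(this::workerLoop, "tiny-" + i);
            threads[i].setDaemon(true);
            threads[i].start();
        }
    }

    // Each worker blocks in take() while the queue is empty, so the thread does not end on its own.
    private void workerLoop() {
        try {
            while (true) {
                queue.take().run();
            }
        } catch (InterruptedException e) {
            // Interrupted by shutdown(): leave the loop and let the thread end.
        }
    }

    // Producer side: adding a task wakes one worker that is blocked in take().
    void execute(Runnable task) {
        queue.add(task);
    }

    void shutdown() {
        for (Thread t : threads) {
            t.interrupt();
        }
    }

    // Runs n trivial tasks on two reused threads and returns how many completed.
    static int demo(int n) {
        TinyPool pool = new TinyPool(2);
        AtomicInteger done = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                done.incrementAndGet();
                latch.countDown();
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + demo(10));
    }
}
```

Two threads serve all the tasks here, which is the reuse that the real thread pool provides together with expansion and rejection handling.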

Thread pool in Java

1. Detailed explanation of parameters

public ThreadPoolExecutor(int corePoolSize,		//Number of core threads
                          int maximumPoolSize,	//Maximum number of threads
                          long keepAliveTime,	//How long an idle non-core thread waits for a task before it ends
                          TimeUnit unit,		//Time unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue,	//Blocking queue that holds pending tasks
                          ThreadFactory threadFactory,		//Factory that creates the worker threads
                          RejectedExecutionHandler handler) {}	//Rejection policy
  • A thread normally ends as soon as its run method returns. A pool worker stays alive by blocking on the queue while it waits for a task. A non-core worker waits with a timeout: if no task arrives within keepAliveTime, the wait returns null, the run method ends, and the thread can be reclaimed
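The recycling described in the bullet above rests on a timed wait on the blocking queue. A minimal sketch of only that mechanism follows; the class and method names are hypothetical and the 50 ms timeout is an arbitrary value standing in for keepAliveTime.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedPollDemo {
    // Waits up to timeoutMillis for a task; returns true if the wait timed out without one.
    static boolean timesOutWhenEmpty(BlockingQueue<Runnable> queue, long timeoutMillis) {
        try {
            Runnable task = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return task == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        System.out.println("empty queue timed out: " + timesOutWhenEmpty(queue, 50));
        queue.offer(() -> { });
        System.out.println("non-empty queue timed out: " + timesOutWhenEmpty(queue, 50));
    }
}
```

A worker that sees the timeout has nothing left to do, so it can return from its run method and be reclaimed.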

2. Fixed thread pool newFixedThreadPool

  • Number of core threads = maximum number of threads
  • The blocking queue is an unbounded LinkedBlockingQueue
  • The rejection policy is the default AbortPolicy, which throws an exception
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
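As a quick check of the fixed size, the following sketch runs more tasks than there are threads and counts the distinct threads that executed them. The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class FixedPoolDemo {
    // Runs `tasks` tasks on a fixed pool of `nThreads` threads and
    // returns how many distinct threads executed them.
    static int distinctThreads(int nThreads, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreads(3, 10));
    }
}
```

With 3 threads and 10 tasks, the count never exceeds 3, because the extra tasks wait in the LinkedBlockingQueue instead of getting new threads.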

3. Single thread pool newSingleThreadExecutor

  • Exactly one thread
  • The blocking queue is an unbounded LinkedBlockingQueue
  • The rejection policy is the default AbortPolicy, which throws an exception
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
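Because a single thread drains a FIFO queue, tasks run one at a time in submission order. A small sketch that records the order, using hypothetical class and method names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadOrderDemo {
    // Submits tasks 0..tasks-1 and returns the order in which they actually ran.
    static List<Integer> runInOrder(int tasks) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```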

4. Cache thread pool newCachedThreadPool

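  • Creates as many threads as there are concurrent tasks; a thread that stays idle for 60 seconds is reclaimed
  • The queue is a SynchronousQueue, a blocking queue with no internal storage. An insert only succeeds when a consumer is waiting to take the element
  • The rejection policy is the default AbortPolicy, which throws an exception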
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
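The effect of the SynchronousQueue can be observed directly. The sketch below builds a ThreadPoolExecutor with the same arguments as the listing above, so that getPoolSize() is available; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedPoolDemo {
    // A SynchronousQueue has no capacity: offer() fails unless a consumer is already waiting.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<Runnable>().offer(() -> { });
    }

    // Submits `tasks` tasks that all block at the same time and returns the resulting pool size.
    static int poolSizeUnderLoad(int tasks) {
        // Same arguments as the newCachedThreadPool() listing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("pool size with 5 blocked tasks: " + poolSizeUnderLoad(5));
    }
}
```

Since no worker is idle, every offer fails and each task gets its own thread, which is why this pool suits many short tasks and is risky for long-running ones.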

5. Timed thread pool newScheduledThreadPool

  • Implemented by ScheduledThreadPoolExecutor; the maximum number of threads is Integer.MAX_VALUE
  • The blocking queue is DelayedWorkQueue, a delay queue
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
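A minimal usage sketch for the scheduled pool: it schedules a one-shot task and measures how long the delay queue held it back. The class and method names are hypothetical and the 100 ms delay is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledDemo {
    // Schedules a one-shot task and returns the milliseconds that elapsed before it ran.
    static long elapsedMillisBeforeRun(long delayMillis) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        long start = System.nanoTime();
        try {
            Callable<Long> task = System::nanoTime; // reports the moment it runs
            ScheduledFuture<Long> future = pool.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get() - start);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran after " + elapsedMillisBeforeRun(100) + " ms");
    }
}
```

For repeating work, scheduleAtFixedRate and scheduleWithFixedDelay on the same interface take an initial delay and a period.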

Using thread pools in Java

The fixed thread pool is used as the example here; the other pools are used in the same way.

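ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
    threadPool.execute(()->{
        System.out.println(Thread.currentThread().getName()+",implement");
    });
}
//Shut the pool down once all tasks are submitted; otherwise its threads keep the JVM alive
threadPool.shutdown();

//Result (excerpt; the thread order varies between runs)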
pool-1-thread-2,implement
pool-1-thread-2,implement
pool-1-thread-3,implement
pool-1-thread-1,implement

If none of Java's ready-made thread pools fits the business requirements, define a custom ThreadPoolExecutor that is tuned to the workload.
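A sketch of such a custom pool follows. Everything specific in it is an assumption chosen for illustration: one thread, a bounded queue of one slot, threads named "order-worker-N", and CallerRunsPolicy so that overflow slows the submitter down instead of failing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class CustomPoolDemo {
    // Returns the name of the thread that ran the task which did not fit into the pool.
    static String threadThatRanOverflowTask() {
        AtomicInteger seq = new AtomicInteger();
        // Hypothetical naming scheme, useful when reading thread dumps.
        ThreadFactory factory = r -> new Thread(r, "order-worker-" + seq.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();

        pool.execute(() -> {                 // occupies the only worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });             // fills the single queue slot
        // No thread and no queue slot left: CallerRunsPolicy runs this in the submitting thread.
        pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```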

Thread pool source code analysis

  • Constructor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    //Blocking queue defined during initialization
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • AtomicInteger ctl is the member variable to focus on. This single thread-safe integer records both the run state of the pool and the number of worker threads in it
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Number of bits that hold the worker count (the lower 29 bits): 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//Maximum number of worker threads -> 536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//Run states of the thread pool, stored in the upper 3 bits; RUNNING -> -536870912
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

//Extract the run state (upper 3 bits) from ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//Extract the number of worker threads (lower 29 bits) from ctl
private static int workerCountOf(int c)  { return c & CAPACITY; }

//AtomicInteger.compareAndSet relies on Unsafe.compareAndSwapInt to update ctl atomically
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

The thread pool keeps its state in a single AtomicInteger whose 32 bits are split into two parts: the upper 3 bits hold the run state of the pool and the lower 29 bits hold the number of worker threads.
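The packing can be tried out in isolation. The sketch below copies the constants and helper methods from the listing above into a standalone class (the class name CtlBits is hypothetical) and decodes a packed value again.

```java
final class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;     // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;  // lower 29 bits all set
    static final int RUNNING = -1 << COUNT_BITS;        // upper bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;        // upper bits 000
    static final int STOP = 1 << COUNT_BITS;            // upper bits 001

    // Pack a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) { return rs | wc; }
    // Keep only the upper 3 bits.
    static int runStateOf(int c) { return c & ~CAPACITY; }
    // Keep only the lower 29 bits.
    static int workerCountOf(int c) { return c & CAPACITY; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits    : " + Integer.toBinaryString(c));
        System.out.println("is RUNNING  : " + (runStateOf(c) == RUNNING));
        System.out.println("worker count: " + workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states are zero or positive, comparisons such as rs >= SHUTDOWN in the source are enough to test whether the pool has left the running state.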

1. execute() submits a task

//Proceeds in three steps
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //Get the current ctl
    int c = ctl.get();
    //Step 1: if there are fewer worker threads than corePoolSize, start a new core thread directly
    //This shows that core threads are created lazily, not when the pool is constructed
    if (workerCountOf(c) < corePoolSize) {
        //Start a new thread and hand it the current Runnable as its first task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //Step 2: if the pool is still running, add the task to the blocking queue
    //offer returns true if the task was queued and false if the queue is full
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //If the pool stopped running after the task was queued, remove the task and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //If no worker threads are left, start a non-core worker with no first task to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //Step 3: if the task cannot be queued, try to add a non-core worker thread; reject the task if that fails
    else if (!addWorker(command, false))
        reject(command);
}
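The three steps can be observed from the outside with a small pool. This is a sketch with hypothetical names and sizes chosen for illustration: corePoolSize 1, maximumPoolSize 2 and a queue with a single slot, fed with tasks that block so that nothing finishes in between.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteFlowDemo {
    // Returns { pool size after task 1, queue size after task 2,
    //           pool size after task 3, 1 if task 4 was rejected else 0 }.
    static int[] trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[4];
        try {
            pool.execute(blocker);                 // step 1: new core thread
            result[0] = pool.getPoolSize();
            pool.execute(blocker);                 // step 2: queued
            result[1] = pool.getQueue().size();
            pool.execute(blocker);                 // step 3: queue full, new non-core thread
            result[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // step 3 fails: default AbortPolicy throws
            } catch (RejectedExecutionException e) {
                result[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(trace()));
    }
}
```

Note the order: the queue is filled before any non-core thread is created, so an unbounded queue means maximumPoolSize is never reached.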

2. addWorker() adds a worker thread

//Set of all worker threads in the pool
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        //Get the run state of the thread pool
        int rs = runStateOf(c);
        //Refuse to add a worker once the pool is shutting down, unless the pool is in
        //SHUTDOWN, no first task is given and the queue still holds tasks to drain
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //Get the number of worker threads
            int wc = workerCountOf(c);
            //Return false if the count has reached CAPACITY, or has reached corePoolSize (core == true) or maximumPoolSize (core == false)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //Increment the worker count with CAS
            //return ctl.compareAndSet(expect, expect + 1);
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Wrap the task to be executed in a Worker
        //A Worker has two fields: firstTask and thread (a newly created thread)
        w = new Worker(firstTask);
        //The new thread held by the worker
        final Thread t = w.thread;
        if (t != null) {
            //Lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //Re-check the run state of the thread pool while holding the lock
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //Throw an exception if thread t has already been started
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //Add the Worker to the set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //If the worker was added successfully, start its thread immediately
            if (workerAdded) {
                t.start();	//This leads to the run method of the Worker
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
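The inner loop of addWorker() is a bounded compare-and-set retry loop. The same pattern is shown below on a plain counter, as a sketch with hypothetical names (BoundedCounter, tryIncrement) rather than the pool's own code, since ctl also carries the run state.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Re-read the value, check the bound, try the CAS, and retry if another thread won the race.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // like wc >= corePoolSize / maximumPoolSize
            if (count.compareAndSet(c, c + 1))
                return true;                   // like compareAndIncrementWorkerCount(c)
            // CAS failed because the count changed; loop and read it again
        }
    }

    int get() {
        return count.get();
    }

    // Lets several threads race for the slots and returns the final count.
    static int race(int threads, int attemptsPerThread, int limit) {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < attemptsPerThread; k++) {
                    counter.tryIncrement();
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("final count: " + race(8, 1000, 100));
    }
}
```

However many threads compete, the count stops exactly at the limit, which is what keeps the pool from exceeding its configured size without taking a lock for the count.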

3. The Worker class runs the worker thread

//Implements Runnable and extends AbstractQueuedSynchronizer (AQS)
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
	//The thread this worker runs in
	final Thread thread;
	//The first task to run; may be null
	Runnable firstTask;

	//The constructor stores the first task to execute and creates a new thread
	Worker(Runnable firstTask) {
	    setState(-1); // inhibit interrupts until runWorker
	    this.firstTask = firstTask;
	    this.thread = getThreadFactory().newThread(this);
	}
}

When a task arrives, it is wrapped in a Worker that has two fields.

  • firstTask: the task to execute first
  • thread: the new thread created by the thread factory, with the Worker itself as its Runnable

After the Worker has been added to the workers member variable, its thread is started. Next, look at the run method of the Worker class

//Worker#run
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    //The current thread, which is the worker's own thread
    Thread wt = Thread.currentThread();
    //Take the first task that was handed over by addWorker
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // Release the lock first to allow interrupts
    boolean completedAbruptly = true;
    try {
        //Run the first task, then keep fetching tasks with getTask(), which is analyzed below
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //If the pool is stopping, make sure the thread is interrupted; otherwise make sure the interrupt flag is cleared
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //Hook that a subclass can override, for example for monitoring; runs before the task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //Call the run method of the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //Hook that a subclass can override, for example for monitoring; runs after the task
                    afterExecute(task, thrown);
                }
            } finally {
                //After the task finishes, clear the task reference, increment this worker's completed task count and release the lock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Whether the loop ended normally or because of an exception, clean up this worker
        processWorkerExit(w, completedAbruptly);
    }
}
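The beforeExecute and afterExecute hooks called in runWorker() are protected methods that a subclass can override. A sketch of a monitoring subclass follows; the class name HookedPool and its counters are hypothetical, and the thread factory only silences the stack trace of the deliberately failing task.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                r -> {
                    Thread t = new Thread(r);
                    t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
                    return t;
                });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null)
            failed.incrementAndGet(); // t is the exception thrown by a task passed to execute()
    }

    // Runs two normal tasks and one failing task; returns { started, failed }.
    static int[] demo() {
        HookedPool pool = new HookedPool(2);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("started=" + r[0] + ", failed=" + r[1]);
    }
}
```

Tasks passed to submit() are wrapped in a FutureTask that captures the exception, so for those the Throwable argument of afterExecute is null.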

4. getTask() fetches the next task for a worker

As expected, this is where the worker blocks

  • workQueue
//The blocking queue passed in at construction; execute() adds tasks to it with workQueue.offer(command)
private final BlockingQueue<Runnable> workQueue;
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    //Loop until a task is obtained or this worker should exit
    for (;;) {
        int c = ctl.get();
        //Get the run state of the thread pool
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //If the pool is shut down and is either stopping or has an empty queue, this worker exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        //Get the number of worker threads
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        //Use a timed wait if core threads may time out or there are more workers than corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //(more workers than maximumPoolSize || (timed wait && the last poll timed out))
        //&& (more than one worker || the queue is empty)
        //then decrement the worker count and let this worker exit
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //return ctl.compareAndSet(expect, expect - 1)
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //Take a task from the blocking queue: poll with a timeout if timed, otherwise block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            //A null result means the poll timed out with the queue empty; record the timeout so that the next iteration can let this worker exit
            //Threads are not marked as core or non-core; which threads survive as core threads depends only on which ones exit last
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
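The effect of the timed flag can be seen by enabling allowCoreThreadTimeOut, which makes even core threads use the timed poll. A sketch with hypothetical names and an arbitrary 100 ms keep-alive time:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CoreTimeoutDemo {
    // Returns true if the pool shrinks to zero threads after its core threads have been idle.
    static boolean coreThreadsExpire() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // requires a keep-alive time greater than zero
        pool.execute(() -> { });
        pool.execute(() -> { });

        // Poll the pool size for up to 5 seconds instead of relying on exact timing.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean empty = pool.getPoolSize() == 0;
        pool.shutdown();
        return empty;
    }

    public static void main(String[] args) {
        System.out.println("core threads expired: " + coreThreadsExpire());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two core threads would block in take() indefinitely and the pool size would stay at 2.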

5. processWorkerExit() cleans up after a worker finishes or fails

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //On an abrupt exit (an exception in a task), workerCount has not been decremented yet, so do it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    //Take the lock before updating the completed task count and the workers set
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //Add this worker's completed task count to the total, then remove the worker from the set
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //Check the pool state and terminate the pool if the conditions are met
    tryTerminate();

    int c = ctl.get();
    //If the pool state is below STOP (RUNNING or SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        //On a normal exit, decide whether a replacement is needed based on the minimum number of threads to keep
        if (!completedAbruptly) {
            //If allowCoreThreadTimeOut is true, core threads may also time out, so the minimum is 0
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //If the minimum is 0 but the queue is not empty, keep at least one worker
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //If there are at least min workers, no replacement is needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //Otherwise start a replacement worker with no first task
        addWorker(null, false);
    }
}
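The replacement path can be seen by letting a task throw. In this sketch (hypothetical names; the thread factory only suppresses the stack trace of the deliberate failure), the thread that ran the failing task ends, and the next task runs on a different thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerReplacementDemo {
    // Returns true if the task after a failing task ran on a different thread.
    static boolean replacedAfterFailure() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), quiet);
        AtomicReference<Thread> first = new AtomicReference<>();
        AtomicReference<Thread> second = new AtomicReference<>();

        pool.execute(() -> {
            first.set(Thread.currentThread());
            throw new IllegalStateException("task failed"); // worker exits abruptly
        });
        pool.execute(() -> second.set(Thread.currentThread()));

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return first.get() != null && second.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("worker was replaced: " + replacedAfterFailure());
    }
}
```

This is also why an exception in a task passed to execute() costs a thread creation: catching exceptions inside the task keeps the worker alive.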

Source code summary

When a task is executed, if the number of worker threads is smaller than corePoolSize, a new core Worker is created with the task as its first task and stored in the HashSet member variable workers. If it is added successfully, the thread in the Worker is started.

If the number of worker threads has reached corePoolSize, the pool checks that it is still running and adds the task to the blocking queue with offer() (true on success, false otherwise), then checks the run state again. If the pool is no longer running, the task is removed and the rejection policy is called. If the pool is running but there are 0 worker threads, a new worker thread without a first task is created.

If offer() fails, the pool tries to add a non-core worker thread for the task. If that also fails, the rejection policy is called.

Worker implements Runnable and extends AQS. Its run method loops, calling getTask() to fetch the next task from the blocking queue and then running it directly. The Worker holds its AQS-based mutex while a task runs.

getTask() either blocks or blocks with a timeout, depending on whether allowCoreThreadTimeOut is set or the number of worker threads is greater than corePoolSize. If a timed wait expires, the queue stayed empty for the whole keep-alive time, and the worker thread can be destroyed.

If allowCoreThreadTimeOut is set, core threads are destroyed as well. However, if the queue is not empty and the number of threads would drop to 0, a new worker is created.

That is the whole content of this chapter.


Keywords: Java Multithreading source code

Added by manny on Sat, 11 Dec 2021 06:34:10 +0200