Multithreading and thread pool

Contents

Thread

ThreadPool

7 parameters of thread pool

Blocking queue of thread pool

Rejection policy of thread pool (RejectedExecutionHandler)

Monitor the running status of thread pool

Partial source code analysis

Thread

A thread is the smallest unit of CPU scheduling. Thread models are divided into the kernel-level thread (KLT) model and the user-level thread (ULT) model. The JVM uses the KLT model: each Java thread maintains a 1:1 mapping with an OS thread, that is, every Java thread has a corresponding thread in the operating system. A Java thread can be in one of the following life-cycle states:

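NEW: created but not yet started

RUNNABLE: running, or ready to run

BLOCKED: blocked while waiting for a monitor lock

WAITING: waiting indefinitely for another thread

TIMED_WAITING: waiting with a timeout

TERMINATED: finished execution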
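
The states can be observed with Thread.getState(). The following is a minimal sketch, not part of the original article; the class name ThreadStateDemo and the helper observe() are hypothetical. It only checks the two states that can be observed deterministically: NEW before start() and TERMINATED after join().

```java
public class ThreadStateDemo {
    /** Returns the state seen before start() and the state seen after join(). */
    static Thread.State[] observe() throws InterruptedException {
        Thread t = new Thread(() -> { /* no work needed for this demo */ });
        Thread.State before = t.getState();   // NEW: created but not started
        t.start();
        t.join();                             // wait until the thread has finished
        Thread.State after = t.getState();    // TERMINATED: run() has returned
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observe();
        System.out.println(states[0] + " -> " + states[1]);
    }
}
```

The intermediate states (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING) depend on timing and locking, so they are left out of this sketch.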

The status switching is shown in the following figure:


 

Inheritance relationship of related classes:

ThreadPool

A thread pool, as its name implies, is a cache of threads. Threads are a scarce resource: creating them without limit consumes system resources and reduces the stability of the system. Java therefore provides thread pools to allocate, tune and monitor threads in a unified way.

In web development, the server must accept and process requests, so a thread is allocated to each request. Creating a new thread for every request is easy to implement, but it has a problem:

If there are many concurrent requests and each one runs only briefly, threads are created and destroyed constantly, which greatly reduces the efficiency of the system. The server may spend more time and resources creating and destroying threads than processing actual user requests.

So is there a way for a thread to finish one task without being destroyed, and go on to execute other tasks?

This is the purpose of a thread pool. It addresses both the overhead of the thread life cycle and the problem of insufficient resources: by reusing threads for multiple tasks, the cost of creating a thread is spread across those tasks.

When should a thread pool be used?

  • The processing time of a single task is relatively short
  • The number of tasks to be handled is large

Advantages:

  • 1. Lower resource consumption: threads are reused, which reduces the cost of creating and destroying them
  • 2. Faster response: when a task arrives, it can be executed without waiting for a thread to be created
  • 3. Better manageability: the maximum number of concurrent threads is controlled, and threads can be allocated, tuned and monitored in a unified way

Ways to define a task for a thread:

// A class that implements Runnable is executed by a Thread and represents a basic task
@FunctionalInterface
public interface Runnable {
    // run() contains the code that is actually executed
    public abstract void run();
}

// Callable also represents a basic task. Unlike Runnable, it is generic,
// returns a result after the task has executed, and may throw a checked exception
@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}

Description of the inheritance (implementation) structure:

As the figure shows, Executor has an important sub-interface, ExecutorService, which defines the concrete behavior of a thread pool

  • execute(Runnable command): executes a task of type Runnable (declared in Executor)
  • submit(task): submits a Callable or Runnable task and returns a Future object representing the task
  • shutdown(): stops accepting new tasks and shuts the thread pool down once the already submitted tasks have completed
  • shutdownNow(): attempts to stop all executing tasks, does not start the waiting tasks, and shuts the thread pool down
  • isTerminated(): tests whether all tasks have completed following a shutdown
  • isShutdown(): tests whether the ExecutorService has been shut down

Executors is a utility class that provides factory methods for commonly used thread pool configurations

  • The core implementation class of the thread pool is ThreadPoolExecutor. The structure around it consists of three parts:
  1. Task: the unit of work, including the interfaces that an executed task must implement: the Runnable interface or the Callable interface;
  2. Task execution: the mechanism that assigns tasks to threads, including the Executor interface and the ExecutorService interface that extends it.
  3. Asynchronous computation results: the Future interface and the FutureTask class that implements it.
  • There are two ready-made methods for handing a task to a thread pool: submit() and execute()
    • A Callable has a return value, so it is submitted with submit();
    • A Runnable has no return value and can be passed to either execute() or submit(). With submit(Runnable task), Future.get() returns null on completion; with submit(Runnable task, T result), Future.get() returns the given result.
//Accept a Runnable interface (AbstractExecutorService)
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//Accept a Runnable interface and a result to return (AbstractExecutorService)
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
//Accept a Callable interface (AbstractExecutorService)
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

//Accept a Runnable interface (ThreadPoolExecutor)
public void execute(Runnable command) {
    // body omitted here; it is walked through in the execution process further down
}
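
The three submit() variants can be tried out with a short, hedged example. It is a sketch for illustration only: the class name SubmitDemo and the helper run() are hypothetical, and a fixed pool of two threads is an arbitrary choice.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    /** Returns the values obtained from the three kinds of Future. */
    static Object[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Integer> sum = () -> 1 + 2;
            Runnable noop = () -> { };

            Future<Integer> fromCallable = pool.submit(sum);         // result of call()
            Future<?> fromRunnable = pool.submit(noop);               // get() yields null
            Future<String> withResult = pool.submit(noop, "done");    // get() yields "done"

            pool.execute(noop);                                       // fire and forget, no Future

            return new Object[] { fromCallable.get(), fromRunnable.get(), withResult.get() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Object[] results = run();
        System.out.println(results[0] + ", " + results[1] + ", " + results[2]);
    }
}
```

Future.get() blocks until the task has finished, so the three values are available as soon as run() returns.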

Key properties of thread pool

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl is a field that controls both the running state of the thread pool and the number of effective threads in it. It packs two pieces of information into a single int: the running state of the thread pool (runState) and the number of effective threads (workerCount). The upper 3 bits store runState and the lower 29 bits store workerCount. COUNT_BITS is 29, and CAPACITY is 1 shifted left by 29 bits, minus 1 (29 one-bits). This constant is the upper limit of workerCount: 536,870,911, or about 500 million

ctl related methods

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • runStateOf: extracts the running state (the upper 3 bits)
  • workerCountOf: extracts the number of worker threads (the lower 29 bits)
  • ctlOf: combines a running state and a worker count into one ctl value
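
To see the bit packing at work, the constants and helper methods can be copied into a standalone class. This is a sketch for experimentation: the class name CtlBitsDemo is hypothetical, while the constants and the three methods mirror the excerpts quoted here.

```java
public class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // lower 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;        // upper 3 bits: 111
    static final int STOP       =  1 << COUNT_BITS;        // upper 3 bits: 001

    static int runStateOf(int c)     { return c & ~CAPACITY; }  // keep the upper 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }   // keep the lower 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack both into one int

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("running? " + (runStateOf(c) == RUNNING));
        System.out.println("workers: " + workerCountOf(c));
        System.out.println("capacity: " + CAPACITY);
        System.out.println("ctl bits: " + Integer.toBinaryString(c));
    }
}
```

Because RUNNING is the only negative state value, a check such as c < SHUTDOWN is enough to test whether the pool is running.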

There are five states of thread pool:

private static final int RUNNING    = -1 << COUNT_BITS;   //The upper 3 bits are 111
private static final int SHUTDOWN   =  0 << COUNT_BITS;   //The upper 3 bits are 000
private static final int STOP       =  1 << COUNT_BITS;   //The upper 3 bits are 001
private static final int TIDYING    =  2 << COUNT_BITS;   //The upper 3 bits are 010
private static final int TERMINATED =  3 << COUNT_BITS;   //The upper 3 bits are 011

Graphical interpretation:

 

  • RUNNING
    • Status description: in the RUNNING state the thread pool accepts new tasks and processes tasks that were already added
    • State switching: RUNNING is the initial state of the thread pool. Once the thread pool is created it is in the RUNNING state, and the number of tasks in the pool is 0
  • SHUTDOWN
    • Status description: in the SHUTDOWN state the thread pool does not accept new tasks, but still processes tasks that were already added
    • State switching: calling shutdown() on the thread pool moves it from RUNNING -> SHUTDOWN
  • STOP
    • Status description: in the STOP state the thread pool does not accept new tasks, does not process queued tasks, and interrupts the tasks that are being executed
    • State switching: calling shutdownNow() on the thread pool moves it from (RUNNING or SHUTDOWN) -> STOP
  • TIDYING
    • Status description: when all tasks have terminated and the worker count recorded in ctl is 0, the thread pool changes to the TIDYING state and runs the hook method terminated(). terminated() is empty in the ThreadPoolExecutor class; to react when the thread pool becomes TIDYING, override terminated() in a subclass
    • State switching: in the SHUTDOWN state, once the blocking queue is empty and no tasks are executing, the pool moves from SHUTDOWN -> TIDYING. In the STOP state, once no tasks are executing, the pool moves from STOP -> TIDYING
  • TERMINATED
    • Status description: when the thread pool has terminated completely, it is in the TERMINATED state
    • State switching: in the TIDYING state, after terminated() has been executed, the pool moves from TIDYING -> TERMINATED. The conditions checked (in tryTerminate()) on the way to TERMINATED are as follows:
      • The thread pool is not in the RUNNING state
      • The thread pool is not already in the TIDYING or TERMINATED state
      • If the thread pool state is SHUTDOWN, workQueue is empty
      • workerCount is 0
      • The state is set to TIDYING successfully; after terminated() runs, it is set to TERMINATED
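
The state transitions can be observed from the outside through the public API. The sketch below is an illustration under assumptions: the class name LifecycleDemo and the helper run() are hypothetical, and a single-thread pool with an unbounded queue is an arbitrary choice. It overrides the terminated() hook and checks what happens around shutdown().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LifecycleDemo {
    /** Returns {isShutdown, new task rejected, terminated in time, isTerminated, hook called}. */
    static boolean[] run() throws InterruptedException {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {       // runs while the pool is in the TIDYING state
                hookCalled.set(true);
            }
        };
        pool.execute(() -> { });                // accepted: the pool is RUNNING
        pool.shutdown();                        // RUNNING -> SHUTDOWN
        boolean shutdown = pool.isShutdown();

        boolean rejected = false;
        try {
            pool.execute(() -> { });            // SHUTDOWN no longer accepts new tasks
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        boolean finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { shutdown, rejected, finished, pool.isTerminated(), hookCalled.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```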

Execution flow of the thread pool:

  • After the thread pool is created, it waits for tasks
  • When execute() is first called to submit a task, the newly created thread pool contains no threads yet. The threads created first, up to corePoolSize, are called core threads
  • As execute() keeps submitting tasks, the thread pool checks: if the number of running threads is less than the number of core threads, it creates a core thread to execute the task immediately
  • Note: even if the first core thread has already finished its task and is idle, submitting a second task does not reuse it; a second core thread is created instead. Existing core threads are only reused once the configured number of core threads has been reached
  • If the number of running threads is greater than or equal to the number of core threads, the task is put into the queue. When a core thread finishes its current task, it takes the next task from the queue
  • If the queue is full and the number of running threads is less than the maximum number of threads, a non-core thread is created to execute the task
  • If the queue is full and the number of running threads equals the maximum number of threads (core threads + non-core threads), the saturation (rejection) policy is applied
  • When a thread finishes a task, it takes the next task from the queue and executes it
  • When the number of threads in the pool exceeds corePoolSize, threads that stay idle for keepAliveTime are destroyed until only corePoolSize threads remain
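
The steps above can be reproduced with a deliberately tiny pool. The sketch below makes its own assumptions: corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1 are arbitrary values chosen so that the fourth task is rejected, and the class name ExecuteFlowDemo is hypothetical. A latch keeps every task blocked so that the observed sizes do not depend on timing.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteFlowDemo {
    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();                 // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocking);              // below corePoolSize: a core thread is created
            seen[0] = pool.getPoolSize();
            pool.execute(blocking);              // core thread busy: the task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocking);              // queue full: a non-core thread is created
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocking);          // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run()));
    }
}
```

The rejection surfaces as a RejectedExecutionException because the five-argument constructor uses the default AbortPolicy.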

The source code behind the above process:

// Executes the given task (accepts a Runnable)
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 1. If fewer than corePoolSize threads are running, try to start a new
     *    thread with the given command as its first task. The call to addWorker
     *    atomically checks runState and workerCount, and so prevents adding
     *    threads when it shouldn't, by returning false.
     * 2. If the task can be queued successfully, we still need to double-check
     *    whether we should have added a thread (because existing ones died since
     *    the last check) or whether the pool shut down since entry into this
     *    method. So we recheck the state and, if necessary, roll back the
     *    enqueuing if stopped, or start a new thread if there are none.
     * 3. If the task cannot be queued, try to add a new thread. If that fails,
     *    we know we are shut down or saturated, and so reject the task.
     */
    int c = ctl.get();
    // Step 1: fewer workers than corePoolSize, so try to add a core thread
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: the pool is running, so try to put the task into the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: the queue is full (or the pool is not running), so try a non-core thread
    else if (!addWorker(command, false))
        // The non-core thread could not be added either: apply the rejection policy
        reject(command);
}

Three predefined thread pools

  • FixedThreadPool

FixedThreadPool: a thread pool with a fixed number of threads. Its maximumPoolSize equals its corePoolSize, and it uses a LinkedBlockingQueue as the blocking queue, so a task that cannot be executed immediately is added to the queue. It suits scenarios where the number of threads must be limited for resource management, such as heavily loaded servers

  • SingleThreadExecutor

SingleThreadExecutor: a thread pool with only one thread. It is used where tasks must be executed sequentially, with at most one task running at any time, rather than several threads executing at once.
Because its blocking queue is also a LinkedBlockingQueue, the same applies as for FixedThreadPool:
maximumPoolSize and keepAliveTime have no effect.

  • CachedThreadPool

CachedThreadPool: a SynchronousQueue is used as the blocking queue. A SynchronousQueue stores no elements and implements "one-to-one handoff": each put must wait for a thread to take the task, otherwise the putting thread blocks, and a thread that wants to take a task blocks until a task is put into the queue. The maximumPoolSize of CachedThreadPool is Integer.MAX_VALUE, so CachedThreadPool is effectively an unbounded thread pool in which threads can be created continuously. The corePoolSize is 0, so every submitted task goes directly through the blocking queue: it is handed to an idle thread if one is waiting, otherwise a new thread is created.
It suits executing many short-lived asynchronous tasks. The thread pool creates new threads as needed, and previously created threads are reused when they are available

Note (FixedThreadPool and SingleThreadExecutor): because the LinkedBlockingQueue is effectively unbounded, the thread pool never rejects a task unless shutdown() or shutdownNow() has been called, even when all threads in the pool stay busy. maximumPoolSize and keepAliveTime have no effect: since the queue never fills up, the number of threads never exceeds corePoolSize, so there are no surplus threads for keepAliveTime to reclaim
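
The parameters behind the factory methods can be read back at run time. The sketch below rests on an assumption that holds for OpenJDK but is an implementation detail rather than a documented guarantee: newFixedThreadPool and newCachedThreadPool return a plain ThreadPoolExecutor, so the cast succeeds. newSingleThreadExecutor returns a wrapper and is therefore left out. The class name FactoryParamsDemo and its helpers are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FactoryParamsDemo {
    /** Summarizes core size, maximum size and queue type of a pool. */
    static String describe(ThreadPoolExecutor p) {
        return "core=" + p.getCorePoolSize()
                + ", max=" + p.getMaximumPoolSize()
                + ", queue=" + p.getQueue().getClass().getSimpleName();
    }

    static String[] run() {
        // Assumption: both factory methods return a ThreadPoolExecutor instance (true for OpenJDK)
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        ThreadPoolExecutor cached = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return new String[] { describe(fixed), describe(cached) };
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```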

7 parameters of thread pool

/**
  * Thread pool core parameters
  * @param corePoolSize:Number of core threads
  * @param maximumPoolSize:Maximum number of threads
  * @param keepAliveTime:The lifetime of a thread after it is idle
  * @param unit:Time unit
  * @param workQueue:Blocking queue for storing tasks
  * @param threadFactory:Thread factory class
  * @param handler:Saturation strategy when the maximum number of threads and blocking queue are full
  */
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • keepAliveTime: how long surplus idle threads survive. When the number of threads in the thread pool exceeds corePoolSize, threads that stay idle for keepAliveTime are destroyed until only corePoolSize threads remain
  • threadFactory: the thread factory that creates the worker threads of the thread pool. The default is usually sufficient
  • Why use a thread factory? It sets thread attributes uniformly at creation time, such as whether the thread is a daemon thread and its priority, so that all threads created through the same ThreadFactory share the same characteristics. ThreadFactory is an interface with a single method, newThread(Runnable), which creates a thread.
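
As an illustration of the seven parameters together, here is a hedged sketch of a pool built with a custom ThreadFactory. All concrete values (2 core threads, 4 maximum threads, 30 seconds, a queue of 100) and the names SevenParamsDemo and demo-worker- are hypothetical choices, not recommendations from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SevenParamsDemo {
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // The factory gives every worker the same characteristics: a name prefix and the daemon flag
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "demo-worker-" + seq.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(100),          // workQueue (bounded)
                factory,                                // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    /** Runs one task and returns the name of the worker thread that executed it. */
    static String workerName() throws Exception {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(workerName());
    }
}
```

Naming the threads in the factory makes thread dumps and log lines much easier to attribute to a specific pool.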

Which thread pool should be used, and how should its parameters be set?

At work, which of the three ways of creating a thread pool is used: single, fixed or cached?


Note: do not create thread pools through Executors; construct them with ThreadPoolExecutor directly, so that the parameters are explicit

FixedThreadPool and SingleThreadExecutor: the allowed length of the request queue is Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM (OutOfMemoryError)

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
CachedThreadPool: the number of threads allowed to be created is Integer.MAX_VALUE, so a large number of threads may be created, resulting in an OOM
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Custom thread pool:
How should the maximum number of threads be determined? The following are common rules of thumb:
CPU intensive: number of CPU cores + 1. The number of cores is obtained dynamically with Runtime.getRuntime().availableProcessors(), which returns an int
IO intensive: 2 * number of CPU cores + 1
Alternatively: number of CPU cores * [1 + (I/O time consumption / CPU time consumption)]
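
The sizing rules can be written down as small helper functions. This is a sketch of the rules of thumb only: the class name PoolSizing, the method names and the sample timings are hypothetical, and real workloads should be measured rather than derived from a formula alone.

```java
public class PoolSizing {
    /** CPU-intensive rule of thumb: cores + 1. */
    static int cpuBound(int cores) {
        return cores + 1;
    }

    /** Simple IO-intensive rule of thumb: 2 * cores + 1. */
    static int ioBoundSimple(int cores) {
        return 2 * cores + 1;
    }

    /** Ratio-based rule: cores * (1 + ioTime / cpuTime), rounded up. */
    static int ioBoundByRatio(int cores, double ioMillis, double cpuMillis) {
        return (int) Math.ceil(cores * (1 + ioMillis / cpuMillis));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();  // depends on the machine
        System.out.println("cores = " + cores);
        System.out.println("CPU-bound size = " + cpuBound(cores));
        System.out.println("IO-bound size (simple) = " + ioBoundSimple(cores));
        // Hypothetical task profile: 90 ms waiting on I/O for every 10 ms of CPU work
        System.out.println("IO-bound size (ratio) = " + ioBoundByRatio(cores, 90, 10));
    }
}
```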

Blocking queue of thread pool

Known implementation classes

  • ArrayBlockingQueue: a bounded blocking queue backed by an array (the capacity must be specified)
  • LinkedBlockingQueue: an optionally bounded blocking queue backed by a linked list (default capacity Integer.MAX_VALUE)
  • SynchronousQueue (synchronous handoff queue): this queue stores no elements. Each put() must wait for another thread to call take(), like a direct one-to-one handoff between producer and consumer
  • DelayQueue: an unbounded blocking queue of delayed elements, implemented with a priority queue
  • LinkedTransferQueue: an unbounded blocking queue backed by a linked list
  • PriorityBlockingQueue: an unbounded blocking queue that supports priority ordering
  • LinkedBlockingDeque: a double-ended blocking queue backed by a linked list

The blocking queues above provide the following methods

Method type   Throws exception   Returns special value   Blocks            Times out
Insert        add(e)             offer(e)                put(e)            offer(e, time, unit)
Remove        remove()           poll()                  take()            poll(time, unit)
Inspect       element()          peek()                  (not available)   (not available)

Explanation of each type:

  • Throws exception: when the blocking queue is full, add() throws IllegalStateException: Queue full. When the blocking queue is empty, remove() throws NoSuchElementException
  • Special value: an insert method returns true on success and false on failure; a remove method returns the removed element, or null if the queue is empty
  • Blocks: when the blocking queue is full, put() blocks the producer thread until space becomes available or the thread is interrupted. When the blocking queue is empty, take() blocks the consumer thread until an element becomes available
  • Times out: when the blocking queue is full, the queue blocks the producer thread for at most the given time; once the time limit is exceeded, the producer thread gives up
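
The non-blocking behaviors can be verified on a queue of capacity 1. The following sketch is for illustration: the class name QueueMethodsDemo, the helper run() and the 50 ms timeout are hypothetical choices. put() and take() are not called because they would block forever in this single-threaded demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueMethodsDemo {
    static List<String> run() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        List<String> out = new ArrayList<>();

        q.add("a");                                          // the queue is now full
        try {
            q.add("b");                                      // throws on a full queue
        } catch (IllegalStateException e) {
            out.add("add: IllegalStateException");
        }
        out.add("offer: " + q.offer("b"));                   // special value: false
        out.add("timed offer: " + q.offer("b", 50, TimeUnit.MILLISECONDS));

        out.add("remove: " + q.remove());                    // the queue is now empty
        out.add("poll: " + q.poll());                        // special value: null
        out.add("timed poll: " + q.poll(50, TimeUnit.MILLISECONDS));
        try {
            q.remove();                                      // throws on an empty queue
        } catch (NoSuchElementException e) {
            out.add("remove on empty: NoSuchElementException");
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```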

Choose the queue according to the scenario. If tasks must not be discarded, an unbounded queue can be used. Alternatively, taking the memory capacity of the JVM into account, use a bounded queue and define a rejection policy that stores the overflowing tasks in Redis; then monitor the blocking queue and, when it is less than 50% full, move tasks from Redis back into it

Rejection policy of thread pool (RejectedExecutionHandler)

  • AbortPolicy (abort policy, used by default): throws a RejectedExecutionException
   /**
     * A handler for rejected tasks that throws a {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • DiscardPolicy (discard policy): silently discards the task that cannot be processed, without any handling and without throwing an exception. This is a suitable policy if losing tasks is acceptable
/**
  * A handler for rejected tasks that silently discards the rejected task.
  */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Do nothing, which will have the effect of discarding the task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
     }
  • DiscardOldestPolicy (discard-oldest policy): discards the oldest task waiting in the blocking queue and retries the new task, without throwing an exception
/**
  * A handler for rejected tasks that discards the oldest unhandled request and then retries {@code execute},
  * unless the executor is shut down, in which case the task is discarded.
  */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
  • CallerRunsPolicy (caller-runs policy): the thread that called execute(), for example the main thread, runs the rejected task itself. It neither throws an exception nor discards the task; the task is handed back to the caller
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }


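// Demo: calling run() directly executes the task on the calling thread,
// which is what CallerRunsPolicy does with a rejected task
public class Main {
    public static void main(String[] args) {
        // The main thread creates the task
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("The thread that calls the current task: " + Thread.currentThread().getName());
            }
        };

        // Executed on the calling (main) thread, not on a pool thread
        runnable.run();
    }
}
// Printed result: The thread that calls the current task: main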
  • A user-defined rejection policy must implement the RejectedExecutionHandler interface

Choose the rejection policy according to your needs
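
A user-defined policy can be as small as a counter. The sketch below is one possible handler, written for illustration: the names CountingRejectionDemo and CountingHandler are hypothetical, and the pool (1 thread, queue capacity 1) is sized so that two of four blocked tasks are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingRejectionDemo {
    /** Hypothetical custom policy: counts rejected tasks instead of throwing. */
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            rejected.incrementAndGet();
        }
    }

    /** Submits four blocking tasks and returns how many were rejected. */
    static int run() throws InterruptedException {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();                 // hold the single worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(blocking);          // 1 runs, 1 is queued, 2 are rejected
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return handler.rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected tasks: " + run());
    }
}
```

A production handler would more likely log the task, persist it, or apply back-pressure; counting is only the smallest observable behavior.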

Monitor the running status of thread pool

You need to focus on four methods in ThreadPoolExecutor

  • shutdown(): initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted.
  • shutdownNow(): attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
  • beforeExecute(Thread t, Runnable r): called before the given Runnable is executed in the given thread.
  • afterExecute(Runnable r, Throwable t): called after execution of the given Runnable has completed.
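
The two hook methods, beforeExecute and afterExecute, are protected, so monitoring is typically done in a subclass. The following is a minimal sketch under assumptions: the class name MonitoredPool and its counters are hypothetical, and a real monitor would probably record timings or export metrics instead of counting.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitoredPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();

    MonitoredPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();       // called in the worker thread, before the task runs
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();      // called after the task, t is non-null if it threw
    }

    /** Runs five trivial tasks and returns {started, finished, completed task count}. */
    static long[] run() throws InterruptedException {
        MonitoredPool pool = new MonitoredPool(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new long[] { pool.started.get(), pool.finished.get(), pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run();
        System.out.println("started=" + r[0] + ", finished=" + r[1] + ", completed=" + r[2]);
    }
}
```

Besides the hooks, getters such as getPoolSize(), getActiveCount(), getQueue().size() and getCompletedTaskCount() can be polled at any time to watch a running pool.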

Partial source code analysis

A detailed source code analysis will be covered in a separate article

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //Create the thread through the thread factory, passing this Worker itself as the Runnable
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }






private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        //Judge the status of thread pool
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Create a Worker: store the task in the Worker and initialize the Worker
            //The Worker constructor creates a new thread through the thread factory
            w = new Worker(firstTask);
            //Assign the thread held by the Worker to t
            final Thread t = w.thread;
            //Check that t is not null (it is null if the factory failed)
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //Execution locking: controlling concurrency
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //State judgment
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //Put w into the workers set
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //Starting the thread eventually calls the run() method of the Worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }



//Worker.run() delegates to the runWorker() method of the thread pool
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //Get the firstTask in the Worker
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //This while loop is why threads can be reused
            //The loop exits when there is no first task and getTask() returns null
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // Reached only when the loop ends normally; skipped if a task threw
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
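
The loop in runWorker() and its afterExecute() hook can be seen in action with a small experiment. This is a sketch under assumptions: the class name ReuseDemo is made up for this example, and the pool is deliberately limited to a single worker so that reuse is the only way to run several tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class ReuseDemo {

    /** Returns {distinct worker threads used, afterExecute() calls} for the given task count. */
    static int[] run(int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger finished = new AtomicInteger();

        // One core thread, one maximum thread, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                finished.incrementAndGet(); // called by runWorker() after every task
            }
        };

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> seen.add(Thread.currentThread()));
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { seen.size(), finished.get() };
    }

    public static void main(String[] args) {
        int[] r = run(3);
        System.out.println("threads=" + r[0] + " afterExecute=" + r[1]);
    }
}
```

With three tasks, the set contains a single Thread while afterExecute() has fired three times: the same worker went around the while loop once per task.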


// Waits for the next task from the queue; returning null tells the worker to exit
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
         

            // Are workers subject to culling?
            // timed is true if core threads may time out, or if the worker count exceeds corePoolSize
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    // Timed wait: give up and return null if no task arrives within keepAliveTime
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // Worker count is within corePoolSize and core timeout is off: block until a task arrives
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
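
The culling logic in getTask() explains how keepAliveTime and allowCoreThreadTimeOut behave at runtime. The sketch below is an illustration under assumptions: the class name KeepAliveDemo, the 50 ms keep-alive and the 5 second polling limit are all chosen for this example. A SynchronousQueue is used so that every task submitted while the workers are busy forces a new worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class KeepAliveDemo {

    /** Returns {largestPoolSize, poolSize after the workers have been idle}. */
    static int[] run(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown(); // all three workers now go idle inside getTask()

        // Lowest size getTask() allows: 0 if core threads may time out, else corePoolSize.
        int floor = allowCoreTimeout ? 0 : 1;
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > floor && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int settled = pool.getPoolSize();
        pool.shutdown();
        return new int[] { largest, settled };
    }

    public static void main(String[] args) {
        int[] a = run(false);
        int[] b = run(true);
        System.out.println("core timeout off: largest=" + a[0] + " settled=" + a[1]);
        System.out.println("core timeout on:  largest=" + b[0] + " settled=" + b[1]);
    }
}
```

With core timeout off, the last remaining worker sees timed == false and blocks in take(), so the pool settles at one thread. With core timeout on, every worker uses the timed poll() and the pool shrinks to zero.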

// Called whenever a worker exits; completedAbruptly is true only if a task threw an exception
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        // Consider a replacement only while the pool is RUNNING or SHUTDOWN (state below STOP)
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // Add a replacement worker (abrupt exit, or the count fell below the minimum)
            addWorker(null, false);
        }
    }
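
The replacement logic in processWorkerExit() has a visible consequence: a task that throws when submitted with execute() kills its worker, and the pool creates a new one. With submit(), the exception is stored in the returned Future instead and the worker survives. The following sketch counts the threads the pool asks for in both cases; the class name WorkerExitDemo and the counting ThreadFactory are assumptions made for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class WorkerExitDemo {

    /** Runs one failing and one normal task; returns how many threads the pool created. */
    static int threadsCreated(boolean useSubmit) {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Swallow the uncaught exception so the demo does not print a stack trace.
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);

        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        Runnable normal = () -> { };

        if (useSubmit) {
            pool.submit(failing); // exception is captured in the Future
            pool.submit(normal);
        } else {
            pool.execute(failing); // exception escapes runWorker(), worker exits abruptly
            pool.execute(normal);
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("execute: " + threadsCreated(false) + " thread(s)");
        System.out.println("submit:  " + threadsCreated(true) + " thread(s)");
    }
}
```

With execute() the factory is called twice, because the first worker dies with the exception and the second task needs a new thread. With submit() a single thread handles both tasks.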

Keywords: Java Multithreading

Added by fastfingertips on Fri, 11 Feb 2022 19:41:57 +0200