[concurrent programming: thread pool] is proficient in the source code of java thread pool

Let's look at a piece of code and think about two questions

Take these two questions to see the source code, get twice the result with half the effort!

  • Why is it the fourth error?
  • Why does the third execute first and the second execute later?

Construction method analysis

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

  • In the construction method, only parameter verification is done without any logic processing!
  • ThreadPoolExecutor has seven parameters, which are described below.
  • corePoolSize: the number of core threads, which is also the number of resident threads in the thread pool. Note: there are no threads by default when the thread pool is initialized. When a task comes, it starts to create a thread to execute the task
  • maximumPoolSize: the maximum number of threads. Some non core threads may be added to the number of core threads. A simple understanding is a temporary thread.
  • keepAliveTime: the lifetime of a non core thread. If the idle time of the non core thread exceeds this time, the non core thread will be recycled. If corePoolSize = maximumPoolSize, there is no non core thread, so there is no recycling!
  • Unit: the time unit of keepAliveTime.
  • workQueue: the queue used to save tasks. When the number of tasks exceeds the maximumPoolSize, new tasks will be stored in this queue.
  • threadFactory: the factory class for creating threads. Executors. Factory is used by default Defaultthreadfactory() can also be created using ThreadFactoryBuilder of guava library.
  • handler: the saturation policy when the thread pool cannot continue to receive tasks (the queue is full and the number of threads reaches maximunPoolSize). The values include AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy and DiscardPolicy. The simple understanding is how to handle when the maximumPoolSize is full and the queue is full!

Thread pool execution: analysis of execute() method

    public void execute(Runnable command) {
	// If the task is empty, throw a null pointer exception directly!
        if (command == null)
            throw new NullPointerException();

	// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
        int c = ctl.get();

	// Judge whether the current number of threads is less than corePoolSize. The number of core threads is processed here, and the corresponding code above is thread 1.
        if (workerCountOf(c) < corePoolSize) {
	    // Create a new thread with the addWord method using the parameter entry task
            if (addWorker(command, true))
		// If the thread can be created successfully, it indicates that the execution has been completed! Direct return
                return;

	    // Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
            c = ctl.get();
        }

	// If the above task does not create a thread and the status is running, the current task will be placed in the queue
        if (isRunning(c) && workQueue.offer(command)) {
	    // Successfully put into the queue. Get the current thread pool data again
            int recheck = ctl.get();

	    // check again. If the status changes to non running after the task is added to the queue (it may be that the thread pool is shut down at this point of execution), you need to remove the current task.
            if (! isRunning(recheck) && remove(command))
		// If it is not running and the removal is successful, execute the rejection policy.
                reject(command);
	    // Judge whether the number of valid threads in the current worker thread pool is 0
            else if (workerCountOf(recheck) == 0)
		// Add workers, empty tasks, non core threads! To ensure that the thread pool is running, there must be a task executing
                addWorker(null, false);
        }
	// The core thread is exhausted and cannot be added to the queue. Then increase the number of non core threads!
        else if (!addWorker(command, false))
	    // Failure to increase the number of non core threads indicates that the queue is full and the threads are full. Call reject method
            reject(command);
    }

  • Seeing this can actually explain why the fourth error is reported! The first is placed in the core thread, the second is placed in the queue, the third is in the non core thread, and the fourth calls the reject method.

What are the status of the thread pool?

    // runState is stored in the high-order bits
    // The top three bits indicate the thread pool status 
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

The addWorker() method is called many times during execution

    private boolean addWorker(Runnable firstTask, boolean core) {
	// A loop ID used to jump out of a loop. java does not recommend it.
        retry:
        for (;;) {
	    // Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
            int c = ctl.get();
	    // Status acquisition of thread pool
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
	    // Check that the queue is empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
		// Get the number of valid threads in the current worker pool
                int wc = workerCountOf(c);

		// The number of valid threads in the current worker thread pool is greater than the maximum number of threads (a maximum value, the maximum value of the last 29 bits)
		// Or when the number of effective threads in the worker thread pool is greater than the maximum number of core (total) threads, execution cannot continue!
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
		// Increase the number of threads by one through CAS (CSA will be described in detail later). Successfully jumped out of the loop!
                if (compareAndIncrementWorkerCount(c))
                    break retry;
		// Query loading: thread pool data
                c = ctl.get();  // Re-read ctl
		// The state of the newly acquired thread pool is inconsistent with the initial state. Start the cycle again!
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
		// Other CAS failed due to workerCount change; Retry internal loop
            }
        }

	// The above code is mainly to cas increase the number of threads by one	

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
	    // An internal class: create threads through threadfactory!
            w = new Worker(firstTask);
	    // Get your Worker thread
            final Thread t = w.thread;
	    // When the thread exists
            if (t != null) {
		// Acquire lock
                final ReentrantLock mainLock = this.mainLock;
		// Lock
                mainLock.lock();
                try {
                    // Recheck while holding lock. Recheck while holding the lock.
                    // Back out on ThreadFactory failure or if ThreadFactory fails or
                    // shut down before lock acquired.  Close before obtaining the lock.

		    // Gets the status of the thread pool
                    int rs = runStateOf(ctl.get());

		    // Less than shutdown is the running status
		    // firstTask is empty when SHUTDOWN. Is to process tasks from the queue! Then you can put it in the collection
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
			// Precheck that is startable
			// What happens is that the thread has not start ed yet
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
			// workers is a HashSet to which the worker object is added
                        workers.add(w);
			// Gets the length of the current HashSet
                        int s = workers.size();
			// The length is relatively large. Record maximum threads
                        if (s > largestPoolSize)
                            largestPoolSize = s;
			// Update status: indicates that the thread can be started
                        workerAdded = true;
                    }
                } finally {
		    // Release the lock!
                    mainLock.unlock();
                }
		// The above creation is successful, and the identification thread can be started
                if (workerAdded) {
		    // Thread start
                    t.start();
		    // Indicates that the thread has started
                    workerStarted = true;
                }
            }
        } finally {
	    // Thread startup failed, processing failed logic
            if (! workerStarted)
		// Failure fallback removes w threads from wokers minus 1 and attempts to end the thread pool
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Learn more about the Worker class just used

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
	// Running woker thread
        final Thread thread;
        /** Initial task to run.  Possibly null. */
	// Incoming tasks
        Runnable firstTask;
        /** Per-thread task counter */
	// For monitoring the number of tasks completed
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
	    // Disable thread interrupt 
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

Running logic of threads in thread pool

  • In the addWorker method of the above code, there is t.start(); Such a line, this line opens the real thread
  • This line of code opens a new thread and calls the Worker's run() method.
  • Then the Worker's run() method calls runWorker(this);

Thread execution: runWorker(this)

    final void runWorker(Worker w) {
	// Gets the current thread
        Thread wt = Thread.currentThread();
	// Get current task
        Runnable task = w.firstTask;
	// Set the current task to null
        w.firstTask = null;
	// Unlock operation, corresponding to setState(-1) initialized by Worker class;
        w.unlock(); // allow interrupts
	// Is the loop exited because of an exception
        boolean completedAbruptly = true;
        try {
	    // There are tasks currently, which can be executed directly
	    // There is no task at present. Get task execution from the queue!
	    // Here you can see the answer to the second question, why do you execute 3 first and then 2
            while (task != null || (task = getTask()) != null) {
		// Lock
                w.lock();
                // If pool is stopping, ensure thread is interrupted; If the pool is stopping, ensure that the thread is interrupted;
                // if not, ensure thread is not interrupted.  If not, make sure the thread is not interrupted.
                //  This requires a check in second case to deal with Shutdown now race while clearing interrupt. It should be considered that the shutdown now method may also be executed during the execution of the if statement, and the shutdown now method will set the status to STOP
		// Thread. The function of the interrupted () method is to test whether the current thread is interrupted (check the interrupt flag), return a boolean and clear the interrupt state. The interrupt state has been cleared during the second call, and a false will be returned.
		// The current thread is interrupted when it is greater than or equal to stop, or in thread Interrupted () can be satisfied, and it is not interrupted
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
		    // Interrupt current thread
                    wt.interrupt();
                try {
		    // Before executing the task, the subclass can implement this hook method to expand the thread pool
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
			// Perform tasks
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
			// After executing the task, the subclass can implement this hook method to expand the thread pool
                        afterExecute(task, thrown);
                    }
                } finally {
		    // Set task to null, and it will be obtained from the queue in the next cycle
                    task = null;
		    // Number of completed tasks + 1
                    w.completedTasks++;
		    // Unlock
                    w.unlock();
                }
            }
	    // Flag does not exit because of an exception
            completedAbruptly = false;
        } finally {
	    // Processing logic for task execution completion
            processWorkerExit(w, completedAbruptly);
        }
    }

How to get a task from a queue: getTask()

    private Runnable getTask() {
	// Defines the timeout flag: whether the task timed out the last time it was fetched from the blocking queue
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
	    // Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
            int c = ctl.get();
	    // Gets the status of the thread pool
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. Check that the queue is empty only if necessary.
	    // The current status is greater than or equal to SHUTDOWN and (greater than or equal to STOP or the queue is empty)
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
		// The thread has stopped, and the current workerCount is reduced by one. To offset the addition of one to the method of compareAndIncrementWorkerCount(c) in addWorker
                decrementWorkerCount();
                return null;
            }

	    // Get the number of valid threads in the current worker pool
            int wc = workerCountOf(c);

            // Are workers subject to culling?  Will workers be eliminated?
	    // If false (the default), the core thread remains active even when idle. If true, the core thread uses keepAliveTime timeout to wait for work.
	    // WC > corepoolsize, indicating that the number of threads in the current thread pool is greater than the number of core threads; For these threads that exceed the number of core threads, timeout control is required
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

	    // The number of valid threads in the current worker pool is greater than the maximum number of threads required
	    // Timed & & timedout if true, it means that the current operation needs timeout control, and the last time the task was obtained from the blocking queue timed out
	    // The number of valid threads is greater than 1, or the blocking queue is empty
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
		// Then try subtracting workerCount by one. To offset the addition of one to the method compareAndIncrementWorkerCount(c) in addWorker
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
		// If it is true, the timeout is controlled through the poll method of blocking the queue. If the task is not obtained within the keepAliveTime time, null is returned;
		// Otherwise, through the take method, if the queue is empty at this time, the take method will block until the queue is not empty.
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
		// Get the data from the queue and return it directly
                if (r != null)
                    return r;
		// If no data is obtained, it indicates that it has timed out, and timedOut is set to true
                timedOut = true;
            } catch (InterruptedException retry) {
		// If the current thread is interrupted when obtaining the task, set timedOut to false and return to loop retry
                timedOut = false;
            }
        }
    }

What to do after task execution: processWorkerExit()

  • It is mainly used for thread cleaning
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
	// If completedAbruptly is true, an error occurred
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
	    // Careful subtraction operation
            decrementWorkerCount();

	// Acquisition theory
        final ReentrantLock mainLock = this.mainLock;
	// Lock
        mainLock.lock();
        try {
	    // Number of completed tasks per worker thread
            completedTaskCount += w.completedTasks;
	    // Remove current work task
            workers.remove(w);
        } finally {
	    // Unlock
            mainLock.unlock();
        }

	// Judge whether to end the thread pool according to the thread pool status
        tryTerminate();
	// Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
        int c = ctl.get();
	// The current status is less than the STOP status
        if (runStateLessThan(c, STOP)) {
	    // Not an abnormal interrupt
            if (!completedAbruptly) {
		// If false (the default), the core thread remains active even when idle. If true, the core thread uses keepAliveTime timeout to wait for work.
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
		// Keep the logic of at least one Worker
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
		// The current number of jobs is greater than or equal to the number of core threads
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
	    // Continue to execute new tasks, empty tasks, get from the queue!
            addWorker(null, false);
        }
    }

When will the thread pool end: tryTerminate();

  • This method needs to be triggered
    final void tryTerminate() {
        for (;;) {
	    // Get the data of the thread pool: the upper 3 bits represent the thread pool status, and the lower 29 bits represent the number of threads
            int c = ctl.get();

	    // RUNNING cannot be stopped because it is still RUNNING;
	    // TIDYING or TERMINATED, because there are no running threads in the thread pool; Other restrictions are being groomed or have stopped.
	    // HUTDOWN and the waiting queue is not empty. At this time, the task in the workQueue should be executed.
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
	    // If the number of threads is not 0
            if (workerCountOf(c) != 0) { // Eligible to terminate
		// Interrupt an idle worker thread
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

	    // Acquire lock
            final ReentrantLock mainLock = this.mainLock;
	    // Lock
            mainLock.lock();
            try {
		// Trying to set the status to TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
			// Set the status to TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
			// Wake up called the thread awaitTermination() waiting for the thread pool to terminate
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
		// Unlock
                mainLock.unlock();
            }
            // else retry on failed CAS retry
        }
    }

Conclusion

  • Get more valuable articles and let's become architects together!
  • Paying attention to the official account enables you to have a deeper understanding of MySQL and concurrent programming.
  • This official account is not advertising!!! Update daily!!!

Keywords: Concurrent Programming

Added by baby_g on Wed, 12 Jan 2022 21:27:18 +0200