Problems encountered in using a single thread (new Thread().start):
- Frequent creation and destruction of threads
- Thread execution data is large and high-frequency, and CPU context switching is frequent, resulting in a waste of CPU resources
The above problems make us wonder how to reuse threads and introduce pool technology
For example: connection pool, object pool, thread pool
Pool technology
The core is resource reuse, so that idle threads can be reused
What conditions do thread pools need to meet before they can be brought to reuse?
- Create a series of threads during initialization
- Let the thread not end, through the blocking queue, similar to the producer consumer model + Plus blocking queue (thread safe)
- Wake up a thread to execute the run method when necessary
- When the queue is full, add consumer threads to dynamically expand capacity
- If the expansion cannot be implemented, a rejection policy shall also be provided
- Report an error and throw the exception directly up. AbortPolicy: discards the task and throws RejectedExecutionException, which is the default rejection policy of the thread pool
- Discard DiscardPolicy directly
- Directly call the run method (used by newCachedThreadPool) CallerRunsPolicy
- Remove the head task (the longest waiting task), and then add the new task to the DiscardOldestPolicy
- It is saved and not recycled. It needs to be customized
Thread pool in Java
1. Detailed explanation of parameters
public ThreadPoolExecutor(int corePoolSize, //Number of core threads int maximumPoolSize, //Maximum number of threads long keepAliveTime, //Worker thread (non core thread) lifetime TimeUnit unit, //Company BlockingQueue<Runnable> workQueue, //Blocking queue for storing tasks ThreadFactory threadFactory, //Initialize thread factory RejectedExecutionHandler handler) {} //Reject policy
- The thread survival time is approximately fixed, and the run method ends when it is executed. The worker thread recycling method is to block the fetching task from the queue. It ends after timeout and can be recycled
2. Fixed thread pool newFixedThreadPool
- Number of core threads = maximum number of threads
- The blocking queue uses LinkedBlockingQueue
- Reject the policy and use the default AbortPolicy to report an error
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
3. Single thread pool newSingleThreadExecutor
- Just one thread
- The blocking queue uses LinkedBlockingQueue
- Reject the policy and use the default AbortPolicy to report an error
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4. Cache thread pool newCachedThreadPool
- How many thread pools to execute
- Synchronous queue yes, it is a blocking queue without structure and storage. It will be blocked when there is no consumer
- The default AbortPolicy of the reject policy reports an error
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
5. Timed thread pool newScheduledThreadPool
- Implemented by ScheduledThreadPoolExecutor, Max integer MAX_ VALUE
- Blocking queue DelayedWorkQueue delay queue
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
Using thread pools in Java
Taking the use of fixed thread pool as an example, others are similar.
ExecutorService threadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+",implement"); }); } //result pool-1-thread-2,implement pool-1-thread-2,implement pool-1-thread-3,implement pool-1-thread-1,implement
If the existing thread pool of Java cannot match the current business, the most reasonable thread pool should be customized
Thread pool source code analysis
- Construction method
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; //Blocking queue defined during initialization this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- AtomicInteger ctl focuses on this member variable. It records the lock ID and the number of threads in the current thread pool according to this ID. it is a thread safe shaping variable
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //Number of active threads in the thread pool (lower 29 bits) - > 29 private static final int COUNT_BITS = Integer.SIZE - 3; //Maximum number of worker threads - > 536870911 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //Various status of thread pool - > - 536870912 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; //Currently runnable thread pit private static int runStateOf(int c) { return c & ~CAPACITY; } //Number of threads currently working private static int workerCountOf(int c) { return c & CAPACITY; } //The unsafe class and its compareAndSwapInt method are used to ensure thread safety public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
The thread pool adopts the atomic Integer identifier, and the 32-bit Integer is divided into two parts. The binary bits (32 bits) of AtomicInteger of atomic class are divided into the upper 3 bits (indicating the running state of the thread pool) and the lower 29 bits (indicating the number of working threads)
1.execute executes a task
//It is carried out in three steps public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //Get current ctl int c = ctl.get(); //If the number of working threads is less than the core thread, start a new thread directly //From here, we know that the initialization core threads are delayed initialization if (workerCountOf(c) < corePoolSize) { //Open a new thread and give it the current Runnable to execute if (addWorker(command, true)) return; c = ctl.get(); } //The second step is to judge the running state of the thread pool and add the task to the blocking queue //If the offer is added successfully, true will be returned; otherwise, false will be returned if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //A task that has died invokes the reject policy if (! isRunning(recheck) && remove(command)) reject(command); //Recheck the working thread. If not, add a thread that is not considered else if (workerCountOf(recheck) == 0) addWorker(null, false); } //If the task cannot be queued, try adding a new task worker thread else if (!addWorker(command, false)) reject(command); }
2. Add a task with addworker()
//Save the set of the core thread Worker private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //Get thread pool status int rs = runStateOf(c); //A series of inspection and judgment if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //Gets the number of threads in the work int wc = workerCountOf(c); //If the number of existing threads is greater than the maximum login capacity or the core is easy, false will be returned directly if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //Add a thread with cas //return ctl.compareAndSet(expect, expect + 1); if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //Tasks wrap the tasks to be executed with Worker //Two variables in Worker, firstTask thread(new thread) w = new Worker(firstTask); //new thread in worker final Thread t = w.thread; if (t != null) { //Lock final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //Check the status of the current thread pool again int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //Throw an exception if the t thread has executed if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //Save Worker to set workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //If a task is added successfully, start the worker thread immediately if (workerAdded) { t.start(); //Next, we come to the run method in the Worker workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
3. The worker class starts the worker thread
//It implements thread Runnable and inherits thread concurrency AQS private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //The constructor stores the thread that needs to be executed and creates a new thread at the same time Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } }
When there are tasks on it, it will be packaged into a worker with two variables.
- firstTask: save the task to be executed
- Thread: it can be seen from the above that a thread is directly new
Then, after adding the member variable workers successfully, the new thread will be started directly. Let's directly find the run method in the worker class
//Worker#run public void run() { runWorker(this); } final void runWorker(Worker w) { //Current thread Thread wt = Thread.currentThread(); //Get and execute task addWorker add Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // Release the lock first to allow interrupts boolean completedAbruptly = true; try { //Get the task getTask() and analyze it below while (task != null || (task = getTask()) != null) { w.lock(); //If the thread pool is stopped or the thread is interrupted, the current thread sends an interrupt ID if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //Some monitoring can be customized before thread execution beforeExecute(wt, task); Throwable thrown = null; try { //Call the run method that executes the task task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //Some monitoring can be customized after thread execution afterExecute(task, thrown); } } finally { //After the task is executed, clear the record of the execution times of the current thread and release the lock task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //If an exception occurs during execution, remove the worker thread processWorkerExit(w, completedAbruptly); } }
4. getTask() in the worker class gets the task to be executed
Don't think about it. There must be a blockage
- workQueue
//The blocking queue defined during initialization is preceded by workqueue Offer (command) add task private final BlockingQueue<Runnable> workQueue;
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? //spin for (;;) { int c = ctl.get(); //Judging thread pool status int rs = runStateOf(c); // Check if queue empty only if necessary. //Does the thread pool stop checking whether the queue is empty if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //Gets the number of worker threads (if there is no task, the core thread will block) int wc = workerCountOf(c); // Are workers subject to culling? //Whether to set the survival time or if the current number of worker threads is greater than the number of cores, you need to set the survival time boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //(the worker thread is larger than the maximum thread 𞓜 there is a lifetime & & timeout) //&&(the worker thread is greater than 1 𞓜 the queue is empty) the worker thread is running the last task //Then directly subtract a worker thread if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //return ctl.compareAndSet(expect, expect - 1) if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //It is extracted from the blocking queue according to whether there is a survival time | blocking timeout Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //If it is still null after timeout, it indicates that the queue is empty. The current thread declares that the cycle ends and jumps out of the loop and returns null //Without the identity of the core thread, whether it can become a core thread depends on who executes it last timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
5.processWorkerExit() worker thread execution completion or exception handling
private void processWorkerExit(Worker w, boolean completedAbruptly) { //If it is an unexpected exit, workerCount-- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //Lock to count the number of tasks executed and remove the Worker in the WorkerSet final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //Record the number of tasks executed, and then remove them from the worker set completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //Determine the status of thread pool and whether to terminate threads according to the status tryTerminate(); int c = ctl.get(); //If the thread pool state is a non stop state less than stop if (runStateLessThan(c, STOP)) { //If there is no unexpected exit, you need to destroy the thread according to the survival flag of the number of running core threads if (!completedAbruptly) { //If KeepAliveTime is set, the core thread will also be destroyed int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //If the core thread keeps at least 0 and the queue is not empty //If the thread pool is not stopped, a worker thread is reserved if (min == 0 && ! workQueue.isEmpty()) min = 1; //If the current total number of Worker threads is greater than the required number of workers, it is returned directly if (workerCountOf(c) >= min) return; // replacement not needed } //Create a new worker core thread addWorker(null, false); } }
Source code summary
When executing a task, if the current working thread is smaller than the core thread, directly initialize a new core thread (worker) and store it in the member variable HashSet workers. If the addition is successful, the thread in the worker will be started.
If the worker thread is larger than the core thread, judge the running status of the thread pool, add it to the blocking queue with offer () (adding successfully returns true, otherwise false), and check the running status of the thread pool. If it is not running, remove the task and call the reject policy. If it is normal but the worker thread is 0, create a new worker thread without task
If the offer() fails to be added, the worker thread will be added directly. If the addition fails, the reject policy will be called
The worker implements the thread Rannable and inherits the AQS. It circulates in the run method to block the tasks to be executed in the queue. getTask() obtains the tasks to be executed, and then directly executes them in the run method. Here, the mutex in AQS is used
getTask() gets the task, or determines whether to block or block with timeout according to whether to set the survival time or whether the number of current working threads is greater than the number of core threads. If it is a timeout blocking, it means that the queue is null, and then the working thread can be destroyed
If KeepAliveTime is set, the core thread will also be destroyed. However, if the queue is not empty but the number of threads is 0, a new worker will be created
That is the whole content of this chapter.
Previous: Mysterious veil of Java local thread variable ThreadLocal
Next: Use of thread pool worker ForkJoin
The emperor's thirty years have seen no success in calligraphy and sword