8. Tools for sharing models
8.1 Thread pool
Pooling is a familiar technique by now: thread pools, database connection pools, HTTP connection pools and so on are all applications of the same idea. The main purpose of pooling is to reduce resource consumption and improve resource utilization.
Thread pools provide a way to limit and manage resources (including executing tasks). Each thread pool also maintains some basic statistics, such as the number of completed tasks.
The benefits of using a thread pool, as listed in The Art of Java Concurrency Programming, are:
- Reduce resource consumption. Reduce the consumption caused by thread creation and destruction by reusing the created threads.
- Improve response speed. When the task arrives, the task can be executed immediately without waiting for the thread to be created.
- Improve thread manageability. Threads are a scarce resource. If they are created without limit, they not only consume system resources but also reduce system stability. A thread pool allows threads to be allocated, tuned and monitored in a unified way.
8.1.1 Custom thread pool
Design steps for the example code (Test18.java):
- Step 1: customize the rejection policy interface (a condensed sketch is shown after this list)
- Step 2: customize the task queue
- Step 3: Customize thread pool
- Step 4: Test
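As a rough illustration of step 1 (a sketch under assumed names, not the course's Test18.java), the rejection policy can be modeled as a functional interface that the custom pool calls whenever its task queue is full:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch: the custom pool invokes RejectPolicy.reject(...) when the queue is full.
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

public class RejectPolicyDemo {
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        queue.offer(() -> {});                                   // fill the queue

        // A few policies a custom pool could be configured with:
        RejectPolicy<Runnable> discard   = (q, t) -> {};         // drop the task silently
        RejectPolicy<Runnable> callerRun = (q, t) -> t.run();    // let the caller run it
        RejectPolicy<Runnable> abort     = (q, t) -> {
            throw new RejectedExecutionException("queue is full");
        };

        Runnable extra = () -> System.out.println("extra task");
        callerRun.reject(queue, extra);                          // runs on the caller thread
    }
}
```

Steps 2 and 3 would then build the task queue and the pool itself around this pluggable interface.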
8.1.2 ThreadPoolExecutor
Executor framework (mainly composed of three parts)
- Task (Runnable /Callable)
A task must implement either the Runnable interface or the Callable interface. Implementation classes of both Runnable and Callable can be executed by ThreadPoolExecutor or ScheduledThreadPoolExecutor.
- Task executor
The task execution mechanism consists of the core Executor interface and the ExecutorService interface that extends it. The two key classes, ThreadPoolExecutor and ScheduledThreadPoolExecutor, implement the ExecutorService interface.
Many class relationships are involved here, but in practice the class to pay attention to is ThreadPoolExecutor, which is by far the most frequently used when working with thread pools.
- Results of asynchronous calculation (Future)
Both the Future interface and its implementation class FutureTask can represent the result of an asynchronous computation.
When we submit an implementation of Runnable or Callable to ThreadPoolExecutor or ScheduledThreadPoolExecutor for execution, calling the submit() method returns a FutureTask object.
Schematic of how the Executor framework is used:
- The main thread must first create a task object that implements the Runnable or Callable interface.
- The created Runnable/Callable object is either handed directly to ExecutorService for execution via ExecutorService.execute(Runnable command), or submitted via ExecutorService.submit(Runnable task) or ExecutorService.submit(Callable<T> task).
- If ExecutorService.submit(...) is called, ExecutorService returns an object implementing the Future interface (this is the difference between execute() and submit(): submit() returns a FutureTask object).
- Finally, the main thread can call FutureTask.get() to wait for the task to complete, or FutureTask.cancel(boolean mayInterruptIfRunning) to cancel it. A minimal sketch of this flow follows.
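The sketch below (names are illustrative, not taken from the course's test classes) shows the submit-then-get flow:

```java
import java.util.concurrent.*;

public class ExecutorFlowDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 1. The main thread creates a task object (here a Callable).
        Callable<Integer> task = () -> 1 + 1;

        // 2. submit() hands the task to the pool and returns a Future (a FutureTask).
        Future<Integer> future = pool.submit(task);

        // 3. get() blocks the main thread until the result is available.
        System.out.println(future.get());   // 2

        pool.shutdown();
    }
}
```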
1) Thread pool status
ThreadPoolExecutor uses the upper 3 bits of an int to represent the thread pool state and the lower 29 bits to represent the number of threads.
Comparing the values numerically (the highest bit is the sign bit): TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING.
This information is stored in a single atomic variable, ctl. The point of combining the pool state and the thread count is that both can then be assigned with one CAS atomic operation:
```java
// c is the old value, ctlOf returns the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));

// rs is the upper 3 bits representing the thread pool state, wc is the lower 29 bits
// representing the number of threads; ctlOf merges them
private static int ctlOf(int rs, int wc) { return rs | wc; }
```
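For context, the surrounding constants are roughly as follows (abridged from the JDK's ThreadPoolExecutor source; the exact layout may vary by JDK version):

```java
// state lives in the high 3 bits, the worker count in the low 29 bits, packed into ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;        // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits set to 1

private static final int RUNNING    = -1 << COUNT_BITS;        // 111 in the high bits
private static final int SHUTDOWN   =  0 << COUNT_BITS;        // 000
private static final int STOP       =  1 << COUNT_BITS;        // 001
private static final int TIDYING    =  2 << COUNT_BITS;        // 010
private static final int TERMINATED =  3 << COUNT_BITS;        // 011

private static int runStateOf(int c)    { return c & ~CAPACITY; }  // extract the state
private static int workerCountOf(int c) { return c & CAPACITY; }   // extract the thread count
```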
2) Construction method
Let's look at the constructor with the most parameters:
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { }
```
- corePoolSize: number of core threads (the maximum number of threads kept alive permanently)
- maximumPoolSize: maximum number of threads (core threads plus emergency threads)
- keepAliveTime: idle survival time of the emergency threads (core threads have no survival time; they keep running)
- unit: time unit of keepAliveTime, for the emergency threads
- workQueue: blocking queue for tasks
- threadFactory: thread factory, mainly used to give newly created threads a meaningful name
- handler: rejection policy
- There are no threads in the pool at first. When a task is submitted, the pool creates a new thread to execute it.
- When the number of threads has reached corePoolSize and none of them is idle, newly submitted tasks are appended to the workQueue and wait there until a thread becomes free.
- If a bounded queue was chosen and the tasks exceed the queue's capacity, up to maximumPoolSize - corePoolSize emergency threads are created to help out.
- If the thread count has reached maximumPoolSize and new tasks still arrive, the rejection policy is executed. The JDK provides the first four implementations below; other well-known frameworks provide their own:
- ThreadPoolExecutor.AbortPolicy: throw RejectedExecutionException to the caller; this is the default policy
- ThreadPoolExecutor.CallerRunsPolicy: let the caller run the task
- ThreadPoolExecutor.DiscardPolicy: silently discard the task
- ThreadPoolExecutor.DiscardOldestPolicy: discard the oldest task in the queue and enqueue this task instead
- Dubbo's implementation logs and dumps the thread stack information before throwing RejectedExecutionException, which makes it easier to locate the problem
- Netty's implementation creates a new thread to execute the task
- ActiveMQ's implementation tries to put the task into the queue with a timeout (60s), similar to the custom rejection policy shown earlier
- PinPoint's implementation uses a chain of rejection policies and tries every policy in the chain one by one
- After the peak has passed, emergency threads beyond corePoolSize that have had no work for a while are terminated to save resources. That idle time is determined by keepAliveTime and unit.
Based on this constructor, the JDK's Executors class provides many factory methods to create thread pools for different purposes.
3) newFixedThreadPool
This is a factory method provided by the Executors class for creating a thread pool. Executors is the utility class of the Executor framework. Test19.java
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
```
From the source we can see that new ThreadPoolExecutor(...) here calls a constructor that delegates to the full-parameter constructor shown earlier, using the default thread factory and the default rejection policy:
```java
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
```
Characteristics
- The number of core threads == the maximum number of threads (no emergency threads are created), so no keep-alive timeout is needed
- The blocking queue is unbounded and can hold any number of tasks
- Suitable for workloads where the amount of work is known and tasks are relatively time-consuming
4) newCachedThreadPool
```java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
```
Characteristics
- The number of core threads is 0, the maximum number of threads is Integer.MAX_VALUE, and the idle survival time of the emergency threads is 60s, which means:
  - all threads are emergency threads (they are reclaimed after 60s of idleness)
  - emergency threads can be created without limit
- The queue is a SynchronousQueue, which has no capacity: a put cannot complete until another thread takes the element (the goods are handed over with one hand while the money is taken with the other). See the SynchronousQueue test code in Test20.java and the sketch below.
- Overall, the number of threads in this pool grows with the number of tasks, with no upper limit; when tasks finish and a thread has been idle for 1 minute, the thread is released. It is suitable for workloads where tasks are numerous but each task executes quickly.
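A minimal sketch of SynchronousQueue's hand-off behavior (illustrative, not the course's Test20.java):

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                System.out.println("putting 1 ...");
                queue.put(1);                       // blocks until a consumer takes it
                System.out.println("1 handed over");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Thread.sleep(1000);                             // show that put() is still blocked
        System.out.println("taking: " + queue.take()); // releases the producer
        producer.join();
    }
}
```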
5) newSingleThreadExecutor
```java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}
```
Usage scenario:
- You want multiple tasks to be queued and executed one after another. The number of threads is fixed at 1; when there is more than one task, the extra tasks are put into an unbounded queue. When all tasks have finished, the single thread is not released.
- Differences:
- Difference from creating a single thread yourself to execute tasks serially: if your own thread terminates because a task fails, there is no remedy; the thread pool, however, creates a new replacement thread so the pool keeps working normally.
- Executors.newSingleThreadExecutor(): the number of threads is always 1 and cannot be modified.
- FinalizableDelegatedExecutorService applies the decorator pattern and exposes only the ExecutorService interface, so methods specific to ThreadPoolExecutor cannot be called.
- Difference from Executors.newFixedThreadPool(1): newFixedThreadPool(1) also starts with 1 thread, but the number can be modified later, because the object exposed to the caller is a ThreadPoolExecutor and, after a cast, setCorePoolSize can be called on it.
The thread pool objects returned by Executors have the following drawbacks:
- FixedThreadPool and SingleThreadExecutor: the allowed request queue length is Integer.MAX_VALUE, so a large number of requests may pile up and cause an OOM.
- CachedThreadPool and ScheduledThreadPool: the allowed number of threads is Integer.MAX_VALUE, so a large number of threads may be created and cause an OOM.
To put it bluntly: use a bounded queue and keep the number of threads that can be created under control.
Besides avoiding OOM, there are further reasons why the convenient thread pools provided by Executors are not recommended:
- In practice you need to configure the thread pool parameters yourself according to the machine's performance and the business scenario: the number of core threads, the task queue to use, the saturation (rejection) policy, and so on.
- The thread pool should be given an explicit name, which helps locate problems (see the sketch below).
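A minimal sketch of such a manually configured, explicitly named pool (all parameter values here are illustrative assumptions):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPoolDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(1);
        ThreadFactory namedFactory = runnable ->
            new Thread(runnable, "order-pool-t" + counter.getAndIncrement());

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2,                                          // corePoolSize
            4,                                          // maximumPoolSize
            60, TimeUnit.SECONDS,                       // keep-alive of emergency threads
            new ArrayBlockingQueue<>(100),              // bounded queue avoids unbounded buildup
            namedFactory,                               // named threads ease troubleshooting
            new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy

        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```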
6) Submit tasks
Test21.java
```java
// Execute the task
void execute(Runnable command);

// Submit the task and use the returned Future to obtain the result. Internally, Future uses
// the guarded suspension pattern mentioned earlier to receive the result: the main thread
// can call Future.get() to wait for the task to complete
<T> Future<T> submit(Callable<T> task);

// Submit all tasks in tasks
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

// Submit all tasks in tasks, with a timeout
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
        throws InterruptedException;

// Submit all tasks in tasks; the result of whichever task finishes successfully first is
// returned, and the other tasks are cancelled
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

// Same as above, with a timeout
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
```
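A small usage sketch of invokeAll and invokeAny (illustrative, not the course's Test21.java):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // invokeAll: waits for all tasks and returns one Future per task, in submission order
        List<Callable<String>> tasks = Arrays.asList(
                () -> "a",
                () -> "b",
                () -> "c");
        for (Future<String> f : pool.invokeAll(tasks)) {
            System.out.println(f.get());            // a, b, c
        }

        // invokeAny: returns the result of the first task to complete successfully;
        // the remaining tasks are cancelled
        List<Callable<String>> race = Arrays.asList(
                () -> { TimeUnit.SECONDS.sleep(1); return "slow"; },
                () -> "fast");
        System.out.println(pool.invokeAny(race));   // most likely "fast"

        pool.shutdown();
    }
}
```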
7) Close thread pool
Test22.java
shutdown
```java
/*
 * The thread pool state changes to SHUTDOWN
 * - no new tasks will be accepted
 * - but already submitted tasks will be completed, including those waiting in the queue
 * - this method does not block the calling thread
 */
void shutdown();
```

```java
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // modify the thread pool state
        advanceRunState(SHUTDOWN);
        // only idle threads are interrupted
        interruptIdleWorkers();
        onShutdown(); // extension point for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // attempt to terminate (the pool terminates immediately only if no threads are running)
    tryTerminate();
}
```
shutdownNow
```java
/*
 * The thread pool state changes to STOP
 * - no new tasks will be accepted
 * - tasks still waiting in the queue are returned
 * - running tasks are interrupted via interrupt
 */
List<Runnable> shutdownNow();
```

```java
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // modify the thread pool state
        advanceRunState(STOP);
        // interrupt all threads
        interruptWorkers();
        // drain the remaining tasks from the queue
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // attempt to terminate
    tryTerminate();
    return tasks;
}
```
Other methods
```java
// Returns true if the thread pool is not in the RUNNING state
boolean isShutdown();

// Is the thread pool state TERMINATED?
boolean isTerminated();

// shutdown() is asynchronous: it returns without waiting for all tasks to finish. A caller
// that wants to do something only after the pool is TERMINATED can wait with this method
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
```
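A typical graceful-shutdown sketch combining these methods (illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("task1"));
        pool.submit(() -> System.out.println("task2"));

        pool.shutdown();                       // stop accepting new tasks, finish queued ones
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            // still not terminated after 5s: interrupt running tasks and drain the queue
            pool.shutdownNow();
        }
        System.out.println("terminated = " + pool.isTerminated());
    }
}
```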
Asynchronous mode: worker thread
1. Definition
Let a limited number of worker threads take turns processing an unlimited number of tasks asynchronously. This can be classified as a division-of-labor pattern; its typical implementation is the thread pool, and it also reflects the flyweight pattern from the classic design patterns.
For example, a Haidilao waiter (thread) handles each guest's order (task) in turn. Giving every guest a dedicated waiter would be far too expensive (compare this with another multithreaded design pattern: thread-per-message).
Note that different task types should use different thread pools; this avoids starvation and improves efficiency.
For example, if one restaurant worker has to both greet guests (task type A) and cook in the kitchen (task type B), efficiency obviously suffers; splitting the staff into waiters (thread pool A) and cooks (thread pool B) is more reasonable. Of course, you can imagine an even finer division of labor.
2. Starvation
A fixed-size thread pool can starve. Test23.java
- The two workers are two threads in the same thread pool
- Their job consists of two phases: taking orders from guests and cooking in the kitchen
- Taking an order: the worker must take the order first and then wait for the dish to be cooked before serving it; while waiting, the order-taking worker can do nothing else
- Cooking: nothing special, just cook the dish
- Normally, worker A handles the order, waits for worker B to finish the dish, and then serves it; they cooperate well
- But if two guests arrive at the same time, worker A and worker B both go to take orders; now nobody is cooking and both workers starve
The fix could be to enlarge the thread pool, but that is not the fundamental solution. As mentioned earlier, the fundamental fix is to use different thread pools for different task types, as in Test24.java and the sketch below.
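A minimal sketch of the two-pool fix (illustrative names and sizes, not the course's Test24.java): order-taking and cooking each get their own pool, so an order-taking thread can never use up the cooking capacity:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TwoPoolsDemo {
    // one pool per task type: waiters only take orders, cooks only cook
    static final ExecutorService waiterPool = Executors.newFixedThreadPool(1);
    static final ExecutorService cookPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 2; i++) {
            int guest = i;
            waiterPool.submit(() -> {
                System.out.println("taking the order of guest " + guest);
                // cooking is delegated to the cook pool, so the waiter thread
                // never occupies the only cooking thread
                Future<String> dish = cookPool.submit(() -> "dish for guest " + guest);
                try {
                    System.out.println("serving " + dish.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        waiterPool.shutdown();
        waiterPool.awaitTermination(10, TimeUnit.SECONDS); // let the waiters finish first
        cookPool.shutdown();
    }
}
```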
How large should a thread pool be?
If it is too small, the program cannot make full use of system resources and starvation becomes more likely; if it is too large, it causes more thread context switching and consumes more memory.
- CPU-intensive computation
  Usually "number of CPU cores + 1" achieves optimal CPU utilization. The +1 ensures that when a thread is suspended because of a page fault or for some other reason, the extra thread can step in, so no CPU cycles are wasted.
- I/O-intensive operations
  The CPU is not always busy: business computation uses the CPU, but I/O operations, remote RPC calls and database access leave it idle, and multithreading can be used to raise its utilization.
  The empirical formula is: number of threads = number of cores * expected CPU utilization * total time (CPU time + wait time) / CPU time
  For example, a 4-core CPU with 50% computation time and 50% wait time, targeting 100% CPU utilization, gives
  4 * 100% * 100% / 50% = 8
  A 4-core CPU with 10% computation time and 90% wait time, targeting 100% CPU utilization, gives
  4 * 100% * 100% / 10% = 40
8) Task scheduling thread pool
Before the "task scheduling thread pool" function is added, you can use java.util.Timer to realize the timing function. The advantage of Timer is that it is easy to use, but
Because all tasks are scheduled by the same thread, all tasks are executed serially. At the same time, only one task can be executed, the previous one
The delay or exception of a task will affect subsequent tasks. Test25.java
Rewrite with ScheduledExecutorService: Test26.java
- Overall behavior of this pool: the number of threads is fixed; when there are more tasks than threads, the extra tasks are put into an unbounded queue, and the threads are not released after the tasks finish. It is used to execute delayed or recurring tasks.
- Usage of the scheduleAtFixedRate method of ScheduledExecutorService: Test27.java
- Usage of the scheduleWithFixedDelay method of ScheduledExecutorService: Test27.java (see the sketch below for the difference between the two)
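A small sketch contrasting the two scheduling methods (illustrative delays and period, not the course's Test27.java):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // first run 1s after submission, then every 1s measured from each start;
        // if a run takes longer than the period, the next run starts immediately after it
        pool.scheduleAtFixedRate(
                () -> System.out.println("at fixed rate: " + System.currentTimeMillis()),
                1, 1, TimeUnit.SECONDS);

        // first run 1s after submission, then a 1s delay is counted from the *end*
        // of each run before the next one starts
        pool.scheduleWithFixedDelay(
                () -> System.out.println("with fixed delay: " + System.currentTimeMillis()),
                1, 1, TimeUnit.SECONDS);

        // the demo keeps printing until the process is stopped
    }
}
```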
9) Correctly handle exceptions in executing tasks
If a thread in the pool throws an exception while executing a task, by default the task's execution is simply cut short: no exception is thrown to the caller and no exception information is printed.
Method 1: actively catch exceptions
```java
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
    try {
        log.debug("task1");
        int i = 1 / 0;
    } catch (Exception e) {
        log.error("error:", e);
    }
});
```
Method 2: use a Future; the exception is encapsulated in the Future returned by the submit method and is rethrown when get() is called.
```java
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
    log.debug("task1");
    int i = 1 / 0;
    return true;
});
log.debug("result:{}", f.get()); // get() rethrows the exception wrapped in ExecutionException
```
10) Tomcat thread pool
Where does Tomcat use thread pools?
- LimitLatch is used for flow control and limits the maximum number of connections, similar to Semaphore in J.U.C, which will be described later
- The Acceptor is only responsible for [receiving new socket connections]
- Poller is only responsible for monitoring whether the socket channel has [readable I/O events]
- Once readable, encapsulate a task object (socket processor) and submit it to the Executor thread pool for processing
- The worker thread in the Executor thread pool is ultimately responsible for [processing requests]
Tomcat thread pool extends ThreadPoolExecutor with slightly different behavior
- If the total number of threads has reached maximumPoolSize, a RejectedExecutionException is not thrown immediately; instead, the task is offered to the queue one more time, and only if that also fails is the RejectedExecutionException thrown
Source code tomcat-7.0.42
```java
public void execute(Runnable command, long timeout, TimeUnit unit) {
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                // try to put the task into the blocking queue once more
                if (!queue.force(command, timeout, unit)) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.");
                }
            } catch (InterruptedException x) {
                submittedCount.decrementAndGet();
                Thread.interrupted();
                throw new RejectedExecutionException(x);
            }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
```
TaskQueue.java
```java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if (parent.isShutdown())
        throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
    // forces the item onto the queue; used when the task would otherwise be rejected
    return super.offer(o, timeout, unit);
}
```
Connector configuration
Executor thread configuration
Setting daemon means the thread ends automatically once all non-daemon threads (such as the main thread) have ended.
The flow diagram referenced here seems slightly wrong: when the number of submitted tasks is less than the number of core threads, the task should be handed directly to a core thread for execution.
8.1.3 Fork/Join
- Concept
- Fork/Join is a thread pool implementation added in JDK 1.7. It embodies the divide-and-conquer idea and is suitable for CPU-intensive computations whose tasks can be split
- Task splitting means breaking a large task into smaller tasks that use the same algorithm, until a task can no longer be split and can be solved directly. Computations that are naturally recursive, such as merge sort and the Fibonacci sequence, can be solved with divide and conquer
- Fork/Join adds multithreading on top of divide and conquer: the splitting and merging of sub-tasks can be distributed across different threads, which further improves efficiency
- By default, Fork/Join creates a thread pool whose size equals the number of CPU cores
- Usage
Tasks submitted to the Fork/Join thread pool need to extend RecursiveTask (with a return value) or RecursiveAction (without a return value). For example, the task below sums the integers from 1 to n: Test28.java (a sketch of such a task follows)
Logic diagram of the improved algorithm: Test29.java
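A minimal RecursiveTask sketch (illustrative, not the course's Test28.java or Test29.java): sum the integers from 1 to n by splitting the range in half until it is small enough to compute directly.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private final long from, to;

    public SumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= 1_000) {                  // small enough: compute directly
            long sum = 0;
            for (long i = from; i <= to; i++) sum += i;
            return sum;
        }
        long mid = (from + to) / 2;                // otherwise split in half
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        left.fork();                               // run the left half asynchronously
        return right.compute() + left.join();      // compute the right half here, then merge
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();    // parallelism defaults to the core count
        System.out.println(pool.invoke(new SumTask(1, 1_000_000))); // 500000500000
    }
}
```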