1, Introduction
1. Concept
Synchronization: synchronization refers to the sequential execution of the whole processing process. When each process is completed, the result is returned.
Asynchrony: asynchronously invoking is just sending the called instruction, and the caller does not need to wait for the called method to be fully executed; Instead, continue with the following process.
2. Asynchronous multithreading overview
In actual project development, many business scenarios need to be completed asynchronously. For example, common functions such as message notification and logging can be executed asynchronously to improve efficiency. Generally speaking, there are two ways to complete asynchronous operations: message queue MQ and thread pool processing ThreadPoolExecutor. The thread pool ThreadPoolTaskExecutor encapsulated by ThreadPoolExecutor provided after spring 4 directly uses annotation to enable @ Async on the method, You can easily use asynchronous threads (don't forget to add @ EnableAsync open annotation function to any Configuration file here)
3. Spring has implemented thread pool
- SimpleAsyncTaskExecutor: it is not a real thread pool. This class does not reuse threads. By default, a new thread will be created for each call.
- SyncTaskExecutor: this class does not implement asynchronous call, but a synchronous operation. Where multithreading is not required.
- ConcurrentTaskExecutor: the adapter class of Executor, which is not recommended. Consider using this class only if the ThreadPoolTaskExecutor does not meet the requirements.
- SimpleThreadPoolTaskExecutor: is the class of SimpleThreadPool of quartz. This class is required only if the thread pool is used by both quartz and non quartz.
- ThreadPoolTaskExecutor: most commonly used, recommended. Its essence is to Java util. concurrent. Packaging of ThreadPoolExecutor.
4. Asynchronous method
reference resources: Java 8 asynchronous programming
- The simplest asynchronous call, the return value is void
- For asynchronous calls with parameters, asynchronous methods can pass in parameters
- If there is a return value, it is often called to return Future
2, @ Async default thread pool
1. Default @ Async asynchronous call example
1.1 start asynchronous task
@Configuration @EnableAsync public class SyncConfiguration { }
1.2 mark asynchronous calls on Methods
Add a service class for integral processing@ Async is added to the method to represent that the method is asynchronous processing.
public class ScoreService { private static final Logger logger = LoggerFactory.getLogger(ScoreService.class); @Async public void addScore(){ //TODO simulates sleeping for 5 seconds, which is used for integral processing try { Thread.sleep(1000*5); logger.info("--------------Processing integral--------------------"); } catch (InterruptedException e) { e.printStackTrace(); } } }
2. Default thread pool
2.1 disadvantages of executors
In the application of thread pool, refer to Alibaba java development specification: thread pool is not allowed to be created by Executors, and the default thread pool of the system is not allowed. It is recommended to use ThreadPoolExecutor. This processing method makes the development engineers more clear about the operation rules of thread pool and avoid the risk of resource depletion. Disadvantages of Executors' methods:
- newFixedThreadPool and newSingleThreadExecutor: the main problem is that the stacked request processing queue may consume a lot of memory, even OOM
- newCachedThreadPool and newScheduledThreadPool: the main problem is that the maximum number of threads is integer MAX_ Value, a very large number of threads may be created, even OOM
2.2 @Async disadvantages
@The default asynchronous configuration of Async uses SimpleAsyncTaskExecutor. The thread pool creates one thread for each task by default. If the system continues to create threads, the system will eventually occupy too much memory and cause OutOfMemoryError error.
To solve the problem of thread creation, SimpleAsyncTaskExecutor provides a current limiting mechanism. The switch is controlled through the concurrencyLimit attribute. When concurrencyLimit > = 0, the current limiting mechanism is turned on. By default, the current limiting mechanism is turned off, that is, concurrencyLimit=-1. When it is turned off, new threads will be created to process tasks. Based on the default configuration, SimpleAsyncTaskExecutor is not a thread pool in the strict sense and cannot achieve the function of thread reuse
3, @ Async custom thread pool
1. Introduction
The user-defined thread pool can control the thread pool in a more fine-grained way, which is convenient to adjust the size configuration of the thread pool, and the thread performs exception control and handling. When setting the system custom thread pool to replace the default thread pool, although it can be set through multiple modes, there is and can only be one thread pool generated by replacing the default thread pool (multiple classes cannot be set to inherit asyncconfigurator). The custom thread pool has the following modes:
- Re implement the interface AsyncConfigurer
- Inherit AsyncConfigurerSupport
- Configure a custom TaskExecutor to replace the built-in task executor
By checking the default calling rules of Spring source code about @ Async, the class that implements the AsyncConfigurer interface in the source code will be queried first. The class that implements this interface is AsyncConfigurerSupport** However, the default configured thread pool and asynchronous processing method are empty, so you need to specify a thread pool whether you inherit or re implement the interface** And re implement the public executor getasyncexecution() method.
2. Several ways for Spring to customize asynchronous thread pool
2.1 configure application yml
Here I use the configuration file injection method. First, configure the configuration file
# Configure the number of core threads async: executor: thread: core_pool_size: 5 # Configure the maximum number of threads max_pool_size: 5 # Configure queue size queue_capacity: 999 # Configure the maximum idle time of threads keep_alive_seconds: 60 # Configure the name prefix of threads in the thread pool name: prefix: test-async-
2.2 implementation interface AsyncConfigurer
@Configuration @EnableAsync public class ExecutorConfig1 implements AsyncConfigurer { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("open SpringBoot Thread pool!"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Set the number of core threads executor.setCorePoolSize(corePoolSize); // Set the maximum number of threads. Threads exceeding the number of core threads will be applied only after the buffer queue is full executor.setMaxPoolSize(maxPoolSize); // Set buffer queue size executor.setQueueCapacity(queueCapacity); // Set the maximum idle time of threads. Threads that exceed the number of core threads will be destroyed after the idle time arrives executor.setKeepAliveSeconds(keepAliveSeconds); // Set the prefix of thread name. After setting, it is convenient for us to locate the thread pool where the processing task is located executor.setThreadNamePrefix(namePrefix); // Set rejection policy: how to handle new tasks when the thread pool reaches the maximum number of threads // CALLER_RUNS: when adding to the thread pool fails, the main thread will execute this task by itself, // When the thread pool has no processing capacity, the policy will directly run the rejected task in the calling thread of the execute method; If the executor is closed, the task is discarded executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // Thread pool initialization executor.initialize(); return executor; } @Override public Executor getAsyncExecutor() { return asyncServiceExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> logger.error(String.format("Perform asynchronous tasks'%s'", method), ex); } }
2.3 inherit AsyncConfigurerSupport
@Configuration @EnableAsync public class ExecutorConfig2 extends AsyncConfigurerSupport { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { logger.info("open SpringBoot Thread pool!"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Set the number of core threads executor.setCorePoolSize(corePoolSize); // Set the maximum number of threads. Threads exceeding the number of core threads will be applied only after the buffer queue is full executor.setMaxPoolSize(maxPoolSize); // Set buffer queue size executor.setQueueCapacity(queueCapacity); // Set the maximum idle time of threads. Threads that exceed the number of core threads will be destroyed after the idle time arrives executor.setKeepAliveSeconds(keepAliveSeconds); // Set the prefix of thread name. After setting, it is convenient for us to locate the thread pool where the processing task is located executor.setThreadNamePrefix(namePrefix); // Set rejection policy: how to handle new tasks when the thread pool reaches the maximum number of threads // CALLER_RUNS: when adding to the thread pool fails, the main thread will execute this task by itself, // When the thread pool has no processing capacity, the policy will directly run the rejected task in the calling thread of the execute method; If the executor is closed, the task is discarded executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // Thread pool initialization executor.initialize(); return executor; } @Override public Executor getAsyncExecutor() { return asyncServiceExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> logger.error(String.format("Perform asynchronous tasks'%s'", method), ex); } }
2.4 configuring a custom TaskExecutor
Since the default thread pool of asyncconfigurator is empty in the source code, Spring uses beanfactory GetBean (TaskExecutor. Class) first checks whether there is a wired process pool. If it is not configured, use beanfactory GetBean (default_task_executor_bean_name, executor. Class), and query whether there is a thread pool with the default name of TaskExecutor.
Therefore, when replacing the default thread pool, you need to set the default thread pool name as TaskExecutor. In this mode, the bottom layer is TaskExecutor Class. When replacing the default thread pool, the thread pool name may not be specified.
@Configuration @EnableAsync public class ExecutorConfig3 { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.keep_alive_seconds}") private int keepAliveSeconds; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME) public Executor taskExecutor() { logger.info("open SpringBoot Thread pool!"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Set the number of core threads executor.setCorePoolSize(corePoolSize); // Set the maximum number of threads. Threads exceeding the number of core threads will be applied only after the buffer queue is full executor.setMaxPoolSize(maxPoolSize); // Set buffer queue size executor.setQueueCapacity(queueCapacity); // Set the maximum idle time of threads. Threads that exceed the number of core threads will be destroyed after the idle time arrives executor.setKeepAliveSeconds(keepAliveSeconds); // Set the prefix of thread name. After setting, it is convenient for us to locate the thread pool where the processing task is located executor.setThreadNamePrefix(namePrefix); // Set rejection policy: how to handle new tasks when the thread pool reaches the maximum number of threads // CALLER_RUNS: when adding to the thread pool fails, the main thread will execute this task by itself, // When the thread pool has no processing capacity, the policy will directly run the rejected task in the calling thread of the execute method; If the executor is closed, the task is discarded executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // Thread pool initialization executor.initialize(); return executor; } @Bean(name = "myTask") public Executor taskExecutor() { logger.info("open SpringBoot Thread pool!"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // Set the number of core threads executor.setCorePoolSize(corePoolSize); // Set the maximum number of threads. Threads exceeding the number of core threads will be applied only after the buffer queue is full executor.setMaxPoolSize(maxPoolSize); // Set buffer queue size executor.setQueueCapacity(queueCapacity); // Set the maximum idle time of threads. Threads that exceed the number of core threads will be destroyed after the idle time arrives executor.setKeepAliveSeconds(keepAliveSeconds); // Set the prefix of thread name. After setting, it is convenient for us to locate the thread pool where the processing task is located executor.setThreadNamePrefix(namePrefix); // Set rejection policy: how to handle new tasks when the thread pool reaches the maximum number of threads // CALLER_RUNS: when adding to the thread pool fails, the main thread will execute this task by itself, // When the thread pool has no processing capacity, the policy will directly run the rejected task in the calling thread of the execute method; If the executor is closed, the task is discarded executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // Thread pool initialization executor.initialize(); return executor; } }
2.5 multithreading
@Async annotation uses the system default or user-defined thread pool (instead of the default thread pool), or multiple thread pools can be set in the project. When calling asynchronously, indicate the name of the thread pool to be called, such as @ Async("mytask").
Reference articles
https://mp.weixin.qq.com/s/ACJgGFofD9HAYqW4u8qJUw