Spring Boot @Async asynchronous multithreading

1, Introduction

1. Concept

Synchronous: the whole process executes in sequence; each step must finish and return its result before the next one starts.
Asynchronous: the caller only issues the call and does not wait for the called method to finish; it continues with the rest of its own work.

2. Asynchronous multithreading overview

In real projects, many business scenarios are best handled asynchronously. Common functions such as message notifications and logging, for example, can run asynchronously to improve responsiveness. Broadly, there are two ways to do this: a message queue (MQ) or a thread pool (ThreadPoolExecutor). Spring wraps ThreadPoolExecutor in ThreadPoolTaskExecutor, and annotating a method with @Async is enough to run it on an asynchronous thread (remember to enable the feature by adding @EnableAsync to a @Configuration class).

3. Thread pools implemented by Spring

  • SimpleAsyncTaskExecutor: not a real thread pool. It does not reuse threads; by default it creates a new thread for every call.
  • SyncTaskExecutor: does not execute asynchronously; each task runs synchronously in the calling thread. Suitable only where multithreading is not required.
  • ConcurrentTaskExecutor: an adapter for java.util.concurrent.Executor. Not recommended; consider it only if ThreadPoolTaskExecutor does not meet your requirements.
  • SimpleThreadPoolTaskExecutor: a subclass of Quartz's SimpleThreadPool. Needed only when the same thread pool is shared by Quartz and non-Quartz code.
  • ThreadPoolTaskExecutor: the most commonly used and the recommended choice. It is essentially a wrapper around java.util.concurrent.ThreadPoolExecutor.

4. Asynchronous method

Reference: Java 8 asynchronous programming

  • The simplest asynchronous call: no parameters and a void return value
  • Asynchronous calls with parameters: arguments can be passed to the asynchronous method
  • Asynchronous calls with a return value: the method usually returns a Future
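The three shapes above can be sketched with plain JDK classes, without Spring, to show what each one looks like to the caller. Everything named here (AsyncShapes, resetScore, addScore, doubledScore) is hypothetical, and the single-thread pool only stands in for whatever executor @Async would use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: plain-JDK stand-ins for the three asynchronous method shapes.
class AsyncShapes {
    // One worker thread and a bounded queue, so tasks run in submission order.
    private final ExecutorService pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16));
    private final AtomicInteger score = new AtomicInteger();

    // 1. No parameters, no return value: fire and forget.
    void resetScore() {
        pool.execute(() -> score.set(0));
    }

    // 2. With parameters: the argument is captured and used on the worker thread.
    void addScore(int points) {
        pool.execute(() -> score.addAndGet(points));
    }

    // 3. With a return value: the caller receives a Future and decides when to wait.
    CompletableFuture<Integer> doubledScore() {
        return CompletableFuture.supplyAsync(() -> score.get() * 2, pool);
    }

    // Lets already submitted tasks finish, then stops the worker thread.
    void shutdown() {
        pool.shutdown();
    }
}
```

Calling resetScore(), addScore(10), addScore(5) and then doubledScore().join() on a fresh instance yields 30, because the single worker thread runs the tasks in the order they were submitted. Only the last call makes the caller wait.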

2, @Async default thread pool

1. Default @Async asynchronous call example

1.1 Enable asynchronous tasks

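import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

// Enables processing of @Async annotations for the whole application context
@Configuration
@EnableAsync
public class SyncConfiguration {

}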

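1.2 Mark methods as asynchronous

Add a service class for points processing. Annotating a method with @Async marks it for asynchronous execution. The class must be a Spring-managed bean and the method must be called from another bean, because @Async is applied through a proxy and does not take effect for calls made within the same class.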

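import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ScoreService {

    private static final Logger logger = LoggerFactory.getLogger(ScoreService.class);

    @Async
    public void addScore() {
        // Simulate points processing by sleeping for 5 seconds
        try {
            Thread.sleep(1000 * 5);
            logger.info("--------------Processing points--------------------");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can see the interruption
            Thread.currentThread().interrupt();
            logger.warn("Points processing was interrupted", e);
        }
    }
}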

2. Default thread pool

2.1 Disadvantages of Executors

For thread pools, the Alibaba Java Development Manual states that thread pools must not be created with Executors, nor should the system default thread pool be used; it recommends constructing them with ThreadPoolExecutor directly. Doing so makes the pool's operating rules explicit to the developer and avoids the risk of resource exhaustion. The drawbacks of the Executors factory methods are:

  • newFixedThreadPool and newSingleThreadExecutor: the request queue is effectively unbounded (up to Integer.MAX_VALUE entries), so queued requests can pile up, consume a large amount of memory, and even cause an OOM
  • newCachedThreadPool and newScheduledThreadPool: the maximum number of threads is Integer.MAX_VALUE, so a very large number of threads may be created, which can also cause an OOM
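As a minimal sketch of that recommendation, the pool below is constructed directly with ThreadPoolExecutor so that both the thread count and the queue length are bounded. The class name BoundedPoolDemo and the sizes (2 threads, queue capacity 3) are assumptions chosen for illustration; AbortPolicy is the JDK's default rejection policy, written out explicitly.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a bounded pool rejects work instead of queueing it without limit.
class BoundedPoolDemo {

    // Submits `tasks` blocking tasks and returns how many submissions were rejected.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),            // at most 3 waiting tasks
                new ThreadPoolExecutor.AbortPolicy());  // throw when saturated
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // both threads busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With these sizes the pool accepts five tasks at once (two running, three queued), so countRejected(5) returns 0 and countRejected(8) returns 3. The overload becomes visible to the caller immediately instead of accumulating in memory.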

2.2 @Async disadvantages

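By default, @Async uses SimpleAsyncTaskExecutor when no other executor is configured. This executor creates a new thread for every task, so if tasks keep arriving the application keeps creating threads, eventually using too much memory and failing with OutOfMemoryError. Note that Spring Boot 2.1 and later auto-configure a ThreadPoolTaskExecutor when the application defines no Executor bean, so this fallback mainly concerns plain Spring applications and older Boot versions.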

To curb thread creation, SimpleAsyncTaskExecutor provides a throttling mechanism controlled by the concurrencyLimit property. Throttling is active when concurrencyLimit >= 0. It is off by default (concurrencyLimit = -1), in which case a new thread is created for every task. With the default configuration, therefore, SimpleAsyncTaskExecutor is not a thread pool in the strict sense and does not reuse threads.
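The effect of such a limit can be illustrated without Spring. The sketch below is not SimpleAsyncTaskExecutor's implementation; it is a hypothetical thread-per-task executor (ThrottledThreadPerTaskExecutor) that uses a Semaphore to make the caller wait once the limit is reached, and treats a negative limit as "no throttling". The helper maxObservedConcurrency exists only to make the behavior measurable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogue: one new thread per task, optionally throttled.
class ThrottledThreadPerTaskExecutor implements Executor {
    private final Semaphore permits; // null means throttling is off

    ThrottledThreadPerTaskExecutor(int concurrencyLimit) {
        this.permits = concurrencyLimit >= 0 ? new Semaphore(concurrencyLimit) : null;
    }

    @Override
    public void execute(Runnable task) {
        if (permits != null) {
            permits.acquireUninterruptibly(); // caller waits while the limit is reached
        }
        new Thread(() -> {
            try {
                task.run();
            } finally {
                if (permits != null) {
                    permits.release();
                }
            }
        }).start(); // threads are never reused
    }

    // Runs `tasks` short tasks and returns the highest number seen running at once.
    static int maxObservedConcurrency(int limit, int tasks) {
        Executor executor = new ThrottledThreadPerTaskExecutor(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }
}
```

With a limit of 2, maxObservedConcurrency(2, 6) never exceeds 2, even though six threads are created in total. The limit caps concurrency but does not add thread reuse, which is why a real pool is still preferable.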

3, @Async custom thread pool

1. Introduction

A custom thread pool gives finer-grained control: it makes the pool size easy to tune and lets you control how exceptions thrown by asynchronous tasks are handled. Although the default pool can be replaced in several ways, only one replacement can be in effect (only one AsyncConfigurer may exist in the application context). The options are:

  • Implement the AsyncConfigurer interface
  • Extend AsyncConfigurerSupport
  • Configure a custom TaskExecutor to replace the built-in task executor

Reading Spring's source for @Async shows that it first looks for a bean implementing the AsyncConfigurer interface. AsyncConfigurerSupport is the stock implementation of this interface, but its thread pool and exception handler both default to null. Whether you extend the class or implement the interface, you therefore need to supply a thread pool by overriding public Executor getAsyncExecutor().

2. Several ways for Spring to customize asynchronous thread pool

2.1 Configure application.yml

The examples here inject their settings from the configuration file, so start by adding the following to application.yml:

async:
  executor:
    thread:
      # Configure the number of core threads
      core_pool_size: 5
      # Configure the maximum number of threads
      max_pool_size: 5
      # Configure queue size
      queue_capacity: 999
      # Configure the maximum idle time of threads
      keep_alive_seconds: 60
      # Configure the name prefix of threads in the thread pool
      name:
        prefix: test-async-

2.2 Implement the AsyncConfigurer interface

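import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig1 implements AsyncConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig1.class);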

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("open SpringBoot Thread pool!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // Core pool size: threads that are kept alive even when idle
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size: threads beyond the core size are created only once the queue is full
        executor.setMaxPoolSize(maxPoolSize);
        // Capacity of the queue that buffers tasks while all core threads are busy
        executor.setQueueCapacity(queueCapacity);
        // Idle time after which threads beyond the core size are terminated
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Thread name prefix, which makes it easy to tell from the logs which pool ran a task
        executor.setThreadNamePrefix(namePrefix);
        // Rejection policy: applied when the queue is full and the pool is already at its maximum size.
        // CallerRunsPolicy runs the rejected task directly in the thread that called execute(),
        // which slows the caller down instead of dropping work; if the executor has been shut down, the task is discarded
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Thread pool initialization
        executor.initialize();

        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncServiceExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> logger.error(String.format("Asynchronous task '%s' failed", method), ex);
    }
}
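The order described in the comments above (core threads first, then the queue, then extra threads up to the maximum, then the rejection policy) can be observed on the underlying java.util.concurrent.ThreadPoolExecutor. This is only a sketch with assumed sizes (1 core thread, 2 maximum, queue capacity 1) and a hypothetical class name, PoolGrowthDemo; it is not part of the configuration itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of how a ThreadPoolExecutor fills up under CallerRunsPolicy.
class PoolGrowthDemo {

    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> fourthRanOn = new AtomicReference<>();
        try {
            pool.execute(blocking); // task 1: starts the single core thread
            pool.execute(blocking); // task 2: core thread is busy, so it waits in the queue
            pool.execute(blocking); // task 3: queue is full, so a second (non-core) thread starts
            int poolSize = pool.getPoolSize();
            int queued = pool.getQueue().size();
            // task 4: queue full and pool at maximum, so CallerRunsPolicy runs it in this thread
            pool.execute(() -> fourthRanOn.set(Thread.currentThread()));
            boolean ranInCaller = fourthRanOn.get() == Thread.currentThread();
            return "poolSize=" + poolSize + " queued=" + queued
                    + " fourthRanInCaller=" + ranInCaller;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

PoolGrowthDemo.run() returns "poolSize=2 queued=1 fourthRanInCaller=true". Note that with core_pool_size and max_pool_size both set to 5, as in the application.yml above, the pool never grows beyond its core size, so a saturated pool goes straight from queueing to the rejection policy.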

2.3 Extend AsyncConfigurerSupport

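import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig2 extends AsyncConfigurerSupport {

    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig2.class);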

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("open SpringBoot Thread pool!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // Core pool size: threads that are kept alive even when idle
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size: threads beyond the core size are created only once the queue is full
        executor.setMaxPoolSize(maxPoolSize);
        // Capacity of the queue that buffers tasks while all core threads are busy
        executor.setQueueCapacity(queueCapacity);
        // Idle time after which threads beyond the core size are terminated
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Thread name prefix, which makes it easy to tell from the logs which pool ran a task
        executor.setThreadNamePrefix(namePrefix);
        // Rejection policy: applied when the queue is full and the pool is already at its maximum size.
        // CallerRunsPolicy runs the rejected task directly in the thread that called execute(),
        // which slows the caller down instead of dropping work; if the executor has been shut down, the task is discarded
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Thread pool initialization
        executor.initialize();

        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncServiceExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> logger.error(String.format("Asynchronous task '%s' failed", method), ex);
    }
}

2.4 Configure a custom TaskExecutor

Because AsyncConfigurer supplies no default thread pool, Spring first calls beanFactory.getBean(TaskExecutor.class) to look for a TaskExecutor bean. If no unique bean is found that way, it falls back to beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class), which looks up an Executor bean registered under the default name taskExecutor.

Therefore, to replace the default thread pool you can register the pool under the default bean name taskExecutor. Because the lookup by type (TaskExecutor.class) comes first, the name can be omitted when the pool is the only TaskExecutor bean in the context.

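import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncExecutionAspectSupport;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig3 {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig3.class);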

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
    public Executor taskExecutor() {
        logger.info("open SpringBoot Thread pool!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // Core pool size: threads that are kept alive even when idle
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size: threads beyond the core size are created only once the queue is full
        executor.setMaxPoolSize(maxPoolSize);
        // Capacity of the queue that buffers tasks while all core threads are busy
        executor.setQueueCapacity(queueCapacity);
        // Idle time after which threads beyond the core size are terminated
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Thread name prefix, which makes it easy to tell from the logs which pool ran a task
        executor.setThreadNamePrefix(namePrefix);
        // Rejection policy: applied when the queue is full and the pool is already at its maximum size.
        // CallerRunsPolicy runs the rejected task directly in the thread that called execute(),
        // which slows the caller down instead of dropping work; if the executor has been shut down, the task is discarded
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Thread pool initialization
        executor.initialize();

        return executor;
    }
    
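    // Second pool, selected explicitly with @Async("myTask").
    // The method needs its own name: two methods called taskExecutor() would not compile.
    @Bean(name = "myTask")
    public Executor myTaskExecutor() {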
        logger.info("open SpringBoot Thread pool!");

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // Core pool size: threads that are kept alive even when idle
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size: threads beyond the core size are created only once the queue is full
        executor.setMaxPoolSize(maxPoolSize);
        // Capacity of the queue that buffers tasks while all core threads are busy
        executor.setQueueCapacity(queueCapacity);
        // Idle time after which threads beyond the core size are terminated
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // Thread name prefix, which makes it easy to tell from the logs which pool ran a task
        executor.setThreadNamePrefix(namePrefix);
        // Rejection policy: applied when the queue is full and the pool is already at its maximum size.
        // CallerRunsPolicy runs the rejected task directly in the thread that called execute(),
        // which slows the caller down instead of dropping work; if the executor has been shut down, the task is discarded
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Thread pool initialization
        executor.initialize();

        return executor;
    }

}

2.5 Multiple thread pools

@Async runs on the default thread pool or on the custom pool that replaces it. A project can also define several thread pools; in that case, name the pool to use in the annotation, for example @Async("myTask"). The value must match the bean name exactly, including case.

Added by kid_drew on Sun, 13 Feb 2022 02:08:19 +0200