[Spring technology practice] @Async usage tips and source code analysis of the asynchronous annotations

 

Background

To enable asynchronous tasks:

  1. Add the @Async annotation to the method

  2. Add @EnableAsync to the startup class or a configuration class

@EnableAsync

Source code comments

Enables Spring's asynchronous method execution capability, similar to functionality found in Spring's <task:*> XML namespace. To be used together with @Configuration classes as follows, enabling annotation-driven async processing for an entire Spring application context:

  • The main purpose of @EnableAsync is to switch on Spring's asynchronous method execution, much like the task namespace configuration in XML. It must be used together with the @Configuration annotation.

Usage

@Configuration
@EnableAsync
public class AppConfig {
}

MyAsyncBean is a user-defined type with one or more methods annotated with either Spring's @Async annotation, the EJB 3.1 @javax.ejb.Asynchronous annotation, or any custom annotation specified via the annotation() attribute. The aspect is added transparently for any registered bean, for instance via this configuration:

  • MyAsyncBean is a user-defined bean with one or more methods marked with @Async, the EJB 3.1 @javax.ejb.Asynchronous annotation, or a custom annotation specified through the annotation() attribute. The async aspect is applied transparently to any such registered bean.

@Configuration
public class AnotherAppConfig {

    @Bean
    public MyAsyncBean asyncBean() {
        return new MyAsyncBean();
    }
}

By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.

  • By default, Spring first looks for a unique bean of type TaskExecutor, then for an Executor bean named "taskExecutor". If neither is found, it falls back to SimpleAsyncTaskExecutor.
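The lookup order can be modeled in plain Java. This is only a sketch under simplifying assumptions, not Spring's implementation: the "context" is a hypothetical map from bean name to Executor, every Executor counts as a TaskExecutor, and FALLBACK stands in for SimpleAsyncTaskExecutor by starting a new thread per task.

```java
import java.util.Map;
import java.util.concurrent.Executor;

class ExecutorLookup {
    // Stand-in for SimpleAsyncTaskExecutor: starts a new thread for every task.
    static final Executor FALLBACK = task -> new Thread(task).start();

    // Hypothetical model: the "context" is a map from bean name to Executor.
    static Executor resolve(Map<String, Executor> beans) {
        if (beans.size() == 1) {
            return beans.values().iterator().next(); // the unique executor bean
        }
        Executor named = beans.get("taskExecutor");  // otherwise look up by name
        if (named != null) {
            return named;
        }
        return FALLBACK;                             // neither is resolvable
    }

    public static void main(String[] args) {
        Executor pool = Runnable::run;
        Executor byName = Runnable::run;
        System.out.println(resolve(Map.of("pool", pool)) == pool);                           // prints true
        System.out.println(resolve(Map.of("pool", pool, "taskExecutor", byName)) == byName); // prints true
        System.out.println(resolve(Map.of()) == FALLBACK);                                   // prints true
    }
}
```

With two candidate executors and none named "taskExecutor", the sketch falls through to the fallback, which mirrors the "neither of the two is resolvable" case in the Javadoc.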

To customize all this, implement AsyncConfigurer and provide: your own Executor through the getAsyncExecutor() method, and your own AsyncUncaughtExceptionHandler through the getAsyncUncaughtExceptionHandler() method.

Override customized UncaughtExceptionHandler/getAsyncExecutor

  • Implement the AsyncConfigurer interface and override getAsyncExecutor to supply the asynchronous executor, and getAsyncUncaughtExceptionHandler to supply the handler for uncaught asynchronous exceptions.

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(7);
        executor.setMaxPoolSize(42);
        executor.setQueueCapacity(11);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // MyAsyncUncaughtExceptionHandler is a user-defined implementation
        return new MyAsyncUncaughtExceptionHandler();
    }
}

If only one item needs to be customized, null can be returned to keep the default settings. Consider also extending from AsyncConfigurerSupport when possible.

Note: In the above example the ThreadPoolTaskExecutor is not a fully managed Spring bean. Add the @Bean annotation to the getAsyncExecutor() method if you want a fully managed bean. In such circumstances it is no longer necessary to manually call the executor.initialize() method as this will be invoked automatically when the bean is initialized.
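How the three pool settings in AppConfig interact is easy to misread. ThreadPoolTaskExecutor wraps a java.util.concurrent.ThreadPoolExecutor, so the behavior can be observed with the JDK class directly. The sketch below assumes a bounded LinkedBlockingQueue and a 60 second keep-alive; the class name PoolGrowthDemo and the blocking task are made up for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolGrowthDemo {
    // Returns {pool size after 18 tasks, queue length after 18 tasks, pool size after 19 tasks}.
    static int[] observe() {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                7, 42, 60, TimeUnit.SECONDS,          // core size 7, max size 42
                new LinkedBlockingQueue<>(11),         // queue capacity 11
                r -> new Thread(r, "MyExecutor-" + counter.incrementAndGet()));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no worker ever drains the queue.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 18; i++) {
                pool.execute(blocker);                 // 7 core threads, then 11 queued
            }
            int sizeAt18 = pool.getPoolSize();
            int queuedAt18 = pool.getQueue().size();
            pool.execute(blocker);                     // queue is full: an extra thread is added
            int sizeAt19 = pool.getPoolSize();
            return new int[] {sizeAt18, queuedAt18, sizeAt19};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 7 11 8
    }
}
```

The pool only grows beyond the core size once the queue is full, so with these settings the maximum of 42 threads is reached only under sustained load.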

Equivalent XML configuration

<beans>
    <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>
    <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>
    <bean id="asyncBean" class="com.foo.MyAsyncBean"/>
    <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>
</beans>

Key notes

The mode() attribute controls how advice is applied: If the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying. Please note that proxy mode allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way.

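  • In other words, in proxy mode an @Async method must be called from another bean, through the proxy. A call made from inside the same class is not intercepted and therefore runs synchronously.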
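The limitation is a property of proxies in general and can be reproduced with a plain JDK dynamic proxy. The following is a minimal sketch, not Spring code: Greeter, GreeterImpl and SelfInvocationDemo are hypothetical names, and the handler only records which calls it sees.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service interface, used only for this illustration.
interface Greeter {
    void outer();
    void inner();
}

class GreeterImpl implements Greeter {
    @Override
    public void outer() {
        // "this" is the raw target, not the proxy, so the handler never sees this call.
        inner();
    }

    @Override
    public void inner() {
    }
}

class SelfInvocationDemo {
    // Returns the names of the methods that actually passed through the proxy.
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Greeter target = new GreeterImpl();
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        Greeter proxy = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
        proxy.outer(); // intercepted; the nested inner() call is not
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [outer]
    }
}
```

Only outer() is recorded. If inner() were the method meant to run asynchronously, the call from outer() would silently stay on the caller's thread.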

Note that if the mode() is set to AdviceMode.ASPECTJ, then the value of the proxyTargetClass() attribute will be ignored. Note also that in this case the spring-aspects module JAR must be present on the classpath, with compile-time weaving or load-time weaving applying the aspect to the affected classes. There is no proxy involved in such a scenario; local calls will be intercepted as well.

  • Weaving in ASPECTJ mode is therefore also an option, provided the spring-aspects module JAR is added as a dependency.

Functional analysis

@Async

  • This annotation marks a method for asynchronous execution. It can also be placed on a class, in which case all methods of the class are executed asynchronously.

  • There is no restriction on the method parameters, but the return type must be either void or Future (including the ListenableFuture interface and the CompletableFuture class).

  • The Future handed to the caller by the proxy is a real asynchronous handle for tracking the result of the asynchronous method. The method itself can return a pass-through handle such as Spring's AsyncResult (which implements ListenableFuture), EJB 3.1's javax.ejb.AsyncResult, or a completed CompletableFuture.

  • On a class, the annotation applies to every method of the class. An annotation on a method overrides the class-level setting.

  • The value attribute is a qualifier naming the executor bean (an Executor or TaskExecutor) that should run the method.
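The "method overrides class" rule for the value qualifier can be sketched with plain reflection. This is an illustration under assumptions, not Spring's code: @OnExecutor is a hypothetical stand-in for @Async, and ReportService and the pool names are invented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @Async with its value attribute.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface OnExecutor {
    String value() default "";
}

@OnExecutor("classLevelPool")
class ReportService {
    public void daily() {
    }

    @OnExecutor("methodLevelPool")
    public void monthly() {
    }
}

class QualifierLookup {
    // The method-level annotation wins; otherwise fall back to the declaring class.
    static String qualifierFor(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            OnExecutor marker = method.getAnnotation(OnExecutor.class);
            if (marker == null) {
                marker = method.getDeclaringClass().getAnnotation(OnExecutor.class);
            }
            return marker != null ? marker.value() : null;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierFor(ReportService.class, "daily"));   // prints classLevelPool
        System.out.println(qualifierFor(ReportService.class, "monthly")); // prints methodLevelPool
    }
}
```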

@EnableAsync

  • Switches on Spring's asynchronous method execution, similar to the task namespace configuration in XML. It must be used together with the @Configuration annotation.

  • By default, Spring first looks for a unique bean of type TaskExecutor, then for an Executor bean named "taskExecutor". If neither is found, it falls back to SimpleAsyncTaskExecutor.

  • As described in the earlier section on overriding getAsyncExecutor and getAsyncUncaughtExceptionHandler, the AsyncConfigurer interface can be implemented: getAsyncExecutor supplies the asynchronous executor and getAsyncUncaughtExceptionHandler supplies the uncaught exception handler.

  • Configuration through a @Configuration class is essentially equivalent to the XML form, but it additionally lets you set a custom thread name prefix (see setThreadNamePrefix in AppConfig.getAsyncExecutor above).

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    // Supports user-defined async annotations. By default Spring's @Async and
    // EJB 3.1's @javax.ejb.Asynchronous are scanned.
    /**
     * Indicate the 'async' annotation type to be detected at either class
     * or method level.
     * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
     * {@code @javax.ejb.Asynchronous} annotation will be detected.
     * <p>This attribute exists so that developers can provide their own
     * custom annotation type to indicate that a method (or all methods of
     * a given class) should be invoked asynchronously.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    // Whether CGLIB subclass proxies are created. Only applies when AdviceMode is PROXY.
    // When true, other Spring-managed beans are upgraded to CGLIB proxies as well.
    /**
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies.
     * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
     * <p>The default is {@code false}.
     * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
     * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
     * For example, other beans marked with Spring's {@code @Transactional} annotation
     * will be upgraded to subclass proxying at the same time. This approach has no
     * negative impact in practice unless one is explicitly expecting one type of proxy
     * vs. another, for example, in tests.
     */
    boolean proxyTargetClass() default false;

    // How the async advice is applied. The default is PROXY. To let a non-async method
    // call an async method of the same class asynchronously, switch to ASPECTJ.
    /**
     * Indicate how async advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}

Execution process

The key is the @Import(AsyncConfigurationSelector.class) annotation: AsyncConfigurationSelector supplies the selectImports() method of the ImportSelector interface. The source code is as follows:

The selector picks the implementation of the abstract asynchronous configuration class according to the AdviceMode declared in the @EnableAsync annotation on the @Configuration class.

/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                // PROXY mode: use ProxyAsyncConfiguration
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ:
                // ASPECTJ mode: use AspectJAsyncConfiguration
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }
}

PROXY mode (JDK interface proxies by default) selects the ProxyAsyncConfiguration class:

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        if (this.executor != null) {
            bpp.setExecutor(this.executor);
        }
        if (this.exceptionHandler != null) {
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}

The asyncAdvisor() method performs these steps:

  • Create a new AsyncAnnotationBeanPostProcessor (the bean post-processor for the asynchronous annotation)

  • If the user set the annotation attribute on @EnableAsync, use it as the asynchronous annotation type

  • Set the Executor that runs the tasks

  • Set the AsyncUncaughtExceptionHandler, the exception handler

  • Set whether to upgrade to CGLIB subclass proxies (not enabled by default)

  • Set the execution order, which defaults to last

Two points about ProxyAsyncConfiguration are worth noting:

  1. It extends the AbstractAsyncConfiguration class

  2. It defines one bean: AsyncAnnotationBeanPostProcessor

AbstractAsyncConfiguration

/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    protected AnnotationAttributes enableAsync; // attributes of the @EnableAsync annotation

    protected Executor executor; // task executor (java.util.concurrent.Executor, designed by Doug Lea)

    protected AsyncUncaughtExceptionHandler exceptionHandler; // exception handler

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }
}

The class mainly holds three things:

  • 1) The annotation attributes

  • 2) The asynchronous task executor

  • 3) The exception handler

Methods:

  • 1) setImportMetadata stores the annotation attributes

  • 2) setConfigurers sets the asynchronous task executor and the exception handler

AsyncAnnotationBeanPostProcessor

When the AsyncAnnotationBeanPostProcessor bean is initialized, the AsyncAnnotationAdvisor (the advisor for the asynchronous annotation) is built in setBeanFactory, the callback method of the BeanFactoryAware interface.

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

AsyncAnnotationAdvisor:

AsyncAnnotationAdvisor sets up the AOP interception mechanism for asynchronous calls.

  • AsyncAnnotationBeanPostProcessor -> postProcessAfterInitialization()

  • The actual post-processing of AsyncAnnotationBeanPostProcessor is implemented in its parent class AbstractAdvisingBeanPostProcessor.

  • That class implements the BeanPostProcessor interface and overrides the postProcessAfterInitialization method.

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // The bean is already a proxy (ProxyFactory -> AdvisedSupport -> Advised): add the advisor to it
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    // Build a ProxyFactory, add the proxy interfaces, set the advisor, and return the proxy
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No async proxy needed.
    return bean;
}

  • isEligible checks whether the class, or any method of the class, carries the asynchronous annotation. AsyncAnnotationAdvisor implements the PointcutAdvisor interface.

  • The proxy is created through the AopProxy interface. The implementation actually used here is JdkDynamicAopProxy, whose invoke method contains the following logic:

// Get the interception chain for this method.
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
if (chain.isEmpty()) {
    // We can skip creating a MethodInvocation: just invoke the target directly
    // Note that the final invoker must be an InvokerInterceptor so we know it does
    // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
    Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
    retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
    // Construct the method invocation
    invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
    // Proceed to the joinpoint through the interceptor chain.
    retVal = invocation.proceed();
}

  • The interceptor for the @Async annotation is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. MethodInterceptor is a kind of Advice (the logic applied at a pointcut) in the AOP specification.

  • If the chain is not empty, the second branch is taken: a ReflectiveMethodInvocation is constructed and its proceed() method is executed.

@Override
public Object proceed() throws Throwable {
    // Once every interceptor has run (or if there are none), invoke the target method directly
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // Evaluate dynamic method matcher here: static part will already have
        // been evaluated and found to match.
        InterceptorAndDynamicMethodMatcher dm =
                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
        if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
            return dm.interceptor.invoke(this);
        }
        else {
            // Dynamic matching failed.
            // Skip this interceptor and invoke the next in the chain.
            return proceed();
        }
    }
    else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}
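The index-based walk through the interceptor chain can be reduced to a few lines. The sketch below is a simplified model, not the Spring classes: Interceptor, Invocation and ChainDemo are hypothetical stand-ins for MethodInterceptor and ReflectiveMethodInvocation, and dynamic method matchers are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for MethodInterceptor.
interface Interceptor {
    Object invoke(Invocation invocation);
}

// Hypothetical, simplified stand-in for ReflectiveMethodInvocation.
class Invocation {
    private final List<Interceptor> chain;
    private final Supplier<Object> target;
    private int currentIndex = -1; // start at -1 and increment before use

    Invocation(List<Interceptor> chain, Supplier<Object> target) {
        this.chain = chain;
        this.target = target;
    }

    Object proceed() {
        if (currentIndex == chain.size() - 1) {
            return target.get(); // chain exhausted: call the target method
        }
        return chain.get(++currentIndex).invoke(this);
    }
}

class ChainDemo {
    // Returns the order in which interceptors and the target were reached.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Interceptor first = inv -> {
            log.add("first:before");
            Object result = inv.proceed();
            log.add("first:after");
            return result;
        };
        Interceptor second = inv -> {
            log.add("second");
            return inv.proceed();
        };
        Invocation invocation = new Invocation(List.of(first, second), () -> {
            log.add("target");
            return "done";
        });
        invocation.proceed();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [first:before, second, target, first:after]
    }
}
```

Each interceptor decides when, and whether, to call proceed(). That is what lets an interceptor hand the rest of the invocation to another thread instead of running it inline.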

The core call is the interceptor's invoke(this), which in this case executes AsyncExecutionInterceptor.invoke.

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        // Raised only when no executor could be determined at all. Normally a
        // default executor is available, so this is not a concern.
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    Callable<Object> task = new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    // Block until execution completes and fetch the result
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        }
    };

    // Submit the task to the executor
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
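The two catch blocks tie back to the earlier Javadoc remark that void methods cannot transmit exceptions to the caller. The plain-JDK sketch below models that difference. It assumes, as the listing suggests but does not show, that handleError rethrows for Future return types and otherwise passes the exception to the uncaught exception handler. AsyncErrorDemo and the messages are invented for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class AsyncErrorDemo {
    // Returns {message seen by the handler for the void-style call,
    //          message seen by the caller of the Future-style call}.
    static String[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // void style: the caller has no handle, so the task itself must catch
            // the exception and pass it to a handler (here just an AtomicReference).
            AtomicReference<String> handled = new AtomicReference<>();
            Future<?> done = executor.submit(() -> {
                try {
                    throw new IllegalStateException("void failure");
                } catch (RuntimeException ex) {
                    handled.set(ex.getMessage());
                }
            });
            done.get(); // demo only: wait for the task, which completes normally

            // Future style: the exception travels inside the Future and
            // surfaces when the caller asks for the result.
            Callable<String> failing = () -> {
                throw new IllegalStateException("future failure");
            };
            Future<String> future = executor.submit(failing);
            String seenByCaller = null;
            try {
                future.get();
            } catch (ExecutionException ex) {
                seenByCaller = ex.getCause().getMessage();
            }
            return new String[] {handled.get(), seenByCaller};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println(result[0]); // prints void failure
        System.out.println(result[1]); // prints future failure
    }
}
```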

The core of the submission is the doSubmit method:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    if (completableFuturePresent) {
        // If the CompletableFuture class is available, try to run the task through it first
        Future<Object> result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor);
        if (result != null) {
            return result;
        }
    }
    // The return type is a ListenableFuture, on which callbacks can be registered (addCallback)
    if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    }
    // The return type is a plain Future
    else if (Future.class.isAssignableFrom(returnType)) {
        return executor.submit(task);
    }
    else {
        // No return value
        executor.submit(task);
        return null;
    }
}
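The dispatch on the declared return type can be tried out with JDK classes alone. This is a simplified model rather than the Spring method: the ListenableFuture branch is omitted because it needs a Spring type, and SubmitDemo, submit and check are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // Simplified model of doSubmit: pick the submission style from the declared return type.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            executor.submit(task); // fire and forget
            return null;
        }
    }

    // Returns {CompletableFuture branch taken, plain Future branch taken, void branch taken}.
    static boolean[] check() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = () -> "result";
            Object a = submit(task, executor, CompletableFuture.class);
            Object b = submit(task, executor, Future.class);
            Object c = submit(task, executor, void.class);
            return new boolean[] {
                a instanceof CompletableFuture,
                b instanceof Future && !(b instanceof CompletableFuture),
                c == null
            };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true true true
    }
}
```

The void branch returns null to the caller immediately, which is why a void @Async method gives the caller nothing to wait on.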

Overall process

  • Start from the annotation: @EnableAsync -> ProxyAsyncConfiguration registers a bean of type AsyncAnnotationBeanPostProcessor

  • Then follow the bean life cycle of AsyncAnnotationBeanPostProcessor: AOP advisor initialization (setBeanFactory()) -> AOP proxy creation through AopProxy (postProcessAfterInitialization()) -> AOP advice execution (InvocationHandler.invoke)
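The whole flow can be imitated in miniature with JDK classes only. The sketch below is a toy model and not how Spring is implemented internally: @RunAsync stands in for @Async, MiniAsyncPostProcessor for AsyncAnnotationBeanPostProcessor, and Mailer and SimpleMailer are invented beans. It covers only void methods and interface-based proxies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for @Async.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RunAsync {
}

// Hypothetical service; the annotation sits on the implementation method.
interface Mailer {
    void send(String to);
}

class SimpleMailer implements Mailer {
    volatile String sentOnThread;
    final CountDownLatch sent = new CountDownLatch(1);

    @Override
    @RunAsync
    public void send(String to) {
        sentOnThread = Thread.currentThread().getName();
        sent.countDown();
    }
}

class MiniAsyncPostProcessor {
    private final ExecutorService executor;

    MiniAsyncPostProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    // Counterpart of isEligible: does any public method carry the marker annotation?
    static boolean isEligible(Class<?> type) {
        return Arrays.stream(type.getMethods()).anyMatch(m -> m.isAnnotationPresent(RunAsync.class));
    }

    // Counterpart of postProcessAfterInitialization: wrap eligible beans in a proxy.
    @SuppressWarnings("unchecked")
    <T> T postProcess(T bean, Class<T> iface) {
        if (!isEligible(bean.getClass())) {
            return bean; // no async proxy needed
        }
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    // Resolve the implementation method, since that is where the annotation lives.
                    Method impl = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                    if (impl.isAnnotationPresent(RunAsync.class)) {
                        executor.submit(() -> impl.invoke(bean, args));
                        return null; // void method: nothing to hand back
                    }
                    return method.invoke(bean, args);
                });
    }
}

class MiniAsyncDemo {
    // Returns true when send() ran on a thread other than the caller's.
    static boolean ranOnAnotherThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SimpleMailer target = new SimpleMailer();
            Mailer mailer = new MiniAsyncPostProcessor(executor).postProcess(target, Mailer.class);
            mailer.send("someone@example.com");
            if (!target.sent.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            return !Thread.currentThread().getName().equals(target.sentOnThread);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ranOnAnotherThread()); // prints true
    }
}
```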

Keywords: Java Spring

Added by Leviathan on Wed, 13 Oct 2021 02:24:38 +0300