Distributed timing task -- XXL job learning: source code analysis -- actuator startup process

Distributed timing task -- XXL job learning (2): source code analysis -- actuator startup process

preface

Continued: XXL job learning (1): simple demo building

As you can see from the previous article, building a simple distributed demo task scheduling project is mainly composed of three parts:

  1. Configure and start task scheduling center (XXL job admin)
  2. Configure and start the business system (actuator)
  3. Configure the executor and task in the scheduling center web page

In this article, we first conduct in-depth analysis from the source code of configuration and startup of the actuator of the business system.
xxl.job.version Using version 2.2.1-SNAPSHOT

1, Start of actuator

In business timing task system

  1. Introducing dependency configuration of XXL job
  2. Add actuator component configuration class XxlJobConfig.java , where the core class XxlJobSpringExecutor is configured
  3. A new jobhandler class is added, in which there are methods annotated with @ xxxjob ("XXX")

1.1 analysis of the core class XxlJobSpringExecutor

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
 
 	@Override
    public void afterSingletonsInstantiated() {
		//. . . . . . . .  Omit the specific content of this method temporarily
    }

	@Override
    public void destroy() {
        super.destroy();
    }

	 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
		//. . . . . . . .  Omit the specific content of this method temporarily
	}

	private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}

In the source code, we can see that this class inherits the XxlJobExecutor class and implements ApplicationContextAware, smartinitializingsingsingleton and DisposableBean.

When this object is initialized, the afterSingletonsInstantiated() method is called.

    @Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
  1. initJobHandlerRepository() and initJobHandlerMethodRepository() are used to save the tasks configured in the project in the memory of the project, using concurrentmap < string, ijobhandler > to save, using the spring bean's id as key, and the specific task instance object as value. ;
  2. Refresh GlueFactory(glue execution factory), refresh it to spring gluefactory, and use spring to load the corresponding instance when executing tasks in glue mode.
  3. The start() method in the core of the executor's XxlJobExecutor is called.

1.1.1 initJobHandlerRepository()

This method is a Java class used to register bean s with @ JobHandler annotation in the old version, which is not supported in 2.2.1-SNAPSHOT version;

1.1.2 initJobHandlerMethodRepository()

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   
        // referred to :  org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }

        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method method = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            if (xxlJob == null) {
                continue;
            }

            String name = xxlJob.value();
            if (name.trim().length() == 0) {
                throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
            }
            if (loadJobHandler(name) != null) {
                throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
            }

            // execute method
            if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            method.setAccessible(true);

            // init and destory
            Method initMethod = null;
            Method destroyMethod = null;

            if (xxlJob.init().trim().length() > 0) {
                try {
                    initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                    initMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }
            if (xxlJob.destroy().trim().length() > 0) {
                try {
                    destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                    destroyMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }

            // Register jobhandler saves the current scheduled task instance to 'concurrentmap < string, ijobhandler >'.
            registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
        }
    }

}

analysis:

  1. Get all bean objects from applicationContext;
  2. Use the selectMethods method of MethodIntrospector tool class and the MetadataLookup interface to get map < method, xxljob > (learn the source code of the core method selectMethods of this tool class below)
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
	final Map<Method, T> methodMap = new LinkedHashMap<>();
	Set<Class<?>> handlerTypes = new LinkedHashSet<>();
	Class<?> specificHandlerType = null;
	//Determine whether it is a proxy class
	if (!Proxy.isProxyClass(targetType)) {
		//If it's a proxy class, find the actual type
		specificHandlerType = ClassUtils.getUserClass(targetType);
		handlerTypes.add(specificHandlerType);
	}
	handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));
	//Traverse all found class objects
	for (Class<?> currentHandlerType : handlerTypes) {
		final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);
		
		ReflectionUtils.doWithMethods(currentHandlerType, method -> {
			//Get the specified method
			Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
			//Get the metadata associated with the method, generally referring to the annotation
			T result = metadataLookup.inspect(specificMethod);
			if (result != null) {
				Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
				if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
					methodMap.put(specificMethod, result);
				}
			}
		}, ReflectionUtils.USER_DECLARED_METHODS);
	}

	return methodMap;
}
  1. The map < method, xxljob > obtained in the second step of the loop, where key is the annotation id and value is the annotation metadata.
  2. Verify the name attribute of annotation metadata. If it is empty, an exception will be thrown;
  3. According to the name, get the corresponding task instance from the memory concurrentmap < string, ijobhandler > (this is actually the warehouse of all tasks stored at the time of registration). If it already exists, throw an exception (task conflict);
  4. The validation input parameter must be String param, because 2.2.1-SNAPSHOT specifies the development Job method. The format of the method is required to be "public return < string > execute (String param).".
  5. The check out parameter must be in the format of return < string >;
  6. Inject init() and destroy() methods configured in metadata;
  7. Save the current scheduled task instance to concurrentmap < string, ijobhandler >.

1.1.3 GlueFactory.refreshInstance(1)

Refresh GlueFactory to spring GlueFactory, and use spring to load the corresponding instance when executing the task in glue mode.

1.1.4 super.start()

call XxlJobExecutor.start() .

1.2 analyze the core class xxxljobexecutor

The properties of the XxlJobExecutor are:

    // ---------------------- param ----------------------
    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;

The previous step introduced that the final XxlJobSpringExecutor will call the start() method of the XxlJobExecutor. Next, let's continue to see what this method does:

public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}

1.2.1 XxlJobFileAppender.initLogPath(logPath)

logPath is in our configuration executor component xxl.job.executor.logpath log path.

public static void initLogPath(String logPath){
	// init
	if (logPath!=null && logPath.trim().length()>0) {
		logBasePath = logPath;
	}
	// mk base dir
	File logPathDir = new File(logBasePath);
	if (!logPathDir.exists()) {
		logPathDir.mkdirs();
	}
	logBasePath = logPathDir.getPath();

	// mk glue dir
	File glueBaseDir = new File(logPathDir, "gluesource");
	if (!glueBaseDir.exists()) {
		glueBaseDir.mkdirs();
	}
	glueSrcPath = glueBaseDir.getPath();
}
  1. If the log path is configured, then logBasePath is the address in our configuration file;
  2. Determine whether the log path exists. If not, create a log directory;
  3. Generate the gluesource subfolder;

1.2.2 initAdminBizList(adminAddresses, accessToken)

    // ---------------------- admin-client (rpc invoker) ----------------------
    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {

                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
    public static List<AdminBiz> getAdminBizList(){
        return adminBizList;
    }

This method initializes AdminBizClient according to the deployment of dispatching center and the communication with address adminAddresses and executor TOKENaccessToken. There are three core methods of AdminBizClient

    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

Provides methods for callback, registry, and registry remove to the dispatch center.

1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)

This method is to initialize the log cleaning thread and automatically clean the expired logs (clean the log files N days ago).

public class JobLogFileCleanThread {
    private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);

    private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
    public static JobLogFileCleanThread getInstance(){
        return instance;
    }

    private Thread localThread;
    private volatile boolean toStop = false;
    public void start(final long logRetentionDays){

        // limit min value
        if (logRetentionDays < 3 ) {
            return;
        }

        localThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // clean log dir, over logRetentionDays
                        File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                        if (childDirs!=null && childDirs.length>0) {

                            // today
                            Calendar todayCal = Calendar.getInstance();
                            todayCal.set(Calendar.HOUR_OF_DAY,0);
                            todayCal.set(Calendar.MINUTE,0);
                            todayCal.set(Calendar.SECOND,0);
                            todayCal.set(Calendar.MILLISECOND,0);

                            Date todayDate = todayCal.getTime();

                            for (File childFile: childDirs) {

                                // valid
                                if (!childFile.isDirectory()) {
                                    continue;
                                }
                                if (childFile.getName().indexOf("-") == -1) {
                                    continue;
                                }

                                // file create date
                                Date logFileCreateDate = null;
                                try {
                                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                    logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                                } catch (ParseException e) {
                                    logger.error(e.getMessage(), e);
                                }
                                if (logFileCreateDate == null) {
                                    continue;
                                }

                                if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                    FileUtil.deleteRecursively(childFile);
                                }

                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        TimeUnit.DAYS.sleep(1);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

            }
        });
        localThread.setDaemon(true);
        localThread.setName("xxl-job, executor JobLogFileCleanThread");
        localThread.start();
    }

    public void toStop() {
        toStop = true;

        if (localThread == null) {
            return;
        }

        // interrupt and wait
        localThread.interrupt();
        try {
            localThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

}

We focus on the start() method:

  1. The number of days to save the actuator log file configured by the actuator component must be greater than 3, otherwise, it will not be cleaned;
  2. Create a daemons thread, execute once a day( TimeUnit.DAYS.sleep(1););
  3. Obtain all date file directories under the log path root directory;
  4. Cycle to determine whether the time difference between the current time (0:00, 0 minute, 0 second and 0 millisecond of the day) and the "yyyy MM DD" corresponding to the date directory is greater than the configured number of days to save the actuator log file;
(todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) 
  1. If the number of days of log saving is exceeded, delete the time directory and all files in the directory.

1.2.4 TriggerCallbackThread.getInstance().start()

public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // normal callback
            while(!toStop){
                try {
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

}

analysis:

  1. Check whether the dispatch center is configured, if not, an exception will be thrown;
  2. Create a new callback thread;
  3. If the actuator is running normally, get a callback in parameter object from the task execution result callback queue linkedblockingqueue < handlecallbackparam > callbackqueue
    HandleCallbackParam callback = getInstance().callBackQueue.take();
    
    If the callback is not empty, use the drawto() method to obtain the callback input parameter set in batch, and call the doCallback(callbackParamList) method;
  4. If the actuator terminates, directly use the drawto() method to obtain the callback input parameter set in the callback queue in batches. If the callback input parameter set is not empty, call the doCallback(callbackParamList) method;
  5. Analyze the following doCallback(callbackParamList) methods:
        private void doCallback(List<HandleCallbackParam> callbackParamList){
            boolean callbackRet = false;
            // callback, will retry if error
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                    if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                        callbackRet = true;
                        break;
                    } else {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                    }
                } catch (Exception e) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
                }
            }
            if (!callbackRet) {
                appendFailCallbackFile(callbackParamList);
            }
        }
    
    Cycle all AdminBiz, call the callback(callbackParamList) method to execute the callback, call the callbackLog() method to record the current task execution log and generate the log file, and call the appendFailCallbackFile(callbackParamList) method if there is an exception.
    private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
        // valid
        if (callbackParamList==null || callbackParamList.size()==0) {
            return;
        }
    
        // append file
        byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
    
        File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
        if (callbackLogFile.exists()) {
            for (int i = 0; i < 100; i++) {
                callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
                if (!callbackLogFile.exists()) {
                    break;
                }
            }
        }
        FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
    }
    
    • If the callback input parameter set is not empty, use the JdkSerializeTool tool tool class to serialize the set as byte [];
    • Create the callback log directory in the log root directory, and generate the callback failure record file, XXL job callback - {x}. Log, where {x} is the current timestamp;
    • Determine whether the current file exists. If it exists, generate XXL job callback - {x} - i.log (I is a value from 0 to 100)
    • Write byte [] to the callback failure record file.
  6. Create a new callback retry daemons, execute every 30 seconds
    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  7. When the executor is still running, the callback retry daemons call the retryFailCallbackFile() method.
    private void retryFailCallbackFile(){
    
        // valid
        File callbackLogPath = new File(failCallbackFilePath);
        if (!callbackLogPath.exists()) {
            return;
        }
        if (callbackLogPath.isFile()) {
            callbackLogPath.delete();
        }
        if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
            return;
        }
    
        // load and clear file, retry
        for (File callbaclLogFile: callbackLogPath.listFiles()) {
            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
    
            callbaclLogFile.delete();
            doCallback(callbackParamList);
        }
    
    }
    
    analysis:
    • Judge whether the callback failure log record directory exists, and if it does not exist, jump out of the method;
    • If there is no sub file in the callback failure log directory, the method will pop up;
    • Loop each sub file, use the JdkSerializeTool tool class to change the byte [] read out in the file to callback into the parameter object collection list < handlecallbackparam >, delete the corresponding log record file, and call the doCallback(callbackParamList) method to execute the callback.

1.2.5 initEmbedServer(address, ip, port, appname, accessToken)

    // ---------------------- executor-server (rpc provider) ----------------------
    private EmbedServer embedServer = null;

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address: default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

    private void stopEmbedServer() {
        // stop provider factory
        try {
            embedServer.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

analysis:

  1. Initialize the IP address and port of the actuator. If the IP is not configured, the default is the address of the current service. If the port is not configured, the default is 9999;
  2. Build service call address "http: / {IP"_ port}/"
  3. call embedServer.start(address, port, appname, accessToken) method.

1.3 analysis EmbedServer.start(address, port, appname, accessToken)

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

1.3.1 ExecutorBiz Impl: the implementation class of executorbiz scheduling service

It defines the methods of heartbeat detection, busy detection, triggering task, terminating task and viewing execution log

1.3.1.1 heartbeat detection

    @Override
    public ReturnT<String> beat() {
        return ReturnT.SUCCESS;
    }

1.3.1.2 busy detection

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {

        // isRunningOrHasQueue
        boolean isRunningOrHasQueue = false;
        JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());
        if (jobThread != null && jobThread.isRunningOrHasQueue()) {
            isRunningOrHasQueue = true;
        }

        if (isRunningOrHasQueue) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
        }
        return ReturnT.SUCCESS;
    }

analysis:

  1. Obtain the corresponding task thread from the task thread warehouse concurrentmap < integer, jobthread > jobthreadreposition according to the jobId;
  2. If the task thread object is not empty and running or in the queue, it is considered busy.

1.3.1.3 trigger task

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old: jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid: jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect: "+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect: " + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

analysis:

  1. Obtain the corresponding task thread from the task thread warehouse concurrentmap < integer, jobthread > jobthreadreposition according to the jobId;
  2. Verify the task thread and the specific processing instance of the task according to the task type; (if the task is of "BEAN" type, check the concurrenctmap < string according to the processing instance id of the task, Ijobhandler > jobhandlerrepository gets the specific task processing instance. If the task thread is not empty and the task specific instance is different from the task instance obtained from the task thread, the task type must be changed and the task thread before termination must be terminated.)
  3. If the task thread is not empty, the blocking processing policy of the task is obtained. If the policy is to discard subsequent scheduler, the request will be discarded and returned to failure; if the policy is to override previous scheduling, then the task thread is set to null, and the removal cause is set to "xxxxxx".
  4. If the task thread is null, call XxlJobExecutor.registJobThread() preservation;
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
    
        return newJobThread;
    }
    
    Create a new JobThread; judge whether the old value already exists in the task thread warehouse concurrentmap < integer, JobThread > jobthreadreposition; if so, destroy the object and interrupt the thread.
  5. Call the pushTriggerQueue() method to put the input parameter into the execution queue of the task thread.
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    	// avoid repeat
    	if (triggerLogIdSet.contains(triggerParam.getLogId())) {
    		logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
    		return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    	}
    
    	triggerLogIdSet.add(triggerParam.getLogId());
    	triggerQueue.add(triggerParam);
           return ReturnT.SUCCESS;
    }
    
    Judge whether the task is executed repeatedly according to the log record id

1.3.1.4 terminate task

@Override
public ReturnT<String> kill(KillParam killParam) {
    // kill handlerThread, and create new one
    JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId());
    if (jobThread != null) {
        XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job.");
        return ReturnT.SUCCESS;
    }

    return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");
}

Get the task thread object according to the jobId, and call the removeJobThread() method to terminate the thread.

public static JobThread removeJobThread(int jobId, String removeOldReason){
    JobThread oldJobThread = jobThreadRepository.remove(jobId);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();

        return oldJobThread;
    }
    return null;
}

1.3.1.5 view execution log

@Override
public ReturnT<LogResult> log(LogParam logParam) {
    // log filename: logPath/yyyy-MM-dd/9999.log
    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());

    LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());
    return new ReturnT<LogResult>(logResult);
}

1.3.2 create a daemons for the dispatching center to call

1.3.2.1 expose the RPC service of a task scheduling to the scheduling center for calling

Using netty_http starts the http service and binds the port you passed in to set the actuator. Using EmbedHttpServerHandler to process tasks in the scheduling center's scheduling executor

1.3.2.2 register the address of the current actuator to the dispatching center

startRegistry(appname, address)

public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}
1.3.2.2.1 analyze ExecutorRegistryThread class
public void start(final String appname, final String address){

    // valid
    if (appname==null || appname.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
  1. Verify the configuration of appname and dispatching center address adminAddresses of the actuator (cannot be empty);
  2. Create a registry thread (register every 30 seconds);
  3. In the running state, all adminbizclients configured by the circular actuator call the registry() method to initiate registration;
  4. After the service is offline, the registryRemove() method is called to initiate the registration removal.

Keywords: Spring snapshot Java jvm

Added by Azad on Thu, 25 Jun 2020 05:17:05 +0300