Distributed scheduled tasks -- XXL-Job learning (2): source code analysis -- executor startup process
- Preface
- 1. Executor startup
- 1.1 Analysis of the core class XxlJobSpringExecutor
- 1.1.1 initJobHandlerRepository()
- 1.1.2 initJobHandlerMethodRepository()
- 1.1.3 GlueFactory.refreshInstance(1)
- 1.1.4 super.start()
- 1.2 Analysis of the core class XxlJobExecutor
- 1.2.1 XxlJobFileAppender.initLogPath(logPath)
- 1.2.2 initAdminBizList(adminAddresses, accessToken)
- 1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
- 1.2.4 TriggerCallbackThread.getInstance().start()
- 1.2.5 initEmbedServer(address, ip, port, appname, accessToken)
- 1.3 Analysis of EmbedServer.start(address, port, appname, accessToken)
Preface
Continued from XXL-Job learning (1): building a simple demo.
As the previous article showed, a simple distributed task-scheduling demo consists of three parts:
- Configure and start the scheduling center (xxl-job-admin)
- Configure and start the business system (the executor)
- Configure the executor and tasks in the scheduling center's web UI
In this article we dig into the source code behind configuring and starting the executor inside the business system.
xxl.job.version used here: 2.2.1-SNAPSHOT
1. Executor startup
In the business system that hosts the scheduled tasks we need to:
- Introduce the xxl-job-core dependency
- Add the executor configuration class XxlJobConfig.java, in which the core class XxlJobSpringExecutor is configured
- Add a job handler class containing methods annotated with @XxlJob("xxx"); a minimal sketch of both pieces follows
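As a reminder from part 1, the configuration class and a method-based handler look roughly like this (appname, port, paths and the handler name are placeholder values borrowed from the official sample, not required by xxl-job; Spring and com.xxl.job.core imports omitted for brevity):

```java
// XxlJobConfig.java -- registers the core executor bean
@Configuration
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname("xxl-job-executor-sample");   // placeholder appname
        xxlJobSpringExecutor.setPort(9999);
        xxlJobSpringExecutor.setLogPath("/data/applogs/xxl-job/jobhandler");
        xxlJobSpringExecutor.setLogRetentionDays(30);
        return xxlJobSpringExecutor;
    }
}

// SampleXxlJob.java -- a method-based job handler (2.2.1 requires this signature shape)
@Component
public class SampleXxlJob {

    // "demoJobHandler" is the JobHandler name referenced when configuring the task in the admin UI
    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World. param = " + param);
        return ReturnT.SUCCESS;
    }
}
```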
1.1 Analysis of the core class XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean { @Override public void afterSingletonsInstantiated() { //. . . . . . . . Omit the specific content of this method temporarily } @Override public void destroy() { super.destroy(); } private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { //. . . . . . . . Omit the specific content of this method temporarily } private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } }
From the source we can see that this class extends XxlJobExecutor and implements ApplicationContextAware, SmartInitializingSingleton and DisposableBean.
Once all singleton beans have been instantiated, Spring calls its afterSingletonsInstantiated() method.
@Override public void afterSingletonsInstantiated() { // init JobHandler Repository /*initJobHandlerRepository(applicationContext);*/ // init JobHandler Repository (for method) initJobHandlerMethodRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start try { super.start(); } catch (Exception e) { throw new RuntimeException(e); } }
- initJobHandlerRepository() and initJobHandlerMethodRepository() register the tasks configured in the project into the executor's in-memory ConcurrentMap<String, IJobHandler>, with the handler name as the key and the concrete job handler instance as the value;
- GlueFactory.refreshInstance(1) replaces the GlueFactory (the glue execution factory) with SpringGlueFactory, so that Spring is used to load the corresponding instance when a task runs in GLUE mode;
- Finally super.start() invokes the start() method of XxlJobExecutor, the core of the executor.
1.1.1 initJobHandlerRepository()
In older versions this method registered beans annotated at the class level with @JobHandler; that style is no longer supported in 2.2.1-SNAPSHOT (the call is commented out).
1.1.2 initJobHandlerMethodRepository()
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // init job handler from method String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true); for (String beanDefinitionName : beanDefinitionNames) { Object bean = applicationContext.getBean(beanDefinitionName); Map<Method, XxlJob> annotatedMethods = null; // referred to : org.springframework.context.event.EventListenerMethodProcessor.processBean try { annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MethodIntrospector.MetadataLookup<XxlJob>() { @Override public XxlJob inspect(Method method) { return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class); } }); } catch (Throwable ex) { logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex); } if (annotatedMethods==null || annotatedMethods.isEmpty()) { continue; } for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) { Method method = methodXxlJobEntry.getKey(); XxlJob xxlJob = methodXxlJobEntry.getValue(); if (xxlJob == null) { continue; } String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // execute method if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } method.setAccessible(true); // init and destory Method initMethod = null; Method destroyMethod = null; if (xxlJob.init().trim().length() > 0) { try { initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } if (xxlJob.destroy().trim().length() > 0) { try { destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } // Register jobhandler saves the current scheduled task instance to 'concurrentmap < string, ijobhandler >'. registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } }
Analysis:
- Get all bean objects from applicationContext;
- Use MethodIntrospector.selectMethods(...) together with the MetadataLookup callback to collect a Map<Method, XxlJob> of annotated methods (the source of the core selectMethods method is shown below);
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) { final Map<Method, T> methodMap = new LinkedHashMap<>(); Set<Class<?>> handlerTypes = new LinkedHashSet<>(); Class<?> specificHandlerType = null; //Determine whether it is a proxy class if (!Proxy.isProxyClass(targetType)) { //If it's a proxy class, find the actual type specificHandlerType = ClassUtils.getUserClass(targetType); handlerTypes.add(specificHandlerType); } handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType)); //Traverse all found class objects for (Class<?> currentHandlerType : handlerTypes) { final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType); ReflectionUtils.doWithMethods(currentHandlerType, method -> { //Get the specified method Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass); //Get the metadata associated with the method, generally referring to the annotation T result = metadataLookup.inspect(specificMethod); if (result != null) { Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) { methodMap.put(specificMethod, result); } } }, ReflectionUtils.USER_DECLARED_METHODS); } return methodMap; }
- Loop over the Map<Method, XxlJob> obtained in the previous step; the key is the annotated method and the value is the @XxlJob annotation metadata.
- Validate the name (the annotation's value attribute); if it is empty, an exception is thrown;
- Look the name up in the in-memory ConcurrentMap<String, IJobHandler> (the repository that holds every registered handler); if it already exists, throw a naming-conflict exception;
- Validate that the method takes exactly one String parameter, because 2.2.1-SNAPSHOT requires job methods to follow the format "public ReturnT<String> execute(String param)";
- Validate that the return type is ReturnT<String>;
- Resolve the init() and destroy() methods configured on the annotation, if any;
- Wrap the method as a MethodJobHandler and save it into the ConcurrentMap<String, IJobHandler> via registJobHandler(...); a stripped-down sketch of this repository follows.
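The handler repository mentioned above is just a static ConcurrentMap; a simplified sketch of the register/load pair (not the exact xxl-job source, and IJobHandler stands in for the real interface):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class JobHandlerRepositorySketch {

    private static final ConcurrentMap<String, IJobHandler> jobHandlerRepository =
            new ConcurrentHashMap<>();

    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        // the real method also logs the registration; put() returns any previous value
        return jobHandlerRepository.put(name, jobHandler);
    }

    public static IJobHandler loadJobHandler(String name) {
        // null means "no handler registered under this name", which is how
        // initJobHandlerMethodRepository() detects naming conflicts before registering
        return jobHandlerRepository.get(name);
    }
}
```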
1.1.3 GlueFactory.refreshInstance(1)
Refreshes the GlueFactory to SpringGlueFactory, so that Spring is used to load the corresponding instance when a task executes in GLUE mode.
1.1.4 super.start()
Calls XxlJobExecutor.start().
1.2 Analysis of the core class XxlJobExecutor
The properties of XxlJobExecutor are:
// ---------------------- param ---------------------- private String adminAddresses; private String accessToken; private String appname; private String address; private String ip; private int port; private String logPath; private int logRetentionDays;
The previous section showed that XxlJobSpringExecutor ultimately calls XxlJobExecutor's start() method. Let's look at what this method does:
public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server initEmbedServer(address, ip, port, appname, accessToken); }
1.2.1 XxlJobFileAppender.initLogPath(logPath)
logPath is the executor log path configured via xxl.job.executor.logpath.
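For orientation, the executor-side properties this maps to typically look like the following (property names follow the official sample project; the values are placeholders):

```properties
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.address=
xxl.job.executor.ip=
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30
```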
public static void initLogPath(String logPath){ // init if (logPath!=null && logPath.trim().length()>0) { logBasePath = logPath; } // mk base dir File logPathDir = new File(logBasePath); if (!logPathDir.exists()) { logPathDir.mkdirs(); } logBasePath = logPathDir.getPath(); // mk glue dir File glueBaseDir = new File(logPathDir, "gluesource"); if (!glueBaseDir.exists()) { glueBaseDir.mkdirs(); } glueSrcPath = glueBaseDir.getPath(); }
- If a log path is configured, logBasePath is set to the path from our configuration file;
- Check whether the log directory exists; if not, create it;
- Create the gluesource sub-directory (glueSrcPath).
1.2.2 initAdminBizList(adminAddresses, accessToken)
// ---------------------- admin-client (rpc invoker) ---------------------- private static List<AdminBiz> adminBizList; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } } public static List<AdminBiz> getAdminBizList(){ return adminBizList; }
This method initializes AdminBizClient according to the deployment of dispatching center and the communication with address adminAddresses and executor TOKENaccessToken. There are three core methods of AdminBizClient
@Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); } @Override public ReturnT<String> registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class); } @Override public ReturnT<String> registryRemove(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class); }
They provide callback, registry and registryRemove calls against the scheduling center's api/callback, api/registry and api/registryRemove endpoints.
1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
This method initializes the log-clean thread, which automatically deletes expired log files (files from more than N days ago).
public class JobLogFileCleanThread { private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class); private static JobLogFileCleanThread instance = new JobLogFileCleanThread(); public static JobLogFileCleanThread getInstance(){ return instance; } private Thread localThread; private volatile boolean toStop = false; public void start(final long logRetentionDays){ // limit min value if (logRetentionDays < 3 ) { return; } localThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // clean log dir, over logRetentionDays File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); if (childDirs!=null && childDirs.length>0) { // today Calendar todayCal = Calendar.getInstance(); todayCal.set(Calendar.HOUR_OF_DAY,0); todayCal.set(Calendar.MINUTE,0); todayCal.set(Calendar.SECOND,0); todayCal.set(Calendar.MILLISECOND,0); Date todayDate = todayCal.getTime(); for (File childFile: childDirs) { // valid if (!childFile.isDirectory()) { continue; } if (childFile.getName().indexOf("-") == -1) { continue; } // file create date Date logFileCreateDate = null; try { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); logFileCreateDate = simpleDateFormat.parse(childFile.getName()); } catch (ParseException e) { logger.error(e.getMessage(), e); } if (logFileCreateDate == null) { continue; } if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { FileUtil.deleteRecursively(childFile); } } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.DAYS.sleep(1); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory."); } }); localThread.setDaemon(true); localThread.setName("xxl-job, executor JobLogFileCleanThread"); localThread.start(); } public void toStop() { toStop = true; if (localThread == null) { return; } // interrupt and wait localThread.interrupt(); try { localThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } }
We focus on the start() method:
- The configured number of days to retain executor log files (logRetentionDays) must be at least 3, otherwise cleaning is skipped entirely;
- Create a daemon thread that runs once a day (TimeUnit.DAYS.sleep(1));
- List all date-named directories under the log root path;
- For each directory, check whether the difference between today (truncated to 00:00:00.000) and the date parsed from the directory name ("yyyy-MM-dd") exceeds the configured retention days:
(todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)
- If the retention period is exceeded, the dated directory and all files in it are deleted recursively (the expiry check is sketched below).
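Stripped of the file handling, the expiry decision is plain millisecond arithmetic; a self-contained sketch (directory name and retention value are made-up examples):

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class LogRetentionCheck {
    public static void main(String[] args) throws Exception {
        long logRetentionDays = 30;        // example value of the configured retention
        String dirName = "2020-01-01";     // example date-named log directory

        // "today" truncated to 00:00:00.000, exactly as the clean thread computes it
        Calendar todayCal = Calendar.getInstance();
        todayCal.set(Calendar.HOUR_OF_DAY, 0);
        todayCal.set(Calendar.MINUTE, 0);
        todayCal.set(Calendar.SECOND, 0);
        todayCal.set(Calendar.MILLISECOND, 0);
        Date today = todayCal.getTime();

        Date dirDate = new SimpleDateFormat("yyyy-MM-dd").parse(dirName);
        boolean expired =
                (today.getTime() - dirDate.getTime()) >= logRetentionDays * 24 * 60 * 60 * 1000;
        System.out.println(dirName + " expired = " + expired);
    }
}
```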
1.2.4 TriggerCallbackThread.getInstance().start()
public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); }
Analysis:
- Check whether any scheduling-center address is configured; if not, a warning is logged and the method returns;
- Create a new callback thread;
- While the executor runs normally, take one callback parameter from the execution-result callback queue LinkedBlockingQueue<HandleCallbackParam> callBackQueue: HandleCallbackParam callback = getInstance().callBackQueue.take(); if it is not null, pull the remaining queued parameters in one batch with drainTo() and call doCallback(callbackParamList) (this take-then-drain idiom is sketched at the end of this section);
- When the executor is stopping, drain whatever is left in the queue with drainTo() and, if the resulting list is not empty, call doCallback(callbackParamList) one last time;
- Now let's look at the doCallback(callbackParamList) method: it loops over all AdminBiz clients and calls callback(callbackParamList); callbackLog() writes the execution log of the current batch into the log files; if no client succeeds, appendFailCallbackFile(callbackParamList) is called.

private void doCallback(List<HandleCallbackParam> callbackParamList){ boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); callbackRet = true; break; } else { callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { appendFailCallbackFile(callbackParamList); } }
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){ // valid if (callbackParamList==null || callbackParamList.size()==0) { return; } // append file byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList); File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()))); if (callbackLogFile.exists()) { for (int i = 0; i < 100; i++) { callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) )); if (!callbackLogFile.exists()) { break; } } } FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes); }
- If the callback parameter list is not empty, serialize it to a byte[] with the JdkSerializeTool utility class;
- Under the log root directory, create the callback log directory and generate the failure record file xxl-job-callback-{x}.log, where {x} is the current timestamp;
- If that file already exists, fall back to xxl-job-callback-{x}-{i}.log, where i runs from 0 to 99;
- Write the byte[] into the failure record file.
- Create a callback-retry daemon thread that runs every 30 seconds:
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
- While the executor is running, the retry daemon calls retryFailCallbackFile().
private void retryFailCallbackFile(){ // valid File callbackLogPath = new File(failCallbackFilePath); if (!callbackLogPath.exists()) { return; } if (callbackLogPath.isFile()) { callbackLogPath.delete(); } if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) { return; } // load and clear file, retry for (File callbaclLogFile: callbackLogPath.listFiles()) { byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile); List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); callbaclLogFile.delete(); doCallback(callbackParamList); } }

Analysis:
- If the fail-callback directory does not exist, return;
- If the directory has no child files, return as well;
- Otherwise, for each child file: read its bytes, deserialize them back into a List<HandleCallbackParam> with JdkSerializeTool, delete the file, and call doCallback(callbackParamList) to retry the callback.
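The batching idiom used by the callback thread (block on take() for the first element, then drainTo() the rest so that one HTTP callback covers the whole batch) is easier to see in isolation; a generic sketch, not xxl-job code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchDrainDemo {
    private static final LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        callBackQueue.add("result-1");
        callBackQueue.add("result-2");
        callBackQueue.add("result-3");

        // block until at least one callback result is available
        String first = callBackQueue.take();

        // then drain everything else that queued up in the meantime
        List<String> batch = new ArrayList<>();
        callBackQueue.drainTo(batch);
        batch.add(first);

        // in xxl-job this batch would now be sent with a single doCallback(...) call
        System.out.println("callback batch: " + batch);
    }
}
```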
1.2.5 initEmbedServer(address, ip, port, appname, accessToken)
// ---------------------- executor-server (rpc provider) ---------------------- private EmbedServer embedServer = null; private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { // fill ip port port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address: default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // start embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } private void stopEmbedServer() { // stop provider factory try { embedServer.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } }
Analysis:
- Fill in the executor's IP and port: if no IP is configured, the current machine's IP is used (IpUtil.getIp()); if no port is configured, an available port is picked starting from 9999 (NetUtil.findAvailablePort(9999));
- If no explicit address is configured, build the registration address as "http://{ip_port}/";
- Call embedServer.start(address, port, appname, accessToken).
1.3 Analysis of EmbedServer.start(address, port, appname, accessToken)
public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind ChannelFuture future = bootstrap.bind(port).sync(); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); // start registry startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); } } finally { // stop try { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); }
1.3.1 ExecutorBizImpl: the implementation class of the ExecutorBiz scheduling service
It defines methods for heartbeat detection, busy (idle-beat) detection, triggering a task, terminating a task and reading the execution log.
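Put together from the methods analyzed below, the ExecutorBiz interface looks roughly like this (a sketch, not the verbatim declaration):

```java
public interface ExecutorBiz {

    ReturnT<String> beat();                          // heartbeat detection
    ReturnT<String> idleBeat(IdleBeatParam param);   // busy detection for one job
    ReturnT<String> run(TriggerParam param);         // trigger a job
    ReturnT<String> kill(KillParam param);           // terminate a running job thread
    ReturnT<LogResult> log(LogParam param);          // read a slice of an execution log
}
```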
1.3.1.1 heartbeat detection
@Override public ReturnT<String> beat() { return ReturnT.SUCCESS; }
1.3.1.2 busy detection
@Override public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) { // isRunningOrHasQueue boolean isRunningOrHasQueue = false; JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId()); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; } if (isRunningOrHasQueue) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); } return ReturnT.SUCCESS; }
Analysis:
- Look up the job thread in the thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository by jobId;
- If the thread exists and is currently running or has triggers queued, the job is reported as busy.
1.3.1.3 trigger task
@Override public ReturnT<String> run(TriggerParam triggerParam) { // load old: jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid: jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } } } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change handler or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { try { IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } } } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change script or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); } // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect: "+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect: " + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // replace thread (new or exists invalid) if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue ReturnT<String> pushResult = 
jobThread.pushTriggerQueue(triggerParam); return pushResult; }
Analysis:
- Look up the job thread in the thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository by jobId;
- Validate the job thread and the concrete handler according to the glue type (for a BEAN task, load the handler from ConcurrentMap<String, IJobHandler> jobHandlerRepository by the configured handler name; if the existing job thread's handler differs from the newly loaded one, the handler or glue type has changed and the old job thread must be discarded and later terminated);
- If the job thread is not null, apply the task's block strategy: DISCARD_LATER discards this trigger and returns failure while the thread is busy; COVER_EARLY sets the job thread to null so it will be replaced and records the removal reason; otherwise the trigger is simply queued;
- If the job thread is null, call XxlJobExecutor.registJobThread() to create and register a new one;
registJobThread() creates and starts a new JobThread, puts it into the thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository, and, if an old thread was already registered under this jobId, stops and interrupts it.

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; }
- Finally call pushTriggerQueue() to put the trigger parameter into the job thread's execution queue.

pushTriggerQueue() uses the trigger's log id to reject a repeated trigger before enqueueing it:

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; }
1.3.1.4 terminate task
@Override public ReturnT<String> kill(KillParam killParam) { // kill handlerThread, and create new one JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId()); if (jobThread != null) { XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job."); return ReturnT.SUCCESS; } return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed."); }
Get the task thread object according to the jobId, and call the removeJobThread() method to terminate the thread.
public static JobThread removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } return null; }
1.3.1.5 view execution log
@Override public ReturnT<LogResult> log(LogParam logParam) { // log filename: logPath/yyyy-MM-dd/9999.log String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId()); LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum()); return new ReturnT<LogResult>(logResult); }
1.3.2 Create a daemon thread for the scheduling center to call
1.3.2.1 Expose the task-scheduling RPC service to the scheduling center
A Netty HTTP server is started and bound to the executor port configured earlier; EmbedHttpServerHandler processes the requests the scheduling center sends to this executor.
1.3.2.2 Register the current executor's address with the scheduling center
startRegistry(appname, address)
public void startRegistry(final String appname, final String address) { // start registry ExecutorRegistryThread.getInstance().start(appname, address); }
1.3.2.2.1 Analysis of the ExecutorRegistryThread class
public void start(final String appname, final String address){ // valid if (appname==null || appname.trim().length()==0) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null."); return; } if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } registryThread = new Thread(new Runnable() { @Override public void run() { // registry while (!toStop) { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> registryResult = adminBiz.registry(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { if (!toStop) { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } } catch (InterruptedException e) { if (!toStop) { logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); } } } // registry remove try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { if (!toStop) { logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e); } } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory."); } }); registryThread.setDaemon(true); registryThread.setName("xxl-job, executor ExecutorRegistryThread"); registryThread.start(); }
- Validate the executor's appname and the scheduling-center addresses adminAddresses (neither may be empty);
- Create a registry daemon thread that re-registers every 30 seconds (RegistryConfig.BEAT_TIMEOUT);
- While running, loop over all configured AdminBizClients and call registry() to register, stopping at the first success (the heartbeat loop is distilled after this list);
- When the executor goes offline, registryRemove() is called to remove the registration.
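Boiled down, the registry heartbeat is a 30-second loop around AdminBiz.registry(); a fragment-level sketch (logging, stop handling and InterruptedException handling omitted):

```java
// appname and address are the values computed in initEmbedServer() for this executor
while (!toStop) {
    RegistryParam registryParam =
            new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);

    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
        ReturnT<String> result = adminBiz.registry(registryParam);
        if (result != null && ReturnT.SUCCESS_CODE == result.getCode()) {
            break; // one successful admin node is enough for this beat
        }
    }
    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); // beat every 30 seconds
}
// on shutdown, the same parameters are sent once more via adminBiz.registryRemove(registryParam)
```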