elastic-job Source Code Analysis: Monitoring ZooKeeper Nodes

Elastic-Job uses ZooKeeper (zk) for distributed coordination. Sharding parameters, sharding assignments, leader election, job status, and other configuration and runtime state are all stored in zk. The console works the same way: it makes configuration changes take effect by directly modifying zk node data.
So how does Elastic-Job notice when the data of a zk node changes?


[Figure: ListenerManager.png]

When the schedulerFacade field of the JobScheduler object is initialized, the ListenerManager is created in the constructor. This object does essentially one thing: it watches for data changes on the various zk nodes.
As the diagram shows, all monitoring of zk nodes is implemented through a set of listeners. Here is the class diagram:

[Figure: zk monitoring.png]

According to the class diagram, every zk node listener implements the TreeCacheListener interface. So what is TreeCacheListener?

Curator provides three cache types: Path Cache, Node Cache, and Tree Cache. A listener can be registered on each of them, which in turn lets you monitor zk nodes indirectly:

  • Path Cache watches the child nodes of a node
    • Listener interface: PathChildrenCacheListener
  • Node Cache watches one particular node
    • Listener interface: NodeCacheListener
  • Tree Cache watches a node and also its descendants, in effect combining Path Cache and Node Cache to watch the whole subtree
    • Listener interface: TreeCacheListener
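
The difference in scope between the three cache types can be sketched without Curator at all. The class below is a plain-Java model, not Curator code: CacheKind, observes, and the /myJob paths are hypothetical names chosen for illustration, and the only thing modeled is which changed paths fall inside each cache's watch scope.

```java
class CacheScopeSketch {

    /** Illustrative labels for the three Curator cache types (not Curator classes). */
    enum CacheKind { PATH_CACHE, NODE_CACHE, TREE_CACHE }

    /** Returns true if a change at changedPath lies in the scope of a cache rooted at watchedPath. */
    static boolean observes(CacheKind kind, String watchedPath, String changedPath) {
        String prefix = watchedPath.endsWith("/") ? watchedPath : watchedPath + "/";
        boolean sameNode = changedPath.equals(watchedPath);
        boolean descendant = changedPath.startsWith(prefix) && changedPath.length() > prefix.length();
        switch (kind) {
            case NODE_CACHE:
                // Only the watched node itself.
                return sameNode;
            case PATH_CACHE:
                // Direct children only: no further '/' after the watched prefix.
                return descendant && changedPath.indexOf('/', prefix.length()) < 0;
            case TREE_CACHE:
                // The watched node plus every descendant.
                return sameNode || descendant;
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        String root = "/myJob";
        String[] changes = {"/myJob", "/myJob/config", "/myJob/sharding/0/running"};
        for (String changed : changes) {
            for (CacheKind kind : CacheKind.values()) {
                System.out.println(kind + " on " + root + " sees " + changed + ": "
                        + observes(kind, root, changed));
            }
        }
    }
}
```

In this model only TREE_CACHE reports true for all three paths, which is why a single Tree Cache rooted at the job node is enough to cover the job's entire subtree.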

As covered in the previous post, Elastic-Job uses TreeCache to cache data. The JobScheduler.init() method calls JobRegistry.getInstance().registerJob(), which in turn calls regCenter.addCacheData("/" + jobName):

 
public void init() {
    LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
    JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
    // Register the job here, which also adds the TreeCache
    JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
    schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
    jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

public void registerJob(final String jobName, final JobScheduleController jobScheduleController, final CoordinatorRegistryCenter regCenter) {
    schedulerMap.put(jobName, jobScheduleController);
    regCenterMap.put(jobName, regCenter);
    // Add a TreeCache rooted at the job's node
    regCenter.addCacheData("/" + jobName);
}

When a job is registered, a TreeCache is added to watch the node named after the job and all of its descendants. So when that node or any of its descendants changes, every listener registered on the TreeCache is notified. Each listener then applies its own business logic, checking for example whether the changed node is one it cares about and whether the EventType is one it handles.
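
The "every listener is notified, each one filters for itself" pattern can be sketched in plain Java. This is a simplified model and not Elastic-Job or Curator code: FakeTreeCache, NodeListener, the EventType enum, and the /myJob paths are all hypothetical stand-ins used only to show the fan-out.

```java
import java.util.ArrayList;
import java.util.List;

class TreeCacheFanOutSketch {

    enum EventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

    /** Hypothetical stand-in for a listener attached to the job's tree cache. */
    interface NodeListener {
        void dataChanged(String path, EventType type, String data);
    }

    /** Hypothetical stand-in for the cache: it hands every event to every listener. */
    static class FakeTreeCache {
        private final List<NodeListener> listeners = new ArrayList<>();

        void addListener(NodeListener listener) {
            listeners.add(listener);
        }

        void publish(String path, EventType type, String data) {
            for (NodeListener listener : listeners) {
                listener.dataChanged(path, type, data);
            }
        }
    }

    /** Sends three events through two listeners and returns what each one reacted to. */
    static List<String> demo() {
        List<String> reactions = new ArrayList<>();
        FakeTreeCache cache = new FakeTreeCache();
        // Listener 1 only cares about updates to one specific node.
        cache.addListener((path, type, data) -> {
            if (path.equals("/myJob/config") && type == EventType.NODE_UPDATED) {
                reactions.add("config-listener:" + data);
            }
        });
        // Listener 2 only cares about removals under another branch of the same tree.
        cache.addListener((path, type, data) -> {
            if (path.startsWith("/myJob/instances/") && type == EventType.NODE_REMOVED) {
                reactions.add("instance-listener:" + path);
            }
        });
        cache.publish("/myJob/config", EventType.NODE_UPDATED, "new-config");
        cache.publish("/myJob/instances/host-a", EventType.NODE_REMOVED, "");
        cache.publish("/myJob/servers/host-a", EventType.NODE_ADDED, ""); // no listener reacts
        return reactions;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Both listeners receive all three events, but each one acts only on the event that matches its own path and type check; the third event is delivered and silently ignored by both.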

class CronSettingAndJobEventChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
            JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getCron());
        }
    }
}

Take the CronSettingAndJobEventChangedJobListener above as an example. When node data changes, say because the cron expression stored in zk was modified, the listener first checks whether the changed path is the config node, whether the event type is an update (NODE_UPDATED), and whether the job has not been shut down. If all of these conditions hold, it calls rescheduleJob on the job's schedule controller, so the job is rescheduled with the cron expression from the updated configuration.
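
The three-part guard in that listener can be exercised in isolation with a small model. The sketch below is not the Elastic-Job class: CronChangeListener is a hypothetical simplification that assumes the config node lives at /<jobName>/config, takes the cron string directly as the event data instead of parsing the job configuration JSON, and records the cron it would reschedule with rather than touching a real scheduler.

```java
import java.util.ArrayList;
import java.util.List;

class CronRescheduleGuardSketch {

    enum EventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

    /** Hypothetical simplification of the cron-change listener. */
    static class CronChangeListener {
        private final String configPath;
        // Stands in for JobRegistry.getInstance().isShutdown(jobName).
        private boolean jobShutdown;
        // Stands in for rescheduleJob(cron): one entry per reschedule.
        final List<String> rescheduledWith = new ArrayList<>();

        CronChangeListener(String jobName) {
            // Assumption for this sketch: the config node is /<jobName>/config.
            this.configPath = "/" + jobName + "/config";
        }

        void setJobShutdown(boolean jobShutdown) {
            this.jobShutdown = jobShutdown;
        }

        void dataChanged(String path, EventType type, String cron) {
            boolean isConfigPath = configPath.equals(path);
            boolean isUpdate = type == EventType.NODE_UPDATED;
            // Reschedule only when all three conditions hold.
            if (isConfigPath && isUpdate && !jobShutdown) {
                rescheduledWith.add(cron);
            }
        }
    }

    public static void main(String[] args) {
        CronChangeListener listener = new CronChangeListener("myJob");
        listener.dataChanged("/myJob/config", EventType.NODE_UPDATED, "0/5 * * * * ?");   // reschedules
        listener.dataChanged("/myJob/servers/host-a", EventType.NODE_UPDATED, "ignored"); // wrong path
        listener.dataChanged("/myJob/config", EventType.NODE_ADDED, "ignored");           // wrong event type
        listener.setJobShutdown(true);
        listener.dataChanged("/myJob/config", EventType.NODE_UPDATED, "ignored");         // job is shut down
        System.out.println("rescheduled with: " + listener.rescheduledWith);
    }
}
```

Of the four events in main, only the first passes all three checks, so the model reschedules exactly once.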

Added by student101 on Sun, 19 May 2019 02:41:09 +0300