Introduction to the distributed task scheduling system Chaconne

Chaconne is a lightweight distributed task scheduling framework written in Java and built on Spring Boot. Adding the Chaconne components to your application lets you build a distributed task scheduling cluster very quickly.

Chaconne feature list

  1. Full support for the Spring Boot framework (2.2.0+)
  2. Scheduled tasks can be configured in multiple ways (cron expression, parameter settings, etc.)
  3. Tasks can be saved and deleted dynamically
  4. Scheduled tasks can be configured with annotations
  5. Two cluster modes are supported (active-standby mode and load-balancing mode)
  6. Several built-in load-balancing algorithms, with support for custom algorithms
  7. Retry and failover
  8. Log tracing
  9. Task parameter sharding
  10. Task dependencies (serial and parallel)
  11. DAG-based simulated workflows
  12. Custom task termination policies
  13. Task timeout cool-down and reset
  14. Email alerts

Chaconne supports two cluster deployment modes:

  1. Decentralized deployment mode
    There is no fixed scheduling-center node; the cluster elects one of the applications as the Leader, which directs and schedules tasks
  2. Centralized deployment mode
    The cluster is split into two roles, scheduling center and task execution node, and both roles support clustering
    Note:
    "Cluster" here refers to the group of applications participating in task execution (the Chaconne cluster); it is a concept independent of a cluster formed by the Spring Cloud framework
    If the Chaconne cluster is small, the decentralized deployment mode is recommended; for a large cluster, either mode can be used depending on the actual situation.

The Chaconne framework consists of two parts:

  1. chaconne-spring-boot-starter
    The core jar, containing all of Chaconne's core functionality
  2. chaconne-console
    Chaconne's web management console, used to manage tasks and view their running status

Maven:

<dependency>
    <groupId>com.github.paganini2008.atlantis</groupId>
    <artifactId>chaconne-spring-boot-starter</artifactId>
    <version>1.0-RC1</version>
</dependency>

Compatibility:
JDK 1.8+

A brief introduction to how Chaconne works

Under the hood, Chaconne relies on the Tridenter component (tridenter-spring-boot-starter) to implement its task cluster modes (active-standby and load-balancing), and uses a message unicast mechanism (simulated with Redis Pub/Sub) to implement task distribution, load balancing, sharding, and other advanced features. Note that Chaconne's definition of a cluster is consistent with Tridenter's: a cluster roughly corresponds to a product group or company. Chaconne also supports the concept of a task group, which is an optional setting; by default the group name is the current application name (${spring.application.name}), so applications sharing the same application name form one task group. Chaconne supports both cross-group and cross-cluster task invocation.
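For instance, since the group name defaults to ${spring.application.name}, two applications sharing that name form one task group. A minimal configuration sketch follows; the cluster-name key spring.application.cluster.name is the same one read via @Value in the DAG controller example later in this article, and the values shown are placeholders:

```properties
# Cluster name: distinguishes different product groups or companies
spring.application.cluster.name=demo-cluster
# Application name: also the default Chaconne task group name
spring.application.name=demo-app
```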

How to define tasks?

  1. Use the @ChacJob annotation
  2. Extend the ManagedJob class
  3. Implement the Job interface
  4. Implement the NotManagedJob interface
    Note:
    The first three styles are programmatic task definitions: the task is defined in code and loaded automatically by the Spring framework at startup.
    The fourth style is used for dynamic tasks: the user submits a task via the web console (chaconne-console) or creates it directly through the HTTP API. The difference is that such a task object is not a bean managed by the Spring context.

Example code:

Creating a task with annotations:
@ChacJob
@ChacTrigger(cron = "*/5 * * * * ?")
public class DemoCronJob {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoCronJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoCronJob's return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoCronJob is failed by cause: {}", e.getMessage(), e);
	}

}
Creating a task by extending ManagedJob:
@Component
public class HealthCheckJob extends ManagedJob {

	@Override
	public Object execute(JobKey jobKey, Object arg, Logger log) {
		log.info(info());
		return UUID.randomUUID().toString();
	}

	@Override
	public Trigger getTrigger() {
		return GenericTrigger.Builder.newTrigger("*/5 * * * * ?").setStartDate(DateUtils.addSeconds(new Date(), 30)).build();
	}

	private String info() {
		long totalMemory = Runtime.getRuntime().totalMemory();
		long usedMemory = totalMemory - Runtime.getRuntime().freeMemory();
		return FileUtils.formatSize(usedMemory) + "/" + FileUtils.formatSize(totalMemory);
	}

	@Override
	public long getTimeout() {
		return 60L * 1000;
	}

}
Dynamic tasks:
public class EtlJob implements NotManagedJob {

	@Override
	public Object execute(JobKey jobKey, Object attachment, Logger log) {
		log.info("JobKey:{}, Parameter: {}", jobKey, attachment);
		return null;
	}

}
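Of the four definition styles listed earlier, only style 3 (implementing the Job interface directly) has no example above. The sketch below is a hypothetical illustration: the class name CleanupJob is invented, and the assumption that Job declares the same execute and getTrigger methods is inferred from the ManagedJob example, not confirmed API.

```java
// Hypothetical sketch: assumes the Job interface declares the same
// execute/getTrigger methods that the ManagedJob example overrides.
@Component
public class CleanupJob implements Job {

	@Override
	public Object execute(JobKey jobKey, Object attachment, Logger log) {
		log.info("CleanupJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return null;
	}

	@Override
	public Trigger getTrigger() {
		// Fire at the start of every minute
		return GenericTrigger.Builder.newTrigger("0 */1 * * * ?").build();
	}

}
```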

Task dependency

Task dependency is one of the key features of the Chaconne framework. Dependencies come in two flavors: serial and parallel.
A serial dependency means Task B runs only after Task A completes, i.e., Task B depends on Task A.
A parallel dependency means, for example, that given three tasks A, B, and C, task C runs only after both A and B have completed, similar to a countersign business scenario.
In both cases, tasks can share context parameters and run results, and user-defined strategies can decide whether downstream tasks are triggered.

DAG (directed acyclic graph)

By combining serial and parallel dependencies, the Chaconne framework provides a DAG feature with a friendly API to model workflow-like business scenarios, broadening the use cases for task dependencies.
(For convenience, the example tasks are configured with annotations.)

Serial dependency example:
@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoDependentJob's return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoDependentJob is failed by cause: {}", e.getMessage(), e);
	}

}
Parallel dependency example:

There are three tasks: DemoTask, DemoTaskOne, and DemoTaskTwo.
DemoTask runs only after DemoTaskOne and DemoTaskTwo have both finished, and it can then obtain their return values.

DemoTaskOne:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTaskOne return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTaskOne is failed by cause: {}", e.getMessage(), e);
	}

}
DemoTaskTwo:
@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
		return RandomUtils.randomLong(1000000L, 1000000000L);
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTaskTwo return value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTaskTwo is failed by cause: {}", e.getMessage(), e);
	}
	
}

DemoTask:
@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
		@ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {

	@Run
	public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
		log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
		TaskJoinResult joinResult = (TaskJoinResult) attachment;
		TaskForkResult[] forkResults = joinResult.getTaskForkResults();
		long max = 0;
		for (TaskForkResult forkResult : forkResults) {
			max = Long.max(max, (Long) forkResult.getResult());
		}
		return max;
	}

	@OnSuccess
	public void onSuccess(JobKey jobKey, Object result, Logger log) {
		log.info("DemoTask return max value is: {}", result);
	}

	@OnFailure
	public void onFailure(JobKey jobKey, Throwable e, Logger log) {
		log.error("DemoTask is failed by cause: {}", e.getMessage(), e);
	}

}
DAG task example

At present, DAG tasks can only be created through the API; the ability to create DAG tasks from the web interface will be added as the console continues to improve.

@RequestMapping("/dag")
@RestController
public class DagJobController {

	@Value("${spring.application.cluster.name}")
	private String clusterName;

	@Value("${spring.application.name}")
	private String applicationName;

	@Autowired
	private JobManager jobManager;

	@GetMapping("/create")
	public Map<String, Object> createDagTask() throws Exception {
		Dag dag = new Dag(clusterName, applicationName, "testDag");// Create a Dag task and specify the cluster name, application name, and task name
		dag.setTrigger(new CronTrigger("0 0/1 * * * ?"));// Set Cron expression
		dag.setDescription("This is only a demo of dag job");// Task description
		DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName());// Define the first node
		DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName());// Define the second node
		// The second node forks two child tasks
		second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
		second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
		jobManager.persistJob(dag, "123");
		return Collections.singletonMap("ok", 1);
	}

}

As the example shows, the DAG model provided by the Chaconne framework supports both serial flow (the flow mode) and parallel processing (the fork mode). Here, the demoDag task forks two child tasks, demoDagOne and demoDagTwo; the two children run in parallel, and once both complete, the demoDag task is triggered.

Chaconne deployment instructions

Besides the Spring Boot framework, Chaconne uses MySQL to store task information by default (only MySQL is supported at present; more databases will be supported in the future) and Redis to hold cluster metadata and broadcast messages.
Therefore, whichever deployment mode you use, your application must configure a DataSource and a RedisConnectionFactory.
Example code:

@Slf4j
@Configuration
public class ResourceConfig {

	@Setter
	@Configuration(proxyBeanMethods = false)
	@ConfigurationProperties(prefix = "spring.datasource")
	public static class DataSourceConfig {

		private String jdbcUrl;
		private String username;
		private String password;
		private String driverClassName;

		private HikariConfig getDbConfig() {
			if (log.isTraceEnabled()) {
				log.trace("DataSourceConfig JdbcUrl: " + jdbcUrl);
				log.trace("DataSourceConfig Username: " + username);
				log.trace("DataSourceConfig Password: " + password);
				log.trace("DataSourceConfig DriverClassName: " + driverClassName);
			}
			final HikariConfig config = new HikariConfig();
			config.setDriverClassName(driverClassName);
			config.setJdbcUrl(jdbcUrl);
			config.setUsername(username);
			config.setPassword(password);
			config.setMinimumIdle(5);
			config.setMaximumPoolSize(50);
			config.setMaxLifetime(60 * 1000);
			config.setIdleTimeout(60 * 1000);
			config.setValidationTimeout(3000);
			config.setReadOnly(false);
			config.setConnectionInitSql("SELECT UUID()");
			config.setConnectionTestQuery("SELECT 1");
			config.setConnectionTimeout(60 * 1000);
			config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");

			config.addDataSourceProperty("cachePrepStmts", "true");
			config.addDataSourceProperty("prepStmtCacheSize", "250");
			config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
			return config;
		}

		@Primary
		@Bean
		public DataSource dataSource() {
			return new HikariDataSource(getDbConfig());
		}

	}

	@Setter
	@Configuration(proxyBeanMethods = false)
	@ConfigurationProperties(prefix = "spring.redis")
	public static class RedisConfig {

		private String host;
		private String password;
		private int port;
		private int dbIndex;

		@Bean
		@ConditionalOnMissingBean(RedisConnectionFactory.class)
		public RedisConnectionFactory redisConnectionFactory() {
			RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
			redisStandaloneConfiguration.setHostName(host);
			redisStandaloneConfiguration.setPort(port);
			redisStandaloneConfiguration.setDatabase(dbIndex);
			redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
			JedisClientConfiguration.JedisClientConfigurationBuilder jedisClientConfiguration = JedisClientConfiguration.builder();
			jedisClientConfiguration.connectTimeout(Duration.ofMillis(60000)).readTimeout(Duration.ofMillis(60000)).usePooling()
					.poolConfig(jedisPoolConfig());
			JedisConnectionFactory factory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration.build());
			return factory;
		}

		@Bean
		public JedisPoolConfig jedisPoolConfig() {
			JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
			jedisPoolConfig.setMinIdle(1);
			jedisPoolConfig.setMaxIdle(10);
			jedisPoolConfig.setMaxTotal(200);
			jedisPoolConfig.setMaxWaitMillis(-1);
			jedisPoolConfig.setTestWhileIdle(true);
			return jedisPoolConfig;
		}

	}

}
Decentralized deployment of Chaconne

Add the @EnableChaconneEmbeddedMode annotation to the main class of your Spring application, then start it.
Example code:

@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

	public static void main(String[] args) {
		final int port = 8088;
		System.setProperty("server.port", String.valueOf(port));
		SpringApplication.run(YourApplicationMain.class, args);
	}

}
Chaconne centralized deployment
  1. Start the scheduling center: create a new Spring Boot project, annotate its main class with @EnableChaconneDetachedMode, and specify it as the producer side
    Example code:
@EnableChaconneDetachedMode(DetachedMode.PRODUCER)
@SpringBootApplication
public class ChaconneManagementMain {

	public static void main(String[] args) {
		SpringApplication.run(ChaconneManagementMain.class, args);
	}
}
Don't forget to configure DataSource and RedisConnectionFactory
  2. Add the @EnableChaconneDetachedMode annotation to the main class of your Spring application (the consumer side by default), and then start it
@EnableChaconneDetachedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

	public static void main(String[] args) {
		final int port = 8088;
		System.setProperty("server.port", String.valueOf(port));
		SpringApplication.run(YourApplicationMain.class, args);
	}

}

chaconne-console instructions

chaconne-console is a web project provided by the Chaconne framework for managing tasks and viewing their status. Like the core, it supports both decentralized and centralized deployment modes. The default port is 6140.
The following functions are provided:

  1. Save tasks and view task information
  2. Pause and resume tasks
  3. Delete tasks
  4. Run tasks manually
  5. View daily task statistics
  6. View task runtime logs

At present the chaconne-console project is still under active maintenance; some features are a little rough (for example, the task JSON editor) and some are not yet available.
Like the core framework, chaconne-console is itself a Spring Boot project.
Source code:

@EnableChaconneClientMode
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class ChaconneConsoleMain {

	static {
		System.setProperty("spring.devtools.restart.enabled", "false");
		File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "console");
		if (!logDir.exists()) {
			logDir.mkdirs();
		}
		System.setProperty("DEFAULT_LOG_BASE", logDir.getAbsolutePath());
	}

	public static void main(String[] args) {
		SpringApplication.run(ChaconneConsoleMain.class, args);
		System.out.println(Env.getPid());
	}

}

The @EnableChaconneClientMode annotation enables a task management client.
After startup, open the home page: http://localhost:6140/chaconne/index
The home page opens on an overview; from there you can browse the task list, create tasks (a Show Json button displays the task definition as JSON), and view task details, task logs, and per-task statistics aggregated by day. (The screenshots from the original post are not reproduced here.)

Finally, the source code of the distributed task scheduling system Chaconne is available at https://github.com/paganini2008/chaconne.git
Interested readers can study the source code there.

Keywords: Java, distributed, microservices

Added by greedyisg00d on Sat, 15 Jan 2022 04:17:17 +0200