ElasticJob-Lite: Job Listener

ElasticJob-Lite provides job listeners that run before and after a job executes. There are two kinds: regular listeners, which are executed on every job node, and distributed listeners, which in a distributed deployment are executed by only a single node (distributed listeners currently have a bug). Once the job dependency (DAG) feature has been developed, the project may consider removing the job listener feature. To use your own regular or distributed listener, you need to register it as an SPI implementation.

Source code analysis

Job listener factory class:

package org.apache.shardingsphere.elasticjob.infra.listener;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

import java.util.Optional;
import java.util.Properties;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ElasticJobListenerFactory {
    
    // Registering a job listener in a static block
    static {
        ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
    }
    
    ...
}

Job listeners are registered through the job listener factory: when the factory class is initialized, its static block registers all job listener implementations through the ElasticJobServiceLoader class. The relevant code in ElasticJobServiceLoader:

    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();
    
    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();
 
    public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        // Use the ServiceLoader class to load the service (job listener) and store it in the ConcurrentMap
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    
    // Store the job listener in ConcurrentMap for future use
    private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }
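To make the two maps above concrete, here is a minimal, self-contained sketch that mimics their shape. TypedService, TypedRegistrySketch, FirstListener and SecondListener are hypothetical stand-ins for TypedSPI, ElasticJobServiceLoader and listener implementations; they are not ElasticJob classes. The newInstance method assumes the class map is kept so that fresh instances can be created through the parameterless constructor; that is an illustration, not a claim about ElasticJob's exact lookup code.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for TypedSPI: every implementation exposes a type key.
interface TypedService {
    String getType();
}

// Two hypothetical implementations that (deliberately) claim the same type key.
final class FirstListener implements TypedService {
    public String getType() { return "demo"; }
}

final class SecondListener implements TypedService {
    public String getType() { return "demo"; }
}

final class TypedRegistrySketch {
    private static final ConcurrentMap<Class<?>, ConcurrentMap<String, TypedService>> INSTANCES = new ConcurrentHashMap<>();
    private static final ConcurrentMap<Class<?>, ConcurrentMap<String, Class<? extends TypedService>>> CLASSES = new ConcurrentHashMap<>();

    // Same shape as registerTypedServiceClass: putIfAbsent means the first implementation registered for a key wins.
    static void register(Class<?> serviceType, TypedService instance) {
        INSTANCES.computeIfAbsent(serviceType, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        CLASSES.computeIfAbsent(serviceType, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }

    // Returns the cached instance stored under the given type key, if any.
    static Optional<TypedService> cached(Class<?> serviceType, String type) {
        ConcurrentMap<String, TypedService> byType = INSTANCES.get(serviceType);
        return byType == null ? Optional.empty() : Optional.ofNullable(byType.get(type));
    }

    // Assumption for illustration: builds a fresh instance from the stored class via its no-arg constructor.
    static Optional<TypedService> newInstance(Class<?> serviceType, String type) {
        ConcurrentMap<String, Class<? extends TypedService>> byType = CLASSES.get(serviceType);
        Class<? extends TypedService> clazz = byType == null ? null : byType.get(type);
        if (clazz == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), ex);
        }
    }
}
```

Because of putIfAbsent, a second implementation that returns an already-used type key is silently ignored, so each listener's getType value (the name later referenced in jobListenerTypes) should be unique.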

ServiceLoader is the class through which Java exposes SPI (Service Provider Interface), a service provider discovery mechanism built into the JDK. It lets frameworks be extended and their components replaced, and it is mainly used by framework developers. Different vendors can provide different implementations of the same interface: for the java.sql.Driver interface, for example, MySQL and PostgreSQL each ship their own implementation, and the Java SPI mechanism finds the implementations available for an interface. The main idea of SPI in Java is to move control over assembly out of the program itself. This matters especially in modular design, where the core idea is decoupling.

The only requirement for ServiceLoader to work is that provider classes have a parameterless constructor, so that they can be instantiated during loading. A provider is declared by placing a provider configuration file in META-INF/services under the resource directory. The file name is the fully qualified name of the service type (here, the fully qualified name of the ElasticJobListener interface), and the file lists the fully qualified names of the concrete provider classes (here, the ElasticJobListener implementations), one per line. Spaces, tabs and blank lines around each name are ignored, and the file must be encoded in UTF-8.
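The format rules above can be checked with a small parser. This is a minimal sketch following the provider-configuration format described in the java.util.ServiceLoader documentation (which additionally treats everything after `#` as a comment and ignores duplicate names); ProviderConfigSketch and parse are hypothetical names, this is not how ServiceLoader is implemented internally, and the method takes an already-decoded String to skip the UTF-8 reading step.

```java
import java.util.ArrayList;
import java.util.List;

final class ProviderConfigSketch {

    // Extracts provider class names from the text of a META-INF/services file.
    static List<String> parse(String content) {
        List<String> names = new ArrayList<>();
        for (String line : content.split("\r?\n|\r")) {
            // Everything after the first '#' on a line is a comment.
            int comment = line.indexOf('#');
            if (comment >= 0) {
                line = line.substring(0, comment);
            }
            // Only spaces and tabs around the name are ignored.
            String name = line.replaceAll("^[ \\t]+|[ \\t]+$", "");
            // Blank lines are skipped, and a name listed twice is kept once.
            if (!name.isEmpty() && !names.contains(name)) {
                names.add(name);
            }
        }
        return names;
    }
}
```

Feeding it a file that lists one listener with stray tabs, a comment line, and a repeated entry yields each class name exactly once, in file order.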

General listener

If the job processes files on the job server and deletes them after processing, consider having each node perform its own cleanup. This kind of task is easy to implement and does not need to track whether the global distributed task has completed, so prefer this type of listener whenever possible.

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

Job definition (the @Component annotation must be added, otherwise Spring Boot cannot detect the job):

package com.kaven.job;

import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:02
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

@Component
public class MySimpleJob implements SimpleJob {
    // DateTimeFormatter is immutable and thread-safe; a shared static SimpleDateFormat is not,
    // and sharding items assigned to the same node may execute concurrently
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @SneakyThrows
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(LocalDateTime.now().format(formatter) + " : " + shardingContext.getShardingParameter() + " data backup...");
        // Simulate a backup that takes two seconds
        Thread.sleep(2000);
        System.out.println(LocalDateTime.now().format(formatter) + " : " + shardingContext.getShardingParameter() + " data backup complete.");
    }
}

General listener (implement ElasticJobListener interface):

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @Author: ITKaven
 * @Date: 2021/12/18 15:46
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class MySimpleJobListener implements ElasticJobListener {
    // Thread-safe replacement for a shared static SimpleDateFormat
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parameterless constructor, required so that ServiceLoader can instantiate the listener
    public MySimpleJobListener(){}

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println(LocalDateTime.now().format(formatter) + " : Prepare the data backup environment.");
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println(LocalDateTime.now().format(formatter) + " : Clean up the data backup environment.");
    }

    // Type key referenced by jobListenerTypes in the configuration
    @Override
    public String getType() {
        return "MySimpleJobListener";
    }
}

The return value of getType is the second-level key under which the job listener is stored in both ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> and ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>>:

        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());

Startup class:

package com.kaven.job;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: ITKaven
 * @Date: 2021/12/16 20:21
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

@SpringBootApplication
public class Server {
    public static void main(String[] args) {
        SpringApplication.run(Server.class);
    }
}

Configuration file (YAML):

elasticjob:
  reg-center:
    server-lists: "192.168.31.173:9000"
    namespace: "my-job"
    connection-timeout-milliseconds: 40000
    max-retries: 5
  jobs:
    MySimpleJob:
      elasticJobClass: com.kaven.job.MySimpleJob
      shardingTotalCount: 3
      cron: "30 * * * * ?"
      description: "The job has three shards and is executed once every minute"
      overwrite: true
      jobListenerTypes: MySimpleJobListener     # Match the appropriate job listener based on the return value of the getType method
      shardingItemParameters: "0=Beijing,1=Shanghai,2=Shenzhen"

server:
  port: 8080

Add the SPI implementation.
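Following the META-INF/services rules described earlier, the provider configuration file for this example would look like the fragment below. The src/main/resources location assumes the standard Maven layout; the file name is the fully qualified name of ElasticJobListener taken from the listener's import, and the content is the listener class defined above.

```text
# File: src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
com.kaven.job.MySimpleJobListener
```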

Allow the application to run as multiple instances in parallel (change server.port before starting each instance, otherwise the ports will conflict):

The output is shown in the following figure:



The output is as expected.

Distributed listener

If the job processes database data, only one node needs to perform the data cleanup once processing is complete. This kind of task is more complex: the job status has to be synchronized across the distributed environment, and timeouts are provided to avoid deadlocks caused by nodes that never synchronize. Use it with caution. At present, the distributed listener of ElasticJob-Lite has a bug. The blogger was stuck on it for a long time, assuming he had done something wrong, before finally finding the bug in the source code; it has also been reported on GitHub (shardingsphere-elasticjob/issues/487).
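The timeout idea can be pictured with a single-JVM model: each sharding item reports that it has started, and a waiter gives up after a bounded time instead of blocking forever when some item never reports in. This is a hypothetical sketch (StartedBarrierSketch and its methods are illustrative names) built on java.util.concurrent.CountDownLatch; the real listener coordinates across nodes through the registry center rather than an in-process latch, with the bound coming from the timeout values passed to its constructor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class StartedBarrierSketch {
    // Hypothetical in-memory stand-in for "all sharding items have started" bookkeeping.
    private final CountDownLatch allStarted;

    StartedBarrierSketch(int shardingTotalCount) {
        this.allStarted = new CountDownLatch(shardingTotalCount);
    }

    // Called once per sharding item when it starts.
    void registerStart() {
        allStarted.countDown();
    }

    // Waits for the remaining items; returns false on timeout instead of deadlocking.
    boolean awaitAllStarted(long timeoutMilliseconds) {
        try {
            return allStarted.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With three sharding items and only two of them registering, awaitAllStarted(100) returns false after about 100 ms; once all three have registered it returns true immediately.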

Nevertheless, the blogger still demonstrates it and analyzes the cause of the bug.

Distributed listener (extends the AbstractDistributeOnceElasticJobListener abstract class):

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @Author: ITKaven
 * @Date: 2021/12/18 17:05
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class MyDistributeOnceSimpleJobListener extends AbstractDistributeOnceElasticJobListener {

    // Thread-safe replacement for a shared static SimpleDateFormat
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // How long to wait for all sharding items to start / complete
    private static final long startedTimeoutMilliseconds = 10000;
    private static final long completedTimeoutMilliseconds = 10000;

    // Parameterless constructor, required so that ServiceLoader can instantiate the listener
    public MyDistributeOnceSimpleJobListener() {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println(LocalDateTime.now().format(formatter) + " : Insert a backup task into the database. The task status is incomplete.");
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println(LocalDateTime.now().format(formatter) + " : Update the status of the corresponding backup task in the database to complete.");
    }

    // Type key referenced by jobListenerTypes in the configuration
    @Override
    public String getType() {
        return "MyDistributeOnceSimpleJobListener";
    }
}

The AbstractDistributeOnceElasticJobListener abstract class also implements the ElasticJobListener interface.

Configuration file (only the changed jobListenerTypes line is shown):

      jobListenerTypes: MySimpleJobListener,MyDistributeOnceSimpleJobListener

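Add the SPI implementation: append com.kaven.job.MyDistributeOnceSimpleJobListener as a new line in the same META-INF/services provider configuration file.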

The output is shown in the following figure:

In a single scheduled run, the distributed listener may be executed by more than one sharding node.

Bug cause analysis

The distributed listener's doBeforeJobExecutedAtLastStarted and doAfterJobExecutedAtLastCompleted methods are called from the beforeJobExecuted and afterJobExecuted methods, respectively, of the parent class AbstractDistributeOnceElasticJobListener. No distributed lock is taken along this call path, so more than one sharding node may end up executing them.
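The problem described above is a classic check-then-act race. The hypothetical sketch below (DistributeOnceRaceSketch and simulate are illustrative names, one thread stands in for each sharding node, and an in-memory set stands in for the shared "started" records) forces the worst-case interleaving in which every node reaches the "all started?" check after all of them have registered. Without an atomic claim every node runs the callback; with an AtomicBoolean claim, used here only as a local analogue of a distributed lock, exactly one does. It models the idea, not ElasticJob's actual code, which also clears the started records after the callback and therefore usually has a narrower window.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class DistributeOnceRaceSketch {

    // Runs one simulated scheduling round and returns how often the "last started" callback fired.
    static int simulate(int shardingTotalCount, boolean claimBeforeCallback) {
        Set<Integer> startedItems = ConcurrentHashMap.newKeySet(); // stand-in for shared "started" records
        AtomicBoolean claimed = new AtomicBoolean(false);          // local analogue of a distributed lock
        AtomicInteger callbackCount = new AtomicInteger();
        CountDownLatch everyoneRegistered = new CountDownLatch(shardingTotalCount);
        ExecutorService nodes = Executors.newFixedThreadPool(shardingTotalCount);
        for (int item = 0; item < shardingTotalCount; item++) {
            final int shardingItem = item;
            nodes.execute(() -> {
                startedItems.add(shardingItem); // "register start"
                everyoneRegistered.countDown();
                try {
                    // Force the worst case: no node checks until all nodes have registered.
                    everyoneRegistered.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
                boolean allStarted = startedItems.size() == shardingTotalCount; // check...
                if (allStarted && (!claimBeforeCallback || claimed.compareAndSet(false, true))) {
                    callbackCount.incrementAndGet(); // ...then act (the "run once" callback)
                }
            });
        }
        nodes.shutdown();
        try {
            nodes.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return callbackCount.get();
    }
}
```

With three sharding items, simulate(3, false) returns 3 and simulate(3, true) returns 1. In a real cluster the claim has to be made atomically in the registry center (for example with a ZooKeeper lock recipe); an in-process AtomicBoolean only illustrates the idea.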

That's all for ElasticJob-Lite job listeners. If anything here is wrong or you see it differently, comments and additions are welcome.


Added by rea|and on Mon, 20 Dec 2021 01:41:32 +0200