Spring Batch Learning Records

Background: I previously looked after the company's data import platform, which imported user order records using Spring Boot's built-in @Scheduled task scheduling and notified developers by e-mail whether each run succeeded. Overall it supported the existing business, but developers could not clearly see the state of running tasks, and the scheduling was not flexible enough. I originally wanted to develop a framework myself, but I worried I would not think it through thoroughly enough, development would be slow, and every new requirement would be painful to accommodate. So I searched online, found Spring Batch, and started with its official website.

Spring Batch is a lightweight, comprehensive batch framework designed for developing robust batch applications vital to the daily operations of enterprise systems. Spring Batch builds on the characteristics of the Spring Framework (productivity, POJO-based development, general ease of use) while making it easy for developers to access and use more advanced enterprise services when necessary. Spring Batch is not a scheduling framework. There are many excellent enterprise schedulers (such as Quartz, Tivoli, Control-M, etc.) in both the commercial and open source spaces. It is intended to work in conjunction with a scheduler, not to replace one.

Spring Batch provides reusable functions that are essential for processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that enable extremely high-volume, high-performance batch jobs through optimization and partitioning techniques. Spring Batch can be used for simple use cases (such as reading a file into a database or running a stored procedure) as well as complex, high-volume use cases (such as moving large amounts of data between databases, transforming it, and so on). High-volume batch jobs can leverage the framework to process significant amounts of information in a highly scalable manner.

Spring Batch is concerned with Job configuration rather than scheduling, so I will not study scheduling in detail for now; later I plan to look at enterprise schedulers such as Quartz.

Note: Let's start with a small demo without going into the implementation of Spring Batch in detail.

Tool software:

IDEA: 2018.2

Java: 1.8.0_171

Gradle: 4.10.2

Demo address: https://gitee.com/leonchen21/SpringBatchDemo/tree/SpringBatchDemo_01


I. Create a Spring Boot Gradle project

Spring Boot version: 2.1.6

Add the required dependencies, as shown in the figure.

First, configure Spring Batch by creating a configuration class as follows:

package person.leon.batch.springbatchdemo.config;

import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * Custom Batch Basic Configuration
 * 
 * @author leon
 * @since 2019/6/28 15:41
 */
@Configuration
public class BatchConfigurerConfig extends DefaultBatchConfigurer {

    @Autowired
    private DataSource dataSource;

    @Override
    public JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        // Run the Job asynchronously (useful when the Job is launched by an http request)
        // jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    }

    @Override
    public JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(getTransactionManager());
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        factory.setMaxVarCharLength(1000);
        return factory.getObject();
    }

    @Override
    public JobExplorer createJobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource);
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }

}

Then add the following configuration to the application.properties file:

# Have Spring Batch execute its table-creation scripts automatically
spring.batch.initialize-schema=ALWAYS

The Spring Batch framework needs a set of metadata tables of its own, so we configure it to create them automatically; we also need the default datasource configuration:

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/batchdemo?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true&useSSL=false&autoReconnect=true&failOverReadOnly=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=123456

II. The first Job configuration

Note: a Job can also be configured in an XML file; this article uses JavaConfig only for now.

Now we have a task: import data from a csv file into a MySQL database.

The csv file, provinces.csv, sits at the root of the classpath (src/main/resources), as shown in the figure.
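For reference, a row of provinces.csv looks something like the following hypothetical example (one record per line, comma-separated, no header row, dates in yyyy-MM-dd HH:mm:ss format; the actual demo data may differ):

1,2019-06-29 17:24:00,2019-06-29 17:24:00,110000,Beijing,true,apk01,bs01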

The MySQL database temporarily reuses the default datasource configured above; in a real project the business data should be kept on a datasource separate from the Spring Batch metadata tables.
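As a minimal sketch of that separation (assuming a hypothetical second property prefix, business.datasource, which the demo does not define), two DataSource beans could be declared like this:

import javax.sql.DataSource;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class DataSourceConfig {

    /** Datasource for the Spring Batch metadata tables (spring.datasource.*) */
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource")
    public DataSource batchDataSource() {
        return DataSourceBuilder.create().build();
    }

    /** Separate datasource for business data (hypothetical business.datasource.* properties) */
    @Bean
    @ConfigurationProperties("business.datasource")
    public DataSource businessDataSource() {
        return DataSourceBuilder.create().build();
    }
}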

1. Create ImportProvinceJobConfig class

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
@Slf4j
public class ImportProvinceJobConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
}

@Configuration declares that this class is a configuration class.

@EnableBatchProcessing wires up the key batch infrastructure beans; without it, jobBuilderFactory and stepBuilderFactory cannot be injected and will report errors.

@Slf4j is Lombok's logging annotation, equivalent to adding private static final Logger log = LoggerFactory.getLogger(ImportProvinceJobConfig.class); to the ImportProvinceJobConfig class.

JobBuilderFactory is used to create Job instances.

StepBuilderFactory is used to create Step instances.

2. Reading

Read the data from the csv file as follows:

import java.nio.charset.StandardCharsets;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import person.leon.batch.springbatchdemo.entity.Province;

    /** Column names of the csv file, in order */
    private String[] provinceSet = {"id", "createTime", "updateTime",
            "provinceId", "provinceName", "display", "apkChannel", "bsChannel"};

    /**
     * Read data from the file
     */
    @Bean
    public FlatFileItemReader<Province> reader() {
        return new FlatFileItemReaderBuilder<Province>()
                // Name of the reader instance (also used for ExecutionContext keys)
                .name("provinceItemReader")
                // In strict mode the reader throws an exception if the input resource
                // does not exist; otherwise it logs the problem and continues.
                // Defaults to true.
                .strict(true)
                // Tokenizer for each line; the default delimiter is ","
                // .lineTokenizer(new DelimitedLineTokenizer())
                // Character encoding used to parse the file
                .encoding(StandardCharsets.UTF_8.name())
                // The resource to read (the data source)
                // Spring resource docs: https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#resources
                .resource(new ClassPathResource("provinces.csv"))
                // Number of lines to skip at the top of the file
                .linesToSkip(0)
                // If linesToSkip were set to 2, this callback would be invoked twice
                // .skippedLinesCallback(handler)
                .delimited()
                // Map each column to a field name
                .names(provinceSet)
                // Convert each line into an object
                .fieldSetMapper(new ProvinceFieldSetMapper())
                .build();
    }

Note: a flat file is a file containing records with no relational structure, such as a comma-separated values (CSV) file.

FlatFileItemReader provides the basic functionality for reading and parsing flat files. Its two core collaborators are Resource and LineMapper.

Resource is Spring Core's resource abstraction (see the Spring Core documentation); a LineMapper, given a line and its line number, returns the resulting domain object.

In this code the LineMapper is composed of a DelimitedLineTokenizer (the default) and our ProvinceFieldSetMapper.

DelimitedLineTokenizer splits each line of the csv file on commas into a set of string fields.
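In other words, the builder call above assembles roughly the following LineMapper by hand (a sketch for illustration; the builder does this wiring for us):

import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

    // A sketch of what FlatFileItemReaderBuilder wires up internally
    public LineMapper<Province> provinceLineMapper() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();  // default delimiter is ","
        tokenizer.setNames("id", "createTime", "updateTime",
                "provinceId", "provinceName", "display", "apkChannel", "bsChannel");

        DefaultLineMapper<Province> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new ProvinceFieldSetMapper());
        return lineMapper;
    }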

ProvinceFieldSetMapper is a class we define ourselves; the code is as follows:

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import person.leon.batch.springbatchdemo.entity.Province;

/**
 * Converts a line read from the file into an object
 *
 * @author Leon
 * @date 2019/6/29 17:24
 */
public class ProvinceFieldSetMapper implements FieldSetMapper<Province> {

    @Override
    public Province mapFieldSet(FieldSet fieldSet) throws BindException {
        Province province = new Province();
        province.setId(fieldSet.readLong("id"));
        province.setCreateTime(fieldSet.readDate("createTime", "yyyy-MM-dd HH:mm:ss"));
        province.setUpdateTime(fieldSet.readDate("updateTime", "yyyy-MM-dd HH:mm:ss"));
        province.setProvinceId(fieldSet.readString("provinceId"));
        province.setProvinceName(fieldSet.readString("provinceName"));
        province.setDisplay(fieldSet.readBoolean("display"));
        province.setApkChannel(fieldSet.readString("apkChannel"));
        province.setBsChannel(fieldSet.readString("bsChannel"));
        return province;
    }
}

The Province entity class is as follows:

import lombok.Data;

import java.util.Date;

/**
 * Provincial information
 *
 * @author leon
 * @date 2019/4/25 11:09
 */
@Data
public class Province {

    private Long id;
    private Date createTime;
    private Date updateTime;
    private String provinceId;
    private String provinceName;
    private boolean display;
    private String apkChannel;
    private String bsChannel;
}

This class implements the mapFieldSet method of the FieldSetMapper interface; the FieldSet holds the tokenized string fields of one line read from the csv file.

Its main job is to convert those strings into a Province object.

3. Conversion

The object produced by the reader can be transformed here; since there is no special requirement this time, the processor returns the original object unchanged.

import org.springframework.batch.item.ItemProcessor;

    @Bean
    public ProvinceItemProcessor processor() {
        return new ProvinceItemProcessor();
    }

    /**
     * Transformation (a pass-through in this demo)
     */
    private class ProvinceItemProcessor implements ItemProcessor<Province, Province> {
        @Override
        public Province process(Province item) throws Exception {
            return item;
        }
    }

4. Writing

import javax.sql.DataSource;

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;

    /**
     * Insert statement
     */
    private String insertSql = "INSERT INTO province " +
            " (id, create_time, update_time, province_id, province_name, display, apk_channel, bs_channel) " +
            " VALUES (" +
            ":id, :createTime, :updateTime, :provinceId, :provinceName, :display, :apkChannel, :bsChannel)";

    /**
     * Write to the database
     *
     * @param dataSource the datasource to write to
     * @return the configured writer
     */
    @Bean
    public JdbcBatchItemWriter<Province> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Province>()
                // Bind :name placeholders to the bean properties of Province
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql(insertSql)
                .dataSource(dataSource)
                .build();
    }

JdbcBatchItemWriter uses the batch capabilities of NamedParameterJdbcTemplate to execute the insert statement for all items in each chunk.
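Conceptually, what the writer does with each chunk is close to the following sketch (illustration only, not the framework's actual code):

import java.util.List;

import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

    // A sketch of what JdbcBatchItemWriter does with each chunk of items
    public void writeChunk(NamedParameterJdbcTemplate jdbcTemplate,
                           String insertSql, List<Province> items) {
        SqlParameterSource[] batchArgs = items.stream()
                // BeanPropertyItemSqlParameterSourceProvider binds :id, :createTime, ...
                // to the matching bean properties of each Province
                .map(BeanPropertySqlParameterSource::new)
                .toArray(SqlParameterSource[]::new);
        jdbcTemplate.batchUpdate(insertSql, batchArgs);
    }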

5. Building Job and Step

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Qualifier;

    @Bean
    @Qualifier("importProvinceJob")
    public Job importProvinceJob(
            JobAroundListener listener,
            @Qualifier("importProvinceStep") Step step1) {
        return jobBuilderFactory.get("importProvinceJob")
                // Configure the Job to forbid restarts (a JobRestartException would
                // then be thrown on restart); restart is supported by default
                // .preventRestart()
                // Validator for the declared job parameters
                .validator(new DefaultJobParametersValidator())
                // Increments the run.id parameter so each launch gets a new JobInstance
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    @Qualifier("importProvinceStep")
    public Step step1(JdbcBatchItemWriter<Province> writer) {
        return stepBuilderFactory.get("importProvinceStep")
                // Commit every 10 items as one chunk
                .<Province, Province>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
This configures the Job bean and the Step bean; a Job and its Steps form a one-to-many relationship.
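The JobAroundListener injected into importProvinceJob is a custom listener from the demo repository; its implementation is not shown in this article, but a minimal sketch of such a JobExecutionListener could look like this:

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

/**
 * Logs before and after Job execution.
 * A minimal sketch; the demo's actual JobAroundListener may do more.
 */
@Slf4j
@Component
public class JobAroundListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job [{}] starting", jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("Job [{}] finished with status {}",
                jobExecution.getJobInstance().getJobName(), jobExecution.getStatus());
    }
}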

III. Running the Job

There are two ways to run the Job:

1) Start the Spring Boot project and let the Job execute automatically at startup;

2) Trigger the Job through an http request.

To prevent the Job from executing at project startup, add this to application.properties:

spring.batch.job.enabled=false
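The /run endpoint used below is served by a controller in the demo; a minimal sketch of such a controller (the repository's actual code may differ) is:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Triggers the import Job via an http request.
 * A minimal sketch; the demo's actual controller may differ.
 */
@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("importProvinceJob")
    private Job importProvinceJob;

    @GetMapping("/run")
    public String run() throws Exception {
        // A unique parameter so each request creates a new JobInstance
        JobParameters params = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importProvinceJob, params);
        return "job started";
    }
}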

Enter localhost:8080/run in the browser's address bar to trigger the Job.

To verify the results, connect to the MySQL database and check whether the province table contains the imported data, and whether the Spring Batch metadata tables have been populated.
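For example, assuming the table names used above (province plus the BATCH_-prefixed metadata tables):

SELECT COUNT(*) FROM province;
-- Spring Batch metadata tables
SELECT * FROM BATCH_JOB_INSTANCE;
SELECT * FROM BATCH_JOB_EXECUTION;
SELECT * FROM BATCH_STEP_EXECUTION;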
