SpringBoot implements asynchronous import and export tasks (implementation idea + super detailed process)

Preface:

When a system has many users who depend on importing and exporting reports, and the data volume is large, generating and downloading Excel files in real time gives a poor user experience.
We therefore need another approach that reduces the load on the server and still meets users' needs without blocking their work.
This article describes the asynchronous import and export mode used in one of our real projects.
It consists of the following steps:
1. When the user triggers an export, save the operation to the asynchronous task table together with the user information and request parameters, and respond synchronously with "The operation succeeded, please go to the task center to view it".
2. When the user triggers an import, save the uploaded file temporarily on the server, save the import operation and the user information to the asynchronous task table, and respond synchronously with the same message.
3. Provide a "My tasks" menu that shows the status of the user's import / export tasks and the operations available on them.
4. Use a scheduled task or a scheduler such as Quartz to execute the tasks in the asynchronous task table. Multiple threads can be used to improve throughput.
5. For an export: first create the file to be exported on the server. Because there may be a lot of data and writing Excel files is slow, the data can be exported in CSV format.
   To avoid running out of memory by loading too much data in a single query, query the database in batches and write the CSV file as a stream. Update the status in the asynchronous task table once the whole operation has succeeded.
6. Provide a download button in the "My tasks" menu. If the user's export task has completed successfully, the CSV file generated on the server can be downloaded.
7. Because this mode generates many temporary files, it is recommended to store them on an OSS server. If resources allow, the module can be split out as a separate asynchronous service. There is still plenty of room for further optimization.

Main process:

  1. You need an asynchronous task table. The SQL for creating it is:
CREATE TABLE `async_import_export_task` (
  `task_id` bigint(20) NOT NULL COMMENT 'task id',
  `task_name` varchar(255) DEFAULT NULL COMMENT 'Task name',
  `company_id` bigint(20) DEFAULT NULL COMMENT 'company id',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `priority` tinyint(4) DEFAULT NULL COMMENT 'Priority 0-10',
  `req_param` varchar(8000) DEFAULT NULL COMMENT 'Request parameters',
  `task_status` tinyint(4) DEFAULT NULL COMMENT 'Task status: 1 = init, 2 = running, 3 = finished (success), 4 = finished (fail)',
  `created_time` datetime DEFAULT NULL COMMENT 'Creation time',
  `deal_time` datetime DEFAULT NULL COMMENT 'Processing time',
  `finish_time` datetime DEFAULT NULL COMMENT 'Completion time',
  `result` mediumtext COMMENT 'Processing results',
  `tag` varchar(255) DEFAULT NULL COMMENT 'File path: for import tasks, the OSS path of the file uploaded by the user; for export tasks, the OSS path of the generated file',
  `task_code` varchar(255) DEFAULT NULL COMMENT 'Task code, e.g. D-A-01: download the call records of the day',
  `task_type` tinyint(4) DEFAULT NULL COMMENT 'Task type: 1 = upload (import), 2 = download (export)',
  PRIMARY KEY (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  2. Some Maven dependencies are required to implement the process. The ones I use are listed here; add whichever you need.
    <properties>
        <java.version>1.8</java.version>
        <lombok.version>1.16.18</lombok.version>
        <fastjson.version>1.2.39</fastjson.version>
        <commons.io.version>2.5</commons.io.version>
        <commons.lang3.version>3.5</commons.lang3.version>
        <mysql.connector.version>8.0.16</mysql.connector.version>
        <mybatis-plus-version>3.1.0</mybatis-plus-version>
        <druid-version>1.0.29</druid-version>
        <velocity-version>2.0</velocity-version>
        <swagger2-version>2.7.0</swagger2-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AOP support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!--Lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
            <version>${lombok.version}</version>
        </dependency>
        <!--springboot Configuration Metadata -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--Json-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--io Common tools -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons.io.version}</version>
        </dependency>
        <!--commons-lang3 Tool class -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons.lang3.version}</version>
        </dependency>

        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connector.version}</version>
        </dependency>

        <!--mybatis-plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus-version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>${mybatis-plus-version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid-version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>${velocity-version}</version>
        </dependency>

        <!-- Swagger2 integration -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger2-version}</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger2-version}</version>
        </dependency>

        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.6</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>
    </dependencies>
  3. Below is my implementation code, following the process above
  • Configure the path for temporary files in application.yml
temp:
  file:
    path: D:\\
  • You will need the entity class for the async_import_export_task table and several utility classes. You can copy them directly into the project
package com.blog.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;


/**
 * Asynchronous import and export task table
 *
 * @author LiWT
 * @date 2021-06-27
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("async_import_export_task")
public class AsyncImportExportTask {

    /**
     * Task id
     */
    @TableId(value = "task_id", type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * Task name
     */
    private String taskName;

    /**
     * Company id
     */
    private Long companyId;

    /**
     * User id
     */
    private Long userId;

    /**
     * Task priority
     * Works only for tasks of the current user
     */
    private Integer priority;

    /**
     * Request parameters
     */
    private String reqParam;


    /**
     * D-A-01 Download the call record of the day
     * D-A-02 Download call history
     * U-C-01 Import customer data
     * D-C-01 Export customer data
     * U-U-01 Import user data
     * D-U-01 Export user data
     */
    private String taskCode;

    /**
     * Task status
     * 1: init
     * 2: running
     * 3: finish(success)
     * 4: finish(fail)
     */
    private Integer taskStatus;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createdTime;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date dealTime;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date finishTime;

    private String tag;

    /**
     * Processing results
     */
    private String result;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @TableField(exist = false)
    private Date beginTime;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @TableField(exist = false)
    private Date endTime;

    /**
     * Task type
     * 1: Upload (import)
     * 2: Download (export)
     */
    private Integer taskType;

}
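The status codes 1 to 4 documented on taskStatus are used as bare integers throughout the code that follows. As a rough sketch only (the TaskStatus enum and its method names are hypothetical and not part of the original project), the same codes and the transitions implied by the flow could be captured in one place like this:

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum mirroring the task_status values used in this article (1 to 4). */
enum TaskStatus {
    INIT(1), RUNNING(2), SUCCESS(3), FAIL(4);

    private final int code;

    TaskStatus(int code) {
        this.code = code;
    }

    /** The integer stored in the task_status column. */
    int code() {
        return code;
    }

    /** Looks up the constant for a stored task_status value. */
    static TaskStatus of(int code) {
        for (TaskStatus status : values()) {
            if (status.code == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown task status: " + code);
    }

    /** Statuses that may follow this one: init -> running -> success or fail. */
    Set<TaskStatus> next() {
        switch (this) {
            case INIT:
                return EnumSet.of(RUNNING);
            case RUNNING:
                return EnumSet.of(SUCCESS, FAIL);
            default:
                return EnumSet.noneOf(TaskStatus.class);
        }
    }

    boolean canMoveTo(TaskStatus target) {
        return next().contains(target);
    }

    /** Only a successfully finished export should offer a download. */
    boolean downloadable() {
        return this == SUCCESS;
    }
}
```

With something like this, a check such as TaskStatus.of(task.getTaskStatus()).downloadable() could replace comparisons against the literal 3.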
  • Unified response type R
package com.blog.utils;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.*;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;

/**
 * @author LiWT
 * @date 2020-12-04
 */
@Builder
@ToString
@NoArgsConstructor
@Accessors(chain = true)
@ApiModel(value="R object", description="The encapsulated object that returns the information")
public class R<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Success flag
     */
    private static final Integer SUCCESS = 200;
    /**
     * Failure flag
     */
    private static final Integer FAIL = 500;

    @Getter
    @Setter
    @ApiModelProperty(value = "Interface code: 200-success; 500-failure; 401-insufficient permissions")
    private int code;

    @Getter
    @Setter
    @ApiModelProperty(value = "Interface prompt information")
    private String msg;


    @Getter
    @Setter
    @ApiModelProperty(value = "Interface return data")
    private T data;

    @Getter
    @Setter
    @ApiModelProperty(value = "Logical processing time")
    private Long respTime;


    public static <T> R<T> ok() {
        return restResult(null, SUCCESS, null);
    }

    public static <T> R<T> ok(T data) {
        return restResult(data, SUCCESS, null);
    }

    public static <T> R<T> ok(T data, String msg) {
        return restResult(data, SUCCESS, msg);
    }

    public static <T> R<T> failed() {
        return restResult(null, FAIL, null);
    }

    public static <T> R<T> failed(String msg) {
        return restResult(null, FAIL, msg);
    }

    public static <T> R<T> failed(T data) {
        return restResult(data, FAIL, null);
    }

    public static <T> R<T> failed(T data, String msg) {
        return restResult(data, FAIL, msg);
    }
    public static <T> R<T> isOk(boolean isOk, String msg){
        if(isOk) {
            return restResult(null, SUCCESS, msg + "success");
        } else {
            return restResult(null, FAIL, msg + "fail, Please try again");
        }
    }

    public static <T> R<T> result(int code, String msg, T data) {
        return restResult(data, code, msg);
    }

    private static <T> R<T> restResult(T data, int code, String msg) {
        R<T> apiResult = new R<>();
        apiResult.setCode(code);
        apiResult.setData(data);
        apiResult.setMsg(msg);
        if(StringUtils.isBlank(msg)) {
            if(SUCCESS != code) {
                apiResult.setMsg("operation failed");
            } else {
                apiResult.setMsg("Operation succeeded");
            }
        }
        return apiResult;
    }

    public R(int code, String msg, T data, Long respTime) {
        this.code = code;
        this.msg = msg;
        this.data = data;
        this.respTime = respTime;
    }
}

  • Utility class: SpringContextHolder
package com.blog.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

/**
 * Saves the Spring ApplicationContext in a static variable so that it can be retrieved anywhere in the code at any time
 *
 * @author LiWT
 * @date 2021-06-27
 */
@Slf4j
@Service
@Lazy(false)
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

    private static ApplicationContext applicationContext = null;

    /**
     * Get the ApplicationContext stored in the static variable
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Implement ApplicationContextAware interface and inject Context into static variables
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringContextHolder.applicationContext = applicationContext;
    }

    /**
     * Get the Bean from the static variable applicationContext and automatically convert it to the type of the assigned object
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        return (T) applicationContext.getBean(name);
    }

    /**
     * Get the Bean from the static variable applicationContext and automatically convert it to the type of the assigned object
     */
    public static <T> T getBean(Class<T> requiredType) {
        return applicationContext.getBean(requiredType);
    }

    /**
     * Reset the ApplicationContext held by SpringContextHolder to null
     */
    public static void clearHolder() {
        if (log.isDebugEnabled()) {
            log.debug("Clearing ApplicationContext in SpringContextHolder: " + applicationContext);
        }
        applicationContext = null;
    }

    /**
     * Publish event
     *
     * @param event
     */
    public static void publishEvent(ApplicationEvent event) {
        if (applicationContext == null) {
            return;
        }
        applicationContext.publishEvent(event);
    }

    /**
     * Implement the DisposableBean interface to clean up static variables when Context is closed
     */
    @Override
    public void destroy() throws Exception {
        SpringContextHolder.clearHolder();
    }

}

  • AsyncImportExportTask Mapper:
package com.blog.mapper;

import cn.hutool.core.date.DateTime;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.blog.entity.AsyncImportExportTask;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * @author LiWT
 * @date 2021-06-27
 */
@Mapper
@Repository
public interface AsyncImportExportTaskMapper extends BaseMapper<AsyncImportExportTask> {
    /**
     * Query scheduled tasks to be processed according to time and status
     *
     * @param now current time 
     * @param i state
     * @return task list
     */
    List<AsyncImportExportTask> selectByTimeAndStatus(@Param("beginTime") DateTime now, @Param("status") int i);

}

  • AsyncImportExportTaskMapper.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.blog.mapper.AsyncImportExportTaskMapper">

    <resultMap type="com.blog.entity.AsyncImportExportTask" id="AsyncImportExportTaskResult">
        <id property="taskId" column="task_id"/>
        <result property="taskName" column="task_name"/>
        <result property="taskCode" column="task_code"/>
        <result property="companyId" column="company_id"/>
        <result property="userId" column="user_id"/>
        <result property="priority" column="priority"/>
        <result property="reqParam" column="req_param"/>
        <result property="taskStatus" column="task_status"/>
        <result property="createdTime" column="created_time"/>
        <result property="dealTime" column="deal_time"/>
        <result property="finishTime" column="finish_time"/>
        <result property="result" column="result"/>
        <result property="tag" column="tag"/>
        <result property="taskType" column="task_type"/>
    </resultMap>


    <select id="selectByTimeAndStatus" resultMap="AsyncImportExportTaskResult">
        select * from async_import_export_task where created_time <![CDATA[ < ]]> #{beginTime} and task_status = #{status} for update;
    </select>

</mapper>
  • Step 1: when exporting, save the user's export task to the AsyncImportExportTask table and respond directly to the user
package com.blog.controller;

import com.alibaba.fastjson.JSONObject;
import com.blog.entity.AsyncImportExportTask;
import com.blog.service.IAsyncImportExportTaskService;
import com.blog.utils.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

/**
 * Asynchronous import and export Controller
 *
 * @Author: LiWT
 * @Date: 2021/6/29 14:39
 */
@RestController
public class AsyncImportExportController {
    @Resource
    private IAsyncImportExportTaskService asyncImportExportTaskService;

    /**
     * The export interface only saves the export parameters as an AsyncImportExportTask.
     * In a real system the company ID and user ID should come from the logged-in user, so that "My tasks" can be queried per user.
     * @param jsonObject Export parameters
     * @return A message telling the user to check the task center
     */
    @PostMapping("/export/user")
    public R<String> export(@RequestBody JSONObject jsonObject) {
        AsyncImportExportTask asyncImportExportTask = new AsyncImportExportTask();
        asyncImportExportTask.setCreatedTime(new Date());
        asyncImportExportTask.setTaskCode("D-A-01");
        asyncImportExportTask.setTaskName("Export current day call record");
        asyncImportExportTask.setReqParam(jsonObject.toJSONString());
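        // taskId is deliberately left unset: a UUID string cannot be parsed by Long.parseLong (it throws NumberFormatException),
        // and IdType.ID_WORKER on the entity lets MyBatis-Plus generate the ID on insert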
        asyncImportExportTask.setCompanyId(-1L);//User enterprise ID
        asyncImportExportTask.setTaskStatus(1);
        asyncImportExportTask.setTaskType(2);
        asyncImportExportTask.setUserId(1L);//User ID
        asyncImportExportTaskService.save(asyncImportExportTask);
        return R.ok("Operation succeeded, please go to the task center to check");
    }
}
  • Step 2: when importing, temporarily save the file uploaded by the user to the server and save the import operation to the asynchronous task table. The interface is analogous to the one in step 1
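The article gives no code for step 2, so the following is only a minimal sketch of the "save the uploaded file temporarily" part, written against the plain JDK. ImportFileStore and saveUpload are hypothetical names. In a Spring controller the stream and file name would typically come from MultipartFile.getInputStream() and MultipartFile.getOriginalFilename(), and the returned path would be stored in the tag column of a task with taskType 1:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/** Hypothetical helper: stores an uploaded import file under the temp directory. */
final class ImportFileStore {

    private ImportFileStore() {
    }

    /**
     * Copies the upload into baseDir under a random file name and returns the stored path.
     * Only a simple alphanumeric extension is taken from the user-supplied name,
     * so the original name cannot influence where the file ends up.
     */
    static Path saveUpload(InputStream upload, String originalName, String baseDir) {
        try {
            Path dir = Paths.get(baseDir);
            Files.createDirectories(dir);

            String ext = "";
            int dot = originalName == null ? -1 : originalName.lastIndexOf('.');
            if (dot >= 0 && originalName.substring(dot).matches("\\.[A-Za-z0-9]{1,10}")) {
                ext = originalName.substring(dot);
            }

            Path target = dir.resolve(UUID.randomUUID() + ext);
            Files.copy(upload, target);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException("Could not store uploaded file", e);
        }
    }
}
```

The controller would then create the task row exactly as in step 1 and return the same "go to the task center" response.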
  • Step 3: you need a "My tasks" menu to display the status of the user's import / export tasks and the operations on them. The /queryMyTask interface queries my task list, and the /downloadFile interface downloads the file of the corresponding task.
package com.blog.controller;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.blog.entity.AsyncImportExportTask;
import com.blog.service.IAsyncImportExportTaskService;
import com.blog.utils.R;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * My task Controller
 *
 * @author LiWT
 * @date 2021-06-27
 */
@RestController
@RequestMapping("/vos/custom/task")
@Api(value = "My tasks", tags = {"My tasks"})
public class MyTaskController {

    @Resource
    private IAsyncImportExportTaskService asyncImportExportTaskService;

    /**
     * Query my tasks
     *
     * @param page                  Paging parameters
     * @param asyncImportExportTask User ID
     * @return My task list
     */
    @PostMapping("/queryMyTask")
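    // Bind to the concrete Page class: Spring MVC cannot instantiate the IPage interface as a handler argument
    public R<IPage<AsyncImportExportTask>> queryMyTask(com.baomidou.mybatisplus.extension.plugins.pagination.Page<AsyncImportExportTask> page, @RequestBody AsyncImportExportTask asyncImportExportTask) {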
        asyncImportExportTask.setCompanyId(-1L);
        asyncImportExportTask.setUserId(1L);
        return R.ok(asyncImportExportTaskService.selectJobOrderByPage(page, asyncImportExportTask));
    }

    /**
     * Download task file
     *
     * @param taskId Task ID
     * @return File download path
     */
    @GetMapping("/downloadFile")
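    public R<String> downloadFile(Long taskId) {
        // Placeholder: look up the task by taskId and return the file path stored in its tag column
        return R.ok("File path saved by corresponding task");
    }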
}
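The /downloadFile handler above is only a placeholder. Before returning or streaming a file, it seems reasonable to check that the task belongs to the caller, that it finished successfully (status 3), and that the stored path stays inside the temp directory. The sketch below shows those checks with the plain JDK. DownloadGuard and resolveDownload are hypothetical names, and the assumption that tag holds a path relative to (or located inside) the configured base directory is mine, not the article's:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical guard that decides whether a task's file may be downloaded. */
final class DownloadGuard {

    /** task_status value for "finish(success)" as documented on the entity. */
    static final int STATUS_SUCCESS = 3;

    private DownloadGuard() {
    }

    /**
     * Validates ownership, status and location, then returns the file to serve.
     *
     * @throws SecurityException     if the task belongs to someone else or the path leaves baseDir
     * @throws IllegalStateException if the task has not finished successfully
     */
    static Path resolveDownload(long requesterId, long ownerId, int taskStatus, String tag, String baseDir) {
        if (requesterId != ownerId) {
            throw new SecurityException("Task belongs to another user");
        }
        if (taskStatus != STATUS_SUCCESS) {
            throw new IllegalStateException("Task has not finished successfully");
        }
        Path base = Paths.get(baseDir).toAbsolutePath().normalize();
        Path file = base.resolve(tag).normalize();
        if (!file.startsWith(base)) {
            throw new SecurityException("File path is outside the export directory");
        }
        return file;
    }
}
```

The returned path could then be streamed to the response, for example with Files.copy(path, outputStream), which fails with an exception if the file has already been removed.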

  • Asynchronous task Service layer IAsyncImportExportTaskService:
package com.blog.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import com.blog.entity.AsyncImportExportTask;

import java.util.List;

/**
 * @author LiWT
 * @date 2021-06-27
 */
public interface IAsyncImportExportTaskService extends IService<AsyncImportExportTask> {

    /**
     * Query my task list
     *
     * @param page                  Paging parameters
     * @param asyncImportExportTask Company ID, user ID
     * @return My import and export task list
     */
    IPage<AsyncImportExportTask> selectJobOrderByPage(IPage<AsyncImportExportTask> page, AsyncImportExportTask asyncImportExportTask);

    /**
     * Query tasks in init status and mark them as running in the same transaction
     * @return List of tasks to be processed
     */
    List<AsyncImportExportTask> getTaskAndUpdateStatusSynchronized();
}

  • Asynchronous task ServiceImpl layer AsyncImportExportTaskServiceImpl:
package com.blog.service.impl;

import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.blog.entity.AsyncImportExportTask;
import com.blog.mapper.AsyncImportExportTaskMapper;
import com.blog.service.IAsyncImportExportTaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;


/**
 * @author LiWT
 * @date 2021-06-27
 */
@Service
public class AsyncImportExportTaskServiceImpl extends ServiceImpl<AsyncImportExportTaskMapper, AsyncImportExportTask> implements IAsyncImportExportTaskService {


    @Autowired
    private AsyncImportExportTaskMapper asyncImportExportTaskMapper;

    @Override
    public IPage<AsyncImportExportTask> selectJobOrderByPage(IPage<AsyncImportExportTask> page, AsyncImportExportTask asyncImportExportTask) {
        QueryWrapper<AsyncImportExportTask> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq(asyncImportExportTask.getTaskStatus() != null, "task_status", asyncImportExportTask.getTaskStatus());
        queryWrapper.like(StringUtils.hasText(asyncImportExportTask.getTaskName()), "task_name", asyncImportExportTask.getTaskName());
        if (asyncImportExportTask.getBeginTime() != null) {
            queryWrapper.ge("created_time", asyncImportExportTask.getBeginTime());
        }
        if (asyncImportExportTask.getEndTime() != null) {
            queryWrapper.le("created_time", asyncImportExportTask.getEndTime());
        }
        queryWrapper.eq("user_id", asyncImportExportTask.getUserId());
        queryWrapper.eq("company_id", asyncImportExportTask.getCompanyId());
        queryWrapper.orderByDesc("created_time");
        return asyncImportExportTaskMapper.selectPage(page, queryWrapper);
    }

    @Override
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public List<AsyncImportExportTask> getTaskAndUpdateStatusSynchronized() {
        // Query tasks with init status
        List<AsyncImportExportTask> list = asyncImportExportTaskMapper.selectByTimeAndStatus(DateUtil.date(), 1);
        if (CollectionUtils.isEmpty(list)) {
            return null;
        }
        List<Long> taskIds = new ArrayList<>();
        list.forEach(asyncImportExportTask -> {
            taskIds.add(asyncImportExportTask.getTaskId());
        });
        UpdateWrapper<AsyncImportExportTask> updateWrapper = new UpdateWrapper<>();
        updateWrapper.set("task_status", 2);
        updateWrapper.set("deal_time", new Date());
        updateWrapper.in("task_id", taskIds);
        asyncImportExportTaskMapper.update(null, updateWrapper);
        return list;
    }

}
  • Step 4: a scheduled job executes the tasks stored in the asynchronous task table. Multithreading can be used to improve throughput.
package com.blog.task;

import com.blog.entity.AsyncImportExportTask;
import com.blog.service.AsyncTaskCommonService;
import com.blog.service.IAsyncImportExportTaskService;
import com.blog.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author LiWT
 * @date 2021-06-27
 */
@Component
@Slf4j
public class AsyncImExportDataHandle {

    static ThreadPoolExecutor.CallerRunsPolicy policy = new ThreadPoolExecutor.CallerRunsPolicy();

    static ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200), new ThreadFactory() {
        int num = 0;

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("async-task-thread-" + ++num);
            return thread;
        }
    }, policy);


    @Autowired
    private IAsyncImportExportTaskService asyncImportExportTaskService;

    public void dealAsyncTask() {
        List<AsyncImportExportTask> list = asyncImportExportTaskService.getTaskAndUpdateStatusSynchronized();
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        log.info("====> Number of tasks to be processed:{}", list.size());
        for (AsyncImportExportTask asyncImportExportTask : list) {
            ((AsyncTaskCommonService) SpringContextHolder.getBean(asyncImportExportTask.getTaskCode())).invoke(asyncImportExportTask);
        }

        /*list.forEach(asyncImportExportTask -> CompletableFuture.runAsync(
                () -> ((AsyncTaskCommonService)SpringContextHolder.getBean(asyncImportExportTask.getTaskCode())).invoke(asyncImportExportTask), executor));*/
    }
}
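The commented-out lines at the end of dealAsyncTask() hint at the multi-threaded variant. The following is a self-contained sketch of that idea using only the JDK. TaskDispatcher is a hypothetical name, and the handlers map stands in for the SpringContextHolder.getBean(taskCode) lookup so that the example runs without Spring:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical dispatcher: runs one handler per task on a shared thread pool. */
final class TaskDispatcher {

    private TaskDispatcher() {
    }

    /**
     * Submits every task code to the executor, waits for all of them, and
     * returns how many failed. A failing task does not stop the others.
     */
    static int dispatch(List<String> taskCodes,
                        Map<String, Consumer<String>> handlers,
                        ExecutorService executor) {
        AtomicInteger failures = new AtomicInteger();

        CompletableFuture<?>[] futures = taskCodes.stream()
                .map(code -> CompletableFuture.runAsync(() -> {
                            Consumer<String> handler = handlers.get(code);
                            if (handler == null) {
                                throw new IllegalArgumentException("No handler for task code " + code);
                            }
                            handler.accept(code);
                        }, executor)
                        // Count the failure here; a real handler would also set task status 4
                        .exceptionally(ex -> {
                            failures.incrementAndGet();
                            return null;
                        }))
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(futures).join();
        return failures.get();
    }
}
```

In the article's class, the executor argument would be the static ThreadPoolExecutor declared above, and each handler would call invoke(asyncImportExportTask) on the bean registered under the task code.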
  • Scheduled tasks:
package com.blog.task.job;

import com.blog.task.AsyncImExportDataHandle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Check for asynchronous tasks to be processed every minute
 *
 * @author LiWT
 * @date 2021-06-27
 */
@Slf4j
@Component
@EnableScheduling
public class AsyncImExportDataTask {
    @Autowired
    private AsyncImExportDataHandle asyncImExportDataHandle;

    @Scheduled(cron = "0 0/1 * * * ?")
    private void asyncImExportData() {
        asyncImExportDataHandle.dealAsyncTask();
    }
}
  • Common interface AsyncTaskCommonService. Each import / export implementation class implements this interface and overrides the invoke() method
package com.blog.service;

import com.blog.entity.AsyncImportExportTask;

/**
 * @author LiWT
 * @date 2021-06-27
 */
public interface AsyncTaskCommonService {
    /**
     * Perform tasks
     *
     * @param asyncImportExportTask
     */
    void invoke(AsyncImportExportTask asyncImportExportTask);
}
  • Step 5: first create the file to be exported on the server. Because there may be a lot of data and writing Excel files is slow, the data is exported in CSV format.
    To avoid running out of memory by loading too much data in a single query, the data is queried in batches and written to the CSV file as a stream. The status in the asynchronous task table is updated once the whole operation has succeeded.
package com.blog.service.impl.async;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.blog.entity.AsyncImportExportTask;
import com.blog.entity.User;
import com.blog.mapper.UserMapper;
import com.blog.service.AsyncTaskCommonService;
import com.blog.service.IAsyncImportExportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * @author LiWT
 * @date 2021-06-27
 */
@Component("D-A-01")
@Slf4j
public class ExportUserInfoData implements AsyncTaskCommonService {

    @Value("${temp.file.path}")
    private String baseTempFilePath;

    @Resource
    private UserMapper userMapper;

    @Autowired
    private IAsyncImportExportTaskService asyncImportExportTaskService;

    @Override
    public void invoke(AsyncImportExportTask asyncImportExportTask) {
        log.info(Thread.currentThread().getName() + "Export call history...");

        //Request parameters saved with the task; a real export should filter the data according to them
        final String reqParam = asyncImportExportTask.getReqParam();

        //Create a csv file to the specified path
        String filePath = baseTempFilePath + UUID.randomUUID() + ".csv";
        File file = new File(filePath);
        if (!file.exists()) {
            if (!file.getParentFile().exists()) {
                file.getParentFile().mkdirs();
            }
        }
        try (FileWriter fw = new FileWriter(file, true)) {
            if (!file.exists()) {
                file.createNewFile();
            }

            // Add a BOM header so that Excel does not show garbled Chinese characters when opening the csv
            final byte[] bytes = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
            fw.write(getChars(bytes));
            fw.write("full name,nickname,address,birthday,Gender,mobile phone");
            fw.write("\r\n");
            boolean continueFlag = true;
            int num = 0;
            //Query the required data in paging mode and write it to the csv file. At this time, the FileWriter only performs flush and does not close
            while (continueFlag) {
                //Customize paging parameters to query 1000 pieces of data each time
                IPage<User> page = new Page<>();
                page.setCurrent(++num);
                page.setSize(1000);

                QueryWrapper<User> queryWrapper = new QueryWrapper<>();
                IPage<User> userPageInfo = userMapper.selectPage(page, queryWrapper);
                final List<User> rows = userPageInfo.getRecords();
                //If the number of queries is less than 1000, set the continueFlag to false to jump out of the loop
                if (rows.size() < 1000) {
                    continueFlag = false;
                }
                //Write the data of each query to csv
                for (User row : rows) {
                    write(fw, row);
                }
                fw.flush();
            }
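        } catch (Exception e) {
            log.error("invoke FileWriter error:", e);
            // Record the failure (status 4) instead of falling through to the success branch below
            asyncImportExportTask.setResult("Task failed: " + e.getMessage());
            asyncImportExportTask.setFinishTime(new Date());
            asyncImportExportTask.setTaskStatus(4);
            asyncImportExportTaskService.saveOrUpdate(asyncImportExportTask);
            return;
        }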
        // Store the full path of the generated file so that the download step can locate it
        String tag = filePath;
        asyncImportExportTask.setResult("Task succeeded");
        asyncImportExportTask.setFinishTime(new Date());
        asyncImportExportTask.setTaskStatus(3);
        asyncImportExportTask.setTag(tag);
        asyncImportExportTaskService.saveOrUpdate(asyncImportExportTask);
        log.info(Thread.currentThread().getName() + "Export task completed!!!!!!!!!!!!");
    }

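    public static char[] getChars(byte[] bytes) {
        // Decode the bytes as UTF-8. CharBuffer.array() exposes the whole backing array,
        // which can be longer than the decoded text, so copy only the decoded characters.
        CharBuffer cb = StandardCharsets.UTF_8.decode(ByteBuffer.wrap(bytes));
        char[] chars = new char[cb.remaining()];
        cb.get(chars);
        return chars;
    }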


    private static void write(FileWriter fw, User row) {
        try {
            fw.write(row.getName() + "\t,\""
                    + row.getNickName() + "\"\t,\""
                    + row.getAddress() + "\"\t,\""
                    + row.getBirthday() + "\"\t,\""
                    + row.getSex() + "\"\t,\""
                    + row.getMobile() + "\"\t");
            fw.write("\r\n");
        } catch (IOException e) {
            log.error("write error:", e);
        }
    }

}
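Two details of the listing above may be worth tightening: FileWriter encodes with the platform default charset, so the BOM is only correct when that default is UTF-8, and the write() helper does not escape commas or quotes inside field values. The sketch below is one possible alternative using only the JDK; CsvSketch and its methods are hypothetical names, and the quoting follows the usual CSV convention of doubling embedded quotes rather than the tab-padding used in the article:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical CSV helper: explicit UTF-8 output with BOM and field escaping. */
final class CsvSketch {

    private CsvSketch() {
    }

    /** Quotes a field if it contains a comma, a quote or a line break; null becomes empty. */
    static String escape(Object value) {
        String s = value == null ? "" : value.toString();
        boolean mustQuote = s.contains(",") || s.contains("\"") || s.contains("\n") || s.contains("\r");
        return mustQuote ? "\"" + s.replace("\"", "\"\"") + "\"" : s;
    }

    /** Joins the escaped fields into one CSV line ending in CRLF. */
    static String toLine(List<?> fields) {
        return fields.stream().map(CsvSketch::escape).collect(Collectors.joining(",")) + "\r\n";
    }

    /** Writes a BOM followed by the rows, always encoded as UTF-8. */
    static void write(Path target, Iterable<? extends List<?>> rows) {
        try (BufferedWriter writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            writer.write('\uFEFF'); // UTF-8 BOM so that Excel detects the encoding
            for (List<?> row : rows) {
                writer.write(toLine(row));
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Could not write CSV file", e);
        }
    }
}
```

In the export loop, each page of records could be mapped to lists of field values and appended with toLine(), keeping the batch-by-batch streaming described in step 5.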
  4. That completes the implementation. This article offers one way to improve the import and export function in scenarios with many users, large data volumes and a fair amount of concurrency.
    Once the scheme is in place it can basically meet daily needs, and there is still plenty of room for further optimization.
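One follow-up that the preface points at is the growing number of temporary files. If they stay on the local disk rather than on OSS, a periodic cleanup could look roughly like the sketch below. TempFileCleaner and deleteOlderThan are hypothetical names, and the retention period is an assumption that each project would choose for itself:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical cleaner for generated import / export files. */
final class TempFileCleaner {

    private TempFileCleaner() {
    }

    /**
     * Deletes regular files directly under dir whose last-modified time is
     * older than maxAge (measured from now) and returns how many were removed.
     */
    static int deleteOlderThan(Path dir, Duration maxAge, Instant now) {
        Instant cutoff = now.minus(maxAge);
        try {
            List<Path> candidates;
            try (Stream<Path> files = Files.list(dir)) {
                candidates = files.filter(Files::isRegularFile).collect(Collectors.toList());
            }
            int deleted = 0;
            for (Path file : candidates) {
                if (Files.getLastModifiedTime(file).toInstant().isBefore(cutoff)) {
                    Files.delete(file);
                    deleted++;
                }
            }
            return deleted;
        } catch (IOException e) {
            throw new UncheckedIOException("Could not clean up " + dir, e);
        }
    }
}
```

Such a method could be called from another @Scheduled job, for example once a day with the directory configured under temp.file.path.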

Keywords: Spring Boot csv asynctask

Added by dougp23 on Wed, 26 Jan 2022 15:51:39 +0200