Collect and monitor asynchronous task error logs

1, Requirement description

I ran into this task at work: monitor the error logs of asynchronous tasks that scheduled tasks submit to a thread pool, so that we can confirm the scheduled tasks run normally and, when an exception occurs, operations staff can quickly locate the failing task through the interface.

The following is pseudocode for one of the scheduled tasks (business details omitted):

class Test2 {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200));

    @Scheduled(cron = "0 0 6 * * ?")
    public void testScheduled() {
        //Simulate business: get a List
        List<String> strArr = new ArrayList<>();
        for (String str : strArr) {
            //Start asynchronous task
            threadPool.execute(() -> {
                //Handle major business
                // ...
            });
        }
    }

    // ... other scheduled tasks omitted
}

The code above only shows the overall flow; the business logic is omitted. The project has many scheduled tasks like this one, and only one is shown here for demonstration.

Code analysis: a scheduled task has a for loop that submits asynchronous tasks to the thread pool multiple times.

Requirement: monitor errors raised by the main business logic running in the pool threads. If an error occurs, record the exception and display it. Each scheduled task is one unit, and only one exception per run needs to be recorded: if every thread succeeds, the run is recorded as successful; if some or all threads fail, only the error from one of them is captured.

2, Code implementation (ignoring code elegance)

Given the above requirements, a first implementation is straightforward:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200));

@Scheduled(cron = "0 0 6 * * ?")
    public void testScheduled() {
        Map<String, String> errMap = new ConcurrentHashMap<>(2);

        //Simulate business: get a List
        List<String> strArr = new ArrayList<>();
        for (String str : strArr) {
            //Start asynchronous task
            threadPool.execute(() -> {
                try {
                    //Handle major business
                    // ...
                } catch (Exception e) {
                    // putIfAbsent is atomic: only the first failing thread records the error
                    if (errMap.putIfAbsent(ERRMSG, ERRMSG) == null) {
                        //Set monitoring results
                        setCheckResult("Bit code", "The monitoring result is False", e.getMessage(), 1, 1);
                    }
                } finally {
                    // Record success at most once, and only while no error has been recorded
                    if (!errMap.containsKey(ERRMSG) && errMap.putIfAbsent(SUCCESS, SUCCESS) == null) {
                        //Set monitoring results
                        setCheckResult("Bit code", "Monitoring results True", "", 1, 1);
                    }
                }
            });
        }
    }
    
    private void setCheckResult(String checkCode, String checkResult, String checkErr, int dateValue, int field) {
       //....
       //Write to database
	   //.....
    }

Code explanation: comparing the original and the new code, the obvious change is the try... catch... finally... added to the thread's run method, where the error information is recorded. The error handling itself is encapsulated in setCheckResult. Its implementation is not the focus here: it writes a record to the database, which the interface then queries and displays. (How error information is handled depends on individual needs and scenarios, so I won't go into it.)

Map<String, String> errMap = new ConcurrentHashMap<>(2);// An identifier used to judge whether an exception has occurred
Briefly, why use a map instead of a local variable as the error flag?
A lambda expression can only capture local variables that are final or effectively final, so a plain local flag cannot be reassigned inside the lambda.
The errMap reference itself is never reassigned, so it can be captured. Only the contents of the map change, and ConcurrentHashMap makes those changes thread safe.
An AtomicBoolean would work for the same reason.
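
A minimal sketch of how Java's capture rule for lambdas plays out here: a captured local variable may not be reassigned, but the object it refers to may be mutated. The class and method names below are made up for illustration and are not part of the project code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original project.
class CaptureDemo {
    // Returns true if the lambda could record an error through the captured objects.
    static boolean recordThroughCapturedObjects() {
        // boolean flag = false;
        // Runnable bad = () -> flag = true;   // does not compile: 'flag' would be reassigned

        // These references are never reassigned, so they are effectively final;
        // only the objects they point to are mutated.
        Map<String, String> errMap = new ConcurrentHashMap<>(2);
        AtomicBoolean failed = new AtomicBoolean(false);

        Runnable ok = () -> {
            errMap.put("ERRMSG", "ERRMSG");
            failed.set(true);
        };
        ok.run();
        return errMap.containsKey("ERRMSG") && failed.get();
    }

    public static void main(String[] args) {
        System.out.println(recordThroughCapturedObjects());
    }
}
```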

At this point the code is written, the function works, and the following problems are avoided:

  1. When every thread fails, the error would be written to the database many times. With the map as a guard, only the first thread to catch an error writes it; the rest are ignored.
  2. When every thread succeeds, the success record would be written to the database many times. With the same guard, it is written only once.
  3. If a run fails and the database records F, how does it get updated to S after the fix? The map is created anew every time the scheduled task runs, so this problem can be ignored.
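
The write-once behavior in the list above can be sketched as follows. This is only an illustration under assumptions: the list named written stands in for the database, every task is forced to fail, and putIfAbsent is used for the guard because it checks and inserts in one atomic step.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class WriteOnceDemo {
    // Runs the given number of failing tasks and returns how many error records were written.
    static int runFailingTasks(int tasks) {
        ConcurrentHashMap<String, String> errMap = new ConcurrentHashMap<>(2);
        List<String> written = new CopyOnWriteArrayList<>(); // stands in for the database
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    throw new IllegalStateException("task " + id + " failed");
                } catch (Exception e) {
                    // putIfAbsent returns null only for the first caller
                    if (errMap.putIfAbsent("ERRMSG", "ERRMSG") == null) {
                        written.add(e.getMessage());
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written.size();
    }

    public static void main(String[] args) {
        System.out.println(runFailingTasks(50));
    }
}
```

However many tasks fail, the stand-in database receives a single record per run.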

When the task was done I happily submitted it, but I didn't expect it to be sent back. Reason: the implementation is cumbersome and invasive, with a lot of duplicated code. The project has many scheduled tasks, and adding this to each one is very tedious.

3, Code optimization (decorator pattern + template method pattern)

Looking at all the duplicated code on the screen, it's no surprise it was sent back. I'm still too much of a rookie... Hahahaha

After referring to several design patterns, I revised my code as follows.
Core business code:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200));


    @Scheduled(cron = "0 0 6 * * ?")
    public void testScheduled() {
        Map<String, String> errMap = new ConcurrentHashMap<>(2);
        //Simulate business: get a List
        List<String> strArr = new ArrayList<>();
        for (String str : strArr) {
            //Start asynchronous task
            // Arguments match the ExecutorTask constructor: code, dateValue, field, errMap
            threadPool.execute(new ExecutorTask("Bit code", 1, 1, errMap) {
                @Override
                public void task() {
                    //Handle major business
                    // ...
                }
            });
        }
    }

Non core business code:

abstract class ExecutorTask implements Runnable{
    private String code;
    private int dateValue;
    private int field;
    private Map<String, String> errMap;
    public ExecutorTask(String code,int dateValue,int field,Map<String, String> errMap){
        this.code = code;
        this.dateValue = dateValue;
        this.field = field;
        this.errMap = errMap;
    }

    public abstract void task();
    @Override
    public void run() {
        try {
            task();
        } catch (Exception e) {
            // putIfAbsent is atomic: only the first failing thread records the error
            if (errMap.putIfAbsent(ERRMSG, ERRMSG) == null) {
                setCheckResult(code, EnumType.CheckResult.F, e.getMessage(), dateValue, field);
            }
        } finally {
            // Record success at most once, and only while no error has been recorded
            if (!errMap.containsKey(ERRMSG) && errMap.putIfAbsent(SUCCESS, SUCCESS) == null) {
                setCheckResult(code, EnumType.CheckResult.S, SPACE_STRING, dateValue, field);
            }
        }
    }

    private void setCheckResult(String checkCode, String checkResult, String checkErr, int dateValue, int field) {
       //....
       //Write to database
	   //.....
    }
}

After this round of refactoring, the non core business code has been extracted, the original scheduled task is less invaded, and the implementation is less cumbersome. When adding a scheduled task, you only need to focus on the core logic.

A brief explanation of the ExecutorTask class:
ExecutorTask implements the Runnable interface and its run method. Inside run, the call to task() is wrapped in exception handling. task() itself is implemented by subclasses, and that implementation is our core business code.
This borrows from several of the 23 classic design patterns, and I can't say precisely which. A fixed run() skeleton with an abstract task() step is closest to the template method pattern, with some resemblance to the adapter and decorator patterns.
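
To make the shape of this design easier to see, here is a stripped-down sketch of a run() skeleton with one abstract step. It is a simplification under assumptions: MonitoredTask and the log list are hypothetical stand-ins for ExecutorTask and setCheckResult, and the write-once map is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ExecutorTask: run() fixes the skeleton, subclasses fill in task().
abstract class MonitoredTask implements Runnable {
    private final List<String> log; // stands in for setCheckResult / the database

    MonitoredTask(List<String> log) {
        this.log = log;
    }

    // The only part a subclass has to write: the core business logic.
    protected abstract void task() throws Exception;

    @Override
    public final void run() {
        try {
            task();
            log.add("S");
        } catch (Exception e) {
            log.add("F: " + e.getMessage());
        }
    }
}

class TemplateMethodDemo {
    // Runs one succeeding and one failing task and returns what was logged.
    static List<String> runBoth() {
        List<String> log = new ArrayList<>();
        new MonitoredTask(log) {
            @Override
            protected void task() {
                // core business logic that succeeds
            }
        }.run();
        new MonitoredTask(log) {
            @Override
            protected void task() throws Exception {
                throw new Exception("boom");
            }
        }.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runBoth());
    }
}
```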

At this point I happily went to submit the task, but it was sent back again for the following reasons:

  1. This line in the core code: Map<String, String> errMap = new ConcurrentHashMap<>(2); What will people think when they see it? What is this, and why does it appear in my code?
  2. And this line: threadPool.execute(new ExecutorTask("Bit code", ..., errMap) { ... }). How can you guarantee that others will dutifully create a new ExecutorTask and pass in all those odd parameters? Lambda expressions are the norm now, so what on earth is this anonymous inner class doing here?

I had thought about these problems too. I hoped I could muddle through, and I also wanted to gauge the team leader's reaction. It seems I was overthinking it... Hahaha... With these problems in mind, I went back to my seat and sank into long thought.

4, Code re-optimization (decorator pattern + template method pattern + adapter)

These were all fair points. After some thought, I realized we could wrap the thread pool, which gives the following code:

Core business code:

MyThreadPoolExecutor threadPool = new MyThreadPoolExecutor(5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200));

    @Scheduled(cron = "0 0 6 * * ?")
    public void testScheduled() {
        //To enable monitoring, these three parameters are required to record error information. According to different scheduled tasks, the corresponding values are passed in
        threadPool.beginMonitor("Bit code",1, 1);

        //Simulate business: get a List
        List<String> strArr = new ArrayList<>();
        for (String str : strArr) {
            //Start asynchronous task
            threadPool.execute(()->{
                //...
                //Core business logic
                //...
            });
        }
    }

Non core business code:

class MyThreadPoolExecutor{
    private ThreadPoolExecutor poolExecutor;
    private String code;
    private int dateValue;
    private int unit;
    private boolean monitor = false;
    private Map<String, String> errMap;
    public MyThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue) {
        this.poolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory());
    }

    public void beginMonitor(String code, int dateValue, int unit){
        this.code = code;
        this.dateValue = dateValue;
        this.unit = unit;
        this.monitor = true;
        this.errMap = new ConcurrentHashMap<>(2);
    }

    public void execute(Runnable run){
        if (monitor){
            ExecutorTask executorTask = new ExecutorTask(code,dateValue,unit,errMap,run);
            poolExecutor.execute(executorTask);
        }else {
            poolExecutor.execute(run);
        }
    }
}

class ExecutorTask implements Runnable {
    private String code;
    private int dateValue;
    private int unit;

    private Runnable task;
    private Map<String, String> errMap;

    public ExecutorTask(String code, int dateValue, int unit, Map<String, String> errMap,Runnable task) {
        this.code = code;
        this.dateValue = dateValue;
        this.unit = unit;

        this.task = task;
        this.errMap = errMap;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // putIfAbsent is atomic: only the first failing thread records the error
            if (errMap.putIfAbsent(ERRMSG, ERRMSG) == null) {
                setCheckResult(code, EnumType.CheckResult.F, e.getMessage(), dateValue, unit);
            }
        } finally {
            // Record success at most once, and only while no error has been recorded
            if (!errMap.containsKey(ERRMSG) && errMap.putIfAbsent(SUCCESS, SUCCESS) == null) {
                setCheckResult(code, EnumType.CheckResult.S, SPACE_STRING, dateValue, unit);
            }
        }
    }

    private void setCheckResult(String checkCode, String checkResult, String checkErr, int dateValue, int field) {
        //....
        //Write to database
        //.....
    }
}

As the code above shows, the core business code is now much cleaner. It is almost identical to the original code.
The main difference from the original code: the thread pool is wrapped, and execute is enhanced.
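
The idea of enhancing execute can be shown in isolation. The sketch below is an assumption-laden illustration, not the project's MyThreadPoolExecutor: CountingExecutor is a hypothetical wrapper around the standard java.util.concurrent.Executor interface that decorates every submitted Runnable, and it runs tasks on the calling thread to keep the result deterministic.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: same execute() contract, extra behavior around each task.
class CountingExecutor implements Executor {
    private final Executor delegate;
    private final AtomicInteger failures = new AtomicInteger();

    CountingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable command) {
        // Wrap the caller's task; the caller still just passes a plain lambda.
        delegate.execute(() -> {
            try {
                command.run();
            } catch (RuntimeException e) {
                failures.incrementAndGet();
            }
        });
    }

    int failures() {
        return failures.get();
    }
}

class DecoratorDemo {
    // Submits one succeeding and two failing tasks and returns the failure count.
    static int run() {
        // Runnable::run executes each task directly on the calling thread.
        CountingExecutor executor = new CountingExecutor(Runnable::run);
        executor.execute(() -> { });
        executor.execute(() -> { throw new IllegalStateException("boom"); });
        executor.execute(() -> { throw new IllegalArgumentException("bad"); });
        return executor.failures();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```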

Looking at this code, I thought: I had changed it and thrown it out again and again, and at last it was clean, so I wanted to hand it in quickly and see what he would say.
Just as I was about to get up, I remembered that I had already been sent back twice. This time I should be careful and check it again for bugs.

Sure enough, I found the problem myself this time. Alas~~~

In the custom thread pool, the member variables are shared state. Each scheduled task runs on a thread and calls beginMonitor once, so one task can overwrite code, dateValue, unit and errMap while another task is still submitting work. That is a thread safety problem.
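
A small sketch of that overwrite, under assumptions: SharedConfigPool is a hypothetical reduction of MyThreadPoolExecutor to a single shared field, and the second task is joined before reading so the interleaving is forced and the result is reproducible.

```java
// Hypothetical reduction of MyThreadPoolExecutor to one shared field.
class SharedConfigPool {
    private String code; // shared by every caller

    void beginMonitor(String code) {
        this.code = code;
    }

    String currentCode() {
        return code;
    }
}

class SharedStateDemo {
    // Task A configures monitoring, then task B does the same on another thread.
    // Returns the code task A would see when it submits its work afterwards.
    static String codeSeenByTaskA() {
        SharedConfigPool pool = new SharedConfigPool();
        pool.beginMonitor("TASK_A");

        Thread taskB = new Thread(() -> pool.beginMonitor("TASK_B"));
        taskB.start();
        try {
            taskB.join(); // force the interleaving so the demo is deterministic
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.currentCode(); // task A's setting has been overwritten
    }

    public static void main(String[] args) {
        System.out.println(codeSeenByTaskA());
    }
}
```

Task A would now record its results under task B's code.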

Was there no solution? How could those member variables be made thread safe? Because of the epidemic, the company let us off early. I left work with the problem unsolved, got on the bus with my headphones on, and kept thinking through solutions all the way back to my rented flat....

5, Solve thread safety problems

In the evening I turned on the computer again, stared at the code and thought, searching Baidu now and then ~ ~ ~ no solution.
I shut down the computer and went to bed, still thinking as I lay there... Was I back to square one? No, wait, wait... ThreadLocal... How could I have forgotten it~

I got up immediately. It was only the early hours of the morning, still quite early... no hurry to sleep. After a little tinkering, it was done.
Core business code remains unchanged:

    MyThreadPoolExecutor threadPool = new MyThreadPoolExecutor(5,
            10,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(200));

    @Scheduled(cron = "0 0 6 * * ?")
    public void testScheduled() {
        //Turn on monitoring
        threadPool.beginMonitor("Bit code",1, 1);

        //Simulate business: get a List
        List<String> strArr = new ArrayList<>();
        for (String str : strArr) {
            //Start asynchronous task
            threadPool.execute(()->{
                //...
                //Core business logic
                //...
            });
        }
    }

Change the thread pool's member variables to ThreadLocal variables:

class MyThreadPoolExecutor{
    private ThreadPoolExecutor poolExecutor;
    private final ThreadLocal<String> code = new ThreadLocal<>();
    private final ThreadLocal<Integer> dateValue = new ThreadLocal<>();
    private final ThreadLocal<Integer> unit = new ThreadLocal<>();
    private final ThreadLocal<Map<String, String>> errMap = new ThreadLocal<>();
    private final ThreadLocal<Boolean> monitor = new ThreadLocal<>();

    public MyThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue) {
        this.poolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory());
    }

    public void beginMonitor(String code, int dateValue, int unit){
        this.code.set(code);
        this.dateValue.set(dateValue);
        this.unit.set(unit);
        this.monitor.set(true);
        this.errMap.set(new ConcurrentHashMap<>(2));
    }

    public void execute(Runnable run){
        if (monitor.get() != null && monitor.get()){
            ExecutorTask executorTask = new ExecutorTask(code.get(),dateValue.get(),unit.get(),errMap.get(),run);
            poolExecutor.execute(executorTask);
        }else {
            poolExecutor.execute(run);
        }
    }
}

class ExecutorTask implements Runnable {
    private String code;
    private int dateValue;
    private int unit;

    private Runnable task;
    private Map<String, String> errMap;

    public ExecutorTask(String code, int dateValue, int unit, Map<String, String> errMap,Runnable task) {
        this.code = code;
        this.dateValue = dateValue;
        this.unit = unit;

        this.task = task;
        this.errMap = errMap;
    }

    @Override
    public void run() {
        try {
            task.run();
        } catch (Exception e) {
            // putIfAbsent is atomic: only the first failing thread records the error
            if (errMap.putIfAbsent(ERRMSG, ERRMSG) == null) {
                setCheckResult(code, EnumType.CheckResult.F, e.getMessage(), dateValue, unit);
            }
        } finally {
            // Record success at most once, and only while no error has been recorded
            if (!errMap.containsKey(ERRMSG) && errMap.putIfAbsent(SUCCESS, SUCCESS) == null) {
                setCheckResult(code, EnumType.CheckResult.S, SPACE_STRING, dateValue, unit);
            }
        }
    }

    private void setCheckResult(String checkCode, String checkResult, String checkErr, int dateValue, int field) {
        //....
        //Write to database
        //.....
    }
}

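ThreadLocal gives each thread its own copy of a variable. A scheduled task calls both beginMonitor and execute on the thread it runs on, so values stored by a task on one thread cannot be overwritten by a task running on another. execute reads the values on the calling thread and hands them to ExecutorTask, so the pool's worker threads never touch the ThreadLocal variables. One caveat: scheduler threads are usually reused, and the values stay attached to a thread until they are overwritten or cleared with remove(), so a later task on the same thread that never calls beginMonitor would inherit the previous settings. With that in mind, the problem is finally solved.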
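
The per-thread isolation can be checked with a short sketch. The names are hypothetical and the second thread is joined before reading, purely to keep the demo deterministic; the remove() call at the end is the cleanup step a reused thread would need.

```java
// Hypothetical demo class, not part of the original project.
class ThreadLocalDemo {
    private static final ThreadLocal<String> CODE = new ThreadLocal<>();

    // Returns { value seen by the other thread before it sets anything,
    //           value seen by this thread after the other thread has set its own }.
    static String[] run() {
        String[] seen = new String[2];
        CODE.set("TASK_A");
        try {
            Thread other = new Thread(() -> {
                seen[0] = CODE.get(); // this thread never called set(), so it sees null
                CODE.set("TASK_B");   // only changes this thread's copy
            });
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            seen[1] = CODE.get(); // unaffected by the other thread
        } finally {
            CODE.remove(); // clear the value so a reused thread does not keep it
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[0] + " / " + seen[1]);
    }
}
```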

6, Summary

Class diagram, supplemented later.

Keywords: Java Back-end

Added by LowEndTheory on Sat, 26 Feb 2022 08:38:44 +0200