Monitoring the task life cycle: the observer pattern in multithreading

This article summarizes material from the book "Java High Concurrency Programming Explained: Multithreading and Architecture Design".

Observer pattern review

This article does not walk through a basic implementation of the pattern; for that, see the linked "Observer pattern case" article.

The core of the observer pattern can be summarized as follows:

  • The subject holds references to its observers and notifies them when its state changes;
  • The subject maintains an internal collection of all its observers, which it uses to notify a specific observer or to broadcast to all of them.

In short, the observer pattern is an embodiment and extension of the callback mechanism.
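
The two points above can be sketched in a few lines. This is only an illustration, not code from the book: the names ObserverSketch, Subject, subscribe and setState are assumptions made up for this example, and each observer is modeled as a plain Consumer<String>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the subject/observer relationship.
// All names here are illustrative assumptions, not part of the article's code.
class ObserverSketch {

    // The subject keeps a collection of observers and notifies them on every state change.
    static final class Subject {
        private final List<Consumer<String>> observers = new ArrayList<>();
        private String state = "INIT";

        void subscribe(Consumer<String> observer) {
            observers.add(observer);
        }

        void setState(String newState) {
            this.state = newState;
            // The state change is the trigger: broadcast to every registered observer.
            for (Consumer<String> observer : observers) {
                observer.accept(newState);
            }
        }

        String getState() {
            return state;
        }
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.subscribe(s -> System.out.println("state changed to " + s));
        subject.setState("RUNNING");
        subject.setState("DONE");
        System.out.println(seen);
    }
}
```

Each observer here is just a callback that the subject invokes, which is why the pattern can be read as an extension of the callback mechanism.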

Implementation case: monitoring the life cycle of a task

Thread provides methods such as isAlive() and getState() for inspecting the state of a thread, and in most cases these are enough. They fall short, however, in scenarios such as the following:

  1. You need to know when a task starts and ends, and to obtain its execution result;
  2. You need to act at a particular stage of task execution (for example, error handling when an exception occurs, or resource cleanup and data processing at the end).
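
To make the limitation concrete, here is a small sketch of what the built-in methods do offer. The class and method names (ThreadStateSketch, observeStates) are assumptions for this example only. The caller can ask for the thread's state, but it has to ask at the right moment, and neither method delivers a result or an exception.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: Thread exposes its state only when polled; there is no notification and no result.
// ThreadStateSketch and observeStates are hypothetical names used for illustration.
class ThreadStateSketch {

    // Returns the state before start(), whether the thread was alive while blocked,
    // and the state after join().
    static String[] observeStates() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await(); // stay blocked until the caller releases the latch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread.State before = worker.getState();
        worker.start();
        boolean aliveWhileBlocked = worker.isAlive();
        release.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        Thread.State after = worker.getState();

        return new String[] {before.name(), String.valueOf(aliveWhileBlocked), after.name()};
    }

    public static void main(String[] args) {
        for (String observation : observeStates()) {
            System.out.println(observation);
        }
    }
}
```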

Approach

  1. The work a thread executes has to be divided into well-defined stages, so we need constants that describe the life cycle of a task: started, running, done, and error.
  2. We then need a class that implements whatever has to happen at each stage (for example, exception notification or resource release). From this we can extract a common interface, which we will call the life cycle interface.
  3. Because the task runs inside a thread, the life cycle callbacks have to be invoked from the run method. By analogy with the observer pattern, the run method of the thread plays the role of the subject's state-change method, and it is where the life cycle callbacks are triggered.

Code implementation

Task

The task is the logic that the thread needs to execute.

@FunctionalInterface
public interface Task<T> {
    T call() throws Exception;
}
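
Because Task is a functional interface whose single method returns a value and may throw a checked exception, a task can be written as a lambda. The sketch below repeats the interface so that it compiles on its own; TaskSketch and runUnchecked are hypothetical names introduced only for this illustration.

```java
import java.io.IOException;

// Sketch of how a Task can be written and invoked.
// TaskSketch and runUnchecked are illustrative assumptions, not part of the article's code.
class TaskSketch {

    // Same shape as the Task interface above, repeated so the sketch is self-contained.
    @FunctionalInterface
    interface Task<T> {
        T call() throws Exception;
    }

    // Hypothetical helper: runs a task and wraps any exception in an unchecked one.
    static <T> T runUnchecked(Task<T> task) {
        try {
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException("task failed", e);
        }
    }

    public static void main(String[] args) {
        Task<Integer> sum = () -> 1 + 2;
        // A task may throw a checked exception because call() declares "throws Exception".
        Task<String> failing = () -> {
            throw new IOException("disk unavailable");
        };

        System.out.println(runUnchecked(sum));
        try {
            runUnchecked(failing);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```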

Abstract subject

(The start and interrupt methods are also declared here so that the type can be used the way threads usually are.)

public interface Observable {
    //Define lifecycle constants
    enum Cycle {
        STARTED, RUNNING, DONE, ERROR;
    }
    Cycle getCycle();
    void start();
    void interrupt();
}

Observer

Lifecycle interface

public interface TaskLifecycle<T> {
    void onStart(Thread thread);
    void onRunning(Thread thread);
    void onFinish(Thread thread, T result);
    void onError(Thread thread, Exception e);
}
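
Callers are often interested in only one or two of these callbacks. One common way to handle that, sketched here as an assumption rather than as part of the article's code, is a no-op base class that callers extend. The names LifecycleAdapterSketch and EmptyLifecycle are hypothetical, and the interface is repeated so the sketch compiles on its own.

```java
// Sketch of a no-op adapter for the life cycle interface.
// LifecycleAdapterSketch and EmptyLifecycle are illustrative assumptions.
class LifecycleAdapterSketch {

    // Same shape as the TaskLifecycle interface above, repeated for self-containment.
    interface TaskLifecycle<T> {
        void onStart(Thread thread);
        void onRunning(Thread thread);
        void onFinish(Thread thread, T result);
        void onError(Thread thread, Exception e);
    }

    // Every callback does nothing; subclasses override only what they care about.
    static class EmptyLifecycle<T> implements TaskLifecycle<T> {
        @Override
        public void onStart(Thread thread) { }

        @Override
        public void onRunning(Thread thread) { }

        @Override
        public void onFinish(Thread thread, T result) { }

        @Override
        public void onError(Thread thread, Exception e) { }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // Only onFinish is overridden; the other three callbacks stay no-ops.
        TaskLifecycle<String> lifecycle = new EmptyLifecycle<String>() {
            @Override
            public void onFinish(Thread thread, String result) {
                log.append("finished with ").append(result);
            }
        };
        lifecycle.onStart(Thread.currentThread());
        lifecycle.onFinish(Thread.currentThread(), "ok");
        System.out.println(log);
    }
}
```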

Concrete subject

In a multithreaded environment, the concrete subject is both a subclass of Thread and an implementation of Observable.

  • In the usual observer pattern, the subject invokes the observer's callbacks inside its state-change method.
  • In the multithreaded case, the observer's callbacks are invoked inside the run method of Thread.

public class ObservableThread<T> extends Thread implements Observable {
    //Hold the reference of the Observer class, that is, the life cycle interface, and trigger the callback in the run method
    private final TaskLifecycle<T> lifecycle;
    //Specific tasks that threads need to perform
    private final Task<T> task;
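    //Identifies the current stage of the task; volatile because run() writes it on this thread while getCycle() may be called from other threads
    private volatile Cycle cycle;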

    public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
        this.lifecycle = lifecycle;
        this.task = task;
    }

    @Override
    public final void run() {
        //Callback in the start phase, setting the thread to the start state
        this.update(Cycle.STARTED, null, null);
        try {
            //Callback in the execution phase, setting the thread to the execution state
            this.update(Cycle.RUNNING, null, null);
            T result = this.task.call();
            //The callback of the completion phase, setting the thread to the completion state
            this.update(Cycle.DONE, result, null);
        } catch (Exception e) {
            //Callback in the exception stage, setting the thread to the exception state
            this.update(Cycle.ERROR, null, e);
        }
    }
    
    @Override
    public Cycle getCycle() {
        return this.cycle;
    }

    private void update(Cycle cycle, T result, Exception e) {
        this.cycle = cycle;
        if (lifecycle == null) {
            return;
        }
        try {
            switch (cycle) {
                case STARTED:
                    this.lifecycle.onStart(currentThread());
                    break;
                case RUNNING:
                    this.lifecycle.onRunning(currentThread());
                    break;
                case DONE:
                    this.lifecycle.onFinish(currentThread(), result);
                    break;
                case ERROR:
                    this.lifecycle.onError(currentThread(), e);
                    break;
            }
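        } catch (Exception exception) {
            //An exception thrown by a callback is ignored so that it cannot break the task,
            //except in the ERROR stage, where it is rethrown
            if (cycle == Cycle.ERROR) {
                throw exception;
            }
        }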
    }
}

Test class

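import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {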
        //Define what each life cycle callback should do
        TaskLifecycle<List<Integer>> lifecycle = new TaskLifecycle<List<Integer>>() {
            @Override
            public void onStart(Thread thread) {
                //eg: resource preparation, logging, etc
                System.out.println(" now onStart ");
            }

            @Override
            public void onRunning(Thread thread) {
                //eg: logging
                System.out.println(" now onRunning ");
            }

            @Override
            public void onFinish(Thread thread, List<Integer> result) {
                //eg: result processing, resource recovery
                System.out.println(" now onFinish ,the result is " + result);
            }

            @Override
            public void onError(Thread thread, Exception e) {
                //eg: exception notification, exception record
                System.out.println(" now onError the exception is " + e);
            }
        };
        Observable thread = new ObservableThread<>(lifecycle, () -> {
            List<Integer> list = new ArrayList<>();
            list.add(1);
            TimeUnit.SECONDS.sleep(13);
            return list;
        });
        thread.start();

        TimeUnit.SECONDS.sleep(3L);
        //Interrupt the thread manually to simulate an abnormal condition
        thread.interrupt();
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

Execution result

 now onStart 
 now onRunning 
 now onError the exception is java.lang.InterruptedException: sleep interrupted

Summary

  • The run method of ObservableThread is declared final so that subclasses cannot override it and break the notification mechanism.
  • The start and interrupt methods declared in the Observable interface hide the rest of the Thread API. The aim is to express the role of the subject more clearly: from a design point of view, callers should treat an ObservableThread as an Observable rather than as a Thread. These two common methods are declared only to match the way threads are usually used; add others if you need them.
  • This is simply another instance of the subject-observer relationship. Depending on your needs, the subject thread could hold references to several life cycle interfaces and iterate over them in the run method.
  • In short, it comes down to method callbacks invoked from within the thread.
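
The idea of several life cycle observers can also be sketched without changing ObservableThread. The sketch below uses a different technique from the one described in the summary: rather than iterating inside run, it wraps the observers in a composite that is itself a TaskLifecycle and forwards each callback to every registered one. MultiLifecycleSketch, CompositeLifecycle, Recorder and runDemo are hypothetical names, and the interface is repeated so the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: fan one set of life cycle callbacks out to several observers.
// All names here are illustrative assumptions, not part of the article's code.
class MultiLifecycleSketch {

    // Same shape as the TaskLifecycle interface in the article.
    interface TaskLifecycle<T> {
        void onStart(Thread thread);
        void onRunning(Thread thread);
        void onFinish(Thread thread, T result);
        void onError(Thread thread, Exception e);
    }

    // A TaskLifecycle that forwards every callback to all registered delegates, in order.
    static final class CompositeLifecycle<T> implements TaskLifecycle<T> {
        // CopyOnWriteArrayList lets observers be added while another thread is iterating.
        private final List<TaskLifecycle<T>> delegates = new CopyOnWriteArrayList<>();

        CompositeLifecycle<T> add(TaskLifecycle<T> lifecycle) {
            delegates.add(lifecycle);
            return this;
        }

        @Override
        public void onStart(Thread thread) {
            for (TaskLifecycle<T> d : delegates) d.onStart(thread);
        }

        @Override
        public void onRunning(Thread thread) {
            for (TaskLifecycle<T> d : delegates) d.onRunning(thread);
        }

        @Override
        public void onFinish(Thread thread, T result) {
            for (TaskLifecycle<T> d : delegates) d.onFinish(thread, result);
        }

        @Override
        public void onError(Thread thread, Exception e) {
            for (TaskLifecycle<T> d : delegates) d.onError(thread, e);
        }
    }

    // Test observer that records which callbacks it received.
    static final class Recorder implements TaskLifecycle<String> {
        private final String name;
        private final List<String> events;

        Recorder(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void onStart(Thread thread) { events.add(name + ":start"); }

        @Override
        public void onRunning(Thread thread) { events.add(name + ":running"); }

        @Override
        public void onFinish(Thread thread, String result) { events.add(name + ":finish:" + result); }

        @Override
        public void onError(Thread thread, Exception e) { events.add(name + ":error:" + e.getMessage()); }
    }

    // Drives the composite directly on the calling thread and returns the recorded events.
    static List<String> runDemo() {
        List<String> events = new ArrayList<>();
        CompositeLifecycle<String> composite = new CompositeLifecycle<String>()
                .add(new Recorder("a", events))
                .add(new Recorder("b", events));
        Thread current = Thread.currentThread();
        composite.onStart(current);
        composite.onFinish(current, "ok");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Under this assumption the composite would be passed to the ObservableThread constructor wherever a single life cycle is passed today, so the thread class itself stays unchanged.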

Added by mariocesar on Tue, 08 Mar 2022 04:32:56 +0200