Using coroutines in Java

background

Java coroutines. We rarely hear about coroutines in Java, even though the concept of coroutines comes up often. This article shows how coroutines can actually be used in Java.

I have seen many examples of Java code that uses the Quasar framework to implement coroutines, but a lot of them are written incorrectly, so I am writing this article.

When we use multithreading and there are long-running I/O operations, the threads stay blocked for the whole wait. With many threads, many of them sit idle, so resources are not fully used. Coroutines work differently: within a single thread, multiple tasks take turns. When a task reaches a long-running I/O operation, it gives up its turn and the scheduler runs the next task. Of course, all tasks may end up waiting at the same point, but that only ties up a single thread. Once the data has come back, the waiting tasks are resumed and continue processing.
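The turn-taking described above can be sketched with plain JDK classes. This is only a toy round-robin loop on one thread, not how Quasar works internally; the names CooperativeLoop, Task and counting are made up for this illustration, and "yielding" is modelled simply by returning from step().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class CooperativeLoop {

    /** A task that runs one step at a time; step() returns false once it is finished. */
    interface Task {
        boolean step(List<String> trace);
    }

    /** Hypothetical task: needs `steps` turns and gives up the thread after each one. */
    static Task counting(String name, int steps) {
        int[] done = {0};
        return trace -> {
            done[0]++;
            trace.add(name + "-" + done[0]);
            return done[0] < steps; // true means "not finished, call me again later"
        };
    }

    /** Runs all tasks on the calling thread, switching to the next task after every step. */
    static List<String> runAll(List<Task> tasks) {
        List<String> trace = new ArrayList<>();
        Deque<Task> ready = new ArrayDeque<>(tasks);
        while (!ready.isEmpty()) {
            Task task = ready.poll();
            if (task.step(trace)) {
                ready.add(task); // not finished: go to the back of the queue
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        // prints [a-1, b-1, a-2, b-2, b-3]
        System.out.println(runAll(Arrays.asList(counting("a", 2), counting("b", 3))));
    }
}
```

The trace shows the two tasks interleaving on one thread. A real coroutine library differs in that the task does not have to be written as explicit steps: the library saves and restores the call stack for you.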

practice

example of using coroutines in Java
Step 1: add the Quasar dependency to the Maven project

        <dependency>
            <groupId>co.paralleluniverse</groupId>
            <artifactId>quasar-core</artifactId>
            <version>0.7.9</version>
            <classifier>jdk8</classifier>
        </dependency>

Step 2: implement coroutines using the Quasar framework
The complete code is as follows:

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.strands.SuspendableCallable;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
public class QuasarTest {

    public static void start() throws Exception {
        // Queue of started fibers, so the results can be collected in start order.
        LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            // A Fiber is a bit like a Callable: it can return a value.
            Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
                // Log on entry (used when testing memory usage)
                log.debug("in-" + finalI);
                // Suspends only this fiber; the worker thread can run other fibers meanwhile.
                Fiber.sleep(10000);
                return finalI;
            });
            // Start execution
            fiber.start();
            // Add to the queue
            fiberQueue.add(fiber);
        }
        // Collect the results. The loop ends once every fiber has been taken,
        // instead of blocking forever on an empty queue.
        while (!fiberQueue.isEmpty()) {
            Fiber<Integer> fiber = fiberQueue.take();
            // get() blocks the calling thread until the fiber has finished
            log.debug("out-" + fiber.get());
        }
    }
}

The results are as follows:

2022-03-02 17:28:50.327 DEBUG 13832 --- [r-pool-worker-2] com.ai.toptea.sysm.util.QuasarTest       : in-1
2022-03-02 17:28:50.327 DEBUG 13832 --- [r-pool-worker-9] com.ai.toptea.sysm.util.QuasarTest       : in-0
2022-03-02 17:28:50.327 DEBUG 13832 --- [r-pool-worker-4] com.ai.toptea.sysm.util.QuasarTest       : in-3
2022-03-02 17:28:50.327 DEBUG 13832 --- [-pool-worker-11] com.ai.toptea.sysm.util.QuasarTest       : in-2
2022-03-02 17:28:50.329 DEBUG 13832 --- [-pool-worker-13] com.ai.toptea.sysm.util.QuasarTest       : in-4
2022-03-02 17:28:50.329 DEBUG 13832 --- [r-pool-worker-6] com.ai.toptea.sysm.util.QuasarTest       : in-5
2022-03-02 17:28:50.329 DEBUG 13832 --- [r-pool-worker-8] com.ai.toptea.sysm.util.QuasarTest       : in-7
2022-03-02 17:28:50.329 DEBUG 13832 --- [-pool-worker-15] com.ai.toptea.sysm.util.QuasarTest       : in-6
2022-03-02 17:28:50.330 DEBUG 13832 --- [-pool-worker-10] com.ai.toptea.sysm.util.QuasarTest       : in-9
2022-03-02 17:28:50.330 DEBUG 13832 --- [r-pool-worker-1] com.ai.toptea.sysm.util.QuasarTest       : in-8
2022-03-02 17:29:00.337 DEBUG 13832 --- [r-pool-worker-8] com.ai.toptea.sysm.util.QuasarTest       : in-4
2022-03-02 17:29:00.337 DEBUG 13832 --- [-pool-worker-15] com.ai.toptea.sysm.util.QuasarTest       : in-5
2022-03-02 17:29:00.337 DEBUG 13832 --- [-pool-worker-13] com.ai.toptea.sysm.util.QuasarTest       : in-2
2022-03-02 17:29:00.337 DEBUG 13832 --- [r-pool-worker-6] com.ai.toptea.sysm.util.QuasarTest       : in-7
2022-03-02 17:29:00.337 DEBUG 13832 --- [r-pool-worker-2] com.ai.toptea.sysm.util.QuasarTest       : in-1
2022-03-02 17:29:00.337 DEBUG 13832 --- [r-pool-worker-4] com.ai.toptea.sysm.util.QuasarTest       : in-3
2022-03-02 17:29:00.337 DEBUG 13832 --- [-pool-worker-10] com.ai.toptea.sysm.util.QuasarTest       : in-0
2022-03-02 17:29:00.337 DEBUG 13832 --- [r-pool-worker-1] com.ai.toptea.sysm.util.QuasarTest       : in-6
2022-03-02 17:29:00.338 DEBUG 13832 --- [r-pool-worker-3] com.ai.toptea.sysm.util.QuasarTest       : in-9
2022-03-02 17:29:00.338 DEBUG 13832 --- [-pool-worker-12] com.ai.toptea.sysm.util.QuasarTest       : in-8
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-0
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-1
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-2
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-3
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-4
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-5
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-6
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-7
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-8
2022-03-02 17:29:00.339 DEBUG 13832 --- [  restartedMain] com.ai.toptea.sysm.util.QuasarTest       : out-9

conclusion

The timestamps show that the 10 coroutines were not blocked by Fiber.sleep(10000). Suspending a coroutine records its current stack and then lets the scheduler run the other coroutines, which is why all of them started at 17:28:50.

The thread names in the log, such as r-pool-worker-2 (the log truncates the full name), carry different worker numbers. This shows that the fibers were executed on different worker threads of the work-stealing ForkJoinPool that Quasar uses as its default scheduler.
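To see the same kind of worker names without Quasar, here is a minimal sketch that uses only the JDK's ForkJoinPool. It assumes nothing about Quasar's own pool configuration: the class name WorkerNames and the pool size of 4 are arbitrary choices for the illustration, and which worker runs which task varies from run to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

final class WorkerNames {

    /** Submits `count` small tasks and returns "index@threadName" for each of them. */
    static List<String> run(int count) {
        ForkJoinPool pool = new ForkJoinPool(4); // 4 workers: arbitrary choice
        try {
            List<ForkJoinTask<String>> tasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                int index = i;
                // Each task reports the name of the thread that executed it.
                tasks.add(pool.submit(() -> index + "@" + Thread.currentThread().getName()));
            }
            List<String> results = new ArrayList<>();
            for (ForkJoinTask<String> task : tasks) {
                results.add(task.join()); // waits for this task, in submission order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The thread names differ between runs; the indexes are always in order.
        run(10).forEach(System.out::println);
    }
}
```

As in the log above, the results come back in submission order even though the tasks themselves were spread over several worker threads.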

extension

1. A Fiber in Quasar is essentially a continuation that can be scheduled by a scheduler defined by Quasar. A continuation records the state of a running computation; it can be suspended and later resumed from the point where it was suspended. Quasar achieves this by modifying bytecode, so to run a Quasar program you need to instrument your code at runtime through a Java agent. You can also do this at compile time. Go has its own built-in scheduler, while Quasar by default uses ForkJoinPool, a thread pool with work stealing, as its scheduler. Work stealing is important because you do not know which fiber will finish first: an idle worker can dynamically steal a task from another worker's queue, which makes the best use of CPU resources.

2. You may ask how Quasar knows which bytecode to modify. It is fairly simple: through the Java agent, Quasar scans at runtime for methods that can be suspended, and it inserts continuation logic before and after the calls made inside those methods. If you put the @Suspendable annotation on a method, Quasar treats that method as suspendable and instruments the calls around it, as the next point describes.

3. Suppose you put @Suspendable on method f, and f calls method g, which carries the same annotation. Quasar then inserts extra bytecode around these calls. This bytecode records the current state on the Fiber's stack so that it can be restored later (like a thread, a Fiber has its own stack). At the end of a chain of suspendable methods there is a call to Fiber.park, which throws a SuspendExecution exception to stop the running code so that Quasar's scheduler can take over. This SuspendExecution is caught by Fiber itself and should not be caught in business code. When the Fiber is woken up (the scheduler calls Fiber.unpark), f is called again and continues from where it was suspended (the Fiber knows where that was). g's return value is delivered at f's resume point, so it looks like an ordinary local variable of f, which avoids nested callbacks.

4. That was a lot of detail. Put simply, the idea is to stop the running call stack so that Quasar's scheduler can step in. A JVM method can only stop executing in two ways: by throwing an exception or by returning. Quasar uses the exception, which is why the code above can throw SuspendExecution. If your own code ever really catches this exception, something is wrong, so the usual convention is to declare it in the throws clause or, where that is not possible, to catch it only in order to rethrow it as an AssertionError.
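The suspend-by-exception and resume-by-calling-again mechanism from points 3 and 4 can be imitated by hand in plain Java. This is a toy under stated assumptions, not the bytecode that Quasar generates: ToyContinuation, SuspendSignal, runOrResume and deliver are made-up names, and the state that Quasar would keep on the fiber stack is reduced here to a few fields.

```java
import java.util.ArrayList;
import java.util.List;

final class ToyContinuation {

    /** Hypothetical stand-in for a suspension signal; not Quasar's SuspendExecution. */
    static final class SuspendSignal extends RuntimeException {
        SuspendSignal() {
            super(null, null, false, false); // no stack trace: used for control flow only
        }
    }

    // State that instrumentation would keep on the fiber's own stack.
    private int resumePoint = 0;   // 0 = start of f, 1 = at the call to g
    private int savedLocal;        // f's local variable, saved across the suspension
    private Integer pendingResult; // value g is waiting for, supplied on wake-up
    final List<String> trace = new ArrayList<>();

    /** g needs a value that is not available yet, so it suspends instead of blocking. */
    private int g() {
        if (pendingResult == null) {
            trace.add("g suspends");
            throw new SuspendSignal();
        }
        return pendingResult;
    }

    /** f written by hand as the kind of state machine that instrumentation might produce. */
    private int f() {
        int local;
        if (resumePoint == 0) {
            local = 20;
            trace.add("f starts");
            savedLocal = local; // record state before the suspendable call
            resumePoint = 1;
        } else {
            local = savedLocal; // restore state and skip the code that already ran
            trace.add("f resumes");
        }
        int fromG = g();        // reads like an ordinary local variable, no callback
        return local + fromG;
    }

    /** Runs f until it suspends or finishes; returns null while f is suspended. */
    Integer runOrResume() {
        try {
            return f();
        } catch (SuspendSignal s) {
            return null;        // a scheduler would now be free to run something else
        }
    }

    /** Plays the role of the wake-up: makes the awaited value available. */
    void deliver(int value) {
        pendingResult = value;
    }

    public static void main(String[] args) {
        ToyContinuation c = new ToyContinuation();
        System.out.println(c.runOrResume()); // prints null (suspended inside g)
        c.deliver(22);
        System.out.println(c.runOrResume()); // prints 42
        System.out.println(c.trace);         // prints [f starts, g suspends, f resumes]
    }
}
```

Only the outermost runner catches the signal here. If f itself swallowed it, the suspension would never reach the runner, which is the reason business code should not catch SuspendExecution in the real library.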

Keywords: Java jvm Multithreading thread

Added by fisicx on Wed, 02 Mar 2022 12:20:40 +0200