Thoughts on using Spring WebFlux, and a comparison with Vert.x

This article is based on Spring Cloud Finchley SR4.

Through several questions, it examines best practices for using Spring WebFlux and compares it with another framework, Vert.x.

1. Must you use the default web container, can you use your own, and can web (Spring MVC) and WebFlux be present at the same time?

Yes, depending on both is feasible. Tomcat, Undertow, or another container can be used; Undertow is used here.

2. How do you achieve a truly asynchronous, back-pressured Reactor model?

This question is often misunderstood, even by people who use frameworks such as WebFlux and Vert.x. A common belief is that simply adding the WebFlux dependency and having the endpoint return a Mono gives you an asynchronous, back-pressured Reactor model. This is not the case. Let's work through a few examples step by step. First, to make testing easier, we shrink the web container's HTTP request-processing thread pool to a single thread. For Tomcat, configure:

server.tomcat.max-threads=1

For Undertow (which we use here):

# Number of IO threads. They perform non-blocking work and each serves multiple connections. The default is one thread per CPU core.
server.undertow.io-threads=1
# Worker (blocking task) thread pool. When a servlet-style request performs blocking IO, Undertow takes a thread from this pool.
# The right size depends on how much the application's tasks block. The default is the number of IO threads * 8.
server.undertow.worker-threads=1

After that, configure the Log4j2 log format as:

<Property name="springAppName">test</Property>
    <Property name="LOG_ROOT">log</Property>
    <Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property>
    <Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property>
    <Property name="LOG_LEVEL_PATTERN">%5p</Property>
    <Property name="logFormat">
        %d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}
    </Property>

This format shows the thread name as well as Sleuth's traceId and spanId (our project depends on Sleuth). First, write some test code to see whether a plain, direct call is enough to get asynchronous behavior:

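import java.util.concurrent.TimeUnit;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@Log4j2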
@RestController
public class TestController {
    @Autowired
    private TestService testService;
    
    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.just(testService.simulateIOTest());
    }

    @Service
    public static class TestService {

        public String simulateIOTest() {
            try {
                //simulate io
                log.info("simulate start");
                TimeUnit.SECONDS.sleep(5);
                log.info("simulate end");
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }

            return "hello";
        }
    }
}

Call the endpoint several times concurrently and check the log:

2019-11-12 09:05:41.595  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:41.596  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:46.598  INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:46.635  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:51.636  INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:51.643  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:56.644  INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end

The requests are clearly processed serially: there is only one thread, and it stays blocked until each request has finished. This is not in line with the Reactor model. The thread that handles HTTP requests, XNIO-2 task-1, should not wait for a request to complete; it should move straight on to the next one. Mono.just(testService.simulateIOTest()) evaluates its argument eagerly on the calling thread, and replacing it with Mono.fromCallable(() -> testService.simulateIOTest()) or something similar has the same effect, because the callable still runs on the thread that subscribes. The actual work must be handed to a different thread pool, and the result filled into the outermost Mono when processing ends. To keep the code clean and avoid pure callback style, every method in the call chain should return a Future type. Here we return CompletableFuture.

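import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@Log4j2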
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
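        return Mono.create(stringMonoSink -> testService.simulateIOTest().whenComplete((s, e) -> {
            log.info("apply");
            if (e != null) {
                // Propagate failures so the Mono does not wait forever
                stringMonoSink.error(e);
            } else {
                // Fill in the successful result
                stringMonoSink.success(s);
            }
        }));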
    }

    @Service
    public static class TestService {

        public CompletableFuture<String> simulateIOTest() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the interruption
                    Thread.currentThread().interrupt();
                }
                return "hello";
            });
        }
    }
}

The result is:

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.155  INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.962  INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.963  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:05.756  INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:05.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:08.459  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.156  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.964  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:10.757  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply

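Now the request thread XNIO-2 task-1 returns immediately and accepts the next request while the simulated IO runs on other threads. This is the behavior the Reactor model calls for.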
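
The hand-off can also be reproduced without Spring. The following is a minimal sketch using only the JDK, not the article's controller: a single-threaded executor stands in for the lone HTTP thread, and a hypothetical ioPool stands in for the pool that runs the blocking work. The class and method names are invented for illustration. A latch holds every simulated IO call open, so the count it returns shows that the single request thread accepted all requests while none of the IO had finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffSketch {

    /** Returns how many requests the single request thread accepted while all IO was still pending. */
    public static int acceptedWhileIoPending(int requests) {
        // Stands in for the lone HTTP thread (XNIO-2 task-1 in the logs above).
        ExecutorService requestThread = Executors.newSingleThreadExecutor();
        // Hypothetical IO pool, sized so every simulated call can block at once.
        ExecutorService ioPool = Executors.newFixedThreadPool(requests);
        CountDownLatch ioMayFinish = new CountDownLatch(1);
        AtomicInteger accepted = new AtomicInteger();
        try {
            List<CompletableFuture<CompletableFuture<String>>> handlers = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                // The handler only starts the IO and returns the pending future.
                handlers.add(CompletableFuture.supplyAsync(() -> {
                    accepted.incrementAndGet();
                    return CompletableFuture.supplyAsync(() -> {
                        awaitQuietly(ioMayFinish); // simulated blocking IO
                        return "hello";
                    }, ioPool);
                }, requestThread));
            }
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (CompletableFuture<CompletableFuture<String>> handler : handlers) {
                pending.add(handler.join()); // every handler returns although no IO has finished
            }
            int acceptedCount = accepted.get();
            ioMayFinish.countDown(); // now let the simulated IO complete
            pending.forEach(CompletableFuture::join);
            return acceptedCount;
        } finally {
            requestThread.shutdown();
            ioPool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedWhileIoPending(3)); // prints 3
    }
}
```

If the handler called join() on the inner future itself, as the first Mono.just version effectively does, the single request thread would block on the first request and this sketch would never finish.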

3. CompletableFuture thread pool management and log tracing

CompletableFuture can be given a thread pool explicitly or not. When none is specified, as above, the task runs on ForkJoinPool.commonPool(), introduced in Java 8, whose default parallelism is the number of CPU cores minus one. To specify one, nearly every asynchronous method has an overload that accepts an Executor as an extra parameter.

As a best practice, hand anything that involves IO to a dedicated thread pool, and give different kinds of IO different pools: for example, separate pools for database IO, RPC, and cache access.

There is one more problem: asynchronous calls lose the spanId and traceId. In the example above:
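
As a rough illustration of one pool per kind of IO, here is a minimal JDK-only sketch. The pool names, the sizes, and the two methods queryDatabase and callRemoteService are hypothetical placeholders, not part of the original project; each task simply reports the name of the thread it ran on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class IoPoolsSketch {

    // Names threads "<prefix>-<n>" so the log shows which pool ran a task.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true); // let the JVM exit without an explicit shutdown
            return thread;
        };
    }

    // Hypothetical pools, one per kind of blocking IO; the sizes are placeholders.
    static final ExecutorService DB_POOL = Executors.newFixedThreadPool(4, named("db-io"));
    static final ExecutorService RPC_POOL = Executors.newFixedThreadPool(4, named("rpc-io"));

    // Stand-in for a blocking database call; returns the name of the executing thread.
    public static CompletableFuture<String> queryDatabase() {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), DB_POOL);
    }

    // Stand-in for a blocking RPC call; returns the name of the executing thread.
    public static CompletableFuture<String> callRemoteService() {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), RPC_POOL);
    }

    public static void main(String[] args) {
        System.out.println(queryDatabase().join());     // a thread named db-io-<n>
        System.out.println(callRemoteService().join()); // a thread named rpc-io-<n>
    }
}
```

Keeping the pools separate means a slow database cannot exhaust the threads that RPC or cache calls depend on, and the thread name in each log line identifies the kind of IO involved.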

2019-11-12 09:18:03.457  INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458  INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start

The traceId 8d6eddc9cc80612f is lost on the worker thread, which makes call-chain log tracing across microservices infeasible. For asynchronous code, we therefore need to capture the current span before the asynchronous call and set it again inside the task, so that the spanId and traceId carry over.
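
The loss is easy to reproduce with any per-thread context. The sketch below uses a plain ThreadLocal named TRACE_ID as a hypothetical stand-in for the tracing context; it does not use Sleuth, and Sleuth's own mechanism may differ in detail. It shows the value disappearing on a pool thread and surviving once it is captured on the caller and restored inside the task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {

    // Hypothetical stand-in for the per-thread trace context.
    public static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // The pool thread has its own (empty) ThreadLocal slot, so the value is missing.
    public static String withoutPropagation(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> String.valueOf(TRACE_ID.get()), pool)
                .join();
    }

    // Capture on the calling thread, then set and clean up inside the task.
    public static String withPropagation(ExecutorService pool) {
        String captured = TRACE_ID.get();
        return CompletableFuture.supplyAsync(() -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return String.valueOf(TRACE_ID.get());
            } finally {
                // Restore the pool thread's earlier state so contexts do not leak between tasks.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        }, pool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            TRACE_ID.set("8d6eddc9cc80612f");
            System.out.println(withoutPropagation(pool)); // prints null
            System.out.println(withPropagation(pool));    // prints 8d6eddc9cc80612f
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }
}
```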

To sum up, the modified code is:

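import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import brave.Span;
import brave.Tracer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@Log4j2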
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.fromFuture(testService.simulateIOTest());
    }

    @Service
    public static class TestService {
        @Autowired
        private Tracer tracer;
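        // Named threads make it obvious in the log which pool ran a task
        private final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test_service_executor-%d").build();
        // Bounded queue plus AbortPolicy: submissions are rejected once the pool and the queue are full
        private final ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(131072), threadFactory, new ThreadPoolExecutor.AbortPolicy());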

        public CompletableFuture<String> simulateIOTest() {
            Span span = tracer.currentSpan();
            return CompletableFuture.supplyAsync(() -> {
                // Re-enter the captured span on the worker thread; the scope is closed when the block exits
                try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)) {
                    //simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                    return "hello";
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executorService);
        }
    }
}

The result is:

2019-11-12 09:44:30.953  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:28]:test started
2019-11-12 09:44:30.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:44:35.991  INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
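
One consequence of the executor configuration above is worth spelling out: with a bounded queue and AbortPolicy, a submission is rejected once all threads are busy and the queue is full, which gives the caller a crude overload signal. The sketch below illustrates this with deliberately tiny, hypothetical limits (one thread, queue capacity one) instead of the 50 threads and 131072 slots used above; the class and method names are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {

    /** Submits the given number of blocking tasks to a saturated pool and counts the rejections. */
    public static int countRejections(int tasks) {
        // One thread and one queue slot, so the third concurrent task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    CompletableFuture.supplyAsync(() -> {
                        try {
                            release.await(); // simulated IO that has not finished yet
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return "hello";
                    }, pool);
                } catch (RejectedExecutionException e) {
                    // AbortPolicy surfaces overload on the submitting thread
                    rejected++;
                }
            }
            return rejected;
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejections(3)); // prints 1
    }
}
```

In a real service the rejection would typically be translated into an error response or a retry rather than just counted.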

4. What are the similarities and differences compared with Vert.x?

From a design point of view, the basic idea is the same. For any IO operation, use a native asynchronous client (one that returns a Future) if one exists; otherwise wrap the call in a Future and hand it to another thread pool, so that the HTTP request thread stays free to accept other requests.

The main differences are:

  1. WebFlux itself does not manage worker thread pools for you. Vert.x goes as far as wrapping its asynchronous thread pool in a Vert.x class, WorkerExecutor.
  2. With WebFlux, the asynchronous Future is the JDK's own (CompletableFuture); Vert.x ships its own Future abstraction.
  3. WebFlux integrates more smoothly with Spring, but the Spring ecosystem does not provide native NIO clients. Vert.x, for example, has an NIO MySQL client that implements the MySQL protocol stack, although its robustness remains to be verified. This lack limits the performance WebFlux can achieve.
  4. Vert.x is a cross-language (polyglot) framework.

Keywords: Programming WebFlux Spring Tomcat MySQL

Added by cedtech23 on Tue, 12 Nov 2019 12:50:24 +0200