This article is based on Spring Cloud Finchley SR4.
Through several questions, it analyzes best practices for using Spring WebFlux and compares it with another framework, Vert.x.
1. Must you use the default web container? Can you use your own, and can web and webflux coexist?
Yes, this kind of dependency combination is feasible (Tomcat, Undertow, or other containers can be used for the web container; Undertow is used here):
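The original dependency block was not preserved, so here is a hedged sketch of the kind of Maven setup meant: spring-boot-starter-web with Tomcat excluded, Undertow as the container, and spring-boot-starter-webflux alongside it:

```xml
<!-- Sketch only: spring-boot-starter-web and spring-boot-starter-webflux together,
     with Undertow replacing the default Tomcat container -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```

Note that when both starters are present, Spring Boot runs in Servlet (Spring MVC) mode; the WebFlux types such as Mono are still usable as controller return values.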
2. How do we realize a true Reactor model with asynchronous back pressure?
This point is widely misunderstood by users of frameworks like WebFlux and Vert.x. It is often believed that simply adding the WebFlux dependency and returning Mono from an interface implements the asynchronous back-pressure Reactor model. This is not the case. Let's work through a few examples step by step. First, for convenience of testing, we shrink the web container's HTTP request-processing thread pool to a single thread. For Tomcat, configure:
```properties
server.tomcat.max-threads=1
```
For Undertow (which we use here):
```properties
# Number of IO threads. IO threads perform non-blocking tasks and each is
# responsible for multiple connections. Defaults to one thread per CPU core.
server.undertow.io-threads=1
# Blocking task thread pool. When a servlet-style request performs blocking IO,
# Undertow takes a thread from this pool. Its size depends on the blocking
# coefficient of the tasks being executed; the default is io-threads * 8.
server.undertow.worker-threads=1
```
After that, configure the Log4j2 log format as:
```xml
<Property name="springAppName">test</Property>
<Property name="LOG_ROOT">log</Property>
<Property name="LOG_DATEFORMAT_PATTERN">yyyy-MM-dd HH:mm:ss.SSS</Property>
<Property name="LOG_EXCEPTION_CONVERSION_WORD">%xwEx</Property>
<Property name="LOG_LEVEL_PATTERN">%5p</Property>
<Property name="logFormat">
    %d{${LOG_DATEFORMAT_PATTERN}} ${LOG_LEVEL_PATTERN} [${springAppName},%X{X-B3-TraceId},%X{X-B3-SpanId}] [${sys:PID}] [%t][%C:%L]: %m%n${sys:LOG_EXCEPTION_CONVERSION_WORD}
</Property>
```
This format lets us see the thread name, as well as Sleuth's traceId and spanId (our project uses Sleuth). First, let's write test code to check whether a plain, direct call already achieves asynchronous back pressure:
```java
@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.just(testService.simulateIOTest());
    }

    @Service
    public static class TestService {
        public String simulateIOTest() {
            try {
                // simulate io
                log.info("simulate start");
                TimeUnit.SECONDS.sleep(5);
                log.info("simulate end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }
    }
}
```
Call the interface several times concurrently and check the log:
```
2019-11-12 09:05:41.595 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:41.596 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:46.598 INFO [test,26bf995af305ad34,26bf995af305ad34] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:46.635 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:46.635 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:51.636 INFO [test,620bd553b1e55dcd,620bd553b1e55dcd] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
2019-11-12 09:05:51.643 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:05:51.643 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:38]:simulate start
2019-11-12 09:05:56.644 INFO [test,bc17d60861ba1a2a,bc17d60861ba1a2a] [26208] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController$TestService:40]:simulate end
```
Clearly the requests are processed serially: there is only one thread, and that thread also waits for each request to finish. This does not match the Reactor model. The HTTP request thread XNIO-2 task-1 should not wait for a request to complete, but should move straight on to accept the next request. Replacing Mono.just(testService.simulateIOTest()) with Mono.fromCallable(() -> testService.simulateIOTest()) and similar variants has the same effect. Here we must hand the actual work to another thread pool and, when it finishes, fill the result into the outermost Mono. To keep the code clean and avoid pure callback style, we require each called method to return a Future type; here we return a CompletableFuture.
```java
@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.create(stringMonoSink -> testService.simulateIOTest().thenApply(s -> {
            log.info("apply");
            // Fill in the successful result
            stringMonoSink.success(s);
            return s;
        }));
    }

    @Service
    public static class TestService {
        public CompletableFuture<String> simulateIOTest() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    // simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello";
            });
        }
    }
}
```
The result is:
```
2019-11-12 09:18:03.457 INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.155 INFO [test,c654462e159fd43e,c654462e159fd43e] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:04.962 INFO [test,8366a95d002ca25a,8366a95d002ca25a] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:04.963 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:05.756 INFO [test,5f851d9e2ef49f14,5f851d9e2ef49f14] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:05.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:18:08.459 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:08.459 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.156 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-5][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:09.964 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:09.964 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-7][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
2019-11-12 09:18:10.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
2019-11-12 09:18:10.757 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-9][com.hopegaming.syringe.api.frontend.order.TestController:28]:apply
```
In this way, the Reactor model is genuinely implemented: the request thread keeps accepting new requests while earlier ones are still being processed.
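As an aside, since simulateIOTest() already returns a CompletableFuture, the Mono.create bridge above can be written more concisely with Mono.fromFuture (the form used in the next section's code). A sketch of the test() method rewritten this way:

```java
@RequestMapping("/test")
public Mono<String> test() {
    log.info("test started");
    // Mono.fromFuture completes the Mono when the CompletableFuture completes,
    // without blocking the HTTP request thread.
    return Mono.fromFuture(testService.simulateIOTest());
}
```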
3. CompletableFuture thread pool management and log tracing
CompletableFuture can run with or without an explicitly specified thread pool. If none is specified, as above, Java 8's ForkJoinPool.commonPool() is used, which by default has a parallelism of one less than the number of CPU cores. If a specific pool is needed, essentially every async method accepts an Executor as an extra parameter.
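For illustration, a minimal sketch of the two variants (the pool name dbPool is hypothetical, not from the original):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolVariants {
    public static void main(String[] args) {
        // No Executor given: runs on ForkJoinPool.commonPool()
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");

        // Explicit Executor: the *Async methods accept it as the last parameter
        ExecutorService dbPool = Executors.newFixedThreadPool(10); // hypothetical pool
        CompletableFuture<String> f2 = CompletableFuture
                .supplyAsync(() -> "hello", dbPool)
                .thenApplyAsync(String::toUpperCase, dbPool);

        System.out.println(f1.join() + " / " + f2.join());
        dbPool.shutdown();
    }
}
```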
As a best practice, anything involving IO is handed off to a dedicated thread pool, and different kinds of IO get different pools: for example, one pool for database IO, one for RPC, one for cache access, and so on; a sketch follows below.
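A hedged sketch of what such a setup could look like, reusing the ThreadPoolExecutor construction from the final example below (the pool names and sizes here are illustrative, not from the original):

```java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IoPools {
    // One named pool per IO type, so logs and thread dumps show where time is spent
    public static final ExecutorService DB_POOL = newPool("db-io-%d", 50);
    public static final ExecutorService RPC_POOL = newPool("rpc-io-%d", 50);
    public static final ExecutorService CACHE_POOL = newPool("cache-io-%d", 20);

    private static ExecutorService newPool(String nameFormat, int size) {
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(131072), factory, new ThreadPoolExecutor.AbortPolicy());
    }
}
```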
There is another problem here: asynchronous calls lose the spanId and traceId. Take the example above:
```
2019-11-12 09:18:03.457 INFO [test,8d6eddc9cc80612f,8d6eddc9cc80612f] [22892] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:26]:test started
2019-11-12 09:18:03.458 INFO [test,,] [22892] [ForkJoinPool.commonPool-worker-3][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
```
The traceId 8d6eddc9cc80612f is lost, which breaks log-based call-chain tracing across microservices. For asynchronous code we therefore need to capture the spanId and traceId before the asynchronous call and restore them inside it.
To sum up, the modified code is:
```java
@Log4j2
@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @RequestMapping("/test")
    public Mono<String> test() {
        log.info("test started");
        return Mono.fromFuture(testService.simulateIOTest());
    }

    @Service
    public static class TestService {
        @Autowired
        private Tracer tracer;

        ThreadFactory build = (new ThreadFactoryBuilder()).setNameFormat("test_service_executor-%d").build();
        private ExecutorService executorService = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(131072), build, new ThreadPoolExecutor.AbortPolicy());

        public CompletableFuture<String> simulateIOTest() {
            // Capture the current span on the HTTP request thread
            Span span = tracer.currentSpan();
            return CompletableFuture.supplyAsync(() -> {
                // Restore the span inside the async task so traceId/spanId appear in the logs
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    // simulate io
                    log.info("simulate start");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("simulate end");
                    return "hello";
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, executorService);
        }
    }
}
```
The result is:
```
2019-11-12 09:44:30.953 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [XNIO-2 task-1][com.hopegaming.syringe.api.frontend.order.TestController:28]:test started
2019-11-12 09:44:30.991 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:44]:simulate start
2019-11-12 09:44:35.991 INFO [test,bc1ba4169e037577,bc1ba4169e037577] [2796] [test_service_executor-0][com.hopegaming.syringe.api.frontend.order.TestController$TestService:46]:simulate end
```
4. What are the similarities and differences with Vert.x?
In fact, from a design point of view the basic idea is the same: for any IO operation, if a native asynchronous client exists (one that returns a Future), wrap the call in a Future and hand it to another thread pool, so the HTTP request thread is free to accept other requests.
The main differences are:
- The WebFlux framework itself does not manage thread pools, whereas Vert.x even encapsulates the asynchronous worker pool in its own WorkerExecutor class (see the sketch after this list).
- WebFlux works with Java's native asynchronous Future (CompletableFuture), while Vert.x provides its own Future abstraction.
- WebFlux integrates more seamlessly with Spring, but the Spring ecosystem does not provide native NIO clients. For example, a NIO MySQL client implementing the MySQL protocol exists in the Vert.x ecosystem, though its robustness remains to be verified. This further limits the performance WebFlux can reach.
- Vert.x is a cross-language (polyglot) framework.
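For reference, a hedged sketch of the Vert.x WorkerExecutor mentioned above (Vert.x 3.8-era API; the pool name is illustrative):

```java
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;

public class VertxWorkerDemo {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        // Vert.x creates and manages the named blocking pool itself
        WorkerExecutor executor = vertx.createSharedWorkerExecutor("test-worker-pool");
        executor.<String>executeBlocking(promise -> {
            // blocking IO runs on the worker pool, not on the event loop
            promise.complete("hello");
        }, asyncResult -> {
            // the result handler runs back on the event loop
            System.out.println(asyncResult.result());
        });
    }
}
```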