07-Spring5 WebFlux reactive programming

Introduction to Spring WebFlux

Brief introduction

  1. Spring WebFlux is a module added in Spring 5 for web development. It serves the same purpose as Spring MVC, but it is built on a popular reactive programming framework
  2. Traditional web frameworks such as Spring MVC are based on Servlet containers. WebFlux is an asynchronous, non-blocking framework. Servlet containers support asynchronous non-blocking I/O only from Servlet 3.1 onward. The core of WebFlux is implemented on the Reactor API

What is asynchronous non-blocking
  1. Asynchronous and synchronous
  2. Non-blocking and blocking
  3. The two pairs describe different parties

Asynchronous and synchronous describe the caller. If the caller sends a request and waits for the response before doing anything else, the call is synchronous. If the caller carries on with other work after sending the request, without waiting for the response, the call is asynchronous

Blocking and non-blocking describe the callee. If the callee responds only after it has finished the requested task, it is blocking. If the callee responds immediately and then carries out the task, it is non-blocking
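The difference on the calling side can be sketched with the JDK's CompletableFuture. This is only an illustration, not WebFlux code: the class name SyncAsyncSketch and the slowLookup method are made up for the example, and the 100 ms sleep stands in for a remote call.

```java
import java.util.concurrent.CompletableFuture;

final class SyncAsyncSketch {

    // Hypothetical slow operation standing in for a remote call.
    static String slowLookup(String key) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key;
    }

    // Synchronous: the caller waits for the result before doing anything else.
    static String callSync(String key) {
        return slowLookup(key);
    }

    // Asynchronous: the caller gets a future back immediately and can keep working.
    static CompletableFuture<String> callAsync(String key) {
        return CompletableFuture.supplyAsync(() -> slowLookup(key));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + callSync("a"));

        CompletableFuture<String> pending = callAsync("b");
        System.out.println("caller keeps working while the lookup runs");
        // join() is used here only so the demo prints the result before exiting.
        System.out.println("async result: " + pending.join());
    }
}
```

In the asynchronous case the second message is printed before the lookup has finished, which is exactly the "does other things without waiting" behaviour described above.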

WebFlux features
  1. Non-blocking: improves system throughput and scalability with limited resources; reactive programming is implemented on top of Reactor
  2. Functional programming: Spring 5 is based on Java 8, and WebFlux uses Java 8 functional programming to route requests

Comparison with Spring MVC

  1. First: both frameworks can use annotations, and both can run in containers such as Tomcat
  2. Second: Spring MVC uses imperative programming, while WebFlux uses asynchronous reactive programming

Reactive programming (Java implementation)

What is reactive programming

Reactive programming is a programming paradigm oriented to data flows and the propagation of change. Static or dynamic data flows can be expressed easily in the programming language, and the underlying execution model automatically propagates changed values through the data flow. A spreadsheet is an example of reactive programming: a cell can contain a literal value or a formula such as "=B1+C1", and the value of a cell containing a formula changes whenever the cells it refers to change
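The spreadsheet example can be sketched in a few lines of plain Java. This is a minimal illustration under my own assumptions: the Cell class and the formula helper are hypothetical names, and real reactive libraries handle far more (threads, errors, backpressure).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntBinaryOperator;

final class SpreadsheetSketch {

    /** A cell holding a value; registered listeners run whenever the value changes. */
    static class Cell {
        private int value;
        private final List<Runnable> listeners = new ArrayList<>();

        Cell(int value) {
            this.value = value;
        }

        int get() {
            return value;
        }

        void set(int newValue) {
            value = newValue;
            listeners.forEach(Runnable::run);
        }

        void onChange(Runnable listener) {
            listeners.add(listener);
        }
    }

    /** Builds a cell whose value is always op(a, b), like the formula "=B1+C1". */
    static Cell formula(Cell a, Cell b, IntBinaryOperator op) {
        Cell result = new Cell(op.applyAsInt(a.get(), b.get()));
        Runnable recompute = () -> result.set(op.applyAsInt(a.get(), b.get()));
        a.onChange(recompute);
        b.onChange(recompute);
        return result;
    }

    public static void main(String[] args) {
        Cell b1 = new Cell(1);
        Cell c1 = new Cell(2);
        Cell a1 = formula(b1, c1, Integer::sum);
        System.out.println(a1.get()); // 3
        b1.set(10);                   // the change propagates to a1
        System.out.println(a1.get()); // 12
    }
}
```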

Java 8 and earlier

Java 8 and earlier provide the Observer pattern through two types: the Observer interface and the Observable class

Create a Spring Boot project

Create a new Spring Boot project. I just added a module to an existing project

Write code

package com.dance.webflux.reactor8;

import lombok.extern.slf4j.Slf4j;

import java.util.Observable;
import java.util.Observer;

@Slf4j
public class ObserverDemo extends Observable {

    public static void main(String[] args) {

        ObserverDemo observerDemo = new ObserverDemo();

        // Add observer
        observerDemo.addObserver((o, arg) -> {
            log.info("o:{},arg:{}",o,arg);
            System.out.println("undergo changes");
        });

        // Add observer
        observerDemo.addObserver((o, arg) -> {
            log.info("o:{},arg:{}",o,arg);
            System.out.println("Manual observer notification,Ready to change");
        });

        // Change
        observerDemo.setChanged();

        // Notify the observer
        observerDemo.notifyObservers();

    }

}

Execution result

23:00:10.650 [main] INFO com.dance.webflux.reactor8.ObserverDemo - o:com.dance.webflux.reactor8.ObserverDemo@504bae78,arg:null
 Manual observer notification,Ready to change
23:00:10.663 [main] INFO com.dance.webflux.reactor8.ObserverDemo - o:com.dance.webflux.reactor8.ObserverDemo@504bae78,arg:null
 undergo changes

Reactive programming (implemented with Reactor)

Brief introduction

  1. Reactor is a framework that implements the Reactive Streams specification
  2. Reactor has two core classes, Mono and Flux. Both implement the Publisher interface and provide a rich set of operators. A Flux publishes 0 to N elements; a Mono publishes 0 or 1 element
  3. Flux and Mono are publishers of data streams. Each can emit three kinds of signal: an element value, an error signal, and a completion signal. The error and completion signals are both termination signals: they tell the subscriber that the data stream has ended. An error signal additionally passes the error information to the subscriber

Code demonstration: Flux and Mono

Add the dependency

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.1.5.RELEASE</version>
</dependency>

Write code

package com.dance.webflux.reactor8;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class TestReactor {

    public static void main(String[] args) {
        /*
         * Declare the streams directly with just()
         */
        Flux<Integer> flux = Flux.just(1, 2, 3);

        Mono<Integer> mono = Mono.just(1);

        // Other factory methods
        // From an array
        Integer[] array = new Integer[]{1,2,3,4};
        Flux<Integer> flux1 = Flux.fromArray(array);
        // From a collection
        List<Integer> array2 = Arrays.asList(1, 2, 3, 4);
        Flux<Integer> flux2 = Flux.fromIterable(array2);
        // From a Stream
        Stream<Integer> stream = array2.stream();
        Flux<Integer> flux3 = Flux.fromStream(stream);
        // From a Supplier of a Stream
        Flux<Integer> tFlux = Flux.fromStream(() -> Stream.of(1, 2, 3));
        
    }

}

Characteristics of the three signals

  1. The error signal and the completion signal are both termination signals, and the two cannot coexist
  2. If no element value is emitted and an error or completion signal is sent directly, the data stream is empty
  3. If neither an error signal nor a completion signal is sent, the data stream is infinite
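The three signals can be observed without Reactor by using the JDK's own Reactive Streams interfaces in java.util.concurrent.Flow. Note the assumptions: this needs JDK 9 or later (the rest of this article runs on Java 8), the class names are made up, and the publisher is deliberately simplified, so it emits everything on the first request and ignores the requested amount.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalSketch {

    /** Records every signal it receives so the order can be inspected. */
    static class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> signals = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            signals.add("next:" + item);
        }

        @Override
        public void onError(Throwable error) {
            signals.add("error:" + error.getMessage());
        }

        @Override
        public void onComplete() {
            signals.add("complete");
        }
    }

    /** Emits 1..count, then either an error signal or a completion signal, never both. */
    static Flow.Publisher<Integer> range(int count, boolean failAtEnd) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Simplification: the demand n is ignored and everything is emitted at once.
                if (done) {
                    return;
                }
                done = true;
                for (int i = 1; i <= count; i++) {
                    subscriber.onNext(i);
                }
                if (failAtEnd) {
                    subscriber.onError(new IllegalStateException("boom"));
                } else {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes to the publisher and returns the signals in the order received. */
    static List<String> collect(Flow.Publisher<Integer> publisher) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        publisher.subscribe(subscriber);
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(collect(range(3, false))); // [next:1, next:2, next:3, complete]
        System.out.println(collect(range(0, true)));  // [error:boom]
    }
}
```

The second call shows an empty data stream: no element value is emitted, only a termination signal.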

Seriously, go and review Java 8 (lambdas and the Stream API) first, or this will be very hard to follow

Subscribing to a data stream

Calling just or another factory method only declares the data stream; nothing is emitted yet. The stream starts emitting only once it is subscribed to. Without a subscription, nothing happens

// Subscribe to the data streams
flux.subscribe(x -> System.out.print(x + " "));
System.out.println();
mono.subscribe(System.out::println);

Execution result

1 2 3 
1

Operators

Operations applied to the data in a stream are called operators. They work like the stations on a factory assembly line

First: map transforms each element into a new element (as in the Stream API)

Second: flatMap transforms each element into a stream

Each element is converted into a stream, and the resulting streams are then merged into a single stream
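Since these two operators come from the Stream API, their meaning can be shown with java.util.stream alone. This is a sketch of the semantics only, with made-up class and method names; the Reactor operators of the same name apply the same idea to asynchronous streams.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class OperatorSketch {

    // map: each element becomes exactly one new element.
    static List<Integer> squares(List<Integer> numbers) {
        return numbers.stream()
                .map(x -> x * x)
                .collect(Collectors.toList());
    }

    // flatMap: each element becomes a stream, and the streams are merged into one.
    static List<String> letters(List<String> words) {
        return words.stream()
                .flatMap(word -> Arrays.stream(word.split("")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squares(Arrays.asList(1, 2, 3))); // [1, 4, 9]
        System.out.println(letters(Arrays.asList("ab", "c"))); // [a, b, c]
    }
}
```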

Spring WebFlux execution process and core APIs

Spring WebFlux is based on Reactor. Its default container is Netty, a high-performance, asynchronous, non-blocking NIO framework

Netty

BIO

Execution process

The execution process of Spring WebFlux is similar to that of Spring MVC

The Spring WebFlux core controller, DispatcherHandler, implements the WebHandler interface

The WebHandler interface declares a single method

The interface cannot be found in IDEA until the dependency has been added (I spent a long time looking for it)

Add the WebFlux dependency

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

FAQ

If the dependency cannot be downloaded, you can add the Aliyun Maven repositories

<repositories>
  <repository>
    <id>aliyun</id>
    <url>https://maven.aliyun.com/repository/public</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>
<pluginRepositories>
  <pluginRepository>
    <id>aliyun-plugin</id>
    <url>https://maven.aliyun.com/repository/public</url>
    <releases>
      <enabled>true</enabled>
    </releases>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </pluginRepository>
</pluginRepositories>

WebHandler

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.web.server;

import reactor.core.publisher.Mono;

public interface WebHandler {
    Mono<Void> handle(ServerWebExchange exchange);
}

As you can see, handle returns a Mono<Void>: it emits no element and only signals completion or an error

DispatcherHandler implementation

@Override
public Mono<Void> handle(ServerWebExchange exchange) { // the HTTP exchange
    // Return a not-found error if no handler mappings are registered
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    // Handle CORS preflight requests separately
    if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
        return handlePreFlight(exchange);
    }
    /*
     * Publish the handler mappings as a Flux (Flux.fromIterable) and ask each mapping
     * in turn for a handler (concatMap), keeping the first match (next).
     * If nothing matches, signal not found (switchIfEmpty). Otherwise invoke the handler
     * (invokeHandler) and then process its result (handleResult).
     */
    return Flux.fromIterable(this.handlerMappings)
        .concatMap(mapping -> mapping.getHandler(exchange))
        .next()
        .switchIfEmpty(createNotFoundError())
        .flatMap(handler -> invokeHandler(exchange, handler))
        .flatMap(result -> handleResult(exchange, result));
}
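The "first mapping that matches wins, otherwise not found" shape of the handle method above can be mimicked with Optional and the Stream API. This is only a synchronous analogy under my own assumptions: the mapping helper, the handler names, and the "404 not found" string are invented for the sketch and are not Spring APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class FirstMatchSketch {

    // Hypothetical stand-in for a HandlerMapping: returns a handler name if the path matches.
    static Function<String, Optional<String>> mapping(String prefix, String handlerName) {
        return path -> path.startsWith(prefix) ? Optional.of(handlerName) : Optional.empty();
    }

    // Asks each mapping in order, keeps the first match, and falls back to "not found".
    static String dispatch(List<Function<String, Optional<String>>> mappings, String path) {
        return mappings.stream()
                .map(m -> m.apply(path))       // like concatMap(mapping -> mapping.getHandler(...))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()                   // like next()
                .orElse("404 not found");      // like switchIfEmpty(createNotFoundError())
    }

    public static void main(String[] args) {
        List<Function<String, Optional<String>>> mappings = Arrays.asList(
                mapping("/user/", "userHandler"),
                mapping("/users", "usersHandler"));
        System.out.println(dispatch(mappings, "/user/1")); // userHandler
        System.out.println(dispatch(mappings, "/users"));  // usersHandler
        System.out.println(dispatch(mappings, "/other"));  // 404 not found
    }
}
```

Because streams are lazy, mappings after the first match are never consulted, which is the same effect that next() has on the Flux.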

Component introduction

DispatcherHandler: the central dispatcher that processes requests

HandlerMapping: maps a request to its handler

HandlerAdapter: adapts and invokes the handler

HandlerResultHandler: processes the result returned by the handler

Functional programming interfaces

Spring WebFlux implements its functional programming model with two interfaces, RouterFunction and HandlerFunction

Spring WebFlux (annotation-based programming model)

  1. Spring WebFlux can be used in two ways: the annotation-based programming model and the functional programming model
  2. The annotation-based model is used much like Spring MVC. You only need to add the relevant dependencies to the project; Spring Boot configures the runtime container automatically and uses the Netty server by default

Create a Spring Boot project and add the WebFlux starter

Already created and added

Modify the configuration file

application.properties

server.port=8081

Create entity class

package com.dance.webflux.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String name;
    private String sex;
    private Integer age;
}

Create Service

New UserService

package com.dance.webflux.service;

import com.dance.webflux.entity.User;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface UserService {

    /**
     * Get a user by ID
     * @param id user ID
     * @return the user, or empty if no user has this ID
     */
    Mono<User> getUserById(int id);

    /**
     * Get all users
     * @return all users
     */
    Flux<User> getAllUser();

    /**
     * Save a user
     * @param user the user to save
     * @return completes when the user has been saved
     */
    Mono<Void> saveUserInfo(Mono<User> user);

}

Implement the interface

package com.dance.webflux.service.impl;

import com.dance.webflux.entity.User;
import com.dance.webflux.service.UserService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@Service
public class UserServiceImpl implements UserService {

    /**
     * An in-memory map that stores the data and simulates the database
     */
    private final Map<Integer,User> users = new HashMap<>();
    {
        this.users.put(1,new User("lucy","nan",20));
        this.users.put(2,new User("mary","nv",30));
        this.users.put(3,new User("jack","nv",50));
    }

    @Override
    public Mono<User> getUserById(int id) {
        // Emits the user, or completes empty if the ID is unknown
        return Mono.justOrEmpty(users.get(id));
    }

    @Override
    public Flux<User> getAllUser() {
        // Emits every stored user
        return Flux.fromIterable(this.users.values());
    }

    @Override
    public Mono<Void> saveUserInfo(Mono<User> user) {
        // Store the user, then complete without emitting a value
        return user.doOnNext(x -> {
            int key = users.size() + 1;
            users.put(key,x);
        }).thenEmpty(Mono.empty());
    }
}

Create Controller

package com.dance.webflux.controller;

import com.dance.webflux.entity.User;
import com.dance.webflux.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class UserController {

    @Autowired
    private UserService userService;

    @GetMapping("/getUserById/{id}")
    public Mono<User> getUserById(@PathVariable Integer id){
        return userService.getUserById(id);
    }

    @GetMapping("/getAllUser")
    public Flux<User> getAllUser(){
        return userService.getAllUser();
    }

    @PostMapping("/saveUserInfo")
    public Mono<Void> saveUserInfo(@RequestBody User user){
        return userService.saveUserInfo(Mono.just(user));
    }

}

Start project

FAQ: I got an error at startup. After troubleshooting, the cause turned out to be the reactor-core dependency I had added earlier to look at the classes, which caused a JAR conflict

<dependency>
    <groupId>io.projectreactor</groupId>    
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

After commenting out this dependency, restart the project

10:46:27.550 [Thread-1] DEBUG org.springframework.boot.devtools.restart.classloader.RestartClassLoader - Created RestartClassLoader org.springframework.boot.devtools.restart.classloader.RestartClassLoader@7d031891

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2021-12-13 10:46:27.952  INFO 21120 --- [  restartedMain] com.dance.webflux.WebfluxApplication     : Starting WebfluxApplication using Java 1.8.0_162 on ZB-PF2P9QVH with PID 21120 (D:\zhangyugen@JD.com\coding\Spring5\webflux\target\classes started by ext.caiyuanqing in D:\zhangyugen@JD.com\coding\Spring5)
2021-12-13 10:46:27.954  INFO 21120 --- [  restartedMain] com.dance.webflux.WebfluxApplication     : No active profile set, falling back to default profiles: default
2021-12-13 10:46:27.996  INFO 21120 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2021-12-13 10:46:27.996  INFO 21120 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2021-12-13 10:46:28.777  INFO 21120 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-12-13 10:46:29.151  INFO 21120 --- [  restartedMain] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8081
2021-12-13 10:46:29.158  INFO 21120 --- [  restartedMain] com.dance.webflux.WebfluxApplication     : Started WebfluxApplication in 1.598 seconds (JVM running for 2.507)

Test interface

Get user information according to ID

http://localhost:8081/getUserById/1

Query all users

http://localhost:8081/getAllUser

Explanation

Spring MVC works in a synchronous, blocking manner, based on Spring MVC + Servlet + Tomcat

Spring WebFlux works in an asynchronous, non-blocking manner, based on Spring WebFlux + Reactor + Netty

Spring WebFlux (functional programming model)

  1. When using the functional programming model, you need to initialize the server yourself
  2. The functional programming model has two core interfaces: RouterFunction (routes a request to the corresponding handler) and HandlerFunction (processes the request and produces the response). The core task is to implement these two functional interfaces and start the server they need
  3. In Spring WebFlux, the request and response are no longer ServletRequest and ServletResponse but ServerRequest and ServerResponse
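Before looking at the real API, the division of labour between the two interfaces can be sketched in plain Java. Everything here is a simplified stand-in of my own: Handler and Router are hypothetical miniatures of HandlerFunction and RouterFunction, requests are just path strings, and responses are just strings.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

final class RouterSketch {

    /** Simplified stand-in for HandlerFunction: request path in, response body out. */
    interface Handler extends Function<String, String> {
    }

    /** Simplified stand-in for RouterFunction: yields a handler if the request matches. */
    interface Router {
        Optional<Handler> route(String path);

        /** Tries this router first and falls back to the other one. */
        default Router and(Router other) {
            return path -> {
                Optional<Handler> handler = route(path);
                return handler.isPresent() ? handler : other.route(path);
            };
        }

        /** Creates a router from a predicate and the handler to use when it matches. */
        static Router of(Predicate<String> predicate, Handler handler) {
            return path -> predicate.test(path) ? Optional.of(handler) : Optional.empty();
        }
    }

    /** Routes the request, runs the matching handler, or answers "404". */
    static String serve(Router router, String path) {
        return router.route(path).map(handler -> handler.apply(path)).orElse("404");
    }

    /** Two demo routes composed into one router. */
    static Router demoRouter() {
        Router users = Router.of(path -> path.equals("/users"), path -> "all users");
        Router user = Router.of(path -> path.startsWith("/user/"),
                path -> "user " + path.substring("/user/".length()));
        return users.and(user);
    }

    public static void main(String[] args) {
        Router router = demoRouter();
        System.out.println(serve(router, "/users"));  // all users
        System.out.println(serve(router, "/user/1")); // user 1
        System.out.println(serve(router, "/other"));  // 404
    }
}
```

The and method plays the role that andRoute plays in the real router: routes are tried in the order they were composed.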

Create the handler (the concrete implementation)

package com.dance.webflux.handler;

import com.dance.webflux.entity.User;
import com.dance.webflux.service.UserService;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UserHandler {

    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    /**
     * Get a user by ID
     * @param serverRequest the request
     * @return the response
     */
    public Mono<ServerResponse> getUserById(ServerRequest serverRequest){
        int id = Integer.parseInt(serverRequest.pathVariable("id"));
        Mono<User> userById = userService.getUserById(id);
        // switchIfEmpty sits outside flatMap so that an empty Mono (unknown ID) yields 404
        return userById
                .flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /**
     * Get all users
     * @param serverRequest the request
     * @return the response
     */
    public Mono<ServerResponse> getUserAll(ServerRequest serverRequest){
        Flux<User> allUser = userService.getAllUser();
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(allUser,User.class);
    }

    /**
     * Save a user
     * @param serverRequest the request
     * @return the response
     */
    public Mono<ServerResponse> saveUserInfo(ServerRequest serverRequest){
        Mono<User> userMono = serverRequest.bodyToMono(User.class);
        return ServerResponse.ok().build(userService.saveUserInfo(userMono));
    }
}

Create the server (routes and adapter)

package com.dance.webflux;

import com.dance.webflux.handler.UserHandler;
import com.dance.webflux.service.impl.UserServiceImpl;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.io.IOException;

import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;

public class FunctionServer {

    /**
     * Create the routes
     * @return the router function
     */
    public RouterFunction<ServerResponse> routingFunction(){
        // Create the UserService
        UserServiceImpl userService = new UserServiceImpl();
        // Create the handler
        UserHandler userHandler = new UserHandler(userService);
        // Define the routes
        return RouterFunctions
                .route(GET("/user/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUserById)
                .andRoute(GET("/users").and(accept(MediaType.APPLICATION_JSON)), userHandler::getUserAll)
                .andRoute(POST("/saveUserInfo"), userHandler::saveUserInfo);
    }

    /**
     * Create the server and wire in the adapter
     */
    public void createReactorServer(){
        // Routes bound to their handlers
        RouterFunction<ServerResponse> serverResponseRouterFunction = routingFunction();
        // Convert to an HttpHandler
        HttpHandler httpHandler = toHttpHandler(serverResponseRouterFunction);
        // Wrap in the Reactor Netty adapter
        ReactorHttpHandlerAdapter reactorHttpHandlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        // Create the HttpServer and set the port
        HttpServer httpServer = HttpServer.create().port(8081);
        // Register the adapter and bind
        DisposableServer disposableServer = httpServer.handle(reactorHttpHandlerAdapter).bindNow();
    }

    /**
     * Entry point
     * @param args command-line arguments
     */
    public static void main(String[] args) throws IOException {
        FunctionServer server = new FunctionServer();
        server.createReactorServer();
        System.out.println("enter to exit");
        System.in.read();

    }

}

Start project

Start the main method

16:06:37.931 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:06:38.773 [main] DEBUG org.springframework.web.server.adapter.HttpWebHandlerAdapter - enableLoggingRequestDetails='false': form data and headers will be masked to prevent unsafe logging of potentially sensitive data
16:06:38.854 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:06:38.877 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:06:38.878 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:06:38.883 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:06:38.885 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:06:38.887 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:06:38.888 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:06:38.891 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:06:38.894 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:06:38.894 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.(long, int): available
16:06:38.895 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:06:38.896 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\EXT~1.CAI\AppData\Local\Temp (java.io.tmpdir)
16:06:38.896 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:06:38.897 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:06:38.900 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3760193536 bytes
16:06:38.900 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:06:38.902 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:06:38.902 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:06:39.003 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:06:39.003 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:06:39.115 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=8, workerCount=8}
16:06:39.115 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@26275bef
16:06:39.118 [main] DEBUG reactor.netty.resources.DefaultLoopIOUring - Default io_uring support : false
16:06:39.744 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
16:06:39.746 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
16:06:39.768 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:06:39.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:06:39.819 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:06:39.836 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:06:39.836 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:06:39.869 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
16:06:39.961 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 18728 (auto-detected)
16:06:39.965 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:06:39.965 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:06:40.496 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:06:40.498 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:06:40.971 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:50:56:ff:fe:c0:00:08 (auto-detected)
16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:06:41.012 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:06:41.013 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:06:41.028 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:06:41.216 [reactor-http-nio-1] DEBUG reactor.netty.transport.ServerTransport - [40eb46d7, L:/0:0:0:0:0:0:0:0:62011] Bound new server
enter to exit

Test

Get user information according to ID

http://localhost:8081/user/1

Get all user information

http://localhost:8081/users

Spring WebFlux (calling with WebClient)

Write the WebClient

package com.dance.webflux;

import com.dance.webflux.entity.User;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class FunctionClient {

    public static void main(String[] args) {
        // Server base URL
        WebClient webClient = WebClient.create("http://127.0.0.1:8081");
        // Query by ID
        String id = "1";
        User userresult = webClient.get().uri("/user/{id}", id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();
        System.out.println(userresult);
        // Query all users
        Flux<User> results = webClient.get().uri("/users").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(User.class);
        results.map(User::getName).buffer().doOnNext(System.out::println).blockFirst();
    }

}

Execution result

16:19:25.382 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:19:25.440 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:19:25.446 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:19:25.447 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 8
16:19:25.449 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:19:25.450 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:19:25.451 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:19:25.452 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available
16:19:25.453 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:19:25.453 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
16:19:25.454 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.(long, int): available
16:19:25.454 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:19:25.454 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\EXT~1.CAI\AppData\Local\Temp (java.io.tmpdir)
16:19:25.455 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:19:25.455 [main] DEBUG io.netty.util.internal.PlatformDependent - Platform: Windows
16:19:25.457 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 3760193536 bytes
16:19:25.457 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:19:25.459 [main] DEBUG io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available
16:19:25.459 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:19:25.517 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=8, workerCount=8}
16:19:25.517 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@13fee20c
16:19:25.635 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:19:25.636 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:19:26.686 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:19:26.687 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:19:27.099 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
16:19:27.100 [main] DEBUG io.netty.util.NetUtil - Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
16:19:27.138 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [32502377] HTTP GET http://127.0.0.1:8081/user/1
16:19:27.158 [main] DEBUG reactor.netty.resources.DefaultLoopIOUring - Default io_uring support : false
16:19:27.682 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : false
16:19:27.683 [main] DEBUG reactor.netty.resources.DefaultLoopKQueue - Default KQueue support : false
16:19:27.690 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:19:27.711 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:19:27.711 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:19:27.720 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
16:19:27.720 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
16:19:27.732 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
16:19:28.183 [main] DEBUG io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider - Default DNS servers: [/10.2.255.200:53, /172.31.36.36:53, /172.31.37.37:53] (sun.net.dns.ResolverConfiguration)
16:19:28.193 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new [http] client pool [PoolFactory{evictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000}] for [/127.0.0.1:8081]
16:19:28.260 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 25696 (auto-detected)
16:19:28.611 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 00:50:56:ff:fe:c0:00:08 (auto-detected)
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:19:28.640 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:19:28.651 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:19:28.652 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:19:28.652 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:19:28.681 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [da237854] Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
16:19:28.753 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
16:19:28.753 [reactor-http-nio-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
16:19:28.754 [reactor-http-nio-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@74ee324f
16:19:28.761 [reactor-http-nio-2] DEBUG reactor.netty.transport.TransportConfig - [da237854] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpDecompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:19:28.786 [reactor-http-nio-2] DEBUG reactor.netty.transport.TransportConnector - [da237854] Connecting to [/127.0.0.1:8081].
16:19:28.792 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Registering pool release on close event for channel
16:19:28.792 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
16:19:28.793 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}, [connected])
16:19:28.806 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [configured])
16:19:28.808 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientConnect - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Handler is being applied: {uri=http://127.0.0.1:8081/user/1, method=GET}
16:19:28.810 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/user/1, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [request_prepared])
16:19:28.834 [reactor-http-nio-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
16:19:28.834 [reactor-http-nio-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:19:28.834 [reactor-http-nio-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:19:28.834 [reactor-http-nio-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:19:28.834 [reactor-http-nio-2] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.delayedQueue.ratio: 8
16:19:28.852 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/user/1, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [request_sent])
16:19:28.860 [reactor-http-nio-2] DEBUG io.netty.handler.codec.compression.Brotli - brotli4j not in the classpath; Brotli support will be unavailable.
16:19:28.863 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientOperations - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Received response (auto-read:false) : [Content-Type=application/json, content-length=36]
16:19:28.863 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/user/1, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [response_received])
16:19:28.870 [reactor-http-nio-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [32502377] [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Response 200 OK
16:19:28.981 [reactor-http-nio-2] DEBUG reactor.netty.channel.FluxReceive - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:19:28.989 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientOperations - [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Received last HTTP packet
16:19:29.026 [reactor-http-nio-2] DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder - [32502377] [da237854-1, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Decoded [User(name=lucy, sex=nan, age=20)]
User(name=lucy, sex=nan, age=20)
16:19:29.026 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/user/1, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [response_completed])
16:19:29.026 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/user/1, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [disconnecting])
16:19:29.026 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Releasing channel
16:19:29.030 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Channel cleaned, now: 0 active connections, 1 inactive connections and 0 pending acquire requests.
16:19:29.030 [main] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [58ce9668] HTTP GET http://127.0.0.1:8081/users
16:19:29.032 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Channel acquired, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
16:19:29.032 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientConnect - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Handler is being applied: {uri=http://127.0.0.1:8081/users, method=GET}
16:19:29.032 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/users, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [request_prepared])
16:19:29.033 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/users, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [request_sent])
16:19:29.037 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientOperations - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Received response (auto-read:false) : [transfer-encoding=chunked, Content-Type=application/json]
16:19:29.037 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/users, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [response_received])
16:19:29.038 [reactor-http-nio-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [58ce9668] [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Response 200 OK
16:19:29.058 [reactor-http-nio-2] DEBUG reactor.netty.channel.FluxReceive - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:19:29.064 [reactor-http-nio-2] DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder - [58ce9668] [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Decoded [User(name=lucy, sex=nan, age=20)]
16:19:29.064 [reactor-http-nio-2] DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder - [58ce9668] [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Decoded [User(name=mary, sex=nv, age=30)]
16:19:29.065 [reactor-http-nio-2] DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder - [58ce9668] [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Decoded [User(name=jack, sex=nv, age=50)]
16:19:29.065 [reactor-http-nio-2] DEBUG reactor.netty.http.client.HttpClientOperations - [da237854-2, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Received last HTTP packet
[lucy, mary, jack]
16:19:29.066 [reactor-http-nio-2] DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions - [58ce9668] Cancel signal (to close connection)
16:19:29.066 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/users, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [response_completed])
16:19:29.066 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] onStateChange(GET{uri=/users, connection=PooledConnection{channel=[id: 0xda237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081]}}, [disconnecting])
16:19:29.066 [reactor-http-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Releasing channel
16:19:29.066 [reactor-http-nio-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [da237854, L:/127.0.0.1:62998 - R:/127.0.0.1:8081] Channel cleaned, now: 0 active connections, 1 inactive connections and 0 pending acquire requests.

Added by bUcKl3 on Tue, 14 Dec 2021 14:51:30 +0200