[Netty] Netty's thread model and simple usage

Netty's threading model

Reactor model

Netty's threading model is based on the Reactor pattern, so we need to understand Reactor first.

The Reactor model is also called the Dispatcher model. When one or more requests arrive at the server at the same time, the server demultiplexes them and dispatches each one to the handler responsible for it.

Three roles of Reactor model

  • Acceptor: accepts new client connections and hands them over to the processing chain
  • Reactor: listens for events and dispatches each I/O event to the corresponding Handler
  • Handler: processes the event, for example decoding, business processing and encoding
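
To make the three roles above concrete, here is a minimal in-memory sketch in plain JDK Java, with no sockets and no Netty. The names ReactorRolesSketch, Event and EventType are made up for this illustration, and a real Reactor would obtain its ready events from a Selector rather than from a prepared list.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical illustration of the Acceptor / Reactor / Handler roles. */
final class ReactorRolesSketch {
    enum EventType { ACCEPT, READ }

    /** A ready event: its type plus the connection it belongs to. */
    static final class Event {
        final EventType type;
        final String connection;

        Event(EventType type, String connection) {
            this.type = type;
            this.connection = connection;
        }
    }

    /** Reactor: keeps one callback per event type and dispatches events to it. */
    static final class Reactor {
        private final Map<EventType, Consumer<Event>> handlers = new EnumMap<>(EventType.class);

        void register(EventType type, Consumer<Event> handler) {
            handlers.put(type, handler);
        }

        void dispatch(List<Event> readyEvents) {
            for (Event event : readyEvents) {
                handlers.get(event.type).accept(event);
            }
        }
    }

    /** Wires an Acceptor and a Handler into a Reactor and returns what they logged. */
    static List<String> run(List<Event> readyEvents) {
        List<String> log = new ArrayList<>();
        Reactor reactor = new Reactor();
        // Acceptor role: deals with new connections.
        reactor.register(EventType.ACCEPT, e -> log.add("accepted " + e.connection));
        // Handler role: would decode, run business logic and encode a reply.
        reactor.register(EventType.READ, e -> log.add("handled read on " + e.connection));
        reactor.dispatch(readyEvents);
        return log;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(EventType.ACCEPT, "client-1"));
        events.add(new Event(EventType.READ, "client-1"));
        System.out.println(run(events));
    }
}
```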

Reactor's thread model

There are three models: single Reactor single thread model, single Reactor multi thread model and master-slave Reactor multi thread model.
Netty uses the master-slave Reactor multithreading model.

1. Single Reactor single thread model

Under this model, connection establishment, I/O reading and writing, and business processing are all completed in one thread. Because everything on that thread runs sequentially, a single time-consuming business operation delays and blocks all other requests.
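
The blocking effect can be reproduced without any networking. The sketch below is only an analogy: a single-threaded executor stands in for the one Reactor thread, and the class name SingleThreadReactorSketch and the request labels are invented for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical analogy: one thread does everything, so a slow request delays the rest. */
final class SingleThreadReactorSketch {

    /** Submits one slow and two fast "requests" and returns their completion order. */
    static List<String> serve(long slowMillis) {
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService reactor = Executors.newSingleThreadExecutor();
        reactor.execute(() -> {
            sleepQuietly(slowMillis); // time-consuming business logic
            completed.add("slow");
        });
        // These are ready immediately but must wait behind the slow request.
        reactor.execute(() -> completed.add("fast-1"));
        reactor.execute(() -> completed.add("fast-2"));
        reactor.shutdown();
        try {
            reactor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(completed);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(serve(200)); // the fast requests finish only after the slow one
    }
}
```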

2. Single Reactor multithreading model

To prevent this blocking, connection establishment (including authorization and authentication) and I/O reading and writing are still completed in the single Reactor thread, but business processing is handed to a thread pool and runs asynchronously. The result is written back once processing has finished.
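
As a rough sketch of this hand-off, the example below uses two plain JDK executors: a single thread named "reactor" for the I/O steps and a pool of threads named "worker" for the business step. The class name, the thread names and the upper-casing "business logic" are all assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical analogy: I/O on one Reactor thread, business logic in a worker pool. */
final class ReactorWithWorkersSketch {

    /** Processes each request and returns a trace of which thread ran which step. */
    static List<String> serve(List<String> requests) {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        ExecutorService reactor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reactor"));
        ExecutorService workers = Executors.newFixedThreadPool(4, r -> new Thread(r, "worker"));
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String request : requests) {
                CompletableFuture<Void> done = CompletableFuture
                        // The read stays on the Reactor thread.
                        .supplyAsync(() -> {
                            trace.add("read " + request + " on " + threadName());
                            return request;
                        }, reactor)
                        // Business logic runs in the pool so it cannot block the Reactor.
                        .thenApplyAsync(req -> {
                            trace.add("process " + req + " on " + threadName());
                            return req.toUpperCase();
                        }, workers)
                        // The response is written back on the Reactor thread.
                        .thenAcceptAsync(resp -> trace.add("write " + resp + " on " + threadName()), reactor);
                pending.add(done);
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            reactor.shutdown();
            workers.shutdown();
        }
        return new ArrayList<>(trace);
    }

    private static String threadName() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<>();
        requests.add("a");
        requests.add("b");
        serve(requests).forEach(System.out::println);
    }
}
```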

3. Master-slave Reactor multithreading model

Because a single Reactor cannot take full advantage of a multi-core CPU, multiple Reactor threads can be created. Under this model there is one master Reactor and several slave Reactors. The master Reactor establishes connections (including authorization and authentication), the slave Reactors handle I/O reading and writing, and business processing still runs asynchronously in a thread pool.
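
The division of labour between the master and the slave Reactors can be sketched with plain JDK executors. In this illustration one "boss" thread accepts each connection and assigns it round-robin to one of several single-threaded "sub" executors, which then does that connection's I/O. The round-robin choice, the class name and the thread names are assumptions of the sketch, and the business thread pool is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical analogy: a boss thread accepts, sub-reactor threads do the I/O. */
final class MainSubReactorSketch {

    /** Returns, for every connection, the name of the sub-reactor thread that served it. */
    static Map<String, String> serve(List<String> connections, int subReactorCount) {
        ExecutorService boss = Executors.newSingleThreadExecutor(r -> new Thread(r, "boss"));
        ExecutorService[] subs = new ExecutorService[subReactorCount];
        for (int i = 0; i < subReactorCount; i++) {
            String name = "sub-" + i;
            subs[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
        }
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        AtomicInteger next = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String connection : connections) {
                pending.add(CompletableFuture
                        // Master Reactor: "accept" and pick a sub-reactor round-robin.
                        .supplyAsync(() -> subs[next.getAndIncrement() % subs.length], boss)
                        // Slave Reactor: all I/O for this connection runs on the chosen thread.
                        .thenCompose(sub -> CompletableFuture.runAsync(
                                () -> handledBy.put(connection, Thread.currentThread().getName()), sub)));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            boss.shutdown();
            for (ExecutorService sub : subs) {
                sub.shutdown();
            }
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> connections = new ArrayList<>();
        connections.add("c0");
        connections.add("c1");
        connections.add("c2");
        System.out.println(serve(connections, 2));
    }
}
```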

Netty's threading model

Components

Netty's thread model consists of ServerBootstrap, NioEventLoopGroup and its member NioEventLoop, NioChannel, ChannelPipeline, ChannelHandler and so on.

Netty is implemented using the master-slave Reactor multithreading model. The master Reactor corresponds to the BossGroup and the slave Reactors correspond to the WorkerGroup. The BossGroup accepts connections and registers each established connection, as a Channel, with the WorkerGroup. When an I/O event is triggered, it is passed to the Channel's Pipeline, and the Pipeline in turn delegates the work to the corresponding Handlers.

EventLoop

Event loop

As the name suggests, an EventLoop is a loop. It is equivalent to the code inside the while(true) block of a plain NIO server.

An EventLoop mainly consists of a Selector (multiplexer) that handles I/O events and a TaskQueue that stores submitted tasks.

An EventLoop does not start its thread when it is created. The thread is started lazily when the first task is submitted, and from then on it keeps running and processing tasks.
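
The lazy start and the task queue can be imitated in a few lines of plain Java. The class MiniEventLoop below is a hypothetical sketch, not Netty's implementation: it only models the TaskQueue half, and the comment marks where a real EventLoop would also poll its Selector.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of an event loop whose thread starts on the first submitted task. */
final class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean shuttingDown;
    private volatile Thread thread;

    boolean isStarted() {
        return started.get();
    }

    /** Queues the task and starts the loop thread if this is the first submission. */
    void execute(Runnable task) {
        taskQueue.add(task);
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            thread = t;
            t.start();
        }
    }

    private void run() {
        for (;;) {
            try {
                // A real EventLoop would also select and process I/O events here.
                Runnable task = taskQueue.poll(50, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                } else if (shuttingDown) {
                    return; // queue drained and shutdown requested
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /** Lets queued tasks finish, then waits for the loop thread to exit. */
    void shutdownAndWait() {
        shuttingDown = true;
        Thread t = thread;
        if (t == null) {
            return; // never started
        }
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniEventLoop loop = new MiniEventLoop();
        System.out.println("started before first task: " + loop.isStarted());
        loop.execute(() -> System.out.println("task on " + Thread.currentThread().getName()));
        System.out.println("started after first task: " + loop.isStarted());
        loop.shutdownAndWait();
    }
}
```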

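EventLoop related source code (abridged)

    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    // Decide whether to select now, block in select(), or skip this iteration
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                        // ... (omitted)
                    case SelectStrategy.SELECT:
                        // ... (omitted: wait in select() until I/O is ready or a task arrives)
                    }
                } catch (IOException e) {
                    // ... (omitted)
                }
                // ... (omitted)
                boolean ranTasks;
                if (ioRatio == 100) {
                    // No time budget: process I/O, then run every queued task
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // Process I/O, then give tasks a time budget derived from ioRatio
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                // ... (omitted)
            } catch (Throwable t) {
                // ... (omitted: error handling)
            }
            // ... (omitted: shutdown handling)
        }
    }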
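
The expression ioTime * (100 - ioRatio) / ioRatio in the source above decides how long queued tasks may run after an I/O pass. The helper below only replays that arithmetic so the effect of different ioRatio values is easier to see. The class and method names are invented for this example, and the 8 ms I/O time is an arbitrary sample value.

```java
/** Hypothetical helper that replays the task time budget formula from the loop above. */
final class IoRatioBudget {

    /**
     * Returns the time budget for queued tasks, given the time just spent on I/O.
     * ioRatio is the percentage of loop time meant for I/O; 100 is excluded here
     * because the loop handles that case separately and runs all tasks without a budget.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99, got " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000: tasks get four times the I/O time
    }
}
```

A higher ioRatio therefore favours I/O, while a lower one leaves more time for submitted tasks.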
Submit task

    // Needs io.netty.channel.nio.NioEventLoopGroup, java.util.Scanner and
    // io.netty.util.concurrent.Future (not java.util.concurrent.Future, which has no addListener)
    @Test
    public void test() {
        // Build the EventLoopGroup with 1 thread, which means there is only one EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        // The group behaves like a thread pool; with a single EventLoop, every submitted task runs on the same thread
        group.execute(() -> System.out.println("L_MaFia create the EventLoopGroup " + Thread.currentThread().getId()));
        Future<?> submit = group.submit(() -> System.out.println("submit task " + Thread.currentThread().getId()));
        // The listener is called back once the submitted task has finished
        submit.addListener(future -> System.out.println("Finished the task " + Thread.currentThread().getId()));
        // Block until a number is entered on stdin so the output can be observed
        Scanner scanner = new Scanner(System.in);
        scanner.nextInt();
        group.shutdownGracefully();
    }
Handle IO events of Channel
  • The EventLoop handles Channel-related I/O events, such as Channel registration. Calling Netty's register method eventually calls the register method of native NIO, but Netty wraps it and takes care of thread safety.
  • The native Java NIO Channel is wrapped in a NioChannel, which still calls the NIO Channel underneath. The handling of the Channel's read and write events is moved into ChannelHandlers, and the concept of a Pipeline is introduced to organize them.

    ChannelFuture register = group.register(channel);

See the NioChannel usage code below.

Channel

Netty wraps the native Java Channel and adds the concepts of Pipeline and ChannelHandler to it. The actual I/O event processing of a Channel is done by the ChannelHandlers in its Pipeline.

NioChannel usage
  1. Initialize Channel

    Initialization is similar to native NIO: open the channel, register it with a selector, and finally bind the port. Note, however, that all operations on a NioChannel are executed in its EventLoop, so you must register the Channel before binding the port.

        NioDatagramChannel channel = new NioDatagramChannel();
        ChannelFuture register = group.register(channel);
        ChannelFuture future = channel.bind(new InetSocketAddress(8080));
  2. Initialize Pipeline

    Native NIO code iterates over the selected keys (SelectionKey) and processes read and write events directly. Netty does not expose the events this way. Instead, read and write events are processed indirectly through ChannelHandlers. Reading and writing usually involve several steps, so Netty provides the Pipeline to organize these ChannelHandlers. The Pipeline is a linked-list container, and you can add handlers at its head and tail with addFirst and addLast.

    SimpleChannelInboundHandler is a Handler provided by Netty to handle read events.

            // DatagramPacket here is io.netty.channel.socket.DatagramPacket, not java.net.DatagramPacket
            channel.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                    // Print the receive time and the packet content
                    System.out.println(System.currentTimeMillis() + "_:" + msg.content().toString(Charset.defaultCharset()));
                }
            });
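
To illustrate the idea of a Pipeline as a linked list of handlers with addFirst and addLast, here is a small stand-alone sketch. MiniPipeline is a made-up class, not Netty's ChannelPipeline: it passes a String through every handler in order, whereas in Netty each handler decides whether to forward the event, and inbound and outbound handlers are distinguished.

```java
import java.util.Deque;
import java.util.LinkedList;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of a pipeline: an ordered, linked list of handlers. */
final class MiniPipeline {
    private final Deque<UnaryOperator<String>> handlers = new LinkedList<>();

    /** Adds a handler at the head, so it sees the message first. */
    MiniPipeline addFirst(UnaryOperator<String> handler) {
        handlers.addFirst(handler);
        return this;
    }

    /** Adds a handler at the tail, so it sees the message last. */
    MiniPipeline addLast(UnaryOperator<String> handler) {
        handlers.addLast(handler);
        return this;
    }

    /** Passes the message through every handler from head to tail. */
    String fireRead(String message) {
        String current = message;
        for (UnaryOperator<String> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        String result = new MiniPipeline()
                .addLast(String::trim)            // second in the final order
                .addLast(s -> s.toUpperCase())    // third in the final order
                .addFirst(s -> s + "!")           // first, because it was added at the head
                .fireRead(" hi ");
        System.out.println(result); // prints "HI !"
    }
}
```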
    

Test Demo

    @Test
    public void test() {
        // 1. Create a NioEventLoopGroup with a single EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        // 2. Create a Channel (Netty's wrapper around the native NIO DatagramChannel)
        NioDatagramChannel channel = new NioDatagramChannel();
        // 3. Register the Channel with the EventLoopGroup. This is equivalent to submitting a task
        //    to the group; the returned Future reports the result through a callback.
        //    To keep I/O calls thread-safe, Netty wraps each direct I/O operation in a task and hands it
        //    to the I/O thread, so the call returns a Future at once and the result arrives later.
        ChannelFuture register = group.register(channel);
        register.addListener(future -> System.out.println("Complete registration"));
        // 4. Bind the Channel to the port
        ChannelFuture future = channel.bind(new InetSocketAddress(8080));
        future.addListener(future1 -> System.out.println("Finish binding"));
        // 5. Add a Handler to the pipeline of the Channel
        //    (DatagramPacket is io.netty.channel.socket.DatagramPacket)
        channel.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                System.out.println(System.currentTimeMillis() + "_:" + msg.content().toString(Charset.defaultCharset()));
            }
        });
        // Block until a number is entered on stdin, then shut down
        Scanner scanner = new Scanner(System.in);
        scanner.nextInt();
        group.shutdownGracefully();
    }

General Netty usage

In normal usage you have to initialize not only the main (server) Channel but also every child Channel. Netty provides ServerBootstrap, which wraps the registration, binding and initialization steps shown above and so simplifies calls to the Netty API.

  1. Initialization

    Through ServerBootstrap you can set the thread groups directly. The boss group handles the Accept events of the NioServerSocketChannel, that is, the connection and authentication of requests. The worker group handles I/O read and write events and submitted asynchronous tasks. With channel(...) you specify the Channel class, which tells the bootstrap which kind of Channel it has to create and maintain.

        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup(8);
        bootstrap.group(boss, work).channel(NioServerSocketChannel.class);
  2. Set the Pipeline of the child Channel

    Use a ChannelInitializer to initialize the Pipeline of each child Channel (a SocketChannel in the case of TCP) and add the corresponding Handlers.

        bootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new MyChannelHandler());
            }
        });
  3. Business processing

    Here, we extend SimpleChannelInboundHandler to implement the business processing of read events (see MyChannelHandler below).

  4. Binding port

    Bind the port through ServerBootstrap

        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)).sync();
        future.addListener((future1) -> System.out.println("Finish binding"));

Test Demo

    @Test
    public void test() throws InterruptedException {
        // Specify the Channel type to open and register automatically: channel(NioServerSocketChannel.class)
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup(8);
        bootstrap.group(boss, work).channel(NioServerSocketChannel.class);

        // Add a Handler to the pipeline of every child Channel
        bootstrap.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new MyChannelHandler());
            }
        });
        // sync() waits until the bind has completed
        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)).sync();
        future.addListener((future1) -> System.out.println("Finish binding"));
        // Block until a number is entered on stdin, then release both thread groups
        Scanner scanner = new Scanner(System.in);
        scanner.nextInt();
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }


    class MyChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // Decode the received bytes once and prefix them with the current time
            String text = System.currentTimeMillis() + "_:" + msg.toString(Charset.defaultCharset());
            System.out.println(text);
            // Echo the text back; the write is asynchronous and returns a ChannelFuture
            ChannelFuture channelFuture = ctx.channel().writeAndFlush(Unpooled.copiedBuffer(text, Charset.defaultCharset()));
            // The listener fires when the write completes, whether it succeeded or failed
            channelFuture.addListener(future -> System.out.println(future.isSuccess() ? "Write success" : "Write failed: " + future.cause()));
        }
    }

Summary:

  1. Netty uses the master-slave Reactor multithreading model
  2. EventLoop is where Netty detects and dispatches connection requests, I/O events and submitted tasks
  3. Netty introduces Channel, Pipeline and ChannelHandler to handle I/O asynchronously
  4. Netty provides ServerBootstrap to simplify Channel initialization

Keywords: Java Netty Microservices NIO

Added by -Karl- on Tue, 08 Mar 2022 06:29:47 +0200