Netty communication technology

1, Introduction

1, Overview

Netty is an open-source framework originally from JBoss. In essence, it is a network application framework: it provides a non-blocking, event-driven framework and tools for rapidly developing high-performance, reliable network servers and clients.

2, Core architecture


Netty's core architecture, as shown in the diagram on the official website, breaks down as follows; the original figure is worth a look if you are interested.

Core:

  1. Extensible event model
  2. Unified communication API that simplifies network programming
  3. Zero-copy-capable rich byte buffer

Transport service:

  1. Socket and Datagram (UDP)
  2. HTTP transport
  3. In-VM pipe (a local transport within a single JVM process)

Protocol support:

  1. HTTP and WebSocket
  2. SSL/TLS secure socket support
  3. Google Protobuf (a serialization framework)
  4. zlib and gzip compression
  5. Large file transfer
  6. RTSP (Real-Time Streaming Protocol, an application-layer protocol in the TCP/IP suite)
  7. Binary protocols, with complete unit tests provided

3, Why use Netty instead of Java's native NIO

Netty is built mainly on NIO; it re-implements and improves on the JDK's NIO facilities.

There are four main points:

  1. Netty's API is more developer-friendly, while the JDK's native API is weak and complex (for example, the JDK's clumsy ByteBuffer is replaced by Netty's ByteBuf; see the sketch after this list)
  2. Netty adopts the Reactor thread model, so each channel's events are handled by a single thread, which guarantees thread safety
  3. Netty achieves high reliability and solves common transmission problems such as TCP packet sticking, half packets, and reconnection after a dropped connection
  4. Known JDK bugs are worked around, such as the NIO epoll bug that can make Selector.select() spin at 100% CPU
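
To make point 1 concrete, here is a minimal sketch contrasting the two buffer APIs: the JDK's ByteBuffer shares a single index between reads and writes, so flip() is required when switching direction, while Netty's ByteBuf keeps separate reader and writer indexes. The class name is illustrative.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

public class BufferComparison {
    public static void main(String[] args) {
        // JDK ByteBuffer: one position index for both reads and writes,
        // so flip() is required before switching from writing to reading
        ByteBuffer jdkBuf = ByteBuffer.allocate(16);
        jdkBuf.put((byte) 1);
        jdkBuf.flip();                        // forget this and get() reads past the data
        byte fromJdk = jdkBuf.get();

        // Netty ByteBuf: independent readerIndex and writerIndex,
        // so reads and writes can be freely interleaved
        ByteBuf nettyBuf = Unpooled.buffer(16);
        nettyBuf.writeByte(1);
        byte fromNetty = nettyBuf.readByte(); // no flip() needed

        System.out.println(fromJdk + " == " + fromNetty);
        nettyBuf.release();                   // ByteBuf is reference-counted
    }
}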

4, Projects using Netty

  • Database: Cassandra
  • Big data processing: Spark, Hadoop
  • Message Queue: RocketMQ
  • Retrieval: Elasticsearch
  • Framework: gRPC, Apache Dubbo, Spring 5 (reactive programming with WebFlux)
  • Distributed coordinator: ZooKeeper
  • Tooling: async-http-client

2, Reactor model

The Reactor thread model is a general design idea; it belongs neither to Java nor to Netty. It defines three roles:

  • Reactor: listens for events and dispatches them, assigning each I/O event to the corresponding handler. Events include connection ready, read ready, and write ready.
  • Acceptor: handles new client connections and hands them off to the processing chain (for now it can be loosely thought of as the ServerSocketChannel)
  • Handler: binds itself to events and performs non-blocking reads and writes: it reads from the channel, runs the business logic, and writes the result back to the channel

These three roles combine into three NIO threading patterns.

Single Reactor, single-thread mode


All connection accepting and data processing is executed by a single thread, so once the number of connections grows past a certain point, performance declines: every request queues behind that one thread. A minimal plain-NIO sketch of this mode follows.
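
In this sketch one thread plays the Reactor, Acceptor, and Handler roles; the "business logic" simply echoes data back, and the class name is illustrative.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Single Reactor, single thread: one loop accepts, reads, and writes
public class SingleReactorServer {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(8888));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                         // block until an event is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {              // the Acceptor role
                    SocketChannel client = serverChannel.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {         // the Handler role
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) == -1) {      // peer closed the connection
                        client.close();
                        continue;
                    }
                    buf.flip();
                    client.write(buf);                 // "business logic": echo it back
                }
            }
        }
    }
}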

Single Reactor, multi-thread mode


Building on the single Reactor, single-thread mode, we extract the time-consuming encoding/decoding and business computation and hand them to a thread pool. This improves performance, but it is still not the optimal solution: one Reactor thread still handles all accepting and I/O. A schematic fragment of the idea follows.
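
This fragment assumes a plain NIO reactor like the sketch above; onReadable and process are placeholder names, and a production version would queue the response back to the Reactor thread for writing rather than writing from the pool directly.

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: the Reactor thread still selects and reads; the slow
// decode/compute/encode step is pushed onto a worker pool.
class OffloadingHandler {
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(8);

    void onReadable(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int n = client.read(buf);          // fast, non-blocking read on the Reactor thread
        if (n <= 0) {
            return;                        // nothing read, or the connection is closing
        }
        buf.flip();
        WORKERS.submit(() -> {             // slow work leaves the Reactor thread
            ByteBuffer response = process(buf);
            try {
                client.write(response);    // simplified: real code hands this back to the Reactor
            } catch (Exception ignored) {
            }
        });
    }

    private ByteBuffer process(ByteBuffer request) {
        return request;                    // placeholder for decode + business logic + encode
    }
}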

Master-slave Reactor, multi-thread mode


This time we adjust the above model once more. The ServerSocketChannel responsible for accepting connections gets its own dedicated Reactor, called the master; the master Reactor then registers each accepted Channel with a slave Reactor that is responsible solely for reading and writing. This is the so-called 1+N+M arrangement: one listening thread accepts new sockets, N I/O threads read and write the sockets, and M worker threads process the data. (A Netty-flavoured sketch of this split appears at the end of this subsection.)

Workflow:

  1. The MainReactor object in the Reactor main thread listens for client connection requests through its Selector and handles connection events through the Acceptor.
  2. After the Acceptor establishes a socket connection with the client, the MainReactor assigns the connection to a SubReactor.
  3. The SubReactor registers the connection with its own Selector for listening and creates a corresponding Handler to process its events.
  4. When a new event occurs on the connection, the SubReactor calls the corresponding Handler.
  5. The Handler reads the request data from the channel into a buffer, then distributes it to a Worker thread for processing.
  6. After processing the data, the Worker returns the result to the Handler, and the Handler sends the response back to the client.
  7. One MainReactor can correspond to multiple SubReactors.

Advantages:

  1. The responsibilities of each thread are simple and clear: the MainReactor only registers connections, and the SubReactors handle all subsequent processing.
  2. The interaction between MainReactor and SubReactor is simple: the master only hands connections to the slaves, and the slaves never need to return data to it.
  3. Multiple SubReactors can handle highly concurrent traffic.
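
In Netty terms (see the next section), the 1+N+M split can be configured roughly as in this sketch: a one-thread boss group as the master Reactor, a worker group for the N I/O threads, and a DefaultEventExecutorGroup for the M business threads. It reuses the ServerInboundHandler1 defined in section 4 below, and the thread counts are arbitrary.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class MasterSlaveConfig {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);      // 1 listening thread
        EventLoopGroup workers = new NioEventLoopGroup(4);   // N I/O threads
        DefaultEventExecutorGroup business =
                new DefaultEventExecutorGroup(16);           // M worker threads

        ServerBootstrap b = new ServerBootstrap();
        b.group(boss, workers)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
                 // Handlers added with an executor group run on the M
                 // business threads instead of the channel's I/O thread
                 ch.pipeline().addLast(business, new ServerInboundHandler1());
             }
         });
        b.bind(8888);
    }
}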

3, Netty's implementation of Reactor

In this part of Netty you can see that the pooling idea is used heavily.

Workflow:

  1. Netty provides two event loop groups: a BossGroup and a WorkerGroup. Each group holds one or more EventLoops (each is roughly a thread bound to a Selector; Netty offers NIO and other transport implementations)
  2. Each EventLoop contains a Selector and a TaskQueue
  3. Each BossEventLoop loops over three things:
     ① select: poll for accept events on the registered ServerSocketChannel
     ② processSelectedKeys: establish the client connection, create a SocketChannel, and register it with a WorkerEventLoop
     ③ runAllTasks: process the remaining tasks in the queue
  4. Each WorkerEventLoop likewise loops over three things:
     ① select: poll for read/write events on its registered SocketChannels
     ② processSelectedKeys: process the data through the ChannelPipeline of the corresponding SocketChannel
     ③ runAllTasks: process the remaining tasks in the queue
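
As a small illustration of the TaskQueue from step 2, any code can submit work to a channel's EventLoop, and the task then runs on that channel's own I/O thread. This handler is a made-up example, not part of the code in the next section.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.TimeUnit;

// Sketch: using the EventLoop's task queue from inside a handler
public class TaskQueueHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Queued task: runs later on this channel's own I/O thread,
        // so it never races with the channel's other handlers
        ctx.channel().eventLoop().execute(() ->
                System.out.println("ran from the TaskQueue"));

        // Scheduled task: runs on the same EventLoop after a delay
        ctx.channel().eventLoop().schedule(() ->
                System.out.println("ran 5 seconds later"),
                5, TimeUnit.SECONDS);

        ctx.fireChannelRead(msg);  // pass the message along the pipeline
    }
}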

ChannelPipeline and ChannelHandler

A ChannelPipeline is the container for ChannelHandlers, and every SocketChannel is bound to one ChannelPipeline. In a server program, reading and processing incoming data is called inbound, and the data passes through a series of handlers; when the server writes data back to the client, the data again passes through a series of handlers, and this direction is called outbound.

ChannelHandlers are divided into inbound and outbound processors, plus duplex processors that can handle both directions; a sketch follows.
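
This minimal sketch builds a pipeline that mixes both directions: inbound events traverse inbound handlers in the order they were added, while outbound operations traverse outbound handlers in reverse order. The handler names in1 and out1 are illustrative.

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;

public class PipelineSetup extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // Inbound: incoming reads pass through this handler
        pipeline.addLast("in1", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.out.println("inbound saw a message");
                ctx.fireChannelRead(msg);    // forward to the next inbound handler
            }
        });
        // Outbound: outgoing writes pass through this handler
        pipeline.addLast("out1", new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                System.out.println("outbound saw a write");
                ctx.write(msg, promise);     // forward toward the socket
            }
        });
        // A ChannelDuplexHandler could sit here to handle both directions
    }
}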

4, Netty example code

Server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyServer {
    public static void main(String[] args) {
        NettyServer server = new NettyServer();
        server.start(8888);
    }

    private void start(int port) {
        // Boss group: the master Reactor, one thread that accepts connections
        EventLoopGroup boss = new NioEventLoopGroup(1);
        // Worker group: the slave Reactors that handle read/write I/O
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // Server-side bootstrap class
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    // Channel type for the server's listening socket
                    .channel(NioServerSocketChannel.class)
                    // Handler for the server channel itself
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Handler for each accepted client connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new ServerInboundHandler1());
                        }
                    });
            // Bind the port and block until the bind completes
            ChannelFuture future = serverBootstrap.bind(port).sync();
            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

Client

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;

public class NettyClient {
    public static void main(String[] args) {
        NettyClient client = new NettyClient();
        client.start("127.0.0.1", 8888);
    }

    private void start(String host, int port) {
        EventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // No handlers are added here; the client only sends data in this example
                            ChannelPipeline pipeline = socketChannel.pipeline();
                        }
                    });
            // Connect to the server and wait for the connection to complete
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Send data from the client to the server
            Channel channel = future.channel();
            String msg = "I am the Netty client. Did you receive this?";
            ByteBuf buffer = channel.alloc().buffer();
            buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
            channel.writeAndFlush(buffer);

            // Block until the connection is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            loopGroup.shutdownGracefully();
        }
    }
}

Inbound processor

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;

@Slf4j
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("ServerInboundHandler1 channelActive fired");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Inbound data arrives as a ByteBuf; copy it out and decode it as text
        ByteBuf buf = (ByteBuf) msg;
        byte[] dst = new byte[buf.readableBytes()];
        buf.readBytes(dst);
        String s = new String(dst, Charset.defaultCharset());
        log.info("Read data from client: " + s);
        // Forward the message to the next inbound handler (the pipeline tail releases it)
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the error and close the connection instead of propagating further
        log.error("Exception in ServerInboundHandler1", cause);
        ctx.close();
    }
}
