Netty ten thousand words explanation - Advanced

Stick wrap and half wrap

sliding window

In order to improve the efficiency of TCP transmission, the transmitter and receiver will maintain an array of dynamic storage and maintain a certain number of TCP connections.

Phenomenon analysis

Sticky bag

  • Phenomenon: send abc def and receive abcdef

  • reason:

    • Application layer: the receiver ByteBuf setting is large (Netty default 1024)

    • Sliding window: if the receiver fails to process data in time and the window size is large enough, multiple packets will be pasted if cached in the sliding window

    • Nagle algorithm 1: it will cause sticky packets

Half package

  • Phenomenon: send abcdef and receive abc def

  • reason:

    • Application layer: the receiver ByteBuf is less than the amount of data actually sent

    • Sliding window: the window size is small and the sender's message is large. Insufficient window capacity can only send part of the data, resulting in half a packet

    • MSS limit: when the sent data exceeds the MSS limit, the data will be sent in segments, resulting in half packets

Essential cause: TCP borderless

Solution

  1. The client closes the connection in time after sending data each time

     @Override // Triggered after the Channel connection is successfully established
     public void channelActive(ChannelHandlerContext ctx) {
         ByteBuf buf = ctx.alloc().buffer(16);
         buf.writeBytes(new byte[] {'1','2','3','4','5','6','7','8','9','0','a','s','d','f','g','h'});
         ctx.writeAndFlush(buf);
         ctx.channel().close();
     }
  2. The server adopts fixed length reception

     @Override
     protected void initChannel(NioSocketChannel channel) {
         channel.pipeline().addLast(new StringDecoder())
             .addLast(new FixedLengthFrameDecoder(16)) // Decode every 16 bytes
             .addLast(new LoggingHandler(LogLevel.DEBUG));
     }
  3. The server uses newline character to divide the reception

     @Override
     protected void initChannel(NioSocketChannel channel) {
         channel.pipeline().addLast(new StringDecoder())
             .addLast(new LineBasedFrameDecoder(1024)) // Maximum search length limit
             .addLast(new LoggingHandler(LogLevel.DEBUG));
     }
  4. The server adopts protocol analysis

    When writing data, first declare the data length, and the server directly parses it according to the protocol content

         public static void main(String[] args) {
             EmbeddedChannel channel = new EmbeddedChannel(
                     new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4/*Length of split data*/),
                     new StringDecoder(StandardCharsets.UTF_8),
                     new LoggingHandler(LogLevel.DEBUG)
             );
             ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
             for (int i = 0; i < 3; ++ i) {
                 byte[] bytes = "Hello IllTamer".getBytes(StandardCharsets.UTF_8);
                 buf.writeInt(bytes.length);
                 buf.writeBytes(bytes);
             }
             channel.writeInbound(buf);
         }

Analysis and design of protocol

MyBatis

  The protocol data corresponding to set name IllTamer is as follows * 3 \r\n \r\n set \r\n \r\n name \r\n \r\n IllTamer \r\n. It should be noted that MyBatis is the server and the java application is the client (BootStrap).

     public static void main(String[] args) throws InterruptedException {
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             ChannelFuture future = new Bootstrap()
                     .group(worker)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<NioSocketChannel>() {
                         @Override
                         protected void initChannel(NioSocketChannel channel) throws Exception {
                             channel.pipeline().addLast(new LoggingHandler())
                                     .addLast(new ChannelInboundHandlerAdapter() {
                                         @Override
                                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                             ByteBuf buffer = ctx.alloc().buffer();
                                             buffer.writeBytes("*3".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("$3".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("set".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("$4".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("name".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("$8".getBytes());
                                             buffer.writeBytes(END);
                                             buffer.writeBytes("IllTamer".getBytes());
                                             buffer.writeBytes(END);
                                             ctx.writeAndFlush(buffer);
                                         }
 ​
                                         @Override
                                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                             ByteBuf buf = (ByteBuf) msg;
                                             System.out.println(buf.toString(Charset.defaultCharset()));
                                         }
                                     });
                         }
                     })
                     .connect(new InetSocketAddress("192.168.113.201", 6379))
                     .sync();
             future.channel().closeFuture().sync();
         } catch (InterruptedException e) {
             log.error("error ", e);
         }
     }

Http

Note that the browser sending the request is the client, and the java application is the server (ServerBootStrap).

     public static void main(String[] args) throws InterruptedException {
         NioEventLoopGroup boss = new NioEventLoopGroup();
         NioEventLoopGroup worker = new NioEventLoopGroup();
         try {
             ChannelFuture future = new ServerBootstrap()
                     .group(boss, worker)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<NioSocketChannel>() {
                         @Override
                         protected void initChannel(NioSocketChannel channel) {
                             channel.pipeline()
                                     .addLast(new LoggingHandler(LogLevel.DEBUG))
                                     .addLast(new HttpServerCodec())
                                     .addLast(new ChannelInboundHandlerAdapter() {
                                         @Override
                                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                             log.debug("Data: {}", msg.getClass());
                                             if (msg instanceof HttpRequest) { // Request header, request line
                                                 HttpRequest request = (HttpRequest) msg;
                                                 log.error(request.uri());
                                                 // Return response
                                                 byte[] messages = "<h1>Hello World !<h1>".getBytes(StandardCharsets.UTF_8);
                                                 DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
                                                 response.headers().setInt(CONTENT_LENGTH, messages.length); // Specify the response data to prevent the client from waiting
                                                 response.content().writeBytes(messages);
                                                 // Write back corresponding
                                                 ctx.writeAndFlush(response);
                                             } else if (msg instanceof HttpContent) { // Request body
 ​
                                             }
                                         }
                                     });
                         }
                     })
                     .bind(25565)
                     .sync();
             future.channel().closeFuture().sync();
         } catch (InterruptedException e) {
             log.error("error ", e);
         }
     }

Custom protocol

Elements of attention

  • Magic number - used to determine whether it is an invalid packet

  • Version number - used to support protocol upgrade

  • Serialization algorithm - the serialization / deserialization method adopted by the message body can be extended by json, protobuf, hessian, etc

  • Instruction type - specifies business-related instructions such as login, registration, etc

  • Request sequence number - provides asynchronous capability for duplex communication

  • Text length

  • Message body

Message interface - message

 public interface Message extends Serializable {
     /**
      * Message type
      * */
     byte getMessageType();
     /**
      * Request serial number
      *  Realize duplex and provide asynchronous capability
      * */
     int getSequenceId();
 }

Custom codec - MessageCodec

 @Slf4j
 public class MessageCodec extends ByteToMessageCodec<Message> {
     @Override
     protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf byteBuf) throws Exception {
         write(message, byteBuf);
     }
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
         list.add(read(byteBuf));
     }
     private static ByteBuf write(Message message, ByteBuf buf) throws IOException {
         byte[] protocol = new byte[] {
                 7,6,5,7, // magic number
                 1, // edition
                 0, // Serialization type jdk 0, json 1
                 message.getMessageType(), // Byte instruction type
         };
         buf.writeBytes(protocol).writeInt(message.getSequenceId());
 ​
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         (new ObjectOutputStream(bos)).writeObject(message);
         byte[] data = bos.toByteArray();
         buf.writeInt(data.length); // Data length
         buf.writeBytes(data); // data
         return buf;
     }
     private static Message read(ByteBuf buf) throws IOException, ClassNotFoundException {
         int magic = buf.readInt();
         byte version = buf.readByte();
         byte serializerType = buf.readByte();
         byte messageType = buf.readByte();
         int sequenceId = buf.readInt();
         int length = buf.readInt();
         byte[] data = new byte[length];
         buf.readBytes(data, 0, length);
         Message message = null;
         if (serializerType == 0) { // jdk
             ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(data));
             message = (Message) input.readObject();
         }
         log.debug("magic {}, version {}, serializerType {}, messageType {}, sequenceId {}, length {}, message {}", magic, version, serializerType, messageType, sequenceId, length, message);
         return message;
     }
 }

Test end

 public static void main(String[] args) throws Exception {
     EmbeddedChannel channel = new EmbeddedChannel(
         // lengthFieldOffset the offset length of the data length
         new LengthFieldBasedFrameDecoder(1024, 11, 4),
         new LoggingHandler(),
         new MessageCodec()
     );
     Message message = new ExampleMessage("IllTamer", 6);
     // encode
     channel.writeOutbound(message);
     // decode
     ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
     new MessageCodec().encode(null, message, buf);
     // Inbound
     channel.writeInbound(buf);
 }

@Sharable

The ChannelHandler stateless processor ID that comes with Netty. Only when the processor does not record status information and can be shared between channels will this annotation be provided.

Idle connection & heartbeat packet

Detect idle connections

IdleStateHandler - determines whether the read and write time is too long

 @Override
 protected void initChannel(NioSocketChannel ch) throws Exception {
     // Exceeding the detection time will trigger the IdleStateEvent event event
     ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
     ch.pipeline().addLast(new ChannelDuplexHandler() {
         @Override // Used to respond to special events
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
             IdleStateEvent event = (IdleStateEvent) evt;
             if (event.state() == IdleState.READER_IDLE) {
                 System.out.println("5s Read free!");
             }
         }
     });
 }

Heartbeat

In practical applications, it often happens that the client is connected normally but does not send data for a long time. At this time, we need to send heartbeat packets to the client regularly to judge whether the client cannot communicate with the server normally due to network delay / data blocking, so as to judge whether to disconnect from the client.

Keywords: Java Back-end Framework

Added by truman on Thu, 18 Nov 2021 01:38:06 +0200