LengthFieldBasedFrameDecoder custom protocol solves the problems of sticking and unpacking

1, Introduction

1.1 function of lengthfieldbasedframedecoder

The LengthFieldBasedFrameDecoder can customize the length to solve the problem of TCP packet sticking. Therefore, it is also called "basedlength"

1.2 TCP packet sticking and packet sticking

  • TCP sticky packet refers to that several data packets sent by the sender are glued into one packet when they arrive at the receiver. From the perspective of receiving buffer, the header of the latter packet data is immediately followed by the tail of the previous data.

  • After the TCP connection is established, the Client sends multiple messages to the Server. The TCP protocol ensures the data reliability, but it cannot guarantee that the Client sends n packets and the Server receives them according to n packets. The Client side sends n packets, and the Server side may receive n-1 or n+1 packets.

1.3 why does sticking occur

  • Sender reason
    TCP will use the Nagle algorithm by default. The Nagle algorithm mainly does two things: 1) the next packet will be sent only after the previous packet is confirmed; 2) Collect multiple small packets and send them together when an acknowledgement arrives. Therefore, it is the Nagle algorithm that may cause packet sticking by the sender.

  • Receiver reason
    The TCP receiver reads the data packets in the cache mode, and reads the data packets in multiple caches at one time. Naturally, the tail of the previous packet and the head of the next receipt packet stick together.

1.4 how to solve the sticking phenomenon

  1. Add a special symbol. The receiver will separate the received data packets through this special symbol - DelimiterBasedFrameDecoder special separator decoder

  2. Send fixed length packets each time - FixedLengthFrameDecoder fixed length encoder

  3. Define the length field in the message header to identify the total length of the message - LengthFieldBasedFrameDecoder custom length decoder (this paper introduces this scheme in detail)

1.5 lengthfieldbasedframedecoder - Interpretation of 6 parameters

The LengthFieldBasedFrameDecoder is a custom length decoder, so the six parameters in the constructor are basically described around the defined length field.

  • maxFrameLength
    Maximum length of transmitted data frame

  • lengthFieldOffset
    Defines the subscript of the length field in the sent byte array. In other words: the place with the subscript ${lengthFieldOffset} in the sent byte array is the beginning of the length field

  • lengthFieldLength
    Describes the length of the defined length field. In other words: when sending byte array bytes, the byte array bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] field corresponds to the part of the defined length field

  • lengthAdjustment
    Satisfy the formula: sent byte array sent packet length = value of length field + lengthFieldOffset + lengthFieldLength + lengthAdjustment

  • initialBytesToStrip
    Remove the first initialBytesToStrip bit of the received transmission packet

  • failFast - true
    If the read length field exceeds maxFrameLength, a TooLongFrameException will be thrown. false: TooLongFrameException will be thrown only after the bytes represented by the value of the length field are actually read. It is set to true by default. It is recommended not to modify it, otherwise it may cause memory overflow
    For specific explanation, please refer to: https://blog.csdn.net/liyantianmin/article/details/85603347

2, Server implementation

2.1 definition of agreement entity

package com.powernow.usm.netty.protocol;

import io.netty.channel.Channel;
import lombok.Data;

/**
 * Message carrier
 *
 * Bidirectional data transmission carrier between transmission module and service module:
 *
 *                   MessageHolder
 * Service Module <----------------> Transport Module
 *
 */
@Data
public class MessageHolder {

    // Message flag 0x01: client -- > server
    /**
     * Message flag 0x01: request client -- > server
     *           0x02:Response server -- > client
     *           0x03:Notification server -- > client e.g. message forwarding
     */
    private byte sign;
    // Message type
    /**
     * Message type 0x15: personal message
     *        0x16:  Group message
     */
    private byte type;
    // Response status
    private byte status;
    // Json message body
    private String body;
    // The channel that received the message
    private Channel channel;
}

2.2 custom LengthFieldBasedFrameDecoder decoder

package com.powernow.usm.netty.handler;

import com.powernow.usm.netty.protocol.MessageHolder;
import com.powernow.usm.netty.protocol.ProtocolHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * LengthFieldBasedFrameDecoder Solve the sticking problem, https://www.jianshu.com/p/c90ec659397c
 * Decode Handler
 *
 *                                       Jelly Protocol
 *  __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __
 * |           |           |           |           |              |                          |
 *       2           1           1           1            4               Uncertainty
 * |__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __ __|__ __ __ __ __ __ __ __ __|
 * |           |           |           |           |              |                          |
 *     Magic        Sign        Type       Status     Body Length         Body Content
 * |__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __ __|__ __ __ __ __ __ __ __ __|
 *
 * Protocol header 9 bytes fixed length
 *     Magic      // Verification bit of data packet, short type
 *     Sign       // Message flag, request / response / notification, byte type
 *     Type       // Message type, login / send message, byte type
 *     Status     // Response status, success / failure, byte type
 *     BodyLength // Protocol body length, int type
 */
@Slf4j
public class ProtocolDecoder extends LengthFieldBasedFrameDecoder {
    
    public ProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,lengthAdjustment,initialBytesToStrip,failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //Call the method of the parent class here to get the desired part. I want all of them here, or just the body part
        in = (ByteBuf) super.decode(ctx,in);

        if(in == null){
            return null;
        }
        if(in.readableBytes() < ProtocolHeader.HEADER_LENGTH){
            throw new Exception("The packet length is less than the protocol header length");
        }
        short magic = in.readShort();
        // Start decoding
        byte sign = in.readByte();
        byte type = in.readByte();
        byte status = in.readByte();
        // Confirmation message body length
        int length = in.readInt();
        
        if(in.readableBytes()!=length){
            throw new Exception("Inconsistent message body");
        }
        //Read body
        byte []bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        MessageHolder messageHolder = new MessageHolder();
        messageHolder.setSign(sign);
        messageHolder.setType(type);
        messageHolder.setStatus(status);
        messageHolder.setBody(new String(bytes, "utf-8"));
        return messageHolder;

    }
}

2.3 server Hanlder

package com.powernow.usm.netty.handler;

import com.powernow.usm.netty.protocol.MessageHolder;
import com.powernow.usm.netty.protocol.ProtocolHeader;
import com.powernow.usm.netty.queue.TaskQueue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;

/**
 * The Handler that finally receives the data puts the data to be processed into the blocking queue, which is taken and deal by the service module
 *
 */
@Slf4j
public class AcceptorHandler extends ChannelInboundHandlerAdapter {

    private final BlockingQueue<MessageHolder> taskQueue;

    public AcceptorHandler() {
        taskQueue = TaskQueue.getQueue();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof MessageHolder) {
            MessageHolder messageHolder = (MessageHolder) msg;
            // Specify Channel
            messageHolder.setChannel(ctx.channel());
            // Add to task queue
            boolean offer = taskQueue.offer(messageHolder);
            log.info("TaskQueue Add task: taskQueue={},message = {}" , taskQueue.size(),messageHolder.toString());
            if (!offer) {
                // Server busy
                log.warn("Server busy, denial of service");
                // Busy response
                response(ctx.channel(), messageHolder.getSign());
            }
        } else {
            throw new IllegalArgumentException("msg is not instance of MessageHolder");
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Server busy response
     *
     * @param channel
     * @param sign
     */
    private void response(Channel channel, byte sign) {
        MessageHolder messageHolder = new MessageHolder();
        messageHolder.setSign(ProtocolHeader.RESPONSE);
        messageHolder.setType(sign);
        messageHolder.setStatus(ProtocolHeader.SERVER_BUSY);
        messageHolder.setBody("");
        channel.writeAndFlush(messageHolder);
    }
}

2.4 server implementation

package com.powernow.usm.netty;

import com.powernow.usm.netty.handler.AcceptorHandler;
import com.powernow.usm.netty.handler.ProtocolDecoder;
import com.powernow.usm.netty.handler.ProtocolEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @destription
 */
@Slf4j
@Component
public class NettyServer {

    @Value("${netty.port}")
    private Integer port;

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //Maximum length
    private static final int LENGTH_FIELD_LENGTH = 4;  //Number of bytes occupied by length field
    private static final int LENGTH_FIELD_OFFSET = 5;  //Length offset
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    @PostConstruct
    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(group, bossGroup) // Bind thread pool
                    .channel(NioServerSocketChannel.class) // Specify the channel to use
                    .localAddress(port)// Binding listening port
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // Trigger operation when Binding client connection
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            log.info("New client connection received: {}",ch.toString());
                            ch.pipeline().addLast("ProtocolDecoder", new ProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,true));
//                            ch.pipeline().addLast("ProtocolDecoder", new ProtocolDecoder1(1024 * 8,5,4));
                            ch.pipeline().addLast("ProtocolEncoder", new ProtocolEncoder());
                            ch.pipeline().addLast("IdleStateHandler", new IdleStateHandler(6, 0, 0));
                            ch.pipeline().addLast("AcceptorHandler", new AcceptorHandler());
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // The server creates bindings asynchronously
            System.out.println(NettyServer.class + " Start listening: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // Close the server channel
        } catch (InterruptedException e) {
            log.warn("Netty Binding exception", e);
        } finally {
            group.shutdownGracefully().sync(); // Release thread pool resources
            bossGroup.shutdownGracefully().sync();
        }
    }
}
  • Custom decoder parameter interpretation
new ProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,true)
maxFrameLength: Maximum length of frame
lengthFieldOffset length: Address of field offset
lengthFieldLength length;Length of bytes occupied by field
lengthAdjustment: Modify the value defined in the frame data length field, which can be negative, because sometimes we are used to recording the length of the header,If negative,It indicates how many fields to push back
initialBytesToStrip: How many lengths are skipped during parsing
failFast; by true,When frame Length exceeds maxFrameLength Report immediately when necessary TooLongFrameException Exception, is false,Read the whole frame and then report the difference

3, Client implementation

3.1 custom client encoder

package com.powernow.usm.netty.handler;

import cn.hutool.json.JSONUtil;
import com.powernow.usm.config.BizException;
import com.powernow.usm.netty.protocol.MessageHolder;
import com.powernow.usm.netty.protocol.ProtocolHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;


/**
 * Code Handler
 *
 *                                       Jelly Protocol
 *  __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __ __
 * |           |           |           |           |              |                          |
 *       2           1           1           1            4               Uncertainty
 * |__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __ __|__ __ __ __ __ __ __ __ __|
 * |           |           |           |           |              |                          |
 *     Magic        Sign        Type       Status     Body Length         Body Content
 * |__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __|__ __ __ __ __|__ __ __ __ __ __ __ __ __|
 *
 * Protocol header 9 bytes fixed length
 *     Magic      // Verification bit of data packet, short type
 *     Sign       // Message flag, request / response / notification, byte type
 *     Type       // Message type, login / send message, byte type
 *     Status     // Response status, success / failure, byte type
 *     BodyLength // Protocol body length, int type
 *
 *
 */
@Slf4j
public class ProtocolEncoder extends MessageToByteEncoder<MessageHolder> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MessageHolder msg, ByteBuf out) throws Exception {
        String body = msg.getBody();
        if (msg == null || msg.getBody() == null) {
            throw new BizException("msg == null");
        }
        // code
        byte[] bytes = body.getBytes("utf-8");

        out.writeShort(ProtocolHeader.MAGIC)
                .writeByte(msg.getSign())
                .writeByte(msg.getType())
                .writeByte(msg.getStatus())
                .writeInt(bytes.length)
                .writeBytes(bytes);

    }
}

3.2 client Handler

package com.powernow.usm.netty.client;


import com.powernow.usm.dto.Message;
import com.powernow.usm.netty.protocol.MessageHolder;
import com.powernow.usm.netty.protocol.ProtocolHeader;
import com.powernow.usm.utils.Serializer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * The Handler that finally receives the data
 */
@Slf4j
public class AcceptorHandler extends ChannelInboundHandlerAdapter {

    private ChannelHandlerContext ctx;

    public AcceptorHandler(){}

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel active");
        this.ctx = ctx;
		// The client sends 10 messages continuously
        for (int i = 0; i < 10; i++) {
            Message message = new Message();
            message.setSender("sender" + i);
            message.setReceiver( "receiver" + i);
            message.setContent("hello receiver " + i + ", i am sender" + i);
            message.setTime(System.currentTimeMillis());
            MessageHolder messageHolder = new MessageHolder();
            messageHolder.setSign(ProtocolHeader.REQUEST);
            messageHolder.setType(ProtocolHeader.PERSON_MESSAGE);
            messageHolder.setStatus((byte) 0);
            messageHolder.setBody(Serializer.serialize(message));
            Channel channel = ctx.channel();
            channel.writeAndFlush(messageHolder);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Active after reading");
        if (msg instanceof MessageHolder) {
            MessageHolder messageHolder = (MessageHolder) msg;
            log.info(messageHolder.toString());
            // Processing messages
//            Dispatcher.dispatch(messageHolder);
        } else {
            throw new IllegalArgumentException("msg is not instance of MessageHolder");
        }
    }
}

3.3 client implementation

package com.powernow.usm.netty.client;

import com.powernow.usm.netty.handler.ProtocolDecoder;
import com.powernow.usm.netty.handler.ProtocolEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;

/**
 * @destription
 */
public class NettyClient {
    private final String host;
    private final int port;


    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //Maximum length
    private static final int LENGTH_FIELD_LENGTH = 4;  //Number of bytes occupied by length field
    private static final int LENGTH_FIELD_OFFSET = 5;  //Length offset
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    public NettyClient() {
        this(12345);
    }

    public NettyClient(int port) {
        this("localhost", port);
    }

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // Register thread pool
                    .channel(NioSocketChannel.class) // NioSocketChannel is used as the channel class for connection
                    .remoteAddress(new InetSocketAddress(this.host, this.port)) // Bind connection port and host information
                    .handler(new ChannelInitializer<SocketChannel>() { // Bind connection initializer
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("connect connected...");
                            ch.pipeline().addLast("ProtocolDecoder", new ProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,true));//Anti sticking treatment
                            ch.pipeline().addLast("ProtocolEncoder", new ProtocolEncoder());
                            ch.pipeline().addLast("IdleStateHandler", new IdleStateHandler(0, 5, 0));
                            ch.pipeline().addLast("ReaderHandler", new AcceptorHandler());
                        }
                    });
            System.out.println("created..");

            ChannelFuture cf = b.connect().sync(); // Asynchronous connection server
            System.out.println("connected..."); // Connection complete

            cf.channel().closeFuture().sync(); // Asynchronously waiting to close the connection channel
            System.out.println("closed.."); // Close complete
        } finally {
            group.shutdownGracefully().sync(); // Release thread pool resources
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient("127.0.0.1", 12345).start();// Connect 127.0.0.1/12345 and start
        System.out.println("===================================");
    }
}

4, Verify

Start the restricted server, and then start the client to send data to the server. The results are shown in the figure below:

Keywords: Java Netty TCP/IP

Added by v4g on Thu, 17 Feb 2022 14:56:04 +0200