Netty: TCP packet sticking and unpacking

1. What are packet sticking and unpacking?

Packet sticking and unpacking arise at the TCP layer. TCP is a stream-oriented transport protocol: it does not know what the bytes mean to the layer above (here, our application code) and divides the data according to the state of its buffers. As a result, a message that is one complete packet from the application's point of view may be split by TCP into several packets for transmission (unpacking), and several small packets may be combined into one larger packet (sticking). This is the so-called TCP packet sticking and unpacking problem: a stream-oriented connection does not preserve message boundaries.
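The client sends two packets, D1 and D2, but the server may read them in several ways: D1 and D2 in two separate reads; D1 and D2 together in a single read; or one of the packets split across two reads, for example all of D1 plus the start of D2, followed by the rest of D2.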
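
To make these cases concrete, here is a minimal, framework-free sketch. It treats the two packets as one continuous byte stream and cuts that stream into reads of arbitrary sizes, which is roughly what a receiver experiences. The class name `StreamBoundaryDemo`, the helper `readInChunks`, and the sample payloads are assumptions made for illustration; none of them is part of Netty.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StreamBoundaryDemo {

    /** Cuts the byte stream into reads of the given sizes, as a receiver might see them. */
    static List<String> readInChunks(byte[] stream, int... chunkSizes) {
        List<String> reads = new ArrayList<>();
        int pos = 0;
        for (int size : chunkSizes) {
            int end = Math.min(pos + size, stream.length);
            reads.add(new String(Arrays.copyOfRange(stream, pos, end), StandardCharsets.US_ASCII));
            pos = end;
        }
        return reads;
    }

    public static void main(String[] args) {
        // D1 and D2 are written back to back; the stream keeps no record of where D1 ends.
        byte[] stream = "D1-dataD2-data".getBytes(StandardCharsets.US_ASCII);

        System.out.println(readInChunks(stream, 7, 7));  // [D1-data, D2-data]  one packet per read
        System.out.println(readInChunks(stream, 14));    // [D1-dataD2-data]    sticking
        System.out.println(readInChunks(stream, 10, 4)); // [D1-dataD2-, data]  D2 is split
        System.out.println(readInChunks(stream, 3, 11)); // [D1-, dataD2-data]  D1 is split
    }
}
```

The sender wrote the same bytes in every case; only the read sizes differ, which is why the receiver needs a framing rule of its own.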

2. Solutions

1. Give every message a fixed length, for example 100 bytes; a message shorter than 100 bytes is padded with spaces.
This has two drawbacks. First, a short message must still be padded to 100 bytes, which wastes bandwidth and memory. Second, a single message can never be longer than 100 bytes.
2. Append a special delimiter to each packet, such as an underscore or a hyphen. This is simple and easy, but the delimiter must never appear inside the message content.
3. Send the length with the data. For example, the first four bytes of each message can carry its length, and the application layer uses that length to determine where each message starts and ends.
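
The three approaches above can be sketched as three ways of turning the same message into bytes. This is an illustration only, written with the plain JDK; the class name `FramingFormats` and its method names are assumptions, and the 100-byte frame size and the `\n` delimiter are example values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class FramingFormats {

    /** Approach 1: pad the payload with spaces up to a fixed frame size. */
    static byte[] fixedLength(String msg, int frameSize) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        if (payload.length > frameSize) {
            throw new IllegalArgumentException("payload exceeds frame size " + frameSize);
        }
        byte[] frame = new byte[frameSize];
        Arrays.fill(frame, (byte) ' ');
        System.arraycopy(payload, 0, frame, 0, payload.length);
        return frame;
    }

    /** Approach 2: append a delimiter byte that must never occur inside the payload. */
    static byte[] delimited(String msg, byte delimiter) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        for (byte b : payload) {
            if (b == delimiter) {
                throw new IllegalArgumentException("payload contains the delimiter");
            }
        }
        byte[] frame = Arrays.copyOf(payload, payload.length + 1);
        frame[payload.length] = delimiter;
        return frame;
    }

    /** Approach 3: a 4-byte big-endian length followed by the payload. */
    static byte[] lengthPrefixed(String msg) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    public static void main(String[] args) {
        String msg = "hello";
        System.out.println(fixedLength(msg, 100).length);        // 100
        System.out.println(delimited(msg, (byte) '\n').length);  // 6
        System.out.println(lengthPrefixed(msg).length);          // 9
    }
}
```

For a 5-byte message the fixed-length frame carries 95 bytes of padding, while the delimiter and length-prefix formats add 1 and 4 bytes respectively.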

Netty provides built-in decoders for the first two approaches:

// Splits frames on line endings (\n or \r\n)
LineBasedFrameDecoder
// Splits frames on a custom delimiter
DelimiterBasedFrameDecoder
// Splits frames of a fixed length
FixedLengthFrameDecoder
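
To show what a line-based decoder has to do, here is a framework-free sketch of the idea: buffer incoming bytes until a line ending arrives, then emit the completed line. It is not Netty's implementation, and the class name `LineFramer` and its `feed` method are assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineFramer {

    // Bytes received so far that do not yet end in a line break.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one network read and returns every line completed by it. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int len = line.length;
                // Accept "\r\n" as well as "\n" by dropping a trailing carriage return.
                if (len > 0 && line[len - 1] == '\r') {
                    len--;
                }
                lines.add(new String(line, 0, len, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // The second line is split across two reads; the third sticks to the end of the second.
        System.out.println(framer.feed("first\r\nsec".getBytes(StandardCharsets.UTF_8))); // [first]
        System.out.println(framer.feed("ond\nthird\n".getBytes(StandardCharsets.UTF_8))); // [second, third]
    }
}
```

A production decoder would also cap the length of a pending line so that a peer which never sends a line ending cannot exhaust memory.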

3. Code example

Here we show code for the third solution: each message is sent with its length in front, using a custom encoder and decoder.
The following seven classes are involved.

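3.1. MyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Outbound: writes each MyMessageProtocol as length + content
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });

            System.out.println("Netty client starting...");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            // Block until the connection is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}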

3.2. MyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 10 messages back to back as soon as the connection is established
        for (int i = 0; i < 10; i++) {
            byte[] content = "Hello, I'm Zhang San!".getBytes(CharsetUtil.UTF_8);
            // Create the protocol packet object
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(content.length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        // The server sends nothing back in this example, so there is nothing to handle
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

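3.3. MyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {

        // bossGroup accepts connections; workerGroup handles their I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound: reassembles length + content bytes into MyMessageProtocol
                            pipeline.addLast(new MyMessageDecoder());
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            System.out.println("Netty server starting...");
            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}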

3.4. MyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    // Number of messages received on this connection
    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        System.out.println("==== Server received a message ====");
        System.out.println("length=" + msg.getLen());
        System.out.println("content=" + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("Number of messages received by the server=" + (++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.5. MyMessageDecoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;

import java.util.List;

public class MyMessageDecoder extends ByteToMessageDecoder {

    // Size of the length field written by MyMessageEncoder (one int)
    private static final int HEADER_SIZE = 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode called, readable bytes=" + in.readableBytes());

        // Wait until the whole length field has arrived
        if (in.readableBytes() < HEADER_SIZE) {
            return;
        }

        // Remember the position so that the length can be re-read on the next call
        in.markReaderIndex();
        int length = in.readInt();
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }

        if (in.readableBytes() < length) {
            System.out.println("Not enough readable data yet, waiting for more...");
            in.resetReaderIndex();
            return;
        }

        byte[] content = new byte[length];
        in.readBytes(content);

        // Wrap the bytes in a MyMessageProtocol object and pass it to the next handler for business processing
        MyMessageProtocol messageProtocol = new MyMessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}
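
The reassembly logic of a length-prefixed decoder can also be exercised without Netty. The following is a minimal sketch that assumes the same wire format as above (a 4-byte length followed by the content) and uses only `java.nio.ByteBuffer`. The class name `LengthPrefixedFramer` and its `encode` and `feed` methods are hypothetical helpers for illustration, not part of the example project or of Netty.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LengthPrefixedFramer {

    // Accumulates unread bytes across reads; left in "write mode" between calls.
    private ByteBuffer buffer = ByteBuffer.allocate(64);

    /** Encodes a message as a 4-byte big-endian length followed by its UTF-8 bytes. */
    static byte[] encode(String msg) {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Feeds one network read and returns the payload of every frame completed by it. */
    List<byte[]> feed(byte[] chunk) {
        if (buffer.remaining() < chunk.length) {
            // Grow the buffer so that the pending bytes plus the new chunk fit.
            int needed = buffer.position() + chunk.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(buffer.capacity() * 2, needed));
            buffer.flip();
            bigger.put(buffer);
            buffer = bigger;
        }
        buffer.put(chunk);
        buffer.flip(); // switch to reading

        List<byte[]> frames = new ArrayList<>();
        while (buffer.remaining() >= 4) {
            buffer.mark();
            int length = buffer.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative length: " + length);
            }
            if (buffer.remaining() < length) {
                buffer.reset(); // body incomplete: rewind to the length field and wait
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            frames.add(payload);
        }
        buffer.compact(); // keep the unread tail and switch back to writing
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("Hello");
        byte[] second = encode("World!");
        byte[] stream = ByteBuffer.allocate(first.length + second.length).put(first).put(second).array();

        LengthPrefixedFramer framer = new LengthPrefixedFramer();
        int[] cuts = {0, 2, 12, stream.length}; // deliberately not aligned with frame boundaries
        for (int i = 0; i + 1 < cuts.length; i++) {
            for (byte[] payload : framer.feed(Arrays.copyOfRange(stream, cuts[i], cuts[i + 1]))) {
                System.out.println(new String(payload, StandardCharsets.UTF_8));
            }
        }
    }
}
```

The first read delivers only two bytes of the length field, so nothing is emitted; the second completes `Hello` and leaves three bytes of the next header pending; the third completes `World!`. A real decoder should also reject lengths above a sensible maximum rather than waiting indefinitely for a body that will never fit.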

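3.6. MyMessageEncoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode called");
        // Wire format: 4-byte length, then the content bytes
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}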

3.7. MyMessageProtocol

public class MyMessageProtocol {

    /**
     * Length of the content, in bytes
     */
    private int len;
    /**
     * Content of one message
     */
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

Keywords: Java Netty network

Added by adamjnz on Wed, 05 Jan 2022 23:25:35 +0200