Netty (3): TCP sticky packets and half packets
What are TCP sticky packets and half packets?
Suppose the client sends two data packets, D1 and D2, to the server. Because the number of bytes the server reads in one call is not fixed, the following four situations can occur.
- The server reads D1 and D2 as two separate packets in two reads; nothing is stuck together and nothing is split.
- The server receives both packets in a single read, with D1 and D2 stuck together. This is called a TCP sticky packet.
- The server needs two reads: the first returns all of D1 plus part of D2, and the second returns the rest of D2. This is called TCP unpacking (a half packet).
- The server needs two reads: the first returns only part of D1 (D1_1), and the second returns the rest of D1 (D1_2) together with all of D2.
If the server's TCP receive window is very small and D1 and D2 are relatively large, a fifth situation becomes likely: the server needs several reads to receive D1 and D2 completely, and the data is split more than once along the way.
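The four situations can be reproduced without any network at all. The sketch below is only an illustration: the class name StreamReadDemo, the method reads and the sample payloads "AAAA" (D1) and "BBBB" (D2) are made up for this example. It treats the two packets as one continuous byte stream and cuts it at the read sizes the server happens to get.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class StreamReadDemo {
    /** Cuts one continuous byte stream into reads of the given sizes, as a receiver might see it. */
    public static List<String> reads(byte[] stream, int... sizes) {
        List<String> out = new ArrayList<>();
        int pos = 0;
        for (int size : sizes) {
            int end = Math.min(pos + size, stream.length);
            out.add(new String(stream, pos, end - pos, StandardCharsets.US_ASCII));
            pos = end;
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical payloads: D1 = "AAAA", D2 = "BBBB", sent back to back.
        byte[] stream = "AAAABBBB".getBytes(StandardCharsets.US_ASCII);
        System.out.println(reads(stream, 4, 4)); // [AAAA, BBBB]   case 1: two clean packets
        System.out.println(reads(stream, 8));    // [AAAABBBB]     case 2: sticky packet
        System.out.println(reads(stream, 6, 2)); // [AAAABB, BB]   case 3: D1 + part of D2
        System.out.println(reads(stream, 2, 6)); // [AA, AABBBB]   case 4: part of D1 first
    }
}
```

In every case the bytes arrive in order and nothing is lost; only the boundaries between reads move, which is why the receiver needs its own rule for finding message boundaries.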
Causes of TCP sticky packets and half packets
TCP is a connection-oriented, reliable protocol (the connection is set up with a three-way handshake), so the client and the server maintain a connection (a Channel), and as long as that connection stays open the client can keep sending data packets over it. If the packets being sent are very small, TCP may apply the Nagle algorithm (which can be switched on or off) to merge them and send them together once enough data has accumulated or the data already in flight has been acknowledged. This merging is one reason small writes can see higher latency over TCP than over UDP. The server then receives a continuous byte stream and cannot tell which pieces the client sent separately, which produces sticky packets. Sticking can also arise on the receiving side: the server places received data in a buffer, and if the application does not drain the buffer in time, the next read may return several packets at once.
UDP, by contrast, is a connectionless, unreliable protocol (suited to sending small packets frequently). It does not merge outgoing packets (there is no Nagle algorithm); whatever one end sends is sent as is. Because nothing is merged, every datagram is a complete unit (the data is encapsulated once with the UDP header and IP header and sent once), so sticky packets do not occur.
The causes of unpacking (half packets) are simpler. A packet may be split by IP fragmentation; only part of it may have arrived because some segments were lost in transit and are still waiting to be retransmitted; or it may be sent in two transmissions, so that the receiver picks up the first part on its own (this also depends on the size of the receive buffer). In short, one packet is received in several pieces. More specifically, there are three causes, as follows.
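Whether Nagle's algorithm is used is a per-socket setting. As a minimal sketch using only the JDK (the class NoDelayDemo and its method are names invented here), the following turns it off through java.net.Socket.setTcpNoDelay; in Netty the corresponding setting is ChannelOption.TCP_NODELAY.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

class NoDelayDemo {
    /** Creates an unconnected socket, disables Nagle's algorithm and reports the resulting setting. */
    public static boolean noDelayEnabled() {
        try (Socket socket = new Socket()) {
            socket.setTcpNoDelay(true);      // true = do not merge small writes
            return socket.getTcpNoDelay();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TCP_NODELAY enabled: " + noDelayEnabled());
    }
}
```

Turning Nagle off only reduces merging on the sender. The receiver still reads a byte stream and can still see several messages in one read, so this setting does not replace a framing rule in the application protocol.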
- The application writes more bytes in one call than the socket send buffer can hold.
- TCP segments the data to the MSS. MSS stands for maximum segment size: the maximum length of the data field in a TCP segment. The data field plus the TCP header makes up the whole TCP segment, so the MSS is not the maximum length of the whole segment; rather, MSS = TCP segment length - TCP header length.
- The payload of an Ethernet frame exceeds the MTU, so IP fragmentation occurs. The MTU is the maximum packet size that a given layer of a communication protocol can pass. If the IP layer has a packet to send whose length exceeds the link-layer MTU, it splits the packet into several fragments, none larger than the MTU. Note that IP fragmentation can happen on the sending host or on an intermediate router.
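To put numbers on the MSS formula, here is a small worked example. It assumes a standard Ethernet MTU of 1500 bytes and 20-byte IPv4 and TCP headers without options; those figures, the 4000-byte payload and the class name MssDemo are assumptions made for the illustration, not values taken from the text above.

```java
class MssDemo {
    /** MSS = MTU - IP header - TCP header (all in bytes). */
    public static int mss(int mtu, int ipHeader, int tcpHeader) {
        return mtu - ipHeader - tcpHeader;
    }

    /** Number of TCP segments needed to carry a payload, rounding up. */
    public static int segmentsNeeded(int payloadBytes, int mss) {
        return (payloadBytes + mss - 1) / mss;
    }

    public static void main(String[] args) {
        int mss = mss(1500, 20, 20);
        System.out.println(mss);                       // 1460
        System.out.println(segmentsNeeded(4000, mss)); // 3
    }
}
```

A single 4000-byte application message therefore travels as three segments under these assumptions, and the receiver may see it in up to three separate reads.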
Solving the sticky packet and half packet problem
The underlying TCP layer knows nothing about the business data of the layer above it, so it cannot guarantee that packets will not be split or merged on the way. The problem can only be solved in the design of the application-layer protocol. The approaches used by mainstream protocols in the industry can be summarized as follows.
- Add a delimiter at the end of each message, for example a carriage return and line feed, as the FTP protocol does.
- Use fixed-length messages. For example, every message is exactly 200 bytes, and shorter messages are padded with spaces.
- Divide the message into a header and a body, where the header contains a field giving the total length of the message (or the length of the body). A common design is to make the first header field an int32 holding the total message length; in Netty this kind of message is decoded with LengthFieldBasedFrameDecoder.
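The third approach, a length field in the header, can be sketched with nothing more than java.nio.ByteBuffer. This is a simplified illustration and not how LengthFieldBasedFrameDecoder is implemented: the class LengthPrefixFraming, the fixed 64 KB buffer and the choice to store the body length (rather than the total length) in a 4-byte big-endian field are all assumptions made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFraming {
    // Bytes received but not yet decoded. Fixed capacity keeps the sketch short.
    private final ByteBuffer pending = ByteBuffer.allocate(64 * 1024);

    /** Encodes a message as a 4-byte big-endian body length followed by the body. */
    public static byte[] encode(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    }

    /** Feeds one chunk read from the socket and returns every message completed by it. */
    public List<String> feed(byte[] chunk) {
        pending.put(chunk);
        pending.flip();                          // switch the buffer to reading
        List<String> messages = new ArrayList<>();
        while (pending.remaining() >= 4) {
            pending.mark();
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative length field: " + length);
            }
            if (pending.remaining() < length) {  // half packet: wait for more bytes
                pending.reset();
                break;
            }
            byte[] body = new byte[length];
            pending.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        pending.compact();                       // keep the unread tail, switch back to writing
        return messages;
    }
}
```

Feeding the first three bytes of two encoded messages returns an empty list, and feeding the rest returns both messages, regardless of where the stream was cut.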
Solving sticky packets and half packets in practice
Carriage return and line feed as the delimiter
LineBasedFrameDecoder
To solve the half packet read and write problems caused by TCP sticking and unpacking, we use LineBasedFrameDecoder, the line-break delimiter decoder that ships with Netty.
LineBasedFrameDecoder works by scanning the readable bytes in the ByteBuf for "\n" or "\r\n". If it finds one, that position marks the end of the line, and the bytes from the reader index up to that position form one line. In other words, it is a decoder that uses a line break as the end-of-message marker. It also supports a configurable maximum line length: if no line break has been found after the maximum length has been read, an exception is thrown and the bytes read so far are discarded.
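The scanning logic described above can be sketched in plain Java. This is a simplified stand-in and not Netty's implementation: the class LineFramer is invented for the example, it buffers bytes in a ByteArrayOutputStream instead of a ByteBuf, and on an over-long line it simply drops what it has buffered and throws.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineFramer {
    private final int maxLength;
    private final ByteArrayOutputStream line = new ByteArrayOutputStream(); // current partial line

    public LineFramer(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Feeds one chunk read from the socket and returns every complete line found so far. */
    public List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] bytes = line.toByteArray();
                int len = bytes.length;
                if (len > 0 && bytes[len - 1] == '\r') {
                    len--;                       // treat "\r\n" like "\n"
                }
                lines.add(new String(bytes, 0, len, StandardCharsets.UTF_8));
                line.reset();
            } else {
                if (line.size() >= maxLength) {
                    line.reset();                // discard the bytes read so far
                    throw new IllegalStateException("line longer than " + maxLength + " bytes");
                }
                line.write(b);
            }
        }
        return lines;                            // a trailing partial line stays buffered
    }
}
```

A half line such as "Hel" yields nothing until the chunk containing the line break arrives, at which point the whole line comes out in one piece.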
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.net.InetSocketAddress;

public class LineBaseEchoServer {

    public static final int PORT = 9998;

    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("The server is about to start");
        lineBaseEchoServer.start();
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            ServerBootstrap b = new ServerBootstrap(); // required to start a server
            b.group(group) // pass the thread group in
                .channel(NioServerSocketChannel.class) // use NIO for network transport
                .localAddress(new InetSocketAddress(PORT)) // server listening port
                // Each accepted connection gets its own socket (child channel);
                // the initializer below adds the handlers to that child channel.
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync(); // bind asynchronously; sync() blocks until done
            System.out.println("The server starts up and waits for the connection and data of the client.....");
            f.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            group.shutdownGracefully().sync(); // shut the thread group down gracefully
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Split the inbound stream into lines of at most 1024 bytes before the business handler sees it.
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

@ChannelHandler.Sharable
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:[" + ctx.channel().remoteAddress() + "]Connected.........");
    }

    /** Called once per decoded line read from the network. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept[" + request
                    + "] and the counter is:" + counter.incrementAndGet());
            String resp = "Hello," + request + ". Welcome to Netty World!"
                    + System.getProperty("line.separator");
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers for us.
            in.release();
        }
    }

    /** Called when the current batch of reads has completed. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "About to close...");
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

import java.net.InetSocketAddress;

public class LineBaseEchoClient {

    private final String host;

    public LineBaseEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            final Bootstrap b = new Bootstrap(); // required to start a client
            b.group(group) // pass the thread group in
                .channel(NioSocketChannel.class) // use NIO for network transport
                // address and port of the server to connect to
                .remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))
                .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("Connected to server.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new LineBaseEchoClient("127.0.0.1").start();
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Called once per decoded line read from the network. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
        // Note: this closes the connection as soon as the first response arrives.
        ctx.close();
    }

    /** Once the channel is active, send ten line-terminated requests back to back. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String request = "Mark,Lison,King,James,Deer"
                + System.getProperty("line.separator");
        for (int i = 0; i < 10; i++) {
            ByteBuf msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(msg);
        }
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Custom delimiter
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder is similar to LineBasedFrameDecoder but more general: it lets us specify any byte sequence as the delimiter. We can also specify several delimiters at once. If more than one of them occurs in the received data, the delimiter that produces the shortest frame is used.
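The "shortest frame wins" rule is easier to see in code. The sketch below is a stateless illustration in plain Java, not Netty's decoder: the class DelimiterFramer and its methods are invented for the example, delimiters are assumed to be non-empty, and any bytes after the last delimiter are simply left out of the result (a real decoder would keep them for the next read).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class DelimiterFramer {
    /** Returns the first index at or after 'from' where delim occurs in data, or -1. */
    public static int indexOf(byte[] data, int from, byte[] delim) {
        outer:
        for (int i = from; i <= data.length - delim.length; i++) {
            for (int j = 0; j < delim.length; j++) {
                if (data[i + j] != delim[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    /** Splits off every complete frame, always cutting at whichever delimiter comes first. */
    public static List<String> split(byte[] data, byte[]... delimiters) {
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (true) {
            int bestEnd = -1;
            int bestDelimLength = 0;
            for (byte[] delim : delimiters) {       // delimiters must be non-empty
                int idx = indexOf(data, pos, delim);
                if (idx >= 0 && (bestEnd < 0 || idx < bestEnd)) {
                    bestEnd = idx;                  // earliest match = shortest frame
                    bestDelimLength = delim.length;
                }
            }
            if (bestEnd < 0) {
                break;                              // no complete frame left
            }
            frames.add(new String(data, pos, bestEnd - pos, StandardCharsets.UTF_8));
            pos = bestEnd + bestDelimLength;        // skip the delimiter itself
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] data = "a@~bb;c@~tail".getBytes(StandardCharsets.UTF_8);
        byte[] d1 = "@~".getBytes(StandardCharsets.UTF_8);
        byte[] d2 = ";".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(data, d1, d2)); // [a, bb, c]
    }
}
```

In the sample input the second frame ends at ";" rather than at the later "@~", because cutting there gives the shorter frame.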
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class DelimiterEchoServer {

    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;

    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("The server is about to start");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            ServerBootstrap b = new ServerBootstrap(); // required to start a server
            b.group(group) // pass the thread group in
                .channel(NioServerSocketChannel.class) // use NIO for network transport
                .localAddress(new InetSocketAddress(PORT)) // server listening port
                // Each accepted connection gets its own socket (child channel);
                // the initializer below adds the handlers to that child channel.
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync(); // bind asynchronously; sync() blocks until done
            System.out.println("The server starts up and waits for the connection and data of the client.....");
            f.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            group.shutdownGracefully().sync(); // shut the thread group down gracefully
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Frames end at "@~" and may be at most 1024 bytes long.
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL, CharsetUtil.UTF_8);
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Called once per decoded frame read from the network. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept[" + request
                    + "] and the counter is:" + counter.incrementAndGet());
            String resp = "Hello," + request + ". Welcome to Netty World!"
                    + DelimiterEchoServer.DELIMITER_SYMBOL;
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers for us.
            in.release();
        }
    }

    /** Called when the current batch of reads has completed: flush and close the connection. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class DelimiterEchoClient {

    private final String host;

    public DelimiterEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            final Bootstrap b = new Bootstrap(); // required to start a client
            b.group(group) // pass the thread group in
                .channel(NioSocketChannel.class) // use NIO for network transport
                // address and port of the server to connect to
                .remoteAddress(new InetSocketAddress(host, DelimiterEchoServer.PORT))
                .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("Connected to server.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(
                    DelimiterEchoServer.DELIMITER_SYMBOL, CharsetUtil.UTF_8);
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new DelimiterEchoClient("127.0.0.1").start();
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Called once per decoded frame read from the network. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
    }

    /** Once the channel is active, send ten delimiter-terminated requests back to back. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String request = "Mark,Lison,King,James,Deer"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        for (int i = 0; i < 10; i++) {
            ByteBuf msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(msg);
        }
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Fixed message length
FixedLengthFrameDecoder
With a fixed message length LEN, the receiver accumulates bytes until it has read LEN of them, treats that as one complete message, then resets its counter and starts reading the next message. For example, every message can be fixed at 1024 bytes; a shorter message is padded with spaces up to that length, and after reading it the receiver only has to trim the padding.
FixedLengthFrameDecoder is Netty's fixed-length decoder. It decodes messages automatically according to the specified length, so developers do not need to deal with TCP sticking and unpacking themselves, which makes it very practical. No matter how many bytes arrive in one read, it decodes them according to the fixed length set in its constructor. If it receives a half packet, it caches the partial message and waits for the next bytes to arrive, merging them until a complete message has been read.
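The pad, accumulate and trim cycle can be sketched in plain Java as follows. This is an illustration only and not FixedLengthFrameDecoder itself: the class FixedLengthFramer and the helper pad are invented names, and the sketch assumes ASCII text so that one character is one byte.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FixedLengthFramer {
    private final int frameLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // cached half packet

    public FixedLengthFramer(int frameLength) {
        this.frameLength = frameLength;
    }

    /** Sender side: right-pads an ASCII message with spaces (it does not truncate longer input). */
    public static String pad(String message, int length) {
        return String.format("%-" + length + "s", message);
    }

    /** Receiver side: feeds one chunk and returns every complete frame, with the padding trimmed. */
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(new String(all, pos, frameLength, StandardCharsets.US_ASCII).trim());
            pos += frameLength;
        }
        pending.reset();
        pending.write(all, pos, all.length - pos);  // keep the incomplete tail for the next read
        return frames;
    }
}
```

With a frame length of 8, feeding the first 5 bytes of two padded messages returns nothing, and feeding the remaining 11 bytes returns both messages. One cost of this design is that trim() also removes any leading or trailing whitespace that was part of the real message.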
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

import java.net.InetSocketAddress;

public class FixedLengthEchoServer {

    public static final String RESPONSE = "Welcome to Netty!";
    public static final int PORT = 9996;

    public static void main(String[] args) throws InterruptedException {
        FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();
        System.out.println("The server is about to start");
        fixedLengthEchoServer.start();
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            ServerBootstrap b = new ServerBootstrap(); // required to start a server
            b.group(group) // pass the thread group in
                .channel(NioServerSocketChannel.class) // use NIO for network transport
                .localAddress(new InetSocketAddress(PORT)) // server listening port
                // Each accepted connection gets its own socket (child channel);
                // the initializer below adds the handlers to that child channel.
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync(); // bind asynchronously; sync() blocks until done
            System.out.println("The server starts up and waits for the connection and data of the client.....");
            f.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            group.shutdownGracefully().sync(); // shut the thread group down gracefully
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Every inbound frame is exactly as long as the client's request (ASCII, so chars == bytes).
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }
}
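import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Called once per decoded fixed-length frame read from the network. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        try {
            String request = in.toString(CharsetUtil.UTF_8);
            System.out.println("Server Accept[" + request
                    + "] and the counter is:" + counter.incrementAndGet());
            ctx.writeAndFlush(Unpooled.copiedBuffer(
                    FixedLengthEchoServer.RESPONSE, CharsetUtil.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers for us.
            in.release();
        }
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}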
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

import java.net.InetSocketAddress;

public class FixedLengthEchoClient {

    public static final String REQUEST = "Mark,Lison,Peter,James,Deer";

    private final String host;

    public FixedLengthEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup(); // thread group
        try {
            final Bootstrap b = new Bootstrap(); // required to start a client
            b.group(group) // pass the thread group in
                .channel(NioSocketChannel.class) // use NIO for network transport
                // address and port of the server to connect to
                .remoteAddress(new InetSocketAddress(host, FixedLengthEchoServer.PORT))
                .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("Connected to server.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Every inbound frame is exactly as long as the server's response (ASCII, so chars == bytes).
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length()));
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new FixedLengthEchoClient("127.0.0.1").start();
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private final AtomicInteger counter = new AtomicInteger(0);

    /** Called once per decoded fixed-length frame read from the network. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
    }

    /** Once the channel is active, send ten fixed-length requests back to back. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ByteBuf msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());
            msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(msg);
        }
    }

    /** Called when an exception occurs. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}