1. What are packet sticking and unpacking?
Packet sticking and unpacking arise at the TCP layer. As a transport protocol, TCP knows nothing about the meaning of the upper-layer data (here, our application code); it segments the byte stream according to the state of its own buffers. As a result, what the application regards as one complete message may be split by TCP into several packets for transmission, and several small messages may be combined into one larger packet. This is the so-called TCP packet sticking and unpacking problem. The root cause is that stream-oriented communication preserves no message boundaries.
The client sends two packets, D1 and D2, but the server may read them as two separate packets, as one glued packet, or with either packet split across reads, as shown in the figure below:
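These cases can be reproduced without a network. The following is a minimal sketch in plain Java, not part of the original example: the class name StreamBoundaryDemo, its two methods, and the sample payloads are made up for illustration. An in-memory byte stream stands in for the TCP connection, and the chunk size stands in for however many bytes a single read happens to return.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: an in-memory stream stands in for a TCP connection.
class StreamBoundaryDemo {

    // Concatenates the messages the way a byte stream carries them:
    // nothing marks where D1 ends and D2 begins.
    static byte[] send(String... messages) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String message : messages) {
            byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
            wire.write(bytes, 0, bytes.length);
        }
        return wire.toByteArray();
    }

    // Reads the stream in chunks of at most chunkSize bytes, mimicking a
    // receiver whose reads do not line up with the sender's writes.
    static List<String> receive(byte[] wire, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        ByteArrayInputStream in = new ByteArrayInputStream(wire);
        byte[] buf = new byte[chunkSize];
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            chunks.add(new String(buf, 0, n, StandardCharsets.UTF_8));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] wire = send("D1-hello", "D2-world");
        System.out.println(receive(wire, 64)); // [D1-helloD2-world]  glued together
        System.out.println(receive(wire, 5));  // [D1-he, lloD2, -worl, d]  split and glued
        System.out.println(receive(wire, 8));  // [D1-hello, D2-world]  boundaries match by luck
    }
}
```

The receiver gets the same bytes in every case; only the way they are grouped into reads changes, which is why the application has to define its own message boundaries.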
2. Solutions
1. Give every message a fixed length, such as 100 bytes; a message shorter than 100 bytes is padded with spaces.
This approach has two drawbacks. First, a very short message must still be padded to 100 bytes, which wastes bandwidth and memory. Second, a single message can never be longer than 100 bytes.
2. Append a special delimiter to the end of each packet, such as an underscore or a hyphen. This is simple and easy, but the chosen delimiter must never appear inside the message content itself.
3. Send the length: each message is sent together with its length. For example, the first four bytes of each message can hold the length of the data that follows. The application layer then uses this length to determine where each message starts and ends.
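The third approach can be sketched without Netty to make the idea concrete. The following is a minimal illustration in plain Java, assuming a 4-byte big-endian length header; the class name LengthPrefixFraming and its encode and feed methods are hypothetical and do not appear in the example code later in this article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of length-prefixed framing; not Netty's implementation.
class LengthPrefixFraming {

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Frame layout: 4-byte big-endian length, then that many payload bytes.
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Accepts an arbitrary chunk of the stream and returns every message
    // that became complete with it.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset(); // body incomplete: rewind to the header and wait
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the unread tail for the next call
        return messages;
    }

    public static void main(String[] args) {
        byte[] d1 = encode("D1");
        byte[] d2 = encode("D2");
        byte[] stream = new byte[d1.length + d2.length];
        System.arraycopy(d1, 0, stream, 0, d1.length);
        System.arraycopy(d2, 0, stream, d1.length, d2.length);

        LengthPrefixFraming decoder = new LengthPrefixFraming();
        // The cut falls inside D2's length header, so only D1 is complete at first.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 7)));             // [D1]
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 7, stream.length))); // [D2]
    }
}
```

However the stream is cut, the receiver only hands a message to the business logic once the full header and the full body have both arrived.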
Netty provides ready-made decoders for the simpler framing strategies:
// Splits frames on line endings (carriage return and line feed)
LineBasedFrameDecoder
// Splits frames on a special delimiter
DelimiterBasedFrameDecoder
// Splits frames by a fixed message length
FixedLengthFrameDecoder
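To show what line-based framing does conceptually, here is a rough sketch in plain Java. It is not how Netty's LineBasedFrameDecoder is implemented: the class LineFramingSketch and its feed method are hypothetical, and the sketch works on strings rather than raw bytes to keep it short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of line-based framing; it only illustrates the idea.
class LineFramingSketch {

    // Text received so far that has not been terminated by a line feed.
    private final StringBuilder pending = new StringBuilder();

    // Accepts an arbitrary chunk and returns every line completed by it.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int end;
        while ((end = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, end);
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1); // accept \r\n as well as \n
            }
            lines.add(line);
            pending.delete(0, end + 1); // drop the line and its terminator
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramingSketch decoder = new LineFramingSketch();
        System.out.println(decoder.feed("D1\r\nD")); // [D1]
        System.out.println(decoder.feed("2\nD3"));   // [D2]
        System.out.println(decoder.feed("\n"));      // [D3]
    }
}
```

Anything after the last delimiter stays buffered until a later chunk completes it, which is also why the delimiter must never occur inside a message.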
3. Code example
Here we show the code for the third solution, in which every message is sent together with its length.
It involves the following 7 classes.
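3.1 MyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Outbound: writes each MyMessageProtocol as length + content
                            pipeline.addLast(new MyMessageEncoder());
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            System.out.println("netty client start...");
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            // Block until the connection is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}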
3.2 MyClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 10 messages back to back as soon as the connection is established
        for (int i = 0; i < 10; i++) {
            String msg = "Hello, I'm Zhang San!";
            byte[] content = msg.getBytes(CharsetUtil.UTF_8);
            // Create the protocol packet object
            MyMessageProtocol messageProtocol = new MyMessageProtocol();
            messageProtocol.setLen(content.length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        // The server sends nothing back in this example
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
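3.3 MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        // bossGroup accepts connections, workerGroup handles their I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound: rebuilds MyMessageProtocol objects from the byte stream
                            pipeline.addLast(new MyMessageDecoder());
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            System.out.println("netty server start...");
            ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}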
3.4 MyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {
    // Number of complete messages received on this connection
    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        System.out.println("====The server received the following message====");
        System.out.println("length=" + msg.getLen());
        System.out.println("content=" + new String(msg.getContent(), CharsetUtil.UTF_8));
        System.out.println("Number of message packets received by the server=" + (++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
3.5 MyMessageDecoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyMessageDecoder extends ByteToMessageDecoder {
    // Size of the length header written by MyMessageEncoder (one int)
    private static final int HEADER_SIZE = 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode called");
        // Convert the raw bytes into a MyMessageProtocol packet (object).
        // Wait until the whole length header has arrived.
        if (in.readableBytes() < HEADER_SIZE) {
            return;
        }
        // Remember the position so the header can be read again if the body is incomplete
        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            System.out.println("The current readable data is not enough, continue to wait..");
            in.resetReaderIndex();
            return;
        }
        byte[] content = new byte[length];
        in.readBytes(content);
        // Wrap the data in a MyMessageProtocol object and pass it to the next handler for business processing
        MyMessageProtocol messageProtocol = new MyMessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}
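3.6 MyMessageEncoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode called");
        // Write the 4-byte length header first, then the content
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}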
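To see what such a frame looks like on the wire, the sketch below builds the same layout with java.nio.ByteBuffer instead of Netty's ByteBuf. Both write an int as four big-endian bytes by default. The class name WireFormatDemo and its helper methods are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the encoder's layout: int length + content.
class WireFormatDemo {

    // Builds one frame: a 4-byte big-endian length followed by the UTF-8 content.
    static byte[] frame(String message) {
        byte[] content = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + content.length)
                .putInt(content.length)
                .put(content)
                .array();
    }

    // Renders bytes as space-separated two-digit hex values.
    static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            if (hex.length() > 0) {
                hex.append(' ');
            }
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(frame("Hi"))); // 00 00 00 02 48 69
        // The message used by MyClientHandler is 21 bytes long, so its frame is 25 bytes.
        System.out.println(frame("Hello, I'm Zhang San!").length); // 25
    }
}
```

Because the header has a fixed size, the decoder always knows how many bytes to read before it can learn the size of the body.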
3.7 MyMessageProtocol
public class MyMessageProtocol {
    /**
     * Length of the content carried by one packet
     */
    private int len;

    /**
     * Content carried by one packet
     */
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}