Sticky packets and half packets
Sliding window
To improve TCP transmission efficiency, the sender and the receiver each maintain a dynamically sized buffer, the window, which lets a bounded number of TCP segments be in flight without waiting for each one to be acknowledged.
Phenomenon analysis
Sticky packets
- Symptom: the sender writes abc and def as two messages; the receiver reads abcdef in one go
- Causes:
  - Application layer: the receiver's ByteBuf is large (Netty default 1024), so one read can hold several messages
  - Sliding window: if the receiver does not process data in time and the window is large enough, several messages accumulate in the window and are read together
  - Nagle's algorithm: it coalesces small writes into a single segment, which produces sticky packets
Half packets
- Symptom: the sender writes abcdef as one message; the receiver reads abc and def separately
- Causes:
  - Application layer: the receiver's ByteBuf is smaller than the amount of data actually sent
  - Sliding window: the window is small and the message is large, so only the part that fits in the window is sent and the rest has to wait
  - MSS limit: data larger than the MSS is split into several segments, so the receiver may see only part of the message
Root cause: TCP is a byte stream and has no message boundaries
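Both symptoms can be reproduced without a network at all. The sketch below is only an illustration: the class StreamBoundaryDemo and its method are hypothetical, and an in-memory stream stands in for the TCP connection. It shows that the bytes of separate writes are indistinguishable once they are in the stream, and that the read size alone decides how they come out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: models a connection as a plain byte stream (assumption, not real TCP). */
final class StreamBoundaryDemo {

    /** Writes each message into one stream, then reads it back in chunks of at most readSize bytes. */
    static List<String> sendThenRead(int readSize, String... messages) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String message : messages) {
            // Each write appends raw bytes; nothing records where one message ends
            wire.write(message.getBytes(StandardCharsets.US_ASCII));
        }
        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        List<String> reads = new ArrayList<>();
        byte[] buf = new byte[readSize];
        int n;
        while ((n = in.read(buf)) > 0) {
            reads.add(new String(buf, 0, n, StandardCharsets.US_ASCII));
        }
        return reads;
    }
}
```

Calling sendThenRead(1024, "abc", "def") yields a single read containing abcdef (sticky), while sendThenRead(3, "abcdef") yields two reads, abc and def (half packets).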
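Solutions
- The client closes the connection after every message (short-lived connections). The end of the connection marks the message boundary, so this avoids sticky packets, but it does not prevent half packets and reconnecting for each message is expensive.
@Override
// Triggered once the Channel connection has been established
public void channelActive(ChannelHandlerContext ctx) {
    ByteBuf buf = ctx.alloc().buffer(16);
    buf.writeBytes(new byte[] {'1','2','3','4','5','6','7','8','9','0','a','s','d','f','g','h'});
    // Close only after the write has completed
    ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
}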
- The server reads fixed-length frames
@Override
protected void initChannel(NioSocketChannel channel) {
    channel.pipeline()
            // The frame decoder must come first so that StringDecoder receives whole frames
            .addLast(new FixedLengthFrameDecoder(16)) // Emit one frame per 16 bytes
            .addLast(new StringDecoder())
            .addLast(new LoggingHandler(LogLevel.DEBUG));
}
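To make the idea behind fixed-length framing concrete, here is a minimal plain-Java sketch. It is not how Netty implements FixedLengthFrameDecoder; the class FixedLengthFramer and its feed method are hypothetical names used only for illustration. Incoming chunks of any size are buffered, and a frame is emitted each time enough bytes have accumulated.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: cuts an arbitrary sequence of chunks into frames of a fixed size. */
final class FixedLengthFramer {
    private final int frameLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Adds a chunk and returns every frame that is now complete; leftovers stay buffered. */
    List<byte[]> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(Arrays.copyOfRange(all, pos, pos + frameLength));
            pos += frameLength;
        }
        // Keep the incomplete tail for the next call
        pending.reset();
        pending.write(all, pos, all.length - pos);
        return frames;
    }
}
```

The cost of this scheme is padding: every message has to be filled up to the frame length, so the length must be at least as large as the longest message.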
- The server splits frames on a newline delimiter
@Override
protected void initChannel(NioSocketChannel channel) {
    channel.pipeline()
            // The frame decoder must come first so that StringDecoder receives whole lines
            .addLast(new LineBasedFrameDecoder(1024)) // Fail if no newline is found within 1024 bytes
            .addLast(new StringDecoder())
            .addLast(new LoggingHandler(LogLevel.DEBUG));
}
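The delimiter approach can be sketched in plain Java as follows. This is a simplified illustration under stated assumptions, not Netty's implementation: the class LineFramer is hypothetical, it works on characters rather than bytes, and it accepts both \n and \r\n as line endings. The maximum length plays the same role as the 1024 limit above: it stops a sender that never emits a newline from filling the buffer indefinitely.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits incoming text chunks into lines, with an upper bound per line. */
final class LineFramer {
    private final int maxLength;
    private final StringBuilder pending = new StringBuilder();

    LineFramer(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Adds a chunk and returns every line that is now complete, without its line ending. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int nl;
        while ((nl = pending.indexOf("\n")) >= 0) {
            // Drop an optional carriage return before the newline
            int end = (nl > 0 && pending.charAt(nl - 1) == '\r') ? nl - 1 : nl;
            lines.add(pending.substring(0, end));
            pending.delete(0, nl + 1);
        }
        if (pending.length() > maxLength) {
            throw new IllegalStateException("no newline within " + maxLength + " characters");
        }
        return lines;
    }
}
```

The drawback of any delimiter scheme is that the payload itself must never contain the delimiter, or it has to be escaped.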
- The server parses a length-prefixed protocol
The sender writes the length of the data before the data itself, and the server reads exactly that many bytes for each message.
public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(
            // maxFrameLength 1024, lengthFieldOffset 0, lengthFieldLength 4,
            // lengthAdjustment 0, initialBytesToStrip 4 (drop the length field from the frame)
            new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
            new StringDecoder(StandardCharsets.UTF_8),
            new LoggingHandler(LogLevel.DEBUG)
    );
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    for (int i = 0; i < 3; ++i) {
        byte[] bytes = "Hello IllTamer".getBytes(StandardCharsets.UTF_8);
        buf.writeInt(bytes.length); // 4-byte length prefix
        buf.writeBytes(bytes);      // payload
    }
    channel.writeInbound(buf);
}
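The same length-prefix idea, reduced to plain Java with java.nio.ByteBuffer, might look like the sketch below. The class LengthPrefixFraming is a hypothetical name, and the format (a 4-byte big-endian length followed by a UTF-8 payload) is an assumption chosen to match the example above. The decoder shows the key step: when the payload has not fully arrived, it rewinds to the start of the length field and waits for more data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: 4-byte length prefix followed by a UTF-8 payload. */
final class LengthPrefixFraming {

    /** Builds one frame: length field plus payload. */
    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /** Decodes every complete frame from buf (in read mode) and leaves a partial frame unread. */
    static List<String> decode(ByteBuffer buf) {
        List<String> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // half packet: rewind to the length field and wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

Because the length is known before the payload is read, this scheme handles sticky and half packets alike and places no restriction on the payload's content.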
Protocol analysis and design
Redis
The protocol data for the command set name IllTamer is *3\r\n$3\r\nset\r\n$4\r\nname\r\n$8\r\nIllTamer\r\n. Here *3 announces an array of three elements, and each $N gives the byte length of the string that follows. Note that Redis is the server and the Java application is the client (Bootstrap).
// Line terminator used by the Redis protocol
private static final byte[] END = {'\r', '\n'};

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ChannelFuture future = new Bootstrap()
                .group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new LoggingHandler())
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        // Send "set name IllTamer" as soon as the connection is up
                                        ByteBuf buffer = ctx.alloc().buffer();
                                        buffer.writeBytes("*3".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("$3".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("set".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("$4".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("name".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("$8".getBytes());
                                        buffer.writeBytes(END);
                                        buffer.writeBytes("IllTamer".getBytes());
                                        buffer.writeBytes(END);
                                        ctx.writeAndFlush(buffer);
                                    }

                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        // Print the server's reply, then release the buffer
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(buf.toString(Charset.defaultCharset()));
                                        buf.release();
                                    }
                                });
                    }
                })
                .connect(new InetSocketAddress("192.168.113.201", 6379))
                .sync();
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("error ", e);
    } finally {
        worker.shutdownGracefully();
    }
}
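The repeated writeBytes calls above follow one fixed pattern, which can be captured in a small helper. The sketch below is an illustration only: the class RespCommand and its encode method are hypothetical names, and it covers just the array-of-bulk-strings form used for commands, not the whole Redis protocol.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes a command as a Redis array of bulk strings. */
final class RespCommand {

    static byte[] encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        // *<count> announces how many elements follow
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            // $<n> is the length of the element in bytes, not in characters
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

With such a helper, the channelActive body could shrink to a single buffer.writeBytes(RespCommand.encode("set", "name", "IllTamer")) call.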
HTTP
Here the browser that sends the request is the client, and the Java application is the server (ServerBootstrap).
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ChannelFuture future = new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) {
                        channel.pipeline()
                                .addLast(new LoggingHandler(LogLevel.DEBUG))
                                // Decodes requests and encodes responses
                                .addLast(new HttpServerCodec())
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        log.debug("Data: {}", msg.getClass());
                                        if (msg instanceof HttpRequest) {
                                            // Request line and request headers
                                            HttpRequest request = (HttpRequest) msg;
                                            log.debug(request.uri());
                                            // Build the response
                                            byte[] messages = "<h1>Hello World !</h1>".getBytes(StandardCharsets.UTF_8);
                                            DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                                                    request.protocolVersion(), HttpResponseStatus.OK);
                                            // Set Content-Length (HttpHeaderNames.CONTENT_LENGTH, statically imported)
                                            // so the client knows where the body ends and does not keep waiting
                                            response.headers().setInt(CONTENT_LENGTH, messages.length);
                                            response.content().writeBytes(messages);
                                            // Write the response back
                                            ctx.writeAndFlush(response);
                                        } else if (msg instanceof HttpContent) {
                                            // Request body
                                        }
                                    }
                                });
                    }
                })
                .bind(25565)
                .sync();
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("error ", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Custom protocol
Elements to include
- Magic number - lets the receiver reject invalid packets immediately
- Version number - supports protocol upgrades
- Serialization algorithm - the serialization / deserialization method used for the message body; can be extended with json, protobuf, hessian, etc.
- Instruction type - identifies the business operation, such as login or registration
- Request sequence number - lets responses be matched to requests, which enables asynchronous duplex communication
- Body length
- Message body
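The field list above translates directly into a byte layout. The following plain-Java sketch is one possible layout, using the field sizes of the codec shown later in this section (4-byte magic number 7, 6, 5, 7, then one byte each for version, serializer and instruction type, then two 4-byte integers); the class FrameHeader and its method names are hypothetical. Unlike a bare read, it actually checks the magic number before trusting the remaining fields.

```java
import java.nio.ByteBuffer;

/** Hypothetical header model: magic, version, serializer, type, sequence id, body length. */
final class FrameHeader {
    static final int MAGIC = 0x07060507;        // the bytes 7, 6, 5, 7 in big-endian order
    static final int LENGTH_FIELD_OFFSET = 11;  // 4 magic + 1 version + 1 serializer + 1 type + 4 sequence id
    static final int HEADER_LENGTH = LENGTH_FIELD_OFFSET + 4;

    final byte version;
    final byte serializerType;
    final byte messageType;
    final int sequenceId;
    final int bodyLength;

    private FrameHeader(byte version, byte serializerType, byte messageType, int sequenceId, int bodyLength) {
        this.version = version;
        this.serializerType = serializerType;
        this.messageType = messageType;
        this.sequenceId = sequenceId;
        this.bodyLength = bodyLength;
    }

    /** Serializes the header followed by the body into one frame. */
    static byte[] write(byte version, byte serializerType, byte messageType, int sequenceId, byte[] body) {
        return ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .putInt(MAGIC).put(version).put(serializerType).put(messageType)
                .putInt(sequenceId).putInt(body.length).put(body)
                .array();
    }

    /** Reads and validates a header; the buffer is left positioned at the start of the body. */
    static FrameHeader read(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LENGTH) {
            throw new IllegalArgumentException("incomplete header");
        }
        int magic = buf.getInt();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("invalid magic number: 0x" + Integer.toHexString(magic));
        }
        FrameHeader header = new FrameHeader(buf.get(), buf.get(), buf.get(), buf.getInt(), buf.getInt());
        if (header.bodyLength < 0 || header.bodyLength > buf.remaining()) {
            throw new IllegalArgumentException("invalid body length: " + header.bodyLength);
        }
        return header;
    }
}
```

The constant LENGTH_FIELD_OFFSET is the value a length-field frame decoder needs in order to find the body length inside such a frame.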
Message interface - Message
public interface Message extends Serializable {

    /**
     * Message type
     */
    byte getMessageType();

    /**
     * Request sequence number.
     * Enables duplex communication and asynchronous handling.
     */
    int getSequenceId();
}
Custom codec - MessageCodec
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf byteBuf) throws Exception {
        write(message, byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
        list.add(read(byteBuf));
    }

    private static ByteBuf write(Message message, ByteBuf buf) throws IOException {
        byte[] protocol = new byte[] {
                7, 6, 5, 7,               // Magic number (4 bytes)
                1,                        // Version
                0,                        // Serialization type: jdk 0, json 1
                message.getMessageType(), // Instruction type (1 byte)
        };
        buf.writeBytes(protocol).writeInt(message.getSequenceId());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the stream guarantees that all serialized bytes reach bos
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(message);
        }
        byte[] data = bos.toByteArray();
        buf.writeInt(data.length); // Body length
        buf.writeBytes(data);      // Body
        return buf;
    }

    private static Message read(ByteBuf buf) throws IOException, ClassNotFoundException {
        int magic = buf.readInt();
        byte version = buf.readByte();
        byte serializerType = buf.readByte();
        byte messageType = buf.readByte();
        int sequenceId = buf.readInt();
        int length = buf.readInt();
        byte[] data = new byte[length];
        buf.readBytes(data, 0, length);
        if (serializerType != 0) {
            // Only jdk serialization is implemented; adding null to the output list would fail
            throw new IOException("Unsupported serializer type: " + serializerType);
        }
        Message message;
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(data))) {
            message = (Message) input.readObject();
        }
        log.debug("magic {}, version {}, serializerType {}, messageType {}, sequenceId {}, length {}, message {}",
                magic, version, serializerType, messageType, sequenceId, length, message);
        return message;
    }
}
Test
public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
            // lengthFieldOffset 11: magic (4) + version (1) + serializer (1) + type (1) + sequence id (4)
            // lengthFieldLength 4: the body length is an int
            new LengthFieldBasedFrameDecoder(1024, 11, 4),
            new LoggingHandler(),
            new MessageCodec()
    );
    // ExampleMessage is a Message implementation that is not shown here
    Message message = new ExampleMessage("IllTamer", 6);

    // Encode: outbound path
    channel.writeOutbound(message);

    // Decode: encode into a buffer by hand, then feed it to the inbound path
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    new MessageCodec().encode(null, message, buf);
    channel.writeInbound(buf);
}
@Sharable
Netty's marker annotation for stateless ChannelHandlers. A handler should carry this annotation only if it keeps no per-connection state and can therefore be shared safely between channels.
Idle connections and heartbeat packets
Detecting idle connections
IdleStateHandler - detects that a channel has gone too long without a read or a write. Its three constructor arguments are the reader, writer, and all-idle timeouts in seconds; 0 disables the corresponding check.
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    // Fires an IdleStateEvent when nothing has been read for 5 seconds
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new ChannelDuplexHandler() {
        @Override
        // Handles user events such as IdleStateEvent
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent
                    && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                System.out.println("No data read for 5s!");
            } else {
                // Pass other events on to the next handler
                super.userEventTriggered(ctx, evt);
            }
        }
    });
}
Heartbeat
In practice, a client often stays connected but sends no data for a long time. Silence alone does not tell the server whether the client is simply idle or can no longer communicate because of network delay or a stalled connection. To tell the two apart, the server sends heartbeat packets to the client at regular intervals and uses the result to decide whether to close the connection.
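The decision logic behind a heartbeat can be sketched independently of Netty. The class below is a hypothetical illustration under stated assumptions: HeartbeatMonitor, its thresholds, and the rule "ping after one idle period, close after a longer one" are not taken from the original. Timestamps are passed in explicitly so that the behavior does not depend on the system clock.

```java
/** Hypothetical helper: decides whether to ping or drop a peer based on its last activity. */
final class HeartbeatMonitor {

    enum Action { NONE, SEND_PING, CLOSE }

    private final long pingAfterMillis;
    private final long closeAfterMillis;
    private long lastReadMillis;

    HeartbeatMonitor(long pingAfterMillis, long closeAfterMillis, long nowMillis) {
        if (pingAfterMillis <= 0 || closeAfterMillis <= pingAfterMillis) {
            throw new IllegalArgumentException("expected 0 < pingAfterMillis < closeAfterMillis");
        }
        this.pingAfterMillis = pingAfterMillis;
        this.closeAfterMillis = closeAfterMillis;
        this.lastReadMillis = nowMillis;
    }

    /** Call whenever any data, including a heartbeat reply, arrives from the peer. */
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    /** Call periodically; returns what to do with the connection at this moment. */
    Action check(long nowMillis) {
        long idle = nowMillis - lastReadMillis;
        if (idle >= closeAfterMillis) {
            return Action.CLOSE;     // silent for too long, even after pings
        }
        if (idle >= pingAfterMillis) {
            return Action.SEND_PING; // quiet: probe the peer with a heartbeat
        }
        return Action.NONE;
    }
}
```

In a Netty pipeline, the periodic check would typically be driven by the IdleStateEvent shown earlier rather than by a separate timer.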