Netty knowledge summary
This article summarizes the key points of the book Netty in Action
history
Blocking I/O
Each connection is handled by its own thread. Accepting a connection and reading from or writing to it are both blocking operations
Usage
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;

    // Listen on local port 8000; the sockets and reader are closed automatically
    try (ServerSocket serverSocket = new ServerSocket(8000);
         // accept() blocks until a client connects
         Socket clientSocket = serverSocket.accept();
         BufferedReader reader = new BufferedReader(
                 new InputStreamReader(clientSocket.getInputStream()))) {
        String request;
        // readLine() blocks until a full line of input arrives
        while ((request = reader.readLine()) != null) {
            if ("Done".equals(request)) {
                break;
            }
            System.out.println(request);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
Disadvantages
- A thread can serve only one connection at a time. Serving several connections concurrently requires one thread per connection
- Most of those threads sit idle waiting for a connection or for input, which wastes thread resources
- Every thread's call stack needs memory, so the physical memory of the machine limits the number of threads. Switching between a large number of threads also adds significant overhead
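The first point can be illustrated with a minimal thread-per-connection sketch. The class and method names (`ThreadPerConnectionEcho`, `serve`, `request`) are hypothetical and not from the book; the server echoes a single line per client so that the example stays short.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper for illustration only.
class ThreadPerConnectionEcho {

    // Accepts `connections` clients and hands each one to its own thread.
    static Thread serve(ServerSocket server, int connections) {
        Thread acceptor = new Thread(() -> {
            for (int i = 0; i < connections; i++) {
                try {
                    Socket client = server.accept();   // blocks until a client connects
                    new Thread(() -> echoOneLine(client)).start();
                } catch (IOException e) {
                    return;                            // server socket was closed
                }
            }
        });
        acceptor.start();
        return acceptor;
    }

    // Runs on the per-connection thread: read one line and write it back.
    private static void echoOneLine(Socket client) {
        try (Socket c = client;
             BufferedReader in = new BufferedReader(new InputStreamReader(c.getInputStream()));
             PrintWriter out = new PrintWriter(c.getOutputStream(), true)) {
            out.println(in.readLine());                // blocks until a full line arrives
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Client side: send one line and return the echoed reply.
    static String request(int port, String line) throws IOException {
        try (Socket s = new Socket("localhost", port);
             PrintWriter out = new PrintWriter(s.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()))) {
            out.println(line);
            return in.readLine();
        }
    }
}
```

Every accepted socket costs one thread here, which is exactly the resource problem the list above describes.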
NIO
NIO uses a Selector to manage multiple sockets. Each socket registers the events it is interested in, and the Selector reports which sockets are ready to be processed
Therefore, one thread can manage multiple sockets
NIO comes in two versions. The original NIO was introduced in JDK 1.4, and the newer one (NIO.2) was introduced in JDK 7
Usage
Old NIO
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket ss = serverChannel.socket();
    InetSocketAddress address = new InetSocketAddress(8000);
    ss.bind(address);
    serverChannel.configureBlocking(false);
    Selector selector = Selector.open();
    // The server channel is only interested in new connections
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
        try {
            // Blocks until at least one registered channel is ready
            selector.select();
        } catch (IOException ex) {
            ex.printStackTrace(); // handle in a proper way
            break;
        }
        Set<SelectionKey> readyKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = readyKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            try {
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    System.out.println("Accepted connection from " + client);
                    client.configureBlocking(false);
                    // Register for both read and write; the buffer travels with the key
                    client.register(selector,
                            SelectionKey.OP_WRITE | SelectionKey.OP_READ,
                            ByteBuffer.allocate(100));
                }
                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer output = (ByteBuffer) key.attachment();
                    if (client.read(output) == -1) {
                        // The peer closed the connection
                        key.cancel();
                        client.close();
                        continue;
                    }
                }
                if (key.isWritable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer output = (ByteBuffer) key.attachment();
                    output.flip();
                    client.write(output);
                    output.compact();
                }
            } catch (IOException ex) {
                key.cancel();
                try {
                    key.channel().close();
                } catch (IOException cex) {
                    // ignore on close
                }
            }
        }
    }
New NIO
NIO.2 introduces CompletionHandler: when an asynchronous operation completes, the corresponding CompletionHandler is called
The biggest difference from the old NIO is that you register a callback through a CompletionHandler instead of checking which events are currently ready
When the operation completes or fails, the handler is invoked automatically
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.CountDownLatch;

    public void nioTest() throws Exception {
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(8000);
        serverChannel.bind(address);
        final CountDownLatch latch = new CountDownLatch(1);
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(final AsynchronousSocketChannel channel, Object attachment) {
                // Accept the next connection, then start reading from this one
                serverChannel.accept(null, this);
                ByteBuffer buffer = ByteBuffer.allocate(100);
                channel.read(buffer, buffer, new EchoCompletionHandler(channel));
            }

            @Override
            public void failed(Throwable throwable, Object attachment) {
                try {
                    serverChannel.close();
                } catch (IOException e) {
                    // ignore on close
                } finally {
                    latch.countDown();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        EchoCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if (result < 0) {
                // End of stream: the peer closed the connection
                failed(null, buffer);
                return;
            }
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        // Not everything was written yet, keep writing
                        channel.write(buffer, buffer, this);
                    } else {
                        // Everything was echoed, read the next chunk
                        buffer.compact();
                        channel.read(buffer, buffer, EchoCompletionHandler.this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ignore on close
                    }
                }
            });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                // ignore on close
            }
        }
    }
Advantages
- One thread manages multiple sockets, which avoids creating a large number of threads and reduces both memory usage and thread-switching overhead
- When there is no I/O to perform, the thread can do other work
Disadvantages
- The API is hard to use
- Cross-platform behavior is inconsistent: the same code may run correctly on Linux but misbehave on Windows
- NIO.2 is only available on JDK 7 and above
- ByteBuffer, the data container in NIO, has a fixed capacity and cannot be extended
- An epoll bug on Linux can make the Selector spin in an empty loop, driving CPU utilization to 100%
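The ByteBuffer limitation in the list above is easy to reproduce with the JDK alone. The following is a minimal sketch; `FixedCapacityDemo` and its methods are hypothetical names chosen for illustration.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Hypothetical demo class for illustration only.
class FixedCapacityDemo {

    // A 4-byte buffer holds exactly one int; one more byte overflows it.
    static boolean overflowsWhenFull() {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(42);
        try {
            buf.put((byte) 1);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    // "Growing" a ByteBuffer means allocating a bigger one and copying by hand.
    static ByteBuffer grow(ByteBuffer full, int newCapacity) {
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        full.flip();          // switch the old buffer from writing to reading
        bigger.put(full);     // copy the written bytes
        return bigger;        // positioned right after the copied bytes
    }
}
```

The manual `flip()` plus copy is the kind of bookkeeping that Netty's own buffer type is designed to hide.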
Netty
use
Server
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.CharsetUtil;
    import java.util.concurrent.atomic.AtomicInteger;

    public class EchoServer {
        private final int port;

        public EchoServer(int port) {
            this.port = port;
        }

        public void start() throws Exception {
            EchoServerHandler handler = new EchoServerHandler();
            // The EventLoopGroup handles events such as connection establishment, reads and writes
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // Use ServerBootstrap to configure the server
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel socketChannel) throws Exception {
                         // Option 1 (used here): all channels share one handler instance.
                         // The handler must then be annotated with @Sharable,
                         // otherwise the second connection fails.
                         socketChannel.pipeline().addLast(handler);
                         // Option 2: give each channel its own handler instance
                         // socketChannel.pipeline().addLast(new EchoServerHandler());
                     }
                 });
                // Block until the bind succeeds
                ChannelFuture f = b.bind(port).sync();
                // Block until the server channel is closed
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();
            }
        }

        /**
         * Server-side business logic: logs the input and sends a reply to the client.
         */
        @Sharable
        class EchoServerHandler extends ChannelInboundHandlerAdapter {
            private final AtomicInteger count = new AtomicInteger(0);

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("server channel active");
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println(count.getAndIncrement());
                ByteBuf buf = (ByteBuf) msg;
                try {
                    System.out.println(String.format("server received:%s", buf.toString(CharsetUtil.UTF_8)));
                    // Queue the reply. write() does not flush, so nothing is sent yet
                    ctx.write(Unpooled.copiedBuffer("server received", CharsetUtil.UTF_8));
                } finally {
                    // The message is not passed on, so it must be released here
                    buf.release();
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                System.out.println(String.format("exception caught:%s", cause));
                ctx.close();
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                // Flush the data written earlier, then close the connection
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                   .addListener(ChannelFutureListener.CLOSE);
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("server channel inactive");
            }
        }

        public static void main(String[] args) throws Exception {
            new EchoServer(8000).start();
        }
    }
Client
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.CharsetUtil;

    public class EchoClient {
        private final String host;
        private final int port;

        public EchoClient(String host, int port) {
            this.host = host;
            this.port = port;
        }

        public void start() throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                // Configure the client
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel socketChannel) throws Exception {
                         socketChannel.pipeline().addLast(new EchoClientHandler());
                     }
                 });
                // Connect to the server; the ChannelFuture represents the connection result
                ChannelFuture f = b.connect(host, port);
                // Register a callback that runs when the connection succeeds or fails
                f.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            System.out.println("Connection established");
                        } else {
                            System.out.println("Connection failed");
                        }
                    }
                });
                // Wait for the connection attempt, then for the channel to close
                f.sync();
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();
            }
        }

        class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("channel active");
                // Once the connection is established, send a message to the server
                ctx.writeAndFlush(Unpooled.copiedBuffer("netty rocks", CharsetUtil.UTF_8));
            }

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
                // Data may arrive in several chunks, so one call may contain only part of a message
                System.out.println("client received: " + byteBuf.toString(CharsetUtil.UTF_8));
            }
        }

        public static void main(String[] args) throws Exception {
            new EchoClient("localhost", 8000).start();
        }
    }
Core components
Bootstrap
Bootstrap is used to configure the server or client, such as specifying EventLoopGroup, Channel, Handler, etc
Handler
When the specified event is triggered, the Handler will be called. The Handler is mainly our business logic
ChannelInitializer
ChannelInitializer is used to add Handlers to the ChannelPipeline of a newly created Channel
EventLoop
An EventLoop handles I/O operations, and one EventLoop can serve multiple Channels
An EventLoopGroup contains multiple EventLoops
Channel
Generally, a Channel corresponds to a socket connection, but it can represent anything that supports I/O operations
ChannelFuture
In Netty, establishing a connection and reading or writing data are all asynchronous
An operation does not necessarily run when it is invoked; it may run at some later point in time
In some scenarios you need to know whether an operation has completed, so Netty provides ChannelFuture
ChannelFuture is a special Future on which you can register listeners. The listeners are called back when the operation completes
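The listener idea can be sketched with the JDK's CompletableFuture. This is only an analogy to ChannelFuture, not Netty's API; `FutureListenerSketch` and `runWithListener` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK analogy for "register a listener on a future"; not Netty code.
class FutureListenerSketch {

    static String runWithListener(boolean succeed) {
        AtomicReference<String> outcome = new AtomicReference<>();

        // The asynchronous operation; it may fail, like a connect attempt
        CompletableFuture<Void> operation = CompletableFuture.runAsync(() -> {
            if (!succeed) {
                throw new IllegalStateException("simulated failure");
            }
        });

        // The callback runs once the operation has completed, either way
        CompletableFuture<Void> notified = operation.handle((ignored, error) -> {
            outcome.set(error == null ? "success" : "failed");
            return null;
        });

        notified.join();   // only so the sketch can return the outcome
        return outcome.get();
    }
}
```

As with ChannelFuture, the caller does not poll for completion; it states up front what should happen once the result is known.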
principle
Relationship between EventLoop, Channel, and Thread
When a new connection request arrives, Netty selects an EventLoop from the EventLoopGroup, creates a Channel for the connection, and registers the Channel with that EventLoop
An EventLoop is bound to a single thread, and all events of a Channel are handled by the EventLoop it is registered with
Therefore, business logic that only touches the state of a single Channel does not need synchronization
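A plain-JDK sketch of this binding follows, with single-thread executors standing in for EventLoops. All names are hypothetical, and the round-robin choice is an assumption made for the illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-in for an EventLoopGroup; each single-thread executor plays one EventLoop.
class PinnedLoopSketch {
    private final ExecutorService[] loops;
    private int next;

    PinnedLoopSketch(int size) {
        loops = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Round-robin selection (an assumption for this sketch)
    ExecutorService nextLoop() {
        return loops[next++ % loops.length];
    }

    void shutdown() throws InterruptedException {
        for (ExecutorService loop : loops) {
            loop.shutdown();
            loop.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    // Stand-in for a Channel that stays pinned to one loop for its lifetime
    static final class Connection {
        private final ExecutorService loop;
        private int eventsHandled;   // plain int, no lock: only the loop thread touches it

        Connection(ExecutorService loop) {
            this.loop = loop;
        }

        // May be called from any thread; the work always runs on the pinned loop
        void fireEvent() {
            loop.execute(() -> eventsHandled++);
        }

        // Reads the counter on the loop thread as well
        int eventsHandled() throws Exception {
            return loop.submit(() -> eventsHandled).get();
        }
    }
}
```

Because every event of a `Connection` runs on the same thread, the counter stays correct without `synchronized` or atomics, even when several threads call `fireEvent()`.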
Client Bootstrap and Server Bootstrap
Whether you use Netty on the client or on the server, you configure it through a Bootstrap
There are two main differences
- The client Bootstrap specifies the remote address and port to connect to; the server Bootstrap specifies the local port to listen on
- The client Bootstrap uses one EventLoopGroup; the server Bootstrap can use two EventLoopGroups. One group handles connection requests; for each accepted connection it creates a Channel and registers it with the other group
The server uses two EventLoopGroups because, if a single group handled both connection establishment and all other events, its threads could be fully occupied by existing connections under heavy load. New connections would then not be accepted in time and would eventually time out
Transport
Netty provides many built-in Transport implementations
Let's focus on NIO and OIO
NIO
OIO
Compare OIO and NIO
throughput
In terms of throughput, OIO is better than NIO
The reason is that the NIO mechanism adds some delay to event processing: after an event occurs, it takes time before the selector is notified and the event is dispatched
Concurrency
In terms of concurrency, NIO is better than OIO
In NIO, a thread corresponds to a selector, and a selector can manage multiple connections; In OIO, a thread corresponds to a connection
Therefore, NIO can handle more connections than OIO under the same physical resource conditions
File transfer
For file transfer, for example when implementing an FTP server, NIO is better than OIO
NIO supports zero copy: if the file only needs to be sent to the client without being modified, NIO can write it from the file system directly to the network stack, which avoids copying the data from kernel space to user space
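At the JDK level, the call commonly associated with zero copy is `FileChannel.transferTo`. The sketch below shows the call pattern; `TransferToSketch` and `send` are hypothetical names, and whether the kernel really avoids the user-space copy depends on the operating system and on the target channel type.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only.
class TransferToSketch {

    // Sends the whole file to the target channel without reading it into a
    // Java byte[] first. Returns the number of bytes transferred.
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = source.size();
            long sent = 0;
            // transferTo may move fewer bytes than requested, so loop until done
            while (sent < size) {
                sent += source.transferTo(sent, size - sent, target);
            }
            return sent;
        }
    }
}
```

In a server, `target` would typically be the `SocketChannel` of the client connection.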
Selection in different scenarios
Buffer
Buffer type
Because the ByteBuffer in the jdk is not easy to use, netty has designed its own buffer to store data
Common buffer types are as follows:
- Heap buffer: stores the data in the JVM heap. (1) Allocating and reclaiming space is fast. (2) The underlying byte array can be accessed directly
- Direct buffer: stores the data in off-heap memory. (1) It is the better choice when the data is sent over a socket, because otherwise the JVM copies the data into a direct buffer before sending it. (2) Allocating and reclaiming space is relatively expensive, although pooling mitigates this. (3) Because the data is not on the JVM heap, the underlying array cannot be accessed directly
- Composite buffer: a mixed buffer that can contain buffers of several types. (1) Multiple buffers can be added to it and are presented as one unified view. (2) Because the underlying buffers can be of different types, it cannot return a backing array directly
operation
Buffer provides two ways to access stored bytes: random access and sequential access
read/get and write/set
ByteBuf has two families of methods for reading data, read and get, and two families for writing data, write and set
The differences are as follows:
- A read operation advances readerIndex, while a get operation does not modify readerIndex
- A write operation advances writerIndex, while a set operation does not modify writerIndex
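The two rules above can be modelled with a toy class. `TwoIndexBuffer` is a hypothetical, heavily simplified stand-in written for this article; it is not Netty's ByteBuf and omits capacity growth and most bounds checks.

```java
// Toy model of the read/get and write/set semantics; not Netty code.
class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex;
    private int writerIndex;

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    // write*: stores at writerIndex and advances it
    void writeByte(int value) {
        data[writerIndex++] = (byte) value;
    }

    // set*: stores at an absolute index, both indexes stay unchanged
    void setByte(int index, int value) {
        data[index] = (byte) value;
    }

    // read*: returns the byte at readerIndex and advances it
    byte readByte() {
        if (readerIndex >= writerIndex) {
            throw new IndexOutOfBoundsException("nothing left to read");
        }
        return data[readerIndex++];
    }

    // get*: returns the byte at an absolute index, both indexes stay unchanged
    byte getByte(int index) {
        return data[index];
    }

    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
    int readableBytes() { return writerIndex - readerIndex; }
}
```

After three `writeByte` calls, `getByte(1)` leaves `readerIndex` at 0, whereas `readByte()` moves it to 1.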
Random access
    ByteBuf buffer = ...;
    // getByte(i) reads by absolute index and leaves readerIndex untouched
    for (int i = 0; i < buffer.capacity(); i++) {
        byte b = buffer.getByte(i);
        System.out.println(b);
    }
Sequential access
ByteBuf maintains two indexes, readerIndex and writerIndex, that mark the current read position and write position
Discardable bytes
The bytes before readerIndex have already been read and can be discarded. Calling discardReadBytes drops them and moves the unread data to the start of the buffer. This requires a memory copy, so calling it frequently hurts performance
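The cost comes from the copy itself. A minimal sketch of the compaction step on a plain array, assuming a hypothetical helper `DiscardSketch.discardReadBytes` (Netty's real method lives on ByteBuf and also updates the indexes for you):

```java
// Toy version of the compaction performed when read bytes are discarded.
class DiscardSketch {

    // Moves the unread region [readerIndex, writerIndex) to the start of the
    // array and returns the new writerIndex. The new readerIndex is 0.
    static int discardReadBytes(byte[] data, int readerIndex, int writerIndex) {
        int readable = writerIndex - readerIndex;
        // This copy is the price of reclaiming the discardable region
        System.arraycopy(data, readerIndex, data, 0, readable);
        return readable;
    }
}
```

For a buffer holding `1 2 3 4 5` with `readerIndex = 2` and `writerIndex = 5`, the unread bytes `3 4 5` end up at indexes 0 to 2 and the new `writerIndex` is 3.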
ChannelHandler
ChannelPipeline
- A ChannelPipeline contains multiple ChannelHandlers, and each handler has its own processing logic
- Each ChannelPipeline is bound to one Channel
- When data is read from the connection, the inbound handlers run from front to back, in the order in which they were added to the ChannelPipeline
- When data is written to the connection, the outbound handlers run from back to front, in the reverse of the order in which they were added to the ChannelPipeline
- The pipeline can be modified dynamically at runtime
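The ordering rules in this list can be modelled in a few lines of plain Java. `PipelineOrderSketch` is a hypothetical toy that only records which handlers would see an event; it does not use Netty's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of handler ordering in a pipeline; not Netty code.
class PipelineOrderSketch {

    private static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    PipelineOrderSketch addLast(String name, boolean inbound) {
        handlers.add(new Handler(name, inbound));
        return this;
    }

    // Inbound events visit inbound handlers from first to last
    List<String> fireInbound() {
        List<String> visited = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            if (handlers.get(i).inbound) {
                visited.add(handlers.get(i).name);
            }
        }
        return visited;
    }

    // Outbound events visit outbound handlers from last to first
    List<String> fireOutbound() {
        List<String> visited = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            if (!handlers.get(i).inbound) {
                visited.add(handlers.get(i).name);
            }
        }
        return visited;
    }
}
```

With handlers added in the order decoder (inbound), encoder (outbound), logic (inbound), compressor (outbound), a read visits decoder then logic, and a write visits compressor then encoder.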
ChannelHandlerContext
Each time a new ChannelHandler is added to the ChannelPipeline, a new ChannelHandlerContext will be created and associated with the ChannelHandler
ChannelHandlerContext lets the current ChannelHandler interact with the other ChannelHandlers in the pipeline
A ChannelHandler can be added to multiple ChannelPipelines. Each time it is added to a ChannelPipeline, a new ChannelHandlerContext is created and bound to it
A ChannelHandler that is added to multiple ChannelPipelines must be annotated with @Sharable
ChannelInboundInvoker and ChannelOutboundInvoker
The ChannelInboundInvoker interface provides the ability to trigger inbound events
The ChannelOutboundInvoker interface provides the ability to trigger outbound events
ChannelHandlerContext and ChannelPipeline both extend ChannelInboundInvoker and ChannelOutboundInvoker
Therefore, both can trigger inbound and outbound events
However, they behave differently. An event triggered through the ChannelPipeline passes through all matching ChannelHandlers in the pipeline, while an event triggered through a ChannelHandlerContext starts at the next ChannelHandler after the one that context belongs to
Channel state model
| State | Description |
|---|---|
| channelUnregistered | The Channel has been created but is not registered with an EventLoop |
| channelRegistered | The Channel has been created and is registered with an EventLoop |
| channelActive | The Channel is connected to the remote peer and can send and receive messages |
| channelInactive | The Channel is disconnected from the remote peer |
ChannelHandler
There are two types of ChannelHandler:
- ChannelInboundHandler: receives data and responds to state changes
- ChannelOutboundHandler: sends data
ChannelInboundHandler
ChannelInboundHandlerAdapter provides a basic implementation of ChannelInboundHandler. When an event is triggered it does no processing of its own and simply passes the event to the next handler. In addition, a channelRead implementation that consumes the message must release the ByteBuf manually, otherwise memory leaks
SimpleChannelInboundHandler builds on ChannelInboundHandlerAdapter and releases the ByteBuf automatically. You only write the read-processing logic in channelRead0; when that method returns, the corresponding ByteBuf is released
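The automatic release is essentially a template method with a `finally` block. The sketch below models the idea with hypothetical toy classes (`Message`, `AutoReleasingHandler`); it is not Netty's implementation, and the real reference counting has more cases.

```java
// Toy model of "release after channelRead0"; not Netty code.
class AutoReleaseSketch {

    // Minimal reference-counted message
    static final class Message {
        final String payload;
        private int refCnt = 1;

        Message(String payload) {
            this.payload = payload;
        }

        void release() {
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Subclasses implement channelRead0; the base class guarantees the release
    abstract static class AutoReleasingHandler {

        final void channelRead(Message msg) {
            try {
                channelRead0(msg);
            } finally {
                msg.release();   // runs even if channelRead0 throws
            }
        }

        protected abstract void channelRead0(Message msg);
    }
}
```

A handler written against the plain adapter would have to make the `release()` call itself, which is easy to forget.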
ChannelOutboundHandler
ChannelOutboundHandlerAdapter provides a basic implementation of ChannelOutboundHandler. When an event is triggered it does no processing of its own and simply passes the event to the next handler
Codec
A codec defines how to convert bytes into data of a specified format and how to convert data of a specified format back into bytes
A codec consists of two parts:
- Decoder: converts bytes into data of the specified format; used on the inbound path
- Encoder: converts data of the specified format into bytes; used on the outbound path
Decoder
A Decoder is used on the inbound path to convert data from one format to another. It is a ChannelInboundHandler
Decoders fall into the following two categories according to the source format and target format
- byte -> message
- message -> message
ByteToMessageDecoder
For the byte -> message case, Netty provides the abstract class ByteToMessageDecoder
You mainly need to implement the following method
- decode: the byte -> message conversion logic
The following example converts bytes into integers
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import java.util.List;

    public class ByteToIntMessageDecoder extends ByteToMessageDecoder {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // Check that at least 4 bytes (one int) are readable
            if (in.readableBytes() >= 4) {
                // Read one integer from the input
                out.add(in.readInt());
            }
        }
    }
As the example shows, every time you read from the input you must first check that enough bytes are available
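Why the check matters becomes clearer when the bytes of one integer arrive in separate chunks. The sketch below models the accumulate-then-decode idea with the JDK's ByteBuffer; `IntAccumulator` is a hypothetical class with a fixed 64-byte capacity chosen only to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy model of accumulating partial input until a full int is available.
class IntAccumulator {
    private final ByteBuffer pending = ByteBuffer.allocate(64); // fixed size, sketch only

    // Feeds one network chunk and returns every int that is now complete
    List<Integer> feed(byte[] chunk) {
        pending.put(chunk);          // append the new bytes
        pending.flip();              // switch to reading
        List<Integer> out = new ArrayList<>();
        while (pending.remaining() >= 4) {
            out.add(pending.getInt());
        }
        pending.compact();           // keep the leftover bytes for the next chunk
        return out;
    }
}
```

Feeding two bytes yields nothing; once the remaining two bytes arrive, the integer is emitted.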
MessageToMessageDecoder
When it is necessary to convert a message of one format into a message of another format, netty provides MessageToMessageDecoder
Similar to ByteToMessageDecoder, you also need to implement the decode method
Let's take a look at an example of converting an integer to a string
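    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import java.util.List;

    public class IntToStringDecoder extends MessageToMessageDecoder<Integer> {
        @Override
        protected void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            // Convert the decoded Integer into its String form
            out.add(msg.toString());
        }
    }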
Encoder
Encoder is mainly used for outbound. It can be divided into the following two categories according to the source type and target type:
- message -> message
- message -> byte
MessageToByteEncoder
For message -> byte conversion, Netty provides MessageToByteEncoder. You only need to extend this class and implement the encode method
The following encoder converts integers into bytes
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;

    public class IntToByteEncoder extends MessageToByteEncoder<Integer> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
            // Write the integer as 4 bytes
            out.writeInt(msg);
        }
    }
MessageToMessageEncoder
For message -> message conversion, Netty provides MessageToMessageEncoder. Again, you only need to implement the encode method
The following example converts an integer into a string
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    import java.util.List;

    public class IntToStringMessageEncoder extends MessageToMessageEncoder<Integer> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            // Convert the outgoing Integer into its String form
            out.add(msg.toString());
        }
    }
combination
An encoder and a decoder often need to be combined and added to the pipeline as one unit
There are two ways to combine them:
- Extend a codec class based on ChannelDuplexHandler, such as ByteToMessageCodec, and implement its decode and encode methods (and decodeLast if needed)
- Extend CombinedChannelDuplexHandler and pass the decoder and encoder types as generic parameters
Bootstrap
Bootstrap is used to configure and start the client and server
ServerBootstrap is used to configure the server
Bootstrap is used to configure the client
Client Bootstrap
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        // Configure the client
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .remoteAddress(new InetSocketAddress(host, port))
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel socketChannel) throws Exception {
                 socketChannel.pipeline().addLast(new EchoClientHandler());
             }
         });
        // Connect to the server; the ChannelFuture represents the connection result
        ChannelFuture f = b.connect();
        // Register a callback that runs when the connection succeeds or fails
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("Connection established");
                } else {
                    System.out.println("Connection failed");
                }
            }
        });
        // Wait until the channel is closed; otherwise the group below would be
        // shut down before the asynchronous connect has had a chance to finish
        f.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully().sync();
    }
Server Bootstrap
The api of Server Bootstrap is similar to that of Client Bootstrap, except that there are many methods starting with child
This is because there are two channels on the server: ServerChannel and ChildChannel
- ServerChannel is responsible for receiving new connections and creating ChildChannel for each connection
- Each ChildChannel represents an established connection
Methods not starting with child are used to configure ServerChannel, and methods starting with child are used to configure ChildChannel
    // The EventLoopGroup handles events such as connection establishment, reads and writes
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        // Use ServerBootstrap to configure the server
        ServerBootstrap b = new ServerBootstrap();
        b.group(group)
         .channel(NioServerSocketChannel.class)
         .localAddress(new InetSocketAddress(port))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel socketChannel) throws Exception {
                 socketChannel.pipeline().addLast(new EchoServerHandler());
             }
         });
        // Block until the bind succeeds
        ChannelFuture f = b.bind().sync();
        // Block until the server channel is closed
        f.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully().sync();
    }
test
To facilitate testing, Netty provides the EmbeddedChannel class, which mainly provides the following methods:
- writeInbound
- readInbound
- writeOutbound
- readOutbound
Take outbound as an example. A message passed to writeOutbound travels along the outbound direction of the ChannelPipeline. After all ChannelOutboundHandlers in the pipeline have processed it, you can call readOutbound to read the result and verify it
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import java.util.List;
    import org.junit.Assert;
    import org.junit.Test;

    public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
        private final int frameLength;

        public FixedLengthFrameDecoder(int frameLength) {
            if (frameLength <= 0) {
                throw new IllegalArgumentException("frame length must be positive");
            }
            this.frameLength = frameLength;
        }

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // Emit one frame for every frameLength readable bytes
            while (in.readableBytes() >= frameLength) {
                ByteBuf buf = in.readBytes(frameLength);
                out.add(buf);
            }
        }
    }

    public class FixedLengthFrameDecodeTest {
        @Test
        public void testFrameDecode() {
            ByteBuf buf = Unpooled.buffer();
            for (int i = 0; i < 9; i++) {
                buf.writeByte(i);
            }
            ByteBuf input = buf.copy();
            EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
            // Write 9 bytes inbound; the decoder should produce three 3-byte frames
            Assert.assertTrue(channel.writeInbound(input));
            Assert.assertTrue(channel.finish());
            Assert.assertEquals(buf.readBytes(3), channel.readInbound());
            Assert.assertEquals(buf.readBytes(3), channel.readInbound());
            Assert.assertEquals(buf.readBytes(3), channel.readInbound());
            Assert.assertNull(channel.readInbound());
        }
    }
Thread model
One thread per task
Introduction
The simplest thread model creates a new thread for each task
Advantages
It is simple
Disadvantages
It is inefficient: frequently creating and destroying threads adds a lot of overhead
Thread pool
Introduction
After a thread finishes a task it is not destroyed immediately, but returned to the thread pool for reuse
Advantages
It avoids frequently creating and destroying threads
Disadvantages
- Managing the pooled resources and context switching still incur overhead
- Development is difficult, because tasks that share state must be synchronized
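A thread pool in its simplest JDK form looks like the sketch below. `PoolSketch` and `sumOfSquares` are hypothetical names; the point is only that a few pooled threads are reused for many tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: many small tasks, a few reusable threads.
class PoolSketch {

    static int sumOfSquares(int tasks, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 1; i <= tasks; i++) {
                final int n = i;
                // Each task runs on whichever pooled thread is free
                results.add(pool.submit(() -> n * n));
            }
            int sum = 0;
            for (Future<Integer> result : results) {
                sum += result.get();   // wait for the task and collect its value
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }
}
```

The tasks here are independent; once they start sharing mutable state, the synchronization burden mentioned above appears.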
Event loop
Introduction
Events are handled one after another in a loop on a single thread
Advantages
- It simplifies development, because handlers do not need to care about synchronization
Class inheritance structure
Usage
    Channel ch = ...;
    // submit() returns a Future; execute() returns void
    Future<?> future = ch.eventLoop().submit(new Runnable() {
        @Override
        public void run() {
            // do something
        }
    });
A Channel is bound to one EventLoop, and all operations related to the Channel are executed in that EventLoop
You can obtain the bound EventLoop from the Channel
Execution of channel events
When an operation is invoked through a Channel, Netty checks whether the current thread is the thread bound to the Channel's EventLoop
If it is, the operation is executed immediately. If it is not, the operation is added to the EventLoop's task queue and executed the next time the loop processes its queue
This ensures that no matter which thread performs an operation through the Channel, the operation is eventually handed to the bound EventLoop, so all events of a Channel are executed by the same thread
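The check described above can be sketched with a plain thread and a queue. `MiniEventLoop` and `runInLoop` are hypothetical names for this illustration; the class follows the description in this article and is not Netty's implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy event loop: one thread, one task queue.
class MiniEventLoop implements Runnable {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(this, "mini-event-loop");
    private volatile boolean running = true;

    void start() {
        thread.setDaemon(true);
        thread.start();
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    // Same thread: run right away. Other thread: enqueue for the loop thread.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            tasks.add(task);
        }
    }

    void stop() {
        runInLoop(() -> running = false);
    }

    @Override
    public void run() {
        while (running) {
            try {
                tasks.take().run();   // blocks until a task is queued
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}
```

Whichever thread calls `runInLoop`, the task body always executes on the thread named `mini-event-loop`.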
Task scheduling
    Channel ch = ...;
    // Run the task once, 60 seconds from now, on the channel's EventLoop
    ScheduledFuture<?> future = ch.eventLoop().schedule(
            new Runnable() {
                @Override
                public void run() {
                    // do something
                }
            }, 60, TimeUnit.SECONDS);
Task scheduling in Netty follows the paper "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility", so it does not guarantee that a task runs exactly at the scheduled time
The handling of a scheduled task can be simplified into the following steps:
- A delayed task is submitted
- The task is inserted into the scheduled task queue of the EventLoop
- The EventLoop checks whether any task is currently due
- If a task is due, it is executed immediately and removed from the queue
- The EventLoop then checks again and repeats the previous step until no due task remains
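The steps above can be sketched with a queue ordered by deadline. `ScheduledQueueSketch` is a hypothetical toy with an explicit clock value passed in; it does not reproduce Netty's data structures, only the check-and-run cycle.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Toy scheduled-task queue driven by an explicit "now" value.
class ScheduledQueueSketch {

    private static final class Entry {
        final long deadline;
        final Runnable task;

        Entry(long deadline, Runnable task) {
            this.deadline = deadline;
            this.task = task;
        }
    }

    // Earliest deadline first
    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadline));

    // Steps 1 and 2: submit a delayed task and insert it into the queue
    void schedule(Runnable task, long delay, long now) {
        queue.add(new Entry(now + delay, task));
    }

    // Steps 3 to 5: run and remove every task whose deadline has passed
    int runDueTasks(long now) {
        int ran = 0;
        while (!queue.isEmpty() && queue.peek().deadline <= now) {
            queue.poll().task.run();
            ran++;
        }
        return ran;
    }
}
```

A task runs at the first check after its deadline, not at the deadline itself, which is why exact timing is not guaranteed.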
I/O thread allocation
With a non-blocking transport such as NIO, one thread manages multiple connections, and each connection corresponds to one Channel
With a blocking transport such as OIO, one thread manages only one connection, and each connection corresponds to one Channel