Preface
These notes are based on the Netty course handout from Heima ("Black Horse") together with some of my own understanding. It is one of the best videos I have seen, with basically no filler. Video: Heima Netty.
We use the same code as before:
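    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LoggingHandler;

    public class TestSourceServer {
        public static void main(String[] args) {
            new ServerBootstrap()
                    // Each EventLoop owns one thread and one selector;
                    // it watches for events and runs tasks
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LoggingHandler());
                        }
                    }).bind(8080);
        }
    }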
1. Review of accept in NIO
- selector.select() blocks until an event occurs
- Iterate over selectedKeys
- Take a key and check whether its event type is accept
- Create the SocketChannel and set it to non-blocking
- Register the SocketChannel with the selector
- Make the SocketChannel listen for read events
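The six steps above can be sketched with plain java.nio. This is only a minimal illustration, not code from the course: the class name NioAcceptSketch and the method acceptOnce are made up for this note, and a local client is opened inside the method purely so that an accept event occurs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo class (not from the course); uses only java.nio, no Netty.
class NioAcceptSketch {

    // Runs the six accept steps for one connection and returns the
    // interest ops that end up set on the accepted channel's key.
    static int acceptOnce() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Local client, opened only so that an accept event actually occurs.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                while (true) {
                    // Step 1: block until an event occurs
                    selector.select();
                    // Step 2: iterate over selectedKeys
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        // Step 3: check whether the event type is accept
                        if (!key.isAcceptable()) {
                            continue;
                        }
                        // Step 4: create the SocketChannel and set it to non-blocking
                        SocketChannel sc = server.accept();
                        if (sc == null) {
                            continue;
                        }
                        try {
                            sc.configureBlocking(false);
                            // Step 5: register with the selector, no events yet
                            SelectionKey scKey = sc.register(selector, 0);
                            // Step 6: listen for read events
                            scKey.interestOps(SelectionKey.OP_READ);
                            return scKey.interestOps();
                        } finally {
                            sc.close(); // the demo stops after one accept
                        }
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("interestOps = " + acceptOnce());
    }
}
```

Steps 5 and 6 are written as two separate calls on purpose, because that is how the rest of these notes map them onto Netty.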
2. Accept flow in Netty
We pick up from point 10 of the NioServerSocketChannel walkthrough in the previous article, which is where the accept event enters. Steps 1, 2 and 3 of the NIO flow are completed there, and unsafe.read() completes the remaining three.
The following is the code of the read method. We mainly want to see where steps 4, 5 and 6 are executed.
    // NioMessageUnsafe.read() in AbstractNioMessageChannel
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    // Step 4: create the SocketChannel and set it to non-blocking.
                    // On a successful accept, localRead is 1 here.
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // pipeline is the NioServerSocketChannel's pipeline;
                // this call invokes the handlers on it.
                // At this point there are only three handlers: head -> acceptor -> tail,
                // as covered in the previous article.
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
The important methods are examined below. Their roles are noted in the comments above.
1. int localRead = doReadMessages(readBuf)
    // NioServerSocketChannel.doReadMessages
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the connection; this creates and returns the SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Create the NioSocketChannel and add it to the result list as a message.
                // The handlers on the pipeline will later receive and process it.
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

    // SocketUtils.accept
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    // Call ServerSocketChannel.accept() to establish the connection
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
Conclusion: this method mainly does the following
- The SocketChannel and the NioSocketChannel wrapping it are created
- The NioSocketChannel is added to the result list as a message
- When execution reaches pipeline.fireChannelRead, the handlers are invoked to process the accepted connection
- This completes step 4 of the NIO flow
2. pipeline.fireChannelRead(readBuf.get(i))
Reaching this step means the handlers on the pipeline are invoked to process the message. The call lands in the channelRead method of the acceptor handler, ServerBootstrapAcceptor. The flow of that method is shown below
    // ServerBootstrapAcceptor.channelRead
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        // Add the childHandler to the new channel's pipeline
        child.pipeline().addLast(childHandler);

        // Apply the configured options and attributes
        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);

        try {
            // The important part: an EventLoop is picked from childGroup,
            // the channel is bound to that loop's selector,
            // and the loop's thread watches the events of the bound channel
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
1. childGroup.register(child).addListener(new ChannelFutureListener()
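We step into this method layer by layer
Note why the if branch is not taken in the method below: when execution gets here, the current thread is the one serving the NioServerSocketChannel in the NioEventLoopGroup, while the newly created SocketChannel is assigned to its own EventLoop, whose thread is normally not the current one. The registration is therefore handed over to that EventLoop as a task instead of running inline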
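The thread check described above can be sketched without Netty. This is a minimal illustration under my own assumptions: MiniEventLoop, runOnLoop and registerFromOtherThread are hypothetical names, and the real Netty code does considerably more (promises, error handling). As far as I can tell, the 4.1 source has the same shape: if eventLoop.inEventLoop() is true the registration runs directly, otherwise it is submitted with eventLoop.execute(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; a simplified stand-in, not Netty's implementation.
class InEventLoopSketch {

    // Stand-in for an event loop: one thread plus a task queue.
    static final class MiniEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-event-loop");
            loopThread = t; // remember which thread belongs to this loop
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        // Run inline when already on the loop thread, otherwise hand the task over.
        void runOnLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    // The caller plays the role of the server channel's thread.
    // Returns the name of the thread that actually ran the "register" task.
    static String registerFromOtherThread() throws InterruptedException {
        MiniEventLoop childLoop = new MiniEventLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        childLoop.runOnLoop(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });

        done.await();
        childLoop.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("register task ran on: " + registerFromOtherThread());
    }
}
```

Because the caller is not the loop's own thread, the else branch is taken and the task runs on the loop's thread, which is the situation described for the newly accepted SocketChannel.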
The familiar doRegister() method was covered in the previous article on the Netty source code. Its job is to bind the channel to the selector without subscribing to any events yet. This time the channel is the newly accepted SocketChannel rather than the server channel. This completes step 5 of the NIO flow: register the SocketChannel with the selector
We continue inside this method. Right after the doRegister() call there is another call whose job is to trigger the initialization event on the newly created channel. Stepping through it brings us to the initChannel method we wrote in our ChannelInitializer. As the name suggests, its main job is to add handlers
Following the code further down, we reach the fireChannelActive() call, which is what leads to the channel subscribing to the read event
Look at the call chain of this method
At the last method in the call chain you can see that the read event is subscribed to. This completes step 6 of the NIO flow. The intermediate calls in the chain do not matter much. The point is that the accept flow from NIO has now been completed in full, with its six steps wrapped layer by layer inside Netty.
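3. Read flow in Netty
The entry point is the same method as before. After the client connects, have it send a piece of data to the server. Note that this method is entered the first time for the accept event and the second time for the read event. You can see that readyOps is now 1, which is the value of SelectionKey.OP_READ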
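To make the readyOps values concrete, here is a small sketch that uses only the JDK constants. ReadyOpsSketch, triggersRead and describe are hypothetical names for this note. The condition in triggersRead follows what I understand the event loop's check to be in Netty 4.1 (both read and accept readiness lead to unsafe.read()); treat that as my reading of the source rather than a guarantee.

```java
import java.nio.channels.SelectionKey;

// Hypothetical demo class; only the SelectionKey constants come from the JDK.
class ReadyOpsSketch {

    // Both accept readiness and read readiness end up in unsafe.read();
    // which read() runs depends on the channel type.
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    // Names the event that a readyOps value stands for in this walkthrough.
    static String describe(int readyOps) {
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            return "OP_ACCEPT";
        }
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            return "OP_READ";
        }
        return "other";
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT = " + SelectionKey.OP_ACCEPT); // first entry
        System.out.println("OP_READ   = " + SelectionKey.OP_READ);   // second entry
        System.out.println(describe(16) + " -> read()? " + triggersRead(16));
        System.out.println(describe(1) + " -> read()? " + triggersRead(1));
    }
}
```

So a readyOps of 16 on the first entry is the accept event on the server channel, and a readyOps of 1 on the second entry is the read event on the client's channel.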
    // NioByteUnsafe.read() in AbstractNioByteChannel
    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        // Get the pipeline; its handlers will process the data
        final ChannelPipeline pipeline = pipeline();
        // Get the ByteBufAllocator that creates the ByteBuf holding the incoming data
        final ByteBufAllocator allocator = config.getAllocator();
        // allocHandle dynamically adjusts the size of the ByteBuf and uses direct memory,
        // which is more efficient for I/O operations
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // Allocate the actual ByteBuf; data can be read into it once allocated
                byteBuf = allocHandle.allocate(allocator);
                // Read the data sent by the client
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                // Check whether anything was read
                if (allocHandle.lastBytesRead() <= 0) {
                    // Nothing was read, so release the ByteBuf
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                // Count one more message read
                allocHandle.incMessagesRead(1);
                readPending = false;
                // Another important call: it invokes the handlers on our server to process the data.
                // After this call, execution enters the handler we wrote
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
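The core of the loop above (read, stop when nothing was read, close when the result is negative) can be sketched with plain java.nio. This is only an illustration under my own assumptions: ReadLoopSketch and readUntilEof are hypothetical names, a reused heap ByteBuffer stands in for Netty's pooled ByteBuf, and the client is opened locally just to have something to read.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; uses only java.nio, no Netty.
class ReadLoopSketch {

    // Reads everything a local client sends until EOF and returns it as text.
    static String readUntilEof() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) { // blocking accept, for brevity
                client.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
                client.shutdownOutput(); // the server side will see EOF (-1)

                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);

                ByteArrayOutputStream received = new ByteArrayOutputStream();
                ByteBuffer buf = ByteBuffer.allocate(16);
                boolean close = false;
                while (!close) {
                    selector.select();              // wait for a read event
                    selector.selectedKeys().clear();
                    while (true) {
                        buf.clear();
                        int lastBytesRead = accepted.read(buf);
                        if (lastBytesRead <= 0) {
                            // 0: nothing to read right now; negative: EOF, so close
                            close = lastBytesRead < 0;
                            break;
                        }
                        // Here Netty would call pipeline.fireChannelRead(byteBuf)
                        received.write(buf.array(), 0, lastBytesRead);
                    }
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("received: " + readUntilEof());
    }
}
```

The variable lastBytesRead is named after allocHandle.lastBytesRead() so that the two branches are easy to line up with the Netty code.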
If you spot any errors, please point them out!