Netty learning notes netty source code - accept and read process

preface

The notes are based on black horse's Netty teaching handout and some of my own understanding. I feel that this is a very good video I've seen. There's basically no nonsense. Video address: Black horse Netty . Here is.

Or this Code:

public class TestSourceServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                //EventLoop has a thread and actuator selector, which is used to focus on events and solve some tasks
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>(){

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler());
                    }
                }).bind(8080);
    }
}



1. Review of accept in NiO

  1. selector.selecr() blocks until the event occurs
  2. Traversal processing selectedKeys
  3. Get a key and judge whether the type is accept
  4. Create SocketChannel and set non blocking
  5. Register SocketChannel with selector
  6. Set SocketChannel to follow the read event



2. accept process in netty

Next, point 10 of NioServerSocketChannel in the previous article. Here is the enter accept event. It can be said that points 1, 2 and 3 of nio are completed here, while unsafe read(); Complete the remaining three points

The following is the code in the read method, in which we mainly observe where points 4, 5 and 6 will be executed

@Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                    	//4. Create SocketChannel and set non blocking
                    	//After looking at the source code, here is localRead =1 
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //Pipeline: get the pipeline of NioServertSocketChannel
                    //Call the handler processing above
                    //In this step, there are only three head accept end processors
                    //It's all said in the previous article
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

The following is an important method. The function is written on it


1. int localRead = doReadMessages(readBuf)

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
	//Establish connection, create SocketChannel and return
     SocketChannel ch = SocketUtils.accept(javaChannel());

     try {
         if (ch != null) {
         	 //NioSocketChannel is created. The following is to put NioSocketChannel into the result as a message
         	 //At that time, the processor on the pipeline will get the information and process it
             buf.add(new NioSocketChannel(this, ch));
             return 1;
         }
     } catch (Throwable t) {
         logger.warn("Failed to create a new channel from an accepted socket.", t);

         try {
             ch.close();
         } catch (Throwable t2) {
             logger.warn("Failed to close a socket.", t2);
         }
     }

     return 0;
 }

//accept
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                	//Call serversocketchannel Accept completes the connection establishment
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }


Conclusion: this method is mainly

  • SocketChannel and NioServerSocketChannel are created
  • Set NioServerSocketChannel as a message in the result
  • Now run to pipeline When the firechannelread method is used, the handler will be called to handle accept
  • So far, the fourth step is completed nio steps



2. pipeline.fireChannelRead(readBuf.get(i))

Running to this step means calling the handler on the pipeline to process the message. Once called, it will jump to the read method of the accept processor serverbootstrap acceptor below. The following is the flow of this method

 @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
			//Set processor
            child.pipeline().addLast(childHandler);
			//Set some parameters below
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
            	//At this time, some important processes are actually a new eventLoop
            	//Find a selector inside to bind with the channel, and set one
            	//The thread listens for events of the bound channel
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }



1. childGroup.register(child).addListener(new ChannelFutureListener()

We enter this method layer by layer





Note why the following method does not use if, because when we run here, the thread used is the ServerSocketChannel thread in NioEventGroup, and the newly created SocketChannel and the current thread should not be the same



The familiar doRegister() method was also mentioned in the previous article (in the source code of netty). Its function is to bind nioServerSocketChannel and selector without paying attention to events. Here, step 5 is completed. 5 Register SocketChannel with the selector, nio steps


We continue to run in this method. There is a method under the diRegister() method. The function of this method is to trigger the initialization event on the newly created channel. After we continue to run, we will come to the initChannel method of the client we wrote. The main function depends on the name, that is, to add the processor handler



We continue to follow the above code down to the following fireChannelActive() method, which is used to focus on the read event


Look at the call chain of this method











When you come to the final calling method, you can see that the read event is called here. So far, step 6 is completed nio steps Of course, the call chain in the middle doesn't matter. It's just that you can realize that you have finally completed this step of the accept process in nio, but these six steps are encapsulated layer by layer in netty.



3. read process in netty

This is the same method. After the client connects, send a piece of data to the server. Note that the first entry is accept and the second entry is read. You can see that readyOps becomes 1

@Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            //To get the pipeline, you need to use the handler processor inside to process it
            final ChannelPipeline pipeline = pipeline();
            // Get ByteBuf, because the message is here
            final ByteBufAllocator allocator = config.getAllocator();
            //allocHandle: dynamically adjust the size of the above ByteBuf and use direct memory. Because it is an io operation, using direct memory is efficient
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
           
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                	//Allocate specific ByteBuf, and you can read the data after dividing
                    byteBuf = allocHandle.allocate(allocator);
                    //This method is to read the data sent by the client
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //Prove that you've read it
                    if (allocHandle.lastBytesRead() <= 0) {
                        //If there's nothing to read, release ByteBuf
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }
					//Read the message once and add it once
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //This is also an important method, which means calling the handler of our server to process the sent message
                    //After this method is called, it will enter the handler we wrote
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }




If there is any error, please point it out!!!!

Keywords: Java Netty Back-end source code analysis

Added by henryblake1979 on Mon, 31 Jan 2022 08:46:15 +0200