Netty Source Code Analysis--Client Access Acept Process

Read through this article, you will understand
1. How netty accepts new requests
2. How netty assigns reactor threads to new requests
3. How netty adds ChannelHandler to each new connection

reactor thread in netty

There are two types of reactor threads at the core of netty. They can be regarded as two types of engines in netty, which drive the whole framework of netty.

One type of reactor thread is the boos thread group, which accepts new connections and then encapsulates them as channel objects and throws them to the worker thread group. Another type of reactor thread is the worker thread group, which is used to handle read and write of connections.

Whether it's a boos thread or a worker thread, what you do is divided into three steps

  1. Polling IO events registered on selector
  2. Handling IO events
  3. Execute asynchronous task

For boos threads, the first step of polling is basically accept events, indicating that there is a new connection, while the worker thread polling is basically read/write events, indicating the network read/write events.

Establishment of new connections

Simply put, the establishment of a new connection can be divided into three steps
1. New connections detected
2. Register the new connection to the worker thread group
3. Register read events for new connections

New connection entry detected

As we already know, when the server-side binding is started, the channel of the server-side has been registered in the boos reactor thread, and the reactor continuously detects new events until it detects that an accept event has occurred.

NioEventLoop.java

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //Check the SelectionKey Is it valid? If it is invalid, close it channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If ready READ or ACCEPT Trigger unsafe.read() ,Check whether it is 0 or not, as stated in the English commentary on the source code above: JDK One that may produce a dead cycle bug. 
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {//If it is closed, it can be returned directly without further processing. channel Other events
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // If ready WRITE The data in the buffer is sent out, and if the data in the buffer is sent out, the previous concerns are cleared. OP_WRITE sign
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // If it is OP_CONNECT´╝îNeed to be removed OP_CONNECT otherwise Selector.select(timeout)It will return immediately without any blockage, which may occur. cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

This method mainly checks the Selection Key. There are several different situations as follows.

1) OP_ACCEPT, accepting client connection

2) OP_READ, a readable event that receives new data from Channel for upper-level reading.

3) OP_WRITE, writable event, that is, upper layer can write data to Channel.

4) OP_CONNECT, connection establishment event, that is, TCP connection has been established, Channel is in active state.

This blog post mainly looks at what happens inside the box thread selector when it detects the OP_ACCEPT event.

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {//If it is closed, it can be returned directly without further processing. channel Other events
        // Connection already closed - no need to handle write.
        return;
    }
}

The boos reactor thread has polled for the SelectionKey.OP_ACCEPT event, indicating that a new connection has entered. At this time, unsafe of channel will be called to perform the actual operation. At this time, the channel is NioServerSocket Channel, and unsafe is NioMessageUnsafe, the attribute of NioServerSocket Channel.

So, let's move on to its read method, the second step in new connection processing

Register to reactor thread

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}

Call the doReadMessages method to read messages continuously, and use readBuf as a container. Here, you can guess that read is a connection, then call pipeline.fireChannelRead(), baptize each new connection through a layer of server channel, then clean up the container and trigger pipeline. fireChannelReadCompl. Ete ()

Let's look at these two methods in detail.

1.doReadMessages(List)
2.pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

We finally peeped into the boundaries of netty calling jdk underlying nio, Java Channel (). accept (); because the reactor thread in netty scans the accept event in the first step, the accept method here returns immediately, returning to a channel created by jdk underlying NiO

Server Socket Channel has two modes: blocking and non-blocking.

Blocking mode: The ServerSocketChannel.accept() method listens for new incoming connections, and when the accept() method returns, it returns a SocketChannel containing new incoming connections. In blocking mode, the accept() method blocks until a new connection arrives.

b. Non-blocking mode: The accept() method returns immediately, and null will return if no new connection has been entered. Therefore, you need to check whether the returned SocketChannel is null.

In the constructor analysis of NioServerSocket Channel, we know that it is non-blocking through ch.configureBlocking(false); the statement sets the current ServerSocket Channel to be non-blocking.

netty encapsulates jdk's Socket Channel into a custom NioSocket Channel and adds it to the list so that the outer layer can traverse the list for subsequent processing.

From the previous article, we already know that a series of core components in netty, including pipeline,unsafe and so on, will be created when a new connection is accepted?

With that in mind, let's follow up.

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

We focus on super(parent, socket), and the parent class of NioSocket Channel is AbstractNioByteChannel.

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

Here, we see the familiar shadow in jdk nio, SelectionKey.OP_READ, which is registered in native jdk nio programming to show interest in channel reading.

Let's move on and trace back to AbstractNioByteChannel's parent class, AbstractNioChannel, where I'm sure you're impressed with this part of the code after reading the last article.

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

When you create a channel on the server side, you end up with this method, super(parent), which creates a series of components in AbstractChannel that are bound to that channel, as follows

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

readInterestOp here indicates that the channel is concerned about the event of SelectionKey.OP_READ, which will be registered with selector later, and then set the channel to non-blocking mode, creating unsafe and a pipeline in the channel.

pipeline.fireChannelRead(NioSocketChannel)

As we have already known before, in all kinds of channel s of netty, there will be a pipeline. Literally, it means pipeline. We can understand it as a pipeline process. The pipeline process has a starting point, an end, and a variety of pipeline checkpoints, an item, starting from the pipeline. Point start processing, through the processing of each pipeline level, and finally to the end of the pipeline.

For netty, the beginning of the pipeline is HeadContxt, and the end of the pipeline is TailConext. HeadContxt calls Unsafe to do specific operations. TailConext is used to throw unprocessed exceptions in pipeline and warning of unprocessed messages to users.

In the previous article, we have already known that when channel is initialized on the server side, a pipeline processor ServerBootstrapAcceptor has been automatically added to the pipeline, and a series of parameters set in user code have been passed into the constructor. Next, let's take a look at ServerBootstrapAccept. Or

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

The previous pipeline. fireChannelRead (NioSocket Channel); finally, through the call chain of head - > unsafe - > Server Bootstrap Acceptor, the channel Read method of Server Bootstrap Acceptor is invoked here, and the channel Read is forced to convert msg into Channel as soon as it comes up.

Then we get the channel, which is the corresponding pipeline in NioSocket Channel that we came out of earlier, and add the child Handler in user code to the pipeline, where the child Handler is embodied in user code.

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });

Channel Initializer corresponds to this. Here, the processor corresponding to pipeline in NioSocket Channel is head - > Channel Initializer - > tail, remember, I will mention it again later!

Next, set the attr and option corresponding to the NioSocket Channel, and then go to the child group. register (child), where the child group is the NioEventLoop Group that we new ly generated in the startup code.

We go into the register method of NioEventLoopGroup and proxy it to its parent class MultithreadEventLoopGroup.

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

Here's another next() method. Let's follow it up.

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

Back to its parent class

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}

The corresponding class of chooser here is EventExecutor Chooser, which literally means Event Executor Chooser. The role in our context is to select a reactor thread from the worker reactor thread group.

public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

There are two implementations of chooser

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

By default, chooser is created by DefaultEvent Executor chooser Factory. When a reactor thread selector is created, the number of reactor threads is determined. If it is a power of 2, a PowerOf Tower Event Executor chooser is created. Otherwise, a GenericEvent Executor chooser is created.

Both types of selectors select reactor threads by Round-Robin. The only difference is that PowerOf TowEvent Executor Chooser is through operation, while GenericEvent Executor Chooser is through surplus operation, which is more efficient than surplus operation.

After selecting a reactor thread, that is, NioEventLoop, we go back to where we registered.

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

Actually, this is the same process as the server startup. You can refer to my previous article.

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

Like the server startup process, first call doRegister(); do the real registration process, as follows

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

Bind this channel to a selector, and a selector is used by a reactor thread, which is responsible for the subsequent polling and event handling of the channel.

After binding the reactor thread, call pipeline.invokeHandlerAddedIfNeeded()

As we mentioned earlier, so far NioSocket Channel has three processors in its pipeline, head - > Channel Initializer - > tail, which eventually calls the handler Added method of Channel Initializer.

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

After the handlerAdded method calls the initChannel method, it calls remove(ctx); it deletes itself, as follows

AbstractNioChannel.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

And here's the init Channel method? Let's go back to user methods, such as the following user code

user designation codes

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

It turned out that we finally ran into our own code! After that, NioSocket Channel's bound pipeline processors include head - > Logging Handler - > EchoServer Handler - > tail

Registered Read Event

Next, we haven't finished analyzing the code yet.

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}

pipeline.fireChannelRegistered(); There's really nothing meaningful to do, but ultimately call the ChannelHandlerAdded method of each processor in the business pipeline to handle the callback.

isActive() returns true when the connection has been established, so enter the method block and go to pipeline.fireChannelActive(); here I omit the detailed steps and go directly to the key link.

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

This is actually registering the SelectionKey.OP_READ event to the selector, indicating that the channel is ready to start processing read events.

summary

So far, the handling of new connections in netty has been shown to you. Let's summarize.

1.boos reactor thread polls for new connection entry
2. Create NioSocket Channel and a series of netty core components by encapsulating the channel at the bottom of jdk
3. Select a worker reactor thread to bind the connection through chooser
4. Register Read Events and Start Reading and Writing New Connections

Keywords: Java Netty socket JDK

Added by Monshery on Wed, 11 Sep 2019 07:08:29 +0300