NioServerSocketChannel registration source code analysis

In the previous chapter, we analyzed how Netty creates and initializes NioServerSocketChannel. In this chapter, we continue with its registration. NioServerSocketChannel is a channel object provided by Netty that wraps the JDK's native channel object (for a server, java.nio.channels.ServerSocketChannel). So how does NioServerSocketChannel connect to the JDK's NIO code?

1, Source code entry search

The method we mainly analyzed in the previous chapter was initAndRegister. As the name suggests, it both initializes and registers the channel. We return to it here; to locate it, refer to the previous chapter:

AbstractBootstrap#initAndRegister

We skip the code analyzed in the last lesson and go directly to the logic related to registration:

ChannelFuture regFuture = config().group().register(channel);

We analyze it one by one:

config()

Since we are building a ServerBootstrap, config() resolves to the ServerBootstrap implementation, which returns this field:

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

config() returns this object, which is created automatically along with the ServerBootstrap. Its constructor receives this, so the config object holds a reference to the ServerBootstrap and can read the bootstrap's settings through it. What do we do with the config object once we have it?

config().group()

You have probably guessed already. Let's step into group() to verify:

@SuppressWarnings("deprecation")
public final EventLoopGroup group() {
    return bootstrap.group();
}

This returns the bossGroup that we set when building the ServerBootstrap. If you are interested, you can trace it yourself; it is simple enough that we won't go into detail. Back to the main line:

config().group().register(channel);

From the analysis above, we know that group() returns a NioEventLoopGroup. We step into the register method:

NioEventLoopGroup itself does not implement register. From the previous chapters we know that NioEventLoopGroup is a subclass of MultithreadEventLoopGroup, so when the subclass does not define the method we look in the parent class. We enter the MultithreadEventLoopGroup source code:

@Override
public ChannelFuture register(Channel channel) {
    //Generally speaking, NioEventLoop obtained here inherits from SingleThreadEventLoop
    return next().register(channel);
}

Here we see code we analyzed earlier: next() calls chooser.next(). The chooser is an executor selector created when the NioEventLoopGroup is built, and its next method returns one of the group's executors: a NioEventLoop. If you don't remember this clearly, look back at the chapter on NioEventLoopGroup initialization.
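To make the role of next() more concrete, here is a minimal sketch of a round-robin chooser. It is not Netty's implementation: the class name RoundRobinChooserSketch is hypothetical, and plain strings stand in for NioEventLoop instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Netty code: hands out "event loops" in round-robin order.
final class RoundRobinChooserSketch<T> {
    private final List<T> executors;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooserSketch(List<T> executors) {
        if (executors.isEmpty()) {
            throw new IllegalArgumentException("executors must not be empty");
        }
        this.executors = new ArrayList<>(executors);
    }

    // Returns the next executor; floorMod keeps the index valid even after int overflow.
    T next() {
        return executors.get(Math.floorMod(index.getAndIncrement(), executors.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch<String> chooser =
                new RoundRobinChooserSketch<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With a group that holds a single executor, as is common for a bossGroup, every call to next() in this sketch returns the same element.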

So next() returns a NioEventLoop. Let's go to its register() method:

NioEventLoop does not implement register either. From what we learned earlier, the parent class of NioEventLoop is SingleThreadEventLoop, so we enter SingleThreadEventLoop#register(io.netty.channel.Channel):

@Override
public ChannelFuture register(Channel channel) {
    //Call its own registration method
    return register(new DefaultChannelPromise(channel, this));
}

//There's nothing to say. Keep chasing
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

The key line here is promise.channel().unsafe().register(this, promise);

In the previous chapter, we saw that unsafe is a NioMessageUnsafe, but NioMessageUnsafe does not implement register itself.

We again go up to the parent class and enter io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

Let's first focus on the parameters:

this: the caller passes itself in. The caller is the NioEventLoop, so what is passed in is an executor

promise: a wrapper object around the NioServerSocketChannel

We enter the register method and analyze the main code:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ......................Ignore unnecessary code for now.............................
    AbstractChannel.this.eventLoop = eventLoop;
    //Note that thread = null at this time, so false is returned
    if (eventLoop.inEventLoop()) {
        //The actual registration selector triggers the handler added event and the channelRegistered event
        register0(promise);
    } else {
        .......................Ignore unnecessary code for now......................
    }
}

AbstractChannel.this.eventLoop = eventLoop;

First, the executor obtained in the previous step is saved in the NioServerSocketChannel. This line shows that each Channel is bound to one NioEventLoop.

if (eventLoop.inEventLoop()) {
    //The actual registration selector triggers the handler added event and the channelRegistered event
    register0(promise);
}

Note: in a real debugging session, execution does not take this branch. It takes the else branch and registers asynchronously. To keep things easy to follow, I analyze the source along the if branch here. The difference is small: both branches call register0 to register, one synchronously and one asynchronously. Asynchrony is an important topic in Netty, and I will cover it in a separate chapter later.
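The if/else decision can be illustrated with a small stand-alone sketch. This is not Netty code: InEventLoopSketch, runOnLoop and the thread name sketch-loop are hypothetical, and a JDK single-thread executor stands in for the NioEventLoop. As in the note above, the loop thread does not exist until the first task is submitted, so a call from the main thread takes the asynchronous branch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Netty code: a one-thread "event loop" that remembers its thread.
final class InEventLoopSketch {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    // The thread is created lazily, on the first submitted task.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        t.setDaemon(true);
        loopThread.set(t);
        return t;
    });

    // True only when the caller is already running on the loop thread.
    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    // Same shape as the if/else in register(): run inline on the loop thread, otherwise hand off.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Submits a task from the calling thread and returns the name of the thread that ran it.
    static String registerFromCallerThread() {
        InEventLoopSketch loop = new InEventLoopSketch();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        loop.runOnLoop(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + registerFromCallerThread());
    }
}
```

Either way the same task runs; the check only decides whether it runs inline or is queued for the loop thread.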

We enter the register0 source code:

private void register0(ChannelPromise promise) {
    try {
        ..............Ignore code..................
        // The actual registration: calls down to the JDK to register the channel with the selector
        // See io.netty.channel.nio.AbstractNioChannel#doRegister
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ask the pipeline to run any pending handlerAdded callbacks
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Propagate the channelRegistered event through the pipeline
        pipeline.fireChannelRegistered();
        // Only fire channelActive if the channel has never been registered. This prevents
        // firing channelActive multiple times if the channel is deregistered and re-registered.
        // At this point the channel has not been bound to an address, so isActive() returns false
        if (isActive()) {
            ....................Ignore unnecessary code..................
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leakage.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

2, Source code analysis

doRegister();

doRegister();

This is the method that performs the real registration, and it is the key link between Netty's NioServerSocketChannel and the JDK's NIO classes. Its core line is:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

The javaChannel() method returns the JDK native channel (for a server, a ServerSocketChannel), which was saved when the NioServerSocketChannel was initialized. Recall how a socket server is written with plain JDK NIO.

Let's focus on the parameters of javaChannel().register:

eventLoop().unwrappedSelector(): when a NioEventLoop is created, it saves two selectors, the original JDK selector and the Netty-wrapped selector. The one returned here is the native selector.

0: the interest set. 0 means the channel is not yet interested in any events.

this: the current object, which is of type NioServerSocketChannel. It is attached to the SelectionKey returned by the JDK registration. Later, selectionKey.attachment() returns this NioServerSocketChannel, and since a NioServerSocketChannel holds the JDK native channel, I/O operations can then be performed on that native channel.
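The three parameters can be tried out with JDK NIO alone. The sketch below registers a ServerSocketChannel with interest set 0 and an attachment, then reads both back from the SelectionKey. The class AttachmentRegistrationSketch and its nested ChannelWrapper are hypothetical stand-ins (ChannelWrapper plays the role of NioServerSocketChannel); only the java.nio calls are real API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Sketch using only JDK NIO: register with interest set 0 and attach a wrapper object.
final class AttachmentRegistrationSketch {

    // Hypothetical stand-in for NioServerSocketChannel: it just holds the JDK channel.
    static final class ChannelWrapper {
        final ServerSocketChannel jdkChannel;

        ChannelWrapper(ServerSocketChannel jdkChannel) {
            this.jdkChannel = jdkChannel;
        }
    }

    // Registers a fresh channel and returns {interestOps == 0, attachment is the wrapper}.
    static boolean[] registerAndInspect() {
        try (Selector selector = Selector.open();
             ServerSocketChannel jdkChannel = ServerSocketChannel.open()) {
            jdkChannel.configureBlocking(false); // register() requires non-blocking mode
            ChannelWrapper wrapper = new ChannelWrapper(jdkChannel);

            // Same shape as javaChannel().register(selector, 0, this) in doRegister()
            SelectionKey key = jdkChannel.register(selector, 0, wrapper);

            boolean noInterest = key.interestOps() == 0;
            boolean sameWrapper = key.attachment() == wrapper;
            return new boolean[] {noInterest, sameWrapper};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = registerAndInspect();
        System.out.println("interestOps is 0: " + result[0]);
        System.out.println("attachment is the wrapper: " + result[1]);
    }
}
```

Registering with 0 only creates the key. Interest in accept events has to be set on the key afterwards, which is outside the scope of this sketch.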

At this point the JDK-level NIO registration is done: the channel is bound to the selector. Let's go back to register0 and continue with the next step:

pipeline.invokeHandlerAddedIfNeeded

pipeline.invokeHandlerAddedIfNeeded();

This starts the handlerAdded callbacks for the handlers that were added to the pipeline before registration:

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered with the EventLoop. It is time to call the handlerAdded callbacks
        // of the ChannelHandlers that were added before registration completed.
        callHandlerAddedForAllHandlers();
    }
}

//callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
    //task = PendingHandlerAddedTask
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

Note that the PendingHandlerCallback here is a PendingHandlerAddedTask. When was it queued? When we initialized the NioServerSocketChannel and called addLast, the channel was not yet registered, so the callback was deferred as a pending task. If you are interested, you can follow the source; the core of the task's execute() method looks like this:

if (executor.inEventLoop()) {
    callHandlerAdded0(ctx);
}

//Enter the callHandlerAdded0 source logic
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try{
        ctx.callHandlerAdded();
    }
    .......................
}

//Enter ctx.callHandlerAdded()
final void callHandlerAdded() throws Exception {
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}

As for handler(): as mentioned during NioServerSocketChannel initialization, the program added a ChannelInitializer to the pipeline, and that ChannelInitializer is what handler() returns here. We enter the ChannelInitializer#handlerAdded method:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

The call to focus on is initChannel(ctx):

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // Prevent re-entry.
    if (initMap.add(ctx)) {
        try {
            // Call the initChannel() method implemented by ChannelInitializer
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            ................................
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // Move the ChannelInitializer itself out of the Pipeline
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

initChannel((C) ctx.channel());

This calls the abstract initChannel method of ChannelInitializer. The abstract method was implemented during initialization, so we need to find that implementation. Let's go back to the code from the previous chapter, io.netty.bootstrap.ServerBootstrap#init:

void init(Channel channel) {
    ..........................;
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            //Add the user-defined handler, i.e. the handler passed in when building the ServerBootstrap
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(() -> {
                pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            });
        }
    });
}

In the previous chapter, we skipped this logic and only said that a ChannelInitializer implementation is added to the pipeline. Now its initChannel method is called back:

ChannelHandler handler = config.handler();
if (handler != null) {
    pipeline.addLast(handler);
}

This code adds the handler that the user passed in when building the ServerBootstrap to the pipeline. For ease of understanding, we assume the user did not set a handler, so the null check fails and this part is skipped. Let's continue:

ch.eventLoop().execute(() -> {
    pipeline.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
});

Here a default handler is added to the pipeline asynchronously. Why asynchronously? We leave that aside for now and treat the add as synchronous. At this point the pipeline contains the head node, the ChannelInitializer, the ServerBootstrapAcceptor and the tail node.

ServerBootstrapAcceptor exists specifically to handle newly accepted connections. Its constructor:

ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
            channel.config().setAutoRead(true);
        }
    };
}

We can see that it saves a series of parameters: the worker group (childGroup), childHandler, childOptions and childAttrs. These are the values we passed in when building the ServerBootstrap, which shows that they apply to the client socket connections.

We will analyze ServerBootstrapAcceptor in detail later. For now, the thing to focus on is the addLast method:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    ........................ignore.........................
    //Call back the handlerAdded method of the newly added handler
    callHandlerAdded0(newCtx);
    return this;
}

When a handler is added, its handlerAdded method is called back. Recall the chapter on Netty's basic architecture, where we introduced the pipeline.

Every call to addLast triggers this callback.

After all the handlers have been added, we return to the ChannelInitializer#handlerAdded method, at the point where initChannel(ctx) was called:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // Prevent re-entry.
    if (initMap.add(ctx)) {
        try {
            // Call the initChannel() method implemented by ChannelInitializer
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            ................................
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // Move the ChannelInitializer itself out of the Pipeline
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

Execution then reaches the finally block, which removes the current object from the pipeline. What is the current object? It is the ChannelInitializer. So after the removal, and still assuming no user handler was set, the pipeline contains the head node, the ServerBootstrapAcceptor and the tail node.

At this point, the analysis of invokeHandlerAddedIfNeeded is completed
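The whole sequence (defer the callback until registration, run it, let the initializer install the real handler and remove itself) can be condensed into a small model. This is a sketch for illustration only: MiniPipeline, Initializer and Acceptor are hypothetical names, and the model ignores threading, contexts and the head and tail nodes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Netty code: deferred handlerAdded callbacks and a self-removing initializer.
final class DeferredHandlerAddedSketch {

    interface Handler {
        void handlerAdded(MiniPipeline pipeline);
    }

    static final class MiniPipeline {
        final List<Handler> handlers = new ArrayList<>();
        private final List<Handler> pending = new ArrayList<>();
        private boolean registered;

        void addLast(Handler handler) {
            handlers.add(handler);
            if (registered) {
                handler.handlerAdded(this); // already registered: call back immediately
            } else {
                pending.add(handler);       // not registered yet: remember for later
            }
        }

        void remove(Handler handler) {
            handlers.remove(handler);
        }

        // Called after registration; runs every callback that was queued before it, once.
        void invokeHandlerAddedIfNeeded() {
            if (registered) {
                return;
            }
            registered = true;
            List<Handler> toRun = new ArrayList<>(pending);
            pending.clear();
            for (Handler handler : toRun) {
                handler.handlerAdded(this);
            }
        }
    }

    // Plays the role of ChannelInitializer: installs the real handler, then removes itself.
    static final class Initializer implements Handler {
        private final Handler realHandler;

        Initializer(Handler realHandler) {
            this.realHandler = realHandler;
        }

        @Override
        public void handlerAdded(MiniPipeline pipeline) {
            try {
                pipeline.addLast(realHandler);
            } finally {
                pipeline.remove(this);
            }
        }
    }

    // Plays the role of ServerBootstrapAcceptor; it only records that it was added.
    static final class Acceptor implements Handler {
        boolean added;

        @Override
        public void handlerAdded(MiniPipeline pipeline) {
            added = true;
        }
    }

    // Adds the initializer before "registration", then registers and returns the pipeline.
    static MiniPipeline run(Acceptor acceptor) {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast(new Initializer(acceptor));
        pipeline.invokeHandlerAddedIfNeeded();
        return pipeline;
    }

    public static void main(String[] args) {
        Acceptor acceptor = new Acceptor();
        MiniPipeline pipeline = run(acceptor);
        System.out.println(pipeline.handlers.size() + " handler(s) left, acceptor added: " + acceptor.added);
    }
}
```

After run() returns, only the acceptor remains in the model pipeline, which matches the state described above once the initializer has removed itself.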

pipeline.fireChannelRegistered();

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

There is not much to say about this code, and you can step through it in a debugger yourself. It propagates the channelRegistered event through the pipeline, starting from the HeadContext node.
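As a rough picture of what propagating from the head means, the following sketch passes an event along a singly linked chain of nodes. It is a simplified model, not Netty's context implementation: Node and the node names are hypothetical, and each node forwards unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Netty code: an event travels from the head node toward the tail.
final class HeadToTailPropagationSketch {

    static final class Node {
        final String name;
        Node next;

        Node(String name) {
            this.name = name;
        }

        // Record the visit, then forward the event to the next node if there is one.
        void channelRegistered(List<String> visited) {
            visited.add(name);
            if (next != null) {
                next.channelRegistered(visited);
            }
        }
    }

    // Builds head -> acceptor -> tail and fires the event from the head.
    static List<String> fireChannelRegistered() {
        Node head = new Node("head");
        Node acceptor = new Node("acceptor");
        Node tail = new Node("tail");
        head.next = acceptor;
        acceptor.next = tail;

        List<String> visited = new ArrayList<>();
        head.channelRegistered(visited);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(fireChannelRegistered());
    }
}
```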

At this point, the registration of NioServerSocketChannel has largely been analyzed. Some readers may notice that we skipped one piece of register0:

if (isActive()) {
    if (firstRegistration) {
        //The channel is already active at its first registration, so fire the channelActive event
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // This channel was registered before and autoRead() is set. This means we need to
        // begin reading again so that we process inbound data.
        //
        // See https://github.com/netty/netty/issues/4805
        beginRead();
    }
}

This code is not executed on first startup, because the channel has not yet been bound to a port, so isActive() returns false here. The relevant logic will be analyzed when we cover how new connections are accepted.
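The bound-or-not distinction can be observed with the JDK alone. The sketch below assumes that "active" for a server channel boils down to being open and having a bound socket; the class BoundStateSketch and its isActive helper are hypothetical and only mirror that idea.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Sketch using only JDK NIO: a server channel is treated as "active" once it is open and bound.
final class BoundStateSketch {

    // Assumption: models the open-and-bound idea behind isActive(); this is not Netty's code.
    static boolean isActive(ServerSocketChannel channel) {
        return channel.isOpen() && channel.socket().isBound();
    }

    // Returns {active before bind, active after bind}.
    static boolean[] checkBeforeAndAfterBind() {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            boolean before = isActive(channel);
            // Port 0 asks the operating system for any free port on the loopback address.
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            boolean after = isActive(channel);
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = checkBeforeAndAfterBind();
        System.out.println("active before bind: " + result[0]);
        System.out.println("active after bind: " + result[1]);
    }
}
```

Registration happens before bind, which is why the isActive() check in register0 is false at that point.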

3, Summary

  1. Netty calls the JDK's underlying register method and attaches its NioServerSocketChannel to the resulting SelectionKey as the attachment.
  2. After registration, the handlerAdded method is called back.
  3. Netty calls back the ChannelInitializer that was added when initializing NioServerSocketChannel, which adds the new-connection handler ServerBootstrapAcceptor and then removes itself.
  4. When registration is complete, the channelRegistered method is called back.

Keywords: Java Netty Concurrent Programming source code NIO

Added by punk3d on Wed, 26 Jan 2022 19:32:31 +0200