Do you understand Netty? Can't understand the source code? It doesn't exist. Take you to read the Netty source code

Prepare the following code in advance, start from the server construction, and deeply analyze the startup process of Netty server.

public class NettyBasicServerExample {

    public void bind(int port){
        //The server programming of netty should start with EventLoopGroup,
        // We want to create two eventloopgroups,
        // One is that boss is specifically used to receive connections, which can be understood as handling accept events,
        // The other is worker, which can focus on events other than accept and handle subtasks.
        //As noted above, one boss thread is generally set, and only one is used when setting multiple boss threads, and there are no application scenarios at present,
        // worker threads are usually tuned according to the server. If they are not written, the default is twice the cpu.
        EventLoopGroup bossGroup=new NioEventLoopGroup();

        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try {
            //To start the server, you need to create ServerBootStrap,
            // In this, netty encapsulates the template code of nio
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                //Configure the channel of the Server, which is equivalent to the ServerSocketChannel in NIO
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO)) //Set the Handler corresponding to ServerSocketChannel
                //childHandler indicates that a processor is configured for those worker threads,
                // This is what NIO said above. It abstracts the specific logic for processing business and puts it into the Handler
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                            .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                            .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                            .addLast(new NormalInBoundHandler("NormalInBoundC",true));
                        socketChannel.pipeline()
                            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
                            .addLast(new ExceptionHandler());
                    }
                });
            //Bind port and wait for client connection synchronously
            ChannelFuture channelFuture=bootstrap.bind(port).sync();
            System.out.println("Netty Server Started,Listening on :"+port);
            //Wait for the server listening port to close
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //Free thread resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyBasicServerExample().bind(8080);
    }
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler:"+name);
        if(flush){
            ctx.channel().writeAndFlush(msg);
        }else {
            throw new RuntimeException("InBoundHandler:"+name);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InboundHandlerException:"+name);
        super.exceptionCaught(ctx, cause);
    }
}
public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private final String name;

    public NormalOutBoundHandler(String name) {
        this.name = name;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandler:"+name);
        super.write(ctx, msg, promise);
    }
}

Before starting the server, you need to configure the relevant parameters of ServerBootstrap. This step is roughly divided into the following steps

  • Configure EventLoopGroup thread group
  • Configure Channel type
  • Set the Handler corresponding to ServerSocketChannel
  • Set the port for network listening
  • Set the Handler corresponding to SocketChannel
  • Configure Channel parameters

Netty will assemble the information we configured and publish the service monitoring.

ServerBootstrap parameter configuration process#

The following code is to configure the parameters related to serverbootstrap. The process is relatively simple, that is, save the configured parameter values to the member variables defined by serverbootstrap.

bootstrap.group(bossGroup, workerGroup)
    //Configure the channel of the Server, which is equivalent to the ServerSocketChannel in NIO
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.INFO)) //Set the Handler corresponding to ServerSocketChannel
    //childHandler indicates that a processor is configured for those worker threads,
    // This is what NIO said above. It abstracts the specific logic for processing business and puts it into the Handler
    .childHandler(new ChannelInitializer<SocketChannel>() {
    });

Let's take a look at the class diagram and attribute definition of ServerBootstrap

ServerBootstrap class diagram#

As shown in Figure 8-1, it represents the class relationship diagram of ServerBootstrap.

  • AbstractBootstrap defines an abstract class. As an abstract class, it must be separated from the abstract logic related to Bootstrap. Therefore, it is obvious that Bootstrap should also inherit AbstractBootstrap
  • ServerBootstrap, the startup class of the server,
  • The serverbootstrap acceptor inherits the ChannelInboundHandlerAdapter, so it is itself a Handler. When the server starts and the client connects, it will first enter the serverbootstrap acceptor.

Figure 8-1 ServerBootstrap class diagram

AbstractBootstrap property definition#

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    @SuppressWarnings("unchecked")
    private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
    @SuppressWarnings("unchecked")
    private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
    /**
     * The EventLoopGroup here, as the Acceptor thread on the server side, is responsible for processing the access request of the client side
     * As a client Connector thread, it is responsible for registering the listening connection operation bit to judge the asynchronous connection result.
     */
    volatile EventLoopGroup group; //
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;  //The channel factory is obviously used to manufacture the corresponding channel
    private volatile SocketAddress localAddress;  //SocketAddress is used to bind a server address

    // The order in which ChannelOptions are applied is important they may depend on each other for validation
    // purposes.
    /**
     * ChannelOption You can add channel to add some configuration information
     */
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    /**
     *  ChannelHandler How to handle the IO event of chaner.
     */
    private volatile ChannelHandler handler;
}

The above attribute definitions are summarized as follows:

  1. A ChannelFactory object is provided to create a Channel. A Channel will correspond to an EventLoop for IO event processing. Only one EventLoop will be bound in the whole life cycle of a Channel. Here, it can be understood that a thread is allocated to the Channel for IO event processing, and the thread is recycled after completion.
  2. AbstractBootstrap does not provide an EventLoop, but an EventLoopGroup. In fact, I think only one EventLoop is needed here.
  3. Both the server and client channels need to be bound to a local port, so there is the localAddress object of the SocketAddress class.
  4. There are many options for the Channel. All have the options object LinkedHashMap < channeloption <? >, Object>
  5. How to handle Channel IO events? We add an event handler ChannelHandler object.

ServerBootstrap property definition#

ServerBootstrap can be understood as a factory class started by the server. We can use it to complete the server-side Netty initialization. Main responsibilities:|

  • EventLoop initialization
  • channel registration
  • Initialization of pipeline
  • Adding process of handler
  • Server connection processing.
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    // The order in which child ChannelOptions are applied is important they may depend on each other for validation
    // purposes.
    //SocketChannel related property configuration
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); //Configuration class
    private volatile EventLoopGroup childGroup;  //Worker thread group
    private volatile ChannelHandler childHandler; //Handler responsible for IO processing of SocketChannel

    public ServerBootstrap() { }
}

Analysis of server startup process#

After understanding the configuration of ServerBootstrap related attributes, let's continue to look at the service startup process. When starting the next analysis, we might as well think about the following problems

  • How are the channels implemented by Netty and the channels provided by the underlying JDK connected and implemented
  • The function and implementation principle of ChannelInitializer, a special Handler processor
  • How is Pipeline initialized

ServerBootstrap.bind#

Let's start with serverbootstrap The definition of bind () method is mainly used to bind a port and publish the server to listen.

According to our understanding of using NIO related APIs, it is nothing more than using the underlying API of JDK to open a server to listen and bind a port.

 ChannelFuture channelFuture=bootstrap.bind(port).sync();
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
  • validate(), verify whether the core member attributes of ServerBootstrap are configured correctly, such as group, channelFactory, childHandler, childGroup, etc. if these attributes are not configured, the server will report an error when starting
  • localAddress, bind a local port address

doBind#

The doBind method is relatively long. From a large code structure, it can be divided into three parts

  • initAndRegister initializes and registers the Channel and returns a ChannelFuture, indicating that the initialization and registration of the Channel is an asynchronous implementation
  • regFuture.cause() is used to determine whether initAndRegister() has an exception. If an exception occurs, it will be returned directly
  • regFuture.isDone(), judge whether the initAndRegister() method is completed. If the execution is complete, the doBind0() method is called. If the execution is not completed, regfuture adds a listening callback, and judges the execution result again in the listening callback for related processing. PendingRegistrationPromise is used to save the status of asynchronous execution results

From the overall code logic, the logical structure is still very clear. The initAndRegister() method is responsible for Channel initialization and registration, and the doBind0() method is used to bind ports. This is nothing more than what we do to publish services using NIO related API s.

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
	
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

initAndRegister#

As the name suggests, this method is initialization and registration, which can be guessed based on the analysis of our whole process

  • Initialization should be to build the Handler processing chain on the server side
  • Register should register the connection of the current server to the selector

Let's confirm our conjecture through the source code.

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //Create a specific Channel implementation through ChannelFactory
        channel = channelFactory.newChannel();
        init(channel); //initialization
    } catch (Throwable t) {
        //Omit
    }
    //This code should be consistent with our guess, that is, register the currently initialized channel with the selector. This process is also asynchronous
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) { //Get the execution result of regFuture
        if (channel.isRegistered()) { 
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

channelFactory.newChannel()#

Before analyzing this method, we can continue to speculate its logic.

In the code for building the server at the beginning, we set a channel
NioServerSocketChannel.class object, which represents the specific API used for the construction of the current channel

bootstrap.group(bossGroup, workerGroup)
    //Configure the channel of the Server, which is equivalent to the ServerSocketChannel in NIO
    .channel(NioServerSocketChannel.class)

In the initAndRegister method, channelfactory is used Newchannel () to generate a specific channel instance. Therefore, it is not difficult to think that there must be a certain relationship between the two. We can also arbitrarily think that this factory will dynamically build a specified channel instance according to the channel we configured.

channelFactory has multiple implementation classes, so we can find the specific definition of channelFactory from the configuration method. The code is as follows.

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

The specific implementation of channelFactory is reflective channelFactory, so we locate the implementation of newChannel() method.

ReflectiveChannelFactory.newChannel#

In this method, an instance is built using constructor.

@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

The initialization code of the constructor is as follows. The clazz class passed in is used to obtain the constructor of this class. The constructor can create an instance object through newInstance

At this time, clazz is actually NioServerSocketChannel

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }
}

NioServerSocketChannel#

The construction method of NioServerSocketChannel is defined as follows.

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
}

When NioServerSocketChannel is instantiated, the newSocket method is invoked to create a server instance.

Called in the newSocket method.
provider.openServerSocketChannel() to complete the creation of ServerSocketChannel, which is the server API in NIO in Java.

public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

Through layer by layer deduction, we finally saw how Netty encapsulated step by step and completed the creation of ServerSocketChannel.

Set non blocking#

In the construction method of NioServerSocketChannel, first call the parent class through super to do some configuration operations

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

Finally, super will call the constructor in AbstractNioChannel,

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp; //Set the concern event. This is a connection event, so it is OP_ACCEPT
    try {
        ch.configureBlocking(false); //Set non blocking
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

Continue analyzing initAndRegister#

After the initialization of the channel is completed, the next step is to register the current channel with the Selector, so continue to return to the initAndRegister method.

final ChannelFuture initAndRegister() {
//Omit
    //This code should be consistent with our guess, that is, register the currently initialized channel with the selector. This process is also asynchronous
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) { //Get the execution result of regFuture
        if (channel.isRegistered()) { 
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

Registering with a Selector is actually registering with an EventLoopGroup. If you can guess, it means that you understand the previous content.

config(). group(). The code of register (channel) is actually to obtain the boseventloopgroup configured in ServerBootstrap, and then register the current server channel into the group.

At this time, when we want to see the implementation of register through shortcut keys, we find that there are multiple implementations of EventLoopGroup. Let's take a look at the class relationship diagram, as shown in Figure 8-2.

Figure 8-3 EventLoopGroup class diagram

The implementation class of the EventLoopGroup we configured earlier is NioEventLoopGroup, and NioEventLoopGroup inherits from MultithreadEventLoopGroup. Therefore, in the register() method, we can directly find the implementation method of the parent class.

MultithreadEventLoopGroup.register#

Everyone is familiar with this code. Select a NioEventLoop from the NioEventLoopGroup and register the current channel

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

The next() method returns NioEventLoop, which has multiple implementation classes. Let's look at the class relationship diagram shown in Figure 8-4.

Figure 8-4 NioEventLoop class diagram

It is found from the class diagram that NioEventLoop derives from SingleThreadEventLoop, so next() register(channel); Method, which executes register in SingleThreadEventLoop

SingleThreadEventLoop.register#

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

ChannelPromise, derived from Future, is used to implement the callback function of asynchronous task processing. In short, it is to asynchronize the registered actions. When the asynchronous execution is completed, the execution results will be backfilled into ChannelPromise

AbstractChannel.register#

Abstract classes are generally the processing of public logic, and the processing here is mainly to judge some parameters, and then call the register0() method.

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) { //Judge whether it has been registered
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) { //Judge whether the EventLoop type is the EventLoop object type. If not, throw an exception
        promise.setFailure(
            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;
	//If the internal thread of Reactor is called, that is, the current register method is triggered by the EventLoop thread, execute the following process
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else { //If it is an external thread
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

AbstractChannel.register0#

Netty selects an EventLoop from the EventLoopGroup thread group and binds it to the current Channel. After that, all I/O events in the Channel life cycle are handled by this EventLoop.

The register0 method mainly does four things:

  • Call the API at JDK level to register the current Channel
  • Trigger HandlerAdded event
  • Trigger channelRegistered event
  • When the Channel status is active, the channelActive event is triggered

In the current logic of ServerSocketChannel connection registration, we only need to focus on the following doRegister method.

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();  //Call the register() method at the JDK level to register
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded(); //Trigger Handler, if necessary

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) { //This is the registration of ServerSocketChannel, so the connection is still inactive
            if (firstRegistration) {
                pipeline.fireChannelActive(); 
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractNioChannel.doRegister#

Enter into
AbstractNioChannel. The doregister method.

javaChannel().register() is responsible for calling JDK level methods and registering the channel with EventLoop () Unwrappedselector(), in which the third parameter is passed in the channel object implemented by Netty itself, that is, bind the object to the attachment.

The purpose of this is that each time the Selector object is called for event polling, Netty can obtain its own channel object when an event is triggered.

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

Service registration summary#

The above code is rather convoluted, but it is not difficult to understand as a whole

  • Initializes the specified Channel instance
  • Assign the Channel to an EventLoop
  • Then register the Channel in the Selector of the EventLoop

AbstractBootstrap.doBind0#

After analyzing the registration logic, go back to the doBind0 method in the AbstractBootstrap class. This method can be known without looking. After the ServerSocketChannel is initialized, the next thing to do is to bind an ip and port address.

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    //Get the eventLoop instance in the current channel and execute an asynchronous task.
    //It should be noted that, as we mentioned earlier in the course, eventLoop performs select traversal on the one hand and blocks tasks in the queue on the other hand. Here, tasks are added to the queue for asynchronous execution.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            //If the ServerSocketChannel is registered successfully, the bind method of the channel will be called
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel. The bind method will be called one by one according to the handler chain configuration in ServerSocketChannel. In this case, we have configured a LoggingHandler processor for ServerSocketChannel, so the bind method will call LoggingHandler first, and then call the bind method in DefaultChannelPipeline to call the link

-> DefaultChannelPipeline.ind

-> AbstractChannel.bind

​ -> NioServerSocketChannel.doBind

Finally, call the bind method in the previously initialized ServerSocketChannel to bind the local address and port.

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Build Pipeline of SocketChannel#

In the configuration of ServerBootstrap, we configured inbound and outbound handlers for SocketChannel, that is, when the IO events of a SocketChannel are ready, they will be processed one by one according to the processor linked list we configured. When was the linked list built and what kind of structure is it? Let's analyze the content of this part

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
            .addLast(new NormalInBoundHandler("NormalInBoundA",false))
            .addLast(new NormalInBoundHandler("NormalInBoundB",false))
            .addLast(new NormalInBoundHandler("NormalInBoundC",true));
        socketChannel.pipeline()
            .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
            .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
            .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
            .addLast(new ExceptionHandler());
    }
});

Construction of childHandler#

The construction process of childHandler is in abstractchannel Implemented in register0 method

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //This is to create a channel
            init(channel); //Here is initialization
        } catch (Throwable t) {
            //Omit
        }
        ChannelFuture regFuture = config().group().register(channel); //This is registration
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

ServerBootstrap.init#

The init method calls init() in ServerBootstrap. The code is as follows.

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;  //childHandler is the ChannelInitializer added during server configuration
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    // At this time, the Channel is NioServerSocketChannel. Here, add a processor chain for NioServerSocketChannel.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler(); //If, during the construction of ServerBootstrap, the If the handler adds a processor, it will add the relevant processor to the pipeline in NioServerSocketChannel.
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() { //Asynchronous Tianjian is a serverbootstrap acceptor processor. From the name,
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        //currentChildHandler represents the pipeline of the SocketChannel. When a client connection is received, the handler will be added to the pipeline of the current SocketChannel
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Among them, the core part of the above code is described as follows

  • ChannelPipeline is a defaultchannelpipeline protected {AbstractChannel(Channel parent) {this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();}
  • p.addLast adds a handler processor chain to NioServerSocketChannel. Here, a ChannelInitializer callback function is added. The callback is triggered asynchronously. Two things are done in the callback method if serverbootstrap If the handler adds a processor, the relevant processor will be added to the pipeline. In the case of this demonstration, we add LoggerHandler asynchronous execution and ServerBootstrapAcceptor. From the name, it is specially used to receive new connection processing.

Let's think about a question here. Why does NioServerSocketChannel need to callback the processor through ChannelInitializer? Why is the serverbootstrap acceptor added to the pipeline through asynchronous tasks?

The reason is that NioServerSocketChannel has not started to register the Channel with the Selector object during initialization, that is, it is impossible to register the ACCEPT event with the Selector, so the ChannelInitializer processor is added in advance. After the Channel registration is completed, add serverbootstrap acceptor to the Pipeline.

ServerBootstrapAcceptor#

Use the following method to demonstrate the construction process of Pipeline in SocketChannel

  1. Start server listening
  2. Place a breakpoint in the channelRead method of serverbootstrap acceptor
  3. Connect through telnet, and debug will be triggered.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);  //Here, add the handler to the pipeline of SocketChannel

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        //Register the linked SocketChannel of the current client into an EventLoop.
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Serverbootstrap acceptor is a special processor in NioServerSocketChannel on the server side. The channelRead event of this processor will only be triggered when a new connection is generated, so it is through final Channel child = (Channel) msg here; You can get the link SocketChannel of the client directly.

The server bootstrap acceptor then passes through childgroup Register () method to register the current NioSocketChannel to the worker thread.

Process of event triggering mechanism#

In the serverbootstrap acceptor, childgroup. Is called when a client connection is received Register (child) registers the current client connection to the Selector of the specified NioEventLoop.

This registration process is exactly the same as the NioServerSocketChannel registration process explained earlier, and will eventually enter abstractchannel Register0 method.

AbstractChannel.register0#

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered(); //Execute the ChannelRegistered() event in the pipeline.
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

pipeline.fireChannelRegistered()#

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

The following event triggers are divided into two logics

  • If the current task is triggered in eventLoop, invokeChannelRegistered will be called directly
  • Otherwise, invokeChannelRegistered is executed asynchronously.
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

invokeChannelRegistered#

Trigger the channelRegistered method of the next handler.

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}

Netty server startup summary#

So far, we have analyzed and completed the whole process of server startup. The main logic is as follows

  • Creating a server Channel essentially calls the JDK native Channel according to the implementation of user configuration
  • Initialize the core properties of the Channel, such as unsafe and pipeline
  • Initializing the Pipeline of the Channel is mainly to add two special processors, ChannelInitializer and serverbootstrap acceptor
  • Register the Channel of the server and add the OP_ACCEPT event. The underlying call here is the implementation of JDK level. It means that the Channel is registered with the Selector in boseventloop
  • Bind port: call API at JDK level to bind port.

Keywords: Java JavaEE Back-end Interview

Added by jstarkweather on Fri, 14 Jan 2022 19:08:09 +0200