Source code analysis of EventLoop and EventLoopGroup of Netty core components

2021SC@SDUSC

In this blog, we will focus on the core components of netty, EventLoop and EventLoopGroup.

catalogue

1, Introduction example

2, Analysis

1. Relationship between nioeventloop and NioEventLoopGroup

2. Constructor of nioeventloop

3. newChild function in nioeventloopgroup

I   Introduction example

Netty provides a large number of demo s for users to use and test. Today, we will start from the EchoServer test class provided by netty in the figure below to analyze the two core components of EventLoop and EventLoopGroup.

 

Class code:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    /**
     * @see io.netty.channel.nio.NioEventLoop
     */
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)//new ReflectiveChannelFactory<C>(channelClass)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))// ServerSocketChannel exclusive
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception { // SocketChannel exclusive
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                 }
             });
           // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2, Analysis

In the main method, first create the SSL configuration class, and then create two EventLoopGroup objects:

      // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);

These two objects are the core objects of the whole Netty. It can be said that the operation of the whole Netty depends on them. bossGroup is used to accept Tcp requests. It will give the request to the workerGroup. The workerGroup will obtain the real connection and then communicate with the connection, such as reading, writing, decoding and encoding.

1. Relationship between nioeventloop and NioEventLoopGroup

EventLoopGroup construct:

First, the EventLoopGroup declares as follows:

      // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(8);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);

We enter   View in NioEventLoopGroup class:

2. Constructor of nioeventloop

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    /**
     * Create a new instance using the default number of threads, the given {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(ThreadFactory threadFactory) {
        this(0, threadFactory, SelectorProvider.provider());
    }

Constructor with the most complete parameters:

   public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             SelectorProvider selectorProvider,
                             SelectStrategyFactory selectStrategyFactory,
                             RejectedExecutionHandler rejectedExecutionHandler,
                             EventLoopTaskQueueFactory taskQueueFactory,
                             EventLoopTaskQueueFactory tailTaskQueueFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
    }

There are many NioEventLoop constructors. Each parameter can be customized. I won't post all of them. Finally, go back to the constructor with the most complete parameters. Let's explain the function of each parameter one by one:

  • nThreads: number of threads, corresponding to the number of eventloops. When it is 0, the default number is the number of CPU cores * 2
  • executor: we are all familiar with this. The thread that is ultimately used to execute EventLoop
  • chooserFactor: when we submit a task to the thread pool, chooserFactor will select a thread to execute according to the policy
  • selectorProvider: used to instantiate a selector in the jdk. No EventLoop has a selector
  • selectStrategyFactory: used to generate the corresponding selection policy factory when subsequent threads run
  • rejectedExecutionHandler: the same function as in the jdk thread pool. It is used to handle the situation that there are no redundant threads in the thread pool. By default, it throws an exception directly
/**
     * @param nThreads the number of threads that will be used by this instance.
     * @param executor the Executor to use, or {@code null} if default one should be used.
     * @param chooserFactory the {@link EventExecutorChooserFactory} to use.
     * @param selectorProvider the {@link SelectorProvider} to use.
     * @param selectStrategyFactory the {@link SelectStrategyFactory} to use.
     * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use.
     * @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
     *                         {@link SingleThreadEventLoop#execute(Runnable)},
     *                         or {@code null} if default one should be used.
     * @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
     *                             {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)},
     *                             or {@code null} if default one should be used.
     */

The above are overloaded construction methods and add some default values, such as null executor, a singleton selection policy factory, and a default thread pool rejection policy. The following is the real construction method of NioEventLoopGroup. In the abstract parent class MultithreadEventExecutorGroup, the code is as follows:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // Create an EventLoop array with nThreads size
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Creating a concrete EventLoop will call the methods in the subclass NioEventLoopGruop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // If one of them fails to create, close all the previously created ones
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // Provide the created EventLoop to EventExecutorChooser for subsequent selection
    chooser = chooserFactory.newChooser(children);

    // Add an EventLoop listener to listen for the termination status of EventLoop
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // Cyclic addition
        e.terminationFuture().addListener(terminationListener);
    }
    // Convert the EventLoop array to a read-only set
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

analysis:

      This part of the code is very long. Let's split it:

1. If the executor is null, create a default threadpertask executor and use Netty's default thread factory.

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

2. Create a thread pool (singleton thread pool) array according to the number of incoming threads (CPU*2).

children = new EventExecutor[nThreads];

3. Loop fill the elements in the array. If the exception occurs, close all singleton thread pools.

4. Create a thread selector according to the thread selection factory. The default is 2 remainder (bit operation) or sequential acquisition.

 // Provide the created EventLoop to EventExecutorChooser for subsequent selection
    chooser = chooserFactory.newChooser(children);

5. Add a shutdown listener for each singleton thread pool.

 // Add an EventLoop listener to listen for the termination status of EventLoop
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

6. Add all singleton thread pools to a HashSet.

// Convert the EventLoop array to a read-only set
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);

3. newChild method in nioeventloopgroup

When a new Channel is connected, the NioEventLoopGroup needs to take out a NioEventLoop to bind the Channel, and the subsequent IO operations of the Channel are operated on the NioEventLoop. Here, the newChild method is called. The code is as follows:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  • It can be found that it is new NioEventLoop
  • It can be seen that the parameters first used to create EventLoopGroup are actually used to create EventLoop

Follow up the constructor of NioEventLoop

Analysis comments in code

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
     	//Calling the constructor of the parent class is mainly to save the thread group NioEventLoopGroup
    	//Create a 2.1 billion task queue
    	//The executor is responsible for creating threads, and the executor is also saved.
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        /*Non mainline code, ignored
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }*/
    
        //The provider=SelectorProvider.provider() jdk comes with to create a ServerSocketChannel
        provider = selectorProvider;
    	//The openSelector method is responsible for encapsulating the selectedKeys of the hashSet structure of jdk into the SelectedSelectionKeySet defined by netty. There is an array structure in the SelectedSelectionKeySet. This optimization can be configured.
        final SelectorTuple selectorTuple = openSelector();
        //Replaced the native selector of the data structure selectedkeys publicselectedkeys
        selector = selectorTuple.selector;
        //The underlying data structure of the selector wrapped by the subclass has also been replaced
        unwrappedSelector = selectorTuple.unwrappedSelector;
        //selectStrategy=new DefaultSelectStrategyFactory()
        selectStrategy = strategy;
    }

  General process of newChild:

  • new NioEventLoop
    • Save the ThreadPerTaskExcutor you created earlier
    • Create MpscQueue (task queue)
    • Create selector

  Due to space reasons, we will analyze the NioEventLoop part in the next blog.

Keywords: Netty

Added by sandy1028 on Fri, 15 Oct 2021 20:17:22 +0300