This article discusses the runtime architecture of Netty's core engine, the Reactor

This series of Netty source code analysis articles is based on version 4.1.56.Final

In this article, the author introduces the runtime architecture of Netty's core engine, the Reactor. Hopefully this will give you a complete picture of how the Reactor drives the whole Netty framework. It also lays the groundwork for the follow-up articles on how Netty handles the full life cycle of a network request, so that they are easier to follow.

Before starting the main content of this article, let's briefly review how the overall Netty framework is assembled, as introduced in the previous articles. If you have not read those articles, don't worry: it will not affect your reading of this one, apart from some related details, which you can always go back and look up later.

Previous review

In Talk about Netty: the implementation of Reactor in Netty (creation), we introduced the creation process of the master-slave Reactor thread groups, the core engine of the Netty server, and the important properties of the related core components. Along the way we also covered Netty's attention to detail, such as its optimizations of the native JDK NIO Selector, which shows Netty's pursuit of extreme performance. In the end we created a Reactor with the following structure.

In the previous article, Detailed illustration of the whole startup process of Netty Reactor, we walked through the entire startup process of the Netty server: the ServerBootstrap properties and configuration methods involved, the creation and initialization of the NioServerSocketChannel that accepts connections on the server side together with its class inheritance structure, the registration of the NioServerSocketChannel with the Reactor, the point at which the Reactor thread is started, and the point at which the pipeline is initialized. Finally, we covered the whole process of binding the NioServerSocketChannel to its port address. In doing so, we learned how these core Netty components are connected together.

After Netty is started, we get the following framework structure:

The NioServerSocketChannel is managed by the main Reactor thread group. It is used to accept client connections, initialize each accepted connection in the ServerBootstrapAcceptor in its pipeline, and then register the initialized client connection with the slave Reactor thread group.

The slave Reactor thread group is mainly responsible for listening for and handling IO ready events on all client connections registered with it.

A Channel is assigned to exactly one fixed Reactor, while one Reactor is responsible for handling IO ready events on multiple Channels. This spreads the full set of client connections carried by the server across multiple Reactors and also guarantees thread safety of the IO processing on each Channel. The correspondence between Reactors and Channels is shown in the figure below:
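
To tie this picture to concrete code, here is a minimal bootstrap sketch (written for illustration, not taken from the articles above) showing how the master-slave Reactor groups are wired together; the port 8080 and the empty ChannelInitializer are placeholders:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ReactorGroupSketch {
    public static void main(String[] args) {
        // Main Reactor group: polls OP_ACCEPT on the NioServerSocketChannel and accepts connections
        EventLoopGroup mainReactorGroup = new NioEventLoopGroup(1);
        // Sub Reactor group: polls and handles IO ready events on the accepted NioSocketChannels
        EventLoopGroup subReactorGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(mainReactorGroup, subReactorGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         // business ChannelHandlers for each accepted connection would be added here
                     }
                 });
        // 8080 is a placeholder port
        bootstrap.bind(8080).syncUninterruptibly();
    }
}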

That was a review of the author's previous articles. It is great if you can recall them, but it does not matter if you cannot: it will not affect your understanding of this article at all. If you are interested in the details, you can go back and read them after finishing this one.

Let's get back to business and officially start the content of this article. Next, the author will introduce how these core components cooperate to drive the whole Netty Reactor framework.

When the Netty Reactor framework is started, the first and most important thing is how to efficiently receive client connections.

Before discussing how the Netty server accepts connections, we need to understand how the Reactor thread runs and how it listens for and handles IO ready events on a Channel.

In other words, this article is the prelude to the later articles on how the Reactor thread listens for and handles ACCEPT, Read and Write events. It focuses on the overall running framework of the Reactor thread, and understanding it will be very helpful for understanding how the Reactor thread handles IO events later.

We have mentioned the Reactor thread countless times during the creation and startup stages of the Netty framework, so it is in the running stage covered by this article that the Reactor thread finally gets to show its power.

After the introduction of the previous article, we know that the Reactor thread in Netty mainly does three things:

  • Poll all channels registered on the Reactor for IO ready events of interest.

  • Handle IO ready events on the Channel.

  • Perform asynchronous tasks in Netty.

It is these three parts that constitute the operation framework of Reactor. Now let's see how the operation framework works~~

The whole running framework of Reactor thread

Do you remember what the author mentioned in Talking about Netty: IO model from the perspective of kernel: the evolution of IO models revolves around one theme, "how to manage as many connections as possible with as few threads as possible".

Netty's IO model is the IO multiplexing model implemented through the JDK NIO Selector, and Netty's IO thread model is the master-slave Reactor thread model.

Based on the IO multiplexing model introduced in Talking about Netty: IO model from the perspective of kernel, it is easy to understand that Netty uses a user-space Reactor thread to continuously poll the IO ready events on Channels through the kernel's Selector.

To put it bluntly, what the Reactor thread actually runs is an endless loop. In this loop it keeps polling for IO ready events through the Selector. If an IO ready event has occurred, it returns from the Selector system call and handles it; if not, it blocks on the Selector system call until one of the Selector wake-up conditions is met.

As long as any one of the following three conditions is met, the Reactor thread will be awakened from the Selector:

  • When the Selector polls for an IO active event.

  • When the scheduled task to be executed by the Reactor thread reaches the task execution time deadline.

  • When an asynchronous task is submitted to Reactor, the Reactor thread needs to be awakened from the Selector in order to execute the asynchronous task in time.

As you can see, Netty really squeezes everything it can out of the Reactor thread: if there is no IO ready event to handle, the Reactor thread is not allowed to sit idle and wait for nothing; it is woken up immediately and put to work on the submitted asynchronous tasks and scheduled tasks. Reactor threads are basically on a 996 schedule, working non-stop.
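
To make the loop described above concrete, here is a minimal raw JDK NIO selector loop, the pattern that Netty's Reactor generalizes. This is a plain NIO sketch written for illustration, not Netty source; port 8080 is a placeholder and the event handling is left empty:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class SimpleSelectorLoop {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8080));
        server.register(selector, SelectionKey.OP_ACCEPT);

        for (;;) {
            // Block until at least one channel is IO ready or the selector is woken up
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();                  // the Selector never removes processed keys itself
                if (key.isAcceptable()) {
                    // accept the connection, register it for OP_READ, etc.
                }
                // a real loop would also handle readable/writable keys and run queued tasks here
            }
        }
    }
}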

After understanding the general operation framework of Reactor thread, let's go to the source code to see how its core operation framework is implemented.

Because this piece of source code is relatively large and complex, the author first extracts its running framework so that everyone can grasp the overall flow.

The figure above shows the whole working system of the Reactor, which is divided into the following important working modules:

  1. The Reactor thread blocks on the Selector waiting for IO ready events. In this module it first checks whether there are asynchronous tasks waiting to be executed; if there are, it must not block on the Selector regardless of whether any IO ready event has occurred. Instead it does a non-blocking poll of the Selector for IO ready events, so that if there are any, they can be handled together with the asynchronous tasks: IO ready events are handled first, then the asynchronous tasks are executed.

  2. If there are currently no asynchronous tasks to execute, the Reactor thread then checks whether there are scheduled tasks. If there are, it blocks on the Selector until the deadline of the earliest scheduled task expires, or until one of the other wake-up conditions is met. If there are no scheduled tasks either, the Reactor thread blocks on the Selector until a wake-up condition is met.

  3. When a wake-up condition is met and the Reactor thread is woken up, it first determines whether it was woken by an IO ready event, by an asynchronous task that needs to run, or by both. Then the Reactor thread handles the IO ready events and executes the asynchronous tasks.

  4. Finally, the Reactor thread returns to the starting point of the loop and repeats the above three steps continuously.

The above is the core logic that the Reactor thread runs. Below, the author extracts the overall code framework of the Reactor according to this core logic. You can first get a feel for the overall source code structure together with the Reactor workflow diagram above, matching the Reactor's core processing steps to the corresponding modules in the code. There is no need to read every line here; understand it in units of logical processing modules. Later, the author will walk through each of these modules in detail.

  @Override
    protected void run() {
        //Record the number of consecutive select calls, used to detect the JDK epoll empty polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                //Polling results
                int strategy;
                try {
                    //Get the polling results according to the polling policy. The hasTasks() here mainly checks whether there are asynchronous tasks waiting to be executed in the ordinary queue and the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        //Core logic: if there are asynchronous tasks to execute, the Reactor thread runs them right away (after a non-blocking selectNow());
                        //if there are no asynchronous tasks, it blocks on the Selector polling for IO events

                    default:
                    }
                } catch (IOException e) {
                       ................ellipsis...............
                }

                //Reaching this point means a wake-up condition has been met: the Reactor thread has been woken up from the
                //Selector and starts handling IO ready events and executing asynchronous tasks
                /**
                 * The Reactor thread must ensure asynchronous tasks are executed promptly: as soon as an asynchronous task is submitted, polling is exited.
                 * If there are also IO events, the IO events are handled first, and then the asynchronous tasks
                 * */

                selectCnt++;
                //It is mainly used to clear invalid SelectionKeys from the IO ready selectedKeys set
                needsToSelectAgain = false;
                //Adjust the CPU time ratio of the Reactor thread to execute IO events and asynchronous tasks. The default is 50, indicating that the time ratio of executing IO events and asynchronous tasks is one to one
                final int ioRatio = this.ioRatio;
             
               //Here the Reactor mainly handles IO ready events and executes asynchronous tasks.
               //IO ready events are handled first; then, according to the CPU time ratio between IO processing and asynchronous
               //tasks configured via ioRatio, the Reactor decides how long asynchronous tasks are allowed to run.

                //Judge whether to trigger JDK Epoll BUG and trigger null polling
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    //There are neither IO ready events nor asynchronous tasks. The Reactor thread is abnormally awakened from the Selector, triggering the JDK Epoll empty rotation BUG
                    //Rebuild the selector and return selectcnt to zero
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                ................ellipsis...............
            } catch (Error e) {
                ................ellipsis...............
            } catch (Throwable t) {
              ................ellipsis...............
            } finally {
              ................ellipsis...............
            }
        }
    }

From the source code implementation framework of Reactor extracted above, we can see that Reactor thread mainly does the following things:

  1. Poll the IO events of interest for all Channels registered on the Reactor through the JDK NIO Selector. The NioServerSocketChannel is mainly responsible for accepting client connections, so it listens for the OP_ACCEPT event; the client-side NioSocketChannel is mainly responsible for handling read and write events on a connection, so it listens for the OP_READ and OP_WRITE events.

Note that Netty only registers the OP_READ event automatically; the OP_WRITE event is registered by the user when the Socket's write buffer is full and no more data can be written.

  2. If an asynchronous task needs to be executed, the polling stops immediately and the Reactor turns to executing asynchronous tasks. There are two cases:

    • Both IO ready events and asynchronous tasks need to be handled. The IO ready events are processed first, and then the time available for asynchronous tasks is determined by the execution-time ratio configured through ioRatio. The Reactor thread has to cap the execution time of asynchronous tasks here, because its core job is handling IO ready events, and that must not be delayed by asynchronous tasks.

    • No IO ready event has occurred, but there are asynchronous tasks or expired scheduled tasks that need to run. In this case only asynchronous tasks are executed, squeezing as much as possible out of the Reactor thread: it is not allowed to sit idle just because there is no IO ready event.

    In this second case, at most 64 asynchronous tasks are executed, to prevent the Reactor thread from spending too long on asynchronous tasks and delaying its most important job: polling IO events.

  3. Finally, Netty checks whether the wake-up of the Reactor thread was caused by the JDK epoll empty polling BUG. If the BUG was triggered, the Selector is rebuilt, working around the JDK BUG.

Normally, the Reactor thread is awakened from the Selector in two ways:

  • Poll for IO ready events.
  • There are asynchronous tasks or scheduled tasks to be executed.
    The JDK epoll empty polling BUG refers to the case where neither of the above two situations has occurred, yet the Reactor thread is unexpectedly woken up from the Selector, causing the CPU to spin idly.

JDK epoll null polling BUG: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302

Well, now that we have understood the overall operation structure framework of Reactor thread, let's go deep into these core processing modules to break them one by one~~

1. The Reactor thread polls for IO ready events

In Talk about Netty: the implementation of Reactor in Netty (creation), when describing the creation of the master-slave Reactor thread group NioEventLoopGroup, the author mentioned a constructor parameter: SelectStrategyFactory.

   public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

  public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

The most important job of the Reactor thread is to poll for IO ready events. SelectStrategyFactory is used to specify the polling strategy, and the default implementation is DefaultSelectStrategyFactory.INSTANCE.
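
For illustration only, here is a sketch of supplying the strategy factory explicitly through the constructor shown above. It is equivalent to the default behavior; to my understanding, passing null for the executor lets the group create its own:

import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.nio.NioEventLoopGroup;

public class ExplicitStrategySketch {
    public static void main(String[] args) {
        // Equivalent to the default behavior, but with the polling strategy factory spelled out
        NioEventLoopGroup group = new NioEventLoopGroup(
                1,                                   // nThreads
                (Executor) null,                     // null lets the group create its own executor
                SelectorProvider.provider(),
                DefaultSelectStrategyFactory.INSTANCE);
        group.shutdownGracefully();
    }
}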

When the Reactor thread starts a round of polling, it first uses this selectStrategy to calculate a polling strategy, and then different logic is executed according to that strategy.

  @Override
    protected void run() {
        //Record the number of consecutive select calls, used to detect the JDK epoll empty polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                //Polling results
                int strategy;
                try {
                    //Get the polling results according to the polling policy. The hasTasks() here mainly checks whether there are asynchronous tasks waiting to be executed in the ordinary queue and the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        //Core logic: if there are asynchronous tasks to execute, the Reactor thread runs them right away (after a non-blocking selectNow());
                        //if there are no asynchronous tasks, it blocks on the Selector polling for IO events

                    default:
                    }
                } catch (IOException e) {
                       ................ellipsis...............
                }

                ................ellipsis...............
}

Now let's look at how this polling strategy is actually calculated.

1.1 polling strategy

public interface SelectStrategy {

    /**
     * Indicates a blocking select should follow.
     */
    int SELECT = -1;
    /**
     * Indicates the IO loop should be retried, no blocking select to follow directly.
     */
    int CONTINUE = -2;
    /**
     * Indicates the IO loop to poll for new events without blocking.
     */
    int BUSY_WAIT = -3;

    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

Let's first look at the three polling strategies defined in Netty:

  • SelectStrategy.SELECT: at this time, there are no asynchronous tasks to be executed. The Reactor thread can safely block the Selector and wait for the IO ready event.

  • SelectStrategy.CONTINUE: restart a round of IO polling.

  • SelectStrategy.BUSY_WAIT: the Reactor thread spin-polls. Since NIO does not support busy-wait spinning, this falls through to the SelectStrategy.SELECT strategy.

Let's take a look at the calculation logic calculateStrategy of the polling strategy:

final class DefaultSelectStrategy implements SelectStrategy {
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        /**
         * Reactor Threads should ensure timely execution of asynchronous tasks
         * 1: If there are asynchronous tasks waiting to be executed, execute selectNow() non blocking polling for an IO ready event immediately
         * 2: If there is no asynchronous task, skip to the switch select branch
         * */
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
}
  • Before the Reactor thread starts polling, it first needs to determine whether there are currently asynchronous tasks to execute. This is decided by checking whether there are tasks in the Reactor's asynchronous task queue taskQueue or in the tail queue tailTasks that is used for tasks such as statistics.
    @Override
    protected boolean hasTasks() {
        return super.hasTasks() || !tailTasks.isEmpty();
    }

   protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty();
    }
  • If there are asynchronous tasks to execute in the Reactor, the Reactor thread needs to run them immediately and must not block on the Selector. Before returning, it calls selectNow() to check whether there are IO ready events: if there are, they can be handled together with the asynchronous tasks; if not, the asynchronous tasks are handled without delay.

The semantics of Netty here is: first, the Reactor thread needs to give priority to the processing of IO ready events, and then ensure the timely execution of asynchronous tasks. If there is no IO ready event but an asynchronous task needs to be executed, the Reactor thread will execute the asynchronous task in time instead of continuing to block and wait for the IO ready event on the Selector.

   private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

   int selectNow() throws IOException {
        //Non blocking
        return selector.selectNow();
    }
  • If the current Reactor thread has no asynchronous tasks to execute, the calculateStrategy method directly returns SelectStrategy.SELECT, which is the constant -1 defined in the SelectStrategy interface. When calculateStrategy returns a positive value via selectNow(), it means there are IO ready Channels at this moment, and the return value is the number of IO ready Channels.
  @Override
    protected void run() {
        //Record the number of consecutive select calls, used to detect the JDK epoll empty polling bug
        int selectCnt = 0;
        for (;;) {
            try {
                //Polling results
                int strategy;
                try {
                    //Get the polling results according to the polling policy. The hasTasks() here mainly checks whether there are asynchronous tasks waiting to be executed in the ordinary queue and the tail queue
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO does not support spin (BUSY_WAIT)

                    case SelectStrategy.SELECT:

                        //Core logic: if there are asynchronous tasks to execute, the Reactor thread runs them right away (after a non-blocking selectNow());
                        //if there are no asynchronous tasks, it blocks on the Selector polling for IO events

                    default:
                    }
                } catch (IOException e) {
                       ................ellipsis...............
                }

                //................handle IO ready events and execute asynchronous tasks...............
}

From the default polling strategy we can see that SelectStrategy.calculateStrategy returns only three kinds of values:

  • Return -1: the switch enters the SelectStrategy.SELECT branch, meaning there are no asynchronous tasks in the Reactor at this moment, and the Reactor thread can safely block on the Selector waiting for IO ready events to occur.

  • Return 0: the switch logical branch enters the default branch, indicating that there is no IO ready event in the Reactor at this time, but there are asynchronous tasks to be executed. The process directly enters the logical part of processing asynchronous tasks through the default branch.

  • Return > 0: the switch logic branch enters the default branch, indicating that there are both IO ready events and asynchronous tasks to be executed in the Reactor. The process directly enters the logic part of processing IO ready events and executing asynchronous tasks through the default branch.

Now that we know how the Reactor's processing flow branches, let's focus on the polling logic in the SelectStrategy.SELECT branch. This is the core of how the Reactor listens for IO ready events.

1.2 polling logic

                    case SelectStrategy.SELECT:
                        //At present, there is no asynchronous task execution, and the Reactor thread can safely block and wait for the IO ready event

                        //Take the deadline of the scheduled task to be executed from the scheduled task queue
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // -1 means there is no scheduled task in the current scheduled task queue
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }

                        //The deadline of the earliest scheduled task is used as the blocking timeout of the select call,
                        //meaning that when that scheduled task becomes due, the Selector must be woken up regardless of
                        //whether there is an IO ready event, so that the Reactor thread can run the scheduled task in time
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                //Check again whether there are asynchronous tasks in the normal task queue
                                //If not, start select blocking and polling IO ready events
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This indicates that the Reactor has been awakened from the Selector
                            // Set the state of Reactor to wake up
                            // lazySet optimizes unnecessary volatile operations, does not use memory barriers, and does not guarantee the visibility of write operations (single thread does not need to be guaranteed)
                            nextWakeupNanos.lazySet(AWAKE);
                        }

When the flow reaches this point, it means the Reactor has nothing urgent to do and can safely block on the Selector waiting for IO ready events to arrive.

So how long should the Reactor thread block on the Selector?

Before answering that question, recall what was mentioned in Talk about Netty: the implementation of Reactor in Netty (creation) when discussing the creation of the Reactor: besides polling IO ready events on Channels and handling them, the Reactor thread is also responsible for executing the asynchronous tasks in the Netty framework.

Asynchronous tasks in the Netty framework fall into three categories (a submission sketch follows this list):

  • Ordinary asynchronous tasks stored in taskQueue.

  • The tail tasks stored in the tail queue tailTasks, used to run finishing-up actions such as statistics tasks.

  • And the scheduled tasks mentioned here, stored in the Reactor's scheduled task queue scheduledTaskQueue.
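
As mentioned above, here is a hedged sketch of how each of the three categories is typically submitted. It assumes channel is an already registered Netty Channel; executeAfterEventLoopIteration is, to my knowledge, an @UnstableApi method on SingleThreadEventLoop that feeds the tail queue, which is otherwise an internal detail:

import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;

public class TaskSubmissionSketch {
    // channel is assumed to be an already registered Netty Channel
    static void submitTasks(Channel channel) {
        EventLoop eventLoop = channel.eventLoop();

        // 1. Ordinary asynchronous task -> goes into the Reactor's taskQueue
        eventLoop.execute(() -> System.out.println("ordinary task"));

        // 2. Tail task -> goes into tailTasks, runs after each event loop iteration (@UnstableApi)
        ((SingleThreadEventLoop) eventLoop).executeAfterEventLoopIteration(
                () -> System.out.println("tail task"));

        // 3. Scheduled task -> goes into the scheduledTaskQueue
        eventLoop.schedule(() -> System.out.println("scheduled task"), 5, TimeUnit.SECONDS);
    }
}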

From the inheritance structure of the Reactor implementation class NioEventLoop, we can also see that the Reactor has the ability to execute scheduled tasks.

Since Reactor needs to perform scheduled tasks, it cannot always block the Selector and wait for IO ready events indefinitely.

So back to the question raised at the beginning of this section: to guarantee that scheduled tasks are executed on time, the Reactor thread needs to be woken up before the deadline of the earliest pending scheduled task arrives.

Therefore, before the Reactor thread starts polling for IO ready events, we need to first calculate the blocking timeout of the Reactor thread on the Selector.

1.2.1 Polling timeout of the Reactor

First, we take the deadline of the scheduled task that will expire soonest from the Reactor's scheduled task queue and use it as the timeout for the Reactor thread's poll on the Selector. This guarantees that when that scheduled task is about to become due, the Reactor can be woken up from the Selector in time.

    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
      if (curDeadlineNanos == -1L) {
            // -1 means there is no scheduled task in the current scheduled task queue
            curDeadlineNanos = NONE; // nothing on the calendar
      }

      nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

    protected final long nextScheduledTaskDeadlineNanos() {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
    }

    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
    }

}

The nextScheduledTaskDeadlineNanos method returns the deadline of the scheduled task that will expire soonest in the current Reactor's scheduled task queue, or -1 if the scheduled task queue is empty.

The nextWakeupNanos field in NioEventLoop stores the point in time at which the Reactor should be woken up from the Selector. It is set to the deadline of the scheduled task that needs to run soonest. If there is currently no scheduled task, it is set to Long.MAX_VALUE, and the Reactor blocks until an IO ready event arrives or an asynchronous task needs to be executed.

1.2.2 Reactor starts polling for IO ready events

     if (!hasTasks()) {
             //Check again whether there are asynchronous tasks in the normal task queue. If not, start select ing blocking and polling IO ready events
            strategy = select(curDeadlineNanos);
     }

Before the Reactor thread starts the blocking poll for IO ready events, it checks one more time whether there are asynchronous tasks to execute.

If an asynchronous task happens to have been submitted at this point, the polling of IO ready events is skipped and the Reactor turns to executing asynchronous tasks. Otherwise, the poll for IO ready events officially begins.

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            //When there are no scheduled tasks or ordinary tasks, start polling for IO ready events, and block until the wake-up condition is established
            return selector.select();
        }

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

If deadlineNanos == NONE (as introduced in the previous section, NONE means the current Reactor has no scheduled tasks), the Reactor can safely block on the Selector waiting for IO ready events to arrive.

The selector.select() call is a blocking call: if there is no IO ready event, the Reactor thread blocks here until one arrives (leaving aside, for now, the JDK NIO epoll empty polling BUG mentioned earlier).

Here the question arises: the Reactor thread is now blocked in the selector.select() call waiting for IO ready events. If an asynchronous task is submitted to the Reactor at this moment and there is no IO ready event, the Reactor thread would keep blocking here, so how does the asynchronous task ever get executed?

As the saying goes, whoever tied the bell must untie it: since an asynchronous task should run promptly once submitted, the Reactor thread is woken up at the moment the asynchronous task is submitted.

    //addTaskWakesUp = true indicates that the Reactor thread will wake up if and only if the addTask method is called
    //addTaskWakesUp = false indicates that not only addTask method can wake up Reactor. There are other methods to wake up Reactor. The default setting is false
    private final boolean addTaskWakesUp;

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            //If the current thread is not a Reactor thread, start the Reactor thread
            //It can be seen here that the Reactor thread is started by adding asynchronous tasks to NioEventLoop
            startThread();
            .....................ellipsis...................
        }

        if (!addTaskWakesUp && immediate) {
            //io.netty.channel.nio.NioEventLoop.wakeup
            wakeup(inEventLoop);
        }
    }

You should be familiar with the execute method by now; we introduced it in the previous article, Detailed illustration of the whole startup process of Netty Reactor, when covering the startup of the Reactor thread.

The important operations in the startup process, such as the Register and Bind operations, are wrapped into asynchronous tasks and submitted to the Reactor through this method.
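
As a usage sketch (not Netty source), this is what such a submission from a user thread looks like; channel is assumed to be a registered NioSocketChannel. The execute call below goes through the method shown above and, as discussed next, wakes the Reactor if it is blocked on the Selector:

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;

public class UserThreadSubmitSketch {
    // channel is assumed to be a registered NioSocketChannel
    static void submitFromUserThread(Channel channel) {
        // If the Reactor thread is currently blocked in Selector.select(), this submission wakes it up
        channel.eventLoop().execute(() ->
                // runs on the Reactor thread bound to this channel
                channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)));
    }
}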

Here we will focus on the wakeup logic in the second half of the execute method.

Let's first introduce the two parameters related to wakeup logic, boolean immediate and boolean addTaskWakesUp.

  • immediate: indicates whether the submitted task needs to be executed immediately. In Netty, as long as the submitted task is not a LazyRunnable, it needs to run immediately, i.e. immediate = true.

  • addTaskWakesUp: true means the Reactor thread is woken up if and only if the addTask method is called; calling other methods does not wake it up.
    When NioEventLoop is initialized it is set to false, meaning that it is not only the addTask method that can wake up the Reactor thread: other methods, such as the execute method here, can wake it up too.

Regarding the wake-up condition !addTaskWakesUp && immediate in the execute method, Netty's semantics is: when immediate is true, the asynchronous task needs to run right away. addTaskWakesUp defaults to false, which means that not only addTask but other methods, such as execute, can wake up the Reactor. If it were set to true, the semantics would become "only addTask can wake up the Reactor": even with immediate = true, the execute method could not wake the Reactor, because what is being called is execute rather than addTask.

    private static final long AWAKE = -1L;
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            //Wake up the Reactor thread from the Selector
            selector.wakeup();
        }
    }

When nextWakeupNanos == AWAKE, the Reactor is already in the awake state, so there is no need to call selector.wakeup() to wake it up again, which also saves the cost of that system call.

In the source code framework shown at the beginning of section 1.2 (polling logic), after the Reactor is woken up, execution enters the finally {...} block, where nextWakeupNanos is set back to AWAKE.

                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This indicates that the Reactor has been awakened from the Selector
                            // Set the state of Reactor to wake up
                            // lazySet optimizes unnecessary volatile operations, does not use memory barriers, and does not guarantee the visibility of write operations (single thread does not need to be guaranteed)
                            nextWakeupNanos.lazySet(AWAKE);
                        }

Here Netty uses a single AtomicLong variable, nextWakeupNanos, to represent both the current state of the Reactor thread and the Reactor thread's blocking timeout. This is a trick worth borrowing in day-to-day development.
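
As a small illustration of that trick outside of Netty, here is a sketch that mirrors the sentinel values of nextWakeupNanos; the class and method names are made up for the example:

import java.util.concurrent.atomic.AtomicLong;

// One AtomicLong encodes both state and value: two sentinel longs mark special states,
// any other value is read as the planned wake-up deadline in nanoseconds.
final class WakeupState {
    static final long AWAKE = -1L;              // the loop thread is awake
    static final long NONE  = Long.MAX_VALUE;   // waiting, with no wake-up scheduled

    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    void beforeBlocking(long deadlineNanos) {
        nextWakeupNanos.set(deadlineNanos);     // publish the planned wake-up time
    }

    void afterWokenUp() {
        // only the loop thread writes here, so lazySet (no full memory barrier) is enough
        nextWakeupNanos.lazySet(AWAKE);
    }

    boolean wakeupNeeded() {
        // getAndSet(AWAKE) != AWAKE means the loop thread may be blocked and needs Selector.wakeup()
        return nextWakeupNanos.getAndSet(AWAKE) != AWAKE;
    }
}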

Let's go back to the main line of the Reactor thread polling for IO ready events.

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            //When there are no scheduled tasks or ordinary tasks, start polling for IO ready events, and block until the wake-up condition is established
            return selector.select();
        }

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

When deadlineNanos is not NONE, the Reactor has scheduled tasks to execute, so the Reactor thread can only block on the Selector waiting for IO ready events until the deadline of the scheduled task that expires soonest arrives.

Here deadlineNanos is the deadline of the scheduled task in the Reactor that expires soonest, in nanoseconds, and it is an absolute point in time.

What we need to compute is the timeout for which the Reactor thread blocks in the Selector, in milliseconds, which is a relative duration.

Therefore, before the Reactor thread starts blocking on the Selector, we need to convert the absolute time deadlineNanos in nanoseconds into the relative time timeoutMillis in milliseconds.

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            //When there are no scheduled tasks or ordinary tasks, start polling for IO ready events, and block until the wake-up condition is established
            return selector.select();
        }

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

Here you may wonder: why add 0.995 milliseconds to deadlineNanos when computing timeoutMillis through the deadlineToDelayNanos method?

Imagine this scenario: the deadline of the soonest scheduled task is only 5 microseconds away. Converting nanoseconds to milliseconds, the computed timeoutMillis would be 0.

In Netty, timeoutMillis = 0 means the scheduled task's deadline has already arrived and it must be executed.

But in reality the scheduled task still has 5 microseconds to go before its deadline. So in this case 0.995 milliseconds is added to deadlineNanos, so that the result rounds up to 1 millisecond instead of 0.

So we can see from here that when Reactor has a scheduled task, it must block for at least 1 millisecond.
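
A quick back-of-the-envelope check of this rounding, as a standalone sketch (the 5-microsecond figure is just the example from above):

// Suppose the earliest scheduled task is due in 5 microseconds (5_000 ns)
long delayNanos = 5_000L;

// Without the adjustment: 5_000 / 1_000_000 = 0 ms, which would mean "due right now" and trigger selectNow()
long naiveTimeoutMillis = delayNanos / 1_000_000L;                  // 0

// With the +0.995 ms adjustment: (5_000 + 995_000) / 1_000_000 = 1 ms, so select(1) blocks for about 1 ms
long adjustedTimeoutMillis = (delayNanos + 995_000L) / 1_000_000L;  // 1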

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    protected static long deadlineToDelayNanos(long deadlineNanos) {
        return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
    }
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {

    static long deadlineToDelayNanos(long deadlineNanos) {
        return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
    }

    //Start time point
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }

    static long deadlineNanos(long delay) {
        //Compute the deadline of the scheduled task, measured relative to the start time START_TIME
        long deadlineNanos = nanoTime() + delay;
        // Guard against overflow
        return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
    }

}

Note that when a scheduled task is created, its execution deadline is computed with the deadlineNanos method. The deadline is computed as current time + task delay - system start time; the system start time (START_TIME) is subtracted out here.

Therefore, when computing the delay (i.e. the timeout) from a deadline, the system start time has to be added back in: delay = deadlineNanos - nanoTime().

When the timeoutMillis computed by deadlineToDelayNanos is <= 0, the Reactor has a scheduled task that is about to expire, so it must return immediately; it cannot keep blocking on the Selector and delay the scheduled task. Of course, before returning to execute the scheduled task, it still does a non-blocking selector.selectNow() to check whether any Channel has IO ready events, so that IO handling is not delayed either. How considerate~~

When timeoutMillis > 0, the Reactor thread can safely block the Selector and wait for the arrival of IO events until the timeout of timeoutMillis arrives.

timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)

When an IO event arrives on a Channel registered with the Reactor, the Reactor thread is woken up from the selector.select(timeoutMillis) call and handles the IO ready event immediately.

Here is an extreme case: what if the deadline of the soonest scheduled task lies far in the future, making timeoutMillis very, very long? Would the Reactor stay blocked on the Selector forever and leave Netty unable to work?

By now you should have the answer. As introduced at the beginning of section "1.2.2 Reactor starts polling for IO ready events", while the Reactor is blocked on the Selector, if a user thread submits an asynchronous task to the Reactor, the Reactor thread is woken up through the execute method.

At this point we have covered the most important and core logic in the Reactor: the flow of polling for IO ready events on Channels.

When Reactor polls that there are IO active events or asynchronous tasks to be executed, it will be awakened from the Selector. Now it's time to introduce how to handle IO ready events and execute asynchronous tasks after Reactor is awakened.

After all, Netty is a network framework, so it will give priority to handling IO events on the Channel. Based on this fact, Netty will not tolerate unlimited execution of asynchronous tasks, which will affect IO throughput.

Netty uses the ioRatio variable to adjust the proportion of CPU time allocated by Reactor threads between processing IO events and executing asynchronous tasks.

Let's take a look at the allocation logic of the execution time ratio~~~

2. Allocating CPU time between the Reactor's IO processing and asynchronous tasks

Whenever an IO ready event arrives, Reactor needs to ensure that the IO event is processed in a timely and complete manner. ioRatio mainly limits the time required to execute asynchronous tasks to prevent the Reactor thread from processing asynchronous tasks for too long, resulting in I/O events not being processed in time.

                //Adjust the CPU time ratio of the Reactor thread to execute IO events and asynchronous tasks. The default is 50, indicating that the time ratio of executing IO events and asynchronous tasks is one to one
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) { //First process all IO events, then run all asynchronous tasks, with no time limit
                    try {
                        if (strategy > 0) {
                            //If there is an IO ready event, the IO ready event is processed
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        //Process all asynchronous tasks
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {//Process IO events first (taking ioTime), then cap asynchronous task time at ioTime * (100 - ioRatio) / ioRatio
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // Limit the processing of asynchronous tasks within the timeout time to prevent the Reactor thread from processing asynchronous tasks for too long, resulting in I/O event blocking
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else { //If there is no IO ready event processing, only 64 asynchronous tasks will be executed at most to prevent the Reactor thread from processing asynchronous tasks for too long and causing I/O event blocking
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
  • When ioRatio = 100, it means that there is no need to consider the limitation of execution time. When there is an IO ready event (strategy > 0), the Reactor thread needs to give priority to processing the IO ready event. After processing the IO event, it executes all asynchronous tasks, including ordinary tasks, tail tasks and scheduled tasks. No time limit.

The value of strategy is the number of IO ready Channels; it is the return value of the io.netty.channel.nio.NioEventLoop#select method introduced earlier.

  • When the value set by ioRatio is not 100, the default value is 50. It is necessary to count the time-consuming ioTime of executing IO events first, and calculate the limit time of executing asynchronous tasks later according to ioTime * (100 - ioRatio) / ioRatio. In other words, the Reactor thread needs to execute limited asynchronous tasks within this limited time to prevent the Reactor thread from processing I/O events in time due to the long processing time of asynchronous tasks.

By default, the ratio between the time spent executing IO events and the time spent executing asynchronous tasks is set to be one-to-one.
The higher the ioRatio setting, the smaller the proportion of time the Reactor thread takes to execute asynchronous tasks.
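
To make the ratio concrete, here is a small sketch; the numbers are arbitrary, and setIoRatio is, as far as I know, the public knob on NioEventLoopGroup (and NioEventLoop) for changing the default of 50:

import io.netty.channel.nio.NioEventLoopGroup;

public class IoRatioSketch {
    public static void main(String[] args) {
        // Budget for asynchronous tasks after spending ioTime on IO: ioTime * (100 - ioRatio) / ioRatio
        long ioTimeNanos = 2_000_000L;                        // assume IO processing took 2 ms

        long budgetAt50 = ioTimeNanos * (100 - 50) / 50;      // 2 ms   -> 1 : 1 split (the default)
        long budgetAt80 = ioTimeNanos * (100 - 80) / 80;      // 0.5 ms -> IO gets four times the task time
        System.out.println(budgetAt50 + " vs " + budgetAt80);

        // Raising the ratio for every Reactor in the group
        NioEventLoopGroup group = new NioEventLoopGroup();
        group.setIoRatio(80);
        group.shutdownGracefully();
    }
}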

To get the time limit required by the Reactor thread to execute asynchronous tasks, you must know the time-consuming ioTime of executing IO events, and then calculate the time limit of executing asynchronous tasks according to ioRatio.

But if the Reactor thread has no IO ready events to handle at this moment, there is no ioTime, so how do we get the time limit for executing asynchronous tasks?

In this special case, Netty only allows the Reactor thread to execute at most 64 asynchronous tasks before it stops and goes back to polling IO ready events. The core purpose is to prevent the Reactor thread from spending so long on asynchronous tasks that IO events cannot be handled in time.

By default, when Reactor has asynchronous tasks to process but no IO ready event, Netty will only allow Reactor threads to execute up to 64 asynchronous tasks.

Now that we know the overall framework the Reactor uses to handle IO events and asynchronous tasks, let's look at the specific logic, starting with how the Reactor thread handles IO ready events.

3. The Reactor thread handles IO ready events

    //This field is the reference holding the selectedKeys of the selector object. When the IO event is ready, it is obtained directly from here
   private SelectedSelectionKeySet selectedKeys;

   private void processSelectedKeys() {
        //Whether to use Netty's optimized SelectedSelectionKeySet is controlled by the switch DISABLE_KEY_SET_OPTIMIZATION, which defaults to false (optimization enabled)
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

Does this code look familiar?

Remember that in Talk about Netty: the implementation of Reactor in Netty (creation), when introducing how the Selector is created in the Reactor class NioEventLoop, we mentioned that, for better insertion and traversal performance of the selectedKeys set in the JDK NIO Selector, Netty replaces the HashSet implementation of selectedKeys in the JDK NIO Selector with its array-based SelectedSelectionKeySet.

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    //IO ready SelectionKeys (each one wraps a channel)
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    //All selectionkeys registered on the Selector (wrapped with channel)
    protected HashSet<SelectionKey> keys;

    ...............ellipsis...................
}

The optimization switch DISABLE_KEY_SET_OPTIMIZATION in Netty controls whether the JDK NIO Selector is optimized; by default the optimization is enabled.

When the optimization switch is on, Netty will save the created SelectedSelectionKeySet set in the private SelectedSelectionKeySet selectedKeys field of NioEventLoop, so that the Reactor thread can directly obtain the IO ready SelectionKey from here.

When the optimization switch is off, Netty will directly adopt the default implementation of JDK NIO Selector. The selectedKeys field of NioEventLoop will be null.
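
If I recall correctly, this switch is backed by the io.netty.noKeySetOptimization system property, so turning the optimization off (and falling back to the JDK HashSet implementation) looks roughly like this:

// Must be set before the NioEventLoop class is loaded, e.g. on the JVM command line:
//   java -Dio.netty.noKeySetOptimization=true ...
// or very early in main(), before any Netty class is touched:
System.setProperty("io.netty.noKeySetOptimization", "true");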

If you have forgotten this part, you can review the Reactor creation process in Talk about Netty: the implementation of Reactor in Netty (creation).

After reviewing the previous content, we can see that the logic of processing IO ready events in Reactor is also divided into two parts, one is optimized by Netty and the other is native to JDK.

Let's first look at the processing method of JDK native Selector. After understanding this method, it will be easier to look at Netty optimization.

3.1 processSelectedKeysPlain

When introducing how the JDK NIO Selector works in Talk about Netty: the implementation of Reactor in Netty (creation), we mentioned that when an IO ready event occurs on a Channel registered with the Selector, the Selector inserts the IO ready SelectionKey into the Set<SelectionKey> selectedKeys set.

At that point the Reactor thread returns from the java.nio.channels.Selector#select(long) call and then calls java.nio.channels.Selector#selectedKeys to obtain the set of IO ready SelectionKeys.

So before calling the processSelectedKeysPlain method to handle IO ready events, the Reactor thread needs to call selector.selectedKeys() to get all the IO ready SelectionKeys.

processSelectedKeysPlain(selector.selectedKeys())
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            //Note the i.remove() call at the end of each iteration: the Selector does not remove SelectionKeys from the selectedKeys set by itself.
            //You must remove a key yourself after processing its channel; the next time the channel becomes ready, the Selector will put it into the selectedKeys set again.
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            //The purpose is to select again and clear out invalid SelectionKeys (a SocketChannel may have been removed from the Selector)
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

3.1.1 obtain IO ready channels

The Set<SelectionKey> selectedKeys set contains all the IO ready SelectionKeys. Note that at this point the implementation type of Set<SelectionKey> selectedKeys is a HashSet, because we are looking at the native JDK NIO implementation first.

Start processing IO ready channels one by one by obtaining the iterator of HashSet.

Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();

Do you remember what is stored in the attachment property of the SelectionKey?

In the previous article, Detailed illustration of the whole startup process of Netty Reactor, when the NioServerSocketChannel registers with the Main Reactor, it registers itself (the this pointer) as the attachment property of the SelectionKey in the Selector. This step binds the Netty custom Channel to the JDK NIO Channel.

public abstract class AbstractNioChannel extends AbstractChannel {

    //The SelectKey obtained after the channel is registered with the Selector
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............ellipsis....................
            }
        }
    }

}

We also mentioned that the SelectionKey is equivalent to a representation of the Channel in the Selector. When there is an IO ready event on the Channel, the Selector will return the SelectionKey corresponding to the Channel to the Reactor thread. We can obtain the corresponding Netty custom Channel through the attachment attribute in the returned SelectionKey.

When the client connection event (OP_ACCEPT) is active, the Channel type here is NioServerSocketChannel.
When the client Read and Write events are active, the Channel type here is NioSocketChannel.

After obtaining the Netty custom Channel via k.attachment(), we need to remove the Channel's SelectionKey from the Selector's ready set Set<SelectionKey> selectedKeys, because the Selector never removes processed SelectionKeys by itself; the caller must remove them. That way, when the Channel becomes IO ready again, the Selector will put the Channel's SelectionKey into the ready set Set<SelectionKey> selectedKeys again.

i.remove();

3.1.2 handling IO events on Channel

     if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
     } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
     }

From here, we can see that there are two kinds of objects that Netty attaches to the attachment attribute in SelectionKey:

  • One is the Channel we are familiar with. Both the NioServerSocketChannel used by the server and the NioSocketChannel used by the client extend AbstractNioChannel. IO events on these Channels are handled by the Netty framework, and this is what we will focus on in this section.

  • The other is NioTask, which is provided by Netty. Users can customize some custom processing when IO ready events occur on the Channel.

public interface NioTask<C extends SelectableChannel> {
    /**
     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
     */
    void channelReady(C ch, SelectionKey key) throws Exception;

    /**
     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
     * this {@link NioTask} will not be notified anymore.
     *
     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
     *              the event loop has been shut down.
     */
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

NioTask and Channel play essentially the same role: both are responsible for handling IO-ready events on the underlying channel, except that one carries user-defined logic while the other is handled by the Netty framework. Here we focus on the IO processing logic of the Channel.
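
As a side note, a user-defined NioTask might look like the following minimal sketch. This is illustrative only: LoggingNioTask is a hypothetical class, and attaching it to a Reactor is assumed to go through the NioEventLoop#register(SelectableChannel, int, NioTask) overload that 4.1.x exposes for this purpose.

import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import io.netty.channel.nio.NioTask;

//A hypothetical NioTask: the Reactor invokes channelReady() whenever the attached
//SelectableChannel is selected by the Selector, bypassing the Channel/pipeline machinery
public class LoggingNioTask implements NioTask<SocketChannel> {

    @Override
    public void channelReady(SocketChannel ch, SelectionKey key) {
        //Custom handling of the ready event; here we only print the ready ops
        System.out.println("channel ready, readyOps=" + key.readyOps());
    }

    @Override
    public void channelUnregistered(SocketChannel ch, Throwable cause) {
        System.out.println("channel unregistered, cause=" + cause);
    }
}

It could then be attached with something like nioEventLoop.register(socketChannel, SelectionKey.OP_READ, new LoggingNioTask()), assuming socketChannel is a connected, non-blocking SocketChannel.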

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //Get the underlying operation class Unsafe of the Channel
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            ......if the SelectionKey has become invalid, close the corresponding Channel......
        }

        try {
            //Get IO ready event
            int readyOps = k.readyOps();
            //Handling Connect events
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                //Remove listening to Connect events, otherwise the Selector will always notify
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //Trigger channelActive event and handle Connect event
                unsafe.finishConnect();
            }

            //Handling Write events
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

             //Handle Read event or Accept event
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
  • First, we obtain the underlying operation class Unsafe of the IO-ready Channel, which handles the concrete IO-ready events.

We can see here that Netty's handling of IO-ready events is entirely encapsulated in the Unsafe class. For example, the concrete processing logic of the OP_ACCEPT event is encapsulated in the Unsafe class of NioServerSocketChannel, while the processing of OP_READ and OP_WRITE events is encapsulated in the Unsafe class of NioSocketChannel.

  • Next, we get the specific IO-ready events readyOps from the SelectionKey.

A SelectionKey holds two sets of IO events. One is interestOps, which records the IO events the Channel is interested in. After the Channel registers with the Selector, its interested events are added in the channelActive event callback via the HeadContext node of the pipeline. The following code shows the Channel registering its interested IO events with the Selector inside the channelActive callback.

    public abstract class AbstractNioChannel extends AbstractChannel {
             @Override
              protected void doBeginRead() throws Exception {
                    // Channel.read() or ChannelHandlerContext.read() was called
                    final SelectionKey selectionKey = this.selectionKey;
                    if (!selectionKey.isValid()) {
                        return;
                    }

                    readPending = true;

                    final int interestOps = selectionKey.interestOps();
                    /**
                     * 1: For NioServerSocketChannel, readInterestOp is set to OP_ACCEPT during initialization
                     * 2: For NioSocketChannel, readInterestOp is set to OP_READ during initialization
                     * */
                    if ((interestOps & readInterestOp) == 0) {
                        //Register listening OP_ACCEPT or OP_READ event
                        selectionKey.interestOps(interestOps | readInterestOp);
                    }
              }
    }

The other is readyOps, which records which of the Channel's interested IO events are currently ready.

Netty stores such an event set in a single int variable.

  • Use the & operation to check whether an event is in the event set: (readyOps & SelectionKey.OP_CONNECT) != 0 checks whether the Connect event is present.

  • Use the | operation to add an event to the event set: interestOps | readInterestOp

  • To remove an event from the event set, first negate the event with ~ and then & it with the set: ops &= ~SelectionKey.OP_CONNECT

This idea of packing an event set into a single int to make the best use of space is worth learning from in our daily development~~
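
To make these bit operations concrete, here is a minimal, self-contained sketch in plain Java. It only uses the JDK's SelectionKey constants and is not Netty code.

import java.nio.channels.SelectionKey;

public class EventSetDemo {
    public static void main(String[] args) {
        //Start with an event set that is only interested in OP_READ
        int interestOps = SelectionKey.OP_READ;

        //Add an event to the set with |
        interestOps |= SelectionKey.OP_CONNECT;

        //Test membership with &
        System.out.println((interestOps & SelectionKey.OP_CONNECT) != 0);  //true

        //Remove an event from the set with &= ~
        interestOps &= ~SelectionKey.OP_CONNECT;

        System.out.println((interestOps & SelectionKey.OP_CONNECT) != 0);  //false
        System.out.println((interestOps & SelectionKey.OP_READ) != 0);     //true
    }
}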

Now we know which Channels are IO-ready and which kinds of IO events are ready on them.

Next, let's deal with the different IO-ready events on the Channel.

3.1.2.1 Handling Connect events

When a Netty client initiates a connection to the server, it registers an OP_CONNECT event with the client's Reactor. Once the connection is established successfully, the client's NioSocketChannel produces a Connect-ready event, and through the Reactor running framework described above the flow eventually arrives here.

      if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //Trigger channelActive event
                unsafe.finishConnect();
     }

If the IO-ready event is a Connect event, the finishConnect method of the Unsafe operation class in the corresponding client NioSocketChannel is called to handle it. At this point, the channelActive event is propagated through the pipeline of the Netty client's NioSocketChannel.

Finally, the OP_CONNECT event must be removed from interestOps, the set of events the client NioSocketChannel is interested in; otherwise the Selector will keep reporting the Connect event as ready.

3.1.2.2 Handling Write events

The author will cover how the Reactor thread processes the Write event in a later article; here we focus on the overall running framework of the Reactor thread.

      if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
      }

Here you only need to remember that registration of the OP_WRITE event is done by the user: when the Socket send buffer is full and no more data can be written, the user registers the OP_WRITE event with the Reactor. When the Socket send buffer becomes writable again, the Reactor receives the OP_WRITE readiness notification and, at this point, calls the forceFlush method of the client NioSocketChannel to send the remaining data.
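
For intuition, here is a small plain JDK NIO sketch of that pattern. It is not Netty's implementation; writeOrRegister and onWritable are hypothetical helper methods that a hand-written Selector loop might use.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public final class WriteInterestDemo {

    //Try to write; if the send buffer is full (data remains), start listening for OP_WRITE
    static void writeOrRegister(SelectionKey key, SocketChannel ch, ByteBuffer data) throws IOException {
        ch.write(data);
        if (data.hasRemaining()) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    //Called when the Selector reports OP_WRITE: flush the backlog, then stop listening for OP_WRITE
    static void onWritable(SelectionKey key, SocketChannel ch, ByteBuffer data) throws IOException {
        ch.write(data);
        if (!data.hasRemaining()) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}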

3.1.2.3 Handling Read or Accept events

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
     }

We can see here that both Read events and Accept events in Netty are handled by the read method of the Unsafe operation class in the corresponding Channel.

The read method of the server's NioServerSocketChannel handles the Accept event, while the read method of the client's NioSocketChannel handles the Read event.

Here you only need to remember the processing entry point of each IO event in its corresponding Channel; we will analyze these entry functions in detail in subsequent articles.

3.1.3 Removing invalid SelectionKeys from the Selector

            //Used to clear invalid SelectionKeys from selectedKeys in time, for example when a SocketChannel is removed from the Selector by the user
            private boolean needsToSelectAgain;

             //The purpose is to enter the for loop again and remove the invalid SelectionKeys (the SocketChannel may have been removed from the Selector by the user)
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }

When we introduced the Reactor running framework earlier, we saw that needsToSelectAgain is reset to false every time the Reactor thread finishes polling and is about to process the IO-ready events and asynchronous tasks.

So what exactly is this needsToSelectAgain for? And why do we need to 'select again'?

First, let's look at the circumstances under which needsToSelectAgain is set to true; perhaps that will give us some clues.

We know that a Channel can register itself with the Selector, so naturally it can also deregister itself from the Selector.

In the last article we also spent a lot of time explaining the registration process; now let's look at how a Channel is deregistered.

public abstract class AbstractNioChannel extends AbstractChannel {

   //The SelectionKey obtained after the Channel is registered with the Selector
    volatile SelectionKey selectionKey;

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
}

The Channel deregistration process is very simple: the doDeregister method of the NioChannel is called, and the Reactor bound to the Channel cancels the Channel's SelectionKey from the Selector, which stops listening for IO events on that Channel.
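
From application code, the same thing is usually triggered through the Channel API. A hedged usage sketch (channel is assumed to be an active Netty Channel):

ChannelFuture deregisterFuture = channel.deregister();
deregisterFuture.addListener((ChannelFutureListener) future -> {
    if (future.isSuccess()) {
        //The underlying SelectionKey has been cancelled and the Channel no longer belongs to a Reactor
        System.out.println("channel removed from its Reactor's Selector");
    }
});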

public final class NioEventLoop extends SingleThreadEventLoop {

    //Records the number of SocketChannels removed from the Selector; when it reaches 256, the invalid SelectionKeys need to be cleared from the selectedKeys collection
    private int cancelledKeys;

    private static final int CLEANUP_INTERVAL = 256;

    /**
     * Remove the socketChannel from the selector and cancel listening for IO events
     * */
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        // When the number of SocketChannels removed from the Selector reaches 256, set needsToSelectAgain to true
        // In io.netty.channel.nio.NioEventLoop#processSelectedKeysPlain, select again to remove the invalid SelectionKeys
        // and keep the selected key set valid
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
}
  • Call the cancel method of the JDK NIO SelectionKey API to cancel the Channel from the Selector. After SelectionKey#cancel is called, SelectionKey#isValid returns false, and the Selector adds the cancelled SelectionKey to its cancelledKeys collection.
public abstract class AbstractSelector extends Selector {

    private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    void cancel(SelectionKey k) {                      
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
}
  • When the SelectionKey corresponding to a Channel is cancelled, the cancellation counter cancelledKeys is incremented by 1. When cancelledKeys reaches 256, needsToSelectAgain is set to true.

  • Then, during the Selector's next polling, the SelectionKeys in the cancelledKeys set are removed from all key sets in the Selector. These key sets include the selectedKeys set, in which the Selector stores ready SelectionKeys, and the keys set, which stores the SelectionKeys of all registered Channels.

public abstract class SelectorImpl extends AbstractSelector {

    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    
     .....................ellipsis...............
}

We can see that the Reactor thread checks needsToSelectAgain inside the loop body of the processSelectedKeysPlain method, which processes the IO-ready SelectionKeys.

The reason for pointing this out is to make you notice that, at this moment, the Reactor is still processing the IO-ready events of the current poll.

As mentioned earlier, after SelectionKey#cancel is called, the Selector does not remove the cancelled SelectionKeys from its key sets until the next polling pass, and that includes the ready set selectedKeys.

If a large number of Channels are cancelled from the Selector during the current poll, their SelectionKeys remain in the ready set selectedKeys until the next poll, which of course affects the validity of the current polling result.

Therefore, to keep all key sets in the Selector valid, a selectNow is triggered to clear the invalid SelectionKeys once the number of cancelled Channels reaches 256.

    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }

So far we have covered processSelectedKeysPlain, the processing method for the native JDK Selector. The processing logic for IO-ready events is essentially the same in both variants, so once we understand processSelectedKeysPlain, the way processSelectedKeysOptimized handles IO-ready events is very easy to follow.

3.2 processSelectedKeysOptimized

Netty uses the optimized Selector to handle IO-ready events by default. The processing logic is similar, so let's focus mainly on the differences between the two methods.

    private void processSelectedKeysOptimized() {
        // In openSelector, the selectedKeys and publicSelectedKeys fields of the JDK Selector implementation class
        // were replaced: the original HashSet type was swapped for Netty's array-based SelectedSelectionKeySet
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // Corresponds to iterator.remove() in the plain version: the Selector will not clear the selectedKey by itself
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {

                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
  • The collection of IO-ready SelectionKeys stored in the native JDK NIO Selector is the HashSet-typed selectedKeys. To optimize traversal of this set, Netty substitutes its own SelectedSelectionKeySet type, so that array traversal replaces the HashSet iterator (a simplified sketch of this array-backed set appears after this list).

  • On every poll the Selector inserts the SelectionKey of each IO-ready Channel into the selectedKeys set, but it only ever adds them: once a SelectionKey has been processed, the Selector never removes it from selectedKeys on its own. It does the selecting but leaves the cleanup to the caller, so Netty must delete each IO-ready SelectionKey itself after traversal.

    • In processSelectedKeysPlain, the key is deleted directly through the iterator.
    • In processSelectedKeysOptimized, the key's slot in the array is set to null to facilitate garbage collection.
  • As for clearing invalid SelectionKeys at the end: in processSelectedKeysPlain, since the native JDK NIO Selector is used, it is enough to execute selectAgain and the Selector clears the invalid keys automatically.
    In processSelectedKeysOptimized, however, because the set is Netty's own optimized type, Netty must first null out the SelectionKeys in the SelectedSelectionKeySet array itself and then execute selectAgain.
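
As promised above, here is a simplified, self-contained sketch of the idea behind the array-backed key set. It is not the actual io.netty.channel.nio.SelectedSelectionKeySet class, just an illustration of the data structure: O(1) add, index-based traversal, and a reset that nulls out slots to help garbage collection.

import java.nio.channels.SelectionKey;
import java.util.Arrays;

//Simplified illustration of an array-backed "selected key set"
final class ArrayKeySet {

    SelectionKey[] keys = new SelectionKey[1024];
    int size;

    boolean add(SelectionKey key) {
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); //grow when full
        }
        keys[size++] = key;
        return true;
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null); //null out processed slots to help GC
        size = start;
    }
}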

Well, now that we have walked through the whole process of how the Reactor thread handles IO-ready events, let's look at how the Reactor thread handles the asynchronous tasks in the Netty framework.

4. The Reactor thread handles asynchronous tasks

Netty has two ways to handle asynchronous tasks:

  • One is the runAllTasks() method with no timeout limit. When ioRatio is set to 100, the Reactor thread first processes all IO-ready events in one go and then executes all asynchronous tasks in one go, with no time limit.

  • The other is the runAllTasks(long timeoutNanos) method with a timeout limit. When ioRatio != 100, the Reactor thread's execution of asynchronous tasks is time-limited: it processes the IO-ready events first and records the time ioTime spent on them, then computes the timeout for executing asynchronous tasks from the formula ioTime * (100 - ioRatio) / ioRatio, and executes a limited amount of asynchronous work within that budget (see the sketch after this list).
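
A simplified sketch of how this dispatch looks inside the event loop may help. The following is a paraphrase of the relevant branch of the NioEventLoop run loop in 4.1.x, not a verbatim copy: handleIoAndTasks is a made-up method name, error handling and bookkeeping are omitted, and strategy stands for the number of ready keys returned by the select step.

    //Simplified paraphrase, not a verbatim copy of NioEventLoop#run
    void handleIoAndTasks(int strategy, int ioRatio) {
        if (ioRatio == 100) {
            if (strategy > 0) {
                processSelectedKeys();      //handle all IO-ready events first
            }
            runAllTasks();                  //then run asynchronous tasks with no time limit
        } else if (strategy > 0) {
            long ioStartTime = System.nanoTime();
            processSelectedKeys();
            long ioTime = System.nanoTime() - ioStartTime;
            //the task budget is proportional to the time just spent on IO
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        } else {
            runAllTasks(0);                 //no IO this round: run only a minimal batch of tasks
        }
    }

For example, with the default ioRatio of 50 the budget equals ioTime, i.e. the Reactor spends roughly as much time on tasks as it just spent on IO.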

Let's take a look at the processing logic of these two methods for executing asynchronous tasks:

4.1 runAllTasks()

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            //The scheduled tasks that arrive at the execution time are transferred to the taskQueue of the common task queue, and the Reactor thread takes them out from the taskQueue for execution
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        //Execute tail Queue task
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

The core logic of the Reactor thread executing asynchronous tasks is as follows (a sketch of how such tasks are submitted from user code appears after this list):

  • First, the due scheduled tasks are taken out of the scheduled task queue and transferred to the general task queue taskQueue.

  • The Reactor thread then uniformly takes tasks out of the general task queue taskQueue and executes them.

  • After the Reactor thread has finished executing the scheduled tasks and normal tasks, it executes the tail tasks stored in the tail task queue tailTasks.
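
As promised, here is a hedged example of how asynchronous and scheduled tasks typically reach these queues from application code; channel is assumed to be an active Netty Channel, and the printed messages are placeholders.

import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

public final class TaskSubmissionDemo {

    static void submitTasks(Channel channel) {
        EventLoop eventLoop = channel.eventLoop();

        //execute(...) puts the task into the normal task queue (taskQueue)
        eventLoop.execute(() -> System.out.println("ordinary task, runs on the Reactor thread"));

        //schedule(...) puts the task into the scheduled task queue until its deadline is due
        eventLoop.schedule(
                () -> System.out.println("scheduled task, runs about 1 second later"),
                1, TimeUnit.SECONDS);
    }
}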

Let's take a look at the implementation of the above core steps:

4.1.1 fetchFromScheduledTaskQueue

    /**
     * Take out the scheduled tasks that have reached the deadline execution time from the scheduled task queue
     * Transfer scheduled tasks to the taskQueue of the common task queue, and the Reactor thread will take them out from the taskQueue for execution
     *
     * */
    private boolean fetchFromScheduledTaskQueue() {
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
            return true;
        }
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            //Take the scheduled tasks whose execution deadline has arrived from the scheduled task queue, i.e. deadline <= nanoTime
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                return true;
            }
            if (!taskQueue.offer(scheduledTask)) {
                // If there is no space in the taskQueue, the scheduled task is put back into the scheduled task queue to be fetched next time
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
        }
    }
  1. Get the current nanoTime (relative to the scheduler's start time), which is used to decide which scheduled tasks are due.
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {
        return System.nanoTime() - START_TIME;
    }
}
  2. From the scheduled task queue, find the tasks with deadline <= nanoTime, i.e. all scheduled tasks that are already due.
    protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();

        //Peek the scheduled task whose deadline <= nanoTime from the scheduled task queue
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
            return null;
        }
        //Take out if the conditions are met
        scheduledTaskQueue.remove();
        scheduledTask.setConsumed();
        return scheduledTask;
    }
  3. Insert the due scheduled task into the taskQueue. If the taskQueue has no room for new tasks, re-insert the scheduled task into the scheduled task queue and wait for the next pull.
            if (!taskQueue.offer(scheduledTask)) {
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
  4. When fetchFromScheduledTaskQueue returns true, all due scheduled tasks have been pulled out and transferred to the normal task queue.
    When it returns false, only some of the due scheduled tasks were pulled, because the normal task queue is currently full; after the normal tasks have been executed, the remaining ones need to be pulled again.

Pulling stops either when all due scheduled tasks have been drained from the scheduled task queue or when the normal task queue is full; the Reactor thread then executes the asynchronous tasks in the normal task queue.

4.1.2 runAllTasksFrom

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }
  • First, the return value of the runAllTasksFrom method indicates whether at least one asynchronous task was executed. It is later assigned to the ranAtLeastOne variable, and this return value will be used again further on.

  • Pull asynchronous tasks from the normal task queue.

    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }
  • The Reactor thread performs asynchronous tasks.
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

4.1.3 afterRunningAllTasks

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        //Execute tail Queue task
        afterRunningAllTasks();
        return ranAtLeastOne;

If the Reactor thread executed at least one asynchronous task, lastExecutionTime is updated and the ranAtLeastOne flag is returned. This flag is simply the return value of the runAllTasksFrom method.

Finally, the closing work is done: the tail tasks stored in the tail task queue tailTasks are executed (a note on how tail tasks are submitted follows the code below).

    @Override
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }
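
For completeness: the tail task queue tailTasks is fed through SingleThreadEventLoop#executeAfterEventLoopIteration, which is marked @UnstableApi in 4.1.x, so the following is only a hedged sketch rather than a recommended everyday pattern (channel is assumed to be an active Channel bound to a SingleThreadEventLoop).

import io.netty.channel.Channel;
import io.netty.channel.SingleThreadEventLoop;

public final class TailTaskDemo {

    static void addTailTask(Channel channel) {
        if (channel.eventLoop() instanceof SingleThreadEventLoop) {
            //Queued into tailTasks and run by afterRunningAllTasks() at the end of the event loop iteration
            ((SingleThreadEventLoop) channel.eventLoop())
                    .executeAfterEventLoopIteration(() ->
                            System.out.println("runs after this event loop iteration"));
        }
    }
}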

4.2 runAllTasks(long timeoutNanos)

The core logic for handling asynchronous tasks here is the same as before, except that there is additional timeout control.

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            //When there are no tasks in the normal task queue, execute the tasks in the tail task queue
            afterRunningAllTasks();
            return false;
        }

        //Asynchronous task execution timeout deadline
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);
            runTasks ++;
            //Check whether the execution deadline is reached every 64 asynchronous tasks
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    //When the asynchronous task execution timeout deadline is reached, stop executing the asynchronous task
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
  • First, the due scheduled tasks are pulled from the Reactor's scheduled task queue through the fetchFromScheduledTaskQueue method and transferred to the normal task queue. Pulling stops when the normal task queue is full or when all due scheduled tasks have been pulled.

  • ScheduledFutureTask.nanoTime() + timeoutNanos is the deadline at which the Reactor thread must stop executing asynchronous tasks.

  • Because calling System.nanoTime() carries a certain system overhead, the deadline is checked only once every 64 executed asynchronous tasks. If the deadline has been reached, execution of asynchronous tasks stops; if not, the loop keeps taking tasks out of the normal task queue and executing them.

From this detail we can see how particular Netty is about performance.
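
The same trick can be reused whenever a loop wants to amortize an expensive check. A minimal sketch with hypothetical hasWork() and doOneUnitOfWork() placeholders:

import java.util.concurrent.TimeUnit;

public final class AmortizedDeadlineCheck {

    static void drainWithBudget(long budgetMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
        long processed = 0;
        while (hasWork()) {
            doOneUnitOfWork();
            //Consult the clock only once every 64 iterations, mirroring (runTasks & 0x3F) == 0
            if ((++processed & 0x3F) == 0 && System.nanoTime() >= deadline) {
                break; //time budget exhausted
            }
        }
    }

    //Hypothetical placeholders for the surrounding application
    static boolean hasWork() { return false; }
    static void doOneUnitOfWork() { }
}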

At this point we have analyzed the whole running framework of the Reactor and the concrete implementation logic of how it polls IO-ready events, how it handles IO-ready events, and how it executes asynchronous tasks.

There is one small loose end left: how Netty deals with the JDK NIO Epoll empty polling BUG mentioned at the beginning of the article. Let's take a look~~~

5. Working around the JDK Epoll empty polling BUG

As mentioned earlier, because of the JDK NIO Epoll empty polling BUG, the Reactor thread can be woken up unexpectedly with nothing to do, causing the CPU to spin idly.

In fact, Netty does not fix this JDK BUG at its root; instead it chooses to cleverly work around it.

Let's take a look at how Netty does it.

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    //There are neither IO-ready events nor asynchronous tasks: the Reactor thread was woken up from the Selector unexpectedly, possibly triggering the JDK Epoll empty polling BUG
                    //Rebuild the Selector and reset selectCnt to zero
                    selectCnt = 0;
                }

After the Reactor thread has processed the IO-ready events and asynchronous tasks, it checks whether it actually executed any asynchronous tasks and whether there was any IO-ready Channel.

  • The boolean ranTasks comes in handy here. It is the return value of the runAllTasks method we discussed earlier, indicating whether at least one asynchronous task was executed.

  • int strategy is the return value of the select method of the JDK NIO Selector, indicating the number of IO-ready Channels.

If ranTasks == false and strategy == 0, the Reactor thread executed no asynchronous tasks and had no IO-ready Channel to process in this round, yet it was woken up anyway; it spun around doing nothing.

In this case, Netty considers that the JDK NIO Epoll empty polling BUG may have been triggered.

    int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);

    private boolean unexpectedSelectorWakeup(int selectCnt) {
          ..................ellipsis...............

        /**
         * The condition here is that there are neither IO ready events nor asynchronous tasks, and the Reactor thread is abnormally awakened from the Selector
         * In this case, the empty polling BUG of JDK Epoll may have been triggered. If this situation lasts 512 times, it is considered that the BUG may have been triggered, so the Selector is rebuilt
         *
         * */
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            return true;
        }
        return false;
    }
  • If the number of consecutive unexpected wakeups recorded in selectCnt reaches the configured threshold SELECTOR_AUTO_REBUILD_THRESHOLD, Netty assumes that the JDK NIO Epoll empty polling BUG may have been triggered, rebuilds the Selector (re-registering all previously registered Channels with a new Selector and closing the old one), and resets the selectCnt count to 0.

The default value of SELECTOR_AUTO_REBUILD_THRESHOLD is 512. A custom value can be specified with the system property -Dio.netty.selectorAutoRebuildThreshold.

  • If selectCnt is less than SELECTOR_AUTO_REBUILD_THRESHOLD, no processing will be done and selectCnt will continue counting.

In this way, Netty counts how many times the Reactor is woken up unexpectedly; once selectCnt reaches 512, it cleverly sidesteps the JDK NIO Epoll empty polling BUG by rebuilding the Selector.
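
To make the idea of 'rebuilding the Selector' concrete, here is a minimal plain JDK NIO sketch. It is not Netty's rebuildSelector0; Netty additionally updates each AbstractNioChannel's SelectionKey reference and handles failures key by key.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public final class SelectorRebuildDemo {

    //Open a new Selector, migrate every valid key (channel + interestOps + attachment), close the old one
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();                                                  //detach from the old Selector
            key.channel().register(newSelector, interestOps, attachment); //re-register with the new one
        }
        oldSelector.close();
        return newSelector;
    }
}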

We can also borrow Netty's way of approaching problems in our daily development. When we find that we cannot guarantee a complete fix for a problem, or that the cost of fixing it outright is not worth the benefit, we should consider whether we can work around it instead and still achieve the same effect. *Sometimes the highest level of solving a problem is not to solve it at all, but to cleverly work around it~~~*

Summary

This article spent a lot of space on the overall running framework of the Reactor and took a deep look at the concrete implementation logic of its core working modules.

Through this article, we now know how the Reactor polls for the IO events of interest on all the Channels registered with it, how the Reactor handles IO-ready events, and how it executes the asynchronous and scheduled tasks submitted within the Netty framework.

Finally, we saw how Netty cleverly works around the JDK NIO Epoll empty polling BUG.

And we distilled a new way of approaching problems: sometimes the highest level of solving a problem is not to solve it at all, but to cleverly work around it~~~!!

Well, that's all for this article. See you in the next article~~~~~
