NioEventLoop source code analysis

In previous installments, we analyzed Netty's server startup, client connection, and client message processing flows. In this installment, we look at an extremely important Netty component: the event loop.

NioEventLoopGroup analysis

When using Netty, we usually work with NioEventLoopGroup, so we start there. First, let's look at the inheritance hierarchy of NioEventLoopGroup.

As the inheritance diagram shows, NioEventLoopGroup is a thread pool: it has an execute() method, and it implements the ScheduledExecutorService interface, so it can also run scheduled tasks. However, reading its source code and that of its parent classes shows that it does not hold the actual thread resources itself; those are held by NioEventLoop. Because NioEventLoopGroup manages multiple NioEventLoop instances internally, I think it is more accurate to call it a thread group. Let's start with the constructors of NioEventLoopGroup.

#NioEventLoopGroup

	//No-arg constructor
	public NioEventLoopGroup() {
        this(0);
    }
	
	//nThreads: the number of NioEventLoop instances to create, that is, the number of threads
	public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
	
	public NioEventLoopGroup(int nThreads, Executor executor) {
        //Passes the (null) executor along and adds a selector provider
        this(nThreads, executor, SelectorProvider.provider());
    }

	public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        //Additionally passes the default select strategy factory
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

	public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        //Adds a rejection policy (the thread pool's rejected-execution handler) and calls the parent class constructor
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

#MultithreadEventLoopGroup

	protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //If nThreads is 0, fall back to DEFAULT_EVENT_LOOP_THREADS (CPU cores * 2 by default); pass the other parameters to the parent class constructor
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

#MultithreadEventExecutorGroup

	protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        //Additionally passes the default chooser factory (it picks an EventExecutor; unrelated to the NIO Selector)
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
	
	//All constructors end up here. The parameters passed in are:
	//1. nThreads: the number of threads
	//2. executor: currently null
	//3. chooserFactory: the chooser factory
	//4. args: the remaining parameters, wrapped into an Object array
	//   args[0] selectorProvider: selector provider
	//   args[1] selectStrategyFactory: select strategy factory
	//   args[2] RejectedExecutionHandlers.reject(): rejection policy
	protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        
        //It is null on the constructor chain we followed
        if (executor == null) {
            //Discussed below
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        //Create an array that manages EventLoop
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //Create NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                //As long as one creation fails, the others must be closed
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
		
        //The chooser factory creates the chooser
        //The chooser picks one NioEventLoop from children, using one of several algorithms
        //See #DefaultEventExecutorChooserFactory for the algorithms. Two are implemented by default
        chooser = chooserFactory.newChooser(children);

        //Termination listener
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        //Register the termination listener on each NioEventLoop
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

From the constructor above, we focus on the following two lines:

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//......
children[i] = newChild(executor, args);

Let's start with newDefaultThreadFactory()

	protected ThreadFactory newDefaultThreadFactory() {
        //A default thread factory was returned
        return new DefaultThreadFactory(getClass());
    }

Now look at #ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    //Holds a reference to the default thread factory
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        //Each time a task is executed, a thread is created
        threadFactory.newThread(command).start();
    }
}

This newThread() is defined in #DefaultThreadFactory

	@Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
        }
        return t;
    }

	protected Thread newThread(Runnable r, String name) {
        //The thread type created is FastThreadLocalThread
        return new FastThreadLocalThread(threadGroup, r, name);
    }

So the executor holds a thread factory, and it creates a new thread each time a task is submitted.
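The thread-per-task behaviour can be reproduced with the JDK alone. The following is a minimal sketch, not Netty code: the class name ThreadPerTaskDemo, the "demo-" thread name prefix, and the helper distinctThreadsFor() are made up for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only demo; it only imitates the idea of ThreadPerTaskExecutor.
class ThreadPerTaskDemo {

    // Submits `tasks` trivial tasks and returns how many distinct threads ran them.
    static int distinctThreadsFor(int tasks) {
        AtomicInteger nextId = new AtomicInteger();
        // Simplified thread factory: a name prefix plus an increasing id.
        ThreadFactory factory = r -> new Thread(r, "demo-" + nextId.incrementAndGet());
        // Same idea as the execute() listed above: a new thread for every task.
        Executor executor = command -> factory.newThread(command).start();

        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // Three tasks are expected to run on three different threads.
        System.out.println(distinctThreadsFor(3));
    }
}
```

An executor like this would be wasteful as a general-purpose pool; it makes sense when each caller submits only one long-running task.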

Now let's look at children[i] = newChild(executor, args). The executor that was just assigned and args are passed in.

#NioEventLoopGroup

	@Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        //On the constructor chain we analyzed, args has only three elements, so queueFactory is null
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        //Pass the parameters to the NioEventLoop constructor and create the object
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

At this point the construction of NioEventLoopGroup is complete. We know that NioEventLoopGroup holds an array of NioEventLoop instances and a chooser (used to pick a NioEventLoop). Next, it's time to analyze NioEventLoop.
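The step of picking one NioEventLoop from the array can be pictured as a round-robin counter. The sketch below is a simplified, JDK-only illustration and not the Netty implementation: the class RoundRobinChooserDemo is hypothetical, a String array stands in for the children array, and the split into a bit-mask path for power-of-two sizes and a modulo path otherwise is my understanding of the two default algorithms, so treat that detail as an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser; Strings stand in for the event loops.
class RoundRobinChooserDemo {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] children; // assumed to contain at least one element

    RoundRobinChooserDemo(String... children) {
        this.children = children;
    }

    // True when val has exactly one bit set (1, 2, 4, 8, ...).
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    String next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(children.length)) {
            // For a length of 2^k, masking with (length - 1) equals modulo and is cheaper.
            return children[i & (children.length - 1)];
        }
        // General case; Math.abs keeps the index valid after the counter overflows.
        return children[Math.abs(i % children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserDemo chooser = new RoundRobinChooserDemo("loop-0", "loop-1", "loop-2");
        for (int n = 0; n < 4; n++) {
            System.out.println(chooser.next()); // wraps around after loop-2
        }
    }
}
```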

NioEventLoop analysis

Let's first look at the inheritance hierarchy of NioEventLoop

It can be seen that NioEventLoop is a single-threaded thread pool. Let's start with its constructor.

	//parent: the NioEventLoopGroup
    //executor: the ThreadPerTaskExecutor, which holds the default thread factory
	//selectorProvider: selector provider
	//strategy: select strategy
	//rejectedExecutionHandler: rejection policy
	//queueFactory: null
	NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        //Passes the parameters on to the parent class constructor; newTaskQueue() creates a queue
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
		
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

        //The following three lines create a Selector instance, so each NioEventLoop holds its own Selector.
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

#SingleThreadEventLoop

	protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
          boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
          RejectedExecutionHandler rejectedExecutionHandler) {
        //taskQueue: task queue
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        //Not the focus here
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

#SingleThreadEventExecutor

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                        RejectedExecutionHandler rejectedHandler) {
    	//Eventually reaches #AbstractEventExecutor, which only assigns the parent field to record which group this NioEventLoop belongs to
        super(parent);
    	//Assignment operation
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    	//The ThreadPerTaskExecutor created in the MultithreadEventExecutorGroup constructor. It is used below, so keep it in mind!
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

From this we know that NioEventLoop holds a Selector, a task queue, and an executor of type ThreadPerTaskExecutor. Its parent class SingleThreadEventExecutor also holds a Thread reference, but so far we haven't seen how that thread is created. Remember the tasks NioEventLoop ran in previous installments? They were all submitted through NioEventLoop's execute(), so let's look at the execute() method now.

The execute() method of NioEventLoop is implemented in its parent class SingleThreadEventExecutor

#SingleThreadEventExecutor

	@Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
	
	private void execute(Runnable task, boolean immediate) {
        //Note:
        //In the first phase of Netty server startup, the main thread calls this method, so inEventLoop is false
        boolean inEventLoop = inEventLoop();
        //Add a task to the task queue
        addTask(task);
        //Called from the main thread, so this branch is entered
        if (!inEventLoop) {
            //Core method
            startThread();
            //If the executor is already shut down, run the reject logic
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

	private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

	//Core method
	private void doStartThread() {
        assert thread == null;
        //executor: ThreadPerTaskExecutor
        //It holds the default thread factory; calling execute() creates a new thread
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //Store the current thread in the thread field
                //From now on, NioEventLoop holds its own thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    //This is the key point:
                    //the new thread calls run() of SingleThreadEventExecutor
                    //The concrete class is NioEventLoop, so NioEventLoop.run() is called
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    //......
                }
            }
        });
    }

Now it's clear: after a long detour, we finally reach the run() method of NioEventLoop. As mentioned in previous installments, run() is the core of NioEventLoop; it handles all kinds of work (IO events and user-submitted tasks). So let's look at run().
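The lazy start described above (queue the task first, then start the worker thread at most once through a compare-and-set) can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not Netty code: the class LazyStartLoopDemo, the threadsStarted counter, and the helper submitAndCount() are hypothetical, and shutdown handling and wakeup logic are left out.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded executor whose thread starts on the first execute().
class LazyStartLoopDemo implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsStarted = new AtomicInteger(); // for the demo only
    private volatile Thread thread;

    @Override
    public void execute(Runnable task) {
        taskQueue.add(task);                     // queue first, as in the listing above
        if (Thread.currentThread() != thread) {  // the caller is not the loop thread
            startThread();
        }
    }

    private void startThread() {
        // Only the caller that wins the CAS starts the thread; everyone else returns.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(this::run, "demo-loop");
            t.setDaemon(true);                   // lets the JVM exit when the demo ends
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread();         // the loop now knows its own thread
        for (;;) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Submits n tasks from the calling thread; returns {threads started, distinct worker threads}.
    static int[] submitAndCount(int n) {
        LazyStartLoopDemo loop = new LazyStartLoopDemo();
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(() -> {
                workers.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { loop.threadsStarted.get(), workers.size() };
    }

    public static void main(String[] args) {
        int[] result = submitAndCount(5);
        System.out.println(result[0] + " thread started, " + result[1] + " worker thread used");
    }
}
```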

run() of NioEventLoop

There is a lot of source code here, so we focus on the key parts.

	@Override
    protected void run() {
        //Counter used to detect the epoll bug. It is explained later
        int selectCnt = 0;
        for (;;) {
            try {
                // 1. >= 0: the return value of selectNow(), that is, the number of ready channels on the selector.
                // 2. < 0: one of the constants CONTINUE, BUSY_WAIT, SELECT.
                int strategy;
                try {
                    //selectStrategy: a DefaultSelectStrategy object.
                    // It decides what to do based on whether this NioEventLoop has queued tasks.
                    // 1. If there are tasks, call selectNow() on the selector and return the number of ready channels.
                    // 2. If there are no tasks, return -1 (SELECT). The switch below then acts on the constant.
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        //-2
                        case SelectStrategy.CONTINUE:
                            continue;
						//-3
                        case SelectStrategy.BUSY_WAIT:

                        //-1
                        case SelectStrategy.SELECT:
                            
                            //Because NioEventLoop implements ScheduledExecutorService, it can also run scheduled tasks
                            //Get the deadline of the next scheduled task. Returns -1 if there is none
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();

                            if (curDeadlineNanos == -1L) {
                                //There are no scheduled tasks in this EventLoop.
                                //Set to the maximum value of long.
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                //Check again whether there are tasks to run
                                if (!hasTasks()) {
                                    //There are no ordinary tasks to run.
                                    // 1. curDeadlineNanos is the maximum long value: there are no scheduled tasks.
                                    // 2. Otherwise curDeadlineNanos is the deadline of the next scheduled task.
                                    // The resulting strategy is the number of channels with ready events.
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                
                //Increment the epoll bug counter
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;

                //ioRatio: the share of loop time intended for IO events, as a percentage. The default is 50.
                final int ioRatio = this.ioRatio;
                //Whether this pass of the loop ran any queued tasks.
                boolean ranTasks;

                //ioRatio == 100: no time budget is applied. IO events are processed first, then all queued tasks are run.
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            //The selector of this NioEventLoop has channels with ready events.
                            //Entry point for handling IO events. Readers of the previous two installments should find it familiar
                            processSelectedKeys();
                        }
                    } finally {
                        //After IO events are handled, run the tasks in the task queue.
                        ranTasks = runAllTasks();
                    }

                }
                //ioRatio is not 100 and the selector of this NioEventLoop has ready events.
                else if (strategy > 0) {
                    // Start time of IO event processing.
                    final long ioStartTime = System.nanoTime();
                    try {
                        //Handling IO events
                        processSelectedKeys();
                    } finally {
                        //Total IO event processing time.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //ioTime * (100 - ioRatio) / ioRatio: the maximum time for running queued tasks, derived from the IO time.
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
                //The selector of this NioEventLoop has no ready events. Only queued tasks are processed.
                else {
                    // Run the minimum batch of queued tasks: at most 64.
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    //......
                    //Normal case: the NioEventLoop thread woke up from the selector because of IO events, or it ran tasks.
                    //Reset selectCnt to 0.
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                //......
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
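The strategy calculation at the top of the loop is small enough to sketch on its own. The following is a minimal JDK-only illustration based on the comments in the listing above: the class SelectStrategyDemo is hypothetical, java.util.function.IntSupplier stands in for whatever supplier type the real code uses, and the constant values are the ones quoted in the listing.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for the default select strategy described above.
class SelectStrategyDemo {
    static final int SELECT = -1;     // do a blocking select
    static final int CONTINUE = -2;   // skip this pass of the loop
    static final int BUSY_WAIT = -3;  // falls through to SELECT in the NIO loop above

    // With queued tasks: poll without blocking and return the ready count (>= 0),
    // so the tasks are not delayed. Without tasks: ask the loop to block in select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    public static void main(String[] args) {
        // Pretend the selector currently has two ready channels.
        System.out.println(calculateStrategy(() -> 2, true));   // tasks pending: ready count
        System.out.println(calculateStrategy(() -> 2, false));  // no tasks: SELECT
    }
}
```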

Looking at the method as a whole, the thread in NioEventLoop spins in for(;;), processing IO events and the tasks in the task queue in turn.

IO events are handled by processSelectedKeys(), which was analyzed at the beginning of the installment on the client connection process. If you haven't read it or have forgotten it, take another look. Here we mainly analyze the two methods that process the task queue: runAllTasks() and runAllTasks(long timeoutNanos), which takes a time limit.
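Before going into those methods, it may help to put numbers on the time limit that run() passes to runAllTasks(long timeoutNanos). The sketch below only evaluates the expression ioTime * (100 - ioRatio) / ioRatio from the listing above; the class IoRatioBudgetDemo and the method taskBudgetNanos() are hypothetical names, and the sample durations are made up.

```java
// Hypothetical helper that evaluates the task time budget used in run().
class IoRatioBudgetDemo {

    // ioTimeNanos: time just spent on IO events; ioRatio: 1..99 (100 takes another branch).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // ioRatio 50: tasks get as much time as IO just used.
        System.out.println(taskBudgetNanos(10_000_000L, 50)); // 10000000
        // ioRatio 80: 8 ms of IO leaves 2 ms for tasks.
        System.out.println(taskBudgetNanos(8_000_000L, 80));  // 2000000
        // ioRatio 20: 2 ms of IO leaves 8 ms for tasks.
        System.out.println(taskBudgetNanos(2_000_000L, 20));  // 8000000
    }
}
```

Because the budget is derived from the IO time measured in the same pass, a pass with very little IO also gets a very small task budget.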

#SingleThreadEventExecutor

	protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            //fetchedAll: whether all due scheduled tasks were moved to the task queue. If not, the loop runs again
            fetchedAll = fetchFromScheduledTaskQueue();
            //Execute tasks in a normal task queue
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        //At least one task was processed
        if (ranAtLeastOne) {
            //Set the last execution time
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        //Runs the tailTasks queue; rarely used
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

	private boolean fetchFromScheduledTaskQueue() {
        //The scheduled task queue is empty: return immediately
        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
            return true;
        }
        //Get current time
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        //Loop: move due tasks from the scheduled task queue into the ordinary task queue
        for (;;) {
            //pollScheduledTask(): removes and returns a scheduled task whose deadline has passed, or null if none is due
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                return true;
            }
            //Add the scheduled task to the ordinary task queue
            if (!taskQueue.offer(scheduledTask)) {
                //The offer failed, so the ordinary task queue is full. Put the task back into the scheduled task queue
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
        }
    }

	protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        //Take a task from the normal task queue
        Runnable task = pollTaskFrom(taskQueue);
        //The first task is null, indicating that the queue is empty
        if (task == null) {
            //Fast-path return; a similar idea appears in the node enqueue logic of AQS
            return false;
        }
        //Spin to process tasks in the task queue
        for (;;) {
            //Perform every task safely
            safeExecute(task);
            //Take out the next task
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                //The queue is empty. All tasks have been processed
                return true;
            }
        }
    }

That completes the analysis of the parameterless runAllTasks(): it moves the due tasks from the scheduled task queue into the ordinary task queue, then loops until every task in the ordinary queue has been processed. Next, let's look at runAllTasks(long timeoutNanos), which takes a time limit.
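The transfer of due tasks from the scheduled queue to the ordinary queue can be sketched with a PriorityQueue ordered by deadline and a bounded queue. This is a simplified, JDK-only illustration, not the Netty code: the classes ScheduledFetchDemo and Scheduled and the method fetchDue() are hypothetical, task names stand in for the Runnables, and the current time is passed in as a plain number.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of moving due scheduled tasks into the ordinary task queue.
class ScheduledFetchDemo {

    // A scheduled task reduced to a name and an absolute deadline in nanoseconds.
    static final class Scheduled implements Comparable<Scheduled> {
        final String name;
        final long deadlineNanos;

        Scheduled(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Scheduled other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // Moves every task with deadline <= nowNanos into taskQueue, earliest first.
    // Returns false if taskQueue filled up before all due tasks were moved.
    static boolean fetchDue(PriorityQueue<Scheduled> scheduled, Queue<String> taskQueue, long nowNanos) {
        for (;;) {
            Scheduled head = scheduled.peek();
            if (head == null || head.deadlineNanos > nowNanos) {
                return true;                 // nothing (more) is due
            }
            scheduled.poll();
            if (!taskQueue.offer(head.name)) {
                scheduled.add(head);         // queue full: put it back for the next pass
                return false;
            }
        }
    }

    public static void main(String[] args) {
        PriorityQueue<Scheduled> scheduled = new PriorityQueue<>();
        scheduled.add(new Scheduled("late", 300));
        scheduled.add(new Scheduled("a", 100));
        scheduled.add(new Scheduled("b", 200));
        Queue<String> taskQueue = new ArrayBlockingQueue<>(8);

        boolean fetchedAll = fetchDue(scheduled, taskQueue, 250);
        // "a" and "b" are due at time 250; "late" stays in the scheduled queue.
        System.out.println(fetchedAll + " " + taskQueue + " remaining=" + scheduled.size());
    }
}
```

The false return value is what makes the do/while loop in runAllTasks() go around again: the ordinary queue is drained first, then the remaining due tasks are fetched.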

#SingleThreadEventExecutor

	//timeoutNanos: the maximum time, in nanoseconds, to spend running tasks
	protected boolean runAllTasks(long timeoutNanos) {
        //Move due tasks from the scheduled task queue into the ordinary task queue
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            //Nothing to run: return early, after running the tailTasks queue (rarely used)
            afterRunningAllTasks();
            return false;
        }
		//Calculate the deadline for processing tasks
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        //Number of tasks processed
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            //Perform tasks safely
            safeExecute(task);
            //Number of processing tasks++
            runTasks ++;
            
            //0x3F is binary 111111, decimal 63
            //Which numbers give 0 when ANDed with 63?
            // 1000000 & 111111 = 0  (64)
            //10000000 & 111111 = 0  (128)
            //11000000 & 111111 = 0  (192) ...
            //Every multiple of 64 ANDed with 63 is 0, so this condition is true after every 64th task
            if ((runTasks & 0x3F) == 0) {
                //Update last execution time
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                //If the current time has reached the deadline, stop. There are two cases:
                //1. timeoutNanos is not 0: tasks keep running until the deadline (checked every 64 tasks)
                //2. timeoutNanos is 0: the deadline is 0, so at most 64 tasks are run
                if (lastExecutionTime >= deadline) {
                    //Exit loop
                    break;
                }
            }
			
            //Next task
            task = pollTask();
            if (task == null) {
                //The queue is empty: record the time and stop
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        //Runs the tailTasks queue; rarely used
        afterRunningAllTasks();
        //Assign the last execution time to the member property
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

This method has two branches, depending on whether timeoutNanos is 0, and both can be seen in the run() method of NioEventLoop. runAllTasks(long timeoutNanos) checks once every 64 tasks whether it has run past its deadline. Apart from this check, tasks are processed in basically the same way as in runAllTasks().
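The effect of checking the clock only on every 64th task is easy to see in isolation. The sketch below is a simplified JDK-only illustration, not the Netty method: the class BatchedDeadlineDemo and its helpers are hypothetical, scheduled tasks and tailTasks are left out, and the clock is measured relative to class initialization so that it is never negative (an assumption that makes the deadline of 0 behave as described above).

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of running queued tasks under a time budget checked every 64 tasks.
class BatchedDeadlineDemo {
    private static final long START_NANOS = System.nanoTime();

    // Elapsed nanoseconds since class initialization; never negative.
    static long nanoTime() {
        return System.nanoTime() - START_NANOS;
    }

    // Runs tasks until the queue is empty or the deadline has passed.
    // Returns the number of tasks that were run.
    static long runTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = timeoutNanos > 0 ? nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // Reading the clock is relatively costly, so do it only on multiples of 64.
            if ((runTasks & 0x3F) == 0 && nanoTime() >= deadline) {
                break;
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    // Builds a queue of n tasks that do nothing.
    static Queue<Runnable> noopTasks(int n) {
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            queue.add(() -> { });
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(noopTasks(200), 0));                  // 64: one batch only
        System.out.println(runTasks(noopTasks(50), 0));                   // 50: queue empties first
        System.out.println(runTasks(noopTasks(200), Long.MAX_VALUE / 2)); // 200: deadline never reached
    }
}
```

One consequence of the batching is that the budget is approximate: up to 63 further tasks can run after the deadline has passed before the next check notices it.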

That is the logic NioEventLoop uses to run ordinary tasks. It is not very difficult; if anything is still unclear, walk through the flow once more.

In the run() of NioEventLoop we left one question open: the selectCnt variable, which is used to deal with the epoll bug. It is explained below.

How Netty solves the epoll bug

Let's look at the run() method again. This time, only the code related to the epoll bug is annotated.

@Override
    protected void run() {
        // epoll bug count variable.
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;

                        case SelectStrategy.BUSY_WAIT:

                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE;
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {
                                    //Normally, with no task and no IO event, the thread blocks here
                                    //and wakes up only when a channel is ready or a scheduled task's deadline arrives
                                    //If the epoll bug is triggered, selector.select() returns even though no channel is ready
                                    //so the thread spins through for(;;) doing nothing and CPU usage reaches 100%
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                } catch (IOException e) {
                    //An IOException also rebuilds the selector; this is described below
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                
                //epoll bug counter
                //Normally, getting past select() above means there is IO or a task to handle
                //With the bug, the thread keeps spinning with nothing to do, and selectCnt++ runs on every pass
                //So after the bug is triggered, selectCnt grows very large in a short time
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                
                //Whether any task was executed in this pass: true if so, false otherwise
                //This flag covers the case where ordinary tasks are pending
                //Ordinary tasks may be very short, so the thread also loops quickly and selectCnt climbs; that is not the bug
                boolean ranTasks;
                
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        //ranTasks assignment
                        ranTasks = runAllTasks();
                    }
                }
                else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //ranTasks assignment
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }


                }
                else {
                    //No IO events were ready: a timeout of 0 runs only the minimum number of tasks
                    ranTasks = runAllTasks(0);
                }

                //True if at least one ordinary task was run or at least one IO event was ready
                if (ranTasks || strategy > 0) {
                    //MIN_PREMATURE_SELECTOR_RETURNS=3
                    //The deadline of a scheduled task may have been very close, so selector.select() returned several times in quick succession
                    //This is harmless, so only a debug log is printed
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    //IO events or ordinary tasks were processed, so the selector is not hitting the epoll bug;
                    //reset selectCnt to 0
                    selectCnt = 0;
                    
                //Reaching the else-if branch below means:
                //ranTasks is false, so no ordinary task was run
                //strategy <= 0, so no channel is ready on the selector, yet selector.select() did not stay blocked
                //See unexpectedSelectorWakeup(selectCnt) below
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    //Reset the counter after an interrupt or a selector rebuild
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

	private boolean unexpectedSelectorWakeup(int selectCnt) {
        //The thread was interrupted; this is not the case we care about here
        if (Thread.interrupted()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
            }
            return true;
        }
        //SELECTOR_AUTO_REBUILD_THRESHOLD: the default value is 512
        //If selectCnt >= 512, Netty assumes the epoll bug has been triggered
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            //Rebuild Selector
            rebuildSelector();
            return true;
        }
        return false;
    }

	public void rebuildSelector() {
        //Normally this is called from the EventLoop thread; if not, the rebuild is submitted to that thread as a task
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        //Already on the EventLoop thread: rebuild the selector directly
        rebuildSelector0();
    }

	private void rebuildSelector0() {
        //Get the selector that has triggered the bug
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
		
        if (oldSelector == null) {
            return;
        }

        try {
            //Recreate the selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        
        int nChannels = 0;
        //Traverse the old selector and re register the channel with the new selector
        for (SelectionKey key: oldSelector.keys()) {
            //The attachment is normally the Netty channel (AbstractNioChannel); otherwise it is a NioTask
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }
				
                //Read the interest ops registered on the old selector
                int interestOps = key.interestOps();
                //Cancel the key on the old selector
                key.cancel();
                
                //Register the channel to the new selector
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }
		
        //Point the NioEventLoop fields at the new selector
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            //Close the old selector
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
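
The re-registration step in rebuildSelector0() can be tried in isolation with plain java.nio. The following is only a minimal sketch under stated assumptions: the class SelectorMigrationSketch and its methods migrate() and openWithOneChannel() are hypothetical names made up for this example and are not part of Netty, and the error handling Netty performs per channel is reduced to a single unchecked exception.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class (an assumption for illustration), not part of Netty.
final class SelectorMigrationSketch {

    // Moves every valid key of oldSelector to a newly opened selector, closes the old one
    // and returns the new one. This mirrors the loop in rebuildSelector0() in simplified form.
    static Selector migrate(Selector oldSelector) {
        try {
            Selector newSelector = Selector.open();
            for (SelectionKey key : oldSelector.keys()) {
                // Skip keys that are already cancelled or already registered with the new selector
                if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                    continue;
                }
                int interestOps = key.interestOps();
                Object attachment = key.attachment();
                // Cancel on the old selector, then register the same channel with the new one
                key.cancel();
                key.channel().register(newSelector, interestOps, attachment);
            }
            oldSelector.close();
            return newSelector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Opens a selector with one unbound, non-blocking server channel registered for OP_ACCEPT.
    static Selector openWithOneChannel(Object attachment) {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT, attachment);
            return selector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Selector oldSelector = openWithOneChannel("demo-attachment");
        Selector newSelector = migrate(oldSelector);
        System.out.println("old selector open: " + oldSelector.isOpen());
        System.out.println("keys on new selector: " + newSelector.keys().size());
        // Clean up the demo channel and the new selector
        for (SelectionKey key : newSelector.keys()) {
            key.channel().close();
        }
        newSelector.close();
    }
}
```

Note that the interest ops and the attachment are read before key.cancel() is called, in the same order as in the Netty code above, because a cancelled key can no longer be queried for its interest ops.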

That is how Netty works around the epoll bug. To sum up, the run() method uses two pieces of state to decide whether the current selector has triggered the bug: the loop counter selectCnt, and whether any IO events or ordinary tasks were processed in the iteration. If the counter reaches the threshold without any work having been done, Netty opens a new selector, re-registers every channel from the old selector with the new one, and closes the old selector. Once the rebuild is complete, the loop continues processing IO events and ordinary tasks.
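
The counting logic summarized above can be isolated into a few lines. This is a minimal sketch, not Netty's code: the class PrematureWakeupCounter and its method recordIteration() are hypothetical names introduced for this example, and the interrupt check and logging of unexpectedSelectorWakeup() are left out.

```java
// Hypothetical helper (an assumption for illustration), not a Netty class.
// It mirrors the selectCnt bookkeeping done in NioEventLoop.run().
final class PrematureWakeupCounter {
    // Plays the role of SELECTOR_AUTO_REBUILD_THRESHOLD; a value <= 0 disables rebuilding
    private final int rebuildThreshold;
    private int selectCnt;

    PrematureWakeupCounter(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    // Records one loop iteration and returns true if the caller should rebuild the selector.
    boolean recordIteration(boolean ranTasks, int readyKeys) {
        selectCnt++;
        if (ranTasks || readyKeys > 0) {
            // Real work was done, so the wakeup was legitimate
            selectCnt = 0;
            return false;
        }
        if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            // Too many empty wakeups in a row: treat it as the epoll bug
            selectCnt = 0;
            return true;
        }
        return false;
    }

    int count() {
        return selectCnt;
    }

    public static void main(String[] args) {
        PrematureWakeupCounter counter = new PrematureWakeupCounter(512);
        int iterations = 0;
        boolean rebuild = false;
        // Simulate a selector that keeps waking up with nothing to do
        while (!rebuild) {
            rebuild = counter.recordIteration(false, 0);
            iterations++;
        }
        System.out.println("Rebuild requested after " + iterations + " empty wakeups");
    }
}
```

A single iteration that runs a task or finds a ready key resets the counter, which is why a busy but healthy loop never reaches the threshold.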

That concludes the source code analysis of NioEventLoop, which should give us a deeper understanding of EventLoopGroup and EventLoop. The EventLoopGroup does not hold thread instances itself; it holds a chooser (used to select a single EventLoop, not to be confused with the NIO Selector) and an array of EventLoops. Each EventLoop holds its thread instance, a selector, an ordinary task queue and a scheduled task queue. The first call to execute() starts the thread, which then calls the EventLoop's run() method to process IO events and ordinary tasks. If the process is still unclear, open the source code and trace it several times following the flow of this article; it should become familiar quickly.
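
The ownership structure described in this summary can be sketched without Netty. The code below is a rough illustration under stated assumptions: MiniEventLoopGroup and MiniEventLoop are hypothetical names, the chooser is reduced to a round-robin index, and the selector and the scheduled task queue are omitted, so each loop only drains ordinary tasks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified stand-in for an event loop group (not Netty code).
final class MiniEventLoopGroup {

    // Stand-in for a single event loop: it owns the thread and the task queue.
    static final class MiniEventLoop {
        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        private final AtomicBoolean started = new AtomicBoolean();
        private final String name;
        private volatile Thread thread;

        MiniEventLoop(String name) {
            this.name = name;
        }

        // Queues the task; the first call also starts the thread lazily.
        void execute(Runnable task) {
            taskQueue.add(task);
            if (started.compareAndSet(false, true)) {
                Thread t = new Thread(this::run, name);
                t.setDaemon(true); // lets the JVM exit without an explicit shutdown
                thread = t;
                t.start();
            }
        }

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        boolean isStarted() {
            return started.get();
        }

        // The loop body: here it only runs queued tasks, one after another.
        private void run() {
            try {
                while (true) {
                    taskQueue.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final MiniEventLoop[] children;
    private final AtomicInteger nextIndex = new AtomicInteger();

    MiniEventLoopGroup(int nThreads) {
        children = new MiniEventLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new MiniEventLoop("mini-loop-" + i);
        }
    }

    // Round-robin choice of one child loop.
    MiniEventLoop next() {
        return children[Math.floorMod(nextIndex.getAndIncrement(), children.length)];
    }

    // The group holds no thread of its own; it delegates to a child.
    void execute(Runnable task) {
        next().execute(task);
    }

    int startedCount() {
        int n = 0;
        for (MiniEventLoop child : children) {
            if (child.isStarted()) {
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventLoopGroup group = new MiniEventLoopGroup(2);
        CountDownLatch done = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            int id = i;
            group.execute(() -> {
                System.out.println("task " + id + " on " + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
    }
}
```

In this sketch, creating the group starts no thread at all; a child's thread only appears once a task is handed to that child, which is the lazy start behaviour the summary attributes to execute().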

Keywords: Java Netty

Added by Dragoa on Wed, 01 Dec 2021 19:49:00 +0200