In previous installments, we analyzed how Netty starts a server, accepts client connections, and processes client messages. In this installment, we look at one of Netty's most important components: the event loop.
NioEventLoopGroup analysis
In everyday Netty code we work with NioEventLoopGroup, so that is where we start. First, let's look at the class hierarchy of NioEventLoopGroup.
As the figure above shows, NioEventLoopGroup is a thread pool. It has an execute() method, and it implements the ScheduledExecutorService interface, so it can also run scheduled tasks. However, the source code of the class and its parent classes shows that it does not hold any real thread resources itself; those are held by NioEventLoop. Because NioEventLoopGroup manages multiple NioEventLoops internally, I think it is more accurate to call it a thread group. Let's start with the NioEventLoopGroup constructors.
#NioEventLoopGroup
```java
// No-argument constructor
public NioEventLoopGroup() {
    this(0);
}

// nThreads: the number of NioEventLoops to create, i.e. the number of threads
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    // Passes a null executor and the default selector provider
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    // Adds a select strategy factory
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    // Adds a rejection policy (as in a thread pool) and calls the parent constructor
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
```
#MultithreadEventLoopGroup
```java
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // nThreads == 0 falls back to DEFAULT_EVENT_LOOP_THREADS (by default, twice the number of
    // available processors); the other arguments are passed on to the parent constructor
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
```
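To make the fallback concrete, here is a small JDK-only sketch of how the default thread count is resolved. It assumes DEFAULT_EVENT_LOOP_THREADS is computed as in recent Netty 4.1 sources. That means the `io.netty.eventLoopThreads` system property if set, otherwise twice the number of available processors, and never less than 1. The class and method names are hypothetical. Netty reads the property through its own utility classes rather than `Integer.getInteger`.

```java
// Hypothetical helper mirroring the nThreads fallback in MultithreadEventLoopGroup.
final class DefaultThreadCount {
    // Assumed default: property override if present, else availableProcessors * 2, at least 1.
    static int defaultThreads(int availableProcessors, Integer propertyOverride) {
        int configured = propertyOverride != null ? propertyOverride : availableProcessors * 2;
        return Math.max(1, configured);
    }

    // Same ternary as the constructor: 0 means "use the default".
    static int resolve(int nThreads, int availableProcessors, Integer propertyOverride) {
        return nThreads == 0 ? defaultThreads(availableProcessors, propertyOverride) : nThreads;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        Integer override = Integer.getInteger("io.netty.eventLoopThreads");
        System.out.println("new NioEventLoopGroup() would create " + resolve(0, cpus, override) + " loops");
    }
}
```

On an 8-core machine without the property, `new NioEventLoopGroup()` therefore creates 16 event loops. An explicit `new NioEventLoopGroup(1)` is the usual choice for a boss group that only accepts connections.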
#MultithreadEventExecutorGroup
```java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    // Adds a chooser factory
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

// This is the constructor that finally runs. The arguments passed so far:
// 1. nThreads: number of threads
// 2. executor: currently null
// 3. chooserFactory: the chooser factory
// 4. args: the earlier arguments packed into an Object array
//    args[0] selectorProvider: the selector provider
//    args[1] selectStrategyFactory: the select strategy factory
//    args[2] RejectedExecutionHandlers.reject(): the rejection policy
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // On the constructor chain we followed, executor is null
    if (executor == null) {
        // Discussed below
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // Array that holds the EventLoops
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Create a NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // If any child fails to be created, shut down the ones already created
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // The chooser factory creates a chooser, which picks one NioEventLoop from children.
    // See #DefaultEventExecutorChooserFactory: two algorithms are implemented by default
    chooser = chooserFactory.newChooser(children);

    // Termination listener
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Register the listener on every NioEventLoop
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
```
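The two chooser algorithms fit in a few lines. This is a simplified model that assumes the behavior of DefaultEventExecutorChooserFactory in Netty 4.1. It uses a bitmask when the number of children is a power of two and a modulo otherwise. `ChooserSketch` and `Chooser` are hypothetical names, not Netty's API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the two round-robin strategies; names are illustrative only.
final class ChooserSketch {
    interface Chooser<T> {
        T next();
    }

    // Same test Netty uses: a power of two has exactly one bit set.
    static boolean isPowerOfTwo(int n) {
        return (n & -n) == n;
    }

    static <T> Chooser<T> newChooser(T[] children) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(children.length)) {
            // length is 2^k, so "& (length - 1)" equals "% length" and stays non-negative after overflow
            return () -> children[idx.getAndIncrement() & (children.length - 1)];
        }
        // General case: modulo, with Math.abs guarding against negative indices after int overflow
        return () -> children[Math.abs(idx.getAndIncrement() % children.length)];
    }
}
```

With four children, successive `next()` calls return children 0, 1, 2, 3, 0, ...; with three, 0, 1, 2, 0, .... The bitmask variant replaces the modulo with a single AND, which is the usual explanation for the power-of-two special case.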
From this constructor, we focus on the following two lines:
```java
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// ......
children[i] = newChild(executor, args);
```
Let's start with newDefaultThreadFactory()
```java
protected ThreadFactory newDefaultThreadFactory() {
    // Returns a default thread factory
    return new DefaultThreadFactory(getClass());
}
```
Next, look at #ThreadPerTaskExecutor
```java
public final class ThreadPerTaskExecutor implements Executor {
    // Holds a reference to the default thread factory
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        // Every execute() call creates and starts a new thread
        threadFactory.newThread(command).start();
    }
}
```
The newThread() called here is defined in #DefaultThreadFactory
```java
@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
    try {
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }

        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
    }
    return t;
}

protected Thread newThread(Runnable r, String name) {
    // The threads created are FastThreadLocalThread instances
    return new FastThreadLocalThread(threadGroup, r, name);
}
```
So the executor holds a thread factory, and a new thread is created every time a task is submitted to it.
Now let's look at children[i] = newChild(executor, args), which receives the executor and args we just assembled.
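This executor-plus-factory pattern can be reproduced with the JDK alone. In the sketch below, `NamedThreadFactory`, `ThreadPerTaskSketch` and `threadNamesFor` are hypothetical names, and plain `Thread` replaces Netty's `FastThreadLocalThread`. It shows that each `execute()` call yields a fresh, never-reused thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of ThreadPerTaskExecutor paired with a naming thread factory.
final class ThreadPerTaskSketch {
    // Names threads "<prefix>-<n>", loosely modeled on DefaultThreadFactory's prefix + nextId.
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger nextId = new AtomicInteger();
        private final boolean daemon;

        NamedThreadFactory(String prefix, boolean daemon) {
            this.prefix = prefix;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, prefix + "-" + nextId.incrementAndGet());
            t.setDaemon(daemon);
            return t;
        }
    }

    // Every execute() starts a brand-new thread; nothing is pooled or reused.
    static final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    // Runs n tasks and returns the names of the threads that executed them.
    static List<String> threadNamesFor(int n) {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        Executor executor = new ThreadPerTaskExecutor(new NamedThreadFactory("demoLoop", true));
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }
}
```

Calling `threadNamesFor(3)` yields three distinct names, `demoLoop-1` to `demoLoop-3`. A fresh thread per call would be wasteful as a general-purpose pool. It is exactly what an event loop needs, though, because each loop calls it once to obtain its single long-lived thread.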
#NioEventLoopGroup
```java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // On the constructor chain we analyzed, args has only three elements, so queueFactory is null
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    // Pass the arguments to the NioEventLoop constructor and create the object
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
```
This completes the construction of NioEventLoopGroup. NioEventLoopGroup holds an array of NioEventLoops and a chooser that picks one of them. Next, let's analyze NioEventLoop.
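The group/child split can be modeled in a few lines. The assumption here is that, as in Netty's AbstractEventExecutorGroup, the group's own execute() simply forwards to next(). Each child is modeled as a JDK single-thread executor standing in for NioEventLoop. `GroupSketch` and `threadNamesFor` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// The group owns no thread itself: it holds children and forwards execute() to next().
final class GroupSketch implements Executor {
    private final ExecutorService[] children;
    private final AtomicInteger idx = new AtomicInteger();

    GroupSketch(int nThreads) {
        children = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = newChild(i);
        }
    }

    // Counterpart of newChild(executor, args): one single-threaded child per slot.
    private static ExecutorService newChild(int i) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "child-" + i);
            t.setDaemon(true);
            return t;
        });
    }

    // Round-robin selection, the role the chooser plays in Netty.
    ExecutorService next() {
        return children[Math.abs(idx.getAndIncrement() % children.length)];
    }

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

    void shutdown() {
        for (ExecutorService child : children) {
            child.shutdown();
        }
    }

    // Submits nTasks and returns, in submission order, the name of the thread that ran each.
    static String[] threadNamesFor(int nThreads, int nTasks) {
        GroupSketch group = new GroupSketch(nThreads);
        String[] names = new String[nTasks];
        CountDownLatch done = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            final int slot = i;
            group.execute(() -> {
                names[slot] = Thread.currentThread().getName();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        group.shutdown();
        return names;
    }
}
```

With two children, four submitted tasks alternate between `child-0` and `child-1`. Work submitted to the group is spread across its loops, while each loop keeps its own thread.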
NioEventLoop analysis
Let's first look at the class hierarchy of NioEventLoop.
It shows that NioEventLoop is a single-threaded thread pool. Let's start with its constructor.
```java
// parent: the NioEventLoopGroup
// executor: ThreadPerTaskExecutor, which holds the default thread factory
// selectorProvider: the selector provider
// strategy: the select strategy
// rejectedExecutionHandler: the rejection policy
// queueFactory: null
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    // Pass on to the parent constructor; newTaskQueue() creates the task queues
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // The next three lines create a Selector: every NioEventLoop holds its own Selector instance
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
```
#SingleThreadEventLoop
```java
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    // taskQueue: the task queue
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    // Not important here
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
```
#SingleThreadEventExecutor
```java
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    // This ends in #AbstractEventExecutor, which only assigns the parent field,
    // recording which group the current NioEventLoop belongs to
    super(parent);
    // Field assignments
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    // The ThreadPerTaskExecutor created in the MultithreadEventExecutorGroup constructor
    // is stored here and used later. Don't forget it!
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
```
So NioEventLoop holds a Selector, a task queue, and an executor of type ThreadPerTaskExecutor. Its parent class SingleThreadEventExecutor also has a Thread field, but so far we haven't seen where that thread is created. Recall the tasks NioEventLoop ran in previous installments: they were all submitted through NioEventLoop's execute() method, so let's look at execute() now.
The execute() method of NioEventLoop is implemented in its parent class SingleThreadEventExecutor
#SingleThreadEventExecutor
```java
@Override
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

private void execute(Runnable task, boolean immediate) {
    // Note!
    // In the first phase of Netty server startup, the main thread calls this method,
    // so inEventLoop is false
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    // Called from the main thread, so we enter this branch
    if (!inEventLoop) {
        // Core method
        startThread();
        // If the loop has been shut down, run the rejection logic
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

// Core method
private void doStartThread() {
    assert thread == null;
    // executor: ThreadPerTaskExecutor
    // It holds the default thread factory, so calling execute() creates a new thread
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // Store the current thread in the thread field;
            // from now on the NioEventLoop holds its own thread
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // This is the key point!
                // The new thread calls SingleThreadEventExecutor's run(); the actual
                // implementation class is NioEventLoop, so NioEventLoop.run() is called
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // ......
            }
        }
    });
}
```
After this long detour, execution finally reaches NioEventLoop's run() method. As mentioned in previous installments, NioEventLoop.run() is the core of the event loop: it handles both IO events and tasks submitted by users. So let's go straight to run().
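The lazy start can be isolated in a small model. This is a sketch under simplifying assumptions, not Netty's class. The thread is created on the first `execute()` from outside the loop, and only the caller that wins the CAS from NOT_STARTED to STARTED creates it. There are no shutdown states, and the loop body just drains a blocking queue instead of selecting. `LazyLoopSketch` and `threadsCreatedFor` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of SingleThreadEventExecutor's lazy thread start (simplified).
final class LazyLoopSketch implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Executor threadCreator; // plays the role of ThreadPerTaskExecutor
    private volatile Thread thread;       // captured inside the started task, as in doStartThread()

    LazyLoopSketch(Executor threadCreator) {
        this.threadCreator = threadCreator;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        taskQueue.add(task);
        if (!inEventLoop()) {
            startThread();
        }
    }

    private void startThread() {
        // Only the caller that wins the CAS starts a thread; later callers just enqueue.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadCreator.execute(() -> {
                thread = Thread.currentThread();
                run();
            });
        }
    }

    // Stand-in for NioEventLoop.run(): here it only drains the task queue, forever.
    private void run() {
        for (;;) {
            try {
                Runnable task = taskQueue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Submits n tasks from the caller's thread and reports how many threads were created.
    static int threadsCreatedFor(int n) {
        AtomicInteger created = new AtomicInteger();
        LazyLoopSketch loop = new LazyLoopSketch(r -> {
            created.incrementAndGet();
            Thread t = new Thread(r, "lazy-loop");
            t.setDaemon(true);
            t.start();
        });
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(done::countDown);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }
}
```

Submitting ten tasks creates exactly one thread. That is why a NioEventLoop stays bound to a single thread even though its underlying executor would happily start a new thread on every call.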
run() of NioEventLoop
There is a lot of source code here, and it is the focus of this installment.
```java
@Override
protected void run() {
    // Counter used to detect the epoll bug; explained later
    int selectCnt = 0;
    for (;;) {
        try {
            // 1. >= 0: the return value of select, i.e. the number of ready channels on the multiplexer
            // 2. < 0: one of the constants CONTINUE, BUSY_WAIT or SELECT
            int strategy;
            try {
                // selectStrategy: a DefaultSelectStrategy object.
                // The decision depends on whether this NioEventLoop has local tasks:
                // 1. If there are tasks, call the multiplexer's selectNow() and return the number of ready channels.
                // 2. If there are no tasks, return -1 (SELECT); the logic below branches on this constant.
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                // -2
                case SelectStrategy.CONTINUE:
                    continue;
                // -3
                case SelectStrategy.BUSY_WAIT:
                // -1
                case SelectStrategy.SELECT:
                    // NioEventLoop implements ScheduledExecutorService, so it can also run scheduled tasks.
                    // Get the deadline of the next scheduled task; -1 if there is none
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        // This EventLoop has no scheduled tasks.
                        // Use NONE, which is Long.MAX_VALUE.
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // Check again whether there are tasks to run
                        if (!hasTasks()) {
                            // No normal local tasks to run.
                            // 1. If curDeadlineNanos is Long.MAX_VALUE, there are no scheduled tasks.
                            // 2. Otherwise it is the deadline of the next scheduled task.
                            // The resulting strategy is the number of channels with ready events.
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            // epoll bug counter++
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // Percentage of time the thread spends on IO events; the default is 50
            final int ioRatio = this.ioRatio;
            // Whether this iteration ran any local tasks
            boolean ranTasks;
            // ioRatio == 100: process IO first, then run all local tasks without a time limit
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        // The selector of this NioEventLoop has channels with ready events.
                        // Entry point for IO event handling; readers of the previous two
                        // installments should recognize it
                        processSelectedKeys();
                    }
                } finally {
                    // After the IO events, run the tasks in the local task queue
                    ranTasks = runAllTasks();
                }
            }
            // ioRatio is not 100 and the selector has ready events
            else if (strategy > 0) {
                // Start time of IO event processing
                final long ioStartTime = System.nanoTime();
                try {
                    // Handle IO events
                    processSelectedKeys();
                } finally {
                    // Total time spent on IO events
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // ioTime * (100 - ioRatio) / ioRatio: the maximum time for local tasks,
                    // derived from the time IO just took
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
            // No ready events on the selector; only local tasks can be processed
            else {
                // Run the minimum number of local tasks: at most 64
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                // ......
                // In normal operation the NioEventLoop thread woke up from the selector because
                // of IO events or tasks, so selectCnt is reset to 0
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) {
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // ......
        } catch (Throwable t) {
            handleLoopException(t);
        }

        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
```
Taken as a whole, the thread inside NioEventLoop spins in for(;;), alternately processing IO events and the tasks in its task queue. IO events are handled by processSelectedKeys(), which was analyzed at the beginning of the installment on the client connection process; if you haven't read it or have forgotten it, it's worth another look. Here we mainly analyze the two methods that process the task queue: runAllTasks() and runAllTasks(long timeoutNanos), which takes a time limit.
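Two decisions in run() reduce to small pure functions: the select strategy at the top of the loop, and the time budget given to tasks when ioRatio is below 100. The sketch below assumes DefaultSelectStrategy's behavior and SelectStrategy.SELECT = -1. It uses the JDK's `IntSupplier` in place of Netty's own supplier type, and `RunLoopMath` is a hypothetical name.

```java
import java.util.function.IntSupplier;

// Sketch of two decisions made inside NioEventLoop.run(); names are illustrative.
final class RunLoopMath {
    static final int SELECT = -1; // same value as SelectStrategy.SELECT

    // With pending tasks, do a non-blocking selectNow() so the tasks are not delayed;
    // otherwise return SELECT so the loop may block in select().
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // Task-time budget after an IO pass when ioRatio < 100:
    // ioRatio = 50 gives tasks as much time as IO just took; 80 gives them a quarter of it.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

For example, 10 ms of IO with the default ioRatio of 50 lets tasks run for about 10 ms, while 8 ms of IO with ioRatio 80 leaves them 2 ms. The budget is only checked every 64 tasks, so it is a soft limit.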
#SingleThreadEventExecutor
```java
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // fetchedAll: whether all due scheduled tasks were moved; if not, loop again
        fetchedAll = fetchFromScheduledTaskQueue();
        // Run the tasks in the normal task queue
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    // At least one task was run
    if (ranAtLeastOne) {
        // Record the last execution time
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    // Runs the tailTasks queue; rarely used
    afterRunningAllTasks();
    return ranAtLeastOne;
}

private boolean fetchFromScheduledTaskQueue() {
    // Return immediately if the scheduled task queue is empty
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    // Current time
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    // Repeatedly take tasks from the scheduled queue and move them into the normal task queue
    for (;;) {
        // pollScheduledTask(): take a scheduled task whose deadline has passed and remove it
        // from the scheduled task queue
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        // Add the scheduled task to the normal task queue
        if (!taskQueue.offer(scheduledTask)) {
            // The offer failed, which means the normal task queue is full.
            // Put the task back into the scheduled task queue
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // Take a task from the normal task queue
    Runnable task = pollTaskFrom(taskQueue);
    // If the first task is null, the queue is empty
    if (task == null) {
        // Fail fast (the same idea appears in how AQS enqueues nodes)
        return false;
    }
    // Loop over the tasks in the task queue
    for (;;) {
        // Run each task safely
        safeExecute(task);
        // Take the next task
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            // The queue is empty; all tasks have been processed
            return true;
        }
    }
}
```
That is the no-argument runAllTasks(). It moves due tasks from the scheduled task queue into the normal task queue, then loops over the normal queue and runs every task in it. Next, let's look at runAllTasks(long timeoutNanos), which takes a time limit.
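The fetch-then-drain structure of runAllTasks() can be modeled with JDK collections. This sketch assumes a few simplifications. The current time is passed in explicitly. The normal queue is unbounded, so the "queue is full, put it back" branch never triggers and a single pass suffices. tailTasks and exception logging are omitted. `RunAllTasksSketch` and `Scheduled` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

// Model of runAllTasks(): move due scheduled tasks into the normal queue, then drain it.
final class RunAllTasksSketch {
    static final class Scheduled {
        final long deadlineNanos;
        final Runnable task;

        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    // Earliest deadline first, like Netty's scheduled task queue.
    final PriorityQueue<Scheduled> scheduledTaskQueue =
            new PriorityQueue<Scheduled>((a, b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));
    final Queue<Runnable> taskQueue = new ArrayDeque<>();

    // Like fetchFromScheduledTaskQueue(): move every task whose deadline has passed.
    void fetchFromScheduledTaskQueue(long nowNanos) {
        while (!scheduledTaskQueue.isEmpty() && scheduledTaskQueue.peek().deadlineNanos <= nowNanos) {
            taskQueue.offer(scheduledTaskQueue.poll().task);
        }
    }

    // Returns true if at least one task ran.
    boolean runAllTasks(long nowNanos) {
        fetchFromScheduledTaskQueue(nowNanos);
        Runnable task = taskQueue.poll();
        if (task == null) {
            return false; // fail fast: nothing to do
        }
        do {
            task.run(); // Netty wraps this in safeExecute(), which logs and swallows exceptions
        } while ((task = taskQueue.poll()) != null);
        return true;
    }
}
```

A scheduled task never runs directly from the scheduled queue. It is first moved behind the tasks already waiting in the normal queue, so due scheduled work and ordinary work share one FIFO drain.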
#SingleThreadEventExecutor
```java
// timeoutNanos: run for at most this many nanoseconds
protected boolean runAllTasks(long timeoutNanos) {
    // Move due tasks from the scheduled queue into the normal task queue
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        // Fail fast; run the rarely used tailTasks queue
        afterRunningAllTasks();
        return false;
    }
    // Deadline for processing tasks
    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    // Number of tasks processed
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // Run the task safely
        safeExecute(task);
        // Number of tasks processed++
        runTasks ++;
        // 0x3F is binary 111111, decimal 63.
        // Which values give 0 when ANDed with 63?
        //   64  = 1000000  -> 1000000  & 111111 = 0
        //   128 = 10000000 -> 10000000 & 111111 = 0
        //   192 = 11000000 -> 11000000 & 111111 = 0 ...
        // Every multiple of 64 ANDed with 63 is 0, so this condition is true once every 64 tasks
        if ((runTasks & 0x3F) == 0) {
            // Update the last execution time
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            // If the last execution time has reached the deadline of this call, there are two cases:
            // 1. timeoutNanos != 0: tasks run until the deadline (checked every 64 tasks)
            // 2. timeoutNanos == 0: at most 64 tasks run
            if (lastExecutionTime >= deadline) {
                // Exit the loop
                break;
            }
        }
        // Next task
        task = pollTask();
        if (task == null) {
            // No more tasks; update the time
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // Run the rarely used tailTasks queue
    afterRunningAllTasks();
    // Store the last execution time in the field
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
```
This method has two branches, depending on whether timeoutNanos is 0; the same two branches appear in NioEventLoop.run(). runAllTasks(long timeoutNanos) checks every 64 tasks whether the time budget for normal tasks has run out. Apart from this check, it processes tasks in essentially the same way as runAllTasks().
That is the logic NioEventLoop uses to run normal tasks. It is not very difficult; readers who haven't fully followed it may want to trace the flow once more.
In NioEventLoop's run(), one question was left open: the selectCnt variable, which is used to deal with the epoll bug. Let's explain it now.
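The 64-task check can be observed deterministically if the clock is injected. This sketch leaves out scheduled-task fetching and tailTasks, and it assumes a clock that never goes negative, as with Netty's ScheduledFutureTask.nanoTime(). `BoundedDrainSketch` and `noOpTasks` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Model of the time-bounded drain in runAllTasks(long timeoutNanos).
final class BoundedDrainSketch {
    // Returns the number of tasks that ran.
    static long runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos, LongSupplier nanoTime) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        // timeoutNanos == 0 yields deadline 0, which any non-negative clock reading has reached.
        final long deadline = timeoutNanos > 0 ? nanoTime.getAsLong() + timeoutNanos : 0;
        long runTasks = 0;
        for (;;) {
            task.run();
            runTasks++;
            // Netty reads the clock only every 64 tasks (0x3F = 63) because nanoTime() is relatively expensive.
            if ((runTasks & 0x3F) == 0 && nanoTime.getAsLong() >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    static Queue<Runnable> noOpTasks(int n) {
        Queue<Runnable> q = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            q.offer(() -> { });
        }
        return q;
    }
}
```

With `timeoutNanos == 0`, 100 queued tasks produce exactly 64 runs and 30 queued tasks produce 30. With a generous timeout and a frozen clock, all 200 of 200 tasks run.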
How Netty works around the epoll bug
Let's look at run() again. This time, we only annotate the code related to the epoll bug.
```java
@Override
protected void run() {
    // epoll bug counter
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            // Normally, with no tasks and no IO events, the thread blocks here
                            // until a channel becomes ready or a scheduled task's deadline arrives.
                            // If the epoll bug is triggered, selector.select() returns even though
                            // no channel is ready, so the thread keeps polling empty in for(;;)
                            // and CPU usage reaches 100%
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {
                // An IO exception also triggers a selector rebuild, described below
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            // epoll bug counter.
            // Normally, if the thread did not block in select() above, there was work to do.
            // If there is no work, the thread keeps polling empty and selectCnt keeps growing,
            // so once the bug is triggered, selectCnt becomes very large in a short time
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            // Whether tasks were run: true if so, false otherwise.
            // This covers the case where there are normal tasks to run: they may finish very
            // quickly, so the thread keeps spinning and selectCnt would also grow large
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // ranTasks assignment
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // ranTasks assignment
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // ranTasks assignment
                ranTasks = runAllTasks(0);
            }

            // True whenever normal tasks ran (ranTasks) or channels were ready (strategy > 0)
            if (ranTasks || strategy > 0) {
                // MIN_PREMATURE_SELECTOR_RETURNS = 3
                // selector.select() may have returned several times in a short period because a
                // scheduled task's deadline was close; only log it
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                // IO events or normal tasks were processed, so the selector is judged not to
                // have hit the epoll bug; reset selectCnt to 0
                selectCnt = 0;
                // Entering the else-if branch below means:
                // ranTasks is false: no normal tasks ran
                // strategy <= 0: no channel is ready, yet the thread was not blocked in selector.select()
                // See unexpectedSelectorWakeup(selectCnt)
            } else if (unexpectedSelectorWakeup(selectCnt)) {
                // Reset the counter
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }

        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

private boolean unexpectedSelectorWakeup(int selectCnt) {
    // The thread was interrupted; not the focus here
    if (Thread.interrupted()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // SELECTOR_AUTO_REBUILD_THRESHOLD: defaults to 512.
    // If selectCnt >= 512, Netty assumes the epoll bug has been triggered
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);
        // Rebuild the selector
        rebuildSelector();
        return true;
    }
    return false;
}

public void rebuildSelector() {
    // Normally this already runs in the EventLoop thread
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    // This path is taken: rebuild the selector
    rebuildSelector0();
}

private void rebuildSelector0() {
    // The selector that has triggered the bug
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        // Create a new selector
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    int nChannels = 0;
    // Iterate over the old selector's keys and register each channel with the new selector
    for (SelectionKey key: oldSelector.keys()) {
        // The attachment: usually the Netty channel
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            // The interest set registered on the old selector
            int interestOps = key.interestOps();
            // Cancel the old key
            key.cancel();
            // Register the channel with the new selector
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    // Update the NioEventLoop fields
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // Close the old selector
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}
```
That is how Netty works around the epoll bug. In run(), Netty uses two pieces of state to decide whether the current selector has hit the bug: the loop counter selectCnt, and whether any normal tasks or IO events were processed. If it has, Netty rebuilds the selector and re-registers every channel from the old selector with the new one. Once the rebuild is complete, the loop continues processing IO events and normal tasks.
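Both halves of the workaround can be reproduced with plain java.nio: the empty-wakeup counter and the migration to a fresh Selector. This is a simplified sketch. It assumes the default threshold of 512. It omits Netty's SelectorTuple (the wrapped/unwrapped selector pair), the interrupt check, and the close-the-channel path when re-registration fails. `SelectorRebuildSketch`, `afterSelect` and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Count select() returns that produced no work; at the threshold, move all channels to a new Selector.
final class SelectorRebuildSketch {
    static final int REBUILD_THRESHOLD = 512; // assumed default, as SELECTOR_AUTO_REBUILD_THRESHOLD

    private Selector selector;
    private int selectCnt;

    SelectorRebuildSketch(Selector selector) {
        this.selector = selector;
    }

    Selector selector() {
        return selector;
    }

    // Call once per loop iteration; returns true if the selector was rebuilt.
    boolean afterSelect(boolean didWork) throws IOException {
        selectCnt++;
        if (didWork) {
            selectCnt = 0; // normal wakeup: IO was processed or tasks ran
            return false;
        }
        if (selectCnt >= REBUILD_THRESHOLD) {
            rebuildSelector();
            selectCnt = 0;
            return true;
        }
        return false;
    }

    // Re-register each valid key's channel, interest set and attachment on a new Selector.
    void rebuildSelector() throws IOException {
        Selector oldSelector = selector;
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(newSelector, interestOps, attachment);
        }
        selector = newSelector;
        oldSelector.close(); // closing a Selector does not close its channels
    }

    // One pipe channel, 512 wakeups with no work, then report what happened.
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectorRebuildSketch loop = new SelectorRebuildSketch(Selector.open());
            pipe.source().register(loop.selector(), SelectionKey.OP_READ, "conn-1");
            Selector before = loop.selector();
            boolean rebuilt = false;
            for (int i = 0; i < REBUILD_THRESHOLD && !rebuilt; i++) {
                rebuilt = loop.afterSelect(false);
            }
            SelectionKey moved = pipe.source().keyFor(loop.selector());
            String result = rebuilt + " " + !before.isOpen() + " " + moved.attachment();
            loop.selector().close();
            pipe.source().close();
            pipe.sink().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`demo()` reports that the rebuild happened on the 512th empty wakeup, that the old selector is closed, and that the channel kept its attachment on the new selector. Because the key is cancelled and re-registered rather than the channel being closed, connections survive the rebuild.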
That concludes the source analysis of NioEventLoop, and with it we have a deeper understanding of EventLoopGroup and EventLoop. An EventLoopGroup does not hold thread instances; it holds a chooser (used to pick a single EventLoop) and an array of EventLoops. An EventLoop holds a thread instance, a selector, a normal task queue, and a scheduled task queue. The first call to execute() from outside the loop starts the thread, which then calls back into the EventLoop's run() method to process IO events and normal tasks. If the flow is still unclear, open the source code and trace through it a few times following the steps in this article; you should master it soon.