netty source code analysis-thread pool

As we all know, netty is a nio framework with excellent performance. As the underlying data transmission framework of many excellent projects such as dubbo, it is absolutely beneficial and harmless for our future development to study it thoroughly. So from today on, we will study netty. This analysis is based on netty 4. Please download the jar package and source code by yourselves. Okay, let's talk about netty's thread pool today.
We often see the following sentence in netty's code.

EventLoopGroup workerGroup = new NioEventLoopGroup();

Simple new is an event handler (without looking at how the official explains the concept, define it yourself, don't spray). But what he did inside was far more simple than what he saw. This is also a guideline for us to read the source code. Don't ignore every code you think is insignificant. Maybe it plays an important role. His concrete realization

### io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
/**
 * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
 * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup() {
    this(0);
}

### io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
/**
 * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

### MultithreadEventLoopGroup.java:39
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

### Represents the class and method names from which the code comes, because they are highly relevant, so I put them in the same block of code to prevent thinking from jumping too much and everyone can't keep up with the rhythm. Although the number of threads is set to zero at the time of initialization, it is not the final result. After many calls to constructors and references to parent constructors, a transformation is made here. When 0, the value of DEFAULT_EVENT_LOOP_THREADS will be taken, and his value will be taken if the value of io.netty.eventLoopThreads is set, if not set, then the value of DEFAULT_EVENT_LOOP_THREADS will be taken. The default value is twice the number of available kernels, and the maximum value is assigned when compared with 1. Finally, we arrived at the best constructor.

### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

/**
 * Create a new instance.
 *
 * @param nThreads          the number of threads that will be used by this instance.
 * @param executor          the Executor to use, or {@code null} if the default should be used.
 * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {   //How to empty executor, then set the default executor to ThreadPerTaskExecutor
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];   //MultithreadEventExecutor Group is a general managed class, which is handed over to its child ren specifically related to threads. It is an array of EventExecutors.

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);   //Initialize each instance of EventExecutor, as explained in detail below
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);  //Create a thread selector to select which thread to process

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);   //Set these subprocessors to read-only and cannot be added
}

There are annotations on key steps in the above code. Generally speaking, the last dirty work is not done by this Group. It's done by our own internal children, and it's done by Event Executor. Let's see how this Event Executor is instantiated.

### io.netty.channel.nio.NioEventLoopGroup#newChild

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

### io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

EventExecutor is initialized using NioEventLoop, which is a subclass of SingleThreadEventLoop, so super calls the construction method of SingleThreadEventLoop.


/**
 * Create a new instance
 *
 * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param executor          the {@link Executor} which will be used for executing
 * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 *                          executor thread
 * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
 * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);    //Set up the processing queue for the task, and subsequent tasks will be added to it.
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

OK, each sub-event handler has a queue of tasks. So far, the initialization of thread pools has come to an end. I feel no addiction. Let's see next article.

Keywords: Netty Java Dubbo

Added by zeroecko on Sun, 26 May 2019 01:56:48 +0300