Netty Source Code Analysis--read Process Source Code Analysis

In the previous article, we analyzed the accept process in the process SelectedKey method. This article will analyze the read process in the work thread.

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //Check the SelectionKey Is it valid? If it is invalid, close it channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If ready READ or ACCEPT Trigger unsafe.read() ,Check whether it is 0 or not, as stated in the English commentary on the source code above: JDK One that may produce a dead cycle bug. 
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {//If it is closed, it can be returned directly without further processing. channel Other events
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // If ready WRITE The data in the buffer is sent out, and if the data in the buffer is sent out, the previous concerns are cleared. OP_WRITE sign
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // If it is OP_CONNECT,Need to be removed OP_CONNECT otherwise Selector.select(timeout)It will return immediately without any blockage, which may occur. cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

This method mainly checks the Selection Key. There are several different situations as follows.

1) OP_ACCEPT, accepting client connection

2) OP_READ, a readable event that receives new data from Channel for upper-level reading.

3) OP_WRITE, writable event, that is, upper layer can write data to Channel.

4) OP_CONNECT, connection establishment event, that is, TCP connection has been established, Channel is in active state.

This blog post focuses on what happens internally when the work thread selector detects the OP_READ event.

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {//If it is closed, it can be returned directly without further processing. channel Other events
        // Connection already closed - no need to handle write.
        return;
    }
} 

As you can see from the code, when the selectionKey event is SelectionKey.OP_READ, the unsafe read method is executed. Note that unsafe here is an example of NioByteUnsafe

Why is unsafe here an example of NioByteUnsafe? In the previous blog, Netty source code analysis: accept, we know that NioEventLoop in Boss NioEventLoop Group is only responsible for the accpt client connection, and then registers the client to NioEventLoop in Work NioEventLoop Group, that is, it is the selector corresponding to the work thread that eventually monitors the read waiting time, that is, in the work thread. channel is Socket channel. The unsafe of Socket channel is an example of NioByteUnsafe.

Let's look at the read method in NioByteUnsafe

@Override
    public void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                //1,Allocation buffer
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();//Writable byte capacity
                //2,take socketChannel Data write cache
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                //3,trigger pipeline Of ChannelRead Incident to right byteBuf Follow-up processing
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
} 

Here's an overview of the more important code

The instantiation process of allocHandler

allocHandle is responsible for adaptively adjusting the size of the current cache allocation to prevent overallocation or underallocation. Let's first look at the instantiation process of allocHandler.

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

Among them, config.getRecvByteBufAllocator() gets an instance of Adaptive RecvByteBufAllocator DEFAULT.

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

The code for the newHandler() method in AdaptiveRecvByteBufAllocator is as follows:

@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

Among them, the parameter used in the above method: minIndex maxIndex initial, what does it mean? The meaning is as follows: minIndex is the corresponding subscript in SIZEE_TABLE for the minimum cache. MaxIndex is the corresponding subscript in SISE_TABLE for the largest cache, and initial is the initial cache size.

Relevant Constant Fields of Adaptive RecvByteBuf Allocator

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

        static final int DEFAULT_MINIMUM = 64;
        static final int DEFAULT_INITIAL = 1024;
        static final int DEFAULT_MAXIMUM = 65536;

        private static final int INDEX_INCREMENT = 4;
        private static final int INDEX_DECREMENT = 1;

        private static final int[] SIZE_TABLE; 

The specific meanings of the above fields are as follows:

SIZE_TABLE: Pre-store the allocatable cache size in order from small to large.  
Starting from 16, each time accumulates 16, until 496, then from 512, each time doubles, until overflow. SIZE_TABLE initialization process is as follows.

static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

DEFAULT_MINIMUM: Minimum cache (64), with the corresponding subscript of 3 in SIZEE_TABLE.

DEFAULT_MAXIMUM: Maximum cache (65536), with a corresponding subscript of 38 in SIZEE_TABLE.

DEFAULT_INITIAL: Initialize the size of the cache. When allocating the cache for the first time, there is no reference to the actual number of bytes received in the last time, we need to give a default initial value.

5) INDEX_INCREMENT: The last estimated cache is small, and the next index increment.

6) INDEX_DECREMENT: The last estimated cache is too large, and the next index decreases.

Constructor:

private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}

The constructor checks the validity of the parameters, and then initializes the following three fields, which are the parameters used to generate the allocHandle object above.

private final int minIndex;
private final int maxIndex;
private final int initial;

The code of the getSizeTableIndex function is as follows. The function of the getSizeTableIndex function is to find elements in SIZE_TABLE that are just equal to or greater than size.

private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else { //Here's the case. a < size <= b Situation
            return mid + 1;
        }
    }
}

byteBuf = allocHandle.allocate(allocator);

Apply for a specified size of memory

AdaptiveRecvByteBufAllocator#HandlerImpl

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}

Call the ioBuffer method directly, and continue to see

AbstractByteBufAllocator.java

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

The main logic of the ioBuffer function is: to see if the platform supports unsafe, choose to use direct physical memory or heap memory. First look at heap Buffer

AbstractByteBufAllocator.java 

@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
} 

There are two implementations of the new HeapBuffer: as for which one to use, it depends on the settings of the system property io.netty.allocator.type. If set to "pooled", the cache allocator is: Pooled ByteBufAllocator, and then uses object pool technology to allocate memory. If not set or otherwise, the cache allocator is Unpooled ByteBufAllocator, which returns an Unpooled HeapByteBuf object directly.

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

PooledByteBufAllocator.java

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

Look at directBuffer again

AbstractByteBufAllocator.java

@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}  

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);//Validity check of parameters
    return newDirectBuffer(initialCapacity, maxCapacity);
}

Like the new HeapBuffer, there are two implementations of the new DirectBuffer method here: as for which one to use, it depends on the settings of the system attribute io.netty.allocator.type. If it is set to "pooled", the cache allocator is: Pooled ByteBufAllocator, which then uses object pool technology to allocate memory. . If not set or otherwise, the cache allocator is UnPooled ByteBuf Allocator. Here's a look at the internal implementation of Unpooled ByteBuf Allocator. newDirectBuffer

UnpooledByteBufAllocator.java

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

How does Unpooled Unsafe Direct ByteBuf implement cache management? Nio's ByteBuffer is encapsulated and cached by ByteBuffer's allocateDirect method.

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    //Some code for parameter checking is omitted
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

The main logic of the above code is:

1. First, the allocateDirect method of ByteBuffer is used to allocate the cache size of initial Capacity.

2. Then decide to free the old cache

3. Finally, assign the new cache to the field buffer

Where: memoryAddress = PlatformDependent.directBufferAddress(buffer) gets the address field value of the buffer and points to the cache address.
capacity = buffer.remaining() gets the cache capacity.

Next, look at the toLeak Aware Buffer (buf) method

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}

Methods ToLeak Aware Buffer (buf) packaged the applied buf again.

After getting the cache, go back to the AbstractNioByteChannel.read method and continue.

doReadBytes method

Let's look at the doReadBytes method: write socketChannel data to the cache.

NioSocketChannel.java

@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}

Read the data from Channel into the cache byteBuf. Keep looking.

WrappedByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
} 

AbstractByteBuf.java

@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

The setBytes method here has different implementations. Here we look at the setBytes implementation of Unpooled Unsafe DirectByteBuf.

UnpooledUnsafeDirectByteBuf.java

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;//When Channel Closed, returned-1.    
    }
} 

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}

Ultimately, the bottom layer uses ByteBuffer to implement read operation. Whether it is Pooled ByteBuf or Unpooled XXXBuf, the bottom data structure BufBuffer/array is converted into ByteBuffer to realize read operation. That is, whether it is UnPooled XXXBuf or Pooled XXXBuf, there is a ByteBuffer tmpNioBuf, which is really used to store the content read from the pipeline Channel. This completes reading channel data into the cache Buf.

Let's look at in.read(tmpBuf); FileChannel and SocketChannel's reads are finally implemented by IOUtil, which relies on, as follows

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

The final goal is to store the data readout from Socket Channel in ByteBuffer dst. Let's look at IOUtil.read(fd, dst, -1, nd).

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    //If the data is ultimately loaded buffer yes DirectBuffer,Read data directly into out-of-heap memory
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // Allocate temporary out-of-heap memory
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // Socket I/O Operations read data into out-of-heap memory
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // Copy data from out-of-heap memory into heap memory (user-defined cache, in jvm Distribution of memory)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // It will call DirectBuffer.cleaner().clean()To release temporary out-of-heap memory
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
Through the above implementation, we can see that the data reading steps based on channel are as follows:
1. If the cache memory is DirectBuffer, read the data from Channel directly into out-of-heap memory.
2. If the cache memory is heap memory, first apply for a temporary DirectByteBuffer var5 of the same size as the cache.
3. Read the data from the kernel cache to the out-of-heap cache var5. The bottom layer is implemented by read of Native Dispatcher.
4. Copy the data of out-of-heap cache var5 to heap memory var1 (user-defined cache, allocating memory in jvm).
5. DirectBuffer.cleaner().clean() is called to release the temporary out-of-heap memory created.
If the first step in AbstractNioByteChannel.read is to create out-of-heap memory, the data will be read directly into out-of-heap memory, instead of creating temporary out-of-heap memory first, then reading data into out-of-heap memory, and finally copying out-of-heap memory into heap memory.
Simply put, if you use out-of-heap memory, you will only copy data once, and if you use heap memory, you will copy data twice.
Let's take a look at readInto Native Buffer
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {  
    int i = bytebuffer.position();  
    int j = bytebuffer.limit();  
    //If the assertion is opened, buffer Of position greater than limit,Throw an assertion error  
    if(!$assertionsDisabled && i > j)  
        throw new AssertionError();  
    //Get the number of bytes to read  
    int k = i > j ? 0 : j - i;  
    if(k == 0)  
        return 0;  
    int i1 = 0;  
    //Read from the input stream k Byte to buffer  
    if(l != -1L)  
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);  
    else  
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);  
    //Relocation buffer Of position  
    if(i1 > 0)  
        bytebuffer.position(i + i1);  
    return i1;  
}  
This function reads data from the kernel buffer to the out-of-heap cache DirectBuffer

Go back to the AbstractNioByteChannel.read method and continue.

@Override
public void read() {
        //...
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}

int localReadAmount = doReadBytes(byteBuf);
1. If 0 is returned, it means that no data has been read, then it exits the loop.
2. If it returns to - 1, indicating that the connection has been closed, it exits the loop.
3. Otherwise, it means that the data is read. After the data is read into the cache, the ChannelRead event of pipeline is triggered, and byteBuf is used as a parameter for subsequent processing. Then the handler of custom Inbound type can be used for business processing. Pipeline's event handling was described in detail in my previous blog post. After processing is completed, the data is read from Channel again until the loop exits.

4. When the number of loops exceeds that of maxMessages PerRead, the data of maxMessages PerRead times can only be read in the pipeline, either not finished or exited. In the previous blog post, Boss thread accepts client connections using this variable, which means that only maxMessagesPerRead client connections can be accepted once the BOSS thread selector detects the OP_ACCEPT event.

Keywords: Java Netty socket JDK

Added by mrfritz379 on Thu, 12 Sep 2019 06:47:59 +0300