In the previous article, we analyzed the accept process inside the processSelectedKey method. This article analyzes the read process in the worker thread.
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    // Check whether the SelectionKey is still valid; if not, close the channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // If OP_READ or OP_ACCEPT is ready, trigger unsafe.read(); the extra check for 0 works
        // around a JDK bug that could otherwise cause a spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write or any other event.
                return;
            }
        }
        // If OP_WRITE is ready, flush the data left in the outbound buffer; once everything has
        // been written, the previously registered OP_WRITE flag is cleared.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // If OP_CONNECT is ready, it must be removed from the interest set, otherwise
        // Selector.select(timeout) returns immediately without blocking, which can drive CPU usage to 100%.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
This method mainly inspects the SelectionKey; there are several different cases, as follows.
1) OP_ACCEPT: a client connection can be accepted.
2) OP_READ: a readable event, i.e. the Channel has received new data for the upper layer to read.
3) OP_WRITE: a writable event, i.e. the upper layer can write data to the Channel.
4) OP_CONNECT: a connection-established event, i.e. the TCP connection has been set up and the Channel is active.
This post focuses on what happens internally when the worker thread's selector detects the OP_READ event.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write or any other event.
        return;
    }
}
As the code shows, when the SelectionKey is ready for SelectionKey.OP_READ, unsafe.read() is executed. Note that unsafe here is an instance of NioByteUnsafe.
Why is unsafe here an instance of NioByteUnsafe? From the previous post, Netty source code analysis: accept, we know that the NioEventLoop in the boss NioEventLoopGroup is only responsible for accepting client connections; it then registers the accepted client channel with a NioEventLoop in the worker NioEventLoopGroup. In other words, it is the worker thread's selector that ultimately watches for read events, and in the worker thread the channel is a socket channel (NioSocketChannel), whose unsafe is an instance of NioByteUnsafe.
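For context, the boss/worker split usually comes from a server bootstrap along these lines (a minimal sketch of typical Netty 4.x usage, not the article's exact setup; EchoServerHandler is a hypothetical business handler, and SocketChannel here is io.netty.channel.socket.SocketChannel):

EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts connections: its selector watches OP_ACCEPT
EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles accepted channels: its selectors watch OP_READ / OP_WRITE

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new EchoServerHandler()); // hypothetical handler
     }
 });
ChannelFuture f = b.bind(8080).sync();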
Let's look at the read method in NioByteUnsafe
@Override
public void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            // 1. Allocate the receive buffer
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes(); // writable capacity in bytes
            // 2. Read data from the socketChannel into the buffer
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // nothing was read, release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            // 3. Fire the pipeline's channelRead event so that byteBuf can be processed downstream
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
Let's walk through the more important parts of this code.
The instantiation process of allocHandle
allocHandle is responsible for adaptively adjusting the size of the buffer allocated for each read, so that it is neither too large nor too small. Let's first look at how allocHandle is instantiated.
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
Here, config.getRecvByteBufAllocator() returns the instance AdaptiveRecvByteBufAllocator.DEFAULT.
public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
The newHandle() method in AdaptiveRecvByteBufAllocator looks like this:
@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}
What do the parameters minIndex, maxIndex and initial used above mean? minIndex is the index in SIZE_TABLE of the minimum buffer size, maxIndex is the index in SIZE_TABLE of the maximum buffer size, and initial is the initial buffer size.
Relevant constant fields of AdaptiveRecvByteBufAllocator
public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;

    private static final int[] SIZE_TABLE;
The specific meanings of the above fields are as follows:
SIZE_TABLE: stores the allocatable buffer sizes in advance, ordered from small to large. Starting at 16, the values grow by 16 up to 496; from 512 onward they double until the int overflows, so the last entry is 1073741824 (2^30). SIZE_TABLE is initialized as follows.
static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}
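As a quick sanity check of this progression (a throwaway sketch, not Netty code), the same table can be rebuilt and inspected:

import java.util.ArrayList;
import java.util.List;

public class SizeTableDemo {
    public static void main(String[] args) {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);      // 16, 32, ..., 496
        }
        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);      // 512, 1024, ... up to 2^30; the next shift overflows and ends the loop
        }
        System.out.println("entries: " + sizeTable.size());
        System.out.println("first: " + sizeTable.get(0) + ", last: " + sizeTable.get(sizeTable.size() - 1));
    }
}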
DEFAULT_MINIMUM: the minimum buffer size (64), whose index in SIZE_TABLE is 3.
DEFAULT_MAXIMUM: the maximum buffer size (65536), whose index in SIZE_TABLE is 38.
DEFAULT_INITIAL: the initial buffer size. For the very first allocation there is no previous read to use as a reference, so a default initial value is needed.
INDEX_INCREMENT: if the previous estimate turned out too small, the step by which the index is increased for the next allocation.
INDEX_DECREMENT: if the previous estimate turned out too large, the step by which the index is decreased for the next allocation (how both steps are applied is sketched right after this list).
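Both steps are applied in HandleImpl.record(int actualReadBytes), which the read loop calls once per OP_READ event (the allocHandle.record(totalReadAmount) call in the read() method above). That method is not listed in this article; the following is only a rough sketch of its behavior in this Netty version: the estimate shrinks cautiously (only after it has looked oversized twice in a row, and only by INDEX_DECREMENT), but grows eagerly (immediately by INDEX_INCREMENT).

// Rough sketch of AdaptiveRecvByteBufAllocator.HandleImpl.record(...), paraphrased rather than copied.
// index, nextReceiveBufferSize, decreaseNow, minIndex and maxIndex are fields of HandleImpl.
public void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            // the estimate looked too large twice in a row: step the index down, but not below minIndex
            index = Math.max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        // the buffer was filled completely, so the estimate was too small: step the index up, but not above maxIndex
        index = Math.min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}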
Constructor:
private AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
    if (minimum <= 0) {
        throw new IllegalArgumentException("minimum: " + minimum);
    }
    if (initial < minimum) {
        throw new IllegalArgumentException("initial: " + initial);
    }
    if (maximum < initial) {
        throw new IllegalArgumentException("maximum: " + maximum);
    }

    int minIndex = getSizeTableIndex(minimum);
    if (SIZE_TABLE[minIndex] < minimum) {
        this.minIndex = minIndex + 1;
    } else {
        this.minIndex = minIndex;
    }

    int maxIndex = getSizeTableIndex(maximum);
    if (SIZE_TABLE[maxIndex] > maximum) {
        this.maxIndex = maxIndex - 1;
    } else {
        this.maxIndex = maxIndex;
    }

    this.initial = initial;
}
The constructor checks the validity of the parameters, and then initializes the following three fields, which are the parameters used to generate the allocHandle object above.
private final int minIndex;
private final int maxIndex;
private final int initial;
The code of the getSizeTableIndex method is shown below. Its job is to find the index of the SIZE_TABLE entry that is just equal to or greater than size. For example, getSizeTableIndex(64) returns 3 because SIZE_TABLE[3] == 64, while getSizeTableIndex(1000) falls between 512 and 1024 and returns the index of 1024.
private static int getSizeTableIndex(final int size) {
    for (int low = 0, high = SIZE_TABLE.length - 1;;) {
        if (high < low) {
            return low;
        }
        if (high == low) {
            return high;
        }

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else {
            // here a < size <= b, so the index of the next larger entry is returned
            return mid + 1;
        }
    }
}
byteBuf = allocHandle.allocate(allocator);
This call allocates a buffer of the size currently estimated by allocHandle.
AdaptiveRecvByteBufAllocator#HandleImpl
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(nextReceiveBufferSize);
}
It simply delegates to the allocator's ioBuffer method; let's keep following the call.
AbstractByteBufAllocator.java
@Override
public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}
The main logic of ioBuffer is: check whether the platform supports Unsafe, and based on that choose direct (off-heap) memory or heap memory. Let's look at heapBuffer first.
AbstractByteBufAllocator.java
@Override
public ByteBuf heapBuffer(int initialCapacity) {
    return heapBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newHeapBuffer(initialCapacity, maxCapacity);
}
There are two implementations of newHeapBuffer; which one is used depends on the system property io.netty.allocator.type. If it is set to "pooled", the allocator is PooledByteBufAllocator, which uses object-pooling to allocate memory. Otherwise (unset or any other value), the allocator is UnpooledByteBufAllocator, which simply returns a new UnpooledHeapByteBuf.
UnpooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
PooledByteBufAllocator.java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<byte[]> heapArena = cache.heapArena;

    ByteBuf buf;
    if (heapArena != null) {
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
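As an aside, the allocator choice does not have to come from the system property alone; a bootstrap can also set it explicitly. A minimal sketch using standard Netty 4.x options (the property is only read once at startup):

// Option 1 (JVM-wide): set the system property before Netty's buffer classes are initialized, e.g.
//   -Dio.netty.allocator.type=pooled      (or "unpooled")

// Option 2 (per accepted channel): override the allocator explicitly on the bootstrap.
ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// or: b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);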
Now let's look at directBuffer.
AbstractByteBufAllocator.java
@Override
public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, Integer.MAX_VALUE);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity); // validity check of the parameters
    return newDirectBuffer(initialCapacity, maxCapacity);
}
Just like newHeapBuffer, newDirectBuffer also has two implementations, and again the system property io.netty.allocator.type decides which one is used: "pooled" selects PooledByteBufAllocator, which allocates from the object pool; anything else selects UnpooledByteBufAllocator. Here is the internal implementation of UnpooledByteBufAllocator.newDirectBuffer.
UnpooledByteBufAllocator.java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
How does UnpooledUnsafeDirectByteBuf manage its buffer? It wraps NIO's ByteBuffer and allocates the underlying buffer with ByteBuffer.allocateDirect.
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    // some parameter-checking code is omitted here
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }

    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}
The main logic of the above code is:
1. First, allocate a buffer of initialCapacity bytes with ByteBuffer.allocateDirect.
2. Then decide whether the old buffer needs to be freed.
3. Finally, assign the new buffer to the field buffer.
Here, memoryAddress = PlatformDependent.directBufferAddress(buffer) reads the buffer's address field, i.e. the native address of the off-heap memory, and capacity = buffer.remaining() records the buffer's capacity.
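For reference, the JDK-level pieces involved here look roughly like this (a sketch assuming a pre-Java-9 JDK, where the sun.nio.ch.DirectBuffer interface is accessible without extra flags):

ByteBuffer buffer = ByteBuffer.allocateDirect(1024);          // off-heap allocation, as in allocateDirect(initialCapacity)
long address = ((sun.nio.ch.DirectBuffer) buffer).address();  // the native address; for a direct buffer this is the value PlatformDependent.directBufferAddress(buffer) resolves to
int capacity = buffer.remaining();                            // 1024 for a freshly allocated buffer (position = 0, limit = capacity)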
Next, let's look at the toLeakAwareBuffer(buf) method.
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeak leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.open(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
    }
    return buf;
}
The toLeakAwareBuffer(buf) method wraps the freshly allocated buf so that resource leaks can be detected.
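The wrapping only takes effect when leak detection is enabled: SIMPLE is the default level, DISABLED skips the wrapping entirely, and ADVANCED / PARANOID record more detail (at the lower levels only a fraction of buffers is actually tracked, which is why open(buf) can return null). The level can be tuned with standard Netty settings, for example:

// Raise the leak-detection level programmatically (often done in tests)...
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
// ...or via a system property at JVM startup:
//   -Dio.netty.leakDetectionLevel=advanced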
After obtaining the buffer, we go back to the AbstractNioByteChannel.read method and continue.
doReadBytes method
Let's look at the doReadBytes method: it reads the socketChannel's data into the buffer.
NioSocketChannel.java
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
It reads data from the Channel into the buffer byteBuf. Let's keep following the call.
WrappedByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    return buf.writeBytes(in, length);
}
AbstractByteBuf.java
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}
The setBytes method has several implementations; here we look at the one in UnpooledUnsafeDirectByteBuf.
UnpooledUnsafeDirectByteBuf.java
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1; // -1 is returned when the channel has been closed
    }
}

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}
Ultimately, the actual read is performed with an NIO ByteBuffer. Whether the buffer is a pooled or an unpooled ByteBuf, its underlying data structure (a ByteBuffer or a byte array) is exposed as a ByteBuffer to carry out the read. In other words, every UnpooledXXXBuf and PooledXXXBuf holds a ByteBuffer tmpNioBuf, and that is what really receives the content read from the Channel. This completes reading the channel's data into the buffer.
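As a small user-level illustration of the same API that doReadBytes relies on, one can read from a channel into a ByteBuf directly (a sketch; ch is assumed to be an already-connected java.nio.channels.SocketChannel, and IOException handling is omitted):

ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(1024);   // direct or heap, depending on PlatformDependent.hasUnsafe()
int n = buf.writeBytes(ch, buf.writableBytes());         // internally turned into a ByteBuffer read, as shown above
if (n > 0) {
    byte[] payload = new byte[n];
    buf.readBytes(payload);                              // consume what was just read
}
buf.release();                                           // always release manually allocated buffers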
Now let's look at in.read(tmpBuf). Both FileChannel's and SocketChannel's read are ultimately implemented on top of IOUtil, as follows.
public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
The end goal is to store the data read from the SocketChannel into the ByteBuffer dst. Let's look at IOUtil.read(fd, dst, -1, nd).
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    // If the destination buffer is a DirectBuffer, read the data straight into the off-heap memory
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // Allocate a temporary off-heap (direct) buffer
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // The socket I/O operation reads the data into the off-heap buffer
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // Copy the data from the off-heap buffer into the heap buffer
                // (the user-supplied buffer allocated in the JVM heap)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // This eventually calls DirectBuffer.cleaner().clean() to release the temporary off-heap buffer
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}
The main steps are:
1. If the destination buffer is already a DirectBuffer, the data is read from the kernel directly into that off-heap buffer.
2. If the destination buffer is heap memory, a temporary DirectByteBuffer var5 of the same size as the destination buffer is obtained first.
3. The data is read from the kernel into the off-heap buffer var5; at the bottom this is done by NativeDispatcher's read.
4. The data in the off-heap buffer var5 is copied into the heap buffer var1 (the user-supplied buffer, allocated in the JVM heap).
private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l,
                                        NativeDispatcher nativedispatcher, Object obj) throws IOException {
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    // If assertions are enabled and the buffer's position is greater than its limit, throw an AssertionError
    if (!$assertionsDisabled && i > j)
        throw new AssertionError();
    // Number of bytes to read
    int k = i > j ? 0 : j - i;
    if (k == 0)
        return 0;
    int i1 = 0;
    // Read k bytes from the file descriptor into the buffer
    if (l != -1L)
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer) bytebuffer).address() + (long) i, k, l, obj);
    else
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer) bytebuffer).address() + (long) i, k);
    // Advance the buffer's position by the number of bytes actually read
    if (i1 > 0)
        bytebuffer.position(i + i1);
    return i1;
}
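To make the cost of the heap-buffer path concrete, here is a simplified JDK-only illustration of steps 2 to 4 above (a sketch, not the real IOUtil code; socketChannel is assumed to be an already-connected SocketChannel):

ByteBuffer heapDst = ByteBuffer.allocate(4096);                          // user buffer on the JVM heap
ByteBuffer tmpDirect = ByteBuffer.allocateDirect(heapDst.remaining());   // temporary off-heap buffer (step 2)
int n = socketChannel.read(tmpDirect);                                   // kernel data lands in off-heap memory (step 3)
if (n > 0) {
    tmpDirect.flip();
    heapDst.put(tmpDirect);                                              // extra copy: off-heap to heap (step 4)
}

This extra copy is exactly what gets avoided when the receive buffer is a direct buffer in the first place, which is why ioBuffer prefers direct buffers when PlatformDependent.hasUnsafe() is true.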
Go back to the AbstractNioByteChannel.read method and continue.
@Override
public void read() {
    //...
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // nothing was read, release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break;
            }

            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
int localReadAmount = doReadBytes(byteBuf);
1. If it returns 0, no data was read, and the loop exits.
2. If it returns -1, the connection has been closed, and the loop exits.
3. Otherwise, data was read. Once the data is in the buffer, the pipeline's channelRead event is fired with byteBuf as the argument, so that custom inbound handlers can do the business processing (a minimal handler sketch follows this list). Pipeline event handling was described in detail in my earlier post. After processing completes, data is read from the Channel again, until the loop exits.
4. Once the loop has run maxMessagesPerRead times, at most maxMessagesPerRead reads have been performed for this event, and the loop exits whether or not the channel has been drained. In the earlier post, the boss thread used the same variable when accepting client connections: each time the boss selector detects an OP_ACCEPT event, at most maxMessagesPerRead client connections are accepted.
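To round things off, here is a minimal inbound handler sketch showing where the byteBuf fired by pipeline.fireChannelRead(byteBuf) ends up on the user side (a hypothetical example, not from any particular project):

public class ReadLoggingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;               // the buffer allocated by allocHandle.allocate(...)
        try {
            System.out.println("read " + buf.readableBytes() + " bytes");
        } finally {
            buf.release();                         // the last handler must release it, or the buffer leaks
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // fired once per read loop, right after pipeline.fireChannelReadComplete() in NioByteUnsafe.read()
        ctx.flush();
    }
}

Also note that the loop bound comes from config.getMaxMessagesPerRead(), which in this Netty version can typically be tuned through ChannelOption.MAX_MESSAGES_PER_READ.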