In the previous section, we analyzed service exposure. In this section, we analyzed how Dubbo handles consumer requests. Let's review the com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen method
protected void com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
The code above shows that we are familiar with netty services. This time we'll focus on Decoder and NettyServerHandler in NettyCodecAdapter
When a request comes in, netty's processing chain is NettyCodecAdapter.getDecoder () -> NettyServerHandler
First, let's look at the logic of NettyCodecAdapter.getDecoder(), whose corresponding class is
com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder, which inherits from netty's ByteToMessageDecoder
Note that this code comes from netty4, version 4.1.39.Final
When a request arrives, netty triggers a channel read event and eventually calls the channelRead method of InternalDecoder
public void io.netty.handler.codec.ByteToMessageDecoder#channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { //CodecOutputList inherits from AbstractList, so it is a list collection CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //A cumulation equal to null means that there has been no cached data since the last request (the first request or the last request completely used the data). first = cumulation == null; if (first) { cumulation = data; } else { //Check if there is enough space to expand if not //Check the label (*1*) for specific expansion logic cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //Call decoding method callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } } //(*1*) public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { final ByteBuf buffer; //If you want enough capacity to store new content, you need to meet the following criteria //Derivation of cumulation.maxCapacity() - cumulation.writerIndex() > in.readableBytes() //Derived from cumulation.maxCapacity() - cumulation.writerIndex() - in.readableBytes() > 0 //cumulation.maxCapacity() - in.readableBytes() > cumulation.writerIndex() //So if there is a dissatisfaction, the capacity is not enough and needs to be expanded //Or this ByteBuf is read-only (we need a ByteBuf that can write data) //Or if the reference count is greater than 1, such as a call to retain, slice, etc., we should not change this buff at this time //Causes problems where it is used if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain() or if its read-only. // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 //Expansion, Expansion Logic Please check (*2*) buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } buffer.writeBytes(in); return buffer; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) in.release(); } } }; //(*2*) static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { ByteBuf oldCumulation = cumulation; //Request a ByteBuf with a capacity of oldCumulation.readableBytes() + readable cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); //Write Old Data cumulation.writeBytes(oldCumulation); //Release old ByteBuf to avoid memory leaks oldCumulation.release(); return cumulation; }
Call decoding method
protected void io.netty.handler.codec.ByteToMessageDecoder#callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //Loop as long as there is still availability data while (in.isReadable()) { //Is there decoded data int outSize = out.size(); //If present, trigger the channel read event, which is handled by the subsequent channel processor if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 //Check to see if the context of the current channel processing has been removed, if it has been removed, no coding will be done, and the next channel will handle the context after exit if (ctx.isRemoved()) { break; } outSize = 0; } //Readable Bytes int oldInputLength = in.readableBytes(); //Specific processing logic decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //If there is no well-coded data (probably not enough data is needed) if (outSize == out.size()) { //If the readable data is the same as it was before, you can leave the task without data decoding logic at all if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //If the data has an understanding code, that is, a value in out, whether it's add ed in blindly or the data encoding obtained by reading the subscript without affecting byteBuf, in byteBuf //Error if readable data has not changed if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } //Whether only this encoding should be done, default false if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); } }
decodeRemovalReentryProtection
final void io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //Modify encoding state to call subclass encoding decodeState = STATE_CALLING_CHILD_DECODE; try { //Delegate Subclass Processing decode(ctx, in, out); } finally { //If the subclass modifies the encoding state to remove data boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; //Reset to Initialization State decodeState = STATE_INIT; if (removePending) { //(*1*) handlerRemoved(ctx); } } } //(*1*) public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { if (decodeState == STATE_CALLING_CHILD_DECODE) { decodeState = STATE_HANDLER_REMOVED_PENDING; return; } ByteBuf buf = cumulation; if (buf != null) { // Directly set this to null so we are sure we not access it in any other method here anymore. //Empty data, free memory cumulation = null; numReads = 0; int readable = buf.readableBytes(); if (readable > 0) { //Trigger channel read event if there is readable data ByteBuf bytes = buf.readBytes(readable); //Release reference buf.release(); ctx.fireChannelRead(bytes); //Trigger Read Complete Event ctx.fireChannelReadComplete(); } else { //Release reference buf.release(); } } //Empty Method handlerRemoved0(ctx); }
Calling a subclass to decode begins the real InternalDecoder.decode code into dubbo
protected void com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { //Packaged backup of data passed by netty //Basically this is an application of adapter mode ChannelBuffer message = new NettyBackedChannelBuffer(input); //Create NettyChannel, maintain this netty channel and service providers url, service providers channel processing NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; //Record Start Reading Subscript int saveReaderIndex; try { // decode object. do { saveReaderIndex = message.readerIndex(); try { //Call DubboCountCodec processing msg = codec.decode(channel, message); } catch (IOException e) { throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { //Restore the original read location if more data is needed message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } //Add decoded data if (msg != null) { out.add(msg); } } } while (message.readable()); } finally { //If this channel is closed, remove it from the cache NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
DubboCountCodec#decode
public Object com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); //Create a MultiMessage collection, which maintains a List collection internally and implements the Iterable interface itself MultiMessage result = MultiMessage.create(); do { //Call DubboCodec Object obj = codec.decode(channel, buffer); //More data is needed if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { //Restore Read Start Subscript buffer.readerIndex(save); break; } else { //Record decoded data result.addMessage(obj); //Give the result type, if Request, then set the number of bytes read to it as an additional parameter //If Response, set it to output its own number as an additional parameter logMessageLength(obj, buffer.readerIndex() - save); //Update Read Start Subscript save = buffer.readerIndex(); } } while (true); //If empty, more data is required if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } //Return decoded data if (result.size() == 1) { return result.get(0); } return result; }
DubboCodec decoding
Before proceeding with the code below, let us know about the dubbo request agreement for a better understanding
The protocol header has 16 bytes (0 ~ 127) in size
-
0-15 digits: magic number
-
16-bit: Request,1 request, 0 response
-
17 digits: Whether it's a twoWay or not, from the source code, it seems that the meaning of this twoWay means whether you need to respond or not (incorrectly understood).
-
18 Bit: Is it an event, indicating a heartbeat event, perhaps the current request or response is from a heartbeat, not an interface consumption request
-
19-23 bits: Serialized id, used to identify how data is being serialized, such as hessian2
-
24-31 bits: status, such as ok(20)
-
32-95 bits: the requested id, from the source code that follows, I find that it will be stored in com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#FUTURES with key as the request ID -> DefaultFuture
This DefaultFuture is similar to JDK's Future, calling its get method will block until the result is processed -
96-127 bits: length of data identifying the requestor or responder
-
128 and later: Data
public Object com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec#decode(Channel channel, ChannelBuffer buffer) throws IOException { //Readable Bytes int readable = buffer.readableBytes(); //HEADER_LENGTH = 16, in dubbo protocol, hessian2 is used as serializer //Here 16 bytes is the dubbo protocol header byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; //Read data from buffer into header byte array buffer.readBytes(header); return decode(channel, buffer, readable, header); } | V protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. //Check magic number, magic number takes up two bytes //If the first two bytes are not magic numbers, they will go into the following branches if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; //If there is still readable data, all data for the buffer will be read into the header array here if (header.length < readable) { //Expand header array to readable size and copy header's original data into a new byte array header = Bytes.copyOf(header, readable); //Write the rest of the data to the header buffer.readBytes(header, length, readable - length); } //Being able to enter this branch means position 0 is definitely not magic, so start from 1 for (int i = 1; i < header.length - 1; i++) { //Loop to find where magic begins if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { //Set the reading subscript to the beginning of the magic number //Why use buffer.readerIndex() - header.length here?Because this i is based on header //The subscript is not buffer, so you need to add the difference between this i and (buffer.readerIndex() - header.length) buffer.readerIndex(buffer.readerIndex() - header.length + i); //Save the data before the magic number starts with a new array and assign it to the header header = Bytes.copyOf(header, i); break; } } //Call the parent class, which is TelnetCodec, for handling telnet commands //For example, typing Ctrl + C will close the channel, the up and down keys will send historical commands, and you will find out for yourself if you are interested return super.decode(channel, buffer, readable, header); } // check length. //If the first byte is where magic begins and if the current readable value is less than 16 bytes, an identity requiring more data is returned if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. //Reads 4 bytes from the 12th byte (int takes up 4 bytes) and converts them to int values //This value identifies the length of data carried by the message body int len = Bytes.bytes2int(header, 12); //Check the load and throw an exception if it exceeds the set size checkPayload(channel, len); //Data length plus protocol header length int tt = len + HEADER_LENGTH; //Tips need more data if there is not enough readable data if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. //Create a stream that starts reading buffer.readerIndex() = 17 and ends at 17 + len (data length) ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { //Resolve Request Body return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
Resolve Request Body
protected Object com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { //flag contains whether a request request is requested, whether a response is required, whether it is a heartbeat event, and the identification of a serialized id //proto is used to identify what serialization protocol is byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // get request id. //8 bytes after the fourth byte (long takes up 8 bytes) is the request id/response ID long id = Bytes.bytes2long(header, 4); //Is it a request request request, if not a response to branch into if ((flag & FLAG_REQUEST) == 0) { // decode response. //Create Response Object Response res = new Response(id); //Is it a heartbeat, and if so, sets the event to a heartbeat if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. //Response status, such as 20 identifying ok byte status = header[3]; res.setStatus(status); try { //Internally obtained the corresponding serialization protocol by serializing the id, default is hessian2 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); //If the response is ok if (status == Response.OK) { Object data; //Is it a heartbeat event? if (res.isHeartbeat()) { //Internal direct in.readObject() data = decodeHeartbeatData(channel, in); //Is there anything else?At present this version seems to only have a heartbeat } else if (res.isEvent()) { //in.readObject() called directly by internal deserialization data = decodeEventData(channel, in); } else { //Used to encapsulate response result sets DecodeableRpcResult result; //Decode in the Current io thread?Default to true if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); //Response volume deserialization result.decode(); } else { //This is left to be resolved in later multithreads result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } //DecodeableRpcResult res.setResult(data); } else { //Read error message if response is unsuccessful res.setErrorMessage(in.readUTF()); } } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } //CLIENT_ERROR = 90 res.setStatus(Response.CLIENT_ERROR); //Exception message res.setErrorMessage(StringUtils.toString(t)); } return res; } else { // decode request. //Create Request Request req = new Request(id); //Set up dubbo protocol version req.setVersion(Version.getProtocolVersion()); //Is response required req.setTwoWay((flag & FLAG_TWOWAY) != 0); //Is it a heartbeat if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; //Send serialization, using hessian2 by default ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); //heartbeat if (req.isHeartbeat()) { //in.readObject() data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { //in.readObject() data = decodeEventData(channel, in); } else { //Build a call context that parses out information about the interface, method, parameters, and so on for a brief call DecodeableRpcInvocation inv; //Is it resolved in the current thread if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); //Resolve Request Body inv.decode(); } else { //Resolve in thread pool inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } // bad request req.setBroken(true); req.setData(t); } return req; } }
Parse the response body (for example, we call a remote interface and the remote service returns data)
public Object DecodeableRpcResult#decode(Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); //Reads a byte that identifies the type of response result byte flag = in.readByte(); switch (flag) { //If the response result is null case DubboCodec.RESPONSE_NULL_VALUE: break; case DubboCodec.RESPONSE_VALUE: try { //Return type of parsing method Type[] returnType = RpcUtils.getReturnTypes(invocation); //Return type deserialized values based on interface methods setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; //If there is an error calling the remote interface case DubboCodec.RESPONSE_WITH_EXCEPTION: try { Object obj = in.readObject(); //If this obj is not a type, hee-hee, error if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); //Record exceptions setException((Throwable) obj); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; //Response to null values, but with additional parameters case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: try { //Direct deserialization of additional parameters is sufficient setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; //Normal calls with additional parameters case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS: try { //Resolve the return value type of the calling method Type[] returnType = RpcUtils.getReturnTypes(invocation); //Send Serialized Return Value setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); //Deserialization Additional Parameters setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; //An exception occurred while calling the remote interface, but with parameters attached case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: try { Object obj = in.readObject(); if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); //First Deserialize Exceptions setException((Throwable) obj); //Then deserialize the additional parameters setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; default: throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); } //Clear if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } return this; }
From the data parsing the responder above, its responder structure is probably flag (used to indicate whether there are results, whether there are exceptions, etc., taking up a byte)+the returned results/exceptions/additional parameters
In the next section, let's continue with the analysis if the request body is resolved