5. Decoding the Dubbo Request Protocol and the Request/Response Body

In the previous section, we analyzed service exposure. In this section, we analyze how Dubbo handles consumer requests. Let's review the com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen method:

protected void com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

The code above is a standard Netty server bootstrap. This time we focus on the decoder provided by NettyCodecAdapter and on NettyServerHandler.

When a request comes in, Netty's inbound processing chain is NettyCodecAdapter.getDecoder() -> NettyServerHandler.

First, let's look at the logic behind NettyCodecAdapter.getDecoder(). The corresponding class is
com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder, which extends Netty's ByteToMessageDecoder.

Note that the Netty code shown here comes from netty4, version 4.1.39.Final.

When a request arrives, Netty triggers a channel read event and eventually calls the channelRead method that InternalDecoder inherits from ByteToMessageDecoder:

public void io.netty.handler.codec.ByteToMessageDecoder#channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        //CodecOutputList extends AbstractList, so it is a List
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            //cumulation == null means no bytes are left over from an earlier read (this is the first read, or the previous read consumed all of its data)
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                //Otherwise append the new data to the cumulation, expanding it if there is not enough room
                //See (*1*) for the expansion logic
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            //Call the decoding method
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            firedChannelRead |= out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}


//(*1*)
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        try {
            final ByteBuf buffer;
            //The existing buffer has room for the new content only if
            //cumulation.maxCapacity() - cumulation.writerIndex() >= in.readableBytes()
            //which can be rearranged to
            //cumulation.writerIndex() <= cumulation.maxCapacity() - in.readableBytes()
            //So if writerIndex is greater than that, there is not enough capacity and the buffer must be expanded
            //The buffer is also replaced if it is read-only (we need a ByteBuf we can write to)
            //or if its reference count is greater than 1 (for example after retain() on a slice or duplicate): in that case
            //we must not modify it, because doing so would cause problems wherever else it is used
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                //Expand the buffer; see (*2*) for the expansion logic
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            return buffer;
        } finally {
            // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            in.release();
        }
    }
};

//(*2*)
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
    ByteBuf oldCumulation = cumulation;
    //Allocate a new ByteBuf with a capacity of oldCumulation.readableBytes() + readable
    cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
    //Copy the old data into it
    cumulation.writeBytes(oldCumulation);
    //Release the old ByteBuf to avoid a memory leak
    oldCumulation.release();
    return cumulation;
}
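
The accumulate-or-expand decision above can be illustrated without Netty. The following is only a minimal sketch built on java.nio.ByteBuffer: the class CumulationSketch and its method are hypothetical names, and the sketch ignores reference counting and read-only buffers, both of which the real MERGE_CUMULATOR also checks.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of the merge-cumulator idea; this is not Netty code. */
final class CumulationSketch {

    /**
     * Appends {@code in} to {@code cumulation}, which is expected to be in write mode
     * (position = number of bytes written so far). If there is not enough room,
     * a larger buffer is allocated and the old bytes are copied over first.
     */
    static ByteBuffer cumulate(ByteBuffer cumulation, byte[] in) {
        ByteBuffer target = cumulation;
        if (cumulation.remaining() < in.length) {
            // Not enough room: allocate exactly old bytes + new bytes, as expandCumulation does.
            target = ByteBuffer.allocate(cumulation.position() + in.length);
            cumulation.flip();      // switch the old buffer to read mode
            target.put(cumulation); // copy the old data
        }
        target.put(in);             // append the new data
        return target;
    }
}
```

Calling cumulate twice on a 4-byte buffer, first with 3 bytes and then with 2, keeps the original buffer for the first call and replaces it with a 5-byte buffer on the second.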

The callDecode method

protected void io.netty.handler.codec.ByteToMessageDecoder#callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        //Loop as long as there is readable data
        while (in.isReadable()) {
            //Number of messages decoded so far
            int outSize = out.size();
            //If there are any, fire a channel read event so that the subsequent handlers process them
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                //Check whether this handler's context has been removed from the pipeline; if so, stop decoding and leave the loop, because it is no longer safe to operate on the buffer
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            //Number of readable bytes before decoding
            int oldInputLength = in.readableBytes();
            //Delegate to the actual decoding logic
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
            //If no new message was decoded (probably because more data is needed)
            if (outSize == out.size()) {
                //If the number of readable bytes is also unchanged, nothing was consumed, so leave the loop and wait for more data
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            //A message was decoded (out gained an element), yet the readable bytes did not change: the subclass either added to out without reading,
            //or read from the ByteBuf by absolute index without advancing readerIndex. Either way this is reported as an error
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }
            //Whether to decode only one message per read; defaults to false
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
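
To make the loop's exit conditions easier to see, here is a minimal sketch of the same control flow outside Netty. Everything in it is an assumption made for illustration: the class DecodeLoopSketch, the one-length-byte frame format, and the use of java.nio.ByteBuffer. Unlike the real callDecode, it collects the messages in a list instead of firing them down a pipeline.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the callDecode loop; the frame format is invented for the example. */
final class DecodeLoopSketch {

    /** Decodes at most one frame: 1 length byte followed by that many payload bytes. */
    static void decodeOne(ByteBuffer in, List<byte[]> out) {
        if (in.remaining() < 1) {
            return;
        }
        int len = in.get(in.position()) & 0xFF; // peek at the length without consuming it
        if (in.remaining() < 1 + len) {
            return;                             // incomplete frame: consume nothing
        }
        in.get();                               // consume the length byte
        byte[] payload = new byte[len];
        in.get(payload);
        out.add(payload);
    }

    static List<byte[]> callDecode(ByteBuffer in) {
        List<byte[]> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int outSize = out.size();
            int oldReadable = in.remaining();
            decodeOne(in, out);
            if (outSize == out.size()) {
                if (oldReadable == in.remaining()) {
                    break;                      // nothing decoded and nothing consumed: wait for more data
                }
                continue;                       // bytes were consumed (for example skipped), try again
            }
            if (oldReadable == in.remaining()) {
                throw new IllegalStateException("decoded a message without reading anything");
            }
        }
        return out;
    }
}
```

Feeding it two complete frames followed by a partial one yields two messages and leaves the partial frame's bytes unread in the buffer, which is exactly the situation the cumulation buffer exists for.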

decodeRemovalReentryProtection

final void io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
    //Mark the state as "calling the subclass's decode"
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        //Delegate to the subclass
        decode(ctx, in, out);
    } finally {
        //Check whether the handler was removed while the subclass was decoding (see (*1*), which sets the pending state)
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        //Reset to the initial state
        decodeState = STATE_INIT;
        if (removePending) {
            //(*1*)
            handlerRemoved(ctx);
        }
    }
}

//(*1*)
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
    if (decodeState == STATE_CALLING_CHILD_DECODE) {
        decodeState = STATE_HANDLER_REMOVED_PENDING;
        return;
    }
    ByteBuf buf = cumulation;
    if (buf != null) {
        // Directly set this to null so we are sure we not access it in any other method here anymore.
        //Drop the reference to the accumulated data
        cumulation = null;
        numReads = 0;
        int readable = buf.readableBytes();
        if (readable > 0) {
            //Trigger channel read event if there is readable data
            ByteBuf bytes = buf.readBytes(readable);
            //Release reference
            buf.release();
            ctx.fireChannelRead(bytes);
            //Trigger Read Complete Event
            ctx.fireChannelReadComplete();
        } else {
            //Release reference
            buf.release();
        }
    }
    //Hook for subclasses; empty by default
    handlerRemoved0(ctx);
}

Delegating to the subclass's decode method is where we enter Dubbo's own code: InternalDecoder.decode

protected void com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
    //Wrap the ByteBuf passed in by Netty
    //This is essentially an application of the adapter pattern
    ChannelBuffer message = new NettyBackedChannelBuffer(input);
    //Get or create the NettyChannel, which holds the Netty channel, the provider's URL, and the provider's channel handler
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

    Object msg;
    //Reader index saved before each decode attempt
    int saveReaderIndex;

    try {
        // decode object.
        do {
            saveReaderIndex = message.readerIndex();
            try {
                //Delegate to DubboCountCodec
                msg = codec.decode(channel, message);
            } catch (IOException e) {
                throw e;
            }
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                //Restore the original read location if more data is needed
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                //is it possible to go here ?
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                //Add decoded data
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    } finally {
        //If this channel is closed, remove it from the cache
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

DubboCountCodec#decode

public Object com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    //Create a MultiMessage, which maintains a List internally and itself implements Iterable
    MultiMessage result = MultiMessage.create();
    do {
        //Delegate to DubboCodec
        Object obj = codec.decode(channel, buffer);
        //More data is needed
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            //Restore the reader index to the start of this message
            buffer.readerIndex(save);
            break;
        } else {
            //Record the decoded message
            result.addMessage(obj);
            //Depending on the result type: for a Request, the number of bytes read is attached to it as an extra parameter;
            //for a Response, the number of bytes is attached to it as an extra parameter describing its output size
            logMessageLength(obj, buffer.readerIndex() - save);
            //Advance the saved reader index to the start of the next message
            save = buffer.readerIndex();
        }
    } while (true);
    //If empty, more data is required
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    //Return decoded data
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
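
The save-and-restore pattern and the "one message or many" return value can be sketched in isolation. This is a simplified illustration under stated assumptions: CountCodecSketch, its sentinel object, and the one-length-byte string frames are all hypothetical stand-ins for DubboCountCodec, Codec2.DecodeResult.NEED_MORE_INPUT, and the real Dubbo frames.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the DubboCountCodec#decode control flow. */
final class CountCodecSketch {

    /** Sentinel playing the role of Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Invented inner codec: 1 length byte plus payload, decoded to a String. */
    static Object decodeOne(ByteBuffer buf) {
        if (!buf.hasRemaining()) {
            return NEED_MORE_INPUT;
        }
        int len = buf.get() & 0xFF;
        if (buf.remaining() < len) {
            return NEED_MORE_INPUT; // the length byte was consumed; the caller restores the position
        }
        byte[] payload = new byte[len];
        buf.get(payload);
        return new String(payload, StandardCharsets.US_ASCII);
    }

    /** Returns NEED_MORE_INPUT, a single message, or a List of messages. */
    static Object decode(ByteBuffer buf) {
        int save = buf.position();
        List<Object> result = new ArrayList<>();
        while (true) {
            Object obj = decodeOne(buf);
            if (obj == NEED_MORE_INPUT) {
                buf.position(save); // roll back to the start of the incomplete message
                break;
            }
            result.add(obj);
            save = buf.position();  // the next message starts here
        }
        if (result.isEmpty()) {
            return NEED_MORE_INPUT;
        }
        return result.size() == 1 ? result.get(0) : result;
    }
}
```

The rollback matters because the inner codec has already consumed part of the incomplete message by the time it notices that data is missing.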

DubboCodec decoding

Before reading the code below, let's look at the layout of the Dubbo protocol header, which makes the code easier to follow.
The protocol header is 16 bytes long (bits 0-127):

  • Bits 0-15: magic number

  • Bit 16: request flag; 1 means request, 0 means response

  • Bit 17: twoWay flag; judging from the source code, it indicates whether the request expects a response (this is my reading and may be inaccurate)

  • Bit 18: event flag; it marks an event such as a heartbeat, meaning the current request or response is a heartbeat rather than an interface invocation

  • Bits 19-23: serialization id, identifying how the data is serialized, for example hessian2

  • Bits 24-31: status, for example OK (20)

  • Bits 32-95: the request id. From the source code that follows, it is used as the key in com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#FUTURES, which maps request id -> DefaultFuture.
    DefaultFuture is similar to the JDK's Future: calling its get method blocks until the result is available

  • Bits 96-127: length of the request or response body

  • Bit 128 onward: the body data
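
The header layout can be turned into a small parser. The following is a sketch, not Dubbo's implementation: the class DubboHeaderSketch is hypothetical, the mask values are derived from the bit positions listed above (bit 16 is the most significant bit of the third byte), and the magic value 0xdabb is what I recall from the Dubbo source, so treat it as an assumption.

```java
import java.nio.ByteBuffer;

/** Hypothetical parser for the 16-byte header described above. */
final class DubboHeaderSketch {
    static final short MAGIC = (short) 0xdabb;   // assumed magic number
    static final int FLAG_REQUEST = 0x80;        // bit 16
    static final int FLAG_TWOWAY = 0x40;         // bit 17
    static final int FLAG_EVENT = 0x20;          // bit 18
    static final int SERIALIZATION_MASK = 0x1f;  // bits 19-23

    final boolean request;
    final boolean twoWay;
    final boolean event;
    final int serializationId;
    final int status;
    final long id;
    final int bodyLength;

    private DubboHeaderSketch(int flag, int status, long id, int bodyLength) {
        this.request = (flag & FLAG_REQUEST) != 0;
        this.twoWay = (flag & FLAG_TWOWAY) != 0;
        this.event = (flag & FLAG_EVENT) != 0;
        this.serializationId = flag & SERIALIZATION_MASK;
        this.status = status;
        this.id = id;
        this.bodyLength = bodyLength;
    }

    static DubboHeaderSketch parse(byte[] header) {
        if (header.length < 16) {
            throw new IllegalArgumentException("need 16 header bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(header); // ByteBuffer is big-endian by default
        if (buf.getShort() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int flag = buf.get() & 0xFF;    // byte 2: request/twoWay/event bits plus serialization id
        int status = buf.get() & 0xFF;  // byte 3: status
        long id = buf.getLong();        // bytes 4-11: request id
        int bodyLength = buf.getInt();  // bytes 12-15: body length
        return new DubboHeaderSketch(flag, status, id, bodyLength);
    }
}
```

For example, a third byte of 0xC2 decodes as a two-way request with serialization id 2.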

public Object com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec#decode(Channel channel, ChannelBuffer buffer) throws IOException {
    //Number of readable bytes
    int readable = buffer.readableBytes();
    //HEADER_LENGTH = 16; these 16 bytes are the Dubbo protocol header
    //(by default the dubbo protocol serializes the body with hessian2)
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    //Read up to 16 bytes from the buffer into the header array
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}
                                        |
                                        V
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    //Check the magic number, which occupies two bytes
    //If the first two bytes are not the magic number, the following branch is taken
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        //If the buffer holds more data than the header array, read all remaining buffer data into header
        if (header.length < readable) {
            //Grow the header array to readable bytes, copying the existing header data into the new array
            header = Bytes.copyOf(header, readable);
            //Read the rest of the buffer into header
            buffer.readBytes(header, length, readable - length);
        }
        //Entering this branch means the magic number does not start at position 0, so search from 1
        for (int i = 1; i < header.length - 1; i++) {
            //Look for the position where a magic number begins
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                //Move the reader index to the start of the magic number
                //Why buffer.readerIndex() - header.length? Because i is an index into header, not into buffer:
                //buffer.readerIndex() - header.length is the buffer position of header[0], and adding i gives the buffer position of the magic number
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                //Keep only the data before the magic number in header
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        //Delegate to the parent class, TelnetCodec, which handles telnet commands
        //For example, Ctrl + C closes the channel and the up and down keys recall previous commands; read the source if you are interested
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    //The data starts with the magic number; if fewer than 16 bytes are readable, the header is incomplete, so signal that more data is needed
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    //Read 4 bytes starting at offset 12 (an int is 4 bytes) and convert them to an int
    //This value is the length of the message body
    int len = Bytes.bytes2int(header, 12);
    //Check the payload size; throw an exception if it exceeds the configured limit
    checkPayload(channel, len);
    //Body length plus protocol header length
    int tt = len + HEADER_LENGTH;
    //If there are not enough readable bytes, signal that more data is needed
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    //Create a stream over the body: it starts at buffer.readerIndex(), which is now just past the 16-byte header, and is limited to len bytes
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        //Decode the message body
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}
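
The method above has three possible outcomes before any body is decoded: hand the data to the telnet codec, ask for more input, or proceed. The sketch below restates just that decision on a plain byte array. FrameCheckSketch and its Outcome enum are hypothetical, the magic bytes 0xda and 0xbb are assumed values for MAGIC_HIGH and MAGIC_LOW, and the payload-size check is left out.

```java
/** Hypothetical restatement of the checks at the top of ExchangeCodec#decode. */
final class FrameCheckSketch {
    enum Outcome { NOT_DUBBO, NEED_MORE_INPUT, READY }

    static final byte MAGIC_HIGH = (byte) 0xda; // assumed value
    static final byte MAGIC_LOW = (byte) 0xbb;  // assumed value
    static final int HEADER_LENGTH = 16;

    /** Classifies the readable bytes the same way the decode method does. */
    static Outcome check(byte[] data) {
        int readable = data.length;
        if (readable > 0 && data[0] != MAGIC_HIGH
                || readable > 1 && data[1] != MAGIC_LOW) {
            return Outcome.NOT_DUBBO;            // would be handed to the telnet codec
        }
        if (readable < HEADER_LENGTH) {
            return Outcome.NEED_MORE_INPUT;      // header incomplete
        }
        // Big-endian int at offset 12: the body length.
        int len = ((data[12] & 0xFF) << 24) | ((data[13] & 0xFF) << 16)
                | ((data[14] & 0xFF) << 8) | (data[15] & 0xFF);
        return readable < len + HEADER_LENGTH ? Outcome.NEED_MORE_INPUT : Outcome.READY;
    }

    /** Index of the first magic number starting at position 1 or later, or -1 if none. */
    static int findMagic(byte[] data) {
        for (int i = 1; i < data.length - 1; i++) {
            if (data[i] == MAGIC_HIGH && data[i + 1] == MAGIC_LOW) {
                return i;
            }
        }
        return -1;
    }
}
```

Note that a single byte equal to MAGIC_HIGH is classified as NEED_MORE_INPUT rather than NOT_DUBBO, because the second magic byte has not arrived yet.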

Decoding the message body

protected Object com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    //flag encodes whether this is a request, whether a response is expected, whether it is an event (heartbeat), and the serialization id
    //proto identifies the serialization protocol
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    //The 8 bytes starting at offset 4 (a long is 8 bytes) hold the request/response id
    long id = Bytes.bytes2long(header, 4);
    //Is this a request? If not, it is a response and this branch is taken
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        //Create the Response object
        Response res = new Response(id);
        //If the event flag is set, mark the response as a heartbeat event
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        //Response status; for example, 20 means OK
        byte status = header[3];
        res.setStatus(status);
        try {
            //Internally looks up the serialization implementation by its id; the default is hessian2
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
            //If the response is ok
            if (status == Response.OK) {
                Object data;
                //Is it a heartbeat event?
                if (res.isHeartbeat()) {
                    //Internally this simply calls in.readObject()
                    data = decodeHeartbeatData(channel, in);
                    //Other kinds of events? In this version heartbeat appears to be the only one
                } else if (res.isEvent()) {
                    //Internally this also deserializes via in.readObject()
                    data = decodeEventData(channel, in);
                } else {
                    //Wraps the response result
                    DecodeableRpcResult result;
                    //Decode in the current IO thread? Defaults to true
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        //Deserialize the response body
                        result.decode();
                    } else {
                        //Otherwise copy the raw bytes and leave decoding to a later thread
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                //DecodeableRpcResult
                res.setResult(data);
            } else {
                //Read error message if response is unsuccessful
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode response failed: " + t.getMessage(), t);
            }
            //CLIENT_ERROR = 90
            res.setStatus(Response.CLIENT_ERROR);
            //Exception message
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        //Create Request
        Request req = new Request(id);
        //Set the dubbo protocol version
        req.setVersion(Version.getProtocolVersion());
        //Whether a response is expected
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        //If the event flag is set, mark the request as a heartbeat event
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            //Obtain the deserializer; hessian2 is used by default
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
            //heartbeat
            if (req.isHeartbeat()) {
                //in.readObject()
                data = decodeHeartbeatData(channel, in);
            } else if (req.isEvent()) {
                //in.readObject()
                data = decodeEventData(channel, in);
            } else {
                //Build the invocation context, which carries the interface, method, parameters, and other details of this call
                DecodeableRpcInvocation inv;
                //Decode in the current IO thread?
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    //Decode the request body
                    inv.decode();
                } else {
                    //Otherwise copy the raw bytes and decode later in the thread pool
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
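
Both branches above choose between decoding immediately in the IO thread and keeping the raw bytes for later. The idea can be sketched as an object that decodes at most once. This is an illustration only: LazyBodySketch is a hypothetical class, the "decoding" is just a UTF-8 conversion, and the decode-once guard is my assumption about how a deferred decode can be made safe, since the internals of DecodeableRpcInvocation are not shown in this section.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical body that can be decoded eagerly or lazily, but never twice. */
final class LazyBodySketch {
    private final byte[] raw;
    private boolean decoded;
    private String value;
    private int decodeCount; // kept only so the example can show that decoding runs once

    private LazyBodySketch(byte[] raw) {
        this.raw = raw.clone(); // copy, like readMessageData copies the body for later decoding
    }

    /** Mirrors the branch on the decode-in-IO-thread setting. */
    static LazyBodySketch create(byte[] raw, boolean decodeInIoThread) {
        LazyBodySketch body = new LazyBodySketch(raw);
        if (decodeInIoThread) {
            body.decode(); // decode right away on the calling thread
        }
        return body;
    }

    /** Decodes the raw bytes at most once; later calls are no-ops. */
    synchronized void decode() {
        if (decoded) {
            return;
        }
        value = new String(raw, StandardCharsets.UTF_8);
        decodeCount++;
        decoded = true;
    }

    /** Triggers decoding if it has not happened yet, then returns the value. */
    synchronized String value() {
        decode();
        return value;
    }

    synchronized int decodeCount() {
        return decodeCount;
    }
}
```

The trade-off is the usual one: decoding in the IO thread avoids a copy and a handoff, while deferring keeps the IO thread free when bodies are large or slow to deserialize.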



Parsing the response body (for example, we call a remote interface and the remote service returns data)

public Object DecodeableRpcResult#decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);
    //Read one byte that identifies the type of the response result
    byte flag = in.readByte();
    switch (flag) {
        //The response value is null
        case DubboCodec.RESPONSE_NULL_VALUE:
            break;
        case DubboCodec.RESPONSE_VALUE:
            try {
                //Resolve the return types of the invoked method
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                //Deserialize the value according to the method's return type
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
            //The remote call threw an exception
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            try {
                Object obj = in.readObject();
                //If obj is not a Throwable, the data is invalid, so report an error
                if (obj instanceof Throwable == false)
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                //Record the exception
                setException((Throwable) obj);
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
            //The response value is null, but attachments are present
        case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
            try {
                //Only the attachments need to be deserialized
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
            //A normal return value with attachments
        case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
            try {
                //Resolve the return types of the invoked method
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                //Deserialize the return value
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
                //Deserialize the attachments
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
            //The remote call threw an exception, and attachments are present
        case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
            try {
                Object obj = in.readObject();
                if (obj instanceof Throwable == false)
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                //First record the deserialized exception
                setException((Throwable) obj);
                //Then deserialize the attachments
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    }
    //Clear
    if (in instanceof Cleanable) {
        ((Cleanable) in).cleanup();
    }
    return this;
}

As the response parsing above shows, the response body consists roughly of a one-byte flag (indicating whether there is a return value, an exception, attachments, and so on) followed by the return value, the exception, and/or the attachments
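
The relationship between the flag byte and what follows it can be written down as a lookup. This is a sketch under assumptions: ResponseFlagSketch is a hypothetical class, and the numeric values of the six constants are what I recall from DubboCodec in the 2.6.x line, so verify them against your version before relying on them.

```java
/** Hypothetical summary of the response body layout for each flag value. */
final class ResponseFlagSketch {
    // Assumed numeric values of the DubboCodec.RESPONSE_* constants.
    static final byte RESPONSE_WITH_EXCEPTION = 0;
    static final byte RESPONSE_VALUE = 1;
    static final byte RESPONSE_NULL_VALUE = 2;
    static final byte RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS = 3;
    static final byte RESPONSE_VALUE_WITH_ATTACHMENTS = 4;
    static final byte RESPONSE_NULL_VALUE_WITH_ATTACHMENTS = 5;

    /** Describes, in read order, what the body contains for the given flag. */
    static String layout(byte flag) {
        switch (flag) {
            case RESPONSE_NULL_VALUE:
                return "flag";
            case RESPONSE_VALUE:
                return "flag + value";
            case RESPONSE_WITH_EXCEPTION:
                return "flag + exception";
            case RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                return "flag + attachments";
            case RESPONSE_VALUE_WITH_ATTACHMENTS:
                return "flag + value + attachments";
            case RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                return "flag + exception + attachments";
            default:
                throw new IllegalArgumentException("Unknown result flag: " + flag);
        }
    }
}
```

The read order in each description matches the order of the readObject calls in the corresponding case of DecodeableRpcResult#decode.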

In the next section, we continue by analyzing how the request body is resolved


Added by scrypte on Fri, 13 Sep 2019 06:47:47 +0300