8. Response to request or response data processing

In the previous section, we analyzed the process of calling an interface. When the interface returns data, Dubbo needs to tell the client which interface it is calling. What happens when Dubbo responds?There is the following code in the com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received method

public void received(Channel channel, Object message) throws RemotingException {
        . . . . . . ellipsis
                if (request.isTwoWay()) {
                    //Here we read it in Section 6
                    //Wrap the return value here as a Response
                    Response response = handleRequest(exchangeChannel, request);
                    //Response Client
                    channel.send(response);
                }
            }
            
             . . . . . . ellipsis
    }
}

Encoding is required when messages are sent to clients, encoding Response s. When creating a netty service, Dubbo adds three channel processors to the netty channel, NettyServerHandler, com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder, com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder

The following is a class diagram of the InternalEncoder encoder

public void io.netty.handler.codec.MessageToByteEncoder#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        //Check if msg's java type is supported
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            //Request memory to create ByteBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                //Code
                encode(ctx, cast, buf);
            } finally {
                //If msg is an application class, you need to release the reference
                ReferenceCountUtil.release(cast);
            }
            //Continue processing on the next channel and write out the data
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                //Release Reference Count
                buf.release();
                //Empty data
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            //Delegate to next channel
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
                                                            |
                                                            V
protected void com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder#encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
        Channel ch = ctx.channel();
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            //Final call to DubboCodec's encode method via DubboCountCodec
            codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
    }
                                                            |
                                                            V
public void com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec(DubboCodec Examples)#encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    //If it is a request, encode it, such as when the client sends a request to the service provider or when the service sends a heartbeat to the client
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
        //Encoding response
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        //Call parent encoding method
        super.encode(channel, buffer, msg);
    }
}

To code Response, let's review the dubbo protocol data analyzed in Section 5

  • 0-15 digits: magic number

  • 16-bit: Request,1 request, 0 response

  • 17 digits: Whether it's a twoWay or not, from the source code, it seems that the meaning of this twoWay means whether you need to respond or not (incorrectly understood).

  • 18 Bit: Is it an event, indicating a heartbeat event, perhaps the current request or response is from a heartbeat, not an interface consumption request

  • 19-23 bits: Serialized id, used to identify how data is being serialized, such as hessian2

  • 24-31 bits: status, such as ok(20)

  • 32-95 bits: Requested id, from the source code below, I find that it will be stored in com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#FUTURES with key as the request ID -> DefaultFuture and this DefaultFuture is similar to JDK Future, calling its get method will block until the result is processed

  • 96-127 bits: length of data identifying the requestor or responder

  • 128 and later: Data

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        //Use hessian2 serialization by default
        Serialization serialization = getSerialization(channel);
        // header.
        //Protocol Header, 16 Bytes
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        //Write Magic Number
        Bytes.short2bytes(MAGIC, header);
        // set request and serialization flag.
        //The lower fifth bit of the second byte is the serialization type
        header[2] = serialization.getContentTypeId();
        //Is it an event type
        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
        // set response status.
        byte status = res.getStatus();
        //The third byte represents the state
        header[3] = status;
        // set request id.
        //Write request id starting at the fourth byte, taking up 8 bytes
        Bytes.long2bytes(res.getId(), header, 4);
        //Position the start of writing data after 16 bytes
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        //hessian2 serialization
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // encode response data or error message.
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                //Coded Heart Rate
                encodeHeartbeatData(channel, out, res.getResult());
            } else {
                //Coded Response Result Set
                encodeResponseData(channel, out, res.getResult(), res.getVersion());
            }
        } else 
        //If the interface call fails, an error message is written
        out.writeUTF(res.getErrorMessage());
        //Refresh
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        int len = bos.writtenBytes();
        //Check Load
        checkPayload(channel, len);
        //Write Length
        Bytes.int2bytes(len, header, 12);
        // write
        //Write protocol header
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        . . . . . . Omit exception handling
    }
}

Coded Write of Response Data Body

protected void com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    Result result = (Result) data;
    // currently, the version value in Response records the version of Request
    //Check that the dubbo version supports additional parameters
    boolean attach = Version.isSupportResponseAttatchment(version);
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();
        //No exceptions, returned null
        if (ret == null) {
            out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
        } else {
            //Write Return Value
            out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {
        //Abnormal Inhalation
        out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }

    if (attach) {
        // returns current version of Response to consumer side.
        //Write additional parameters
        result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        out.writeObject(result.getAttachments());
    }
}

Dubbo's channel handling does not have much logic for sending responses except at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#sent when sending requests
Record Future

public void sent(Channel channel, Object message) throws RemotingException {
    . . . . . . Omit some code
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request);
    }
    . . . . . . Omit some code
}

Looking at the encoding of the response, what about the request encoding?As we can see when parsing the request body, it's just a reverse process

Keywords: Dubbo encoding Netty codec

Added by FMB on Fri, 13 Sep 2019 06:50:35 +0300