In the previous section, we analyzed how an interface call is processed. When the interface returns data, Dubbo has to send it back to the client, together with the id of the request it answers. What happens when Dubbo responds? The com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received method contains the following code:
public void received(Channel channel, Object message) throws RemotingException {
    // ... omitted
            if (request.isTwoWay()) {
                // handleRequest was covered in Section 6;
                // it wraps the return value in a Response
                Response response = handleRequest(exchangeChannel, request);
                // Send the response to the client
                channel.send(response);
            }
        }
    // ... omitted
    }
}
A message has to be encoded before it is sent to the client; here that means encoding the Response. When creating the Netty server, Dubbo adds three channel handlers to the Netty pipeline: NettyServerHandler, com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder, and com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder.
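The following is a class diagram of the InternalEncoder encoder. InternalEncoder extends Netty's MessageToByteEncoder, so an outbound write passes through the call chain below: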
// io.netty.handler.codec.MessageToByteEncoder#write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // Check whether this encoder handles msg's Java type
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            // Allocate a ByteBuf to encode into
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // Encode
                encode(ctx, cast, buf);
            } finally {
                // If msg is reference-counted, release the reference
                ReferenceCountUtil.release(cast);
            }
            // Pass the encoded data to the next handler to be written out
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                // Nothing was encoded: release the buffer
                buf.release();
                // and write an empty buffer instead
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // Not handled here: delegate to the next handler
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
    |
    V
// com.alibaba.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalEncoder#encode
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
    Channel ch = ctx.channel();
    NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
    try {
        // Ends up in DubboCodec's encode method, via DubboCountCodec
        codec.encode(channel, buffer, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ch);
    }
}
    |
    V
// com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec#encode (the instance is a DubboCodec)
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    // Encode a request, e.g. when the client sends a request to the service
    // provider or when the server sends a heartbeat to the client
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
    // Encode a response
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        // Fall back to the parent class's encode method
        super.encode(channel, buffer, msg);
    }
}
Before looking at how the Response is encoded, let's review the Dubbo protocol layout analyzed in Section 5:
- Bits 0-15: magic number.
- Bit 16: request/response flag; 1 means request, 0 means response.
- Bit 17: twoWay flag. Judging from the source code, it indicates whether a response is expected (my understanding may be wrong).
- Bit 18: event flag. It marks an event such as a heartbeat, meaning the current request or response may come from a heartbeat rather than from an interface call.
- Bits 19-23: serialization id, which identifies how the data is serialized, for example hessian2.
- Bits 24-31: status, for example OK (20).
- Bits 32-95: request id. From the source code below, I find that it is stored in com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#FUTURES as a mapping from request id to DefaultFuture. DefaultFuture is similar to the JDK Future: calling its get method blocks until the result has been processed.
- Bits 96-127: length of the request or response body.
- Bit 128 onward: the data itself.
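The bit layout above can be sketched as a small pack/unpack helper. This is a minimal illustration, not Dubbo's code: the class DubboHeaderSketch and its method names are hypothetical, the flag constants are the values I recall from ExchangeCodec (0x80, 0x40, 0x20, mask 0x1f), and the serialization id 2 for hessian2 is likewise an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the 16-byte header layout; not part of Dubbo.
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;     // bits 0-15
    static final byte FLAG_REQUEST = (byte) 0x80;  // bit 16 (assumed value)
    static final byte FLAG_TWOWAY = (byte) 0x40;   // bit 17 (assumed value)
    static final byte FLAG_EVENT = (byte) 0x20;    // bit 18 (assumed value)
    static final int SERIALIZATION_MASK = 0x1f;    // bits 19-23

    /** Packs the fields into a 16-byte header (big-endian, ByteBuffer's default order). */
    static byte[] pack(boolean request, boolean twoWay, boolean event,
                       int serializationId, int status, long id, int bodyLength) {
        byte flags = (byte) (serializationId & SERIALIZATION_MASK);
        if (request) flags |= FLAG_REQUEST;
        if (twoWay) flags |= FLAG_TWOWAY;
        if (event) flags |= FLAG_EVENT;
        return ByteBuffer.allocate(HEADER_LENGTH)
                .putShort(MAGIC)      // bytes 0-1
                .put(flags)           // byte 2
                .put((byte) status)   // byte 3
                .putLong(id)          // bytes 4-11
                .putInt(bodyLength)   // bytes 12-15
                .array();
    }

    static boolean isRequest(byte[] h) { return (h[2] & FLAG_REQUEST) != 0; }
    static boolean isTwoWay(byte[] h) { return (h[2] & FLAG_TWOWAY) != 0; }
    static boolean isEvent(byte[] h) { return (h[2] & FLAG_EVENT) != 0; }
    static int serializationId(byte[] h) { return h[2] & SERIALIZATION_MASK; }
    static int status(byte[] h) { return h[3] & 0xff; }
    static long id(byte[] h) { return ByteBuffer.wrap(h).getLong(4); }
    static int bodyLength(byte[] h) { return ByteBuffer.wrap(h).getInt(12); }

    public static void main(String[] args) {
        // A non-event response with status OK (20), request id 7 and a 42-byte body.
        byte[] h = pack(false, false, false, 2, 20, 7L, 42);
        System.out.printf("request=%b event=%b serialization=%d status=%d id=%d length=%d%n",
                isRequest(h), isEvent(h), serializationId(h), status(h), id(h), bodyLength(h));
    }
}
```

For a response the request and twoWay bits stay 0, so byte 2 carries only the serialization id plus, for heartbeats, the event bit.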
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        // hessian2 serialization is used by default
        Serialization serialization = getSerialization(channel);
        // header.
        // Protocol header, 16 bytes
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        // Write the magic number into the first two bytes
        Bytes.short2bytes(MAGIC, header);
        // set request and serialization flag.
        // The low 5 bits of header[2] hold the serialization id
        header[2] = serialization.getContentTypeId();
        // Mark heartbeats with the event flag
        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
        // set response status.
        byte status = res.getStatus();
        // header[3] holds the status
        header[3] = status;
        // set request id.
        // Write the 8-byte request id starting at offset 4
        Bytes.long2bytes(res.getId(), header, 4);
        // Skip the 16 header bytes so that the body is written after them
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // hessian2 serialization
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // encode response data or error message.
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                // Encode heartbeat data
                encodeHeartbeatData(channel, out, res.getResult());
            } else {
                // Encode the response result
                encodeResponseData(channel, out, res.getResult(), res.getVersion());
            }
        } else
            // If the interface call failed, write the error message
            out.writeUTF(res.getErrorMessage());
        // Flush
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        // Check the body length against the payload limit
        checkPayload(channel, len);
        // Write the body length at offset 12
        Bytes.int2bytes(len, header, 12);
        // write
        // Go back and write the protocol header
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // ... exception handling omitted
    }
}
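The notable technique in encodeResponse is that the header is written last: the body length is only known after serialization, so the code skips 16 bytes, writes the body, then goes back and fills in the header. A minimal sketch of that idea with java.nio.ByteBuffer follows; BackPatchSketch and writeFrame are hypothetical names, plain UTF-8 strings stand in for the real serializer, and the hard-coded serialization id 2 is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of "reserve the header, write the body, back-fill the header".
final class BackPatchSketch {
    static final int HEADER_LENGTH = 16;

    /** Writes one frame at the buffer's current position and returns the body length. */
    static int writeFrame(ByteBuffer buffer, long requestId, byte status, String... parts) {
        int savedIndex = buffer.position();
        // Leave room for the header; the body starts 16 bytes later.
        buffer.position(savedIndex + HEADER_LENGTH);
        // Stand-in for serialization: the total size is not known until all parts are written.
        for (String part : parts) {
            buffer.put(part.getBytes(StandardCharsets.UTF_8));
        }
        int len = buffer.position() - (savedIndex + HEADER_LENGTH);
        // Back-fill the header with absolute puts, which do not move the position,
        // so the buffer stays positioned right after the body.
        buffer.putShort(savedIndex, (short) 0xdabb); // magic number
        buffer.put(savedIndex + 2, (byte) 2);        // serialization id (assumed value)
        buffer.put(savedIndex + 3, status);          // status
        buffer.putLong(savedIndex + 4, requestId);   // request id
        buffer.putInt(savedIndex + 12, len);         // body length
        return len;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        int len = writeFrame(buffer, 1L, (byte) 20, "hello", " world");
        System.out.println("body length = " + len + ", position = " + buffer.position());
    }
}
```

Dubbo's version does the same with writerIndex: it restores savedWriteIndex, writes the header, then moves the index to savedWriteIndex + HEADER_LENGTH + len.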
Encoding and writing the response data body:
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec#encodeResponseData
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    Result result = (Result) data;
    // currently, the version value in Response records the version of Request
    // Check whether this Dubbo version supports attachments in responses
    boolean attach = Version.isSupportResponseAttatchment(version);
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();
        // No exception, and the call returned null
        if (ret == null) {
            out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
        } else {
            // Write the return value
            out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {
        // Write the exception
        out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }
    if (attach) {
        // returns current version of Response to consumer side.
        // Write the attachments
        result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        out.writeObject(result.getAttachments());
    }
}
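The body therefore starts with one flag byte chosen from six cases: exception, value, or null value, each with or without attachments. The decision table can be sketched as below. ResponseFlagSketch is a hypothetical class; its enum constants only mirror the names used in DubboCodec, and the numeric byte values are deliberately not reproduced here.

```java
// Hypothetical illustration of how the leading flag of the response body is chosen.
final class ResponseFlagSketch {
    enum Flag {
        RESPONSE_WITH_EXCEPTION,
        RESPONSE_VALUE,
        RESPONSE_NULL_VALUE,
        RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS,
        RESPONSE_VALUE_WITH_ATTACHMENTS,
        RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
    }

    /**
     * @param exception the exception thrown by the call, or null if it succeeded
     * @param value     the return value (ignored when exception is non-null)
     * @param attach    whether the peer's version supports response attachments
     */
    static Flag choose(Throwable exception, Object value, boolean attach) {
        if (exception != null) {
            return attach ? Flag.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
                          : Flag.RESPONSE_WITH_EXCEPTION;
        }
        if (value == null) {
            return attach ? Flag.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
                          : Flag.RESPONSE_NULL_VALUE;
        }
        return attach ? Flag.RESPONSE_VALUE_WITH_ATTACHMENTS : Flag.RESPONSE_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(choose(null, "result", true));
        System.out.println(choose(null, null, false));
        System.out.println(choose(new IllegalStateException("boom"), null, true));
    }
}
```

The flag tells the reader what follows: a value, an exception, or nothing, and in the attachment variants an attachments map after that.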
Dubbo's channel handlers do not contain much more logic for sending a response. The one thing worth noting is on the request side: when a request is sent, com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#sent records the send on the corresponding DefaultFuture:
public void sent(Channel channel, Object message) throws RemotingException {
    // ... some code omitted
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request);
    }
    // ... some code omitted
}
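The request id in the header is what lets the consumer match a response to the call that is waiting for it. The sketch below shows that correlation pattern with a map keyed by request id. It is not Dubbo's DefaultFuture: PendingRequestsSketch and its methods are hypothetical, and the JDK's CompletableFuture stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of matching responses to pending requests by id.
final class PendingRequestsSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // request id -> future waiting for the response with the same id
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    /** Registers a pending request and returns its id. */
    static long newRequest() {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    /** Returns the future for a pending request, or null if none is registered. */
    static CompletableFuture<Object> futureFor(long id) {
        return FUTURES.get(id);
    }

    /** Called when a response arrives; completes and removes the matching future. */
    static boolean received(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false; // unknown id, or the response was already handled
        }
        future.complete(result);
        return true;
    }

    public static void main(String[] args) {
        long id = newRequest();
        CompletableFuture<Object> future = futureFor(id);
        received(id, "hello");              // simulate the response arriving
        System.out.println(future.join());  // join() blocks until the future completes
    }
}
```

A caller that invokes join() or get() before the response arrives blocks until received completes the future.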
That covers the encoding of the response. What about request encoding? As we saw when parsing the request body, it is simply the reverse of that process.