1, Heartbeat
What is heartbeat
In TPC, after the client and server establish a connection, they need to send data packets regularly to inform each other that they are still online to ensure the effectiveness of the TPC connection. If a connection has no heartbeat for a long time, it needs to be disconnected in time, otherwise the server will maintain many useless connections and waste the resources of the server.
IdleStateHandler
Netty has provided us with a heartbeat Handler: IdleStateHandler. When the idle time (read or write) of the connection is too long, the IdleStateHandler will trigger an IdleStateEvent event to pass the next Handler. We can handle this event by overriding the usereventtriggered method in the Pipeline Handler. Note that our own Handler needs to be behind the IdleStateHandler.
Let's take a look at the source code of IdleStateHandler.
1. Constructor
The most complete constructor is as follows:
public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { }
Parameter resolution:
- observeOutput: whether to consider the slow outbound. If true: when the outbound time is too long and exceeds the idle time, this event will not be triggered. If false, an event will be triggered when the idle time is exceeded. The default is false.
- readerIdleTime: the time when the read is idle. 0 means that the read idle event is disabled.
- writerIdleTime: write idle time. 0 means write idle event is disabled.
- allIdleTime: the idle time for reading or writing. 0 means to disable the event.
- unit: the first three time units.
2. Event handling
IdleStateHandler inherits ChannelDuplexHandler and rewrites outbound and inbound events. Let's take a look at the code.
initialize(ctx) will be initialized when the channel is added, registered and active. destroy() will be destroyed when it is deleted or inactive. The lastReadTime and lastWriteTime fields will be set when reading and writing.
public class IdleStateHandler extends ChannelDuplexHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive() && ctx.channel().isRegistered()) { initialize(ctx); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { destroy(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isActive()) { initialize(ctx); } super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { initialize(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { destroy(); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // Judge whether to enable read idle or read-write idle monitoring if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // Setting the reading flag bit reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } ctx.fireChannelRead(msg); } // After reading @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // Judge whether to enable read idle or read-write idle monitoring, and check the reading flag bit if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { // Set lastReadTime. It is useful to judge the read timeout later lastReadTime = ticksInNanos(); reading = false; } ctx.fireChannelReadComplete(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // Judge whether to enable write idle or read-write idle monitoring if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // The writeListener method is described below, mainly to set lastWriteTime ctx.write(msg, promise.unvoid()).addListener(writeListener); } else { ctx.write(msg, promise); } } private final ChannelFutureListener writeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lastWriteTime = ticksInNanos(); firstWriterIdleEvent = firstAllIdleEvent = true; } }; }
3. Initialization
initialize(ctx) will be initialized when the channel is added, registered and active. Let's take a look at the initialization code:
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
In fact, initialization is very simple. It determines which scheduled tasks to initialize according to the read-write idle time given by the constructor, namely: readeridletimeouttask (read idle timeout task), writeridletimeouttask (write idle timeout task), alliidletimeouttask (read-write idle timeout task).
4. Scheduled tasks
Let's take a look at ReaderIdleTimeoutTask. The principle of the remaining two is similar to that of ReaderIdleTimeoutTask. Interested students can read the source code by themselves.
private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { // Check to see if it timed out long nextDelay = readerIdleTimeNanos; if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // Timeout, restart a new timer, and then trigger the event // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // Construction Event IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); // Trigger event channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // There is no timeout. Set a new timer, but this time it is shorter // Read occurred before the timeout - set a new timeout with shorter delay. readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
As can be seen from the above code:
① If the read idle times out, restart a timer and trigger the event
② If the read idle does not time out, a new timer with a shorter time (readeridletimenanos - tickesinnanos() - lastreadtime) will be started
5. Trigger event
The above method of triggering the event is: channelIdle. After many codes are pushed, in fact, the following code is finally called:
private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { try { // To trigger an event, to put it bluntly, is to directly call the userEventTriggered method ((ChannelInboundHandler) handler()).userEventTriggered(this, event); } catch (Throwable t) { notifyHandlerException(t); } } else { fireUserEventTriggered(event); } }
In fact, to trigger an event is to pass the event to the next Handler (next), which is to call the userEventTriggered method. Therefore, the Handler that handles the heartbeat must be written to IdleStateHandler.
Implementation of CCX RPC heartbeat
1. Client
IdleStateHandler is placed on the PipleLine registration of the startup class, and the business processor NettyClientHandler is behind it.
public class NettyClient { // ... ignore other codes private NettyClient() { bootstrap = new Bootstrap() // ... omit other code .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); // Set IdleStateHandler heartbeat detection to perform write detection every 5 seconds // If the write() method is not called for more than 5 seconds, the userEventTrigger is called p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)); // encoder p.addLast(new RpcMessageEncoder()); // decoder p.addLast(new RpcMessageDecoder()); // Service processor p.addLast(new NettyClientHandler()); } }); } }
Next, let's take a look at how NettyClientHandler handles heartbeat events:
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> { // ... ignore other codes @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // According to the above configuration, if there is no write request for more than 5 seconds, the writer will be triggered_ Idle event IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { log.info("write idle happen [{}]", ctx.channel().remoteAddress()); Channel channel = ctx.channel(); // After the write idle event is triggered, the heartbeat should occur. // Assembly message RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setSerializeType(SerializeType.PROTOSTUFF.getValue()); rpcMessage.setCompressTye(CompressType.DUMMY.getValue()); rpcMessage.setMessageType(MessageType.HEARTBEAT.getValue()); // Send heartbeat message channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { super.userEventTriggered(ctx, evt); } } }
2. Server
Similarly, the IdleStateHandler of the server is placed on the PipleLine registration of the startup class, and the business processor NettyServerHandler is behind it.
public class NettyServerBootstrap { public void start() { ServerBootstrap bootstrap = new ServerBootstrap() // ... ignore other codes .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); // Close the connection if no client request is received within 30 seconds p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)); // Codec p.addLast(new RpcMessageEncoder()); p.addLast(new RpcMessageDecoder()); // RPC message processor p.addLast(serviceHandlerGroup, new NettyServerHandler()); } }); // ... ignore other codes } }
After receiving more than 30 seconds of no read request, the server calls ctx.close to close the connection.
At the same time, if you receive a heartbeat message from the client, you can ignore it directly. If every heartbeat has to respond, it will increase the burden on the server.
The code of NettyServerHandler is as follows
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcMessage requestMsg) { // Do not process heartbeat messages if (requestMsg.getMessageType() != MessageType.REQUEST.getValue()) { return; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // Handling idle state if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { log.info("idle check happen, so close the connection"); ctx.close(); } } else { super.userEventTriggered(ctx, evt); } } }
2, Reconnection mechanism
Many times, the connection between the server and the client is disconnected only because of network problems or slow processing programs, not because the programs hang up. If the client wants to make another request, it can't send it. At this time, a function is needed: when the connection is found to be broken, if you want to write data to the connection, you will automatically reconnect. This is the reconnection mechanism.
If the client wants to request the interface of the server, first obtain the address of the server from the registry, then connect with the server, and then write data.
The simple code is as follows:
protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException { // ... ignore other codes // Server address InetSocketAddress socketAddress = new InetSocketAddress(selected.getHost(), selected.getPort()); // Get connection (Channel) Channel channel = nettyClient.getChannel(socketAddress); // Build message RpcMessage rpcMessage = buildRpcMessage(request); // Write message (send request) channel.writeAndFlush(rpcMessage); }
This nettyClient.getChannel(socketAddress) is the secret of the reconnection mechanism:
/** * Gets the channel connected to the specified address. If it cannot be obtained, it will be connected * * @param address Specify the address to connect to * @return channel */ public Channel getChannel(SocketAddress address) { // Get Channel from cache based on address Channel channel = CHANNEL_MAP.get(address); // If it is not available or the channel has been disconnected, reconnect and put it in CHANNEL_MAP cached if (channel == null || !channel.isActive()) { // connect channel = connect(address); CHANNEL_MAP.put(address, channel); } return channel; }
The code is clear at a glance, that is, channel is used_ Map is used as a cache. If it is found that it cannot be found or has been disconnected, it will be reconnected and placed in channel_ In the map for the next time.
summary
Heartbeat is a means for the server to maintain an effective connection with the client. The client sends a heartbeat packet every short period of time. The server does not respond after receiving it, but will record the last reading time of the client. The server starts the timer to regularly detect that the last time the client reads the request exceeds the configured value. If it exceeds the configured value, an event will be triggered and the connection will be disconnected.
Reconnection mechanism is a mechanism that automatically reconnects when the connection is disconnected.
The heartbeat and reconnection mechanism are combined to make the connection between the server and the client more reasonable. The disconnection saves the server resources, and the reconnection of the reconnection improves the availability.
CCX RPC code is open source
Github: https://github.com/chenchuxin/ccx-rpc
Gitee: https://gitee.com/imccx/ccx-rpc