Netty client disconnection and reconnection: implementation and reflections

Preface

When implementing long-lived TCP connections, client disconnection and reconnection is a very common problem. When we use Netty to implement reconnection, have we considered the following questions:

  • How do we detect that the client has been disconnected from the server?
  • How do we reconnect after a disconnection?
  • How many Netty client threads are reasonable?

These are in fact the problems the author ran into while implementing reconnection, and the question "how many Netty client threads are reasonable?" came out of an exception encountered during that work. The whole process is described below.

This article focuses mainly on the client, but so that readers can run the complete program, the server code, shared dependencies, and entity classes are given first.

Server and common code

Maven dependencies:

<dependencies>
    <!-- Only used for the spring-boot logging framework -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>2.4.1</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.56.Final</version>
    </dependency>

    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>2.0.10.Final</version>
    </dependency>
</dependencies>

Server-side business handler code

It mainly records and prints the current number of client connections, and returns the string "hello netty" after receiving a message from the client.

@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
        log.info("Client connection succeeded: client address :{}", ctx.channel().remoteAddress());
        log.info("Current common{}Client connections", channels.size());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        ctx.channel().writeAndFlush("hello netty");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive: client close");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof java.io.IOException) {
            log.warn("exceptionCaught: client close");
        } else {
            cause.printStackTrace();
        }
    }
}

Server heartbeat check code

After receiving a heartbeat "ping" message, it replies to the client with a "pong" message. If the client sends nothing within the specified time, the connection to that client is closed.

public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("server channelRead:{}", msg);
        if (msg.equals("ping")) {
            ctx.channel().writeAndFlush("pong");
        } else {
            //Handled by the next handler, in the example SimpleServerHandler
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //This event is fired by io.netty.handler.timeout.IdleStateHandler, which must be in the pipeline
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //If there is no read event after the specified time, close the connection
                log.info("Heart beat time exceeded,Close the connection with the server:{}", ctx.channel().remoteAddress());
                //ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Codec tool class

The JBoss Marshalling serial codec is used here. You can look up its advantages and disadvantages yourself; it is only an example.

public final class MarshallingCodeFactory {
    /** Create Jboss marshalling decoder */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //The "serial" parameter creates a marshaller factory based on Java serialization, provided by jboss-marshalling-serial
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024);
    }

    /** Create Jboss marshalling encoder */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

Public entity class

public class UserInfo implements Serializable {
    private static final long serialVersionUID = 6271330872494117382L;
 
    private String username;
    private int age;

    public UserInfo() {
    }

    public UserInfo(String username, int age) {
        this.username = username;
        this.age = age;
    }
   //Omit getter/setter/toString
}
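
The server startup class is not listed above. A minimal sketch that wires the pieces together might look like the following; the class name, the 30-second read-idle value, and the pipeline order are assumptions, while port 8088 matches the client code below.

public class NettyServerMain {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
                            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
                            //Fire READER_IDLE if nothing is read from the client within 30s (value is an assumption)
                            pipeline.addLast(new IdleStateHandler(30, 0, 0));
                            pipeline.addLast(new ServerHeartbeatHandler());
                            pipeline.addLast(new SimpleServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8088).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}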

Now let's get to the focus of this article: client disconnection and reconnection and the thinking behind it.

Client implementation

  • At startup, the client connects synchronously. If the connection still fails after the specified number of attempts, an exception is thrown and the process exits.
  • After the client starts, a scheduled task is started to simulate the client sending data.

The client business handler simply receives data and logs it.

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }
}

NettyClient encapsulates the connect and close methods; getChannel() returns the io.netty.channel.Channel used to send data to the server. boolean connect() is a synchronous connect method that returns true if the connection succeeds and false otherwise.

public class NettyClient {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private volatile Channel clientChannel;

    public NettyClient() {
        this(-1);
    }

    public NettyClient(int threads) {
        workerGroup = threads > 0 ? new NioEventLoopGroup(threads) : new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
                .handler(new ClientHandlerInitializer(this));
    }

    public boolean connect() {
        log.info("Trying to connect to the server: 127.0.0.1:8088");
        try {
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);

            boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);
            clientChannel = channelFuture.channel();
            if (notTimeout) {
                if (clientChannel != null && clientChannel.isActive()) {
                    log.info("netty client started !!! {} connect to server", clientChannel.localAddress());
                    return true;
                }
                Throwable cause = channelFuture.cause();
                if (cause != null) {
                    exceptionHandler(cause);
                }
            } else {
                log.warn("connect remote host[{}] timeout {}s", clientChannel.remoteAddress(), 30);
            }
        } catch (Exception e) {
            exceptionHandler(e);
        }
        if (clientChannel != null) {
            clientChannel.close();
        }
        return false;
    }

    private void exceptionHandler(Throwable cause) {
        if (cause instanceof ConnectException) {
            log.error("Connection exception:{}", cause.getMessage());
        } else if (cause instanceof ClosedChannelException) {
            log.error("connect error:{}", "client has destroy");
        } else {
            log.error("connect error:", cause);
        }
    }

    public void close() {
        if (clientChannel != null) {
            clientChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public Channel getChannel() {
        return clientChannel;
    }

    static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
        private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
        private NettyClient client;

        public ClientHandlerInitializer(NettyClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
            //pipeline.addLast(new IdleStateHandler(25, 0, 10));
            //pipeline.addLast(new ClientHeartbeatHandler());
            pipeline.addLast(new SimpleClientHandler(client));
        }
    }
}

Client startup class

public class NettyClientMain {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClientMain.class);
    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        boolean connect = false;
        //Try to connect up to 10 times at startup; if the connection still cannot be established, stop retrying
        //To keep retrying after startup, run this in a separate thread so it does not block the program
        for (int i = 0; i < 10; i++) {
            connect = nettyClient.connect();
            if (connect) {
                break;
            }
            //The connection is not successful. Try to connect again after an interval of 5s
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        if (connect) {
            log.info("Send data regularly");
            send(nettyClient);
        } else {
            nettyClient.close();
            log.info("process exit");
        }
    }

    /** Send data regularly */
    static void send(NettyClient client) {
        scheduledExecutor.schedule(new SendTask(client,scheduledExecutor), 2, TimeUnit.SECONDS);
    }
}
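
The SendTask used above is not listed here. A minimal sketch, assuming it simply writes a UserInfo object to the server and reschedules itself every 10 seconds, might look like this:

public class SendTask implements Runnable {
    private final NettyClient client;
    private final ScheduledExecutorService executor;

    public SendTask(NettyClient client, ScheduledExecutorService executor) {
        this.client = client;
        this.executor = executor;
    }

    @Override
    public void run() {
        Channel channel = client.getChannel();
        if (channel != null && channel.isActive()) {
            //Simulate client business data (sample values)
            channel.writeAndFlush(new UserInfo("bruce", 18));
        }
        //Schedule the next send in 10 seconds (interval is an assumption)
        executor.schedule(this, 10, TimeUnit.SECONDS);
    }
}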

Client disconnection and reconnection

Disconnection and reconnection requirements:

  • If the network between the server and the client is abnormal, or a response times out (for example, due to a long full GC), the client needs to actively reconnect to another node.
  • If the server goes down or any exception occurs between it and the client, the client needs to actively reconnect to another node.
  • If the server actively notifies the client that it is going offline, the client needs to actively reconnect to another node.

How to detect disconnection between the client and the server?

Netty's io.netty.channel.ChannelInboundHandler interface provides many important callback methods. To avoid implementing every one of them, you can extend io.netty.channel.ChannelInboundHandlerAdapter and override only the methods you need.

1. void channelInactive(ChannelHandlerContext ctx); is called when the connection is closed, indicating that the client has been disconnected. It is triggered in the following cases:

  • The client, while in a normal active state, actively calls the close method of the channel or ctx.
  • The server actively calls the close method of the channel or ctx to close the client connection.
  • A java.io.IOException occurs (generally meaning both ends have been disconnected) or a java.lang.OutOfMemoryError occurs (added in version 4.1.52).

2. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; is called when any exception occurs in the pipeline. If the exception is a java.io.IOException or a java.lang.OutOfMemoryError (added in version 4.1.52), the channelInactive method is also triggered, i.e. the third case listed above.

3. Heartbeat checking is also a necessary way to detect the connection state between the client and the server: in some situations the two ends are actually already disconnected, but the client cannot perceive it, so a heartbeat is needed to judge the connection state. The heartbeat can be initiated by either the client or the server.

  • Client heartbeat: the client sends a heartbeat "ping" to the server and the server replies with "pong". If data is exchanged between the two ends within the specified time, the connection is considered healthy.
  • Server heartbeat: the server sends "ping" to the client and the client replies with "pong". If no reply is received within the specified time, the other end is considered offline.

Netty provides a very simple way to implement heartbeat checking: just add io.netty.handler.timeout.IdleStateHandler to the channel's pipeline.

IdleStateHandler has the following important parameters:

  • readerIdleTimeSeconds, read timeout: when no data has been read from the Channel within the specified interval, an IdleStateEvent with state READER_IDLE is fired.
  • writerIdleTimeSeconds, write timeout: when no data has been written to the Channel within the specified interval, an IdleStateEvent with state WRITER_IDLE is fired.
  • allIdleTimeSeconds, read/write timeout: when neither a read nor a write has happened within the specified interval, an IdleStateEvent with state ALL_IDLE is fired.

To listen for these events, you also need to override ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt) and check the event type through the evt parameter. If there has been no read or write within the specified time, a heartbeat "ping" is sent; if there has been no read within the specified time, the connection to the server is assumed to be lost, and the close method of the channel or ctx is called so that the client handler's channelInactive method is executed.

So far it seems we only need to implement our own reconnection logic in channelInactive and exceptionCaught. But here the author hit the first pitfall: the reconnection method was executed twice.

Let's look at the sample code and the result. Add the following code to com.bruce.netty.rpc.client.SimpleClientHandler:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    //Omit part of the code
    /** This method is executed when the client goes offline normally */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        reconnection(ctx);
    }

    /** exceptionCaught is executed when an exception occurs in the pipeline */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:client[{}]Disconnect from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        reconnection(ctx);
    }

    private void reconnection(ChannelHandlerContext ctx) {
        log.info("5s Then re-establish the connection");
        //Temporarily empty implementation
    }
}

Add io.netty.handler.timeout.IdleStateHandler to ClientHandlerInitializer for heartbeat checking, and ClientHeartbeatHandler to listen for heartbeat events and receive the heartbeat "pong" replies.

static class ClientHandlerInitializer extends ChannelInitializer<SocketChannel> {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
    private NettyClient client;

    public ClientHandlerInitializer(NettyClient client) {
        this.client = client;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        //If there is no read within 25s, a READER_IDLE event is fired
        //If there is neither a read nor a write within 10s, an ALL_IDLE event is fired
        pipeline.addLast(new IdleStateHandler(25, 0, 10));
        pipeline.addLast(new ClientHeartbeatHandler());
        pipeline.addLast(new SimpleClientHandler(client));
    }
}

com.bruce.netty.rpc.client.ClientHeartbeatHandler

public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg.equals("pong")) {
            log.info("Heartbeat reply received");
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //This event is fired by io.netty.handler.timeout.IdleStateHandler, which must be in the pipeline
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.ALL_IDLE) {
                //Send heartbeat detection to the server
                ctx.writeAndFlush("ping");
                log.info("Send heartbeat data");
            } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
                //If there is no read event after the specified time, close the connection
                log.info("Heart beat time exceeded,Close the connection with the server:{}", ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Start the server side first and then the client side. After the connection is successful, kill the server side process.

From the client log, the exceptionCaught method is executed first and then the channelInactive method, but the reconnection method is called in both, resulting in two simultaneous reconnections.

Why are both exceptionCaught and channelInactive executed?

We can set breakpoints in exceptionCaught and channelInactive and step through the source code.

After NioEventLoop performs a select operation, it processes the corresponding SelectionKeys. When an exception occurs, AbstractNioByteChannel.NioByteUnsafe#handleReadException is called, which triggers pipeline.fireExceptionCaught(cause) and eventually calls the exceptionCaught method of the user handler.

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
        RecvByteBufAllocator.Handle allocHandle) {
    if (byteBuf != null) {
        if (byteBuf.isReadable()) {
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        } else {
            byteBuf.release();
        }
    }
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);

    // If oom will close the read event, release connection.
    // See https://github.com/netty/netty/issues/10434
    if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
        closeOnRead(pipeline);
    }
}

At the end, this method checks the exception type and decides whether to close the connection. In the disconnection scenario the exception is a java.io.IOException, so the close method is executed. Debugging into AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify), you will find that fireChannelInactiveAndDeregister is eventually called. Continuing to debug, the custom channelInactive method is finally executed.

Here we can note a key point: in Netty, when a handler's exceptionCaught method is executed, channelInactive may or may not be triggered afterwards.

Besides Netty deciding whether to close the connection based on the exception type, developers can also call the close method themselves through ctx or channel. The code is as follows:

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof IOException) {
        log.warn("exceptionCaught:client[{}]Disconnect from remote", ctx.channel().localAddress());
    } else {
        log.error(cause);
    }
    //ctx.close();
    ctx.channel().close();
}

But does explicitly calling the close method necessarily trigger channelInactive?

If it did, we would only need to call close in exceptionCaught and put the reconnection logic in channelInactive!

In the logs the author observed that calling close in exceptionCaught triggered channelInactive every time. But looking at the source code, this is not guaranteed: AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify) calls io.netty.channel.Channel#isActive, and fireChannelInactive is executed only if it returns true.

//io.netty.channel.socket.nio.NioSocketChannel#isActive
@Override
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

How to avoid the reconnection being executed twice?

During Netty initialization we add a series of handlers. When Netty creates the Channel object (NioSocketChannel), these handlers are wrapped into a DefaultChannelPipeline, which is essentially a doubly linked list: the head node is HeadContext, the tail node is TailContext, and the nodes in between are the handlers we added (each wrapped in a DefaultChannelHandlerContext). When a method is fired on the pipeline, the handlers in the list are traversed and executed. Therefore, if we remove our custom handler from the list while exceptionCaught is being executed, its channelInactive method can no longer be executed.

Finally, the implementation code is as follows:

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnection(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:client[{}]Disconnect from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        //ctx.close();
        ctx.channel().close();
        reconnection(ctx);
    }
}

The result: when an exception occurs, only the exceptionCaught method is executed, the resources of the previous connection are released by closing the channel, and the channelInactive method of this handler is no longer executed.

How to reconnect after disconnection?

From the analysis above we know in which methods to implement our reconnection logic, but how should it be implemented? Out of curiosity, the author looked at how other developers do it: most add a scheduled task via ctx.channel().eventLoop().schedule that calls the client's connect method. The author first tried this approach as well; the code is as follows:

private void reconnection(ChannelHandlerContext ctx) {
    log.info("5s Then re-establish the connection");
    ctx.channel().eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            boolean connect = client.connect();
            if (connect) {
                log.info("Reconnection succeeded");
            } else {
                reconnection(ctx);
            }
        }
    }, 5, TimeUnit.SECONDS);
}

Test: start the server first, then the client. After the connection succeeds, kill the server process. The client retried the connection on schedule as expected, but after the author went to the tea room for a cup of water and came back, the following exception appeared:

......Omit 14 identical retry logs
[2021-01-17 18:46:45.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s Then re-establish the connection
[2021-01-17 18:46:48.032] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : Trying to connect to the server: 127.0.0.1:8088
[2021-01-17 18:46:50.038] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : Connection exception:Connection refused: no further information: /127.0.0.1:8088
[2021-01-17 18:46:50.038] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.SimpleClientHandler] : 5s Then re-establish the connection
[2021-01-17 18:46:53.040] INFO   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : Trying to connect to the server: 127.0.0.1:8088
[2021-01-17 18:46:53.048] ERROR   [nioEventLoopGroup-2-1] [com.bruce.netty.rpc.client.NettyClient] : connect error:
io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@10122121(incomplete)
 at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)
 at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)
 at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)
 at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)
 at com.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)
 at com.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)
 at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
 at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
 at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
 at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)

From the exception stack, it is thrown from the synchronous wait call in com.bruce.netty.rpc.client.NettyClient#connect:

boolean notTimeout = channelFuture.awaitUninterruptibly(30, TimeUnit.SECONDS);

This method checks whether the synchronous wait is being performed on an I/O thread; if so, a BlockingOperationException is thrown.

//io.netty.channel.DefaultChannelPromise#checkDeadLock
@Override
protected void checkDeadLock() {
    if (channel().isRegistered()) {
        super.checkDeadLock();
    }
}

//io.netty.util.concurrent.DefaultPromise#checkDeadLock
protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

The strange thing is: why isn't this exception thrown on every reconnection attempt, but only once every 16 attempts?

This reminded the author that the laptop has an 8-core processor, and Netty's default event loop group size is 2 * cores, i.e. 16 threads, which seemed related.

In fact, when ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088); is called, Netty first creates an io.netty.channel.Channel (a NioSocketChannel in this example) and then uses an io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser to pick the next NioEventLoop in turn and bind the channel to it.

io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

//Return true if the given Thread is executed in the event loop, false otherwise.
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
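
For reference, the round-robin selection is implemented by Netty's DefaultEventExecutorChooserFactory; when the thread count is a power of two (such as the default 16 here), the chooser looks roughly like this (abridged):

//io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser (abridged)
@Override
public EventExecutor next() {
    //idx is an AtomicInteger, executors is the array of event loops; the bitmask cycles through them in order
    return executors[idx.getAndIncrement() & executors.length - 1];
}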

The reconnection method is called on a NioEventLoop (an I/O thread): the first reconnection actually picks the second NioEventLoop, the second reconnection picks the third NioEventLoop, and so on. When a full round has been made and the original NioEventLoop is selected again, inEventLoop() returns true and the BlockingOperationException is thrown.

Option 1

Do not perform a synchronous connect on a Netty I/O thread. Instead, use a separate thread pool to retry periodically; that thread can also run the reconnection-related business logic without blocking the I/O thread. (Shut the pool down once it is no longer needed.)

Modify the reconnection method in com.bruce.netty.rpc.client.SimpleClientHandler:

private static ScheduledExecutorService SCHEDULED_EXECUTOR;

private void initScheduledExecutor() {
    if (SCHEDULED_EXECUTOR == null) {
        synchronized (SimpleClientHandler.class) {
            if (SCHEDULED_EXECUTOR == null) {
                SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "Client-Reconnect-1");
                    t.setDaemon(true);
                    return t;
                });
            }
        }
    }
}

private void reconnection(ChannelHandlerContext ctx) {
    log.info("5s Then re-establish the connection");
    initScheduledExecutor();

    SCHEDULED_EXECUTOR.schedule(() -> {
        boolean connect = client.connect();
        if (connect) {
            //The connection is successful. Close the thread pool
            SCHEDULED_EXECUTOR.shutdown();
            log.info("Reconnection succeeded");
        } else {
            reconnection(ctx);
        }
    }, 3, TimeUnit.SECONDS);
}

Option 2

Use asynchronous reconnection on the I/O thread:

Add a connectAsync method to com.bruce.netty.rpc.client.NettyClient. The difference from connect is that connectAsync does not call the synchronous wait method on the ChannelFuture; instead it adds a ChannelFutureListener, which actually runs on the I/O thread.

 public void connectAsync() {
    log.info("Trying to connect to the server: 127.0.0.1:8088");
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
    channelFuture.addListener((ChannelFutureListener) future -> {
        Throwable cause = future.cause();
        if (cause != null) {
            exceptionHandler(cause);
            log.info("Wait for the next reconnection");
            channelFuture.channel().eventLoop().schedule(this::connectAsync, 5, TimeUnit.SECONDS);
        } else {
            clientChannel = channelFuture.channel();
            if (clientChannel != null && clientChannel.isActive()) {
                log.info("Netty client started !!! {} connect to server", clientChannel.localAddress());
            }
        }
    });
}

com.bruce.netty.rpc.client.SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {
    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
    private NettyClient client;

    public SimpleClientHandler(NettyClient client) {
        this.client = client;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("client receive:{}", msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("channelInactive:{}", ctx.channel().localAddress());
        ctx.pipeline().remove(this);
        ctx.channel().close();
        reconnectionAsync(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            log.warn("exceptionCaught:client[{}]Disconnect from remote", ctx.channel().localAddress());
        } else {
            log.error(cause);
        }
        ctx.pipeline().remove(this);
        ctx.close();
        reconnectionAsync(ctx);
    }

    private void reconnectionAsync(ChannelHandlerContext ctx) {
        log.info("5s Then re-establish the connection");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                client.connectAsync();
            }
        }, 5, TimeUnit.SECONDS);
    }
}

How many Netty client threads are reasonable?

By default, a NioEventLoopGroup in Netty creates 2 * (number of CPU cores) threads, which are used for I/O operations. Does a client application really need that many I/O threads?

When analyzing the BlockingOperationException above, we saw that after creating a Channel object, Netty selects just one NioEventLoop from the NioEventLoopGroup and binds the Channel to it. Only when more Channels are created is the next NioEventLoop selected in turn. In other words, a Channel corresponds to exactly one NioEventLoop, while a NioEventLoop can be bound to multiple Channels.

1. For a client that connects to only one server node, a single thread is enough. Even with disconnection and reconnection, the old Channel is removed from the NioEventLoop when the connection drops, and after reconnecting the new Channel is simply registered with that one NioEventLoop again.

2. If the client connects to multiple server nodes by calling io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort) several times, as shown below, the thread count can be larger, but should not exceed 2 * cores. Also, once disconnections and reconnections occur, there is no guarantee that every NioEventLoop will have a client Channel bound to it.

public boolean connect() {
    try {
        //Connect to multiple server nodes; each Channel is bound to its own NioEventLoop
        ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1", 8088);
        ChannelFuture channelFuture2 = bootstrap.connect("127.0.0.1", 8088);
        ChannelFuture channelFuture3 = bootstrap.connect("127.0.0.1", 8088);
    } catch (Exception e) {
        exceptionHandler(e);
        return false;
    }
    return true;
}

3. What is the impact if the number of netty client threads is set to greater than 1?

There will certainly be no obvious errors, but it wastes resources. First, multiple NioEventLoop objects are created, though most of them stay in a non-running state. Each time the client disconnects and reconnects, the next NioEventLoop is selected, and its thread is created, started, and stays in the RUNNABLE state. Meanwhile the previous NioEventLoop also remains RUNNABLE, but because its Channel has already been closed, each of its select calls returns nothing, which is meaningless empty polling.

For example, after 4 disconnect-reconnect cycles with the default thread count, the Netty client had created 5 NioEventLoop threads in total, but only the fifth thread was actually performing read/write operations.

4. If the client has time-consuming business logic, it should run in a separate business thread pool; avoid doing time-consuming processing on Netty's I/O threads.

Summary

This article covered two implementation approaches for Netty client disconnection and reconnection, as well as the exceptions encountered along the way. By analyzing these problems, we gained a better understanding of Netty's implementation details.
