Java network programming - Netty (heartbeat detection and reconnection mechanism)

1, What is Netty?

Netty is a client/server framework that uses Java's advanced networking capabilities while hiding the complexity behind an easy-to-use API.
Netty is a widely used Java network programming framework (it won the Duke's Choice Award in 2011, see https://www.java.net/dukeschoice/2011 ). It has an active and growing user community: large companies such as Facebook and Instagram, as well as popular open source projects such as Infinispan, HornetQ, Vert.x, Apache Cassandra and Elasticsearch, all rely on its powerful core for their network abstraction.

2, What's the difference between Netty and Tomcat?

The biggest difference between Netty and Tomcat lies in the communication protocol. Tomcat is based on the HTTP protocol; it is essentially a web container built on HTTP. Netty is different: it lets you implement whatever protocol you want in code, because its codecs can encode and decode raw byte streams, so you could, for example, implement Redis-style access. That is the biggest difference between Netty and Tomcat.

Some people claim that Netty necessarily performs better than Tomcat. That is not the case: Tomcat has supported NIO mode since version 6.x, and later added APR mode, which calls the Apache native network library through JNI. Compared with the old BIO mode, concurrency performance improved greatly, especially in APR mode. Whether a Netty program outperforms Tomcat ultimately depends on the skill of the person writing it.

3, Why is Netty popular?

As mentioned in the first part, Netty is a framework favored by large companies. In my opinion, there are three reasons why Netty is so well liked:

  1. High concurrency
  2. Fast transmission
  3. Good encapsulation

4, Why does Netty support high concurrency?

Netty is a network communication framework built on NIO (non-blocking I/O). Compared with BIO (blocking I/O), its concurrency performance is greatly improved. The two figures below illustrate the difference between BIO and NIO:

[Figure 1: BIO - each connection is handled by its own thread]

[Figure 2: NIO - a single thread serves many connections through a Selector]
As these two figures show, a single NIO thread can handle far more connections than a BIO thread. Why? Because of the Selector in Figure 2.
After a connection is established, there are two steps: first, receive the data the client sends; second, return a response once the server has processed the request. The difference between NIO and BIO lies mainly in the first step.
In BIO, waiting for the client to send data blocks the thread, so one thread can only serve one request, and a machine can only support a limited number of threads. That is why BIO cannot support high concurrency.
In NIO, after a Socket is established the Thread does not block on it. Instead the Socket is handed to the Selector, which continuously polls all registered Sockets; as soon as one of them has data ready, the Selector notifies the Thread, which processes the data and returns it to the client. Nothing in this flow blocks, so one Thread can handle far more requests.
The following two diagrams show the BIO-based processing flow and the Netty-based processing flow, to help you understand the difference between the two approaches:

[Figure: BIO-based processing flow]

[Figure: Netty-based processing flow]

In addition to BIO and NIO, there are other IO models. The following figure shows the processing flow of five IO models:

[Figure: the processing flow of the five IO models]

  • BIO, synchronous blocking IO: the whole flow blocks. With few connections its latency is the lowest, because one thread serves exactly one connection. It suits scenarios with few connections and low latency requirements, such as database connections.
  • NIO, synchronous non-blocking IO: business processing blocks, but receiving data does not. It suits high-concurrency scenarios with simple processing, such as chat software.
  • Multiplexed IO: the two steps are separated, i.e. for a given connection, data reception is done by thread A and data processing by thread B. It can handle more requests than BIO.
  • Signal-driven IO: this model is mainly used in embedded development and is not discussed here.
  • Asynchronous IO: both the data request and the data processing are asynchronous; the data request returns immediately. It suits long-connection business scenarios.

5, Why does Netty transmit data so fast?

Netty's fast transmission relies on an NIO feature: zero copy. As we know, Java memory includes heap memory, stack memory, the string constant pool and so on; the heap is the largest region and the place where Java objects live. Normally, data read from IO into heap memory has to pass through a Socket buffer in between, which means the data is copied twice before reaching its destination. With large amounts of data this wastes resources unnecessarily.
For this situation Netty uses another NIO feature: zero copy. When it needs to receive data, it allocates a block of memory outside the heap, and the data is read from IO directly into that memory. In Netty this data can then be operated on directly through ByteBuf, which speeds up transmission.
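
To make the off-heap idea concrete, here is a minimal sketch (not from the original article) that allocates a direct ByteBuf with Unpooled.directBuffer; the class name DirectBufferDemo is made up for this illustration:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

// Minimal sketch: a direct (off-heap) ByteBuf, the kind of buffer Netty uses to avoid
// the extra copy into heap memory described above.
public class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuf direct = Unpooled.directBuffer(64);   // allocated outside the JVM heap
        try {
            direct.writeBytes("Hi!".getBytes(CharsetUtil.UTF_8));
            System.out.println("direct buffer? " + direct.isDirect());        // true
            System.out.println("readable bytes: " + direct.readableBytes());  // 3
        } finally {
            direct.release();   // direct buffers are reference-counted and must be released
        }
    }
}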


6, Why is Netty well encapsulated?

Netty's good encapsulation is hard to convey in words, so let's go straight to the code:


  • Blocking I/O
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.nio.charset.Charset;

    public class PlainOioServer {
    
        public void serve(int port) throws IOException {
            final ServerSocket socket = new ServerSocket(port);     //1
            try {
                for (;;) {
                    final Socket clientSocket = socket.accept();    //2
                    System.out.println("Accepted connection from " + clientSocket);
    
                    new Thread(new Runnable() {                        //3
                        @Override
                        public void run() {
                            OutputStream out;
                            try {
                                out = clientSocket.getOutputStream();
                                out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                                out.flush();
                                clientSocket.close();                //5
    
                            } catch (IOException e) {
                                e.printStackTrace();
                                try {
                                    clientSocket.close();
                                } catch (IOException ex) {
                                    // ignore on close
                                }
                            }
                        }
                    }).start();                                        //6
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

     

  • Non-blocking I/O
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class PlainNioServer {
        public void serve(int port) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket ss = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            ss.bind(address);                                            //1
            Selector selector = Selector.open();                        //2
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3
            final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
            for (;;) {
                try {
                    selector.select();                                    //4
                } catch (IOException ex) {
                    ex.printStackTrace();
                    // handle exception
                    break;
                }
                Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) {                //6
                            ServerSocketChannel server =
                                    (ServerSocketChannel)key.channel();
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_WRITE |
                                    SelectionKey.OP_READ, msg.duplicate());    //7
                            System.out.println(
                                    "Accepted connection from " + client);
                        }
                        if (key.isWritable()) {                //8
                            SocketChannel client =
                                    (SocketChannel)key.channel();
                            ByteBuffer buffer =
                                    (ByteBuffer)key.attachment();
                            while (buffer.hasRemaining()) {
                                if (client.write(buffer) == 0) {        //9
                                    break;
                                }
                            }
                            client.close();                    //10
                        }
                    } catch (IOException ex) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException cex) {
                            // Ignore on close
                        }
                    }
                }
            }
        }
    }

     

  • Netty
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.oio.OioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.oio.OioServerSocketChannel;

    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;

    public class NettyOioServer {
    
        public void server(int port) throws Exception {
            final ByteBuf buf = Unpooled.unreleasableBuffer(
                    Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
            EventLoopGroup group = new OioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();        //1
    
                b.group(group)                                    //2
                 .channel(OioServerSocketChannel.class)
                 .localAddress(new InetSocketAddress(port))
                 .childHandler(new ChannelInitializer<SocketChannel>() {//3
                     @Override
                     public void initChannel(SocketChannel ch) 
                         throws Exception {
                         ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                             @Override
                             public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                             }
                         });
                     }
                 });
                ChannelFuture f = b.bind().sync();  //6
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();        //7
            }
        }
    }

    In terms of code volume, Netty clearly beats traditional Socket programming, but this topic is broad and deep, and a few code listings alone cannot do it justice. So let me introduce some of Netty's important concepts to help you understand it better.

  • Channel
    Four concepts are involved in a channel's data transmission stream. The figure below gives an overview of the Channel in Netty.

    [Figure: Channel overview]

    • Channel represents a connection; you can think of each request as a channel.
    • ChannelHandler is where the core business logic lives; requests are processed here.
    • ChannelHandlerContext is used to pass business data along.
    • ChannelPipeline holds the ChannelHandlers and ChannelHandlerContexts needed during processing.
  • ByteBuf
    ByteBuf is a container for bytes. Its biggest strength is ease of use: it has its own reader index and writer index so you can conveniently read and write the whole byte buffer, and it also supports get/set for reading and writing individual bytes. Its data structure is shown in the figure below, and it has three usage modes:

    [Figure: ByteBuf data structure]
  • Heap Buffer
    The heap buffer is the most commonly used ByteBuf mode; it stores its data in the JVM heap.
  • Direct Buffer
    The direct buffer is another common ByteBuf mode. Its memory is not allocated in the heap: the NIO ByteBuffer class introduced in JDK 1.4 lets the JVM allocate memory through native method calls, which brings two advantages:
    • IO is faster because the intermediate copy is eliminated; the contents of a direct buffer can live outside the heap that the garbage collector scans.
    • A DirectBuffer uses memory outside the heap, capped by -XX:MaxDirectMemorySize=xxM, which the GC cannot reclaim; this avoids the impact of frequent GC pauses on application threads under high load.
  • Composite Buffer
    A composite buffer is a single view over multiple ByteBufs. It is provided by Netty; the JDK has no equivalent. (A short sketch of the three modes follows this list.)
  • Codec
    Netty's encoders/decoders (codecs) convert between bytes and POJOs, or between one POJO and another, which is how you implement a custom protocol. The best-known ones in Netty are HttpRequestDecoder and HttpResponseEncoder. (A small custom decoder sketch also follows this list.)
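
To make the three ByteBuf usage modes above concrete, here is a minimal sketch (not from the original article); the class name ByteBufModes is made up for this example:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

// Minimal sketch of the three ByteBuf usage modes: heap, direct and composite.
public class ByteBufModes {
    public static void main(String[] args) {
        // Heap buffer: backed by a byte[] in the JVM heap
        ByteBuf heap = Unpooled.buffer(16);
        heap.writeBytes("head".getBytes());

        // Direct buffer: allocated outside the heap, well suited to socket I/O
        ByteBuf direct = Unpooled.directBuffer(16);
        direct.writeBytes("body".getBytes());

        // Composite buffer: one logical view over several buffers, without copying them
        CompositeByteBuf composite = Unpooled.compositeBuffer();
        composite.addComponents(true, heap.retain(), direct.retain());
        System.out.println("composite readable bytes: " + composite.readableBytes()); // 8

        // ByteBufs are reference-counted; release everything that was allocated
        composite.release();
        heap.release();
        direct.release();
    }
}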
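
And to illustrate the codec idea, here is a hypothetical decoder for a toy length-prefixed protocol (again not from the original article; LengthPrefixedDecoder is a made-up name):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

// Hypothetical decoder for a toy protocol: a 4-byte length prefix followed by that many bytes.
public class LengthPrefixedDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;                         // length prefix not fully received yet
        }
        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex();          // body incomplete, wait for more bytes
            return;
        }
        byte[] body = new byte[length];
        in.readBytes(body);
        out.add(new String(body, CharsetUtil.UTF_8));   // bytes -> object (here a String)
    }
}

Adding it to a pipeline with channelPipeline.addLast(new LengthPrefixedDecoder()) would let downstream handlers receive ready-made String messages instead of raw bytes.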

7, Hands-on practice

1. Introduce related dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.pubing</groupId>
  <artifactId>helloNetty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
    </dependencies>
  
  
</project>

2. Server

package com.zhouzhiyao.netty;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
 
// When a client sends a request, the server returns "Hello Netty"
public class HelloNettyServer {
    public static void main(String[] args) throws InterruptedException {
        /**
         * Define a pair of thread groups (two thread pools)
         */
        // The boss (main) group only accepts connections from clients; it does no further processing
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // The worker (slave) group, to which the boss group hands accepted connections for processing
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            /**
             * Server bootstrap class; it wires the pieces together
             */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Use the boss/worker thread model defined above
            serverBootstrap.group(bossGroup, workerGroup)
                // Use the NIO server socket channel implementation
                .channel(NioServerSocketChannel.class)
                // Child handler executed by the workerGroup:
                // the channel initializer builds each channel's pipeline out of handlers
                .childHandler(new HelloNettyServerInitializer());

            /**
             * Start
             */
            // Bind the port; sync() waits for the asynchronous bind to complete
            ChannelFuture future = serverBootstrap.bind(8888).sync();

            /**
             * Close
             */
            // Block until the server channel is closed
            future.channel().closeFuture().sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down both event loop groups gracefully
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3. Create initializer HelloNettyServerInitializer

package com.zhouzhiyao.netty;
 
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
 
/**
 * This is an initializer. After a channel is registered, its initChannel() method runs
 * to set the channel up (i.e. add the handlers one by one).
 *
 * @author phubing
 *
 */
public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel> {

    // Initialize the channel
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Get the pipeline associated with this socketChannel
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        /**
         * The pipeline holds many handler classes (also known as interceptors).
         * Once you have the pipeline you can addLast() handlers, either your own
         * or ones provided by Netty.
         */
        // Add the HttpServerCodec handler: it decodes incoming HTTP requests
        // and encodes outgoing HTTP responses
        channelPipeline.addLast("HttpServerCodec", new HttpServerCodec());
        // Add our custom handler, which returns "Hello Netty"
        channelPipeline.addLast("customHandler", new CustomHandler());
    }

}

4. Create custom processor CustomHandler

package com.zhouzhiyao.netty;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
 
/**
 * A custom handler (helper class)
 * @author phubing
 *
 */
// Request handling: the client sends data to the server, the data is placed in a buffer,
// and the server reads it from that buffer
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> { // we handle HTTP requests, so the message type is HttpObject

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel registered");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel unregistered");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel active");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel inactive: the client has disconnected from the server");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel finished reading data");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("user event triggered");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel writability changed");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    // An exception occurred on the channel; if such channels are not closed,
    // performance degrades as they accumulate
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("channel exception caught");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handler added");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handler removed");
        super.handlerRemoved(ctx);
    }

    /**
     * ChannelHandlerContext: the context object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        // Get the current channel
        Channel currentChannel = ctx.channel();

        // Check whether msg is an HttpRequest
        if (msg instanceof HttpRequest) {

            // Remote (client) address
            System.out.println(currentChannel.remoteAddress());
            /**
             * Without this type check, the remote address printed on the console looks like:
             *
             *  /0:0:0:0:0:0:0:1:5501
             *  /0:0:0:0:0:0:0:1:5501
             *  /0:0:0:0:0:0:0:1:5502
             *  /0:0:0:0:0:0:0:1:5502
             *  /0:0:0:0:0:0:0:1:5503
             *  /0:0:0:0:0:0:0:1:5503
             *
             * because the received msg is not filtered by type.
             *
             * With the check in place, why is it still printed twice?
             *
             *  /0:0:0:0:0:0:0:1:5605
             *  /0:0:0:0:0:0:0:1:5605
             *
             * Open the browser's network tab and you will see the client makes two requests:
             *  1. the first is the page itself
             *  2. the second is for the favicon
             * Because there is no routing (the equivalent of @RequestMapping in Spring MVC),
             * every request that comes in reaches this handler.
             */
            /**
             * On Linux you can also send the request with curl <ip>:<port>
             * (it prints only once - a clean request)
             */

            // Define the message to send (not directly: the data is first copied into a buffer
            // and passed on through that buffer)
            // Unpooled: utility for deep-copying data into a new buffer
            // CharsetUtil.UTF_8: provided by Netty
            ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8);

            // Build a FullHttpResponse to send back to the client
            FullHttpResponse response =
                    /**
                     * param 1: HTTP version
                     * param 2: status (success or failure)
                     * param 3: content
                     */
                    // HttpVersion.HTTP_1_1: keep-alive is enabled by default
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            // Set the content type, length, etc.
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            // readableBytes: the readable length of the buffer
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            // Write and flush the response to the client through the context object
            ctx.writeAndFlush(response);
        }
    }

}

5. Client

// Note: Request, Response, RpcDecoder and RpcEncoder are application-specific classes
// of this RPC example and are not shown here.
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private final String ip;
    private final int port;
    private Response response;

    public NettyClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            // Create and initialize the Netty client Bootstrap object
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast(new RpcDecoder(Response.class));
                    pipeline.addLast(new RpcEncoder(Request.class));
                    pipeline.addLast(NettyClient.this);
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);


//            String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

            // Connect to RPC server
            ChannelFuture future = bootstrap.connect(ip, port).sync();

            // Write RPC request data and close the connection
            Channel channel = future.channel();

            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}

6. Heartbeat mechanism

Example of a Netty heartbeat mechanism, implemented with Netty 4's IdleStateHandler.

The server adds an IdleStateHandler heartbeat-detection handler to the pipeline, plus a custom handler whose userEventTriggered() method contains the logic for handling timeout events.

Configure IdleStateHandler to perform a read-idle check every five seconds: if channelRead() has not been called within five seconds, userEventTriggered() is fired once.

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, unit):
                // fire an IdleStateEvent if no read happens within 5 seconds
                socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new HeartBeatServerHandler());
            }
        });
  • The custom handler class extends ChannelInboundHandlerAdapter and overrides userEventTriggered(), which is called when a timeout event occurs (read idle or write idle):
    class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
        private int lossConnectCount = 0;
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("No message from the client has been received for 5 seconds!");
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state()== IdleState.READER_IDLE){
                    lossConnectCount++;
                    if (lossConnectCount>2){
                        System.out.println("Close this inactive channel!");
                        ctx.channel().close();
                    }
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            lossConnectCount = 0;
            System.out.println("client says: "+msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

The client likewise adds an IdleStateHandler heartbeat-detection handler to its pipeline, plus a custom handler whose userEventTriggered() method handles timeout events.

Configure IdleStateHandler to perform a write-idle check every four seconds: if write() has not been called within four seconds, userEventTriggered() fires once, so the client ends up sending a heartbeat message to the server every four seconds.

Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                // Fire an IdleStateEvent if nothing has been written for 4 seconds
                socketChannel.pipeline().addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new HeartBeatClientHandler());
            }
        });
  • The custom handler class extends ChannelInboundHandlerAdapter and overrides userEventTriggered(), which is triggered on timeout (read idle or write idle):
// Fragment of HeartBeatClientHandler: curTime counts heartbeats already sent while idle and
// beatTime caps them; both are fields of the handler (see the complete sketch below).
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    System.out.println("Client heartbeat check: " + new Date());
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state() == IdleState.WRITER_IDLE) {
            if (curTime < beatTime) {
                curTime++;
                ctx.writeAndFlush("biubiu");
            }
        }
    }
}
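
For completeness, here is a hypothetical version of the full HeartBeatClientHandler, assuming curTime counts heartbeats already sent while idle and beatTime caps them (the value 3 below is made up for the sketch):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Date;

// Hypothetical full client handler around the fragment above.
class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    private static final int beatTime = 3;   // assumed cap on consecutive idle heartbeats
    private int curTime = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        curTime = 0;
        ctx.writeAndFlush("hello server");    // say hello once the connection is up
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("Client heartbeat check: " + new Date());
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE && curTime < beatTime) {
                curTime++;
                ctx.writeAndFlush("biubiu");  // heartbeat message expected by the server
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}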

7. Client reconnection

/**
 * Connect to the server, reconnecting on failure.
 * (channel and bootstrap are fields of the enclosing client class; see the sketch below.)
 */
protected void doConnect() {

    if (channel != null && channel.isActive()) {
        return;
    }
    ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081);
    // Listen for the result of the connection attempt
    connect.addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {

            if (channelFuture.isSuccess()) {
                channel = channelFuture.channel();
                System.out.println("Successfully connected to the server");
            } else {
                System.out.println("Reconnecting in 2 seconds...");
                channelFuture.channel().eventLoop().schedule(new Runnable() {

                    @Override
                    public void run() {
                        doConnect();
                    }
                }, 2, TimeUnit.SECONDS);
            }
        }
    });
}
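
For context, here is a minimal sketch (not part of the original article) of how such a doConnect() is typically wired up: the Bootstrap lives in a field, and channelInactive() schedules a reconnect when the connection drops. The class and field names here are made up:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.concurrent.TimeUnit;

// Hypothetical client class that owns the bootstrap, channel and doConnect() shown above.
public class ReconnectClient {

    private final Bootstrap bootstrap = new Bootstrap();
    private volatile Channel channel;

    public ReconnectClient() {
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         ch.pipeline().addLast(new StringEncoder());
                         ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                             @Override
                             public void channelInactive(ChannelHandlerContext ctx) {
                                 // The connection dropped (e.g. the server closed an idle
                                 // channel): schedule a reconnect attempt
                                 ctx.channel().eventLoop().schedule(
                                         () -> doConnect(), 2, TimeUnit.SECONDS);
                             }
                         });
                     }
                 });
    }

    // Same logic as the doConnect() shown above
    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }
        bootstrap.connect("127.0.0.1", 8081).addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                channel = f.channel();
                System.out.println("Successfully connected to the server");
            } else {
                System.out.println("Reconnecting in 2 seconds...");
                f.channel().eventLoop().schedule(this::doConnect, 2, TimeUnit.SECONDS);
            }
        });
    }

    public static void main(String[] args) {
        new ReconnectClient().doConnect();
    }
}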
