Java IO, NIO and Netty network programming

1. Introduction to IO, NIO and Netty

1.1 Blocking I/O (BIO)

Synchronous blocking I/O: when a thread calls read() or write(), it blocks until some data has been read or all the data has been written, and it cannot do anything else in the meantime. When the number of active connections is modest (fewer than about 1000 on a single machine), this model works well. Each connection is served by its own thread and can focus on its own I/O, and the programming model is simple, with little need to worry about overload, rate limiting, and similar problems.

However, with 100,000 or even millions of connections, the traditional BIO model breaks down, because one blocked thread per connection does not scale. A more efficient I/O model is needed to handle higher concurrency.

1.2 Non-blocking NIO (New I/O)

NIO is a synchronous non-blocking I/O model. It serves the same purpose as the original IO API, but it is used in a completely different way: NIO is buffer-oriented and channel-based, which lets it read and write data more efficiently. The core abstractions of Java NIO are Channel and Buffer. A Channel represents an open connection to an I/O device (for example, a file or a socket). To use NIO, you obtain a Channel connected to the I/O device and a Buffer that holds the data being processed.

With blocking I/O, a thread waits until the operation completes. Non-blocking I/O decouples I/O threads from sockets by adding a notification mechanism (the Selector): when a socket's send buffer becomes writable, the I/O thread is notified to write, and when its receive buffer becomes readable, the I/O thread is notified to read. NIO provides SocketChannel and ServerSocketChannel, which correspond to Socket and ServerSocket in the traditional BIO model.
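The Buffer side of this model can be illustrated without any network code. The following is a minimal sketch, not part of the original article: the class name BufferFlipDemo and the method roundTrip are hypothetical, and the 64-byte capacity is an arbitrary assumption. It shows that a ByteBuffer is filled in write mode and must be flipped before the written bytes are read back.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: writes text into a ByteBuffer and reads it back.
class BufferFlipDemo {

    // Assumes the UTF-8 encoding of text fits in 64 bytes;
    // a longer input would throw BufferOverflowException.
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        // Write mode: each put advances the position.
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        // Switch to read mode: limit = old position, position = 0.
        buffer.flip();
        // Read exactly the bytes that were written, not the whole backing array.
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello NIO"));
    }
}
```

Forgetting the flip() call is a common mistake: the read would then start at the current position and return the unwritten remainder of the buffer.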

For high-load, high-concurrency network applications, NIO's non-blocking mode should be used.

1.3 Netty

Netty is an asynchronous, event-driven network application framework for rapid development of maintainable, high-performance protocol servers and clients.

Netty is an NIO client-server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket servers.

"Quick and easy" does not mean that the resulting application will suffer from maintainability or performance problems. Netty has been designed carefully with the experience gained from implementing many protocols, such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has found a way to achieve ease of development, performance, stability, and flexibility without compromise.

2. Coding

2.1 IO server

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
​
public class OioServer {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
​
    ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    //Create a socket service and listen on port 10101
    ServerSocket server=new ServerSocket(10101);
    System.out.println("Server startup!");
    while(true){
        //Get a socket (blocking)
        final Socket socket = server.accept();
        System.out.println("A new client!");
        newCachedThreadPool.execute(new Runnable() {
​
            @Override
            public void run() {
                //Business processing
                handler(socket);
            }
        });
​
    }
}
​
/**
 * Reads data from the client until the connection is closed.
 * @param socket the accepted client connection
 */
public static void handler(Socket socket){
    try {
        byte[] bytes = new byte[1024];
        InputStream inputStream = socket.getInputStream();
​
        while(true){
            //Read data (blocking)
            int read = inputStream.read(bytes);
            if(read != -1){
                System.out.println(new String(bytes, 0, read));
            }else{
                break;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }finally{
        try {
            System.out.println("socket close");
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}}
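To try out the blocking model without starting a separate client, the same accept-and-read loop can be run against a client in the same process. The following is a minimal sketch under stated assumptions, not the author's code: the class name BlockingLoopbackDemo and the method sendAndCollect are hypothetical, the server binds to an ephemeral loopback port, and it accepts exactly one connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: one blocking server thread and one blocking client.
class BlockingLoopbackDemo {

    // Sends text to an in-process blocking server and returns what the server read.
    static String sendAndCollect(String text) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Port 0 asks the OS for any free port (an assumption made for this demo).
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            // Server side: accept one connection and read until the client closes it.
            Future<String> received = pool.submit(() -> {
                try (Socket socket = server.accept();
                     InputStream in = socket.getInputStream()) {
                    ByteArrayOutputStream collected = new ByteArrayOutputStream();
                    byte[] bytes = new byte[1024];
                    int read;
                    // read() blocks until data arrives or the stream ends (-1).
                    while ((read = in.read(bytes)) != -1) {
                        collected.write(bytes, 0, read);
                    }
                    return new String(collected.toByteArray(), StandardCharsets.UTF_8);
                }
            });

            // Client side: connect, write, and close to signal end of stream.
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 OutputStream out = client.getOutputStream()) {
                out.write(text.getBytes(StandardCharsets.UTF_8));
                out.flush();
            }
            // Wait for the server thread to finish reading.
            return received.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendAndCollect("hello BIO"));
    }
}
```

The server thread spends its whole life blocked in accept() or read(), which is the cost that grows with the number of connections in the BIO model.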

2.2 NIO server

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
​
public class NIOServer {
// Channel Manager
private Selector selector;
/**
 * Get a ServerSocket channel and do some initialization for the channel
 *
 * @param port
 *            Bound port number
 * @throws IOException
 */
public void initServer(int port) throws IOException {
    // Get a ServerSocket channel
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    // Set channel to non blocking
    serverChannel.configureBlocking(false);
    // Bind the ServerSocket corresponding to the channel to the port
    serverChannel.socket().bind(new InetSocketAddress(port));
    // Get a channel manager
    this.selector = Selector.open();
    // Register the channel with the selector for the SelectionKey.OP_ACCEPT event.
    // Once registered, selector.select() returns when the event arrives;
    // until then, selector.select() blocks.
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
​
/**
 * Use polling to monitor whether there are events to be processed on the selector. If so, process them
 *
 * @throws IOException
 */
public void listen() throws IOException {
    System.out.println("The server is started successfully!");
    // Polling access selector
    while (true) {
        // When the registered event arrives, the method returns; Otherwise, the method will always block
        selector.select();
        // Get an iterator over the selected keys, i.e. the keys whose registered events are ready
        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
        while (ite.hasNext()) {
            SelectionKey key = ite.next();
            // Delete the selected key to prevent repeated processing
            ite.remove();
​
            handler(key);
        }
    }
}
​
/**
 * Processing requests
 *
 * @param key
 * @throws IOException
 */
public void handler(SelectionKey key) throws IOException {
​
    // Client request connection event
    if (key.isAcceptable()) {
        handlerAccept(key);
    // Readable event
    } else if (key.isReadable()) {
        handlerRead(key);
    }
}

/**
 * Processing connection requests
 *
 * @param key
 * @throws IOException
 */
public void handlerAccept(SelectionKey key) throws IOException {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    // Get the channel connected to the client
    SocketChannel channel = server.accept();
    if (channel == null) {
        // In non-blocking mode accept() returns null when no connection is pending
        return;
    }
    // Set to non blocking
    channel.configureBlocking(false);

    // Here you can send information to the client
    System.out.println("New client connection");
    // To receive data from the client, register the channel for read events.
    channel.register(this.selector, SelectionKey.OP_READ);
}

/**
 * Handling read events
 *
 * @param key
 * @throws IOException
 */
public void handlerRead(SelectionKey key) throws IOException {
    // The server can read the message: get the socket channel that triggered the event
    SocketChannel channel = (SocketChannel) key.channel();
    // Create read buffer
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int read = channel.read(buffer);
    if (read > 0) {
        // Decode only the bytes that were actually read, not the whole backing array
        String msg = new String(buffer.array(), 0, read).trim();
        System.out.println("Information received by the server: " + msg);

        // Write back data
        ByteBuffer outBuffer = ByteBuffer.wrap("well".getBytes());
        channel.write(outBuffer); // Send the message back to the client
    } else if (read == -1) {
        // -1 means the client closed the connection; a read of 0 bytes is not a shutdown
        System.out.println("Client shutdown");
        key.cancel();
        channel.close();
    }
}
​
/**
 * Start server test
 *
 * @throws IOException
 */
public static void main(String[] args) throws IOException {
    NIOServer server = new NIOServer();
    server.initServer(8000);
    server.listen();
}
}
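The accept-then-read flow of the selector loop can be exercised in a single process. The following is a minimal sketch, not the author's code: the class name SelectorLoopbackDemo and the method exchange are hypothetical, the server binds to an ephemeral loopback port, and the message is assumed to fit in a 1024-byte buffer.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class: one selector thread serves accept and read events.
class SelectorLoopbackDemo {

    // Sends text over loopback and returns what the selector loop received.
    static String exchange(String text) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port (an assumption made for this demo).
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            // The client channel stays in blocking mode to keep the sketch short.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
                client.shutdownOutput(); // signals end of stream to the server side

                ByteBuffer received = ByteBuffer.allocate(1024);
                boolean done = false;
                while (!done) {
                    selector.select(); // blocks until at least one registered event is ready
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // prevent the key from being processed twice
                        if (key.isAcceptable()) {
                            SocketChannel peer = server.accept();
                            if (peer != null) {
                                peer.configureBlocking(false);
                                peer.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel peer = (SocketChannel) key.channel();
                            int n = peer.read(received);
                            // Stop at end of stream, or when the demo buffer is full.
                            if (n == -1 || !received.hasRemaining()) {
                                peer.close();
                                done = true;
                            }
                        }
                    }
                }
                received.flip(); // switch the buffer from write mode to read mode
                return StandardCharsets.UTF_8.decode(received).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(exchange("hello selector"));
    }
}
```

Unlike the blocking server, the single thread here is parked in select() and handles both the listening channel and the accepted channel.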

2.3 Netty echo server

package Netty;
​
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
​
​
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
​

Server-side event handler:

package Netty;
​
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;


@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The inbound message is neither used nor passed on, so release it to avoid a buffer leak.
        ReferenceCountUtil.release(msg);
        // NOTE: reading from System.in blocks this channel's event loop thread.
        // That is tolerable in a single-client demo but should be avoided in real handlers.
        Scanner s = new Scanner(System.in);
        while (true) {
            String words = s.nextLine();
            final ByteBuf wordbuf = Unpooled.wrappedBuffer(words.getBytes(StandardCharsets.UTF_8));

            final ChannelFuture f = ctx.writeAndFlush(wordbuf);

            if (words.equals("exit")) {
                // Close the connection once "exit" has been written, then stop reading input.
                f.addListener(ChannelFutureListener.CLOSE);
                break;
            }
        }
    }
​
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

Client:

package Netty;
​
   /*
   * Copyright 2012 The Netty Project
   *
   * The Netty Project licenses this file to you under the Apache License,
   * version 2.0 (the "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at:
   *
   *   https://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   * License for the specific language governing permissions and limitations
   * under the License.
   */
​
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
​
​
public final class EchoClient {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
​
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

Client-side event handler:

package Netty;
​
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
​
import java.util.Date;
​
/**
 * Handler implementation for the echo client. It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i++) {
            firstMessage.writeByte((byte) i);
        }
    }
​
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        String word=m.toString(CharsetUtil.UTF_8);
        try {
            System.out.println(word);
            if(word.equals("exit")){
                ctx.close();
            }
​
        } finally {
            m.release();
        }
    }
​
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

This program lets you type messages on the server console and send them to the client.

[Screenshot: sender (server) console]

[Screenshot: receiver (client) console]


Keywords: Netty

Added by callie212 on Sat, 11 Dec 2021 11:43:25 +0200