From IO to NIO to Netty

1, Features of IO, NIO and Netty

1.IO

1. Server blocking points

server.accept() blocks until a client connects and a socket is obtained; InputStream.read(bytes) blocks until the input stream has data to read.

2. A traditional blocking socket works best as a short-connection server in a request-and-response (ask and answer) mode. It is a poor fit for long connections, because every open connection keeps a thread occupied. For example, older versions of Tomcat used blocking sockets underneath: a thread handles one request and is returned as soon as it is done, so it is not occupied indefinitely and the server can keep accepting other clients.
(1) With a single thread, only one client can be served at a time: one thread maintains one connection (one socket client connection), and that thread stays occupied for as long as the connection is open.
(2) With a thread pool, multiple clients can be connected at the same time, but this costs a lot of performance because each connection still needs its own thread. (This is the approach older Tomcat versions used, with the thread released after each use.)
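
The two blocking points described above can be observed in a small loopback experiment. This is only a sketch: the class name BlockingPointsDemo and its methods are made up for illustration, and the client runs on a second thread in the same process so that the example needs no external server.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: one thread serves exactly one client at a time.
class BlockingPointsDemo {

    // Accepts a single client and returns everything it sent.
    static String serveOneClient(ServerSocket server) throws IOException {
        // Blocking point 1: accept() waits until a client connects
        try (Socket socket = server.accept()) {
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] bytes = new byte[1024];
            int n;
            // Blocking point 2: each read() waits until data arrives;
            // -1 means the client has closed its side of the connection
            while ((n = in.read(bytes)) != -1) {
                received.write(bytes, 0, n);
            }
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Starts a client on a second thread, then serves it on the calling thread.
    static String roundTrip(String message) {
        // Port 0 asks the operating system for any free port
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            int port = server.getLocalPort();
            Thread client = new Thread(() -> {
                try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
                    OutputStream out = s.getOutputStream();
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                    out.flush();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            client.start();
            String received = serveOneClient(server);
            client.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

While serveOneClient is inside accept() or read(), the calling thread can do nothing else, which is why a single thread can only look after one connection at a time.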

2.NIO

Main API introduction:
ServerSocketChannel corresponds to ServerSocket in traditional IO.
SocketChannel corresponds to Socket in traditional IO.
Selector is the core of NIO and is responsible for monitoring ServerSocketChannel and SocketChannel. It acts like a channel manager and relies on the operating system's native I/O multiplexing underneath, so a single thread with one selector can serve multiple clients.
SelectionKey is similar to a key in a map: it identifies a channel registered with the selector and records which events (accept, connect, read, write) are of interest and which are ready, so that different actions can be taken for different events.
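
The relationship between Selector, channel and SelectionKey can be sketched in isolation. The class name SelectionKeyDemo and the method firstEvent are made up for this illustration; the server channel and a blocking client live in the same process, and the port is chosen by the operating system.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class showing what a SelectionKey represents.
class SelectionKeyDemo {

    // Returns a description of the first event the selector reports.
    static String firstEvent() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the operating system for any free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // register() returns the key that ties this channel to this selector
            SelectionKey registered = server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client connects, so an accept event becomes pending
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait up to 5 seconds for at least one key to become ready
                selector.select(5000);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key == registered && key.isAcceptable()) {
                        return "acceptable";
                    }
                }
                return "none";
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstEvent());
    }
}
```

The key handed back by register() is the same object that later shows up in selectedKeys(), which is how the selector tells the thread which channel has which event ready.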

3.Netty

Netty is a client / server framework that uses Java's advanced network capabilities to hide the complexity behind it and provide an easy-to-use API.
Netty is a network application framework based on Java NIO. Because Netty is built entirely on NIO, all of its I/O operations are asynchronous.
① High concurrency
Netty is a network communication framework built on NIO (non-blocking I/O). Compared with BIO (blocking I/O), its concurrency performance is greatly improved.
② Fast transmission
Netty's fast transmission relies on zero copy, which here means reading data into off-heap (direct) memory. Java memory includes heap memory, stack memory, the string constant pool, and so on; the heap is the largest region and is where Java objects are stored. Normally, when data is read from IO into heap memory, it first passes through the socket buffer, so each piece of data is copied twice before it reaches its destination. With large amounts of data this wastes resources.
When Netty needs to receive data, it allocates a block of memory outside the heap and the data is read from IO directly into that memory, which avoids the extra copy into the heap. In Netty this data can be operated on directly through ByteBuf, which speeds up transmission.
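
The difference between heap memory and off-heap memory can be seen with the JDK's own java.nio.ByteBuffer. This is a stand-in sketch, not Netty's ByteBuf: the class name DirectBufferDemo and its methods are made up for illustration, and the example only shows that a direct buffer lives outside the heap and has no backing byte[] array.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class comparing heap and direct (off-heap) buffers.
class DirectBufferDemo {

    // Describes where the buffer's memory lives.
    static String describe(ByteBuffer buffer) {
        return (buffer.isDirect() ? "direct" : "heap") + ", hasArray=" + buffer.hasArray();
    }

    // Writes text into off-heap memory and reads it back.
    static String roundTrip(String text) {
        // allocateDirect() reserves memory outside the Java heap
        ByteBuffer direct = ByteBuffer.allocateDirect(64);
        direct.put(text.getBytes(StandardCharsets.UTF_8));
        // flip() switches the buffer from writing to reading
        direct.flip();
        return StandardCharsets.UTF_8.decode(direct).toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(ByteBuffer.allocate(64)));       // heap, hasArray=true
        System.out.println(describe(ByteBuffer.allocateDirect(64))); // direct, hasArray=false
        System.out.println(roundTrip("hello"));                      // hello
    }
}
```

A channel can read straight into a direct buffer, whereas data destined for a heap buffer has to be copied into the heap as an extra step.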
③ Good encapsulation
Netty wraps the low-level details of NIO behind an easy-to-use API.

2, Code implementation

IO

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Hello {
    public static void main(String[] args) throws IOException {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //Create a socket service and listen on port 10101
        ServerSocket server=new ServerSocket(10101);
        System.out.println("Server startup!");
        while(true){
            //Get a socket (blocking)
            final Socket socket = server.accept();
            System.out.println("A new client!");
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //Business processing
                }
            });

        }
    }
}

NIO

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Hello {

    // Channel Manager
    private Selector selector;

    /**
     * Get a ServerSocket channel and do some initialization for the channel
     *
     * @param port
     *            Bound port number
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // Get a ServerSocket channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // Set channel to non blocking
        serverChannel.configureBlocking(false);
        // Bind the ServerSocket corresponding to the channel to the port
        serverChannel.socket().bind(new InetSocketAddress(port));
        // Get a channel manager
        this.selector = Selector.open();
        // Register the channel with the selector for the SelectionKey.OP_ACCEPT event.
        // Once registered, selector.select() returns when the event arrives; until then it keeps blocking.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Use polling to monitor whether there are events to be processed on the selector. If so, process them
     *
     * @throws IOException
     */
    public void listen() throws IOException {
        System.out.println("The server is started successfully!");
        // Polling access selector
        while (true) {
            // When the registered event arrives, the method returns; Otherwise, the method will always block
            selector.select();
            // Get an iterator over the selected keys, i.e. the keys whose registered events are ready
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key to prevent repeated processing
                ite.remove();

                handler(key);
            }
        }
    }
    /**
     * Processing requests
     *
     * @param key
     * @throws IOException
     */
    public void handler(SelectionKey key) throws IOException {

        // Client connection request event
        if (key.isAcceptable()) {
            handlerAccept(key);
        // Readable event
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }

    /**
     * Processing connection requests
     *
     * @param key
     * @throws IOException
     */
    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // Get the channel to connect with the client
        SocketChannel channel = server.accept();
        // Set to non blocking
        channel.configureBlocking(false);

        // Here you can send information to the client
        System.out.println("New client connection");
        // After the connection is established, register the channel for read events so that data from the client can be received.
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Handling read events
     *
     * @param key
     * @throws IOException
     */
    public void handelerRead(SelectionKey key) throws IOException {
        // The server can read the message: get the Socket channel of the event
        SocketChannel channel = (SocketChannel) key.channel();
        // Create read buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if (read > 0) {
            // Decode only the bytes actually read, not the whole 1024-byte backing array
            String msg = new String(buffer.array(), 0, read).trim();
            System.out.println("Information received by the server:" + msg);

            // Write back data
            ByteBuffer outBuffer = ByteBuffer.wrap("well".getBytes());
            channel.write(outBuffer);// Send the reply to the client
        } else if (read == -1) {
            // read() returns -1 when the client has closed the connection
            System.out.println("Client shutdown");
            key.cancel();
            channel.close();
        }
    }


    public static void main(String[] args) throws IOException {
        Hello server = new Hello();
        server.initServer(8000);
        server.listen();
    }
}
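
A read buffer is usually larger than the message it receives, so only the bytes actually read should be decoded. The sketch below contrasts decoding the whole backing array with decoding just the written part; the class name BufferDecodeDemo and its methods are made up for illustration, and buffer.put() stands in for channel.read(buffer).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: decoding a partially filled ByteBuffer.
class BufferDecodeDemo {

    // Decodes only the bytes written into the buffer so far.
    static String decodeWritten(ByteBuffer buffer) {
        // flip() sets limit = current position and position = 0,
        // so remaining() is exactly the number of bytes written
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }

    // Decodes the whole backing array, including the unused zero bytes.
    static String decodeWholeArray(ByteBuffer buffer) {
        return new String(buffer.array(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Simulates channel.read(buffer) delivering 5 bytes
        buffer.put("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeWholeArray(buffer).length()); // 1024
        System.out.println(decodeWritten(buffer).length());    // 5
    }
}
```

Calling trim() on the whole-array result also removes the trailing zero characters, but it strips any leading or trailing whitespace that the client sent as well.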

NettyHelloworld
MyServer

package hello;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception {
        //Thread group used to process server-side acceptance of client connections
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //Thread group used for network communication (read and write)
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        //Create a helper class for configuring a series of server channels
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup,workGroup)//Bind the two thread groups
                .channel(NioServerSocketChannel.class)//Use the NIO-based TCP server channel (for UDP, NioDatagramChannel is used instead)
                .option(ChannelOption.SO_BACKLOG,1024)//Set the maximum length of the queue of pending connections
                .childOption(ChannelOption.SO_SNDBUF,32*1024)//Set send buffer size of accepted connections
                .childOption(ChannelOption.SO_RCVBUF,32*1024)//Set receive buffer size of accepted connections
                .childOption(ChannelOption.SO_KEEPALIVE,true)//Enable TCP keep-alive on accepted connections
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ServerHandler());//Configure the handler that processes received data
                    }
                });

        ChannelFuture cf1 = sb.bind(8787).sync();//bind() is asynchronous; sync() waits until the port is bound
        ChannelFuture cf2 = sb.bind(8686).sync();//netty can bind multiple ports

        cf1.channel().closeFuture().sync();//Block until the server channel is closed, which keeps the main thread alive
        cf2.channel().closeFuture().sync();

        //Close thread group
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

ServerHandler

package hello;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

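// Note: ChannelHandlerAdapter exposes channelRead() only in the Netty 5.0 alpha API; with Netty 4.x, extend ChannelInboundHandlerAdapter instead
public class ServerHandler extends ChannelHandlerAdapter {
    /**
     * Overrides the method called when data is read from the channel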
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        try {
            //Declare a byte array; buf.readableBytes() returns the number of readable bytes in buf
            byte[] req = new byte[buf.readableBytes()];
            //Read the bytes in buf into the byte array req
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Server Print received information:" + body);
            String response = "Server Return to Client Response information:" + body;

            //1. ctx.writeAndFlush() is equivalent to calling write() followed by flush(): write() only queues the data in the channel's outbound buffer, and flush() sends the queued data to the client
            //2. Unpooled.copiedBuffer() converts the byte array into a Netty ByteBuf
            //3. writeAndFlush() releases the buffer it writes (the response) once it has been sent; the inbound buf is not passed to it, so it is released in the finally block below
            //4. addListener(ChannelFutureListener.CLOSE): once the write completes, the server closes the connection.
            // The client is blocked in cf1.channel().closeFuture().sync(), so closing the connection lets the client continue executing
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes("utf-8")));
//                .addListener(ChannelFutureListener.CLOSE);
        } finally {
            //The inbound buffer has been consumed and must be released
            buf.release();
        }
    }

    /**
     * Overrides the exception handler: closes the connection when an error occurs
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

MyClient

package hello;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args) throws Exception{

        NioEventLoopGroup group = new NioEventLoopGroup();//Thread group used to handle network communication (read and write)

        Bootstrap b = new Bootstrap();//Create the client bootstrap helper
        b.group(group)//Bind thread group
                .channel(NioSocketChannel.class)//Set the communication channel to TCP protocol
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientHandler());//The processing of the specific data receiving method is configured here
                    }
                });

        /*Communication with 8787 port*/
        ChannelFuture cf1 = b.connect("127.0.0.1", 8787).sync();//connect() is asynchronous; sync() waits until the connection is established

        cf1.channel().write(Unpooled.copiedBuffer("hello world".getBytes()));//Write "hello world" to buf buffer
        cf1.channel().flush();//Here, you must use flush() to transfer the data in the buf buffer to the server

        /*Communication with 8686 port*/
        ChannelFuture cf2 = b.connect("127.0.0.1", 8686).sync();
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes()));

        cf1.channel().closeFuture().sync();//Block until the connection is closed
        cf2.channel().closeFuture().sync();

        group.shutdownGracefully();//Close thread group
    }
}

ClientHandler

package hello;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

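// Note: ChannelHandlerAdapter exposes channelRead() only in the Netty 5.0 alpha API; with Netty 4.x, extend ChannelInboundHandlerAdapter instead
public class ClientHandler extends ChannelHandlerAdapter {
    /**
     * Overrides the method called when data is read from the channel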
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;

            //Declare a byte array; buf.readableBytes() returns the number of readable bytes in buf
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "utf-8");
            System.out.println("Client Print received information:" + body);

        }finally {
            ReferenceCountUtil.release(msg);//The buf buffer is used up and must be released
        }

    }

    /**
     * Overrides the exception handler: closes the connection when an error occurs
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

3, Operation results

References

https://blog.csdn.net/weixin_56102526/article/details/121805391?spm=1001.2014.3001.5501

Keywords: Java Tomcat server

Added by unstable_geek on Thu, 13 Jan 2022 18:29:29 +0200