TCP chat program based on IO, NIO and Netty

preface

Java version 1.4 introduces a new IO API, which has the same function and purpose as the original IO; It can replace standard java IO, but the implementation method is different. NIO is a buffer oriented and channel based IO operation; NIO can improve the reading and writing of files. Based on this advantage, there are more and more scenarios using NIO. Many popular frameworks use NIO technology, such as Tomcat, Netty, Jetty, etc; Therefore, learning and mastering NIO technology is a necessary skill for java development.

1, IO and NIO

1. Stream oriented and buffer oriented

Reading and writing data in Java IO is * * Stream oriented (Stream) * *, which means that when we read data from the Stream and write data to the Stream, we also write it to the Stream. The meaning of the Stream is that there is no cache. It's like we stand in front of the pipeline, and all the data arrive in front of us successively along the pipeline. We can only read the current data. If we need to obtain the previous or later data of a data, we must cache the data ourselves Data, not directly from the Stream.

In Java NIO, data reading and writing is oriented to * * Buffer (Buffer) * *, when reading, the whole block of data can be read into the Buffer, and when writing, the data in the whole Buffer can be written together. This is like changing pipeline transmission into truck transportation. Stream oriented data reading and writing only provides a data stream interface, while Buffer oriented IO enables us to see the context of data, that is, in the Buffer It is very convenient to obtain the previous data or the latter data of a certain data in the area. This convenience comes at a price, because we must manage the Buffer well, including not allowing new data to overwrite the useful data in the Buffer that has not been processed; Block the data in the Buffer correctly, and distinguish what has been processed and what has not been processed, etc.

2. Blocking and non blocking

Traditional IO streams are blocking. That is, when a thread calls read() or write(), the thread is blocked until some data is read or written, and the thread cannot perform other tasks during this period. Therefore, when the network communication is completed for IO operation, because the thread will block, the server must provide an independent thread for each client to process. When the server needs to process a large number of clients, the performance drops sharply.

Java NIO is non blocking. When a thread reads and writes data from a channel, if no data is available, the thread can perform other tasks. Threads usually use the idle time of non blocking IO to perform IO operations on other channels, so a single thread can manage multiple input and output channels. Therefore, NIO can let the server use one or a limited number of threads to process all clients connected to the server at the same time.

2, TCP chat program

1. IO based

IO server:

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //Create a socket service and listen to port 8081
        ServerSocket server=new ServerSocket(8081);
        System.out.println("Server startup!");
        int count=0;
        while(true){
            //Get a socket (blocking)
            final Socket socket = server.accept();
            System.out.println("Welcome section"+(++count)+"A classmate");
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //Business processing
                    handler(socket);
                }
            });

        }
    }

  
 //Read data
    
    public static void handler(Socket socket){
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();

            while(true){
                //Read data (blocking)
                int read = inputStream.read(bytes);
                if(read != -1){
                    System.out.println(new String(bytes, 0, read));
                }else{
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                System.out.println("socket close");

                socket.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

IO client:

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class IOClient {
    public static void main(String[] args) throws IOException {
        //Send ten times
        for (int i=0;i<10;i++){
            Socket socket=new Socket("127.0.0.1", 8081);
            //Write data
            OutputStream os=socket.getOutputStream();
            os.write(("Royal Music"+i).getBytes());
            //Release resources
            socket.close();
        }

    }

}

effect:

2. Based on NIO

NIO server:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    // Channel Manager
    private Selector selector;


     //Start server test

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8081);
        server.listen();
    }



     // Get a ServerSocket channel and do some initialization for the channel

    public void initServer(int port) throws IOException {
        // Get a ServerSocket channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // Set channel to non blocking
        serverChannel.configureBlocking(false);
        // Bind the ServerSocket corresponding to the channel to the port
        serverChannel.socket().bind(new InetSocketAddress(port));
        // Get a channel manager
        this.selector = Selector.open();
        // Bind the channel manager to the channel and register the selectionkey for the channel OP_ Accept event. After registering this event,
        // When the event arrives, the selector Select () will return if the event does not reach the selector Select() will always block.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }


     //Use polling to monitor whether there are events to be processed on the selector. If so, process them

    public void listen() throws IOException {
        System.out.println("The server is started successfully!");
        // Polling access selector
        while (true) {
            // When the registered event arrives, the method returns; Otherwise, the method will always block
            selector.select();
            // Gets the iterator of the selected item in the selector. The selected item is the registered event
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // Delete the selected key to prevent repeated processing
                ite.remove();

                handler(key);
            }
        }
    }


     //Processing requests

    public void handler(SelectionKey key) throws IOException {

        // Client request connection event
        if (key.isAcceptable()) {
            handlerAccept(key);
            // Get readable events
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }


     // Processing connection requests

    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // Get the channel to connect with the client
        SocketChannel channel = server.accept();
        // Set to non blocking
        channel.configureBlocking(false);

        // Here you can send information to the client
        System.out.println("New customer connection detected");
        // After the connection with the client is successful, in order to receive the information from the client, you need to set the read permission for the channel.
        channel.register(this.selector, SelectionKey.OP_READ);
    }


     // Handling read events

    public void handelerRead(SelectionKey key) throws IOException {
        // The server can read the message: get the Socket channel of the event
        SocketChannel channel = (SocketChannel) key.channel();
        // Create read buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if(read > 0){
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("The user name is:" + msg);

            //Write back data
            ByteBuffer outBuffer = ByteBuffer.wrap("Server received".getBytes());
            channel.write(outBuffer);// Send the message back to the client
        }else{
            System.out.println("Client shutdown");
            key.cancel();
        }
    }
}

NIO client:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        final int count[]=new int[1];
        count[0]=1;
        for(int i=0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    SocketChannel socketChannel = null;
                    //Data sent
                    String str = "Royal Music"+count[0]++;
                    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());

                    //Accepted data
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    try {
                        //Establish connection
                        socketChannel = SocketChannel.open();
                        socketChannel.configureBlocking(false);
                        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) {
                            //Waiting for connection
                            while (!socketChannel.finishConnect()) {
                            }
                        }
                        //Write data
                        socketChannel.write(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    //Automatically disconnect after 10s
                    int time=1;
                    while (time<10){
                        time++;
                        try {
                            //Read data
                            int read=socketChannel.read(buffer);
                            if(read > 0) {
                                byte[] data = buffer.array();
                                String msg = new String(data).trim();
                                System.out.println("Client received message:" + msg);
                            }
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread.sleep(100);
        }
    }

}

Test effect:
The server:

client:

3. Based on Netty

Netty server:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class NettyServer {
    public static void main(String[] args) {
        //It is used to process the client connection received by the server
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //Network communication (read-write)
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //Auxiliary tool class, which is used for a series of configuration of server channels
            ServerBootstrap bootstrap = new ServerBootstrap();
            //Bind two thread groups
            bootstrap.group(bossGroup,workerGroup)
                    //Set the boss selector to create the object used by the channel
                    .channel(NioServerSocketChannel.class)
                    //Queue length of boss waiting for connection
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //Processing message objects
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //Create pipe
                            ChannelPipeline pipeline = ch.pipeline();
                            //Decoding mode
                            pipeline.addLast("decoder",new StringDecoder());
                            //Coding mode
                            pipeline.addLast("encoder",new StringEncoder());
                            //Custom processing message object
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            System.out.println("The server is starting");
            //Binding port number
            ChannelFuture cf = bootstrap.bind(8083).sync();

            cf.addListener(cd->{
                if(cd.isSuccess()){
                    System.out.println("Start successful");
                }else{
                    System.out.println("Startup failed");
                }
            });
            //The server sends messages to all clients
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                ServerHandler.sendAll(msg);
            }
            //Block current thread
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }



}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+" == " +msg);
        channelGroup.forEach(ch->{
            if (channel!=ch) {
                ch.writeAndFlush("[ client ]" + channel.remoteAddress() + "Sent a message : " + msg + "\n");
            }else{
                ch.writeAndFlush("[ customer ] Sent a message: " + msg + "\n");
            }
        });

    }
    //It is used by the server to send information to all clients
    public static void sendAll(String msg){
        channelGroup.forEach(channel -> {
            channel.writeAndFlush("The server: "+msg+"\n");
        });
    }


     // Triggered when there is a new user connection

    public void channelActive(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ client]"+channel.remoteAddress()+" Connected "+sf.format(new Date())+"\n");
        //Add new connections
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress()+" It's online" + "\n");
    }


     //Triggered when the user disconnects

    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ client ] " +channel.remoteAddress()+ " Disconnect"+"\n");
        System.out.println(channel.remoteAddress()+" Offline.\n");
        System.out.println("channelGroup size = "+ channelGroup.size());
    }


}

Netty client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.ArrayList;
import java.util.List;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>();
        try {
            Bootstrap bootstrap = new Bootstrap();
            //The server can actively disconnect
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            //Address multiplexing
            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            final int count[] =new int[1];
            count[0]=0;
            for(int i=0;i<3;i++){
                //add connections
                channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync());
                //New thread impersonation multi-user
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int index=count[0]++;
                        //Get the corresponding pipe
                        Channel channel = channelFutures.get(index).channel();
                        System.out.println( "======"+channel.localAddress()+"======");
                        int time=0;
                        while (time++<3){
                            //send data
                            String msg =" Royal Music "+(index)+": "+time;
                            channel.writeAndFlush(msg);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        //Close connection
                        channel.close();

                    }
                }).start();

            }

            //Block the main thread, otherwise it will execute finally and close EventLoopGroup directly
            int time=0;
            while (time++<5){
                Thread.sleep(1000);
            }

        } finally {
            //Close EventLoopGroup
            group.shutdownGracefully();
        }

    }

}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }

}

demonstration
Server:

client

reference resources

https://blog.csdn.net/linjpg/article/details/80962453
https://blog.csdn.net/qq_47281915/article/details/121802536

Keywords: Java network TCP/IP

Added by scottfossum on Wed, 15 Dec 2021 07:33:33 +0200