Illustrated I/O: BIO, NIO, AIO and Netty, with chat room practice

The start of 2020 has been unusual. While staying at home, I went back and studied I/O again.
Java I/O is divided into BIO, NIO and AIO.
Netty is built on NIO. AIO has not been widely adopted yet, so it may still have room to grow.

A few concepts to understand first
Synchronous and asynchronous
Blocking and non-blocking
I/O multiplexing
In "I/O multiplexing", I/O refers to network I/O, "multi" refers to multiple TCP connections (i.e. sockets or channels), and "plexing" refers to reusing one thread or a small number of threads.
Simply put, one or a few threads handle many TCP connections. The biggest advantage is lower system overhead: there is no need to create and maintain a large number of processes or threads.
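
To make the idea concrete, here is a minimal sketch of one thread serving several connections through a single Selector. The class name MultiplexingSketch, the loopback address and the "msg-N" payloads are my own assumptions for the demo, not part of the chat room code later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class MultiplexingSketch {

    // Serves `clients` connections on the calling thread and returns what each one sent.
    static List<String> serve(int clients) throws IOException {
        List<String> received = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick a free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Each demo client connects, sends one short ASCII message and closes.
            for (int i = 0; i < clients; i++) {
                try (SocketChannel c = SocketChannel.open(server.getLocalAddress())) {
                    c.write(ByteBuffer.wrap(("msg-" + i).getBytes(StandardCharsets.UTF_8)));
                }
            }

            int closed = 0;
            while (closed < clients) {
                selector.select(); // one blocking call watches every registered channel
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            // The attachment collects the text received on this connection.
                            ch.register(selector, SelectionKey.OP_READ, new StringBuilder());
                        }
                    } else if (key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        StringBuilder text = (StringBuilder) key.attachment();
                        ByteBuffer buf = ByteBuffer.allocate(64);
                        int n = ch.read(buf);
                        if (n > 0) {
                            buf.flip();
                            text.append(StandardCharsets.UTF_8.decode(buf));
                        } else if (n < 0) { // -1: the client closed its side
                            received.add(text.toString());
                            ch.close();
                            closed++;
                        }
                    }
                }
            }
        }
        return received;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serve(3)); // the order of the messages may vary
    }
}
```

No thread is created per connection here: the same loop accepts, reads and closes all three sockets.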

Reactor pattern: event-driven; commonly used in Redis, Netty, etc.
epoll
1) No fixed limit on the number of FDs: the upper limit is the operating system's maximum number of file handles. A machine with 1 GB of memory supports roughly 100,000 handles.
2) Better efficiency: it uses callback notification instead of polling, so performance does not degrade as the number of FDs grows.
3) Only ready events are handed back to user space, so the cost of each call depends on the number of active FDs rather than the total. (It is often said that epoll shares memory between the kernel and user space through mmap; the Linux implementation does not do this, it copies the ready events out in epoll_wait.)

First, the basic I/O interaction flow is illustrated.

I/O inside a single machine


From the flow chart: data from outside arrives through the network card and the corresponding port, enters kernel space during the data preparation stage, is then copied from kernel space to user space, and is finally processed by the application. The related concepts of synchronous vs. asynchronous and blocking vs. non-blocking are not covered in detail here.


This figure shows the call flow of blocking I/O.

Network I/O models

The java.io model
BIO: the main network I/O programming model before JDK 1.4; synchronous and blocking

BIO chat room
TalkServerBIO: the BIO server
//BIO implementation of the chat server
public class TalkServerBIO {
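//Client list. CopyOnWriteArrayList (java.util.concurrent) lets handler threads iterate safely while clients join or leave
public static List<MyChannel> userList = new CopyOnWriteArrayList<>();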

public static void main(String[] args) throws IOException {

    //Server port settings
    ServerSocket serverSocket = new ServerSocket(9999);
    System.out.println("Beauty chat room waiting for your connection......");
    while (true){
        Socket socket = serverSocket.accept();
        System.out.println("Dear customers welcome you");
        MyChannel myChannel = new MyChannel(socket);
        new Thread(myChannel).start();
        userList.add(myChannel);
    }
}

}
TalkClientBIO: the BIO client
//BIO implementation of the chat client
@Slf4j
public class TalkClientBIO {

private static File file = new File("Client.properties");

private static Properties properties = new Properties();

private static String HOST = "";

private static int PORT =0;

//Initialize port startup information
static {
    if(!file.exists()){
        try {
            file.createNewFile();
            FileOutputStream fos = new FileOutputStream(file);
            properties.setProperty("hostname","localhost");
            properties.setProperty("port","9999");
            properties.store(fos,null);
            fos.close();
        } catch (IOException e) {
            log.error("Failed to create file");
            e.printStackTrace();
        }
    }
    try {
        properties.load(new FileInputStream(file));
        HOST = properties.getProperty("hostname");
        PORT  = Integer.parseInt(properties.getProperty("port"));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
public static void main(String[] args) {
    try {
        Socket socket = new Socket(HOST,PORT);
        Scanner input = new Scanner(System.in);
        System.out.println("The client has started. Please enter your nickname: ");
        String name = input.next();
        System.out.println("Has entered the chat room!");
        new Thread(new SendUtil(socket),name).start();
        new Thread(new ReceiveUtil(socket),name).start();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}
MyChannel: the server-side message channel
//Server-side channel that handles communication with one client
@Slf4j
public class MyChannel implements Runnable {

//Input stream
private DataInputStream dis;
//Output stream
private DataOutputStream dos;
//Set to true once the channel is closed; volatile because other client threads can trigger the close via sendMsg
private volatile boolean isStop = false;

/**
 * Constructor gets connection related information
 * @param client
 */
public MyChannel(Socket client){

    try {
        dis = new DataInputStream(client.getInputStream());
        dos = new DataOutputStream(client.getOutputStream());
    } catch (IOException e) {
        e.printStackTrace();
        log.info("Pipeline message exception{}",client.getInetAddress());
        colse();
    }
}
@Override
public void run() {
    while (!isStop){
        sendOther();
    }

}

/**
 *
 * Get message
 * @return
 */
public String receiveMsg(){

    String msg ="";

    try {
        msg = dis.readUTF();
        System.out.println(Thread.currentThread().getName()+":"+msg);
    } catch (IOException e) {
        colse();
        TalkServerBIO.userList.remove(this);
    }
    return msg;
}

public void sendMsg(String s){

    if(!StringUtils.isEmpty(s)){
        try {
            dos.writeUTF(s);
            dos.flush();
        } catch (IOException e) {
            e.printStackTrace();
            isStop = true;
            colse();
        }

    }

}

/**
 * Send messages to other clients
 */
public void sendOther(){
    String s = this.receiveMsg();
    List<MyChannel> userList = TalkServerBIO.userList;
    for(MyChannel myChannel:userList){
      if(this == myChannel){
          continue;
      }
      //Send messages to others
      myChannel.sendMsg(s);
    }
}

/**
 * Close connection
 */
public void colse(){
    isStop = true;
    if(null!=dis){
        try {
            dis.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
    if(null!=dos){
        try {
            dos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

}

//Send message tool class
public class SendUtil implements Runnable {

private DataOutputStream dos = null;
private boolean isStop = Boolean.FALSE;
private BufferedReader br = null;

public SendUtil(Socket socket){

    br = new BufferedReader(new InputStreamReader(System.in));
    try {
        dos =  new DataOutputStream(socket.getOutputStream());
    } catch (IOException e) {
        e.printStackTrace();
        colse();
    }
}

/**
 * send message
 */
public void sendMessage(){
    try {
        dos.writeUTF(Thread.currentThread().getName()+"-->Say:"+br.readLine());
        dos.flush();
    } catch (IOException e) {
        colse();
        e.printStackTrace();
    }
}
@Override
public void run() {

    while (!isStop){
        this.sendMessage();
    }
}

/**
 * Close connection
 */
public void colse(){
    isStop = true;
    if(null!=dos){
        try {
            dos.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

}
//Get message tool class
public class ReceiveUtil implements Runnable {

private DataInputStream ios = null;
private boolean isStop = Boolean.FALSE;
private BufferedReader br = null;

public ReceiveUtil(Socket socket){
    try {
        ios = new DataInputStream(socket.getInputStream());
    } catch (IOException e) {
        e.printStackTrace();
        colse();
    }
}
public void receiveMessage(){
    String msg ="";
    try {
        msg = ios.readUTF();
        System.out.println(msg);
    } catch (IOException e) {
        //The server is gone or the stream is broken: stop the loop instead of retrying forever
        e.printStackTrace();
        colse();
    }

}
@Override
public void run() {
    while (!isStop){
        this.receiveMessage();
    }
}


/**
 * Close connection
 */
public void colse(){
    isStop = true;
    if(null!=ios){
        try {
            ios.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

}
Three clients and one server are started locally to run a multi-user chat.

As this example shows, every new client starts a new thread on the server. With a large number of clients, the server runs out of thread resources and the service becomes unavailable. A thread pool does not fix the underlying problem: while a client's connection has nothing to read or write, its thread stays blocked, which wastes resources.
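
The thread pool point can be reproduced with a small experiment. The sketch below is my own illustration (the class name BlockingPoolSketch, the one-thread pool and the 300 ms timeout are all assumptions): an idle client occupies the only worker, so a second client that actually sends data never gets its echo in time.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingPoolSketch {

    // Echoes one line per connection; readLine() blocks the worker until the client sends something.
    static void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line = in.readLine();
            if (line != null) {
                out.println(line);
            }
        } catch (IOException ignored) {
            // the connection was closed while the worker was waiting
        }
    }

    // Returns true if the active client timed out because the only worker was stuck on the idle client.
    static boolean activeClientStarves() throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(1); // a "pool" with a single worker
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             Socket idle = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket active = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket first = server.accept();    // server side of `idle`
             Socket second = server.accept()) { // server side of `active`
            pool.execute(() -> handle(first));  // blocks in readLine(): `idle` never sends anything
            pool.execute(() -> handle(second)); // queued behind the blocked task

            new PrintWriter(active.getOutputStream(), true).println("hello");
            active.setSoTimeout(300); // give up waiting for the echo after 300 ms
            try {
                new BufferedReader(new InputStreamReader(active.getInputStream())).readLine();
                return false; // an echo arrived, so a worker was free
            } catch (SocketTimeoutException e) {
                return true;  // no echo: the worker is still blocked on the idle client
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("active client starved: " + activeClientStarves());
    }
}
```

A larger pool only moves the limit: with N workers, N idle connections are enough to stall everyone else.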

The NIO model: introduced in JDK 1.4; synchronous and non-blocking
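
At the API level, "non-blocking" simply means that a call returns right away even when there is nothing to do. A minimal sketch, assuming a loopback connection and a demo class name of my own (NonBlockingReadSketch):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingReadSketch {

    // Returns what read() reports on a non-blocking channel that has no data waiting.
    static int readWithoutData() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // The client is connected but has sent nothing, so this returns immediately.
                return accepted.read(ByteBuffer.allocate(16));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read returned " + readWithoutData()); // prints "read returned 0"
    }
}
```

In blocking mode the same read() would wait until the client sends data or closes. The model is still synchronous because the application thread itself performs the copy once data is ready.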

NIO chat room implementation
//Server
@Slf4j
public class TalkServerNIO {
//Define port number
private static int PORT = 8848;
//For character set decoding
private Charset charset = Charset.forName("UTF-8");
//Buffer for receiving data
private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
//Buffer for sending data
private ByteBuffer sBuffer = ByteBuffer.allocate(1024);
//Used to store the client socketchannel collection
private Map<String, SocketChannel> clientMap = new HashMap<>();
//Used to listen for channel events
private static Selector selector;

public static void main(String[] args) {
    TalkServerNIO talkServerNIO = new TalkServerNIO(8848);
    talkServerNIO.listen();
}
public TalkServerNIO(int port) {

    PORT = port;
    try {
        init();
    } catch (Exception e) {
        e.printStackTrace();
        log.error("Server initialization exception", e);
    }
}

//Initialize server, register listening events
private void init() throws IOException {

    //Open a serverSocketChannel
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    //Channel set to non blocking
    serverSocketChannel.configureBlocking(false);
    //Get the serverSocket of the channel
    ServerSocket serverSocket = serverSocketChannel.socket();
    //Binding port number
    serverSocket.bind(new InetSocketAddress(PORT));
    //Open listen selector
    selector = Selector.open();
    //Register connections to listen for events on selectors
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("Server starts, port number is:" + PORT);
}

//The server polls and listens; select() blocks until a relevant event occurs or it times out
public void listen() {

    while (true) {

        try {
            // log.info("waiting for connection request.....");
            selector.select();//The return value is the number of events triggered this time
            //Get event collection
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //Handling events
            selectionKeys.forEach(selectionKey -> handle(selectionKey));
            selectionKeys.clear();//Clean up what you've dealt with
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

//Handling corresponding events
private void handle(SelectionKey selectionKey) {

    // log.info("there is a request to come in, need to deal with...");
    //Connection requested by client
    if (selectionKey.isAcceptable()) {
        //Get the channel of the connection request
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        try {
            SocketChannel clent = server.accept();
            clent.configureBlocking(false);
            clent.register(selector, SelectionKey.OP_READ);
            clientMap.put(getClientName(clent), clent);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //Client sent a message
    else if (selectionKey.isReadable()) {

        //Client channel
        SocketChannel client = (SocketChannel) selectionKey.channel();
        //Clear read cache
        rBuffer.clear();

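        try {
            int bytes = client.read(rBuffer);
            if (bytes > 0) {
                //Flip the buffer so it can be read from the start
                rBuffer.flip();
                //Decoding buffer data
                String receiveText = String.valueOf(charset.decode(rBuffer));
                System.out.println(client.toString() + ":" + receiveText);
                dispatch(client, receiveText);
            } else if (bytes < 0) {
                //-1 means the client closed the connection: stop tracking it,
                //otherwise the key stays readable and the loop spins
                clientMap.values().remove(client);
                selectionKey.cancel();
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }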
    }
}

//Forward the message to the other clients; equivalent to sendOther in the BIO version
private void dispatch(SocketChannel clinet, String info) {
    if (!clientMap.isEmpty()) {

        for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
            SocketChannel temp = entry.getValue();
            if (!clinet.equals(temp)) {
                sBuffer.clear();
                sBuffer.put(charset.encode(getClientName(clinet) + ":" + info));
                sBuffer.flip();
                try {
                    temp.write(sBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}
//Generate client name or nickname
private String getClientName(SocketChannel client) {

    Socket socket = client.socket();
    return "[" + socket.getInetAddress().toString().substring(1) + ":" + Integer.toHexString(client.hashCode()) + "]";
}

}

Implementation of client

@Slf4j
public class TalkClientNIO {

//server address
private InetSocketAddress SERVER = null;
//Buffer for receiving data
private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
//Buffer for sending data
private ByteBuffer sBuffer = ByteBuffer.allocate(1024);

//Used to listen for channel events
private static Selector selector;
//For encoding or decoding
private Charset charset = Charset.forName("UTF-8");

public TalkClientNIO(int port){

    SERVER = new InetSocketAddress("127.0.0.1",port);
    //Initialize client
    init();

}

//Initialize client
private void init(){

    try {
        //Open SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        //Nonblocking
        socketChannel.configureBlocking(false);
        //Open monitoring channel
        selector = Selector.open();
        //Register the channel on the selector and listen for connection events
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        //Connect
        socketChannel.connect(SERVER);
        while (true){
            //Call the select method to select the channel with event call
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //Processing
            selectionKeys.forEach(selectionKey -> handle(selectionKey));
            selectionKeys.clear();//Clean up processed events
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
}

public void handle(SelectionKey selectionKey){
    //Connection ready event
    if(selectionKey.isConnectable()){
        //Get channel s with connection events
        SocketChannel client  = (SocketChannel) selectionKey.channel();
        //If connecting status
        if(client.isConnectionPending()){
            try {
                //Indicates successful connection
                client.finishConnect();
                System.out.println("Complete the connection with the server...");
                //Start thread listens for client input
                new Thread(new Runnable() {
                    @Override
                    public void run() {
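                        //Create the Scanner once; a new Scanner per message can lose buffered input
                        Scanner scanner = new Scanner(System.in);
                        while (scanner.hasNextLine()){
                            //Read the whole line so messages may contain spaces
                            String sendText = scanner.nextLine();
                            sBuffer.clear();
                            sBuffer.put(charset.encode(sendText));
                            sBuffer.flip();
                            try {
                                client.write(sBuffer);
                            }catch (Exception e){
                                log.error("Input message exception...");
                            }
                        }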
                    }
                }).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                //Register readable events
                client.register(selector,SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }

    }
    //Read event: a message arrived from the server; read it and print it to the screen
    else if(selectionKey.isReadable()){
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        rBuffer.clear();
        try {
            int count = socketChannel.read(rBuffer);
            if(count>0){
                //Decode with the same charset used for sending, not the platform default
                String receiveText = new String(rBuffer.array(),0,count,charset);
                System.out.println(receiveText);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    TalkClientNIO talkClientNI = new TalkClientNIO(8848);
}

}
The Reactor model in NIO
A Reactor design has two parts:
One part is the Reactor itself, which accepts client connections and dispatches requests
The other part is the I/O processing
1. Single Reactor, single thread
All requests are handled by one main thread
When a client sends a request, the main thread multiplexes: it listens for accept events on the selector, registers the newly accepted client connection, reads from the connection, processes the data, and writes the response back

2. Single Reactor, multiple threads
The Reactor thread still handles connections and I/O events, while a thread pool processes the business logic

3. Master-slave Reactor, multiple threads
A main Reactor accepts connections and hands them to one or more sub-Reactors, which handle the read and write events
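
The second model can be sketched roughly as follows. This is only my own illustration under stated assumptions: the class name ReactorWithWorkersSketch, the pool size of 4 and the "upper-case the text" business logic are all made up, and partial writes are ignored for brevity.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: one Reactor thread does all I/O, a pool runs the business logic.
class ReactorWithWorkersSketch implements Runnable, AutoCloseable {
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    ReactorWithWorkersSketch() throws IOException {
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() { return server.socket().getLocalPort(); }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel channel = server.accept();
                        if (channel != null) {
                            channel.configureBlocking(false);
                            channel.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        read((SocketChannel) key.channel());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            workers.shutdown();
            try {
                for (SelectionKey key : selector.keys()) key.channel().close();
                selector.close();
            } catch (IOException ignored) { }
        }
    }

    // Runs on the Reactor thread: read the bytes, then hand the processing to the pool.
    private void read(SocketChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(256);
        int n;
        try {
            n = channel.read(buffer);
        } catch (IOException e) {
            n = -1; // treat a broken connection like a closed one
        }
        if (n < 0) { channel.close(); return; }
        buffer.flip();
        String request = StandardCharsets.UTF_8.decode(buffer).toString();
        workers.execute(() -> {
            String reply = request.toUpperCase(Locale.ROOT); // stand-in for real business logic
            try {
                channel.write(ByteBuffer.wrap(reply.getBytes(StandardCharsets.UTF_8)));
            } catch (IOException ignored) { } // the client went away before the reply was written
        });
    }

    @Override
    public void close() {
        running = false;
        selector.wakeup(); // unblock select() so the loop can exit and clean up
    }

    // Starts a reactor, sends one ASCII message over a plain blocking socket and returns the reply.
    static String roundTrip(String message) throws IOException {
        try (ReactorWithWorkersSketch reactor = new ReactorWithWorkersSketch()) {
            new Thread(reactor, "reactor").start();
            try (Socket socket = new Socket("127.0.0.1", reactor.port())) {
                byte[] request = message.getBytes(StandardCharsets.UTF_8);
                socket.getOutputStream().write(request);
                byte[] reply = new byte[request.length];
                new DataInputStream(socket.getInputStream()).readFully(reply);
                return new String(reply, StandardCharsets.UTF_8);
            }
        }
    }
}
```

Calling ReactorWithWorkersSketch.roundTrip("hello") should come back as "HELLO". In the master-slave variant, the accepted channel would be registered with a second selector owned by a sub-Reactor thread instead of the accepting one.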

Disadvantages of Java NIO
1. It is hard to learn: you need to master SocketChannel, Buffer, Selector, etc.
2. Handling the many complex failure cases, such as network interruptions and network exceptions, takes experience
3. The infamous epoll empty-polling bug: select() can return repeatedly with no ready events, driving CPU usage to 100%
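
A common workaround for the third point, and as far as I know the idea behind Netty's own fix, is to count select() calls that return early with nothing ready and to rebuild the Selector once the count passes a threshold. The sketch below is my own simplified version: the class name SelectorRebuildSketch is made up, and the threshold of 512 is an assumption modelled on the value usually quoted for Netty.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

class SelectorRebuildSketch {
    // Assumed threshold: rebuild after this many consecutive premature returns.
    static final int REBUILD_THRESHOLD = 512;

    private Selector selector;
    private int prematureReturns;

    SelectorRebuildSketch(Selector selector) {
        this.selector = selector;
    }

    Selector selector() {
        return selector;
    }

    // Like Selector.select(timeout), but tracks returns that came back early with no ready keys.
    // timeoutMillis must be positive; a legitimate wakeup() also counts as a premature return here.
    int select(long timeoutMillis) throws IOException {
        long start = System.nanoTime();
        int ready = selector.select(timeoutMillis);
        boolean timedOut = System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        if (ready > 0 || timedOut) {
            prematureReturns = 0;
        } else if (++prematureReturns >= REBUILD_THRESHOLD) {
            rebuild();
        }
        return ready;
    }

    // Moves every valid registration to a fresh Selector and closes the old one.
    void rebuild() throws IOException {
        Selector fresh = Selector.open();
        for (SelectionKey key : selector.keys()) {
            if (key.isValid()) {
                key.channel().register(fresh, key.interestOps(), key.attachment());
                key.cancel();
            }
        }
        selector.close();
        selector = fresh;
        prematureReturns = 0;
    }
}
```

An event loop would call select(...) on this wrapper and then read selector().selectedKeys(), so that it always iterates the current Selector after a rebuild.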

Netty is a high-performance network communication framework. It was originally developed at JBoss and is now maintained by the Netty project community.
Thread model: an improved multithreaded master-slave Reactor
Efficient zero-copy techniques:
Two implementation methods: mmap and sendfile
A normal read followed by a write requires 4 copies and 4 context switches (two system calls, each entering and leaving the kernel)
With mmap, this drops to 3 copies, still with 4 context switches
Before Linux 2.4, sendfile needs 3 copies and 2 context switches
From Linux 2.4 on, sendfile needs 2 copies and 2 context switches
This achieves true zero copy: "zero copy" means zero memory copies performed by the CPU
DMA copies are unavoidable. DMA stands for direct memory access
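
In plain Java, the two techniques are reachable through FileChannel: transferTo can be backed by sendfile on platforms that support it, and map gives an mmap-style view of a file. The sketch below is a hedged illustration only; the class and method names (ZeroCopySketch, transfer, readViaMmap) are mine, and whether a real zero-copy path is taken depends on the JDK and the operating system.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ZeroCopySketch {

    // Copies `source` to `target` without pulling the bytes through a user-space buffer in Java code.
    static long transfer(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done.
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    // Reads a whole file through a memory mapping instead of read() calls.
    static byte[] readViaMmap(Path source) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            MappedByteBuffer mapped = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size());
            byte[] bytes = new byte[mapped.remaining()];
            mapped.get(bytes);
            return bytes;
        }
    }
}
```

Passing a SocketChannel as the target of transferTo is the case that corresponds to serving a file over the network.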

netty chat room practice
NettyServer
public class NettyServer {

private  static int PORT = 8848;
public NettyServer(int port){

    PORT = port;
}
public void start() {

    //By default an event loop group has CPU cores * 2 threads; adjust it to the actual workload. If request concurrency is low and I/O processing is slow,
    //you can configure fewer boss threads and fewer worker threads
    //bossGroup: the threads that accept client connections
    //The default size is derived from NettyRuntime.availableProcessors() * 2
    System.out.println("cpu number:"+NettyRuntime.availableProcessors());

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    //The working thread group is mainly responsible for the network io read-write interaction
    EventLoopGroup workerGroup = new NioEventLoopGroup();


    //netty is used to start the boot class of nio server, reducing the development complexity
    ServerBootstrap bootstrap = new ServerBootstrap();
    //AbstractBootstrap setting bossGroup
    //Important concept chain programming thread group
    bootstrap = bootstrap.group(bossGroup,workerGroup);
    //Set channel mode
    bootstrap = bootstrap.channel(NioServerSocketChannel.class);
    //Set callback request for processor
    bootstrap = bootstrap.childHandler(new NettyServerInitalizer());
    //Set the tcp parameters of NioServerSocketChannel
   // bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG,1024);
   // bootstrap = bootstrap.option(ChannelOption.SO_KEEPALIVE,true);

    //Bind the listening port; bind() returns a ChannelFuture and sync() blocks until the bind completes
    try {
        ChannelFuture future = bootstrap.bind(PORT).sync();
        System.out.println("Server start......");
        //Call the sync method to block and wait for the link of the server to close before the main function exits
        future.channel().closeFuture().sync();
        System.out.println("Server down......");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        //Graceful exit, release thread resources
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
public static void main(String[] args) {

    new NettyServer(PORT).start();

}

}

public class NettyServerInitalizer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {

    System.out.println("With client connection:"+ch.remoteAddress());

    ChannelPipeline channelPipeline = ch.pipeline();
    //Set decoder
    channelPipeline.addLast("decode",new StringDecoder());
    //Set encoder
    channelPipeline.addLast("encode",new StringEncoder());

    //Set processor for event response
    channelPipeline.addLast("handler",new NettyServerHandler());
}

}

public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

//Define channel group and instantiate a global instance
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
 * Called when a client channel is added to the pipeline; notifies the other clients and records the new channel in the group
 * @param ctx
 * @throws Exception
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    //Get client channel
    Channel currentClient = ctx.channel();
    //All client channels
    for(Channel channel:channels){
        if(channel!=currentClient){
            channel.writeAndFlush("[welcome"+currentClient.remoteAddress()+"]Enter chat room\n");
        }
    }
    //Queue the latest clients
    channels.add(currentClient);
}


/**
 * Triggered when the client has data written
 * @param ctx
 * @param msg
 * @throws Exception
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    //Get client channel
    Channel currentClient = ctx.channel();
    //All client channels
    for(Channel channel:channels){
        if(channel!=currentClient){
             channel.writeAndFlush("[User:"+currentClient.remoteAddress()+"say]"+msg
             +"\n");
        }else {
            channel.writeAndFlush("[I said:]"+msg +"\n");
        }
    }
    //Queue the latest clients
    channels.add(currentClient);
}

/**
 * Processing when the client leaves
 * @param ctx
 */
public void handlerRemoved(ChannelHandlerContext ctx){
    //Get client channel
    Channel leaveClient = ctx.channel();
    //Notify other clients to leave
    for(Channel channel:channels){
        if(leaveClient!=channel){
            channel.writeAndFlush("[bye:"+leaveClient.remoteAddress()+"]Leave the chat room\n");
        }
    }

}

/**
 * Current user online
 * @param ctx
 */
public void channelActive(ChannelHandlerContext ctx){

    Channel currentChannel = ctx.channel();
    System.out.println("["+currentChannel.remoteAddress()+"]on-line");
}

/**
 * Current user offline
 * @param ctx
 */
public void channelInactive(ChannelHandlerContext ctx){

    Channel currentChannel = ctx.channel();
    System.out.println("["+currentChannel.remoteAddress()+"]off-line");
}

/**
 * Client communication exception
 * @param ctx
 * @param throwable
 */
public void exceptionCaught(ChannelHandlerContext ctx,Throwable throwable){

    Channel currentChannel = ctx.channel();
    System.out.println("["+currentChannel.remoteAddress()+"]Abnormal communication");
    ctx.close();

}

}

//client side implementation

public class NettyClient {

private  String HOST;

private  int PORT;


public NettyClient(String host,int port){
    HOST = host;
    PORT = port;
}

public void start(){

    /**
     * Event loop group
     */
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    //Client start
    Bootstrap bootstrap = new Bootstrap();
    //Client add thread group
    bootstrap = bootstrap.group(eventLoopGroup);
    //Set client channel
    bootstrap = bootstrap.channel(NioSocketChannel.class);
    //Set up client event handler
    bootstrap.handler(new NettyClientInitalizer());

    try {
        //Connect to the server to obtain the client channel
        Channel channel = bootstrap.connect(HOST,PORT).sync().channel();
        //Get system input information
        BufferedReader input = new BufferedReader(new InputStreamReader(System.in));

        String line;
        //Stop at end of input instead of sending "null" forever
        while ((line = input.readLine()) != null){
            channel.writeAndFlush(line+"\n");
        }
    } catch (InterruptedException | IOException e) {
        e.printStackTrace();
    } finally {
        //Release the event loop threads
        eventLoopGroup.shutdownGracefully();
    }
}

public static void main(String[] args) {
    new NettyClient("localhost",8848).start();
}

}

public class NettyClientInitalizer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

    System.out.println("Client start....");

    //socket information channel
    ChannelPipeline channelPipeline = socketChannel.pipeline();
    //Set decoder
    channelPipeline.addLast("decode",new StringDecoder());
    //Set encoder
    channelPipeline.addLast("encode",new StringEncoder());
    //Set up processor
    channelPipeline.addLast("handler",new NettyClientHandler());

}

}

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
}

}

AIO chat room implementation
Server
public class AIOServer {

private static final String LOCALHSOT="localhost";

private static final int DEFAULT_PORT=8848;
//Keyword a client sends to quit
private static final String QUIT="quit";

private static final int BUFFER_SIZE=1024;

private static final int THREADPOLL_SIZE = 8;
//Asynchronous channel group
private AsynchronousChannelGroup channelGroup;
//Asynchronous ServerSocketChannel is similar to ServerSocketChannel
private AsynchronousServerSocketChannel serverSocketChannel;
//Encoding format
private Charset charset = Charset.forName("UTF-8");
//Collection of connected clients
private List<ClientHandler> connectedClients;

private int port;


public AIOServer(){
    this(DEFAULT_PORT);
}

public AIOServer(int port){
    this.port = port;
    this.connectedClients = new ArrayList<>();
}

/**
 * Apply for exit
 * @param msg
 * @return
 */
private boolean readtToQuit(String msg){
    return QUIT.equals(msg);
}

/**
 * Close flow
 * @param closeable
 */
private void close(Closeable closeable){

    if(closeable!=null){

        try {
            closeable.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * New connected clients
  * @param clientHandler
 */

private synchronized void addClient(ClientHandler clientHandler){

    connectedClients.add(clientHandler);
    System.out.println("Server connected.");

}

private synchronized void removeClient(ClientHandler clientHandler){

  connectedClients.remove(clientHandler);

  System.out.println("Disconnected.");

}

/**
 *Decoding received messages
 * @param byteBuffer
 * @return
 */

private String receive(ByteBuffer byteBuffer){

   CharBuffer charBuffer = charset.decode(byteBuffer);
   return String.valueOf(charBuffer);

}

/**
 *Get name
 * @param clinetChannel
 * @return
 */

private String getClinetName(AsynchronousSocketChannel clinetChannel){

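   int clientPort = -1;

   try {
       InetSocketAddress inetSocketAddress = (InetSocketAddress) clinetChannel.getRemoteAddress();
       //Store the port in the local variable; assigning to the port field would overwrite the server's own port
       clientPort = inetSocketAddress.getPort();
   } catch (IOException e) {
       e.printStackTrace();
   }
   return "client["+clientPort+"]";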

}

/**
 *Send messages to other clients
 * @param clientChannel
 * @param msg
 */

private synchronized void dispatch(AsynchronousSocketChannel clientChannel,String msg){

   for(ClientHandler clientHandler:connectedClients){
       if(!clientHandler.clientChannel.equals(clientChannel)){
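           //Prefix the message with the sender's name, not the recipient's
           ByteBuffer byteBuffe =
                   charset.encode(getClinetName(clientChannel)+":"+msg);
           //A null attachment marks this as a write, so completed() ignores it
           clientHandler.clientChannel.write(byteBuffe,null,clientHandler);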
       }

   }

}

/**
 * Handling after event completion
 */
private class ClientHandler implements CompletionHandler<Integer,Object> {

    private AsynchronousSocketChannel clientChannel;


    public ClientHandler(AsynchronousSocketChannel channel){

        this.clientChannel = channel;

    }

    @Override
    public void completed(Integer result,Object attachment){

        ByteBuffer buffer = (ByteBuffer) attachment;

        if(buffer!=null){//A read completed (writes pass a null attachment)
            if(result<=0){
                //The client closed the connection or the channel failed:
                //remove it from the list and release the channel
               removeClient(this);
               close(clientChannel);
            }else {

                buffer.flip();//Convert to read
                String msg = receive(buffer);//Read message from buffer
                System.out.println(getClinetName(clientChannel)+":"+msg);
                //relay the message
                dispatch(clientChannel,msg);
                buffer.clear();//Clear the buffer for the next read
                //Quit requested
                if(readtToQuit(msg)){
                    removeClient(this);
                }else {
                    clientChannel.read(buffer,buffer,this);
                }
            }
        }
    }
    @Override
    public void failed(Throwable exc,Object attachment){

        System.out.println("Read write failure:"+exc);

    }
}

//Server connection event handling
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{

    @Override
    public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {

        if(serverSocketChannel.isOpen()){//Determine whether the connection is open
                   serverSocketChannel.accept(null,this);
        }
        if(clientChannel!=null&&clientChannel.isOpen()){

            ClientHandler clientHandler = new ClientHandler(clientChannel);
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            //Add customer to list
            addClient(clientHandler);

            clientChannel.read(byteBuffer,byteBuffer,clientHandler);

        }

    }

    @Override
    public void failed(Throwable exc, Object attachment) {
       System.out.println("Accept failure: "+exc);
    }

}

private void start(){
//Start a fixed size thread pool
ExecutorService executorService = Executors.newFixedThreadPool(THREADPOLL_SIZE);

   try {
       //Join thread pool to asynchronous channel for resource sharing
       channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
       //Asynchronous channel open
       serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
       serverSocketChannel.bind(new InetSocketAddress(LOCALHSOT,port));
       System.out.println("Service startup, listening port: "+port);
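       //Register the first accept; AcceptHandler re-arms it after each connection.
       //Calling accept again while one is pending would throw AcceptPendingException
       serverSocketChannel.accept(null,new AcceptHandler());
       //Block the main thread so the server keeps running; press Enter to stop
       System.in.read();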
   } catch (IOException e) {
       e.printStackTrace();
   }finally {
       close(serverSocketChannel);
   }

}

public static void main(String[] args) {
    AIOServer server = new AIOServer(8848);
    server.start();
}

}

client
public class AIOClient {

private static final String LOCALHSOT ="localhost";

private static final  int DEFAUT_PORT=8848;
//Exit ID
private static final String QUIT="quit";

private static final int BUFFER=1024;

private String hsot;

private int port;


private AsynchronousSocketChannel clientChannel;

private Charset charset = Charset.forName("UTF-8");


public AIOClient(){
    this(LOCALHSOT,DEFAUT_PORT);
}
public AIOClient(String host,int port){
     this.hsot = host;
     this.port = port;
}

//Close the flow and release the resources associated with it
private void close(Closeable closeable){

    if(closeable!=null){

        try {
            closeable.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//Determine whether the client is ready to exit
public boolean redatQuit(String msg){

    return QUIT.equals(msg);
}

private void start(){

    //Open connection channel
    try {
        clientChannel = AsynchronousSocketChannel.open();
        //Get a connection
        Future<Void> future = clientChannel.connect(new InetSocketAddress(hsot,port));
        try {
            future.get();//Blocking call
            //Start a new thread to process user input
            new Thread(new AioClientHandler(this)).start();
            //Application cache
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER);
            while (true){
                //Start the asynchronous read; data read from the channel is placed in the buffer
                Future<Integer> readResult = clientChannel.read(byteBuffer);
               //Block until the read completes
                int result = readResult.get();
                if(result<=0){
                    System.out.println("Server communication is abnormal, disconnect");
                    close(clientChannel);
                    //Abnormal exit
                    System.exit(1);
                }else {
                      byteBuffer.flip();//Reset read position
                      //Message decoding
                      String msg = String.valueOf(charset.decode(byteBuffer));
                      byteBuffer.clear();
                      System.out.println(msg);
                }

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
public  void send(String msg){
    if(msg.isEmpty()){
        return;
    }
    //Encode the message
    ByteBuffer byteBuffer = charset.encode(msg);
    //Start the write; get() below blocks until it finishes
    Future<Integer> writeResult = clientChannel.write(byteBuffer);

    try {
        //Get the result of sending message
        writeResult.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
        System.out.println("Message sending failed");
    } catch (ExecutionException e) {
        e.printStackTrace();
        System.out.println("Message sending failed");
    }
}

public static void main(String[] args) {
    AIOClient aioClient = new AIOClient();
    aioClient.start();
}

}
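
The AioClientHandler class used in start() is not shown in the listing above. A hypothetical stand-in might look like the sketch below, assuming its job is to read console lines, forward each one to the client's send method and stop after the quit keyword. The class name AioClientHandlerSketch and its constructor parameters are my own; the original presumably took the AIOClient instance directly.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;

// Hypothetical replacement for the missing AioClientHandler.
class AioClientHandlerSketch implements Runnable {
    private final BufferedReader console;
    private final Consumer<String> sender; // stands in for AIOClient.send
    private final String quitWord;         // stands in for AIOClient.QUIT

    AioClientHandlerSketch(Reader input, Consumer<String> sender, String quitWord) {
        this.console = new BufferedReader(input);
        this.sender = sender;
        this.quitWord = quitWord;
    }

    @Override
    public void run() {
        try {
            String line;
            // readLine() returns null at end of input, which also ends the loop.
            while ((line = console.readLine()) != null) {
                sender.accept(line);
                if (quitWord.equals(line)) {
                    break; // the quit keyword is still sent so the server can drop the client
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```

Inside AIOClient.start() it could be wired up as new Thread(new AioClientHandlerSketch(new InputStreamReader(System.in), this::send, QUIT)).start().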

View results:


Added by PW on Wed, 10 Jun 2020 06:42:48 +0300