The start of 2020 has been extraordinary, so I am staying at home and studying I/O again.
Java I/O is divided into BIO, NIO, and AIO.
Of these, Netty is built on NIO. AIO is not yet widely used, so it may still have room to grow.
Understand several concepts
Synchronous and asynchronous
Blocking and non blocking
io multiplexing
I/O multiplexing: "I/O" refers to network I/O, "multi" refers to multiple TCP connections (i.e. sockets or channels), and "plexing" refers to reusing one or a few threads.
Simply put, one or a few threads handle many TCP connections. The biggest advantage is lower system overhead: there is no need to create and maintain a large number of processes or threads.
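To make "non-blocking" concrete before the selector comes in, here is a minimal sketch that uses a java.nio Pipe instead of a real socket (an assumption chosen so the example runs on its own). The class and method names are made up for illustration. A non-blocking read returns 0 immediately when nothing has arrived, so without multiplexing the application has to keep polling, which is the work a selector takes over.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {

    // Returns {bytes read before anything was written, bytes read after writing "hello"}.
    static int[] readTwice() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(64);

            // Nothing has been written yet: a non-blocking read returns 0 instead of waiting.
            int before = source.read(buffer);

            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            sink.write(ByteBuffer.wrap(payload));

            // Without a selector the caller has to poll until the data shows up.
            int after = 0;
            while (after < payload.length) {
                after += source.read(buffer);
            }
            return new int[] {before, after};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = readTwice();
        System.out.println("before write: " + result[0] + ", after write: " + result[1]);
    }
}
```

In blocking mode the first read would simply never return here, because the only thread that could write is the one stuck in the read.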
Reactor pattern: event-driven; common users include Redis and Netty
epoll
1) No fixed FD limit: the maximum number of FDs is bounded only by the operating system's file-handle limit, roughly 100,000 handles per 1 GB of memory
2) Better efficiency: readiness is reported through callbacks instead of polling every FD, so performance does not degrade as the number of FDs grows
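3) Only ready events are handed back: epoll_wait copies just the ready descriptors to user space. (It is often said that epoll shares a block of memory between the kernel and user space through mmap, but the Linux implementation does not do this.)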
First, a simple illustration of the I/O interaction process
I/O inside a single machine
As the flow chart shows, data from outside first arrives at the network card and reaches the socket bound to the corresponding port. It enters kernel space during the data preparation stage, is then copied from kernel space to user space, and is finally processed by the application. Related topics, such as how to understand synchronous versus asynchronous and blocking versus non-blocking, are not covered here.
This figure shows the call flow of blocking I/O
Network io model
java.io Model
BIO: main network io programming mode before jdk1.4, synchronous blocking
BIO chat room
TalkServerBIO bio server
//bio implementation of chat server
public class TalkServerBIO {
//Client list
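//Typed list; CopyOnWriteArrayList because the accept thread adds while client threads iterate and remove
public static List<MyChannel> userList = new CopyOnWriteArrayList<>();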
    public static void main(String[] args) throws IOException {
        //Server port settings
        ServerSocket serverSocket = new ServerSocket(9999);
        System.out.println("Beauty chat room waiting for your connection......");
        while (true){
            Socket socket = serverSocket.accept();
            System.out.println("Dear customers welcome you");
            MyChannel myChannel = new MyChannel(socket);
            new Thread(myChannel).start();
            userList.add(myChannel);
        }
    }
}
TalkClientBIO bio client implementation
//The realization of bio in chat client
@Slf4j
public class TalkClientBIO {
private static File file = new File("Client.propertise"); private static Properties properties = new Properties(); private static String HOST = ""; private static int PORT =0; //Initialize port startup information static { if(!file.exists()){ try { file.createNewFile(); FileOutputStream fos = new FileOutputStream(file); properties.setProperty("hostname","localhost"); properties.setProperty("port","9999"); properties.store(fos,null); fos.close(); } catch (IOException e) { log.error("Failed to create file"); e.printStackTrace(); } } try { properties.load(new FileInputStream(file)); HOST = properties.getProperty("hostname"); PORT = Integer.parseInt(properties.getProperty("port")); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { try { Socket socket = new Socket(HOST,PORT); Scanner input = new Scanner(System.in); System.out.println("The client has started. Please enter your nickname: "); String name = input.next(); System.out.println("Has entered the chat room!"); new Thread(new SendUtil(socket),name).start(); new Thread(new ReceiveUtil(socket),name).start(); } catch (IOException e) { e.printStackTrace(); } }
}
Implementation of MyChannel message channel
//Realization of pipeline communication on server server
@Slf4j
public class MyChannel implements Runnable {
//Input stream private DataInputStream dis; //Output stream private DataOutputStream dos; //Pipeline closure sign private boolean isStop = Boolean.FALSE; /** * Constructor gets connection related information * @param client */ public MyChannel(Socket client){ try { dis = new DataInputStream(client.getInputStream()); dos = new DataOutputStream(client.getOutputStream()); } catch (IOException e) { e.printStackTrace(); log.info("Pipeline message exception{}",client.getInetAddress()); colse(); } } @Override public void run() { while (!isStop){ sendOther(); } } /** * * Get message * @return */ public String receiveMsg(){ String msg =""; try { msg = dis.readUTF(); System.out.println(Thread.currentThread().getName()+":"+msg); } catch (IOException e) { colse(); TalkServerBIO.userList.remove(this); } return msg; } public void sendMsg(String s){ if(!StringUtils.isEmpty(s)){ try { dos.writeUTF(s); dos.flush(); } catch (IOException e) { e.printStackTrace(); isStop = true; colse(); } } } /** * Send messages to other clients */ public void sendOther(){ String s = this.receiveMsg(); List<MyChannel> userList = TalkServerBIO.userList; for(MyChannel myChannel:userList){ if(this == myChannel){ continue; } //Send messages to others myChannel.sendMsg(s); } } /** * Close connection */ public void colse(){ isStop = true; if(null!=dis){ try { dis.close(); } catch (IOException ex) { ex.printStackTrace(); } } if(null!=dos){ try { dos.close(); } catch (IOException ex) { ex.printStackTrace(); } } }
}
//Send message tool class
public class SendUtil implements Runnable {
private DataOutputStream dos = null; private boolean isStop = Boolean.FALSE; private BufferedReader br = null; public SendUtil(Socket socket){ br = new BufferedReader(new InputStreamReader(System.in)); try { dos = new DataOutputStream(socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); colse(); } } /** * send message */ public void sendMessage(){ try { dos.writeUTF(Thread.currentThread().getName()+"-->Say:"+br.readLine()); dos.flush(); } catch (IOException e) { colse(); e.printStackTrace(); } } @Override public void run() { while (!isStop){ this.sendMessage(); } } /** * Close connection */ public void colse(){ isStop = true; if(null!=dos){ try { dos.close(); } catch (IOException ex) { ex.printStackTrace(); } } }
}
//Get message tool class
public class ReceiveUtil implements Runnable {
private DataInputStream ios = null; private boolean isStop = Boolean.FALSE; private BufferedReader br = null; public ReceiveUtil(Socket socket){ try { ios = new DataInputStream(socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); colse(); } } public void receiveMessage(){ String msg =""; try { msg = ios.readUTF(); System.out.println(msg); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { while (!isStop){ this.receiveMessage(); } } /** * Close connection */ public void colse(){ isStop = true; if(null!=ios){ try { ios.close(); } catch (IOException ex) { ex.printStackTrace(); } } }
}
Opening three clients and one server locally gives a working multi-user chat
This example shows that the server starts a new thread for every new client. With a large number of clients, the server runs out of thread resources and the service becomes unavailable. Even a thread pool does not fix the underlying problem: while a client is neither reading nor writing, its thread stays blocked, which is a great waste of resources.
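The thread-pool point can be shown without any sockets. The sketch below is an illustration under assumptions: a CountDownLatch that is never released stands in for a blocking read on an idle connection, and the class and method names are invented. With a pool of 2 threads and 5 "clients", only 2 handlers ever start; the rest wait in the queue behind threads that are doing nothing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockedPoolDemo {

    // Submits `clients` handlers that block like a BIO read on an idle socket
    // and reports how many of them got a thread at all.
    static int startedHandlers(int poolSize, int clients) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch neverReleased = new CountDownLatch(1); // stand-in for a read that never returns
        AtomicInteger started = new AtomicInteger();

        for (int i = 0; i < clients; i++) {
            pool.execute(() -> {
                started.incrementAndGet();
                try {
                    neverReleased.await(); // the thread is parked here, serving nobody
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread.sleep(200);                 // give the pool time to start whatever it can
        int result = started.get();
        pool.shutdownNow();                // interrupt the parked handlers so the JVM can exit
        pool.awaitTermination(1, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handlers started: " + startedHandlers(2, 5));
    }
}
```

A larger pool only moves the limit; it does not change the fact that each idle connection pins a thread.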
The NIO model appeared in JDK 1.4; it is synchronous and non-blocking
nio chat room implementation
//Server
@Slf4j
public class TalkServerNIO {
//Define port number
private static int PORT = 8848;
//For character set decoding
private Charset charset = Charset.forName("UTF-8");
//Buffer for receiving data
private ByteBuffer rBuffer = ByteBuffer.allocate(1024);
//Buffer for sending data
private ByteBuffer sBuffer = ByteBuffer.allocate(1024);
//Used to store the client socketchannel collection
private Map<String, SocketChannel> clientMap = new HashMap<>();
//Used to listen for channel events
private static Selector selector;
    public static void main(String[] args) {
        TalkServerNIO talkServerNIO = new TalkServerNIO(8848);
        talkServerNIO.listen();
    }

    public TalkServerNIO(int port) {
        PORT = port;
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Server initialization exception", e);
        }
    }

    //Initialize the server and register the events to listen for
    private void init() throws IOException {
        //Open a ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //Set the channel to non-blocking
        serverSocketChannel.configureBlocking(false);
        //Get the ServerSocket of the channel
        ServerSocket serverSocket = serverSocketChannel.socket();
        //Bind the port number
        serverSocket.bind(new InetSocketAddress(PORT));
        //Open the selector
        selector = Selector.open();
        //Register the channel on the selector for accept events
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server starts, port number is:" + PORT);
    }

    //The server polls for events; select() blocks until at least one registered event is ready
    public void listen() {
        while (true) {
            try {
                // log.info("waiting for connection request.....");
                selector.select();//The return value is the number of keys that became ready
                //Get the set of ready events
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                //Handle the events
                selectionKeys.forEach(selectionKey -> handle(selectionKey));
                selectionKeys.clear();//Clear what has been handled
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    //Handle the corresponding event
    private void handle(SelectionKey selectionKey) {
        // log.info("there is a request to come in, need to deal with...");
        //A client requests a connection
        if (selectionKey.isAcceptable()) {
            //Get the channel of the connection request
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            try {
                SocketChannel client = server.accept();
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
                clientMap.put(getClientName(client), client);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        //A client sent a message
        else if (selectionKey.isReadable()) {
            //Client channel
            SocketChannel client = (SocketChannel) selectionKey.channel();
            //Clear the read buffer
            rBuffer.clear();
            try {
                int bytes = client.read(rBuffer);
                if (bytes > 0) {
                    //Flip the buffer so it is read from the start
                    rBuffer.flip();
                    //Decode the buffer data
                    String receiveText = String.valueOf(charset.decode(rBuffer));
                    System.out.println(client.toString() + ":" + receiveText);
                    dispatch(client, receiveText);
                } else if (bytes < 0) {
                    //read() returns -1 when the client has closed the connection;
                    //without this branch the key stays readable and the loop spins
                    selectionKey.cancel();
                    clientMap.values().remove(client);
                    client.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
                //The connection is broken: stop watching it and forget the client
                selectionKey.cancel();
                clientMap.values().remove(client);
            }
        }
    }

    //Forward the message to the other clients, equivalent to sendOther in the BIO version
    private void dispatch(SocketChannel client, String info) {
        if (!clientMap.isEmpty()) {
            for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
                SocketChannel temp = entry.getValue();
                if (!client.equals(temp)) {
                    //Write the encoded bytes directly; copying them into the fixed 1024-byte
                    //sBuffer can overflow when a full read buffer is prefixed with the name
                    ByteBuffer out = charset.encode(getClientName(client) + ":" + info);
                    try {
                        temp.write(out);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    //Generate the client name or nickname
    private String getClientName(SocketChannel client) {
        Socket socket = client.socket();
        return "[" + socket.getInetAddress().toString().substring(1) + ":" + Integer.toHexString(client.hashCode()) + "]";
    }
}
Implementation of client
@Slf4j
public class TalkClientNIO {
//server address private InetSocketAddress SERVER = null; //Buffer for receiving data private ByteBuffer rBuffer = ByteBuffer.allocate(1024); //Buffer for sending data private ByteBuffer sBuffer = ByteBuffer.allocate(1024); //Used to listen for channel events private static Selector selector; //For encoding or decoding private Charset charset = Charset.forName("UTF-8"); public TalkClientNIO(int port){ SERVER = new InetSocketAddress("127.0.0.1",port); //Initialize client init(); } //Initialize client private void init(){ try { //Open SocketChannel SocketChannel socketChannel = SocketChannel.open(); //Nonblocking socketChannel.configureBlocking(false); //Open monitoring channel selector = Selector.open(); //Register the channel on the selector and listen for connection events socketChannel.register(selector, SelectionKey.OP_CONNECT); //Connect socketChannel.connect(SERVER); while (true){ //Call the select method to select the channel with event call selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); //Processing selectionKeys.forEach(selectionKey -> handle(selectionKey)); selectionKeys.clear();//Clean up processed events } } catch (IOException e) { e.printStackTrace(); } } public void handle(SelectionKey selectionKey){ //Connection ready event if(selectionKey.isConnectable()){ //Get channel s with connection events SocketChannel client = (SocketChannel) selectionKey.channel(); //If connecting status if(client.isConnectionPending()){ try { //Indicates successful connection client.finishConnect(); System.out.println("Complete the connection with the server..."); //Start thread listens for client input new Thread(new Runnable() { @Override public void run() { while (true){ sBuffer.clear(); Scanner scanner = new Scanner(System.in); String sendText = scanner.next(); sBuffer.put(charset.encode(sendText)); sBuffer.flip(); try { client.write(sBuffer); }catch (Exception e){ log.error("Input message exception..."); } } } }).start(); } catch 
(IOException e) { e.printStackTrace(); } try { //Register readable events client.register(selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } } //Read events have messages sent from the server, read and input to the screen else if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); rBuffer.clear(); try { int count = socketChannel.read(rBuffer); if(count>0){ String receiveText = new String(rBuffer.array(),0,count); System.out.println(receiveText); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { TalkClientNIO talkClientNI = new TalkClientNIO(8848); }
}
The Reactor model in NIO
A Reactor design has two parts:
One part is the Reactor itself, which accepts client connections and dispatches requests
The other part is the I/O processing
1. Single Reactor, single thread
All requests are handled by the main thread
When a client sends a request, the main thread does everything: it multiplexes on the selector, accepts the connection, registers the client channel, reads the request, and writes the response
2. Single Reactor, multiple threads
A thread pool handles the business processing
3. Master-slave Reactor, multiple threads
The main Reactor only accepts connections and hands each one to a sub-Reactor, which handles its reads and writes
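As a rough illustration of variant 1 (single Reactor, single thread), the sketch below runs accept, read, and write on one thread around one selector. It is an echo server rather than a chat server to keep it short. The class name, the loopback address, and the use of port 0 (let the OS pick a free port) are assumptions made for the example. Variants 2 and 3 would hand the work in echo() to a thread pool, or move the client channels to separate selectors on their own threads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SingleThreadReactor implements Runnable {
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    SingleThreadReactor() throws IOException {
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    void stop() throws IOException {
        selector.close(); // also wakes up a blocked select()
        server.close();
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        accept();
                    } else if (key.isReadable()) {
                        echo(key);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // selector closed or broken: the reactor thread ends
        }
    }

    private void accept() throws IOException {
        SocketChannel client = server.accept();
        if (client != null) {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        }
    }

    private void echo(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(256);
        try {
            if (client.read(buffer) < 0) { // peer closed the connection
                key.cancel();
                client.close();
                return;
            }
            buffer.flip();
            while (buffer.hasRemaining()) {
                client.write(buffer); // acceptable in a sketch; real code would wait for OP_WRITE
            }
        } catch (IOException e) {
            key.cancel();
        }
    }

    // Starts a reactor, sends one message from a blocking client, returns the echoed text.
    static String roundTrip(String message) throws IOException {
        SingleThreadReactor reactor = new SingleThreadReactor();
        Thread thread = new Thread(reactor, "reactor");
        thread.setDaemon(true);
        thread.start();
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (SocketChannel client =
                     SocketChannel.open(new InetSocketAddress("127.0.0.1", reactor.port()))) {
            client.write(ByteBuffer.wrap(bytes));
            ByteBuffer reply = ByteBuffer.allocate(bytes.length);
            while (reply.hasRemaining() && client.read(reply) >= 0) { /* keep reading */ }
            return new String(reply.array(), 0, reply.position(), StandardCharsets.UTF_8);
        } finally {
            reactor.stop();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```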
Disadvantages of Java NIO
1. The learning curve is steep. You need to master SocketChannel, Buffer, and so on
2. It takes experience to handle the many failure cases in complex applications, such as network interruptions and network exceptions
3. The infamous epoll empty-polling bug: select() keeps returning immediately with nothing ready, so the loop spins, CPU usage can reach 100%, and the system fails
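A common workaround for the empty-polling bug is to count select() calls that return early with nothing ready and, past a threshold, move every registration to a fresh selector. Netty does something along these lines, with a default threshold of 512. The sketch below is only an illustration of the idea, not Netty's code: the class name and structure are invented, and it assumes timeoutMillis is greater than 0 and that nobody calls wakeup() on the selector (a wakeup would also count as an early return).

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SpinGuard {
    // Illustrative threshold; chosen to match the default Netty uses for the same purpose.
    static final int REBUILD_THRESHOLD = 512;

    private int emptyRounds;

    // Runs one select() and returns the selector to use for the next round.
    Selector selectOnce(Selector selector, long timeoutMillis) throws IOException {
        long start = System.nanoTime();
        int ready = selector.select(timeoutMillis);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        if (ready > 0 || elapsedMillis >= timeoutMillis) {
            emptyRounds = 0;   // normal wake-up: real events or a real timeout
            return selector;
        }
        if (++emptyRounds < REBUILD_THRESHOLD) {
            return selector;   // returned early with nothing ready: count it
        }
        emptyRounds = 0;
        return rebuild(selector);
    }

    // Moves every valid registration to a new selector and closes the old one.
    static Selector rebuild(Selector old) throws IOException {
        Selector fresh = Selector.open();
        for (SelectionKey key : old.keys()) {
            if (!key.isValid()) {
                continue;
            }
            int ops = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(fresh, ops, attachment);
        }
        old.close();
        return fresh;
    }
}
```

An event loop would call selectOnce in place of selector.select() and keep using whatever selector it returns.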
Netty is a high-performance network communication framework. It was originally developed at JBoss and is now maintained by the Netty project community.
Thread model: an improved, multithreaded version of the master-slave Reactor
Efficient zero-copy technology:
Two implementation methods: mmap and sendfile
A normal read followed by a write needs 4 copies and 4 context switches
With mmap, this drops to 3 copies, still with 4 context switches
Before Linux 2.4, sendfile needs 3 copies and 2 context switches
From Linux 2.4 on, sendfile needs 2 copies and 2 context switches
This achieves true zero copy: "zero copy" means that no memory copy is performed by the CPU
DMA copies are unavoidable. DMA stands for direct memory access
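From Java, the usual way to reach sendfile-style transfers is FileChannel.transferTo, which lets the operating system move the bytes without passing them through a user-space buffer where the platform supports it. The sketch below copies one temporary file into another; the class name and file names are made up for the example, and whether a true zero-copy path is taken depends on the OS and the target channel type. As far as I know, Netty's FileRegion is built on the same call.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferToDemo {

    // Copies source to target with FileChannel.transferTo and returns the bytes moved.
    static long copy(Path source, Path target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("zero-copy", ".src");
        Path target = Files.createTempFile("zero-copy", ".dst");
        Files.write(source, "hello zero copy".getBytes(StandardCharsets.UTF_8));
        System.out.println("bytes transferred: " + copy(source, target));
        Files.delete(source);
        Files.delete(target);
    }
}
```

The application never allocates a ByteBuffer for the payload, which is the part of the copy chain that zero copy is meant to remove.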
netty chat room practice
NettyServer
public class NettyServer {
    private static int PORT = 8848;

    public NettyServer(int port){
        PORT = port;
    }

    public void start() {
        //By default an event loop group has (CPU cores * 2) threads; this can be tuned to the actual workload.
        //If request concurrency is low and the I/O processing is time-consuming,
        //fewer boss and worker threads can be configured
        //NettyRuntime.availableProcessors() returns the number of CPU cores
        System.out.println("cpu number:"+NettyRuntime.availableProcessors());
        //Thread group that accepts client connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //The worker thread group is mainly responsible for network I/O reads and writes
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //Netty's bootstrap class for starting an NIO server, which reduces development complexity
        ServerBootstrap bootstrap = new ServerBootstrap();
        //Set the thread groups (method chaining is an important idiom here)
        bootstrap = bootstrap.group(bossGroup,workerGroup);
        //Set the channel type
        bootstrap = bootstrap.channel(NioServerSocketChannel.class);
        //Set the handler that is called back for each accepted connection
        bootstrap = bootstrap.childHandler(new NettyServerInitalizer());
        //Set the TCP parameters of NioServerSocketChannel
        // bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG,1024);
        // bootstrap = bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        //Bind the listening port; sync() blocks until the bind completes and returns a ChannelFuture
        try {
            ChannelFuture future = bootstrap.bind(PORT).sync();
            System.out.println("Server start......");
            //sync() blocks until the server channel is closed, then the main function exits
            future.channel().closeFuture().sync();
            System.out.println("Server down......");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //Graceful exit, release thread resources
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyServer(PORT).start();
    }
}
//The type parameter is required: with the raw type, initChannel(SocketChannel) does not override anything
public class NettyServerInitalizer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("With client connection:"+ch.remoteAddress());
        ChannelPipeline channelPipeline = ch.pipeline();
        //Set decoder
        channelPipeline.addLast("decode",new StringDecoder());
        //Set encoder
        channelPipeline.addLast("encode",new StringEncoder());
        //Set the handler that responds to events
        channelPipeline.addLast("handler",new NettyServerHandler());
    }
}
//The type parameter is required so that channelRead0(ctx, String) overrides the base method
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
    //Define the channel group as a single global instance
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Called when the handler is added to a new client's pipeline:
     * announce the newcomer and record its channel in the group.
     * The callback takes only the context; with an extra String parameter
     * it would be a different method that Netty never calls.
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //Get client channel
        Channel currentClient = ctx.channel();
        //All client channels
        for(Channel channel:channels){
            if(channel!=currentClient){
                channel.writeAndFlush("[welcome"+currentClient.remoteAddress()+"]Enter chat room\n");
            }
        }
        //Add the new client to the group
        channels.add(currentClient);
    }

    /**
     * Triggered when a client has written data
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //Get client channel
        Channel currentClient = ctx.channel();
        //All client channels (the sender was already added in handlerAdded)
        for(Channel channel:channels){
            if(channel!=currentClient){
                channel.writeAndFlush("[User:"+currentClient.remoteAddress()+"say]"+msg +"\n");
            }else {
                channel.writeAndFlush("[I said:]"+msg +"\n");
            }
        }
    }

    /**
     * Processing when the client leaves
     * @param ctx
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx){
        //Get client channel
        Channel leaveClient = ctx.channel();
        //Notify the other clients that this one has left
        for(Channel channel:channels){
            if(leaveClient!=channel){
                channel.writeAndFlush("[bye:"+leaveClient.remoteAddress()+"]Leave the chat room\n");
            }
        }
    }

    /**
     * Current user online
     * @param ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        Channel currentChannel = ctx.channel();
        System.out.println("["+currentChannel.remoteAddress()+"]on-line");
    }

    /**
     * Current user offline
     * @param ctx
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx){
        Channel currentChannel = ctx.channel();
        System.out.println("["+currentChannel.remoteAddress()+"]off-line");
    }

    /**
     * Client communication exception
     * @param ctx
     * @param throwable
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable throwable){
        Channel currentChannel = ctx.channel();
        System.out.println("["+currentChannel.remoteAddress()+"]Abnormal communication");
        ctx.close();
    }
}
//client side implementation
public class NettyClient {
private String HOST; private int PORT; public NettyClient(String host,int port){ HOST = host; PORT = port; } public void start(){ /** * Event loop group */ EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //Client start Bootstrap bootstrap = new Bootstrap(); //Client add thread group bootstrap = bootstrap.group(eventLoopGroup); //Set client channel bootstrap = bootstrap.channel(NioSocketChannel.class); //Set up client event handler bootstrap.handler(new NettyClientInitalizer()); try { //Connect to the server to obtain the client channel Channel channel = bootstrap.connect(HOST,PORT).sync().channel(); //Get system input information BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); while (true){ channel.writeAndFlush(input.readLine()+"\n"); } } catch (InterruptedException | IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new NettyClient("localhost",8848).start(); }
}
//The type parameter is required: with the raw type, initChannel(SocketChannel) does not override anything
public class NettyClientInitalizer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        System.out.println("Client start....");
        //Pipeline of the socket channel
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        //Set decoder
        channelPipeline.addLast("decode",new StringDecoder());
        //Set encoder
        channelPipeline.addLast("encode",new StringEncoder());
        //Set the handler
        channelPipeline.addLast("handler",new NettyClientHandler());
    }
}
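//The type parameter is required so that channelRead0(ctx, String) overrides the base method
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //Print every message received from the server
        System.out.println(msg);
    }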
}
AIO chat room implementation
Server
public class AIOServer {
private static final String LOCALHSOT="localhost"; private static final int DEFAULT_PORT=8848; //Launch logo private static final String QUIT="quit"; private static final int BUFFER_SIZE=1024; private static final int THREADPOLL_SIZE = 8; //Asynchronous channel group private AsynchronousChannelGroup channelGroup; //Asynchronous ServerSocketChannel is similar to ServerSocketChannel private AsynchronousServerSocketChannel serverSocketChannel; //Encoding format private Charset charset = Charset.forName("UTF-8"); //Collection of connected clients private List<ClientHandler> connectedClients; private int port; public AIOServer(){ this(DEFAULT_PORT); } public AIOServer(int port){ this.port = port; this.connectedClients = new ArrayList<>(); } /** * Apply for exit * @param msg * @return */ private boolean readtToQuit(String msg){ return QUIT.equals(msg); } /** * Close flow * @param closeable */ private void close(Closeable closeable){ if(closeable!=null){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * New connected clients * @param clientHandler */
private synchronized void addClient(ClientHandler clientHandler){
connectedClients.add(clientHandler); System.out.println("Server connected.");
}
private synchronized void removeClient(ClientHandler clientHandler){
connectedClients.remove(clientHandler); System.out.println("Disconnected.");
}
/** *Decoding received messages * @param byteBuffer * @return */
private String receive(ByteBuffer byteBuffer){
CharBuffer charBuffer = charset.decode(byteBuffer); return String.valueOf(charBuffer);
}
/** *Get name * @param clinetChannel * @return */
private String getClinetName(AsynchronousSocketChannel clinetChannel){
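    int clientPort = -1;
    try {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) clinetChannel.getRemoteAddress();
        //Store the port in the local variable; assigning to the port field would overwrite the server's own port
        clientPort = inetSocketAddress.getPort();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return "client["+clientPort+"]";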
}
/** *Send messages to other clients * @param clientChannel * @param msg */
private synchronized void dispatch(AsynchronousSocketChannel clientChannel,String msg){
    //Label the message with the sender's name, not the recipient's
    String text = getClinetName(clientChannel)+":"+msg;
    for(ClientHandler clientHandler:connectedClients){
        if(!clientHandler.clientChannel.equals(clientChannel)){
            //Encode per recipient so every write gets its own buffer
            ByteBuffer byteBuffe = charset.encode(text);
            clientHandler.clientChannel.write(byteBuffe,null,clientHandler);
        }
    }
}
/** * Handling after event completion */ private class ClientHandler implements CompletionHandler<Integer,Object> { private AsynchronousSocketChannel clientChannel; public ClientHandler(AsynchronousSocketChannel channel){ this.clientChannel = channel; } @Override public void completed(Integer result,Object attachment){ ByteBuffer buffer = (ByteBuffer) attachment; if(buffer!=null){//Read event sent if(result<=0){ //Exception sent by client channel, removed from list //Remove the object from the list removeClient(this); }else { buffer.flip();//Convert to read String msg = receive(buffer);//Read message from buffer System.out.println(getClinetName(clientChannel)+":"+msg); //relay the message dispatch(clientChannel,msg); buffer.clear();//Situation buffer //sign out if(readtToQuit(msg)){ removeClient(this); }else { clientChannel.read(buffer,buffer,this); } } } } @Override public void failed(Throwable exc,Object attachment){ System.out.println("Read write failure:"+exc); } }
//Server connection event handling
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
@Override public void completed(AsynchronousSocketChannel clientChannel, Object attachment) { if(serverSocketChannel.isOpen()){//Determine whether the connection is open serverSocketChannel.accept(null,this); } if(clientChannel!=null&&clientChannel.isOpen()){ ClientHandler clientHandler = new ClientHandler(clientChannel); ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE); //Add customer to list addClient(clientHandler); clientChannel.read(byteBuffer,byteBuffer,clientHandler); } } @Override public void failed(Throwable exc, Object attachment) { System.out.println("Read write failure: "+exc); }
}
private void start(){
//Start a fixed size thread pool
ExecutorService executorService = Executors.newFixedThreadPool(THREADPOLL_SIZE);
    try {
        //Attach the thread pool to the asynchronous channel group for resource sharing
        channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
        //Open the asynchronous server channel
        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverSocketChannel.bind(new InetSocketAddress(LOCALHSOT,port));
        System.out.println("Service startup, listening port: "+port);
        //Start accepting once. AcceptHandler calls accept() again after each connection,
        //and a second accept() while one is still pending throws AcceptPendingException,
        //so accept() must not be called in a loop here
        serverSocketChannel.accept(null,new AcceptHandler());
        //Block the main thread so the server keeps running until input arrives on stdin
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }finally {
        close(serverSocketChannel);
    }
}
public static void main(String[] args) { AIOServer server = new AIOServer(8848); server.start(); }
}
client
public class AIOClient {
private static final String LOCALHSOT ="localhost"; private static final int DEFAUT_PORT=8848; //Exit ID private static final String QUIT="quit"; private static final int BUFFER=1024; private String hsot; private int port; private AsynchronousSocketChannel clientChannel; private Charset charset = Charset.forName("UTF-8"); public AIOClient(){ this(LOCALHSOT,DEFAUT_PORT); } public AIOClient(String host,int port){ this.hsot = host; this.port = port; } //Close the flow and release the resources associated with it private void close(Closeable closeable){ if(closeable!=null){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } //Determine whether the client is ready to exit public boolean redatQuit(String msg){ return QUIT.equals(msg); } private void start(){ //Open connection channel try { clientChannel = AsynchronousSocketChannel.open(); //Get a connection Future<Void> future = clientChannel.connect(new InetSocketAddress(hsot,port)); try { future.get();//Blocking call //Start a new thread to process user input new Thread(new AioClientHandler(this)).start(); //Application cache ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER); while (true){ //Start the asynchronous read operation, and put the data read from the channel into the exchange area Future<Integer> readResult = clientChannel.read(byteBuffer); //Blocking results int result = readResult.get(); if(result<=0){ System.out.println("Server communication is abnormal, disconnect"); close(clientChannel); //Abnormal exit System.exit(1); }else { byteBuffer.flip();//Reset read position //Message decoding String msg = String.valueOf(charset.decode(byteBuffer)); byteBuffer.clear(); System.out.println(msg); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } } public void send(String msg){ if(msg.isEmpty()){ return; } //Message encryption ByteBuffer byteBuffer = charset.encode(msg); 
//Block send message Future<Integer> writeResult = clientChannel.write(byteBuffer); try { //Get the result of sending message writeResult.get(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("Message sending failed"); } catch (ExecutionException e) { e.printStackTrace(); System.out.println("Message sending failed"); } } public static void main(String[] args) { AIOClient aioClient = new AIOClient(); aioClient.start(); }
}
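The client above waits on Future.get() after every connect, read, and write, so it uses the asynchronous channel in a blocking way. For comparison, here is a small callback-style sketch in which each step is started from the completion of the previous one. It is a loopback echo, not the chat protocol, and the names (AioCallbackDemo, then, echoOnce) are invented for the example. It also assumes the short message arrives in a single read, which a real client should not rely on.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AioCallbackDemo {

    // Adapter so each step can be written as a lambda; a failure in any step fails the result.
    static <V> CompletionHandler<V, CompletableFuture<String>> then(Consumer<V> next) {
        return new CompletionHandler<V, CompletableFuture<String>>() {
            @Override
            public void completed(V value, CompletableFuture<String> result) {
                next.accept(value);
            }

            @Override
            public void failed(Throwable exc, CompletableFuture<String> result) {
                result.completeExceptionally(exc);
            }
        };
    }

    // Sends one message to a loopback echo server and returns what comes back.
    static String echoOnce(String message) throws Exception {
        CompletableFuture<String> result = new CompletableFuture<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                     .bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            // Server side: accept, then read, then write the same bytes back.
            server.accept(result, then((AsynchronousSocketChannel peer) -> {
                ByteBuffer in = ByteBuffer.allocate(256);
                peer.read(in, result, then((Integer count) -> {
                    in.flip();
                    peer.write(in, result, then((Integer written) -> { }));
                }));
            }));

            // Client side: connect, then write, then read; no thread waits between steps.
            ByteBuffer outgoing = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            ByteBuffer reply = ByteBuffer.allocate(256);
            client.connect(server.getLocalAddress(), result, then((Void connected) ->
                    client.write(outgoing, result, then((Integer sent) ->
                            client.read(reply, result, then((Integer received) -> {
                                reply.flip();
                                result.complete(StandardCharsets.UTF_8.decode(reply).toString());
                            }))))));

            // Only the caller of the demo waits, and only for the final result.
            return result.get(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(echoOnce("hello aio"));
    }
}
```

The callbacks run on the channel group's threads, which is the same mechanism the server's AcceptHandler and ClientHandler rely on.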
View results: