preface
Java version 1.4 introduces a new IO API, which has the same function and purpose as the original IO; It can replace standard java IO, but the implementation method is different. NIO is a buffer oriented and channel based IO operation; NIO can improve the reading and writing of files. Based on this advantage, there are more and more scenarios using NIO. Many popular frameworks use NIO technology, such as Tomcat, Netty, Jetty, etc; Therefore, learning and mastering NIO technology is a necessary skill for java development.
1, IO and NIO
1. Stream oriented and buffer oriented
Reading and writing data in Java IO is * * Stream oriented (Stream) * *, which means that when we read data from the Stream and write data to the Stream, we also write it to the Stream. The meaning of the Stream is that there is no cache. It's like we stand in front of the pipeline, and all the data arrive in front of us successively along the pipeline. We can only read the current data. If we need to obtain the previous or later data of a data, we must cache the data ourselves Data, not directly from the Stream.
In Java NIO, data reading and writing is oriented to * * Buffer (Buffer) * *, when reading, the whole block of data can be read into the Buffer, and when writing, the data in the whole Buffer can be written together. This is like changing pipeline transmission into truck transportation. Stream oriented data reading and writing only provides a data stream interface, while Buffer oriented IO enables us to see the context of data, that is, in the Buffer It is very convenient to obtain the previous data or the latter data of a certain data in the area. This convenience comes at a price, because we must manage the Buffer well, including not allowing new data to overwrite the useful data in the Buffer that has not been processed; Block the data in the Buffer correctly, and distinguish what has been processed and what has not been processed, etc.
2. Blocking and non blocking
Traditional IO streams are blocking. That is, when a thread calls read() or write(), the thread is blocked until some data is read or written, and the thread cannot perform other tasks during this period. Therefore, when the network communication is completed for IO operation, because the thread will block, the server must provide an independent thread for each client to process. When the server needs to process a large number of clients, the performance drops sharply.
Java NIO is non blocking. When a thread reads and writes data from a channel, if no data is available, the thread can perform other tasks. Threads usually use the idle time of non blocking IO to perform IO operations on other channels, so a single thread can manage multiple input and output channels. Therefore, NIO can let the server use one or a limited number of threads to process all clients connected to the server at the same time.
2, TCP chat program
1. IO based
IO server:
import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class IOServer { @SuppressWarnings("resource") public static void main(String[] args) throws Exception { ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); //Create a socket service and listen to port 8081 ServerSocket server=new ServerSocket(8081); System.out.println("Server startup!"); int count=0; while(true){ //Get a socket (blocking) final Socket socket = server.accept(); System.out.println("Welcome section"+(++count)+"A classmate"); newCachedThreadPool.execute(new Runnable() { @Override public void run() { //Business processing handler(socket); } }); } } //Read data public static void handler(Socket socket){ try { byte[] bytes = new byte[1024]; InputStream inputStream = socket.getInputStream(); while(true){ //Read data (blocking) int read = inputStream.read(bytes); if(read != -1){ System.out.println(new String(bytes, 0, read)); }else{ break; } } } catch (Exception e) { e.printStackTrace(); }finally{ try { System.out.println("socket close"); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
IO client:
import java.io.IOException; import java.io.OutputStream; import java.net.Socket; public class IOClient { public static void main(String[] args) throws IOException { //Send ten times for (int i=0;i<10;i++){ Socket socket=new Socket("127.0.0.1", 8081); //Write data OutputStream os=socket.getOutputStream(); os.write(("Royal Music"+i).getBytes()); //Release resources socket.close(); } } }
effect:
2. Based on NIO
NIO server:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOServer { // Channel Manager private Selector selector; //Start server test public static void main(String[] args) throws IOException { NIOServer server = new NIOServer(); server.initServer(8081); server.listen(); } // Get a ServerSocket channel and do some initialization for the channel public void initServer(int port) throws IOException { // Get a ServerSocket channel ServerSocketChannel serverChannel = ServerSocketChannel.open(); // Set channel to non blocking serverChannel.configureBlocking(false); // Bind the ServerSocket corresponding to the channel to the port serverChannel.socket().bind(new InetSocketAddress(port)); // Get a channel manager this.selector = Selector.open(); // Bind the channel manager to the channel and register the selectionkey for the channel OP_ Accept event. After registering this event, // When the event arrives, the selector Select () will return if the event does not reach the selector Select() will always block. serverChannel.register(selector, SelectionKey.OP_ACCEPT); } //Use polling to monitor whether there are events to be processed on the selector. If so, process them public void listen() throws IOException { System.out.println("The server is started successfully!"); // Polling access selector while (true) { // When the registered event arrives, the method returns; Otherwise, the method will always block selector.select(); // Gets the iterator of the selected item in the selector. The selected item is the registered event Iterator<?> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // Delete the selected key to prevent repeated processing ite.remove(); handler(key); } } } //Processing requests public void handler(SelectionKey key) throws IOException { // Client request connection event if (key.isAcceptable()) { handlerAccept(key); // Get readable events } else if (key.isReadable()) { handelerRead(key); } } // Processing connection requests public void handlerAccept(SelectionKey key) throws IOException { ServerSocketChannel server = (ServerSocketChannel) key.channel(); // Get the channel to connect with the client SocketChannel channel = server.accept(); // Set to non blocking channel.configureBlocking(false); // Here you can send information to the client System.out.println("New customer connection detected"); // After the connection with the client is successful, in order to receive the information from the client, you need to set the read permission for the channel. channel.register(this.selector, SelectionKey.OP_READ); } // Handling read events public void handelerRead(SelectionKey key) throws IOException { // The server can read the message: get the Socket channel of the event SocketChannel channel = (SocketChannel) key.channel(); // Create read buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); if(read > 0){ byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("The user name is:" + msg); //Write back data ByteBuffer outBuffer = ByteBuffer.wrap("Server received".getBytes()); channel.write(outBuffer);// Send the message back to the client }else{ System.out.println("Client shutdown"); key.cancel(); } } }
NIO client:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class NIOClient { public static void main(String[] args) throws Exception { final int count[]=new int[1]; count[0]=1; for(int i=0;i<5;i++){ new Thread(new Runnable() { @Override public void run() { SocketChannel socketChannel = null; //Data sent String str = "Royal Music"+count[0]++; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); //Accepted data ByteBuffer buffer = ByteBuffer.allocate(1024); try { //Establish connection socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) { //Waiting for connection while (!socketChannel.finishConnect()) { } } //Write data socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } //Automatically disconnect after 10s int time=1; while (time<10){ time++; try { //Read data int read=socketChannel.read(buffer); if(read > 0) { byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("Client received message:" + msg); } Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); Thread.sleep(100); } } }
Test effect:
The server:
client:
3. Based on Netty
Netty server:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class NettyServer { public static void main(String[] args) { //It is used to process the client connection received by the server NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); //Network communication (read-write) NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Auxiliary tool class, which is used for a series of configuration of server channels ServerBootstrap bootstrap = new ServerBootstrap(); //Bind two thread groups bootstrap.group(bossGroup,workerGroup) //Set the boss selector to create the object used by the channel .channel(NioServerSocketChannel.class) //Queue length of boss waiting for connection .option(ChannelOption.SO_BACKLOG,1024) //Processing message objects .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //Create pipe ChannelPipeline pipeline = ch.pipeline(); //Decoding mode pipeline.addLast("decoder",new StringDecoder()); //Coding mode pipeline.addLast("encoder",new StringEncoder()); //Custom processing message object pipeline.addLast(new ServerHandler()); } }); System.out.println("The server is starting"); //Binding port number ChannelFuture cf = bootstrap.bind(8083).sync(); cf.addListener(cd->{ if(cd.isSuccess()){ System.out.println("Start successful"); }else{ System.out.println("Startup failed"); } }); //The server sends messages to all clients Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String msg = scanner.nextLine(); ServerHandler.sendAll(msg); } //Block current thread cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.Date; public class ServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+" == " +msg); channelGroup.forEach(ch->{ if (channel!=ch) { ch.writeAndFlush("[ client ]" + channel.remoteAddress() + "Sent a message : " + msg + "\n"); }else{ ch.writeAndFlush("[ customer ] Sent a message: " + msg + "\n"); } }); } //It is used by the server to send information to all clients public static void sendAll(String msg){ channelGroup.forEach(channel -> { channel.writeAndFlush("The server: "+msg+"\n"); }); } // Triggered when there is a new user connection public void channelActive(ChannelHandlerContext ctx){ Channel channel = ctx.channel(); channelGroup.writeAndFlush("[ client]"+channel.remoteAddress()+" Connected "+sf.format(new Date())+"\n"); //Add new connections channelGroup.add(channel); System.out.println(ctx.channel().remoteAddress()+" It's online" + "\n"); } //Triggered when the user disconnects public void channelInactive(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[ client ] " +channel.remoteAddress()+ " Disconnect"+"\n"); System.out.println(channel.remoteAddress()+" Offline.\n"); System.out.println("channelGroup size = "+ channelGroup.size()); } }
Netty client:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.ArrayList; import java.util.List; public class NettyClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>(); try { Bootstrap bootstrap = new Bootstrap(); //The server can actively disconnect bootstrap.option(ChannelOption.SO_KEEPALIVE, true); //Address multiplexing bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder",new StringDecoder()); pipeline.addLast("encoder",new StringEncoder()); pipeline.addLast(new ClientHandler()); } }); final int count[] =new int[1]; count[0]=0; for(int i=0;i<3;i++){ //add connections channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync()); //New thread impersonation multi-user new Thread(new Runnable() { @Override public void run() { int index=count[0]++; //Get the corresponding pipe Channel channel = channelFutures.get(index).channel(); System.out.println( "======"+channel.localAddress()+"======"); int time=0; while (time++<3){ //send data String msg =" Royal Music "+(index)+": "+time; channel.writeAndFlush(msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } //Close connection channel.close(); } }).start(); } //Block the main thread, otherwise it will execute finally and close EventLoopGroup directly int time=0; while (time++<5){ Thread.sleep(1000); } } finally { //Close EventLoopGroup group.shutdownGracefully(); } } }
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg.trim()); } }
demonstration
Server:
client
reference resources
https://blog.csdn.net/linjpg/article/details/80962453
https://blog.csdn.net/qq_47281915/article/details/121802536