s1.NIO overview
First of all, we have reached a consensus that when we discuss the IO model, it is currently aimed at network programming, because network programming has large-scale connections, IO input and output. The purpose of NIO is to solve the pain point of a large number of threads caused by a large number of connections in the BIO network programming model.
IO models generally include three types
BIO, synchronous blocking IO, refer to the thread model of BIO in the figure below
NIO, synchronous non blocking IO
Added after fully understanding NIO threading model
AIO, asynchronous non blocking IO
2.NIO core API
2.1Channel
Differences between channel and IO stream:
channel can be read or written. IO stream input and output are different streams, and the output must be synchronized after input
channel reading and writing can be executed in different threads, that is, asynchronous execution, which is the biggest difference
Public API
//Read data from channel to buffer channel - > buffer
int read(ByteBuffer var1);
//Write data from buffer to channel} buffer - > Channel
int write(ByteBuffer var1);
//Close channel
final void close() throws IOException;
1.ServerSocketChannel
//Open a server channel
static ServerSocketChannel open() throws IOException;
//Bind a network address to the current server channel to listen
final ServerSocketChannel bind(SocketAddress var1) throws IOException;
//Listen to the socket connection of the client. This method is blocked by default. If configureBlocking is set to false, it will not be blocked
SocketChannel accept();
//If false is set, NIO is in non blocking mode. By default, true is in blocking mode like BIO
final SelectableChannel configureBlocking(boolean var1) throws IOException;
//Register the current channel with selecor. var2 represents the registered event type. var3 can attach any object
final SelectionKey register(Selector var1, int var2, Object var3);
//Gets the network address monitored by the current server channel
SocketAddress getLocalAddress() throws IOException;
//Blocking mode
boolean isBlocking();
//Is it registered with a selector
boolean isRegistered();
Code example:
public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 9991; ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(host, port)); serverSocketChannel.configureBlocking(false); SocketAddress localAddress = serverSocketChannel.getLocalAddress(); System.out.println(serverSocketChannel.isBlocking()); System.out.println(serverSocketChannel.isRegistered()); System.out.println(localAddress); SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println(socketChannel); }
2.SocketChannel
//Open a channel
static SocketChannel open() throws IOException;
//Bind a network address listener for the current channel
SocketChannel bind(SocketAddress local) throws IOException;
//Establish a connection with the server channel. This method is blocked by default. If configureBlocking is set to false, it will not be blocked
boolean connect(SocketAddress remote) throws IOException;
//The non blocking mode is called. This method is used to judge whether the connection is established successfully. The internal of this method is the blocking mode. The significance of this method is that there is a certain delay after initiating the connection establishment request. During this period, you can do some things. When things are done, call this method to judge whether the connection is established
boolean finishConnect() throws IOException;
//This method internally only returns a status ID whether it is connected or not
boolean isConnected();
//If false is set, NIO is in non blocking mode. By default, true is in blocking mode like BIO
final SelectableChannel configureBlocking(boolean var1) throws IOException;
//Open a channel and establish a connection with the server channel. The process of establishing a connection is blocking mode
static SocketChannel open(SocketAddress var0) throws IOException;
//Get the network address of the current channel listening
SocketAddress getLocalAddress() throws IOException;
//Get the network address of the server
SocketAddress getRemoteAddress() throws IOException;
Code example:
public static void main(String[] args) throws IOException { String host = "127.0.0.1"; int port = 9992; String serverHost = "127.0.0.1"; int serverPort = 18001; SocketChannel socketChannel = SocketChannel.open(); socketChannel.bind(new InetSocketAddress(host, port)); socketChannel.configureBlocking(false); boolean connected = socketChannel.connect(new InetSocketAddress(serverHost, serverPort)); System.out.println(connected); System.out.println(socketChannel.isConnected()); System.out.println(socketChannel.finishConnect()); }
3.FileChannel
//Open the IO channel of a disk file. OpenOption is the operation type of the channel, including read, write and delete
static FileChannel open(Path var0, OpenOption... var1);
//Byte size of data in channel
long size();
//Map the data in the channel to the buffer
MappedByteBuffer map(MapMode var1, long var2, long var4);
//copy the data in the current channel to the target channel var5
long transferTo(long var1, long var3, WritableByteChannel var5);
//copy the data in the target channel var1 to the current channel
long transferFrom(ReadableByteChannel var1, long var2, long var4);
Code example:
public static void copyFile() { FileChannel source = null; FileChannel target = null; try { source = FileChannel.open(Paths.get("C:\\Users\\Lenovo\\Desktop\\shell.txt")); target = FileChannel.open(Paths.get("C:\\Users\\Lenovo\\Desktop\\shell2.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); source.transferTo(0, source.size(), target); } catch (Exception e) { e.printStackTrace(); } finally { try { source.close(); target.close(); } catch (IOException e) { e.printStackTrace(); } } }
2.2ByteBuffer
Several core attributes in Buffer
private int position = 0; / / the current operation bit will be + 1 regardless of reading or writing
//limit indicates that the current operation, regardless of the number of read and write operations, is < = capacity when the buffer is built for the first time
private int limit;
private int capacity; // Capacity. When creating a buffer, you must specify the capacity, that is, the maximum limit for each read and write
//Create a byte buffer with a capacity of capacity
static ByteBuffer allocate(int capacity);
//First, create an equal volume cache, and then write the byte array to the buffer
static ByteBuffer wrap(byte[] var0);
//Get a byte
byte get();
//Add a byte
ByteBuffer put(byte var1);
//Return the byte array in the buffer directly. Note that it is not a deep clone
final byte[] array();
//Is there any unread data in the buffer
final boolean hasRemaining();
//Each time you read data from the buffer, flip it first. Because there must be a write before reading, you need to set position to 0 and limit to the position written during writing
public final Buffer flip() {
this.limit = this.position;
this.position = 0;
this.mark = -1;
return this;
}
//After reading data from the buffer every time, you must clear it, set position to 0 and limit to capacity again, and then you may continue to write data to the buffer
public final Buffer clear() {
this.position = 0;
this.limit = this.capacity;
this.mark = -1;
return this;
}
Code example:
Note: in the following code, when read==-1, it means that the channel providing data has been disconnected, read==0 means that the channel data has been read, and read > 0 means that the data can be read from the channel
public static String readBuffer(SocketChannel channel, ByteBuffer buffer) throws IOException { List<Byte> list = new ArrayList<>(); int read = channel.read(buffer); while (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { list.add(buffer.get()); } buffer.clear(); read = channel.read(buffer); } if (read == -1) { throw new ClosedChannelException(); } byte[] bytes = new byte[list.size()]; for (int i = 0; i < bytes.length; i++) { bytes[i] = list.get(i); } return new String(bytes); }
2.3Selector
//Get all event key s
public abstract Set<SelectionKey> keys();
//Get all the prepared event keys. Pay attention to the difference from the keys method
public abstract Set<SelectionKey> selectedKeys();
//Number of non blocking return ready events
public abstract int selectNow() throws IOException;
//Number of ready events returned by blocking var1 MS
public abstract int select(long var1) throws IOException;
//Return after blocking to event ready
public abstract int select() throws IOException;
2.4SelectionKey
//Channels registered on events
SelectableChannel channel();
//The selector where the current event is located
Selector selector();
//Cancel the event. Note that this method and remove from the keys of the selector must be executed before the event can be truly cancelled
void cancel();
//Attach an object
final Object attach(Object var1);
//Get attached objects
final Object attachment();
final boolean isReadable(); / / read event
final boolean isWritable(); / / write event
final boolean isAcceptable(); / / receive connection events
final boolean isConnectable(); // Connection event
3. NIO simulates sending a request to the server and receiving a response code example
1.buffer tool class
public class BufferUtils { public static String readBuffer(SocketChannel channel, ByteBuffer buffer) throws IOException { return readBuffer(channel, buffer, false); } /** * @Description: Read data from channel * @param channel A SocketChannel channel * @param buffer Buffer object * @param checkChannelClosed Check whether the channel is disconnected * @return Returns the read data string * @throws IOException */ public static String readBuffer(SocketChannel channel, ByteBuffer buffer, boolean checkChannelClosed) throws IOException { List<Byte> list = new ArrayList<>(); int read = channel.read(buffer); while (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { list.add(buffer.get()); } buffer.clear(); read = channel.read(buffer); } if (read == -1 && checkChannelClosed) { throw new ClosedChannelException(); } byte[] bytes = new byte[list.size()]; for (int i = 0; i < bytes.length; i++) { bytes[i] = list.get(i); } return new String(bytes); } }
2.Server side
public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 9991; Server server = Server.createServer(host, port); server.start(1000, 64); } // NIOServer object static class Server { // serverSocketChannel private ServerSocketChannel serverSocketChannel; /** * @Description: Create a NIOServer instance * @param host * @param port * @return * @throws Exception */ public static Server createServer(String host, int port) throws IOException { Server server = new Server(); server.init(host, port); return server; } private Server() { } // Initialize serverSocketChannel private void init(String host, int port) throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(host, port)); serverSocketChannel.configureBlocking(false); } /** * @Description: Start Server * @param selectTimeout The selector gets the blocking duration of the ready event * @param bufferCapacity buffer Capacity of * @throws IOException */ public void start(long selectTimeout, int bufferCapacity) throws IOException { final long timeout = selectTimeout <= 0 ? 1000 : selectTimeout; final int capacity = bufferCapacity <= 12 ? 12 : bufferCapacity; System.out.println("........NIOServer.start.........."); // Open selector Selector selector = Selector.open(); // register serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); try { while (true) { // Loop listening ready event if (selector.select(timeout) > 0) { // Get ready event Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); // Iterative processing ready event while (iterator.hasNext()) { SelectionKey key = iterator.next(); try { if (key.isAcceptable()) { // Connection event received SocketChannel channel = serverSocketChannel.accept(); InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); String hostName = address.getHostName(); int port = address.getPort(); System.out.println(hostName + ":" + port + "->Connection event established..."); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(capacity)); } else if (key.isReadable()) { // Read event this.handler(key); } } catch (IOException e) { e.printStackTrace(); } iterator.remove(); // Delete processed events } } } } catch (IOException e) { e.printStackTrace(); } finally { try { serverSocketChannel.close(); selector.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Connection closed"); } } // Handling read events private void handler(SelectionKey key) throws IOException { try { SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress remoteAddress = (InetSocketAddress) channel.getRemoteAddress(); ByteBuffer buffer = (ByteBuffer) key.attachment(); String message = BufferUtils.readBuffer(channel, buffer, true); message = remoteAddress.getHostName() + ":" + remoteAddress.getPort() + " message -> " + message; System.out.println(message); Thread.sleep(50); // Simulated business processing time // Build response message String response = "status=200;server Received from " + message; channel.write(ByteBuffer.wrap(response.getBytes())); } catch (Exception e) { e.printStackTrace(); } finally { key.channel().close(); } } }
3. client side
public static void main(String[] args) throws IOException { String host = "127.0.0.1"; int port = 9991; Client client = Client.createClient(); String message = "client call"; for (int i = 0; i < 10; i ++) { new Thread(() -> { String response = client.call(host, port, message); System.out.println(response); }).start(); } } static class Client { // Create Client public static Client createClient() { Client client = new Client(); return client; } private Client() { } /** * * @Description: Send request to server * @param host Server host * @param port Server port * @param message Messages sent * @return */ public String call(String host, int port, String message) { SocketChannel channel = null; try { //Open the channel and connect to the server in blocking mode channel = SocketChannel.open(new InetSocketAddress(host, port)); //Store message in buffer ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); //Write data from buffer to channel channel.write(buffer); ByteBuffer byteBuffer = ByteBuffer.allocate(64); //Read the data responded by the server from the channel String response = BufferUtils.readBuffer(channel, byteBuffer); return response; } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } return null; } }
4. Example of NiO simulated long link group chat system:
Requirements:
The server monitors the online and offline of each client, and forwards messages from any client to all clients
Design idea:
First, the following objects are abstracted
Common directive object
Instruction objects are divided into server instructions and client instructions
Server object
client object
Finally, according to the current time, there is still no GUI. Simply use the console to input scan scanner
The code is as follows: (tired, added after the comment)
1.BufferUtils
Refer to 3.1BufferUtils code snippet
2. commond
package cn.qu.scanner; import java.util.Map; import java.util.function.Function; public interface Commond { /** * @Description: Initiate stop command */ void stop(); /** * @Description: Polling listening */ void listen(); /** * @Description: Build instruction set * @param commondContainer Container for storing instructions */ void buildCommondContainer(Map<String, Function<String, Object>> commondContainer); }
3.ScannerCommond
package cn.qu.scanner; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; public abstract class ScannerCommond implements Commond { private AtomicBoolean flag = new AtomicBoolean(true); private Scanner scaner; private final Map<String, Function<String, Object>> commondContainer = new HashMap<>(); private List<String> commondKeyList; protected ScannerCommond() { this.init(); } @Override public void stop() { flag.compareAndSet(true, false); } @Override public void listen() { while (flag.get()) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } if (scaner.hasNext()) { String commond = scaner.nextLine(); if (checkCommond(commond)) { String commondKey, commondContent; if (commond.contains(":")) { commondKey = commond.substring(0, commond.indexOf(":")); commondContent = commond.substring(commond.indexOf(":") + 1); } else { commondKey = commond; commondContent = commond; } Function<String, Object> function = commondContainer.get(commondKey); if (function != null) { function.apply(commondContent); } } } } } private boolean checkCommond(String commond) { for (String commondKey : commondKeyList) { if (commond.startsWith(commondKey)) { return true; } } return false; } private void init() { this.buildCommondContainer(commondContainer); this.commondKeyList = commondContainer.keySet().stream().collect(Collectors.toList()); scaner = new Scanner(System.in); } }
4.Server
package cn.qu.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.springframework.util.StringUtils; import cn.qu.scanner.ScannerCommond; import cn.qu.util.BufferUtils; public class GroupChatServer { public static void main(String[] args) { String host = "127.0.0.1"; int port = 18001; Server server = Server.createServer(host, port); new Thread(server).start(); GroupChatServerScaner.start(server); } static class GroupChatServerScaner extends ScannerCommond { private GroupChatServerScaner() { } private final static GroupChatServerScaner scaner = new GroupChatServerScaner(); private Server server; public static void start(Server server) { scaner.server = server; scaner.listen(); } @Override public void buildCommondContainer(Map<String, Function<String, Object>> commondContainer) { commondContainer.put("stop", (param) -> { try { this.server.stop(); this.stop(); } catch (Exception e) { e.printStackTrace(); } return null; }); } } static class Server implements Runnable { private AtomicBoolean flag; private ServerSocketChannel serverSocketChannel; Selector selector = null; private InetSocketAddress address; public static Server createServer(String host, int port) { Server server = new Server(); try { server.init(host, port); return server; } catch (IOException e) { throw new RuntimeException(e); } } @Override public void run() { try { while (flag.get()) { if (selector.select(1000) > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); try { if (key.isAcceptable()) { this.onLine(); } else if (key.isReadable()) { this.readMessage(key); } } catch (IOException e) { key.cancel(); try { key.channel().close(); } catch (IOException e1) { e1.printStackTrace(); } // e.printStackTrace(); } // iterator.remove(); // Delete processed events } } } } catch (IOException e) { e.printStackTrace(); } finally { this.close(); } } public void stop() { this.flag.set(false); } private Server() { } private void init(String host, int port) throws IOException { address = new InetSocketAddress(host, port); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(address); serverSocketChannel.configureBlocking(false); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); this.flag = new AtomicBoolean(true); String current = address.getHostName() + ":" + address.getPort(); System.out.println("-----------------------" + current + " server is started-----------------------"); } private void onLine() throws IOException { String user = ""; try { SocketChannel channel = serverSocketChannel.accept(); InetSocketAddress clientAddress = (InetSocketAddress) channel.getRemoteAddress(); String hostName = clientAddress.getHostName(); int port = clientAddress.getPort(); user = hostName + ":" + port; System.out.println(user + " -> Online..."); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, user); } catch (IOException e) { System.out.println("Remote service connection establishment exception... clientAddress=" + user); throw e; } } private void sendMessage(Selector selector, String message) throws IOException { try { Iterator<SelectionKey> iterator = selector.keys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); SelectableChannel selectableChannel = key.channel(); if (!(selectableChannel instanceof ServerSocketChannel)) { SocketChannel channel = (SocketChannel) selectableChannel; channel.write(ByteBuffer.wrap(message.getBytes())); } } } catch (IOException e) { System.out.println("Forwarding message exception... message=" + message); throw e; } } private void readMessage(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); String user = (String) key.attachment(); String message = ""; try { message = BufferUtils.readBuffer(channel, ByteBuffer.allocate(1024), true); if (!StringUtils.isEmpty(message)) { message = user + " -> " + message; this.sendMessage(selector, message); } } catch (IOException e) { System.out.println(user + " -> " + "Offline..."); throw e; } } private void close() { try { serverSocketChannel.close(); selector.close(); String current = address.getHostName() + ":" + address.getPort(); System.out.println("-----------------------" + current + " server is stoped-----------------------"); } catch (IOException e) { e.printStackTrace(); } } } }
5.client
package cn.qu.nio.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import cn.qu.scanner.ScannerCommond; import cn.qu.util.BufferUtils; public class GroupChatClient { public static void main(String[] args) { String serverHost = "127.0.0.1"; int serverPort = 18001; Client client = Client.create(serverHost, serverPort); new Thread(client).start(); GroupChatServerScaner.start(client); } static class GroupChatServerScaner extends ScannerCommond { private GroupChatServerScaner() { } private final static GroupChatServerScaner scaner = new GroupChatServerScaner(); public static void start(Client client) { scaner.client = client; scaner.listen(); } private Client client; @Override public void buildCommondContainer(Map<String, Function<String, Object>> commondContainer) { commondContainer.put("stop", (commond) -> { try { this.client.stop(); this.stop(); } catch (Exception e) { e.printStackTrace(); } return null; }); commondContainer.put("say", (commond) -> { this.client.sendMessage(commond); return null; }); } } static class Client implements Runnable { private SocketChannel socketChannel; private AtomicBoolean flag; private Selector selector; private InetSocketAddress serverAddress; private InetSocketAddress address; public static Client create(String host, int port) { Client client = new Client(); try { client.init(host, port); return client; } catch (IOException e) { throw new RuntimeException(e); } } private Client() { } public void init(String host, int port) throws IOException { selector = Selector.open(); serverAddress = new InetSocketAddress(host, port); socketChannel = SocketChannel.open(serverAddress); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); flag = new AtomicBoolean(true); address = (InetSocketAddress) socketChannel.getLocalAddress(); String current = address.getHostName() + ":" + address.getPort(); System.out.println("-----------------------" + current + " client is started-----------------------"); } public void stop() { flag.compareAndSet(true, false); } @Override public void run() { try { while (flag.get()) { if (selector.select(1000) > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { this.readMessage(key); } iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } finally { this.close(); } } private void close() { try { selector.close(); socketChannel.close(); String current = address.getHostName() + ":" + address.getPort(); System.out.println("-----------------------" + current + " client is stoped-----------------------"); } catch (IOException e) { e.printStackTrace(); } } public void sendMessage(String message) { try { socketChannel.write(ByteBuffer.wrap(message.getBytes())); } catch (IOException e) { e.printStackTrace(); } } private void readMessage(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); String message = ""; try { message = BufferUtils.readBuffer(channel, ByteBuffer.allocate(1024), true); } catch (IOException e) { message = serverAddress.getHostName() + ":" + serverAddress.getPort() + " -> The server has been disconnected..."; key.cancel(); try { channel.close(); } catch (IOException e1) { e1.printStackTrace(); } // e.printStackTrace(); } message = "[" + address.getHostName() + ":" + address.getPort() + "] ==>> " + message; System.out.println(message); } } }