1. Introduction to IO, NIO and Netty
1.1 Blocking I/O (BIO)
In the synchronous blocking I/O model, a thread that calls read() or write() blocks until some data has been read or all of the data has been written, and it cannot do anything else in the meantime. When the number of active connections is modest (fewer than about 1,000 on a single machine), this model works well: each connection gets its own thread and handles its own I/O, and the programming model stays simple, with little need to worry about system overload, rate limiting and similar problems.
However, with 100,000 or even millions of connections, the traditional BIO model breaks down, because it needs one thread per connection. A more efficient I/O model is needed to handle that level of concurrency.
1.2 Non-blocking I/O: NIO (New I/O)
NIO is a synchronous non-blocking I/O model. It serves the same purpose as the original IO API, but it is used in a completely different way: NIO is Buffer-oriented and Channel-based, which lets it read and write data more efficiently. The core abstractions of Java NIO are the Channel and the Buffer. A Channel represents an open connection to an I/O device (for example, a file or a socket). To use NIO, you obtain a Channel connected to the I/O device and a Buffer that holds the data to be processed.
Blocking I/O keeps a thread waiting on its socket, so non-blocking I/O is used to decouple I/O threads from sockets. It does this with a notification mechanism: when a socket's send buffer is writable, the I/O thread is notified to write, and when a socket's receive buffer is readable, the I/O thread is notified to read. NIO provides SocketChannel and ServerSocketChannel, which correspond to Socket and ServerSocket in the traditional BIO model.
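The Buffer cycle mentioned above (fill the buffer, switch it to read mode, drain it) can be sketched as follows. This is a minimal illustration, not part of the servers in this article; the class name BufferDemo, the method roundTrip and the 64-byte capacity are assumptions chosen for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDemo {

    /** Writes text into a buffer, flips it, and reads the same bytes back. */
    public static String roundTrip(String text) {
        // Hypothetical capacity; text longer than 64 bytes would overflow the buffer.
        ByteBuffer buffer = ByteBuffer.allocate(64);
        // Write mode: the position advances as bytes are put into the buffer.
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        // flip() sets limit = position and position = 0, switching to read mode.
        buffer.flip();
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        // clear() resets position and limit so the buffer can be filled again.
        buffer.clear();
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello NIO"));
    }
}
```

Forgetting the flip() call is a common mistake: without it, the read starts at the current position and returns the unused part of the buffer instead of the data just written.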
For high-load, high-concurrency network applications, NIO's non-blocking mode should be used.
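To see what non-blocking mode means in practice, the sketch below performs a read on a channel that has no data. It uses java.nio.channels.Pipe as a stand-in for a socket so that it runs without a network peer; the class and method names are assumptions made for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class NonBlockingReadDemo {

    /** Returns the result of a non-blocking read on a pipe that holds no data. */
    public static int readFromEmptyPipe() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            // In non-blocking mode, read() returns immediately instead of waiting.
            source.configureBlocking(false);
            // Nothing has been written to the sink, so zero bytes are read.
            return source.read(ByteBuffer.allocate(16));
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes read: " + readFromEmptyPipe());
    }
}
```

A blocking channel would stay inside read() at this point until data arrived. Because the non-blocking call returns 0 right away, the thread is free to service other channels, which is what makes one thread per many connections possible.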
1.3 Netty
Netty is an asynchronous, event-driven network application framework for the rapid development of maintainable, high-performance protocol servers and clients.
Netty is an NIO client-server framework that makes it quick and easy to develop network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming, such as writing TCP and UDP socket servers.
"Quick and easy" does not mean that the resulting application will suffer from maintainability or performance problems. Netty has been designed carefully, drawing on experience gained from implementing many protocols, such as FTP, SMTP, HTTP and various binary and text-based legacy protocols. As a result, Netty has found a way to achieve ease of development, performance, stability and flexibility without compromise.
2. Coding
2.1 IO server
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OioServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        // Create a socket service and listen on port 10101
        ServerSocket server = new ServerSocket(10101);
        System.out.println("Server startup!");
        while (true) {
            // Get a socket (blocking)
            final Socket socket = server.accept();
            System.out.println("A new client!");
            newCachedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // Business processing
                    handler(socket);
                }
            });
        }
    }

    /**
     * Read data
     *
     * @param socket the client connection to read from
     */
    public static void handler(Socket socket) {
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();
            while (true) {
                // Read data (blocking)
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(new String(bytes, 0, read));
                } else {
                    // -1 means the client has closed the connection
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                System.out.println("socket close");
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
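The read loop in the handler above ends only when read() returns -1, that is, when the peer closes the connection. The following self-contained sketch shows that behaviour over the loopback interface. It is an illustration under assumptions, not the article's server: the class name BlockingLoopbackDemo, the method exchange and the use of an ephemeral port are all made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class BlockingLoopbackDemo {

    /** Sends a message from a client thread and returns what the server side read. */
    public static String exchange(String message) throws Exception {
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server = new ServerSocket(0)) {
            Thread client = new Thread(() -> {
                try (Socket s = new Socket("127.0.0.1", server.getLocalPort());
                     OutputStream out = s.getOutputStream()) {
                    out.write(message.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    e.printStackTrace();
                } // closing the socket is what makes the server's read() return -1
            });
            client.start();

            try (Socket socket = server.accept(); // blocks until the client connects
                 InputStream in = socket.getInputStream()) {
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                byte[] bytes = new byte[1024];
                int read;
                // Each read() blocks until data arrives or the peer closes.
                while ((read = in.read(bytes)) != -1) {
                    received.write(bytes, 0, read);
                }
                client.join();
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(exchange("hello BIO"));
    }
}
```

The thread that calls accept() and read() is parked for the whole exchange, which is why the server in this section hands every accepted socket to a thread pool.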
2.2 NIO server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NIOServer {

    // Channel manager
    private Selector selector;

    /**
     * Get a ServerSocket channel and do some initialization for the channel
     *
     * @param port bound port number
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // Get a ServerSocket channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // Set the channel to non-blocking
        serverChannel.configureBlocking(false);
        // Bind the ServerSocket corresponding to the channel to the port
        serverChannel.socket().bind(new InetSocketAddress(port));
        // Get a channel manager
        this.selector = Selector.open();
        // Bind the channel manager to the channel and register the SelectionKey.OP_ACCEPT
        // event for the channel. After this event is registered, selector.select() returns
        // when the event arrives; until then, selector.select() blocks.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Use polling to monitor whether there are events to be processed on the selector.
     * If so, process them
     *
     * @throws IOException
     */
    public void listen() throws IOException {
        System.out.println("The server is started successfully!");
        // Poll the selector
        while (true) {
            // When a registered event arrives, the method returns; otherwise it blocks
            selector.select();
            // Get an iterator over the selected keys; each selected key is a registered event
            Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                // Remove the selected key to prevent repeated processing
                ite.remove();
                handler(key);
            }
        }
    }

    /**
     * Processing requests
     *
     * @param key
     * @throws IOException
     */
    public void handler(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            // Client connection request event
            handlerAccept(key);
        } else if (key.isReadable()) {
            // Readable event
            handlerRead(key);
        }
    }

    /**
     * Processing connection requests
     *
     * @param key
     * @throws IOException
     */
    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // Get the channel connected to the client
        SocketChannel channel = server.accept();
        // Set to non-blocking
        channel.configureBlocking(false);
        // Here you can send information to the client
        System.out.println("New client connection");
        // After the connection is established, register interest in read events
        // so that data sent by the client can be received.
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Handling read events
     *
     * @param key
     * @throws IOException
     */
    public void handlerRead(SelectionKey key) throws IOException {
        // The server can read the message: get the socket channel of the event
        SocketChannel channel = (SocketChannel) key.channel();
        // Create the read buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if (read > 0) {
            // Decode only the bytes that were actually read, not the whole backing array
            String msg = new String(buffer.array(), 0, read, StandardCharsets.UTF_8).trim();
            System.out.println("Information received by the server:" + msg);
            // Write back data
            ByteBuffer outBuffer = ByteBuffer.wrap("well".getBytes(StandardCharsets.UTF_8));
            channel.write(outBuffer); // Send the message back to the client
        } else if (read == -1) {
            // -1 means the client closed the connection; 0 only means no data was available
            System.out.println("Client shutdown");
            key.cancel();
            channel.close();
        }
    }

    /**
     * Start server test
     *
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8000);
        server.listen();
    }
}
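To try the selector flow above (register for accept, accept the connection, then register for read) without starting a separate client, the following sketch runs both ends in one method over the loopback interface. The class name SelectorLoopbackDemo, the method receiveViaSelector and the ephemeral port are assumptions made for this example, and error handling is kept to a minimum.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorLoopbackDemo {

    /** Sends text over loopback and returns what the selector loop received. */
    public static String receiveViaSelector(String text) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking client is enough to drive the demo.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    client.write(out);
                }
            } // closing the client lets the server side observe end-of-stream (-1)

            ByteArrayOutputStream received = new ByteArrayOutputStream();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            boolean done = false;
            while (!done) {
                selector.select(); // blocks until at least one registered event is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel channel = server.accept();
                        if (channel != null) {
                            channel.configureBlocking(false);
                            channel.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        buffer.clear();
                        int read = channel.read(buffer);
                        if (read == -1) {
                            channel.close(); // also cancels the channel's key
                            done = true;
                        } else {
                            received.write(buffer.array(), 0, read);
                        }
                    }
                }
            }
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(receiveViaSelector("hello selector"));
    }
}
```

The bytes are collected first and decoded once at the end, so a multi-byte character that happens to be split across two reads is still decoded correctly.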
2.3 Netty echo server
package Netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Block until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Server event handler:
package Netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The inbound message is not used, so release it to avoid a buffer leak.
        ReferenceCountUtil.release(msg);
        // Caution: reading System.in here blocks this channel's event loop thread,
        // which is tolerable only in a single-client demo like this one.
        Scanner s = new Scanner(System.in);
        while (s.hasNextLine()) {
            String words = s.nextLine();
            ByteBuf wordbuf = Unpooled.wrappedBuffer(words.getBytes(StandardCharsets.UTF_8));
            ChannelFuture f = ctx.writeAndFlush(wordbuf);
            if (words.equals("exit")) {
                // Close the connection once "exit" has been sent, then stop reading input.
                f.addListener(ChannelFutureListener.CLOSE);
                break;
            }
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
Client:
package Netty;

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
Client event handler:
package Netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler implementation for the echo client. It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        String word = m.toString(CharsetUtil.UTF_8);
        try {
            System.out.println(word);
            if (word.equals("exit")) {
                ctx.close();
            }
        } finally {
            // Release the received buffer once it has been consumed.
            m.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
3. Reference
