1 Bootstrap,ServerBootstrap
- Bootstrap means boot. A Netty application usually starts with a bootstrap. Its main function is to configure the entire Netty program and connect various components in series. The bootstrap class in Netty is the startup boot class of the client program, and the ServerBootstrap is the startup boot class of the server.
- Common methods:
/** * This method is used for the client to set an EventLoop */ public ServerBootstrap group(EventLoopGroup group); /** * This method is used on the server side to set two eventloops */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup); /** * Used to add configuration to the received channel */ public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value); /** * Used to add attributes to the received channel */ public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value); /** * This method is used to set the business processing class (custom handler) */ public ServerBootstrap childHandler(ChannelHandler childHandler); /** * This method is used on the server side to set the occupied port number */ public ChannelFuture bind(int inetPort); /** * This method is used for the client to connect to the server */ public ChannelFuture connect(String inetHost, int inetPort) ;
2 Future,ChannelFuture
- All IO operations in Netty are asynchronous, and you can't immediately know whether the message is processed correctly. However, you can wait for its execution to complete or register a listener directly. The specific implementation is that they can register a listener through Future and ChannelFutures. When the operation is successful or failed, the listener will automatically trigger the registered listening event.
- Common methods:
/** * Returns the channel for which the IO operation is currently in progress */ Channel channel(); /** * Add listener to channel */ ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); /** * Add listener to channel */ ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1); /** * Remove listener for channel */ ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1); /** * Remove listener for channel */ ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1); /** * Wait for the task to end. If the task generates an exception or is interrupted, throw an exception, otherwise return to Future itself */ ChannelFuture sync() throws InterruptedException; /** * Wait for the task to end. The task itself cannot be interrupted. If an exception occurs, an exception will be thrown, otherwise it will return to Future itself */ ChannelFuture syncUninterruptibly(); /** * Wait for the task to end. If the task is interrupted, an interrupt exception will be thrown. Unlike sync, only an interrupt exception will be thrown, not the exception generated by the task */ ChannelFuture await() throws InterruptedException; /** * Wait for the end of the task. The task cannot be interrupted */ ChannelFuture awaitUninterruptibly();
3 Channel
4 Selector
- Netty implements I/O multiplexing based on Selector object. One thread of Selector can listen to Channel events of multiple connections.
- After registering a Channel with a Selector, the internal mechanism of the Selector can automatically and continuously query (Select) whether these registered channels have ready I/O events (such as readable, writable, network connection completion, etc.), so that the program can simply use one thread to efficiently manage multiple channels.
5 ChannelHandler and its implementation class
- ChannelHandler is an interface that handles I/O events or intercepts I/O operations and forwards them to the next handler in its ChannelPipeline (business processing chain).
- ChannelHandler itself does not provide many methods, because this interface has many methods that need to be implemented. During use, it can inherit its subclasses.
- List of ChannelHandler and its implementation classes:
- ChannelInboundHandler is used to handle inbound I/O events.
- ChannelOutboundHandler is used to handle outbound I/O operations.
Adapter
- The ChannelInboundHandlerAdapter is used to handle inbound I/O events.
- The ChannelOutboundHandlerAdapter is used to handle outbound I/O operations.
- ChannelDuplexHandler is used to handle inbound and outbound events.
6 Pipeline and ChannelPipeline
- ChannelPipeline is a collection of handlers, which is responsible for handling and intercepting inbound or outbound events and operations. It is equivalent to a chain running through Netty.
- ChannelPipeline implements an advanced form of interception filter mode, which enables users to fully control the processing of events and how each ChannelHandler in the Channel interacts with each other.
- In Netty, each Channel has only one ChannelPipeline corresponding to it. Their composition relationship is as follows:
- A Channel contains a ChannelPipeline, which maintains a two-way linked list composed of ChannelHandlerContext, and each ChannelHandlerContext is associated with a ChannelHandler.
- Inbound events and outbound events are in a two-way linked list. Inbound events will be passed back from the linked list head to the last inbound handler, and outbound events will be passed forward from the linked list tail to the first outbound handler. The two types of handlers do not interfere with each other.
- common method
- ChannelPipeline addFirst(ChannelHandler... handlers) adds a business processing class (handler) to the first position in the chain.
- ChannelPipeline addLast(ChannelHandler... handlers) adds a business processing class (handler) to the last position in the chain.
7 ChannelHandlerContext
- Save all context information related to the Channel and associate a ChannelHandler object at the same time.
- That is, the ChannelHandlerContext contains a specific event handler. At the same time, the ChannelHandlerContext is also bound with the corresponding Pipeline and Channel information to facilitate the call to the ChannelHandler.
- Common methods:
- ChannelOutboundInvoker ChannelFuture close(), close the channel
- ChannelOutboundInvoker ChannelOutboundInvoker flush(), refresh
- ChannelFuture writeAndFlush(Object msg), write the data to the next ChannelHandler of the current ChannelHandler in the ChannelPipeline and start processing (outbound).
8 ChannelOption
- After creating a Channel instance, Netty generally needs to set the ChannelOption parameter.
- The ChannelOption parameters are as follows:
- ChannelOption.SO_BACKLOG: corresponds to the backlog parameter in the listen function of TCP/IP protocol. Used to initialize the server connectable queue size. The server processes client connection requests in sequence, so only one client connection can be processed at a time. When multiple clients come, the server puts the client connection requests that cannot be processed in the queue for processing. The backlog parameter specifies the size of the queue.
- ChannelOption.SO_KEEPALIVE: keep the connection active all the time.
9 EventLoopGroup and its implementation class NioEventLoopGroup
- EventLoopGroup is an abstraction of a group of eventloops. In order to make better use of multi-core CPU resources, Netty generally has multiple eventloops working at the same time, and each EventLoop maintains a Selector instance.
- EventLoopGroup provides the next interface. You can obtain one of the eventloops from the group according to certain rules to process tasks. In Netty server-side programming, we generally need to provide two eventloopgroups, such as BossEventLoopGroup and WorkerEventLoopGroup.
- Usually, a service port, that is, a ServerSocketChannel, corresponds to a Selector and an EventLoop thread. BossEventLoop is responsible for receiving the connection from the client and handing the SocketChannel to the WorkerEventLoopGroup for IO processing.
- Boseventloopgroup is usually a single threaded EventLoop. EventLoop maintains a Selector instance registered with ServerSocketChannel. Boseventloop constantly polls the Selector to separate connection events. OP received_ After the accept event, give the received SocketChannel to the WorkerEventLoopGroup.
- WorkerEventLoopGroup will select one of the eventloops by next to register the SocketChannel with its maintained Selector and handle its subsequent IO events.
10 Unpooled class
Netty provides a tool class specially used to operate buffers (that is, netty's data container). A common method is as follows
public static ByteBuf copiedBuffer(CharSequence string, Charset charset);
ByteBuf schematic diagram:
BEFORE clear() +-------------------+------------------+------------------+ | discardable bytes | readable bytes | writable bytes | +-------------------+------------------+------------------+ | | | | 0 <= readerIndex <= writerIndex <= capacity AFTER clear() +---------------------------------------------------------+ | writable bytes (got more space) | +---------------------------------------------------------+ | | 0 = readerIndex = writerIndex <= capacity
- Capacity: capacity, determined when creating.
- 0 - readerIndex: indicates read
- Readerindex writerindex: indicates the readable range
- writerIndex - capacity: writable range
Bytebuf Demo code is as follows:
package bin.netty.bytebuf; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; /** * @author liyibin * @date 2021-06-26 */ public class ByteBufDemo { public static void main(String[] args) { ByteBuf byteBuf = Unpooled.copiedBuffer("hello, world", CharsetUtil.UTF_8); // buf index System.out.println("readerIndex: " + byteBuf.readerIndex() + ", writerIndex: " + byteBuf.writerIndex() + ", capacity: " + byteBuf.capacity()); for (int i = 0; i < byteBuf.writerIndex(); i++) { System.out.println((char) byteBuf.readByte()); } // buf index System.out.println("readerIndex: " + byteBuf.readerIndex() + ", writerIndex: " + byteBuf.writerIndex() + ", capacity: " + byteBuf.capacity()); // Read part // param1: start index // param2: length System.out.println(byteBuf.getCharSequence(0, 3, CharsetUtil.UTF_8)); } }
11 Netty application example - group chat system
- Write a Netty group chat system to realize simple data communication (non blocking) between server and client
Realize multi crowd chat. - Server side: it can monitor users online and offline, and realize message forwarding function.
- Client: through the channel, you can send messages to all other users without blocking, and accept messages sent by other users (forwarded by the server).
Server:
package bin.netty.groupchat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author liyibin * @date 2021-06-26 */ public class NettyGroupChatServer { public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // Add a log processor to the bossGroup .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { // Initialize the channel and add a processor to the channel of the workerGroup @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // Character type decoder pipeline.addLast("decoder", new StringDecoder()); // Character type encoder pipeline.addLast("encoder", new StringEncoder()); // Service processor pipeline.addLast(new GroupChatServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(9999).sync(); channelFuture.addListener(cf -> { if (cf.isSuccess()) { System.out.println("listen on port 9999"); } }); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
GroupChatServerHandler:
package bin.netty.groupchat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * @author liyibin * @date 2021-06-26 */ public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { /** * Channel group, which is used to manage the currently connected channels. It is globally unique */ private final static ChannelGroup CHANNELS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * Receive messages sent by clients */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); // Server print message System.out.printf("[%s]: %s\n", channel.remoteAddress().toString(), msg); // Send messages to other channels CHANNELS.forEach(ch -> { // If it is not the current communication channel, send the message if (ch != channel) { ch.writeAndFlush(String.format("[%s]: %s\n", channel.remoteAddress().toString(), msg)); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.printf("%s Joined the group chat\n", ctx.channel().remoteAddress()); CHANNELS.add(ctx.channel()); System.out.println("Current number of group chat users:" + CHANNELS.size()); } /** * go online */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.printf("%s It's online\n", ctx.channel().remoteAddress()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.printf("%s Quit group chat", ctx.channel().remoteAddress()); // Automatically removed System.out.println("Current number of group chat users:" + CHANNELS.size()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
client:
package bin.netty.groupchat; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Scanner; /** * @author liyibin * @date 2021-06-26 */ public class NettyGroupChatClient { public static void main(String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // Character type decoder pipeline.addLast("decoder", new StringDecoder()); // Character type encoder pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { // Print received messages System.out.printf("%s\n", msg); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync(); channelFuture.addListener(cf -> { if (cf.isSuccess()) { System.out.println("connect to server"); } }); Channel channel = channelFuture.channel(); // Process user input Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); channel.writeAndFlush(Unpooled.copiedBuffer(msg + "\r\n", CharsetUtil.UTF_8)); } } finally { group.shutdownGracefully(); } } }
12 cases of netty heartbeat detection mechanism
- Write a case of Netty heartbeat detection mechanism. When the server does not read for more than 3 seconds, it will prompt that the read is idle.
- When the server has no write operation for more than 5 seconds, it will prompt that the write is idle.
- When the server has no read or write operation for more than 7 seconds, it will prompt that the read and write is idle.
Server:
package bin.netty.heartcheck; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * Heartbeat detection mechanism * * @author liyibin * @date 2021-06-27 */ public class NettyHeartCheckServer { public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // Add processor to handle idle state // readerIdleTime: indicates how long you haven't read, and then send a heartbeat detection packet // writerIdleTime: indicates how long you haven't written, and then send a heartbeat detection packet // allIdleTime: indicates how long it takes to send a heartbeat detection packet without reading or writing // Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed read, write, or both operation for a while. // When the IdleStateEvent event event is triggered, it will be passed to the useEventTriggered method of the next handler for processing pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS)); // Service processor pipeline.addLast(new MyServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(9999).sync(); channelFuture.addListener(cf -> { if (cf.isSuccess()) { System.out.println("listen on 9999"); } }); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerHandler:
package bin.netty.heartcheck; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; /** * @author liyibin * @date 2021-06-27 */ public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; String eventType; switch (idleStateEvent.state()) { case READER_IDLE: eventType = "Read idle"; break; case WRITER_IDLE: eventType = "Write idle"; break; case ALL_IDLE: eventType = "Read write idle"; break; default: eventType = null; break; } System.out.println(ctx.channel().remoteAddress() + "---Idle type---" + eventType); } } }
client:
package bin.netty.heartcheck; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author liyibin * @date 2021-06-27 */ public class NettyHeartCheckClient { public static void main(String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
13 Netty realizes the long connection between server and client through WebSocket programming
- The Http protocol is stateless. The request between the browser and the server responds once, and the connection will be re created next time.
- Requirements: realize full duplex interaction of long connection based on webSocket.
- Change the constraint of multiple requests of Http protocol to realize a long connection, and the server can send messages to the browser.
- The client browser and the server will perceive each other. For example, when the server is closed, the browser will perceive. Similarly, when the browser is closed, the server will perceive.
Server:
package bin.netty.websocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; /** * @author liyibin * @date 2021-06-27 */ public class NettyWebSocketServer { public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // http codec pipeline.addLast(new HttpServerCodec()); // Read and write in block mode pipeline.addLast(new ChunkedWriteHandler()); // http data transmission is segmented. When a large amount of data is sent, it needs to be aggregated together pipeline.addLast(new HttpObjectAggregator(8192)); // 1. websocket processor, data is transmitted in the form of frames. netty corresponds to WebSocketFrame class, which has 6 subclasses. // 2. Request ws://localhost:9999/chat for communication // 3. The core of websocketserverprotocolhandler is to upgrade http protocol to ws protocol by responding to status code 101 pipeline.addLast(new WebSocketServerProtocolHandler("/chat")); // Service processor pipeline.addLast(new MyTextWebSocketFrameHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(9999).sync(); channelFuture.addListener(cf -> { if (cf.isSuccess()) { System.out.println("websocket listen on 9999"); } }); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyTextWebSocketFrameHandler:
package bin.netty.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; /** * @author liyibin * @date 2021-06-27 */ public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.printf("[%s]: %s", ctx.channel().remoteAddress(), msg.text()); // Reply message ctx.writeAndFlush(new TextWebSocketFrame("The server received a message: " + LocalDateTime.now())); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded cover" + ctx.channel().id().asShortText() + " call"); System.out.println("handlerAdded cover" + ctx.channel().id().asLongText() + " call"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved cover" + ctx.channel().id().asShortText() + " call"); System.out.println("handlerRemoved cover" + ctx.channel().id().asLongText() + " call"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
client:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script> let socket; if (window.WebSocket) { socket = new WebSocket("ws://localhost:9999/chat"); socket.onopen = function (evt) { let res = document.getElementById("responseText"); res.value = "Connection on...\n"; } socket.onmessage = function (evt) { let res = document.getElementById("responseText"); res.value = res.value + evt.data; } socket.onclose = function (evt) { let res = document.getElementById("responseText"); res.value = res.value + "Connection closed...\n"; } } else { alert("The current browser does not support WebSocket") } function send(msg) { console.log(msg); /*if (!window.socket) { console.log("no open"); return ; }*/ if (socket.readyState === WebSocket.OPEN) { socket.send(msg); } else { alert("The connection is not open") } } </script> <form onsubmit="return false"> <textarea name="message" style="width: 300px; height: 300px"></textarea> <input type="button" value="send message" onclick="send(this.form.message.value)"> <textarea id="responseText" style="width: 300px; height: 300px"></textarea> <input type="button" value="Clear message" onclick="document.getElementById('responseText').value=''"> </form> </body> </html>