1. Introduction to Netty
1.1. Netty is a client-server framework based on Java NIO. With Netty you can quickly develop network applications. It provides a high-level abstraction that simplifies programming TCP and UDP servers, while still giving you access to the underlying API.
1.2. Netty's internal implementation is complex, but it exposes a simple, easy-to-use API that decouples business logic from network processing code. Netty is built on NIO, and its I/O operations are asynchronous.
1.3. Netty is one of the most popular NIO frameworks and has been proven in hundreds of commercial projects. Many frameworks and open source components use Netty for their underlying RPC or network layer, such as Dubbo, RocketMQ, Elasticsearch and gRPC.
1.4. It supports a variety of protocols, such as FTP, SMTP and HTTP, as well as various binary and text-based legacy protocols.
2. Why use Netty?
2.1 A unified API supports multiple transport types, both blocking and non-blocking.
2.2 A simple yet powerful threading model.
2.3 Rich functionality: built-in codecs solve TCP packet sticking and splitting, and a variety of mainstream protocols are supported.
2.4 True connectionless datagram socket support.
2.5 Higher throughput, lower latency, lower resource consumption and less memory copying than using the Java core API directly.
2.6 Good security, with complete SSL/TLS and StartTLS support.
2.7 An active community with a short release cycle: bugs are fixed promptly and new features are added continuously.
2.8 Mature and stable: it has been used and tested in large projects, and many open source projects use Netty, such as Dubbo and RocketMQ.
2.9 Netty works around the JDK NIO bugs that have been found, so business developers no longer need to worry about them.
3. Application scenarios of Netty
3.1 Network communication layer of an RPC framework
For example, in a microservice architecture, services often need to call each other, which is where an RPC framework comes in.
3.2 Real-time communication systems
With Netty we can implement chat features, such as a real-time messaging system similar to WeChat.
Netty is built on Java NIO, so we first need to cover the basic concepts of NIO:
Blocking and non-blocking
Blocking: the caller has to wait until the data in the buffer is ready before it can do anything else; until then it simply waits.
Non-blocking: when the process accesses the data buffer, the call returns immediately whether or not the data is ready. If the data is not ready, the call returns without waiting; if it is ready, the data is returned right away.
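The non-blocking behavior described above can be sketched with plain JDK NIO, without Netty. This is a minimal illustration under stated assumptions: the class name NonBlockingReadSketch and the method demo() are made up for this example, and an in-process java.nio.channels.Pipe stands in for a network connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class NonBlockingReadSketch {

    /** Returns {bytes read while the pipe is empty, bytes read after a write}. */
    public static int[] demo() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(16);

                // No data yet: a non-blocking read returns 0 immediately instead of waiting.
                int whenEmpty = source.read(buf);

                sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));

                // Poll until all 4 bytes have arrived; every call still returns immediately.
                while (buf.position() < 4) {
                    source.read(buf);
                }
                return new int[] {whenEmpty, buf.position()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("read while empty = " + r[0] + ", read after write = " + r[1]);
    }
}
```

In blocking mode the first read() would have parked the thread until data arrived; in non-blocking mode the thread is free to do other work between polls.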
Synchronous and asynchronous
Synchronous: the client sends a request to the server and the result comes back as the return value of that same call. Whether or not the server blocks internally, the interaction counts as synchronous as long as the result is returned this way.
Asynchronous: when the client sends a request to the server, the call returns without the result. The client waits for a notification and fetches the result once the notification arrives.
Differences between Java BIO and NIO
BIO (traditional IO):
The traditional java.io package is built on the stream model. Its interaction mode is synchronous and blocking: when reading from an input stream or writing to an output stream, the thread blocks until the read or write completes, and the calls happen in a reliable linear order.
NIO (Non-blocking/New I/O):
NIO is a synchronous non-blocking I/O model introduced in Java 1.4. It corresponds to the java.nio package and provides abstractions such as Channel, Selector and Buffer. The N in NIO can be read as Non-blocking, not just New. It supports buffer-oriented, channel-based I/O operations. NIO provides two socket channel implementations, SocketChannel and ServerSocketChannel, which correspond to Socket and ServerSocket in the traditional BIO model. Both channels support blocking and non-blocking modes. For high-load, high-concurrency network applications, NIO's non-blocking mode should be used.
The biggest difference between Java NIO and BIO is that BIO is stream-oriented while NIO is buffer-oriented.
NIO features:
One thread can handle multiple channels, which reduces the number of threads that have to be created. Reads and writes do not block: even when a channel has no data to read or write, the thread is not blocked, so thread resources are not wasted.
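As a rough illustration of one thread serving several channels, the sketch below registers two channels with a single java.nio.channels.Selector and drains both from the calling thread. It uses only the JDK. The class and method names are hypothetical, and in-process pipes stand in for sockets.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

public class SingleThreadSelectorSketch {

    /** Reads one byte from each of two channels using a single thread and one selector. */
    public static Set<String> readAllWithOneThread() {
        Set<String> received = new TreeSet<>();
        try (Selector selector = Selector.open()) {
            Pipe[] pipes = {Pipe.open(), Pipe.open()};
            try {
                for (int i = 0; i < pipes.length; i++) {
                    pipes[i].source().configureBlocking(false);
                    pipes[i].source().register(selector, SelectionKey.OP_READ);
                    // Channel 0 gets "A", channel 1 gets "B".
                    pipes[i].sink().write(ByteBuffer.wrap(new byte[] {(byte) ('A' + i)}));
                }
                ByteBuffer buf = ByteBuffer.allocate(8);
                while (received.size() < pipes.length) {
                    selector.select(); // waits until at least one channel is readable
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        buf.clear();
                        int n = ((ReadableByteChannel) key.channel()).read(buf);
                        if (n > 0) {
                            received.add(String.valueOf((char) buf.get(0)));
                        }
                    }
                }
            } finally {
                for (Pipe p : pipes) {
                    p.sink().close();
                    p.source().close();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(readAllWithOneThread());
    }
}
```

The only call that waits is select(), and it waits on all registered channels at once. This is the loop that an event loop wraps.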
Netty's execution flow:
Next, the client and server code below is used to explain the main functions of Netty's core components.
Server code:
public void start(InetSocketAddress address) {
    // Configure the server's NIO thread groups
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(address)
                .childHandler(new NettyServerChannelInitializer())
                .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024))
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Bind the port and start accepting incoming connections
        ChannelFuture future = bootstrap.bind(address).sync();
        log.info("netty Server start listening port:" + address.getPort());
        // Block until the server channel is closed
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Shut down the boss thread group
        bossGroup.shutdownGracefully();
        // Shut down the worker thread group
        workerGroup.shutdownGracefully();
    }
}
Client code:
public void Client(InetSocketAddress address) throws InterruptedException {
    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new DemoSocketClientHandler());
                    }
                });
        // Connect to the server and wait until the connection is established
        ChannelFuture future = bootstrap.connect(address).sync();
        // Block until the client channel is closed
        future.channel().closeFuture().sync();
    } finally {
        eventLoopGroup.shutdownGracefully();
    }
}
Channel:
The Channel interface is Netty's abstraction for network operations. It covers basic I/O operations such as bind(), connect(), read() and write().
The most commonly used Channel implementations are NioServerSocketChannel (server) and NioSocketChannel (client). They correspond to ServerSocket and Socket in the BIO programming model. The API offered by Netty's Channel interface greatly reduces the complexity of working with Socket classes directly.
We read and write data through a Channel, which works like a water pipe. Unlike streams, channels are bidirectional: they can be used for reading, for writing, or for both at the same time.
EventLoop:
Its main job is to listen for network events and call the event handlers that process the related I/O operations.
EventLoopGroup:
As the name suggests, it is a group of EventLoops.
An EventLoopGroup contains multiple EventLoops, and each EventLoop usually owns one Thread. As described above, the main job of an EventLoop is to listen for network events and call the event handlers for the related I/O operations. The I/O events handled by an EventLoop are processed on its own dedicated Thread; that is, Thread and EventLoop have a 1:1 relationship, which ensures thread safety.
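The 1:1 relationship between a loop and its thread can be approximated with a JDK single-thread executor. This is not Netty's EventLoop, only a sketch of the property that matters here: every task submitted to one loop runs on the same thread, so handler state tied to that loop needs no locking. The class name EventLoopAffinitySketch is an assumption made for this example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopAffinitySketch {

    /** Runs the given number of tasks on one "loop" and returns the distinct thread names observed. */
    public static Set<String> threadsUsedByOneLoop(int tasks) {
        // Stand-in for a single EventLoop: one queue served by one dedicated thread.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            loop.execute(() -> names.add(Thread.currentThread().getName()));
        }
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsedByOneLoop(100));
    }
}
```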
It can be seen from the above figure combined with the following code diagram:
The bossGroup accepts connections, and the workerGroup does the actual processing (reading and writing messages and other logic).
When a client connects to the server through the connect method, the bossGroup handles the connection request. Once the connection has been accepted, it is handed over to the workerGroup, which is then responsible for its I/O operations.
NioEventLoopGroup initialization analysis
Before going further, let's look at the relationship between EventLoopGroup and NioEventLoopGroup, as shown in the following figure
As the figure above shows, EventLoopGroup and NioEventLoopGroup both act as thread pools. Netty's EventLoopGroup interface directly extends Netty's EventExecutorGroup interface, which in turn extends the JDK's ScheduledExecutorService. NioEventLoopGroup is a concrete class that extends the abstract class MultithreadEventLoopGroup.
MultithreadEventLoopGroup extends the abstract class MultithreadEventExecutorGroup and implements Netty's EventLoopGroup interface. If this is still unclear, take a look at the inheritance diagram:
This should make the relationship easier to follow.
The source code analysis below starts from the initialization of NioEventLoopGroup.
/**
 * Create a new instance using the default number of threads, the default ThreadFactory,
 * and the SelectorProvider returned by the static SelectorProvider.provider() method.
 */
public NioEventLoopGroup() {
    this(0);
}

/**
 * Create a new instance using the specified number of threads.
 */
public NioEventLoopGroup(int nThreads) {
    // The second parameter is the executor used by the group
    this(nThreads, (Executor) null);
}

/**
 * Create a new instance using the specified number of threads, the given ThreadFactory,
 * and the default SelectorProvider.
 */
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    // The third parameter is the provider, which is used to open selectors and selectable channels.
    // It is a singleton within the current JVM
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    // The fourth parameter is the select strategy factory instance
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
Meaning of each parameter specified in the constructor:
nThreads: DEFAULT_EVENT_LOOP_THREADS // used when 0 is passed; defaults to twice the number of CPU cores.
SelectorProvider.provider():
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    // 1. The java.nio.channels.spi.SelectorProvider property specifies the implementation class
                    if (loadProviderFromProperty())
                        return provider;
                    // 2. SPI specifies the implementation class
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}
As can be seen from the code, this is a singleton provider.
SelectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE // the default select strategy factory instance.
chooserFactory: DefaultEventExecutorChooserFactory.INSTANCE // the default chooser factory instance, used to pick the next EventExecutor from the group.
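The chooser created by the chooser factory hands out the group's children in round-robin order. The following is a simplified JDK-only sketch of that idea, not Netty's implementation. The class name RoundRobinChooserSketch is hypothetical, and Math.floorMod is used here for brevity to keep the index valid after the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinChooserSketch<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();

    public RoundRobinChooserSketch(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
    }

    /** Returns the next child in round-robin order; safe to call from multiple threads. */
    public T next() {
        // floorMod keeps the index non-negative even after the int counter wraps around.
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch<String> chooser =
                new RoundRobinChooserSketch<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Spreading new channels evenly over the children keeps each loop thread roughly equally loaded.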
The main focus here is on the MultithreadEventExecutorGroup constructor
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    this.terminatedChildren = new AtomicInteger();
    this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // If no executor is specified, build a default one on top of the default thread factory
    if (executor == null) {
        // This executor belongs to the group. It will later create one thread for each eventLoop in the group
        executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
    }

    this.children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; ++i) {
        boolean success = false;
        try {
            // Create the eventLoop. In NioEventLoopGroup, each element of children is actually a NioEventLoop
            this.children[i] = this.newChild(executor, args);
            success = true;
        } catch (Exception cause) {
            throw new IllegalStateException("failed to create a child event loop", cause);
        } finally {
            // If creating one eventLoop fails, shut down all eventLoops created so far
            if (!success) {
                for (int j = 0; j < i; ++j) {
                    this.children[j].shutdownGracefully();
                }
                // Wait until the eventLoops that were shut down have terminated
                for (int j = 0; j < i; ++j) {
                    EventExecutor e = this.children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // Create the chooser that picks the next child from the children array
    this.chooser = chooserFactory.newChooser(this.children);

    FutureListener<Object> terminationListener = new FutureListener<Object>() {
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Add a listener that fires when each NioEventLoop terminates
    for (EventExecutor e : this.children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(this.children.length);
    Collections.addAll(childrenSet, this.children);
    this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newDefaultThreadFactory:
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
If no executor is specified, newDefaultThreadFactory() is called to construct a thread factory, which is then wrapped in a ThreadPerTaskExecutor.
Following the call chain, the factory is created through this constructor:
public DefaultThreadFactory(Class<?> poolType) {
    // poolType is the class of the group that called newDefaultThreadFactory().
    // false means the threads are not daemon (background) threads.
    // Thread.NORM_PRIORITY is the normal thread priority
    // (three constants: MIN_PRIORITY = 1, NORM_PRIORITY = 5, MAX_PRIORITY = 10).
    this(poolType, false, Thread.NORM_PRIORITY);
}
which delegates to:
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    this(toPoolName(poolType), daemon, priority);
}
toPoolName(poolType): derives the pool name from the simple class name with its first letter lowercased. For example, if poolType is io.netty.channel.nio.NioEventLoopGroup, toPoolName() returns nioEventLoopGroup.
The next constructor in the chain:
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
    this(poolName, daemon, priority,
            System.getSecurityManager() == null
                    ? Thread.currentThread().getThreadGroup()
                    : System.getSecurityManager().getThreadGroup());
}
This constructor adds the thread group parameter.
Finally:
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
    this.nextId = new AtomicInteger();
    ObjectUtil.checkNotNull(poolName, "poolName");
    if (priority >= 1 && priority <= 10) {
        this.prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    } else {
        throw new IllegalArgumentException("priority: " + priority
                + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
    }
}
Several fields are set here:
poolId: private static final AtomicInteger poolId = new AtomicInteger(); // an atomic counter, which keeps the pool numbering thread-safe
this.daemon = daemon; // whether the threads are daemon (background) threads
this.priority = priority; // thread priority
this.threadGroup = threadGroup; // thread group
With newDefaultThreadFactory covered, let's go back to ThreadPerTaskExecutor (the thread executor), which creates a new thread every time a task is executed.
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = (ThreadFactory) ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    public void execute(Runnable command) {
        this.threadFactory.newThread(command).start();
    }
}
The threadFactory passed in is the DefaultThreadFactory created above, which names the NioEventLoop threads following the pattern nioEventLoopGroup-1-xxx. When a task is executed, the execute() method is called and a FastThreadLocalThread is created here.
public class DefaultThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}
The newThread() method above creates the thread; the thread object's data is then initialized, ending in a call to Thread.init().
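The interplay between the naming thread factory and the thread-per-task executor can be reproduced with JDK classes alone. The sketch below is a simplified model rather than Netty's code: ThreadPerTaskSketch, namedFactory and threadPerTask are hypothetical names, and plain Thread objects are used instead of FastThreadLocalThread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerTaskSketch {
    // One counter shared by all factories, so every pool gets a distinct id.
    private static final AtomicInteger POOL_ID = new AtomicInteger();

    /** Hypothetical factory that names threads "<poolName>-<poolId>-<threadId>". */
    public static ThreadFactory namedFactory(String poolName) {
        String prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
        AtomicInteger nextId = new AtomicInteger();
        return r -> new Thread(r, prefix + nextId.incrementAndGet());
    }

    /** Every execute() call starts a brand-new thread obtained from the factory. */
    public static Executor threadPerTask(ThreadFactory factory) {
        return command -> factory.newThread(command).start();
    }

    /** Runs two tasks and returns the names of the threads that executed them. */
    public static List<String> runTwoTasks() {
        Executor executor = threadPerTask(namedFactory("demoGroup"));
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(runTwoTasks());
    }
}
```

In Netty each event loop submits its run loop to this executor only once, so one new thread per task still yields exactly one thread per loop.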
Next, a children array is created with as many slots as threads were requested:
this.children = new EventExecutor[nThreads];
Because each NioEventLoopGroup is a collection of NioEventLoops, the children array holds the NioEventLoops of the current NioEventLoopGroup. That is all there is to say about it; it is not the main point.
The key code is the creation of each NioEventLoop, so further down we find:
this.children[i] = this.newChild((Executor)executor, args);
Because we are creating a NioEventLoopGroup, this line resolves to the newChild implementation of that subclass:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(),
            (RejectedExecutionHandler) args[2], queueFactory);
}
newChild is declared abstract in MultithreadEventExecutorGroup; its job is to instantiate the EventLoop object.
Let's continue with the NioEventLoop constructor:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    // newTaskQueue creates the task queue (backed by JCTools)
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    // Set the NIO selectorProvider
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    // Set the select strategy, which controls the NIO loop logic
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // selectorTuple is a simple holder for the native selector and the wrapped selector
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
The NioEventLoop constructor instantiates a NioEventLoop object and returns it. It saves the provider and the selector used for event polling, creates an MpscQueue in its parent class, and stores the thread executor.
Looking back, MultithreadEventExecutorGroup internally maintains an EventExecutor[] children array. The implementation mechanism of Netty's EventLoopGroup is actually based on MultithreadEventExecutorGroup.
Finally, to summarize the initialization process of EventLoopGroup:
1. EventLoopGroup (actually MultithreadEventExecutorGroup) internally maintains an EventExecutor array named children.
2. The length of the array is nThreads.
3. When we instantiate NioEventLoopGroup, nThreads is the specified thread pool size if one is given; otherwise it is the number of processor cores * 2.
4. MultithreadEventExecutorGroup calls the abstract method newChild to initialize the children array.
The abstract method newChild is implemented in NioEventLoopGroup, where it returns a NioEventLoop instance.
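The steps summarized above follow the template-method pattern, which the sketch below models in plain Java. It is a simplified illustration under stated assumptions, not Netty's classes: GroupInitSketch, Group and NamedLoopGroup are hypothetical names, and strings stand in for event loops. The system property io.netty.eventLoopThreads is the one Netty consults for its default, read here through the JDK's Integer.getInteger.

```java
public class GroupInitSketch {

    /** Default size: the io.netty.eventLoopThreads property if set, else cores * 2, but at least 1. */
    public static int defaultThreads() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", cores * 2));
    }

    /** Simplified stand-in for MultithreadEventExecutorGroup. */
    public abstract static class Group<E> {
        private final Object[] children;

        protected Group(int nThreads) {
            if (nThreads < 0) {
                throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: >= 0)");
            }
            // 0 means "use the default", mirroring new NioEventLoopGroup().
            int n = nThreads == 0 ? defaultThreads() : nThreads;
            children = new Object[n];
            for (int i = 0; i < n; i++) {
                // The base class drives the loop; the subclass decides what a child is.
                children[i] = newChild(i);
            }
        }

        /** Implemented by the concrete group, like NioEventLoopGroup.newChild(). */
        protected abstract E newChild(int index);

        public int size() {
            return children.length;
        }

        @SuppressWarnings("unchecked")
        public E child(int index) {
            return (E) children[index];
        }
    }

    /** Hypothetical concrete group whose children are just labels. */
    public static final class NamedLoopGroup extends Group<String> {
        public NamedLoopGroup(int nThreads) {
            super(nThreads);
        }

        @Override
        protected String newChild(int index) {
            return "loop-" + index;
        }
    }

    public static void main(String[] args) {
        System.out.println(new NamedLoopGroup(0).size() + " children by default");
    }
}
```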
####Bootstrap and ServerBootstrap:
Bootstrap: client startup configuration class
ServerBootstrap: server startup configuration class
Bootstrap can also bind a local port through the bind() method as one end of UDP protocol communication.
ServerBootstrap usually uses the bind() method to bind to the local port and wait for the client to connect.
Bootstrap only needs one thread group (EventLoopGroup), while ServerBootstrap needs two: one for accepting connections and the other for the actual processing. Both classes are mainly used to bind the EventLoopGroup we created, specify the Channel type and attach the Channel handlers. Because they essentially assign values to the properties of ServerBootstrap and Bootstrap, they are called configuration classes.
Inheritance relationship between Bootstrap and ServerBootstrap
AbstractBootstrap is an abstract class. Both Bootstrap and ServerBootstrap inherit from the AbstractBootstrap abstract class. AbstractBootstrap provides methods similar to the builder pattern.
/**
 * AbstractBootstrap is a helper class that makes it easier to bootstrap a Channel.
 * It provides a chain of methods for configuring the AbstractBootstrap.
 */
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    /** The EventLoopGroup object. */
    volatile EventLoopGroup group;

    /** Channel factory used to create Channel objects. */
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;

    /** Local address. */
    private volatile SocketAddress localAddress;

    /** Channel-related option parameters. */
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

    /** Initial attribute values of the Channel. */
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();

    /** The business logic handler; may be a ChannelInitializer or an ordinary handler. */
    private volatile ChannelHandler handler;

    /** Copy constructor. */
    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

    /**
     * Configure the EventLoopGroup, the event loop group that handles I/O events.
     * It is a key part of the asynchronous event model.
     */
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            // Setting the group twice is not allowed
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    /**
     * Set the Channel type, for example NioServerSocketChannel or NioSocketChannel.
     * The class passed in is used to create the Channel instance.
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    /** Set the ChannelFactory used to create the Channel. */
    @SuppressWarnings({"unchecked", "deprecation"})
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    /** Set the IP address and port to bind to; generally used on the server. */
    public B localAddress(SocketAddress localAddress) {
        this.localAddress = localAddress;
        return self();
    }

    /**
     * Set a ChannelOption. These options are applied when the Channel is created.
     * Passing a null value removes the ChannelOption.
     */
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            // null means remove
            synchronized (options) {
                options.remove(option);
            }
        } else {
            // non-null means add or modify
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }

    /** Returns a deep copy; implemented by subclasses. */
    @Override
    @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
    public abstract B clone();

    /** Create a new {@link Channel} and register it with an {@link EventLoop}. */
    public ChannelFuture register() {
        validate();
        return initAndRegister();
    }

    /** Create a Channel and bind it. */
    public ChannelFuture bind() {
        // 1. Verify the parameters required for startup
        validate();
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        // 2. Bind the address and port
        return doBind(localAddress);
    }

    /** Client and server initialize the Channel differently, so subclasses implement this. */
    abstract void init(Channel channel) throws Exception;

    /** Set the ChannelHandler that handles requests. */
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }

    /** Returns the AbstractBootstrapConfig of the current bootstrap; implemented by subclasses. */
    public abstract AbstractBootstrapConfig<B, C> config();

    /** Apply the given ChannelOptions to the Channel; used by subclasses when initializing a Channel. */
    static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
        for (Map.Entry<ChannelOption<?>, Object> e : options.entrySet()) {
            setChannelOption(channel, e.getKey(), e.getValue(), logger);
        }
    }
}
Most of these methods simply set parameters. The channel method deserves attention: it sets the channel type, and in doing so initializes the ChannelFactory<? extends C> channelFactory field from the class passed in. channelFactory is a channel factory object that is used later to create channel objects.
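The self-referential type parameter B extends AbstractBootstrap<B, C> is what lets every configuration method return the concrete subclass, so chained calls keep access to subclass-only methods such as childHandler. Below is a minimal sketch of that pattern with hypothetical names (SelfTypedBuilderSketch, AbstractConfig, ServerConfig) and strings in place of Netty types.

```java
public class SelfTypedBuilderSketch {

    /** Base class with fluent setters that return the concrete subtype B. */
    public abstract static class AbstractConfig<B extends AbstractConfig<B>> {
        private String group;
        private String channelType;

        @SuppressWarnings("unchecked")
        protected B self() {
            return (B) this;
        }

        public B group(String group) {
            if (group == null) {
                throw new NullPointerException("group");
            }
            if (this.group != null) {
                // Same rule as in the source above: the group may be set only once.
                throw new IllegalStateException("group set already");
            }
            this.group = group;
            return self();
        }

        public B channel(String channelType) {
            this.channelType = channelType;
            return self();
        }

        public String describe() {
            return group + "/" + channelType;
        }
    }

    /** Subclass adding a server-only setting; the chain still compiles after base-class calls. */
    public static final class ServerConfig extends AbstractConfig<ServerConfig> {
        private String childHandler;

        public ServerConfig childHandler(String childHandler) {
            this.childHandler = childHandler;
            return this;
        }

        @Override
        public String describe() {
            return super.describe() + "/" + childHandler;
        }
    }

    public static void main(String[] args) {
        // group() and channel() return ServerConfig, so childHandler() is still reachable.
        System.out.println(new ServerConfig().group("boss").channel("nio").childHandler("echo").describe());
    }
}
```

Without the self type, group() would return the base class and the call to childHandler() would not compile.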
Source code analysis of ServerBootstrap and Bootstrap
ServerBootstrap server startup configuration
public void start(InetSocketAddress address) {
    // 1. bossGroup accepts connections; workerGroup does the actual processing
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // 2. Create the server bootstrap / helper class: ServerBootstrap
        ServerBootstrap bootstrap = new ServerBootstrap()
                // 3. Configure the two thread groups for the bootstrap and fix the thread model
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(address)
                .childHandler(new NettyServerChannelInitializer())
                .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024 * 1024))
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Bind the port and start accepting incoming connections
        ChannelFuture future = bootstrap.bind(address).sync();
        log.info("netty Server start listening port:" + address.getPort());
        // Block until the server channel is closed
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Shut down the boss thread group
        bossGroup.shutdownGracefully();
        // Shut down the worker thread group
        workerGroup.shutdownGracefully();
    }
}
Next, let's look at what the group method does:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    } else {
        this.childGroup = (EventLoopGroup) ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }
}
The group method first calls the group method of its parent class. From the class hierarchy we know that super.group(parentGroup) calls the group method of AbstractBootstrap, whose code is as follows:
public B group(EventLoopGroup group) {
    ObjectUtil.checkNotNull(group, "group");
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    } else {
        this.group = group;
        return this.self();
    }
}
This is only initialization, in preparation for the later operations.
Here we mainly discuss bootstrap.bind(address).sync().
Stepping into bind, the source code is:
public ChannelFuture bind(SocketAddress localAddress) {
    // Verify that group and channelFactory are not null
    this.validate();
    return this.doBind((SocketAddress) ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
validate() mainly checks that the group and channelFactory of the bootstrap have been set.
Next, step into doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create and initialize the channel, register it with the selector, and return an asynchronous result
    final ChannelFuture regFuture = this.initAndRegister();
    // Get the channel from the asynchronous result
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        // An exception occurred during the asynchronous operation: return the future directly
        return regFuture;
    } else if (regFuture.isDone()) {
        // The asynchronous operation has completed (normally, with an exception, or cancelled;
        // all of these count as having a result)
        ChannelPromise promise = channel.newPromise();
        // Bind the specified port
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // The asynchronous operation has no result yet
        final AbstractBootstrap.PendingRegistrationPromise promise =
                new AbstractBootstrap.PendingRegistrationPromise(channel);
        // Add a listener to the asynchronous operation
        regFuture.addListener(new ChannelFutureListener() {
            // Triggered once the asynchronous operation has a result (i.e. completes)
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // The asynchronous operation failed
                    promise.setFailure(cause);
                } else {
                    // The asynchronous operation completed normally
                    promise.registered();
                    // Bind the specified port
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
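The control flow of doBind (continue at once if registration has already finished, otherwise attach a listener and continue later) can be imitated with the JDK's CompletableFuture. This is only an analogy under stated assumptions: RegisterThenBindSketch and bindAfter are hypothetical names, and the "bind" step merely produces a string.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RegisterThenBindSketch {

    /** Completes with "bound:<address>" once registration has succeeded, or fails with its cause. */
    public static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration, String address) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone()) {
            // Fast path: registration already has a result, continue on the calling thread.
            finish(registration, address, promise);
        } else {
            // Slow path: no result yet, so continue from a callback once it arrives.
            registration.whenComplete((ignored, cause) -> finish(registration, address, promise));
        }
        return promise;
    }

    private static void finish(CompletableFuture<Void> registration, String address,
                               CompletableFuture<String> promise) {
        try {
            registration.join(); // returns immediately because registration is done
            promise.complete("bound:" + address);
        } catch (CompletionException | CancellationException e) {
            promise.completeExceptionally(e.getCause() != null ? e.getCause() : e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = bindAfter(registration, "0.0.0.0:8888");
        registration.complete(null);
        System.out.println(bound.join());
    }
}
```

Either way the caller gets a future back immediately and is never blocked by the registration itself.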
Let's walk through it from top to bottom in several steps:
1. What is the relationship between ChannelPromise and ChannelFuture?
ChannelPromise provides methods for modifying the state of the current future; these methods, which set the final state, are added on top of ChannelFuture. ChannelFuture itself can only query the result of the current asynchronous operation and cannot modify it. The key point is that ChannelPromise can modify the state of the current asynchronous result.
ChannelPromise is a sub-interface of ChannelFuture. Opening ChannelPromise shows that it extends ChannelFuture and defines methods such as setSuccess(), setFailure() and sync().
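The split between a read-only future and a writable promise can be shown in a few lines of plain Java. The interfaces below (ResultFuture, ResultPromise) are hypothetical and much smaller than Netty's; the method names trySuccess and tryFailure are borrowed from Netty's Promise interface for familiarity, and a CompletableFuture holds the state.

```java
import java.util.concurrent.CompletableFuture;

public class PromiseVsFutureSketch {

    /** Read-only view: consumers can only observe the outcome. */
    public interface ResultFuture<V> {
        boolean isDone();

        V getNow(V valueIfAbsent);
    }

    /** Writable view: the producer can additionally set the outcome, exactly once. */
    public interface ResultPromise<V> extends ResultFuture<V> {
        boolean trySuccess(V value);

        boolean tryFailure(Throwable cause);
    }

    public static <V> ResultPromise<V> newPromise() {
        CompletableFuture<V> state = new CompletableFuture<>();
        return new ResultPromise<V>() {
            @Override
            public boolean isDone() {
                return state.isDone();
            }

            @Override
            public V getNow(V valueIfAbsent) {
                // Return the fallback both when there is no result yet and when the result is a failure.
                return state.isCompletedExceptionally() ? valueIfAbsent : state.getNow(valueIfAbsent);
            }

            @Override
            public boolean trySuccess(V value) {
                return state.complete(value); // false if a result was already set
            }

            @Override
            public boolean tryFailure(Throwable cause) {
                return state.completeExceptionally(cause);
            }
        };
    }

    public static void main(String[] args) {
        ResultPromise<String> promise = newPromise();
        ResultFuture<String> readOnly = promise; // what would be handed to callers
        promise.trySuccess("registered");
        System.out.println(readOnly.getNow("pending"));
    }
}
```

Handing callers only the ResultFuture type prevents them from completing the operation themselves, which is the reason for separating the two interfaces.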
2. final ChannelFuture regFuture = this.initAndRegister();
The initAndRegister method mainly does three things:
1. Create a Channel instance.
2. Initialize the Channel.
3. Register the Channel with the selector.
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the channel.
        // channelFactory is set in the AbstractBootstrap method
        // public B channel(Class<? extends C> channelClass).
        // By default it is a ReflectiveChannelFactory, that is, a factory that creates channels through reflection.
        //
        // .channel(NioServerSocketChannel.class) sets the NioServerSocketChannel class,
        // so an instance of NioServerSocketChannel is created here by calling its parameterless constructor
        channel = this.channelFactory.newChannel();
        // Initialize the channel
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }
    // Register the channel with the selector
    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
Step 1: create a Channel using channelFactory.newChannel()
First, run channelFactory.newChannel() in debug mode and inspect the result:
The channelFactory turns out to be a ReflectiveChannelFactory, and the value of its clazz attribute is NioServerSocketChannel.class. So where does NioServerSocketChannel.class come from?
The assignment is made through bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).
The channel method assigns channelFactory, and with it the clazz held by the ReflectiveChannelFactory.
Now step into newChannel to find the corresponding method:
public T newChannel() {
    try {
        // Call the parameterless constructor to create a channel
        return this.constructor.newInstance();
    } catch (Throwable var2) {
        throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);
    }
}
The channel class passed in on the server side is NioServerSocketChannel.class, so constructor.newInstance() corresponds to the parameterless constructor of NioServerSocketChannel. Let's continue into the NioServerSocketChannel constructor.
// The NIO provider, used to create selectors and channels. It is a singleton
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
    // DEFAULT_SELECTOR_PROVIDER is a static variable
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
Next, step into newSocket:
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException var2) {
        throw new ChannelException("Failed to open a server socket.", var2);
    }
}
Here the Java NIO native Channel is returned. In the end, this NIO native Channel is wrapped in a NioServerSocketChannel.
Continue into this(newSocket(DEFAULT_SELECTOR_PROVIDER)) to reach the following code:
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
    // Parameter 1: parent channel
    // Parameter 2: NIO native channel
    // Parameter 3: the event the current channel is interested in, here accepting connections
    //              (16 is the value of SelectionKey.OP_ACCEPT)
    super((Channel)null, channel, 16);
    // A collection of properties used to configure the channel
    this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
This constructor calls the parent class constructor and creates a configuration object. With that, the creation of the channel is complete.
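The idea behind a reflective factory (remember the parameterless constructor of a class, then invoke it whenever a new instance is needed) can be sketched in a few lines of plain Java. This is only an illustration of the technique; ReflectiveFactory and DemoServerChannel are hypothetical names, not Netty classes.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a channel type that has a parameterless constructor.
class DemoServerChannel {
    final String name;

    public DemoServerChannel() {
        this.name = "demo-server-channel";
    }
}

// Looks up the parameterless constructor once and invokes it on demand.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public parameterless constructor", e);
        }
    }

    T newInstance() {
        try {
            // Every call creates a fresh instance through reflection
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}
```

A caller would write new ReflectiveFactory<>(DemoServerChannel.class).newInstance(), which is the same shape as passing NioServerSocketChannel.class to channel() and later calling newChannel().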
Step 2: use this.init(channel) to initialize the Channel
Step into the init method:
void init(Channel channel) throws Exception {
    // Get the options set on ServerBootstrap
    final Map<ChannelOption<?>, Object> options = options0();
    // Apply the options to the channel
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // Get the attrs set on ServerBootstrap
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        // Iterate over the attrs
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            // Set the current attr on the channel
            channel.attr(key).set(e.getValue());
        }
    }

    // Get the pipeline of the channel
    ChannelPipeline p = channel.pipeline();

    // Copy all ServerBootstrap attributes starting with "child" into local variables;
    // they are later applied to the child channel
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // Add the ServerBootstrapAcceptor handler to the pipeline.
                    // ServerBootstrapAcceptor receives the attribute values configured on ServerBootstrap;
                    // it is usually called the connection handler
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
This is mainly divided into three steps:
1. Set the options and attrs: these are the values configured on ServerBootstrap or Bootstrap through option() and attr() before doBind() is called. The options are applied to the config of the Channel, and the attrs are set directly on the Channel.
2. Collect childOptions and childAttrs: after the options and attrs have been applied, we get the pipeline of the current channel. We then read the values configured before doBind() through the methods starting with child, namely childOption() and childAttr().
3. Add a ChannelInitializer, whose initChannel method schedules the following call:
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
This prepares for adding the ServerBootstrapAcceptor, the handler that accepts connections.
Let's look at the ServerBootstrapAcceptor class.
Class inheritance structure: ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter.
Because it is responsible for the establishment and initialization of client channels, it requires configuration information related to childChannel. The main attributes are:
private final EventLoopGroup childGroup; // WorkerGroup in the reactor model
private final ChannelHandler childHandler; // ChannelHandler of the client Channel
private final Entry<ChannelOption<?>, Object>[] childOptions; // Options for the client Channel
private final Entry<AttributeKey<?>, Object>[] childAttrs; // Attrs for the client Channel
private final Runnable enableAutoReadTask; // Task that re-enables auto read
So how does the ServerBootstrapAcceptor get triggered?
Looking at the methods of ServerBootstrapAcceptor, we can see that it overrides channelRead and exceptionCaught.
When a client connects to the server, the NioEventLoop accepts the connection, creates a SocketChannel, and triggers the channelRead callback, as shown in the following code:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg is the connection accepted from the client. It is a NioSocketChannel, that is, the child channel
    final Channel child = (Channel) msg;

    // Apply the ServerBootstrap attributes starting with "child" to the child channel
    // (childHandler, childOptions, childAttrs)
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the client Channel with the WorkerGroup:
        // 1. next() picks an EventLoop, then EventLoop.register() is called
        // 2. Channel.Unsafe.register() registers the channel with the Selector
        // 3. Various callbacks are triggered
        // Once the Channel is registered with an EventLoop, that EventLoop handles all of its events
        // for its whole life cycle.
        // Note that the selector here is not the same as the one the parent channel is registered with
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // If the registration fails, force the connection to close
                if (!future.isSuccess()) {
                    // Under the hood this calls the close method of the native JDK channel: javaChannel().close()
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
To sum up, channelRead does two things:
1. Initializes the childChannel.
2. Registers the channel of the newly connected client with the selector.
Next is exceptionCaught: exception propagation
When an exception occurs in channelRead, Netty forcibly closes the connection by calling the forceClose(child, t) method.
// Force close the connection
private static void forceClose(Channel child, Throwable t) {
    child.unsafe().closeForcibly();
    logger.warn("Failed to register an accepted channel: {}", child, t);
}
Under the hood, this still calls the JDK's SocketChannel.close() method to close the connection.
When the channelRead event triggers an exception, the Pipeline propagates the exception event and executes the exceptionCaught callback. When the ServerBootstrapAcceptor receives the exception, it stops accepting client connections for 1 second to give the ServerSocketChannel time to recover, and propagates the exception event onward.
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ChannelConfig config = ctx.channel().config();
    if (config.isAutoRead()) {
        // Stop accepting new clients for 1 second
        config.setAutoRead(false);
        ctx.channel().eventLoop().schedule(this.enableAutoReadTask, 1L, TimeUnit.SECONDS);
    }
    // Propagate the exception event
    ctx.fireExceptionCaught(cause);
}
So far, the channel initialization code has been explained.
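The "pause accepting, resume after a delay" behaviour of exceptionCaught can be reproduced with a plain JDK scheduler. The sketch below is an analogy under stated assumptions, not Netty code: AcceptBackoff and its methods are hypothetical, and an AtomicBoolean stands in for the channel's auto-read flag.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: models "stop accepting now, start again later".
final class AcceptBackoff {
    private final AtomicBoolean autoRead = new AtomicBoolean(true);
    private final ScheduledExecutorService scheduler;

    AcceptBackoff(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    boolean isAutoRead() {
        return autoRead.get();
    }

    // Returns the scheduled resume task, or null when accepting was already paused.
    ScheduledFuture<?> onAcceptError(long delay, TimeUnit unit) {
        // compareAndSet ensures that only one resume task is scheduled per pause
        if (autoRead.compareAndSet(true, false)) {
            return scheduler.schedule(() -> autoRead.set(true), delay, unit);
        }
        return null;
    }

    // Small demo: returns {paused right after the error, resumed after the delay}.
    static boolean[] demo(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AcceptBackoff backoff = new AcceptBackoff(scheduler);
            ScheduledFuture<?> resume = backoff.onAcceptError(delayMillis, TimeUnit.MILLISECONDS);
            boolean paused = !backoff.isAutoRead();
            resume.get(); // wait until the resume task has run
            return new boolean[] { paused, backoff.isAutoRead() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Checking the flag before pausing, as the isAutoRead() test in exceptionCaught does, avoids stacking up several resume tasks when errors arrive in a burst.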
Step 3: this.config().group().register(channel) registers the channel with the selector
First, step into the register method.
Following the inheritance hierarchy of NioEventLoopGroup, the implementation is found in MultithreadEventLoopGroup, because it is the only class in that hierarchy that implements register.
Continue:
public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}
Here we need to understand the next() method. Since we are currently in an EventLoopGroup, next() obtains an EventLoop from the group. We then need to find the class in the EventLoop inheritance hierarchy that implements the register method: SingleThreadEventLoop.
public ChannelFuture register(Channel channel) {
    // Create a ChannelPromise and register
    return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}

// From here, continue into register in promise.channel().unsafe().register(this, promise)
public ChannelFuture register(ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
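Going back to next() for a moment: picking one member of a group in turn can be sketched as a simple round-robin chooser. This is an illustration of the selection idea only. RoundRobinChooser is a hypothetical class, and the strings in the usage note stand in for event loops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical chooser: hands out the members of a group one after another.
final class RoundRobinChooser<T> {
    private final List<T> members;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(List<T> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("members must not be empty");
        }
        // Defensive copy so later changes to the caller's list have no effect
        this.members = new ArrayList<>(members);
    }

    T next() {
        // floorMod keeps the slot non-negative even after the counter overflows
        int slot = Math.floorMod(index.getAndIncrement(), members.size());
        return members.get(slot);
    }
}
```

With three members named loop-0, loop-1 and loop-2, four successive calls to next() return loop-0, loop-1, loop-2 and then loop-0 again, so registrations are spread evenly over the group.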
This leads to the register method of AbstractChannel.AbstractUnsafe:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        // The channel is bound to the eventLoop here.
        // Note that this eventLoop is not yet bound to a thread, because the thread has not been created
        AbstractChannel.this.eventLoop = eventLoop;
        // Check whether the current thread is the thread bound to the eventLoop
        if (eventLoop.inEventLoop()) {
            // Core method
            this.register0(promise);
        } else {
            try {
                // Call execute() on the eventLoop bound to this channel. execute() puts the task
                // into the task queue, and creates and starts a new thread
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
                this.closeForcibly();
                AbstractChannel.this.closeFuture.setClosed();
                this.safeSetFailure(promise, var4);
            }
        }
    }
}
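The "run inline if already on the loop thread, otherwise hand the task to the loop" pattern used by register can be sketched with a single-threaded JDK executor. This is a simplified model, not how NioEventLoop is implemented; MiniEventLoop and its methods are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded loop whose thread is created lazily on first use.
final class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread = t; // remember which thread belongs to this loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Runs the task directly when already on the loop thread, otherwise queues it.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Demo: returns the name of the thread that executed the nested task.
    static String demo() {
        MiniEventLoop loop = new MiniEventLoop();
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        // Called from an outside thread, so this task is queued
        loop.runOnLoop(() -> {
            // Already on the loop thread, so the nested task runs inline
            loop.runOnLoop(() -> seen[0] = Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return seen[0];
    }
}
```

Funnelling every state change through one thread is what lets the registration code avoid locks: whichever thread asks, the work itself always happens on the loop thread.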
Step directly into this.register0(promise):
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = this.neverRegistered;
        // The actual registration happens right here
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        // invokeHandlerAddedIfNeeded calls back into handlerAdded of the ChannelInitializer,
        // which adds the ServerBootstrapAcceptor to the Pipeline
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
        // isActive() returns false here. It ends up in the isActive check of NioServerSocketChannel,
        // and at this point the bind operation has not been completed
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
        this.closeForcibly();
        AbstractChannel.this.closeFuture.setClosed();
        this.safeSetFailure(promise, var3);
    }
}
Step into the AbstractChannel.this.doRegister() method.
In the end, the doRegister method in AbstractNioChannel is called to perform the registration.
protected void doRegister() throws Exception {
    boolean selected = false;
    while (true) {
        try {
            // Register the NIO native channel with the NIO native selector
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException var3) {
            if (selected) {
                throw var3;
            }
            this.eventLoop().selectNow();
            selected = true;
        }
    }
}
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this) is the line that registers the channel with the Selector. The interest set passed here is 0, and the Netty channel itself (this) is stored as the attachment.
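The same three-argument register call can be tried directly against the JDK NIO API. The sketch below uses only java.nio classes; NioRegisterDemo is a hypothetical name and a plain Object stands in for the Netty channel used as the attachment. Setting OP_ACCEPT on the key afterwards is shown only to illustrate that the interest set can be changed after registration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class NioRegisterDemo {
    // Returns {interest ops after register, interest ops after enabling accept, 1 if attachment matches}.
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector
            server.configureBlocking(false);
            // Stand-in for the Netty channel that is passed as "this"
            Object attachment = new Object();
            // Register with an empty interest set (0), as in doRegister
            SelectionKey key = server.register(selector, 0, attachment);
            int initialOps = key.interestOps();
            // The interest set can be changed later on the same key
            key.interestOps(SelectionKey.OP_ACCEPT);
            int laterOps = key.interestOps();
            return new int[] { initialOps, laterOps, key.attachment() == attachment ? 1 : 0 };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The attachment is how a selected key can later be mapped back to the object that owns the native channel.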
After the registration is completed, pipeline.invokeHandlerAddedIfNeeded() is called, which in turn calls the initChannel method of the ChannelInitializer. That is where the following code runs and the ServerBootstrapAcceptor is really added to the Pipeline:
public void initChannel(Channel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = ServerBootstrap.this.config.handler();
    if (handler != null) {
        pipeline.addLast(handler);
    }
    ch.eventLoop().execute(new Runnable() {
        public void run() {
            // Add the ServerBootstrapAcceptor
            pipeline.addLast(new ServerBootstrap.ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}
doBind0(): binding the port number
Back in doBind, once the registration has a result, doBind0 is called. The doBind0 method looks like this:
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            // Binding can only be performed after the channel has been initialized and registered successfully
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
Step into channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE):
The call ends up in the bind method of AbstractChannel, which delegates to the pipeline.
Continue to the next step:
// DefaultChannelPipeline: the outbound bind starts at the tail of the pipeline
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return this.tail.bind(localAddress, promise);
}

// AbstractChannelHandlerContext
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (this.isNotValidPromise(promise, false)) {
        return promise;
    } else {
        // 512 is the decompiled value of the mask for the bind operation
        final AbstractChannelHandlerContext next = this.findContextOutbound(512);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, (Object)null, false);
        }
        return promise;
    }
}
Continue into next.invokeBind(localAddress, promise):
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (this.invokeHandler()) {
        try {
            ((ChannelOutboundHandler)this.handler()).bind(this, localAddress, promise);
        } catch (Throwable var4) {
            notifyOutboundHandlerException(var4, promise);
        }
    } else {
        this.bind(localAddress, promise);
    }
}
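Continue to the next step. The outbound bind finally reaches the head of the pipeline (HeadContext), whose bind method hands the call over to unsafe: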
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    this.unsafe.bind(localAddress, promise);
}
Continue to the next step
public final void bind(SocketAddress localAddress, ChannelPromise promise) {
    this.assertEventLoop();
    if (promise.setUncancellable() && this.ensureOpen(promise)) {
        if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST))
                && localAddress instanceof InetSocketAddress
                && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress()
                && !PlatformDependent.isWindows()
                && !PlatformDependent.maybeSuperUser()) {
            AbstractChannel.logger.warn("A non-root user can't receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");
        }
        // Check whether the channel is already active. It has not been activated yet, so the value is false
        boolean wasActive = AbstractChannel.this.isActive();
        try {
            // Bind
            AbstractChannel.this.doBind(localAddress);
        } catch (Throwable var5) {
            this.safeSetFailure(promise, var5);
            this.closeIfClosed();
            return;
        }
        if (!wasActive && AbstractChannel.this.isActive()) {
            this.invokeLater(new Runnable() {
                public void run() {
                    // Triggers the overridden channelActive methods of the handlers
                    AbstractChannel.this.pipeline.fireChannelActive();
                }
            });
        }
        this.safeSetSuccess(promise);
    }
}
Then the doBind method of NioServerSocketChannel is invoked, which calls into the underlying java.nio package.
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        this.javaChannel().bind(localAddress, this.config.getBacklog());
    } else {
        this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
    }
}
javaChannel() returns the NIO native channel. The bind method is then called on that native channel to complete the binding.
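The final JDK call can be exercised on its own. The following sketch binds a native ServerSocketChannel with a backlog, using only java.nio and java.net. NioBindDemo is a hypothetical name, and port 0 on the loopback address is an assumption made so the example does not collide with a port already in use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class NioBindDemo {
    // Binds to an ephemeral loopback port and returns the port the OS assigned.
    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // A freshly opened channel is not bound yet
            boolean boundBefore = server.socket().isBound();
            // Same two-argument bind(address, backlog) that doBind calls on Java 7 and later
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            if (boundBefore || !server.socket().isBound()) {
                throw new IllegalStateException("unexpected bound state");
            }
            InetSocketAddress local = (InetSocketAddress) server.getLocalAddress();
            return local.getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The change in the socket's bound state around the bind call is the JDK-level counterpart of the wasActive and isActive() comparison in the bind method above.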