netty establishes the corresponding channel in the process of service-side port binding and new connection establishment. Closely related to the channel's action is the concept of pipeline, which appears to be a pipeline in which raw materials (byte stream) enter, are processed, and finally output.
pipeline initialization
In the previous article, we learned that when you create a NioSocketChannel, you create the core components of netty
pipeline is one of them and is created in the following code
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
A reference to pipeline is saved in NioSocketChannel
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
A reference to the channel is saved in the pipeline, and when the pipeline is created, the entire pipeline looks like this
Each node in the pipeline is a ChannelHandlerContext object, and each context node holds the context required by its wrapped executor, ChannelHandler, to perform an operation. This is actually pipeline, because pipeline contains a reference to channel and can get all the context information
pipeline add node
Here is a very common piece of client code
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new Spliter()) p.addLast(new Decoder()); p.addLast(new BusinessHandler()) p.addLast(new Encoder()); } });
First, the source TCP packets are unpacked with a spliter, then the unpacked packets are decoder, passed into the Business Processor BusinessHandler, the business process is encoder, output
The entire pipeline structure is as follows
I distinguish two different types of nodes in the pipeline using two colors: ChannelInboundHandler, which handles inBound events, most typically reading data streams and processing; and ChannelOutboundHandler, which handles outBound events, such as when calling writeAndFlush ()This type of handler passes through
Regardless of the handler type, the outer objects ChannelHandlerContext are connected through a two-way Chain table. To distinguish whether a ChannelHandlerContext is in or out, we can see how netty handles when adding a node
DefaultChannelPipeline
@Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); }
@Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { for (ChannelHandler h: handlers) { addLast(executor, null, h); } return this; }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { // 1.Check for duplicates handler checkMultiplicity(handler); // 2.Create Node newCtx = newContext(group, filterName(name, handler), handler); // 3.Add Node addLast0(newCtx); } // 4.Callback user method callHandlerAdded0(handler); return this; }
The simple synchronized method here is to prevent multithreaded concurrent operations on the two-way chained list at the bottom of the pipeline
Let's go through this code step by step
Check for duplicate handler s
When you add a handler to your user code, you first check to see if it has been added
private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } }
netty uses a member variable added to identify whether a channel has been added. This code is simple. If the current Andler to be added is not shared and has already been added, an exception is thrown. Otherwise, it identifies that the handler has been added
Thus, if a Handler is sharable, it can be added to the pipeline indefinitely. If our client code wants a Handler to be shared, just add a @Sharable label as follows
@Sharable public class BusinessHandler { }
If the Handler is sharable, it is typically used by spring's injection, not necessarily new one at a time
The isSharable() method is implemented by whether the corresponding class of the Handler labels @Sharable
ChannelHandlerAdapter
public boolean isSharable() { Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; }
Determine whether there is a Sharable.class annotation by reflection
Create Node
Back to the main process, look at the code for creating context
newCtx = newContext(group, filterName(name, handler), handler);
Here we need to analyze the code filterName(name, handler), which is used to create a unique name for the handler
private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; }
Obviously, if the name we passed in is null, netty will generate a default name for us. Otherwise, check to see if there is a duplicate name and return if the check passes
netty's rule for creating default names is simple class names#0, so let's see how this works
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() throws Exception { return new WeakHashMap<Class<?>, String>(); } }; private String generateName(ChannelHandler handler) { // First check to see if a default has been generated in the cache name Map<Class<?>, String> cache = nameCaches.get(); Class<?> handlerType = handler.getClass(); String name = cache.get(handlerType); // If not, a default is generated name,Join Cache if (name == null) { name = generateName0(handlerType); cache.put(handlerType, name); } // Build finished, depending on default name Is there a conflict if (context0(name) != null) { String baseName = name.substring(0, name.length() - 1); for (int i = 1;; i ++) { String newName = baseName + i; if (context0(newName) == null) { name = newName; break; } } } return name; }
netty uses a FastThreadLocal variable to cache the mapping relationship between Handler's class and default name. When generating a name, first check to see if a default name (simple class name#0) has been generated in the cache. If not, call generateName0() to generate a default name, then join the cache
Next, you need to check if the name conflicts with an existing name, call context0(), and find out if there is a corresponding context in the pipeline
private AbstractChannelHandlerContext context0(String name) { AbstractChannelHandlerContext context = head.next; while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; }
The context0() method chain traverses each ChannelHandlerContext, returning a context whenever it finds that its name is the same as the name to be added, and throwing an exception shows that this is actually a linear search process
If context0 (name)!= null is true, there is already a default name in the existing context, then look up from the simple class name #1 until you find a unique name, such as the simple class name #3
If the user code specifies a name when adding a Handler, then all it does is check for duplicates
private void checkDuplicateName(String name) { if (context0(name) != null) { throw new IllegalArgumentException("Duplicate handler name: " + name); } }
After processing the name, you enter the process of creating the context, which is known from the previous call chain that the group is null, so the child Executor (group) also returns null
DefaultChannelPipeline
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } private EventExecutor childExecutor(EventExecutorGroup group) { if (group == null) { return null; } //.. }
DefaultChannelHandlerContext
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }
In the constructor, DefaultChannelHandlerContext returns the parameter to the parent class, saves the reference of the Handler, and enters its parent class
AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; }
Two fields are used in netty to indicate whether this channelHandlerContext belongs to an inBound or an outBound, or both. The two Booleans are determined by the following two small functions (see the code above).
DefaultChannelHandlerContext
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
The instanceof keyword determines by the interface type, so if a Handler implements two types of interfaces, it is both an inBound type Handler and an outBound type Handler, such as the following
A common codec that combines decode and encode operations generally inherits MessageToMessageCodec, which inherits ChannelDuplexHandler
MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler { protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception; protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception; }
After the contexts have been created, the next step is to finally add the created contexts to the pipeline
Add Node
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; // 1 newCtx.next = tail; // 2 prev.next = newCtx; // 3 tail.prev = newCtx; // 4 }
This is a simple illustration of the process, which, to put it plainly, is a two-way chain table insertion.
When the operation is complete, the context is added to the pipeline
Now that pipeline has finished adding nodes, you can use this idea to learn all the addxxx() series methods
Callback user method
AbstractChannelHandlerContext
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); }
By the fourth step, the new node in the pipeline is added, and the user code ctx.handler().handlerAdded(ctx) is called back; common user code is as follows
public class DemoHandler extends SimpleChannelInboundHandler<...> { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // Callback here after the node has been added // do something } }
Next, set the state of the node
AbstractChannelHandlerContext
final void setAddComplete() { for (;;) { int oldState = handlerState; if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
Modify the state of the node with cas to: REMOVE_COMPLETE (indicating that the node has been removed) or ADD_COMPLETE
pipeline delete node
One of the most important features of netty is that Handler is pluggable to dynamically weave pipelines. For example, when connecting for the first time, you need to authenticate with privileges. After the authentication is passed, you can remove this context. The next time the pipeline propagates an event, it will not call the privilege authentication processor
Below is the simplest implementation of a privilege authentication Handler. The first packet is the authentication information. If the check passes, delete the Handler. Otherwise, close the connection directly
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception { if (verify(authDataPacket)) { ctx.pipeline().remove(this); } else { ctx.close(); } } private boolean verify(ByteBuf byteBuf) { //... } }
The focus is on the code ctx.pipeline().remove(this)
@Override public final ChannelPipeline remove(ChannelHandler handler) { remove(getContextOrDie(handler)); return this; }
The remove operation is much simpler than add and consists of three steps:
1. Find the node to delete
2. Adjust two-way Chain pointer deletion
3. Call back user functions
Find Node to Delete
DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) { AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler); if (ctx == null) { throw new NoSuchElementException(handler.getClass().getName()); } else { return ctx; } } @Override public final ChannelHandlerContext context(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } AbstractChannelHandlerContext ctx = head.next; for (;;) { if (ctx == null) { return null; } if (ctx.handler() == handler) { return ctx; } ctx = ctx.next; } }
Here, in order to find the context corresponding to the Handler, the node is found by iterating through the two-way chain table one by one until a context's Handler is the same as the current Handler
Adjust bi-directional list pointer deletion
DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) { assert ctx != head && ctx != tail; synchronized (this) { // 2.Adjust bi-directional list pointer deletion remove0(ctx); } // 3.Callback user function callHandlerRemoved0(ctx); return ctx; } private static void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; // 1 next.prev = prev; // 2 }
The process is simpler than adding nodes and can be represented by the following diagram
The final result is
Combining these two graphs gives you a clear idea of how privilege validation Handler works. In addition, deleted nodes are automatically recycled by gc over time because they have no object references
Callback user function
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved(); } }
In the third step, the node deletion in the pipeline is complete, and the callback to the user code ctx.handler().handlerRemoved(ctx)) begins; common code is as follows
public class DemoHandler extends SimpleChannelInboundHandler<...> { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // Call back here after the node has been deleted to do some resource cleanup // do something } }
Finally, set the state of the node to remove
final void setRemoved() { handlerState = REMOVE_COMPLETE; }
summary
1. There is only one ChannelPipeline corresponding to each Channel in Netty.
2. ChannelPipeline is a two-way chain table that maintains AbstractChannelHandlerContext as the node, where the chain table is a two-way chain table with head (HeadContext) as the head and tail (TailContext) as the tail.
3. Each node in the pipeline is enclosed with a specific processor ChannelHandler, which determines whether the node belongs to in or out or both depending on whether the ChannelHandler is of type ChannelInboundHandler or ChannelOutboundHandler