Netty series: a detailed explanation of ChannelPipeline

Introduction

When we introduced Channel, we mentioned that almost all of a Channel's operations are carried out through its ChannelPipeline. So how does this pipeline actually work?

Let's have a look.

ChannelPipeline

ChannelPipeline is an interface that extends three other interfaces: ChannelInboundInvoker, ChannelOutboundInvoker and Iterable:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

Extending ChannelInboundInvoker means that ChannelPipeline can fire channel inbound events, such as:

ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();

Extending ChannelOutboundInvoker means that ChannelPipeline can perform outbound channel operations, such as bind, connect, disconnect, close, deregister, read, write and flush.

Extending Iterable means that a ChannelPipeline can be iterated over. Why should a ChannelPipeline be iterable?

Because one or more ChannelHandlers can be added to a ChannelPipeline, the ChannelPipeline can be regarded as a collection of ChannelHandlers.

For example, ChannelPipeline provides a series of methods for adding ChannelHandlers:

ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addFirst(ChannelHandler... handlers);

ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

You can add handlers at the front, at the back, or before or after a specific named handler.

In addition, you can remove a specific ChannelHandler from the pipeline, remove the first or last handler, or replace a given handler with a new one:
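
To make the effect of these methods on handler order concrete, here is a toy model that uses only the JDK. It is not Netty code: the class HandlerOrderSketch and its list-based storage are assumptions made for illustration, and it leaves out details such as duplicate-name checks and EventExecutorGroup arguments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Toy model of handler ordering by name. Hypothetical; NOT Netty's implementation. */
final class HandlerOrderSketch {
    private final List<String> names = new ArrayList<>();

    HandlerOrderSketch addFirst(String name) {
        names.add(0, name);
        return this;
    }

    HandlerOrderSketch addLast(String name) {
        names.add(name);
        return this;
    }

    HandlerOrderSketch addBefore(String baseName, String name) {
        names.add(indexOfOrThrow(baseName), name);
        return this;
    }

    HandlerOrderSketch addAfter(String baseName, String name) {
        names.add(indexOfOrThrow(baseName) + 1, name);
        return this;
    }

    /** Returns a copy of the current order, front to back. */
    List<String> names() {
        return new ArrayList<>(names);
    }

    // Assumption of this sketch: an unknown base name is reported with an exception.
    private int indexOfOrThrow(String baseName) {
        int index = names.indexOf(baseName);
        if (index < 0) {
            throw new NoSuchElementException(baseName);
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> order = new HandlerOrderSketch()
                .addLast("decoder")
                .addLast("business")
                .addFirst("ssl")
                .addBefore("business", "logger")
                .addAfter("decoder", "idle")
                .names();
        System.out.println(order); // [ssl, decoder, idle, logger, business]
    }
}
```

The handler names used in main (ssl, decoder, idle, logger, business) are made up for the example. The point is only that the final order depends on which add method was used, not on the order of the calls.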

ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

Of course, there are corresponding query operations as well:

ChannelHandler first();
ChannelHandler last();
ChannelHandler get(String name);
List<String> names();

You can also look up the ChannelHandlerContext that corresponds to a given ChannelHandler:

ChannelHandlerContext context(ChannelHandler handler);

ChannelPipeline also redeclares the inbound event-firing methods of ChannelInboundInvoker so that they return ChannelPipeline, which allows calls to be chained:

    ChannelPipeline fireChannelRegistered();
    ChannelPipeline fireChannelUnregistered();
    ChannelPipeline fireChannelActive();
    ChannelPipeline fireChannelInactive();
    ChannelPipeline fireExceptionCaught(Throwable cause);
    ChannelPipeline fireUserEventTriggered(Object event);
    ChannelPipeline fireChannelRead(Object msg);
    ChannelPipeline fireChannelReadComplete();
    ChannelPipeline fireChannelWritabilityChanged();

Event delivery

You may ask: since a ChannelPipeline contains many handlers, how is an event passed from one handler to the next?

In fact, events are propagated by calling the corresponding methods of ChannelHandlerContext.

For inbound events, you can call the following methods to pass the event on:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

For outbound events, you can call the following methods to pass the operation on:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

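Concretely, a handler calls the corresponding ChannelHandlerContext method from inside its own callback:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

// Inbound handler: reacts to channelActive, then passes the event on.
// (Each public class lives in its own source file.)
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        // Forward the event to the next inbound handler in the pipeline.
        ctx.fireChannelActive();
    }
}

// Outbound handler: intercepts close, then passes the operation on.
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        // Forward the close request to the next outbound handler.
        ctx.close(promise);
    }
}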
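
The direction of travel can be sketched with a small JDK-only model. In Netty, inbound events move from the head of the pipeline towards the tail, and outbound operations move from the tail towards the head. Everything else below is an assumption made for illustration: the class PropagationSketch, its Ctx nodes and the forwards flag (which stands in for "the handler calls ctx.fireXxx() or ctx.xxx()") are hypothetical, and each node is either inbound or outbound, whereas a real handler may be both.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of event propagation along a handler chain. NOT Netty code. */
final class PropagationSketch {

    static final class Ctx {
        final String name;
        final boolean inbound;   // true: inbound handler, false: outbound handler
        final boolean forwards;  // true: the handler passes the event on
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean forwards) {
            this.name = name;
            this.inbound = inbound;
            this.forwards = forwards;
        }
    }

    /** Links the nodes in array order into a doubly linked chain. */
    static void link(Ctx... chain) {
        for (int i = 0; i + 1 < chain.length; i++) {
            chain[i].next = chain[i + 1];
            chain[i + 1].prev = chain[i];
        }
    }

    /** Inbound: walk forwards, visit inbound handlers, stop when one does not forward. */
    static List<String> fireInbound(Ctx first) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = first; c != null; c = c.next) {
            if (!c.inbound) {
                continue; // outbound handlers never see inbound events
            }
            visited.add(c.name);
            if (!c.forwards) {
                break; // the handler swallowed the event
            }
        }
        return visited;
    }

    /** Outbound: walk backwards, visit outbound handlers, stop when one does not forward. */
    static List<String> fireOutbound(Ctx last) {
        List<String> visited = new ArrayList<>();
        for (Ctx c = last; c != null; c = c.prev) {
            if (c.inbound) {
                continue; // inbound handlers never see outbound operations
            }
            visited.add(c.name);
            if (!c.forwards) {
                break;
            }
        }
        return visited;
    }

    /** Demo chain: A(in) - B(out) - C(in, does not forward) - D(in) - E(out). */
    static Ctx[] demoChain() {
        Ctx[] chain = {
            new Ctx("A", true, true),
            new Ctx("B", false, true),
            new Ctx("C", true, false),
            new Ctx("D", true, true),
            new Ctx("E", false, true),
        };
        link(chain);
        return chain;
    }

    public static void main(String[] args) {
        Ctx[] chain = demoChain();
        System.out.println(fireInbound(chain[0]));                 // [A, C]
        System.out.println(fireOutbound(chain[chain.length - 1])); // [E, B]
    }
}
```

In the demo, handler D never sees the inbound event because C does not forward it. This is the model's counterpart of forgetting to call ctx.fireChannelActive() in the handler shown above.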
   

DefaultChannelPipeline

Netty ships a default implementation of ChannelPipeline called DefaultChannelPipeline. A pipeline's main job is to manage handlers and pass events between them, so its functionality is relatively simple, but the implementation has a few details worth noting. For example, it holds two fields of type AbstractChannelHandlerContext, named head and tail.

We know that a ChannelPipeline is essentially a collection of handlers, so how are these handlers stored? The storage structure is AbstractChannelHandlerContext. Each AbstractChannelHandlerContext has a next reference and a prev reference, which link the contexts into a doubly linked list.

DefaultChannelPipeline keeps references to the two ends of this list, head and tail, and the contexts that wrap the added handlers sit between them.

Note that although both head and tail are AbstractChannelHandlerContext instances, they are slightly different. Let's first look at how head and tail are initialized:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

The constructor of DefaultChannelPipeline initializes tail and head and links them to each other: tail is a TailContext and head is a HeadContext.

TailContext implements the ChannelInboundHandler interface, since inbound events that no other handler consumes end up at the tail:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler

HeadContext implements both the ChannelOutboundHandler and ChannelInboundHandler interfaces, since inbound events enter the pipeline at the head and outbound operations finish there:

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler 

Let's take the addFirst method as an example to see how a handler is added to the pipeline:

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);

            newCtx = newContext(group, name, handler);

            addFirst0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

The method first builds a new context for the incoming handler, then calls addFirst0 to insert that context into the doubly linked list of AbstractChannelHandlerContext nodes, directly after head:

    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }
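
The pointer updates are easier to follow in a self-contained sketch. The JDK-only model below copies the addFirst0 logic shown above onto a plain Node class. The names SentinelListSketch, Node and names(), as well as the addLast0 and remove0 methods, are assumptions added for illustration and are not taken from the Netty source.

```java
import java.util.ArrayList;
import java.util.List;

/** Doubly linked list with fixed head and tail sentinels. A sketch, NOT Netty code. */
final class SentinelListSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    SentinelListSketch() {
        // Same initial wiring as the DefaultChannelPipeline constructor.
        head.next = tail;
        tail.prev = head;
    }

    /** Mirrors addFirst0: insert directly after head. */
    void addFirst0(Node newCtx) {
        Node nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

    /** Hypothetical counterpart: insert directly before tail. */
    void addLast0(Node newCtx) {
        Node prevCtx = tail.prev;
        newCtx.prev = prevCtx;
        newCtx.next = tail;
        prevCtx.next = newCtx;
        tail.prev = newCtx;
    }

    /** Hypothetical removal: unlink a node that is currently in the list. */
    void remove0(Node ctx) {
        ctx.prev.next = ctx.next;
        ctx.next.prev = ctx.prev;
    }

    /** Names between the sentinels, from head to tail. */
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        SentinelListSketch list = new SentinelListSketch();
        Node a = new Node("a");
        list.addFirst0(a);
        list.addFirst0(new Node("b"));
        list.addLast0(new Node("c"));
        System.out.println(list.names()); // [b, a, c]
        list.remove0(a);
        System.out.println(list.names()); // [b, c]
    }
}
```

Because head and tail are always present, none of the insert or remove paths needs a null check or a special case for an empty list, which is the main benefit of the sentinel layout.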

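Finally, callHandlerAdded0 is invoked to trigger the handler's handlerAdded callback. As the code above shows, this call is deferred if the channel has not been registered on an event loop yet, and it is handed over to the context's executor if the caller is not running in that executor's event loop.

Summary

ChannelPipeline is responsible for managing the handlers of a channel. DefaultChannelPipeline stores these handlers in a doubly linked list of AbstractChannelHandlerContext nodes bounded by head and tail. This chain structure makes it convenient to add, remove and look up handlers and to pass events from one handler to the next.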
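
The deferral for unregistered channels can be sketched as a simple queue of pending callbacks. This is a simplified single-threaded model that uses only the JDK: the class DeferredCallbackSketch, its methods and its log are hypothetical, and it deliberately ignores the event-loop hand-off.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of deferring handlerAdded until registration. NOT Netty code. */
final class DeferredCallbackSketch {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private boolean registered;

    /** Adds a handler; its handlerAdded callback runs now or after register(). */
    void addHandler(String name) {
        Runnable handlerAdded = () -> log.add("handlerAdded:" + name);
        if (!registered) {
            // Not registered yet: remember the callback for later.
            pending.add(handlerAdded);
            return;
        }
        handlerAdded.run();
    }

    /** Marks the channel as registered and runs all pending callbacks in order. */
    void register() {
        registered = true;
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }

    /** Returns a copy of the callbacks that have run so far. */
    List<String> log() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        DeferredCallbackSketch pipeline = new DeferredCallbackSketch();
        pipeline.addHandler("a");
        System.out.println(pipeline.log()); // []
        pipeline.register();
        pipeline.addHandler("b");
        System.out.println(pipeline.log()); // [handlerAdded:a, handlerAdded:b]
    }
}
```

Handler "a" is added before registration, so its callback only runs once register() is called, while handler "b" is added afterwards and is notified immediately.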

This article has been included in http://www.flydean.com/04-3-netty-channelpipeline/

Keywords: Java Netty NIO

Added by mananx on Mon, 28 Feb 2022 12:52:03 +0200