Netty Source Analysis: ChannelPipeline

Netty creates a channel both when the server binds a port and when a new connection is established. Closely tied to every channel is its pipeline, which works like an assembly line: raw material (a byte stream) enters, is processed step by step, and is finally output.

pipeline initialization

In the previous article, we saw that creating a NioSocketChannel also creates Netty's core components.

The pipeline is one of them, and it is created in the following code

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

A reference to pipeline is saved in NioSocketChannel

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

The pipeline in turn saves a reference to the channel. Right after construction, the pipeline contains only two nodes, linked to each other: head <-> tail

Each node in the pipeline is a ChannelHandlerContext object. Each context wraps a ChannelHandler, the object that does the actual work, and holds the context that handler needs to perform an operation. The context can reach all of this information through the pipeline, because the pipeline holds a reference to the channel

pipeline add node

Here is a very common piece of user code

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter());         // inbound: splits the byte stream into packets
         p.addLast(new Decoder());         // inbound: decodes each packet
         p.addLast(new BusinessHandler()); // inbound: business logic
         p.addLast(new Encoder());         // outbound: encodes the result
     }
});

First, a Spliter splits the raw TCP byte stream into individual packets. A Decoder then decodes each packet and passes the result to the business processor, BusinessHandler. After the business logic runs, an Encoder encodes the result, which is then output

The entire pipeline structure is as follows: head <-> Spliter <-> Decoder <-> BusinessHandler <-> Encoder <-> tail

There are two different types of nodes in the pipeline: ChannelInboundHandler, which handles inbound events, most typically reading and processing the data stream; and ChannelOutboundHandler, which handles outbound events. A call to writeAndFlush(), for example, passes through handlers of the second type

Regardless of the handler type, the wrapping ChannelHandlerContext objects are connected in a doubly linked list. To see how Netty tells whether a ChannelHandlerContext is inbound or outbound, let's look at what it does when a node is added

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. Check for a duplicate handler
        checkMultiplicity(handler);
        // 2. Create the node
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3. Add the node
        addLast0(newCtx);
    }

    // 4. Call back the user method
    callHandlerAdded0(newCtx);

    return this;
}

The synchronized block here prevents multiple threads from concurrently modifying the doubly linked list that underlies the pipeline

Let's go through this code step by step

Check for duplicate handlers

When user code adds a handler, Netty first checks whether that handler has already been added

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

Netty uses the member variable added to mark whether a handler has been added. The code is simple: if the handler being added is not sharable and has already been added, an exception is thrown; otherwise the handler is marked as added

Thus, a sharable Handler can be added to pipelines any number of times. To make a Handler sharable in user code, just add the @Sharable annotation, as follows

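import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable
public class BusinessHandler extends ChannelInboundHandlerAdapter {
    // A shared instance serves many channels, so keep no per-connection state in fields
}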

A sharable Handler is typically used as a single shared instance, for example one injected by Spring, so there is no need to create a new one each time

The isSharable() method checks whether the Handler's class is annotated with @Sharable

ChannelHandlerAdapter

public boolean isSharable() {
    Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

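The presence of the @Sharable annotation is determined by reflection, and the result is cached per class in a thread-local map so that the reflective lookup is not repeated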
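
The annotation check can be tried out without Netty on the classpath. The following is a minimal JDK-only sketch, not Netty's code: DemoSharable, StatelessHandler, StatefulHandler and SharableCheck are hypothetical names invented for this illustration, and a ConcurrentHashMap stands in for Netty's thread-local cache.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @Sharable. RUNTIME retention is what makes the
// annotation visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoSharable {}

@DemoSharable
class StatelessHandler {}

class StatefulHandler {}

class SharableCheck {
    // Cache keyed by class, so the reflective lookup runs once per handler type.
    // (Assumption: a shared concurrent map instead of a per-thread map.)
    private static final Map<Class<?>, Boolean> CACHE = new ConcurrentHashMap<>();

    static boolean isSharable(Object handler) {
        return CACHE.computeIfAbsent(handler.getClass(),
                clazz -> clazz.isAnnotationPresent(DemoSharable.class));
    }
}
```

Here SharableCheck.isSharable(new StatelessHandler()) is true and SharableCheck.isSharable(new StatefulHandler()) is false, because only the first class carries the annotation.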

Create Node

Back to the main process, look at the code for creating context

newCtx = newContext(group, filterName(name, handler), handler);

Here we need to analyze the code filterName(name, handler), which is used to create a unique name for the handler

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

Obviously, if the name we passed in is null, netty will generate a default name for us. Otherwise, check to see if there is a duplicate name and return if the check passes

Netty's default name is the simple class name followed by #0. Let's see how this is implemented

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // Check the cache first for a previously generated default name
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // If none is cached, generate the default name and add it to the cache
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // Check whether the default name conflicts with an existing context
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

Netty uses a FastThreadLocal variable to cache the mapping between a Handler's class and its default name. When generating a name, it first checks whether a default name (simple class name#0) is already in the cache. If not, it calls generateName0() to generate one and adds it to the cache

Next, Netty checks whether the name conflicts with an existing one by calling context0(), which looks for a context with that name in the pipeline

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

The context0() method walks the linked list of ChannelHandlerContext nodes and returns the first context whose name equals the name to be added, or null if there is none. This is a plain linear search

If context0(name) != null, the default name is already taken by an existing context. Netty then tries simple class name#1, #2, and so on until it finds a name that is not in use, for example simple class name#3
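
The naming rule can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not Netty's implementation: NameGenerator is a hypothetical class, a set of names stands in for the pipeline's context list, and getSimpleName() approximates the simple class name.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model: the set plays the role of the names already in the pipeline.
class NameGenerator {
    private final Set<String> namesInUse = new HashSet<>();

    // Returns a unique "<SimpleClassName>#<n>" name and records it as used.
    String generateName(Object handler) {
        String base = handler.getClass().getSimpleName() + "#";
        String name = base + 0;                 // default candidate: #0
        for (int i = 1; namesInUse.contains(name); i++) {
            name = base + i;                    // on conflict, probe #1, #2, ...
        }
        namesInUse.add(name);
        return name;
    }
}
```

With one generator, two handlers of the same class receive distinct names: passing two StringBuilder instances in turn yields StringBuilder#0 and then StringBuilder#1.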

If the user code specifies a name when adding a Handler, then all it does is check for duplicates

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

Once the name is settled, the context is created. From the call chain above we know that group is null, so childExecutor(group) also returns null

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

In its constructor, DefaultChannelHandlerContext passes the arguments on to its parent class and saves a reference to the Handler. Next, we enter the parent class

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

Netty uses two fields to indicate whether a ChannelHandlerContext is inbound, outbound, or both. The two booleans are computed by the following two small functions, which are called in the DefaultChannelHandlerContext constructor above.

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

The instanceof check tests against the interface type, so a Handler that implements both interfaces is both an inbound Handler and an outbound Handler

A typical example is a codec that combines decoding and encoding. Such a codec generally extends MessageToMessageCodec, which in turn extends ChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
        throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
        throws Exception;
}
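
The effect of the two instanceof checks can be seen in isolation with a small JDK-only sketch. InboundMarker and OutboundMarker are hypothetical stand-ins for ChannelInboundHandler and ChannelOutboundHandler, and the other class names are likewise invented for this illustration.

```java
// Hypothetical marker interfaces standing in for the two Netty handler interfaces.
interface InboundMarker {}
interface OutboundMarker {}

class DecoderOnly implements InboundMarker {}
class EncoderOnly implements OutboundMarker {}
class DuplexCodec implements InboundMarker, OutboundMarker {}

// Computes the two direction flags the same way the context constructor does.
class DirectionFlags {
    final boolean inbound;
    final boolean outbound;

    DirectionFlags(Object handler) {
        this.inbound = handler instanceof InboundMarker;
        this.outbound = handler instanceof OutboundMarker;
    }
}
```

A DuplexCodec ends up with both flags set, while DecoderOnly is inbound only and EncoderOnly is outbound only.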

Once the context has been created, the final step is to add it to the pipeline

Add Node

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}

The numbered comments mark the four pointer updates. Put plainly, this is an insertion into a doubly linked list, just before tail.

When the operation is complete, the context is added to the pipeline
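
To watch the four pointer updates at work, here is a minimal JDK-only model of a list with head and tail sentinels. MiniPipeline and its Node class are hypothetical names for this sketch; nodes carry only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPipeline {
    static final class Node {
        final String name;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Sentinel nodes: always present, so insertion never has to handle an empty list.
    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new node just before tail, using the same four pointer updates.
    synchronized MiniPipeline addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;   // 1
        newNode.next = tail;   // 2
        prev.next = newNode;   // 3
        tail.prev = newNode;   // 4
        return this;
    }

    // Walks the list from head to tail and collects the node names in order.
    synchronized List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }
}
```

Calling new MiniPipeline().addLast("Decoder").addLast("Encoder").names() yields [head, Decoder, Encoder, tail]: each new node lands directly in front of tail, so handlers keep the order in which they were added.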

Now that the pipeline has finished adding the node, you can apply the same approach to study the rest of the addXxx() methods

Callback user method

DefaultChannelPipeline

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}

In the fourth step, once the new node is in the pipeline, Netty calls back into user code via ctx.handler().handlerAdded(ctx). Typical user code looks like this

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Callback here after the node has been added
        // do something
    }
}

Next, set the state of the node

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

The node's state is changed to ADD_COMPLETE with a CAS loop, unless it is already REMOVE_COMPLETE (the node has been removed), in which case the state is left unchanged
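
The CAS loop can be reproduced with the JDK's AtomicIntegerFieldUpdater. The sketch below is an illustration under stated assumptions: NodeState is a hypothetical class, and the numeric values of the state constants are arbitrary choices for this example, not values taken from Netty.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class NodeState {
    // Arbitrary illustrative values; only their distinctness matters here.
    static final int INIT = 0;
    static final int ADD_COMPLETE = 1;
    static final int REMOVE_COMPLETE = 2;

    // The updater needs a volatile int field, looked up by name.
    private static final AtomicIntegerFieldUpdater<NodeState> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(NodeState.class, "state");

    private volatile int state = INIT;

    // Moves to ADD_COMPLETE unless the node was already removed.
    void setAddComplete() {
        for (;;) {
            int oldState = state;
            if (oldState == REMOVE_COMPLETE
                    || STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
            // CAS lost a race with another writer: re-read the state and retry.
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }
}
```

The point of the loop is that a removal which happens first always wins: calling setRemoved() and then setAddComplete() leaves the state at REMOVE_COMPLETE.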

pipeline delete node

One of Netty's most powerful features is that Handlers are pluggable, so a pipeline can be reshaped dynamically. For example, the first time a client connects, it must be authenticated. Once authentication passes, the authentication context can be removed, and later events propagating through the pipeline no longer reach the authentication handler

Below is the simplest implementation of an authentication Handler. The first packet carries the authentication information. If the check passes, the Handler removes itself; otherwise, it closes the connection directly

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(data)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}

The focus is on the code ctx.pipeline().remove(this)

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}

The remove operation is much simpler than add and consists of three steps:

1. Find the node to delete
2. Adjust the doubly linked list pointers to unlink it
3. Call back user functions

Find Node to Delete

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

Here, the context for the Handler is found by walking the doubly linked list node by node until a context's Handler is the same object as the given Handler (the comparison uses ==)

Adjust the doubly linked list pointers

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2. Adjust the doubly linked list pointers to unlink the node
        remove0(ctx);
    }
    // 3. Call back the user function
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

The process is simpler than adding a node. Before the removal, the node sits between its two neighbors: prev <-> ctx <-> next

After the two pointer updates, the neighbors point at each other and a walk of the list no longer reaches ctx: prev <-> next

This is how the authentication Handler drops out of the pipeline. In addition, once nothing else references the removed node, it is eventually reclaimed by the GC
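
The lookup by identity and the unlink step can be combined into one small JDK-only model. RemovablePipeline is a hypothetical class for this sketch. Unlike the code above, which throws NoSuchElementException when the handler is not found, this version returns false to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

class RemovablePipeline {
    static final class Node {
        final Object handler;
        Node prev;
        Node next;
        Node(Object handler) { this.handler = handler; }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    RemovablePipeline() {
        head.next = tail;
        tail.prev = head;
    }

    synchronized void addLast(Object handler) {
        Node node = new Node(handler);
        node.prev = tail.prev;
        node.next = tail;
        tail.prev.next = node;
        tail.prev = node;
    }

    // Linear search by reference identity (==), then unlink with two pointer updates.
    synchronized boolean remove(Object handler) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.handler == handler) {
                n.prev.next = n.next; // 1
                n.next.prev = n.prev; // 2
                return true;
            }
        }
        return false; // assumption: report "not found" instead of throwing
    }

    // Handlers currently reachable between head and tail, in order.
    synchronized List<Object> handlers() {
        List<Object> result = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            result.add(n.handler);
        }
        return result;
    }
}
```

Because the search compares references, only the exact instance that was added can be removed. An equal but distinct object is not found, which matches the remove(this) call in AuthHandler.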

Callback user function

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}

In the third step, with the node unlinked from the pipeline, Netty calls back into user code via ctx.handler().handlerRemoved(ctx). Typical user code looks like this

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // Call back here after the node has been deleted to do some resource cleanup
        // do something
    }
}

Finally, the state of the node is set to REMOVE_COMPLETE

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}
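
One detail of callHandlerRemoved0() worth noting is the try/finally: the state is updated even when the user callback throws. A minimal JDK-only sketch of that shape follows. LifecycleDemo and its nested types are hypothetical names, and the real method's error handling is not modeled here.

```java
class LifecycleDemo {
    // Hypothetical single-method handler interface for this sketch.
    interface Handler {
        void handlerRemoved();
    }

    static final class Context {
        final Handler handler;
        boolean removed;
        Context(Handler handler) { this.handler = handler; }
    }

    // Same try/finally shape: the flag is set whether or not the callback throws.
    static void callHandlerRemoved(Context ctx) {
        try {
            ctx.handler.handlerRemoved();
        } finally {
            ctx.removed = true;
        }
    }
}
```

If the callback throws, the exception still propagates to the caller, but the context is marked as removed first, so a failing cleanup callback cannot leave the node in a stale state.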

summary

1. Each Channel in Netty has exactly one ChannelPipeline.

2. A ChannelPipeline is a doubly linked list whose nodes are AbstractChannelHandlerContext objects, with head (HeadContext) at the front and tail (TailContext) at the end.

3. Each node in the pipeline wraps a concrete processor, a ChannelHandler. Whether the node is inbound, outbound, or both depends on whether its ChannelHandler is a ChannelInboundHandler, a ChannelOutboundHandler, or both

Keywords: Java Netty Spring codec

Added by alexguz79 on Mon, 09 Sep 2019 06:50:30 +0300