Dubbo source code analysis - the whole process of a consumer sending a request

Preface:

Previous articles walked through the whole process from front to back in terms of the call structure.

This article analyzes the same process from the perspective of a real call. The earlier view was abstract; this one follows an actual request.

1. Example code

The code is the same as before. It is posted here again for convenience.

1.1 provider

public class ProviderApplication {
    public static void main(String[] args) throws Exception {
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.export();

        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

1.2 consumer

public class ConsumerApplication {
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        reference.setScope("remote");
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

2. The whole process of a consumer call

The previous post analyzed the main steps, so here we only take a quick look.

We start from the call String message = service.sayHello("dubbo");

2.1 service is Proxy0

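Proxy0 is generated by javassist and extends the abstract Proxy class. The listing below is simplified and written in the style of a decompiled JDK dynamic proxy; what matters is that every interface method delegates to an InvocationHandler: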

public final class $Proxy0
        extends Proxy
        implements DemoService {
    private static Method m1;
    private static Method m2;
    private static Method m3;
    private static Method m0;

    public $Proxy0(InvocationHandler paramInvocationHandler) {
        super(paramInvocationHandler);
    }
    @Override
    public final String sayHello(String paramString) {
        try {
            // It is essentially a call to InvocationHandler
            return (String) this.h.invoke(this, m3, new Object[]{paramString});
        } catch (Error | RuntimeException localError) {
            throw localError;
        } catch (Throwable localThrowable) {
            throw new UndeclaredThrowableException(localThrowable);
        }
    }
}

Every method call on the proxy class therefore ends up as a call to InvokerInvocationHandler.invoke()
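To make the delegation concrete, here is a minimal sketch using the JDK's own java.lang.reflect.Proxy rather than Dubbo's javassist-based generator. The Greeter interface and the string the handler returns are assumptions made up for illustration; a real handler would forward the call to an Invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class ProxySketch {
    // Hypothetical service interface, standing in for DemoService.
    interface Greeter {
        String sayHello(String name);
    }

    // Builds a proxy whose interface methods are all routed to one handler.
    static Greeter createProxy() {
        Object target = new Object();
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered locally, not sent remotely
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args);
            }
            // A real RPC framework would wrap the call and send it here.
            return "handled " + method.getName() + "(" + args[0] + ")";
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        // prints "handled sayHello(dubbo)"
        System.out.println(createProxy().sayHello("dubbo"));
    }
}
```

The caller only sees the Greeter interface; everything behind it is decided by the handler, which is the same position InvokerInvocationHandler occupies in Dubbo.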

2.2 InvokerInvocationHandler.invoke()

public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);
    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;
	...

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        // Wrap all the requested information, including request method, interface information and parameter information    
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // The invoker here is a MockClusterInvoker
        return invoker.invoke(rpcInvocation).recreate();
    }
}

All the request information is wrapped in an RpcInvocation and handed to the next Invoker (MockClusterInvoker) for processing
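As a rough picture of what "wrapping the request" means, the sketch below captures a reflective Method and its arguments in a small value object. SimpleInvocation, Greeter and the service name "demo.Greeter" are hypothetical names for illustration only; the real RpcInvocation carries considerably more (attachments, invoke mode, and so on).

```java
import java.lang.reflect.Method;
import java.util.Arrays;

final class InvocationSketch {
    // Hypothetical service interface, standing in for DemoService.
    interface Greeter {
        String sayHello(String name);
    }

    // Hypothetical, heavily simplified stand-in for RpcInvocation.
    static final class SimpleInvocation {
        final String serviceName;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        SimpleInvocation(Method method, String serviceName, Object[] args) {
            this.serviceName = serviceName;
            this.methodName = method.getName();
            this.parameterTypes = method.getParameterTypes();
            // Methods without parameters arrive with args == null
            this.arguments = args == null ? new Object[0] : args.clone();
        }

        @Override
        public String toString() {
            return serviceName + "#" + methodName + Arrays.toString(arguments);
        }
    }

    // Describes a call to Greeter.sayHello(arg) as an invocation object.
    static SimpleInvocation describe(String arg) {
        try {
            Method method = Greeter.class.getMethod("sayHello", String.class);
            return new SimpleInvocation(method, "demo.Greeter", new Object[] {arg});
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints "demo.Greeter#sayHello[dubbo]"
        System.out.println(describe("dubbo"));
    }
}
```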

2.3 ClusterInvoker.invoke()

The call chain of ClusterInvoker is: MockClusterInvoker --> FailoverClusterInvoker

The main function of MockClusterInvoker is to handle mock calls. We will explain this in detail later.

FailoverClusterInvoker is the important one. It provides a cluster fault-tolerance scheme for consumer calls: when there are multiple service providers and a call to provider A fails, the consumer automatically switches to another provider, say provider B, and tries again, retrying at most N times.

Of course, in addition to FailoverClusterInvoker, there are many other cluster fault-tolerant schemes to choose from, which will be described in a series of articles in the future.

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // total number of attempts = configured retries + 1 (the first call)
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        ...
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                checkInvokers(copyInvokers, invocation);
            }
            // Select the appropriate dubbo provider Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Initiate a call for this Invoker
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    ...
                }
                return result;
            // Exception, enter the loop again and enter the next call    
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        ...
    }
}

By wrapping the call in a ClusterInvoker, Dubbo implements its cluster fault-tolerance and load-balancing strategies. Both will be introduced in more detail later.
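Stripped of Dubbo's types, the failover idea in doInvoke() can be sketched as a plain retry loop. This is a simplification under stated assumptions: providers are modeled as java.util.function.Function, selection just rotates through the list instead of using a pluggable LoadBalance, and every RuntimeException is treated as retryable (the real code rethrows business exceptions immediately).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class FailoverSketch {
    // Calls providers one after another until one succeeds or attempts run out.
    static <T, R> R invokeWithFailover(List<Function<T, R>> providers, T request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        // The first call plus the configured number of retries, at least one attempt
        int attempts = Math.max(retries + 1, 1);
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            // Simplified selection: rotate through the provider list
            Function<T, R> provider = providers.get(i % providers.size());
            try {
                return provider.apply(request);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next provider
                last = e;
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }

    public static void main(String[] args) {
        List<Function<String, String>> providers = new ArrayList<>();
        providers.add(name -> { throw new IllegalStateException("provider A is down"); });
        providers.add(name -> "provider B says hello " + name);
        // prints "provider B says hello dubbo"
        System.out.println(invokeWithFailover(providers, "dubbo", 2));
    }
}
```

With retries set to 0 the same call fails, because only provider A is ever tried.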

2.4 ProtocolFilterWrapper.invoke()

ProtocolFilterWrapper wraps the Invoker in a chain of Filters, so the call passes through a series of Filter calls before it reaches the Invoker created by DubboProtocol

public class ProtocolFilterWrapper implements Protocol {
 
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // Obtain all filters through SPI (belonging to the Consumer group)
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                   ...
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // Call Filter one by one
                            asyncResult = filter.invoke(next, invocation);
                        } ...
                };
            }
        }

        return last;
    }
}

Almost every framework has a Filter layer, which lets you perform global operations before the real call. There will be a dedicated article on Filters later; for now it is enough to know that they exist.
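The backwards loop in buildInvokerChain() is easy to misread, so here is the same wrapping technique in isolation. The Invoker and Filter interfaces below are hypothetical, string-based miniatures, not Dubbo's interfaces; the trace list exists only to make the execution order visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {
    // Hypothetical miniature versions of Invoker and Filter.
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Wraps the target from the last filter to the first,
    // so that the first filter in the list runs first.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Runs a two-filter chain and reports the order in which things executed.
    static String demo() {
        List<String> trace = new ArrayList<>();
        Filter first = (next, req) -> { trace.add("first"); return next.invoke(req); };
        Filter second = (next, req) -> { trace.add("second"); return next.invoke(req); };
        Invoker target = req -> { trace.add("target"); return "hello " + req; };
        String result = buildChain(target, Arrays.asList(first, second)).invoke("dubbo");
        return trace + " -> " + result;
    }

    public static void main(String[] args) {
        // prints "[first, second, target] -> hello dubbo"
        System.out.println(demo());
    }
}
```

Because each filter receives the next Invoker explicitly, a filter can also short-circuit the chain by returning without calling next.invoke().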

2.5 ListenerInvokerWrapper.invoke()

public class ListenerInvokerWrapper<T> implements Invoker<T> {
 
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }

}

ListenerInvokerWrapper mainly exists to attach a series of listeners; invoke() itself simply delegates to the wrapped Invoker. The listeners will be analyzed later.

2.6 Synchronous or asynchronous call?

public class AsyncToSyncInvoker<T> implements Invoker<T> {

    private Invoker<T> invoker;
	...
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult = invoker.invoke(invocation);

        try {
            // If it is a synchronous call, block until the result arrives
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } ...
        return asyncResult;
    }
}

To improve performance, Dubbo provides an asynchronous call mode, which will be introduced later.

By default, calls are synchronous and block until the result is available.
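The pattern in AsyncToSyncInvoker, always call asynchronously and block only when the caller asked for a synchronous result, can be sketched with a plain CompletableFuture. The Mode enum, the sendAsync() transport and the five-second limit are assumptions of this sketch, not Dubbo APIs or defaults.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncToSyncSketch {
    enum Mode { SYNC, ASYNC }

    // Hypothetical async transport: the response is produced on another thread.
    static CompletableFuture<String> sendAsync(String request) {
        return CompletableFuture.supplyAsync(() -> "hello " + request);
    }

    // Always starts the call asynchronously; in SYNC mode it waits before returning.
    static CompletableFuture<String> invoke(String request, Mode mode) {
        CompletableFuture<String> future = sendAsync(request);
        if (mode == Mode.SYNC) {
            try {
                // Block the calling thread until the response has arrived
                future.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("call failed", e);
            }
        }
        return future;
    }

    public static void main(String[] args) {
        // In SYNC mode the returned future is already complete
        System.out.println(invoke("dubbo", Mode.SYNC).join());
    }
}
```

Returning the future in both modes keeps a single code path: the layers underneath never need to know whether the caller wanted to wait.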

2.7 DubboInvoker.doInvoke()

Finally, execution reaches the class that actually makes the call

public class DubboInvoker<T> extends AbstractInvoker<T> {
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // Pick one of the available clients in round-robin order
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // Finally, synchronous or asynchronous calls are made by calling currentClient.send()
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
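The client selection at the top of doInvoke() is a simple round-robin over an array. A minimal standalone sketch follows; it uses a plain AtomicInteger counter, which is an assumption because the type of index is not shown in the listing. With such a counter, Math.floorMod keeps the position non-negative even after the counter overflows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {
    private final List<T> clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(List<T> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("no clients");
        }
        this.clients = new ArrayList<>(clients);
    }

    // Returns the next client; safe to call from several threads at once.
    T next() {
        if (clients.size() == 1) {
            return clients.get(0);
        }
        // floorMod never yields a negative position, unlike % on a negative int
        return clients.get(Math.floorMod(index.getAndIncrement(), clients.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> rr = new RoundRobinSketch<>(Arrays.asList("c0", "c1", "c2"));
        // prints c0, c1, c2, c0 on separate lines
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next());
        }
    }
}
```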

2.8 HeaderExchangeChannel.request()

final class HeaderExchangeChannel implements ExchangeChannel {
 
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // Create request object
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        // The main content is request, including interface name, method name and parameter information
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            // Send it to the channel
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

We are almost at the end. The actual sending is delegated to NettyClient (the default transport)
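The listing above creates a future before sending, but does not show how a response is later matched to it. One common approach, and an assumption of this sketch rather than something shown in the source, is to keep pending futures in a map keyed by request id. PendingRequestsSketch and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Registers a future for a new request and returns the request id.
    long register(CompletableFuture<Object> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Called when a response carrying the given id arrives.
    // Returns false if no request with that id is waiting.
    boolean complete(long id, Object response) {
        CompletableFuture<Object> future = pending.remove(id);
        return future != null && future.complete(response);
    }

    // Called when sending fails, so the caller does not wait forever.
    void cancel(long id) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.cancel(false);
        }
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = requests.register(future);
        requests.complete(id, "hello dubbo");
        // prints "hello dubbo"
        System.out.println(future.join());
    }
}
```

The cancel() path corresponds to the catch block in request(): if the send itself fails, the future is cancelled immediately instead of being left to time out.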

2.9 NettyClient.send()

public class NettyClient extends AbstractClient {
    public void send(Object message, boolean sent) throws RemotingException {
        // If no connection is created, the connection is created first
        if (needReconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // Finally, send it to the channel
        channel.send(message, sent);
    }
}

2.10 NettyChannel.send()

final class NettyChannel extends AbstractChannel {
    public void send(Object message, boolean sent) throws RemotingException {
        // Check whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // Finally, it is sent through channel
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
}

Finally, the request is sent through Netty's channel.

The process is a little long, so let's summarize it in a sequence diagram.
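The sent flag in NettyChannel.send() decides whether the caller waits for the write to finish. The sketch below reproduces only that decision, with a CompletableFuture standing in for Netty's ChannelFuture (an assumption, so that the example runs without Netty on the classpath). It returns false on timeout, where the real method throws a RemotingException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SendSketch {
    // Returns true if we did not wait or the write finished in time,
    // and false if waiting for the write timed out.
    static boolean send(CompletableFuture<Void> writeFuture, boolean sent, long timeoutMs) {
        if (!sent) {
            // Fire and forget: do not wait for the write to complete
            return true;
        }
        try {
            writeFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the write", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("write failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>();
        System.out.println(send(done, true, 10));   // prints "true"
        System.out.println(send(stuck, true, 10));  // prints "false" after about 10 ms
        System.out.println(send(stuck, false, 10)); // prints "true" without waiting
    }
}
```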

 


Added by altergothen on Tue, 30 Nov 2021 06:51:17 +0200