In the previous article, we looked at what happens during the startup phase of a Dubbo service reference. What we have not yet seen is the remote invocation phase. The Dubbo service invocation process is complex and involves many steps, such as sending requests, encoding and decoding, service degradation, filter chain processing, serialization, thread dispatch, and responding to requests. For space reasons, this article cannot analyze every step. It focuses on the sending and receiving of requests, encoding and decoding, thread dispatch, and the sending and receiving of responses. Service degradation, the filter chain, and serialization are left for readers to analyze in depth on their own.
1. Proxy Object
Dubbo supports both synchronous and asynchronous calls. Asynchronous calls are further divided into calls with a return value and calls with no return value. A "no return value" asynchronous call is one where the service consumer only makes the call and does not care about its result, in which case Dubbo directly returns an empty RpcResult. To use the asynchronous feature, service consumers need to configure it manually. By default, Dubbo uses synchronous calls.
In the previous article, we saw that Dubbo actually calls remote methods through a proxy class, so let's first look at the details of the proxy class by decompiling it:
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // Method array
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // Store the parameters in an Object array
        Object[] arrobject = new Object[]{string};
        // Call the invoke method of the InvocationHandler implementation class to get the result of the call
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // Return the result of the call
        return (String)object;
    }

    /** Echo test method */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}
The logic of the proxy class is simple. First, the runtime arguments are stored in an array. Then the invoke method of the InvocationHandler implementation class is called to get the result of the call. Finally, the result is cast to the return type and handed back to the caller.
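The same forwarding pattern can be tried out with a JDK dynamic proxy. This is only a minimal sketch: the generated proxy0 shown above is not built this way, java.lang.reflect.Proxy is swapped in here for brevity, and GreetingService and the lambda handler are hypothetical names that do not come from Dubbo.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxySketch {
    /** Hypothetical service interface, standing in for DemoService. */
    interface GreetingService {
        String sayHello(String name);
    }

    /** Builds a proxy whose every method call is forwarded to the given handler. */
    static GreetingService createProxy(InvocationHandler handler) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        // The handler receives the Method and the argument array, much like
        // proxy0 passes methods[0] and arrobject to handler.invoke(...).
        InvocationHandler handler = (proxy, method, methodArgs) ->
                method.getName() + "(" + methodArgs[0] + ")";
        GreetingService service = createProxy(handler);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only sees the interface; everything behind it is decided by the handler, which is where Dubbo plugs in the remote call.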
2. InvocationHandler
Next, look at the InvokerInvocationHandler.invoke() method:
//InvokerInvocationHandler.java
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Name of the target method
        String methodName = method.getName();
        // Parameter types
        Class<?>[] parameterTypes = method.getParameterTypes();
        // Intercept methods defined in the Object class (not overridden by subclasses), such as wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // If methods such as toString, hashCode, and equals are overridden by subclasses, they are also called directly here
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Encapsulate method and args into an RpcInvocation and make the subsequent call
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
3. Service Degradation
Next, the call reaches the MockClusterInvoker.invoke() method. MockClusterInvoker is the class that handles service degradation. Let's look at the code:
//MockClusterInvoker.java
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Get the mock configuration value
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // No mock logic: call the invoke method of another Invoker object directly,
        // for example FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        // force:xxx executes the mock logic directly without making a remote call
        result = doMockInvoke(invocation, null);
    } else {
        // fail:xxx means that after the consumer fails to invoke the service,
        // the mock logic is executed instead of throwing an exception
        try {
            // Call the invoke method of another Invoker object
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                // The call failed, so execute the mock logic
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}
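The three branches above (no mock, forced mock, mock after failure) can be sketched without any Dubbo classes. In this hedged example the remote call and the mock call are plain Supplier objects, and BusinessException is a hypothetical stand-in for an RpcException whose isBiz() returns true; none of these names belong to Dubbo.

```java
import java.util.function.Supplier;

class MockDecisionSketch {
    /** Hypothetical stand-in for an RpcException whose isBiz() returns true. */
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    /**
     * Mirrors the three branches of MockClusterInvoker.invoke():
     * no mock, forced mock, and mock only after a non-business failure.
     */
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            // No mock configured: always go remote
            return remoteCall.get();
        } else if (value.startsWith("force")) {
            // Forced mock: never go remote
            return mockCall.get();
        } else {
            try {
                return remoteCall.get();
            } catch (BusinessException e) {
                // Business errors are not degraded, they are rethrown
                throw e;
            } catch (RuntimeException e) {
                // Any other failure falls back to the mock
                return mockCall.get();
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> failing = () -> { throw new IllegalStateException("provider down"); };
        System.out.println(invoke("false", () -> "remote", () -> "mock"));
        System.out.println(invoke("force:return null", failing, () -> "mock"));
        System.out.println(invoke("fail:return null", failing, () -> "mock"));
    }
}
```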
4. Getting Available Service Providers and Routing
The comments in the code above are clear enough, so we will not elaborate on them. Next, look at this.invoker.invoke():
//AbstractClusterInvoker.java
/**
 * @param invocation RpcInvocation(method,args)
 * @return
 * @throws RpcException
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // binding attachments into invocation.
    // Implicit parameter pass-through
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // Get the list of available service providers
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        // Get the load balancing policy
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
The main job of this code is to get the list of available service providers:
- The list(invocation) method first retrieves from the Directory the collection of service provider invokers that was saved during the service reference.
- The Router interface is then called to route these invokers, filtering them down to the list of available service providers that is returned.
Take a brief look at the RegistryDirectory.doList() method:
//RegistryDirectory.java
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // The service provider has shut down or disabled the service, so throw a "No provider" exception
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry ...");
    }
    List<Invoker<T>> invokers = null;
    // Get the local Invoker cache
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        // Get the method name and the argument list
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // Check whether the first argument is of type String or enum
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            // Query the Invoker list by method name + first argument value; the use case is unclear for now
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
        }
        if (invokers == null) {
            // Get the Invoker list by method name
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            // Get the Invoker list by the asterisk *
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        // Redundant logic, pull request #61 removes the following if branch code
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    // Return the Invoker list
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
The logic here is also simple: get the method name from the invocation, then use it as the key to look up the corresponding invokers in localMethodInvokerMap, falling back to the asterisk key when the method name has no entry.
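The lookup order can be illustrated with an ordinary map. This is a simplified sketch, not the RegistryDirectory implementation: invokers are represented by plain strings, the "*" key stands in for Constants.ANY_VALUE, and the redundant last-resort branch is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InvokerLookupSketch {
    /** Tries "method.firstArg", then "method", then "*", and returns an empty list if nothing matches. */
    static List<String> lookup(Map<String, List<String>> methodInvokerMap,
                               String methodName, Object[] args) {
        List<String> invokers = null;
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = methodInvokerMap.get(methodName + "." + args[0]);
        }
        if (invokers == null) {
            invokers = methodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = methodInvokerMap.get("*");
        }
        return invokers == null ? new ArrayList<>() : invokers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> cache = new HashMap<>();
        cache.put("sayHello", Arrays.asList("invoker-A", "invoker-B"));
        cache.put("*", Arrays.asList("invoker-C"));
        System.out.println(lookup(cache, "sayHello", new Object[] {"world"}));
        System.out.println(lookup(cache, "unknownMethod", new Object[0]));
    }
}
```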
5. Cluster fault tolerance
Let's go back to the AbstractClusterInvoker.doInvoke() method. With the list of service providers and the load balancing strategy in hand, the next step is the fault tolerance phase. Dubbo provides several fault tolerance mechanisms, such as Failover and Failfast. The default is Failover, which retries after a failure. Next, take a look at FailoverClusterInvoker.doInvoke():
//FailoverClusterInvoker.java
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    // Get the retry count len
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    RpcException le = null;
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
    Set<String> providers = new HashSet<String>(len);
    // Call in a loop, retrying on failure
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            // Re-list the Invokers before retrying. The advantage is that if a provider has gone down,
            // calling list returns the latest available Invoker list
            copyinvokers = list(invocation);
            // Null check for copyinvokers
            checkInvokers(copyinvokers, invocation);
        }
        // Select an Invoker through load balancing
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // Add the invoker to the invoked list
        invoked.add(invoker);
        // Set invoked into the RPC context
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // Call the invoke method of the target Invoker
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            // Business exception, no retry
            if (e.isBiz()) {
                throw e;
            }
            // Record the exception
            le = e;
        } catch (Throwable e) {
            // Record the exception
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // If all retries fail, throw an exception
    throw new RpcException(..., "Failed to invoke the method ...");
}
The method first reads the retry count, then calls in a loop bounded by that count, retrying after each failure.
Within the for loop, an Invoker is first selected through the load balancing component and then called remotely through its invoke method. If the call fails, the exception is recorded and the loop tries again. Before each retry, the list method of the parent class is called again to re-enumerate the Invokers.
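The retry loop can be sketched in isolation as follows. This is an illustration under simplifying assumptions, not FailoverClusterInvoker itself: providers are modelled as Function objects, the list is re-fetched before every attempt (including the first), "load balancing" is reduced to picking the first provider that has not been tried yet, and every RuntimeException is treated as retryable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class FailoverSketch {
    /** Tries up to retries + 1 providers and returns the first successful result. */
    static <T> T invokeWithRetries(Supplier<List<Function<String, T>>> lister,
                                   String request, int retries) {
        int attempts = Math.max(retries + 1, 1);
        List<Function<String, T>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            // Re-list before every attempt so providers that went away are not reused
            List<Function<String, T>> candidates = lister.get();
            if (candidates.isEmpty()) {
                throw new IllegalStateException("No provider available");
            }
            // Stand-in for load balancing: prefer a provider that has not been tried yet
            Function<String, T> chosen = candidates.get(0);
            for (Function<String, T> candidate : candidates) {
                if (!invoked.contains(candidate)) {
                    chosen = candidate;
                    break;
                }
            }
            invoked.add(chosen);
            try {
                return chosen.apply(request);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next attempt
                last = e;
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempt(s)", last);
    }

    public static void main(String[] args) {
        List<Function<String, String>> providers = new ArrayList<>();
        providers.add(req -> { throw new IllegalStateException("provider down"); });
        providers.add(req -> "hello " + req);
        System.out.println(invokeWithRetries(() -> providers, "dubbo", 2));
    }
}
```

Tracking the already-invoked providers is what lets a retry land on a different machine instead of hitting the one that just failed.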
6. Load Balancing
The statement Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); is where load balancing chooses an Invoker. It is not analyzed in detail here.
7. DubboInvoker
Next, let's look at the statement Result result = invoker.invoke(invocation);, which eventually executes the DubboInvoker.doInvoke() method:
//DubboInvoker.java
public class DubboInvoker<T> extends AbstractInvoker<T> {

    private final ExchangeClient[] clients;

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // Set path and version into the attachments
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // Get the ExchangeClient from the clients array
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // Get the asynchronous configuration
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway: whether the current call expects no return value
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // Asynchronous, no return value
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // Send the request
                currentClient.send(inv, isSent);
                // Set the future field in the context to null
                RpcContext.getContext().setFuture(null);
                // Return an empty RpcResult
                return new RpcResult();
            }
            // Asynchronous, with return value
            else if (isAsync) {
                // Send the request and get a ResponseFuture instance
                ResponseFuture future = currentClient.request(inv, timeout);
                // Set the future into the context
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // Temporarily return an empty result
                return new RpcResult();
            }
            // Synchronous call
            else {
                RpcContext.getContext().setFuture(null);
                // Send the request, get a ResponseFuture instance, and call its get method to wait
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }

    // Omit other methods
}
The clients array here wraps the Netty communication clients that were built earlier, when the service was referenced.
The code above contains Dubbo's handling of both synchronous and asynchronous calls. The details are not covered here.
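The difference between the synchronous and the asynchronous branch can be sketched with CompletableFuture. This is a rough analogy, assuming that a CompletableFuture can stand in for ResponseFuture and a ThreadLocal for RpcContext; the class and method names are hypothetical and nothing here is Dubbo API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallModeSketch {
    /** Plays the role of RpcContext: holds the future of the latest async call on this thread. */
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    /** Stand-in for currentClient.request(inv, timeout): the reply arrives on another thread. */
    static CompletableFuture<String> request(String payload) {
        return CompletableFuture.supplyAsync(() -> "echo:" + payload);
    }

    /** Synchronous style: send the request, then block on the future until the timeout. */
    static String callSync(String payload, long timeoutMillis) {
        CONTEXT_FUTURE.set(null);
        try {
            return request(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Invoke failed or timed out", e);
        }
    }

    /** Asynchronous style: park the future in the context and return an empty result at once. */
    static String callAsync(String payload) {
        CONTEXT_FUTURE.set(request(payload));
        return null;
    }

    public static void main(String[] args) {
        System.out.println(callSync("sync", 1000));
        callAsync("async");
        // The caller fetches the real result from the context when it needs it
        System.out.println(CONTEXT_FUTURE.get().join());
    }
}
```

In both styles the request is sent the same way; the only difference is who waits on the future and when.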
Next, look at the currentClient.request(inv, timeout) method. The call passes through the chain
ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request and eventually executes the following method:
//HeaderExchangeChannel.java
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(..., "Failed to send request ...");
    }
    // Create the Request object
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    // Set the two-way communication flag to true
    req.setTwoWay(true);
    // The request variable here is of type RpcInvocation
    req.setData(request);
    // Create the DefaultFuture object
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // Call the send method of NettyClient to send the request
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    // Return the DefaultFuture object
    return future;
}
At this point, we finally see request semantics. The method above first creates a Request object, then passes it to the send method of NettyClient for the subsequent encoding, serialization, and Netty calls.
Those steps are not analyzed in detail here. Interested readers can explore them further.
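The provider-side code later in this article builds each Response from request.getId(), which suggests how a reply finds its way back to the future created in request(). The sketch below illustrates that correlation idea only. It assumes a map from request id to pending future; the class and method names are hypothetical and are not taken from DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Allocates a request id and registers a future that waits for the matching response. */
    long newRequest() {
        long id = nextId.incrementAndGet();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Returns the future registered for the given request id, or null if there is none. */
    CompletableFuture<Object> futureOf(long id) {
        return pending.get(id);
    }

    /** Completes and removes the future for this id; returns false if no request is waiting. */
    boolean onResponse(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        long id = requests.newRequest();
        CompletableFuture<Object> future = requests.futureOf(id);
        // Later, typically on an IO thread, the response carrying the same id arrives
        requests.onResponse(id, "Hello dubbo");
        System.out.println(future.join());
    }
}
```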
8. Service Provider Receives the Call
8.1,Request decoding
Having analyzed how the service consumer initiates a remote call, let's look at how the service provider receives it.
As mentioned earlier, Dubbo uses Netty as the underlying communication framework by default. When Netty detects inbound data, it first decodes the data through a decoder and then passes the decoded data to the designated method of the next inbound handler.
The DubboCodec.decodeBody() method is called to decode the data and encapsulate the decoded fields in a Request.
The DecodeableRpcInvocation.decode() method is then called for the subsequent decoding.
Once this is done, the method name, attachments, and call arguments have been resolved and populated into the DecodeableRpcInvocation, which is in turn set into the Request object.
After the decoder parses the packet into a Request object, NettyHandler's messageReceived method immediately receives the object and continues to pass it down.
The object is passed to NettyServer, MultiMessageHandler, HeartbeatHandler, and AllChannelHandler in turn.
Finally, AllChannelHandler wraps the object in a Runnable implementation and submits that Runnable to the thread pool, which performs the subsequent call logic. The entire call stack is as follows:
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  -> AbstractPeer#received(Channel, Object)
    -> MultiMessageHandler#received(Channel, Object)
      -> HeartbeatHandler#received(Channel, Object)
        -> AllChannelHandler#received(Channel, Object)
          -> ExecutorService#execute(Runnable)    // Follow-up call logic executed by the thread pool
8.2,AllChannelHandler
We will not analyze every method in the call stack. Instead, we go straight to the logic of the last handler in the stack, shown below:
//AllChannelHandler.java
/** Processes request and response messages; the message variable may be of type Request or Response */
@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // Dispatch request and response messages to the thread pool for processing
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            Request request = (Request) message;
            // If the communication mode is two-way, the "Server side ... threadpool is exhausted"
            // error message is encapsulated in a Response and returned to the service consumer.
            if (request.isTwoWay()) {
                String msg = "Server side(" + url.getIp() + "," + url.getPort()
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                Response response = new Response(request.getId(), request.getVersion());
                response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                // Return the Response object carrying the error information
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(..., " error when process received event .", t);
    }
}
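The dispatch-and-reject behaviour can be reproduced with a plain ExecutorService. In this minimal sketch the returned strings stand in for the Response object, and a pool that has been shut down is used to provoke the RejectedExecutionException; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class DispatchSketch {
    /**
     * Hands the task to the pool. When the pool rejects it, a two-way request gets an
     * error marker back (standing in for the error Response); a one-way request rethrows.
     */
    static String dispatch(ExecutorService executor, Runnable task, boolean twoWay) {
        try {
            executor.execute(task);
            return "DISPATCHED";
        } catch (RejectedExecutionException e) {
            if (twoWay) {
                return "SERVER_THREADPOOL_EXHAUSTED_ERROR";
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        System.out.println(dispatch(pool, () -> System.out.println("handling message"), true));
        // A pool that has been shut down rejects new tasks, which triggers the error path
        pool.shutdown();
        System.out.println(dispatch(pool, () -> { }, true));
    }
}
```

Replying with an error instead of silently dropping the task matters for two-way calls, because the consumer would otherwise wait until its timeout expires.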
8.3,ChannelEventRunnable
As mentioned above, the request object is wrapped in a ChannelEventRunnable, which becomes the new starting point of the service invocation process. So let's start from ChannelEventRunnable and work downward.
//ChannelEventRunnable.java
@Override
public void run() {
    // Check the channel state; for request or response messages, state = RECEIVED
    if (state == ChannelState.RECEIVED) {
        try {
            // Pass channel and message to the ChannelHandler object for the subsequent call
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("... operation error, channel is ... message is ...");
        }
    }
    // Other message types are handled by the switch
    else {
        switch (state) {
            case CONNECTED:
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
}
8.4,DecodeHandler
ChannelEventRunnable is only a transit station. Its run method contains no specific call logic and only passes the parameters on to another ChannelHandler object for processing. That object is of type DecodeHandler.
//DecodeHandler.java
@Override
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        // Decode objects that implement the Decodeable interface
        decode(message);
    }
    if (message instanceof Request) {
        // Decode the data field of the Request
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        // Decode the result field of the Response
        decode(((Response) message).getResult());
    }
    // Perform the subsequent logic
    handler.received(channel, message);
}
8.5,HeaderExchangeHandler
DecodeHandler mainly contains decoding logic. As noted in the request decoding analysis (section 8.1), request decoding can be performed either on an IO thread or in the thread pool, depending on the runtime configuration.
DecodeHandler exists to ensure that request or response objects can be decoded in the thread pool. After decoding is complete, the fully decoded Request object continues to be passed along, and the next stop is HeaderExchangeHandler.
//HeaderExchangeHandler.java
public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // Process Request objects
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // Handle events
                    handlerEvent(channel, request);
                }
                // Process normal requests
                else {
                    // Two-way communication
                    if (request.isTwoWay()) {
                        // Continue the call toward the service and get the result
                        Response response = handleRequest(exchangeChannel, request);
                        // Return the result of the call to the service consumer
                        channel.send(response);
                    }
                    // One-way communication: only invoke the specified service, without returning a result
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }
            // Process Response objects. The service consumer executes this branch, which will be analyzed later
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet related, ignored
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // Check whether the request is valid; if not, return a response with status BAD_REQUEST
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // Set the BAD_REQUEST status
            res.setStatus(Response.BAD_REQUEST);
            return res;
        }
        // Get the data field value, which is the RpcInvocation object
        Object msg = req.getData();
        try {
            // Key step: continue the call downward
            Object result = handler.reply(channel, msg);
            // Set the OK status code
            res.setStatus(Response.OK);
            // Set the call result
            res.setResult(result);
        } catch (Throwable e) {
            // If the call throws, set SERVICE_ERROR to indicate a service-side exception
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
}
Here the request and response logic becomes clearer. For two-way communication, HeaderExchangeHandler first continues the call downward to get the result. The result is then encapsulated in a Response object, which is returned to the service consumer.
If the request is not valid or the call fails, the error information is encapsulated in the Response object and returned to the service consumer.
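The way a call outcome is mapped to a response status can be sketched in a few lines. This is an illustration only: the Response class and Status enum below are simplified, hypothetical stand-ins for Dubbo's Response and its status constants, and the service is modelled as a Function.

```java
import java.util.function.Function;

class HandleRequestSketch {
    enum Status { OK, BAD_REQUEST, SERVICE_ERROR }

    /** Simplified response: carries the id of the request it answers. */
    static final class Response {
        final long id;
        Status status;
        Object result;
        String errorMessage;

        Response(long id) {
            this.id = id;
        }
    }

    /** Invokes the service and wraps either the result or the failure in a Response. */
    static Response handle(long requestId, boolean broken, Object data,
                           Function<Object, Object> service) {
        Response res = new Response(requestId);
        if (broken) {
            // The request could not be decoded, so the service is never called
            res.status = Status.BAD_REQUEST;
            res.errorMessage = "Fail to decode request due to: " + data;
            return res;
        }
        try {
            res.result = service.apply(data);
            res.status = Status.OK;
        } catch (RuntimeException e) {
            res.status = Status.SERVICE_ERROR;
            res.errorMessage = e.toString();
        }
        return res;
    }

    public static void main(String[] args) {
        Response ok = handle(1L, false, "dubbo", name -> "Hello " + name);
        System.out.println(ok.id + " " + ok.status + " " + ok.result);
        Response failed = handle(2L, false, "dubbo", name -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(failed.id + " " + failed.status + " " + failed.errorMessage);
    }
}
```

Because failures are converted into a status rather than thrown, the consumer always receives a reply for a two-way request.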
8.6,DubboProtocol.reply()
Let's move on to the handler.reply(channel, msg) method, which eventually calls the DubboProtocol.reply() method:
//DubboProtocol.java
public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // Get an Invoker instance
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // Callback related, ignored
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // Call the specific service through the Invoker
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: ...");
        }

        // Ignore other methods
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        // Ignore callback and local stub related logic
        // ...

        int port = channel.getLocalAddress().getPort();

        // Calculate the service key in groupName/serviceName:serviceVersion:port format. For example:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // Find the DubboExporter object corresponding to the serviceKey in the exporterMap.
        // The service export process stores the <serviceKey, DubboExporter> mapping in the exporterMap collection
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");

        // Get the Invoker object and return it
        return exporter.getInvoker();
    }

    // Ignore other methods
}
The logic above obtains the Invoker instance that corresponds to the requested service and invokes the service logic through the Invoker's invoke method.
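The service key lookup can be sketched with a string builder and a map. The format follows the groupName/serviceName:serviceVersion:port pattern from the comment in getInvoker; the helper below is a simplified illustration written for this article, not Dubbo's own serviceKey method, and the map values are plain strings rather than exporters.

```java
import java.util.HashMap;
import java.util.Map;

class ServiceKeySketch {
    /** Builds a key of the form group/serviceName:version:port; group and version are optional. */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        // Stand-in for exporterMap: filled at export time, read on every incoming call
        Map<String, String> exporterMap = new HashMap<>();
        exporterMap.put(
                serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "dubbo"),
                "exporter for DemoService");

        String key = serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "dubbo");
        System.out.println(key + " -> " + exporterMap.get(key));
    }
}
```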
The invoke method is defined in AbstractProxyInvoker with the following code:
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // Call doInvoke to make the subsequent call, then wrap the result in an RpcResult and return it
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
        }
    }

    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}
As shown above, doInvoke is an abstract method that must be implemented by a concrete Invoker instance. The Invoker instance is created at runtime by JavassistProxyFactory with the following logic:
public class JavassistProxyFactory extends AbstractProxyFactory {

    // Omit other methods

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // Create an anonymous class object
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // Call the invokeMethod method for the subsequent call
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
Wrapper is an abstract class, where invokeMethod is an abstract method.
Dubbo generates implementation classes for Wrapper at runtime through the Javassist framework and implements the invokeMethod method, which ultimately calls specific services based on the call information.
8.7,wrapper.invokeMethod()
In the case of DemoServiceImpl, Javassist generates the following Wrapper class for it.
/** Wrapper0 is generated at runtime and can be decompiled using Arthas */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // Omit other methods

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws NoSuchMethodException, InvocationTargetException {
        DemoService demoService;
        try {
            // Type conversion
            demoService = (DemoService)object;
        } catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // Invoke the specified method based on the method name
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        } catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}
The attribute information visible in the debugger gives a clear view of the structure of the invoker, wrapper, and proxy.
At this point, the entire service call process has been analyzed.
If anything is still unclear, the article "Talking about RPC" is recommended; its analysis is very thorough.
Last
A flow chart of dubbo service references has been compiled for easy understanding.
Reference article:
Dubbo website