Dubbo Cluster: fault tolerance, load balancing, and routing analysis

The Cluster in Dubbo disguises multiple service providers as a single provider; more precisely, it disguises the multiple Invokers in a Directory as one Invoker. This disguise involves fault tolerance, load balancing, and routing. This article covers Cluster-related topics: it first explains the concepts of fault tolerance modes, load balancing, and routing according to the documentation, and then walks through how the source code handles them. (It is a little messy; I was in a bad mood and not in the best state for analyzing source code.)

Cluster fault tolerance modes

Failover Cluster

This is the default cluster fault tolerance mode in Dubbo.

  • Automatic failover: when a call fails, it is retried on another server.
  • It is usually used for read operations, but retries add latency.
  • The number of retries (not counting the first call) can be set with retries="2".

Failfast Cluster

  • Fail fast: only one call is made, and an error is reported immediately if it fails.
  • It is usually used for non-idempotent write operations, such as adding new records.

Failsafe Cluster

  • Fail safe: when an exception occurs, it is simply ignored.
  • It is usually used for operations such as writing audit logs.

Failback Cluster

  • Automatic recovery on failure: failed requests are recorded in the background and resent periodically.
  • It is usually used for message notification operations.

Forking Cluster

  • Call multiple servers in parallel, and return as long as one succeeds.
  • It is usually used for read operations with strict real-time requirements, at the cost of consuming more service resources.
  • You can set the maximum number of parallels by forks="2".
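
The forking idea can be sketched with the JDK alone. This is not how Dubbo implements its forking invoker; it is a minimal illustration that assumes each provider is modeled as a Callable and relies on ExecutorService.invokeAny, which returns the result of one task that completed without throwing. The names ForkingSketch and forkingCall are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ForkingSketch {
    // Run up to `forks` provider calls in parallel and return the first successful result.
    static <T> T forkingCall(List<Callable<T>> providers, int forks) throws Exception {
        if (forks < 1 || providers.isEmpty()) {
            throw new IllegalArgumentException("need at least one fork and one provider");
        }
        List<Callable<T>> chosen =
                new ArrayList<>(providers.subList(0, Math.min(forks, providers.size())));
        ExecutorService pool = Executors.newFixedThreadPool(chosen.size());
        try {
            // invokeAny throws ExecutionException only if every task fails.
            return pool.invokeAny(chosen);
        } finally {
            pool.shutdownNow(); // interrupt the calls that are still running
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> providers = List.of(
                () -> { throw new IllegalStateException("provider down"); },
                () -> "result from a healthy provider");
        System.out.println(forkingCall(providers, 2));
    }
}
```

The extra resource cost mentioned above is visible here: every chosen provider does the work, even though only one result is used.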

Broadcast Cluster

  • Broadcast: all providers are called one by one, and if any of them reports an error, the whole call reports an error. (Supported since 2.1.0.)
  • It is usually used to notify all providers to update local resource information such as cache or log.
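
To make the difference between the first three modes concrete, here is a toy sketch. It is not Dubbo code: providers are modeled as plain Supplier objects, the class and method names are invented, and the failover variant simply walks the list in order and never tries more providers than the list contains, whereas Dubbo picks each retry target through load balancing.

```java
import java.util.List;
import java.util.function.Supplier;

final class FaultToleranceSketch {
    // Failover: try up to retries + 1 providers and return the first success.
    static <T> T failover(List<Supplier<T>> providers, int retries) {
        RuntimeException last = null;
        int attempts = Math.min(retries + 1, providers.size());
        for (int i = 0; i < attempts; i++) {
            try {
                return providers.get(i).get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw last != null ? last : new IllegalStateException("no providers");
    }

    // Failfast: a single attempt; any exception goes straight to the caller.
    static <T> T failfast(Supplier<T> provider) {
        return provider.get();
    }

    // Failsafe: a single attempt; exceptions are swallowed and a fallback is returned.
    static <T> T failsafe(Supplier<T> provider, T fallback) {
        try {
            return provider.get();
        } catch (RuntimeException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new IllegalStateException("down"); };
        Supplier<String> healthy = () -> "ok";
        System.out.println(failover(List.of(broken, healthy), 2));
        System.out.println(failsafe(broken, "ignored"));
    }
}
```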

Load balancing

Dubbo's default load balancing strategy is random, that is, a provider is picked at random.

Random LoadBalance

  • Random: the probability of picking each provider is set by its weight.
  • Collisions are likely over a short window, but the larger the call volume, the more even the distribution. Using weights as probabilities also gives a fairly even spread, which makes it easy to adjust provider weights dynamically.

RoundRobin LoadBalance

  • Round robin: the round-robin ratio is set according to the reduced weights.
  • Slow providers can accumulate requests. For example, the second machine is very slow but has not gone down. Every request routed to it gets stuck there, and over time all requests end up stuck on the second machine.
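
A weighted round robin can be sketched as follows. This is a simplified illustration and not Dubbo's RoundRobinLoadBalance: it assumes providers are identified by name, weights are positive integers, and a shared sequence counter is mapped onto the weight segments. WeightedRoundRobinSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class WeightedRoundRobinSketch {
    private final Map<String, Integer> weights; // provider name -> weight, in insertion order
    private final int totalWeight;
    private final AtomicInteger sequence = new AtomicInteger();

    WeightedRoundRobinSketch(Map<String, Integer> weights) {
        this.weights = new LinkedHashMap<>(weights);
        int sum = 0;
        for (int w : weights.values()) {
            sum += w;
        }
        if (sum <= 0) {
            throw new IllegalArgumentException("total weight must be positive");
        }
        this.totalWeight = sum;
    }

    // Each call advances the sequence; the position within totalWeight decides the provider.
    String next() {
        int offset = Math.floorMod(sequence.getAndIncrement(), totalWeight);
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            offset -= e.getValue();
            if (offset < 0) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("unreachable when total weight is positive");
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("a", 2);
        weights.put("b", 1);
        WeightedRoundRobinSketch rr = new WeightedRoundRobinSketch(weights);
        for (int i = 0; i < 6; i++) {
            System.out.print(rr.next() + " ");
        }
        System.out.println();
    }
}
```

Note that the rotation ignores how long each provider takes to answer, which is exactly why a slow provider keeps receiving its share of requests.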

LeastActive LoadBalance

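  • Least active: the provider with the fewest active calls is chosen, and providers with the same active count are chosen among at random. The active count is the difference between the counters before and after a call, that is, the number of calls still in flight.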
  • Make slow providers receive fewer requests, because the slower the provider, the greater the count difference before and after the call.
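
The least-active idea can be illustrated with a per-provider counter that is incremented before a call and decremented after it. This is a sketch under assumptions, not Dubbo's LeastActiveLoadBalance: weights are ignored, providers are plain strings, the provider list is assumed to be non-empty, and the names LeastActiveSketch, begin, and end are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class LeastActiveSketch {
    private final Map<String, AtomicInteger> active = new ConcurrentHashMap<>();
    private final Random random = new Random();

    private AtomicInteger counter(String provider) {
        return active.computeIfAbsent(provider, k -> new AtomicInteger());
    }

    void begin(String provider) { counter(provider).incrementAndGet(); } // call started
    void end(String provider) { counter(provider).decrementAndGet(); }   // call finished

    // Pick the provider with the fewest in-flight calls; break ties at random.
    String select(List<String> providers) {
        int least = Integer.MAX_VALUE;
        List<String> candidates = new ArrayList<>();
        for (String p : providers) {
            int count = counter(p).get();
            if (count < least) {
                least = count;
                candidates.clear();
                candidates.add(p);
            } else if (count == least) {
                candidates.add(p);
            }
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        LeastActiveSketch lb = new LeastActiveSketch();
        lb.begin("slow"); // a call to "slow" is still in flight
        System.out.println(lb.select(List.of("slow", "fast")));
    }
}
```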

ConsistentHash LoadBalance

  • Consistent hash: requests with the same parameters are always sent to the same provider.
  • When a provider goes down, the requests originally sent to it are spread across the other providers based on virtual nodes, without causing drastic changes.
  • By default, only the first parameter is hashed.
  • 160 virtual nodes are used by default.
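
A hash ring with virtual nodes can be sketched with a TreeMap. This is only an illustration of the technique: the way virtual node keys are built (provider name plus "#" plus an index) and the use of the first four MD5 bytes as the ring position are assumptions made for this example, and the ring is assumed to contain at least one provider.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConsistentHashSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ConsistentHashSketch(List<String> providers, int virtualNodes) {
        for (String provider : providers) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(provider + "#" + i), provider); // one ring position per virtual node
            }
        }
    }

    // Walk clockwise from the key's position to the next virtual node, wrapping around at the end.
    String select(String firstArgument) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(firstArgument));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Ring position: the first four bytes of the MD5 digest as an unsigned 32-bit value.
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 4; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring = new ConsistentHashSketch(List.of("a", "b", "c"), 160);
        System.out.println(ring.select("user-42").equals(ring.select("user-42")));
    }
}
```

Building a second ring without one provider and comparing the results shows the property described above: only keys that used to land on the removed provider move.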

Cluster-related source code analysis

Recall that while a service consumer is being initialized, the refer method of RegistryProtocol calls doRefer, and the first argument of doRefer is cluster. The refer method of RegistryProtocol:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    //Get the registry instance according to the url
    //This step connects to the registry and registers the consumer with the registry
    Registry registry = registryFactory.getRegistry(url);
    //Processing of registry services
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    //The following is the service processing of our own defined business
    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    //Services need to merge different implementations
    if (group != null && group.length() > 0 ) {
        if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                || "*".equals( group ) ) {
            return doRefer( getMergeableCluster(), registry, type, url );
        }
    }
    //Here, the parameter cluster is the adaptation class of the cluster, and the code is shown below
    return doRefer(cluster, registry, type, url);
}

Next, take a look at doRefer, and the real method of service reference:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    //Directory is a collection of invokers, which is equivalent to a List
    //In other words, there are multiple invokers stored here, so which one should we call?
    //Which Invoker should be called is handled by the Cluster
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        
        //Register the service
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    //When subscribing to the service, the registry will push the service message to the consumer, and the consumer will reference the service again.
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
            + "," + Constants.CONFIGURATORS_CATEGORY
            + "," + Constants.ROUTERS_CATEGORY));
    //Service references and changes are all done asynchronously by Directory
    //There may be more than one Invoker in the Directory
    //Cluster will disguise multiple invokers as one
    //This step is to do this
    return cluster.join(directory);
}

Entrance to cluster processing

The entry is the last step in doRefer: cluster.join(directory);.

First, about cluster: it is generated by Dubbo's extension mechanism. RegistryProtocol has a setCluster method, and this is where the extension mechanism injects cluster. What gets injected is the generated adaptive class, whose code is as follows:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {

  public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");

    if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
    com.alibaba.dubbo.common.URL url = arg0.getUrl();

    String extName = url.getParameter("cluster", "failover");
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");

    com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);

    return extension.join(arg0);
  }
}

You can see that if no cluster policy is configured, the default mode is failover. The @SPI(FailoverCluster.NAME) annotation on the Cluster interface also shows that failover is the default.
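
The lookup performed by the adaptive class boils down to reading a name from the URL, falling back to a default, and fetching the extension registered under that name. The toy below mimics only that flow. The EXTENSIONS map is a stand-in invented for this example; Dubbo resolves names through ExtensionLoader rather than a hard-coded map.

```java
import java.util.Map;

final class AdaptiveLookupSketch {
    // Hypothetical registry of extension names, standing in for Dubbo's ExtensionLoader.
    static final Map<String, String> EXTENSIONS = Map.of(
            "failover", "FailoverCluster",
            "failfast", "FailfastCluster",
            "failsafe", "FailsafeCluster");

    // Mirrors url.getParameter("cluster", "failover") followed by getExtension(extName).
    static String resolve(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("cluster", "failover");
        String extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return extension;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));
        System.out.println(resolve(Map.of("cluster", "failfast")));
    }
}
```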

If you continue to execute the cluster.join method, you will first enter the join method of MockClusterWrapper:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    //Execute the join method of FailoverCluster first
    //Then the Directory and the returned Invoker are encapsulated into a MockClusterInvoker
    return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
}

Take a look at the join method of FailoverCluster:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    //Directly return an instance of FailoverClusterInvoker
    return new FailoverClusterInvoker<T>(directory);
}

At this point the Invoker has been fully encapsulated: the returned Invoker is a MockClusterInvoker, which holds a Directory and a FailoverClusterInvoker.

Once the Invoker is encapsulated, a proxy is created, and we call our target methods through that proxy.

Cluster processing when calling methods

When a specific method is called, the proxy calls invoker.invoke(), where the invoker is the MockClusterInvoker encapsulated above, so execution first enters the invoke method of MockClusterInvoker:

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    //We don't configure mock, so it's false here
    //Mock is usually used for service degradation
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
    //mock is not used
    if (value.length() == 0 || value.equalsIgnoreCase("false")){
        //The invoker here is FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        //mock=force:return+null
        //It means that the consumer's calls to the method directly return null and do not initiate remote calls
        //It can be used to shield the impact on the caller when unimportant services are unavailable
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //mock=fail:return+null
        //It means that after the consumer fails to call the method of the service, null will be returned without throwing exceptions
        //It can be used to ignore the impact on the caller when the unimportant service is unstable
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        }catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}
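
The three branches above can be condensed into a small decision function. This is a simplified sketch, not Dubbo code: the remote call is modeled as a Supplier, the mock result is passed in directly instead of being produced by doMockInvoke, and unlike the real method it does not distinguish business exceptions, which the original rethrows.

```java
import java.util.function.Supplier;

final class MockDecisionSketch {
    // `mock` plays the role of the mock parameter read from the URL.
    static String invoke(String mock, Supplier<String> remote, String mockResult) {
        String value = mock == null ? "false" : mock.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remote.get();   // no mock: normal remote call
        } else if (value.startsWith("force")) {
            return mockResult;     // force: skip the remote call entirely
        } else {
            try {
                return remote.get();
            } catch (RuntimeException e) {
                return mockResult; // fail: fall back to the mock only after a failure
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new IllegalStateException("down"); };
        System.out.println(invoke("force:return null", broken, "mocked"));
        System.out.println(invoke("fail:return null", broken, "mocked"));
    }
}
```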

We did not configure the mock property, so the call is passed to the FailoverClusterInvoker, entering the invoke method it inherits from AbstractClusterInvoker:

public Result invoke(final Invocation invocation) throws RpcException {
    //Check whether it has been destroyed
    checkWheatherDestoried();
    //As you can see, it's time to deal with the problem of load balancing
    LoadBalance loadbalance;
    //Obtain the Invoker list from the Directory according to the information in the invocation
    //Routing will be processed in this step
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        //Use the extension mechanism to load the implementation class of LoadBalance. random is used by default
        //What we get here is RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    //The asynchronous operation adds the invocation id by default
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //Call the doInvoke method of the specific implementation class. Here is FailoverClusterInvoker
    return doInvoke(invocation, invokers, loadbalance);
}

Take a look at the doInvoke method of FailoverClusterInvoker:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    //Invoker list
    List<Invoker<T>> copyinvokers = invokers;
    //Confirm that the Invoker list is not empty
    checkInvokers(copyinvokers, invocation);
    //retry count
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect on retry, in case the invoker list has changed by the time of the retry
        //Note: if the list has changed, the invoked check becomes inaccurate because the invoker instances have changed
        if (i > 0) {
            checkWheatherDestoried();
            copyinvokers = list(invocation);
            //Check it again
            checkInvokers(copyinvokers, invocation);
        }
        //Use loadBalance to select an Invoker to return
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            //Use the selected Invoker to call and return the result
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {. . . } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(. . . );
}

Let's first look at the select method, which uses loadbalance to pick an Invoker:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName();

    //sticky, which is used for stateful services, enables clients to always make calls to the same provider unless the provider hangs up and connects to another one.
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
    {
        //ignore overloaded method
        if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
            stickyInvoker = null;
        }
        //ignore concurrency problem
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){
            if (availablecheck && stickyInvoker.isAvailable()){
                return stickyInvoker;
            }
        }
    }
    Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);

    if (sticky){
        stickyInvoker = invoker;
    }
    return invoker;
}

doselect method:

private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    //There is only one invoker, which is returned directly without processing
    if (invokers.size() == 1)
        return invokers.get(0);
    // If there are only two invokers, it degenerates into round robin
    if (invokers.size() == 2 && selected != null && selected.size() > 0) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    //Select using loadBalance
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the invoker is already in selected (checked first), or it is unavailable && availablecheck = true, reselect
    if( (selected != null && selected.contains(invoker))
            ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
        try{
            //Reselect
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if(rinvoker != null){
                invoker =  rinvoker;
            }else{
                //Look at the position chosen for the first time. If not the last, choose the + 1 position
                int index = invokers.indexOf(invoker);
                try{
                    //Finally, avoid collision
                    invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                }catch (Exception e) {. . .  }
            }
        }catch (Throwable t){. . . }
    }
    return invoker;
} 

Next comes the actual selection through loadBalance. First, enter the select method of AbstractLoadBalance:

 public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    //  Select the specific subclass implementation. Here is RandomLoadBalance
    return doSelect(invokers, url, invocation);
}

Then look at the doSelect method of RandomLoadBalance:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // Total number
    int totalWeight = 0; // Total weight
    boolean sameWeight = true; // Are the weights the same
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // Cumulative total weight
        if (sameWeight && i > 0
                && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false; // Calculate whether all weights are the same
        }
    }
    if (totalWeight > 0 && ! sameWeight) {
        // If the weights differ and the total weight is greater than 0, pick a random offset within the total weight
        int offset = random.nextInt(totalWeight);
        // And determine which segment the random value falls on
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all weights are equal or the total weight is 0, pick uniformly at random
    return invokers.get(random.nextInt(length));
}

The selection above is weight-based and returns one Invoker. The reselect method is not covered in detail here: it first chooses from the invokers that have not been selected yet, and only falls back to the already selected ones if none of those is available.
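
The order of preference in reselect can be sketched like this. It is a rough illustration based only on the description above, not the actual method: invokers are plain strings, availability is a Predicate supplied by the caller, and the sketch simply takes the first match where the real code would go through load balancing. All names are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

final class ReselectSketch {
    // Prefer available invokers that were not tried yet; otherwise reuse an available tried one.
    static String reselect(List<String> invokers, List<String> selected, Predicate<String> available) {
        for (String invoker : invokers) {
            if (!selected.contains(invoker) && available.test(invoker)) {
                return invoker;
            }
        }
        for (String invoker : selected) {
            if (invokers.contains(invoker) && available.test(invoker)) {
                return invoker;
            }
        }
        return null; // nothing usable; the caller decides what to do next
    }

    public static void main(String[] args) {
        List<String> invokers = List.of("a", "b", "c");
        System.out.println(reselect(invokers, List.of("a"), i -> true));
        System.out.println(reselect(invokers, List.of("a", "b", "c"), i -> i.equals("b")));
    }
}
```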

After the Invoker is selected, execution returns to the doInvoke method of FailoverClusterInvoker, which calls invoke on the selected Invoker and returns the result. What happens inside that specific Invoker was already analyzed in the article on the consumer and provider request/response process, so it is not repeated here.

Routing

Back in the invoke method of AbstractClusterInvoker, the step List<Invoker<T>> invokers = list(invocation); obtains the Invoker list, and routing is performed here as well. See the list method below:

protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
}

Next, look at the list method of AbstractDirectory:

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed){
        throw new RpcException("Directory already destroyed .url: "+ getUrl());
    }
    //Implementation of doList in RegistryDirectory
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && localRouters.size() > 0) {
        for (Router router: localRouters){
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                    //Routing
                    //In MockInvokersSelector
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {. . . }
        }
    }
    return invokers;
}
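
The loop over routers is essentially a chain of filters, each one narrowing the list produced by the previous one. The sketch below shows that shape with invented names: invokers are address strings, a router is a UnaryOperator over the list, and keepIf is a hypothetical helper. It leaves out the runtime check and the exception handling of the real method.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

final class RouterChainSketch {
    // Apply every router in order; each one receives the output of the previous one.
    static List<String> route(List<String> invokers, List<UnaryOperator<List<String>>> routers) {
        List<String> current = invokers;
        for (UnaryOperator<List<String>> router : routers) {
            current = router.apply(current);
        }
        return current;
    }

    // Hypothetical helper: a router that keeps only the invokers matching a condition.
    static UnaryOperator<List<String>> keepIf(Predicate<String> condition) {
        return invokers -> invokers.stream().filter(condition).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> invokers = List.of("10.0.0.1:20880", "10.0.0.2:20880", "192.168.1.5:20880");
        List<UnaryOperator<List<String>>> routers = List.of(
                keepIf(address -> address.startsWith("10.")),
                keepIf(address -> !address.startsWith("10.0.0.2")));
        System.out.println(route(invokers, routers));
    }
}
```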

After routing has filtered the Invoker list, load balancing is performed on the result.



Link: https://www.jianshu.com/p/db424fac0fab


Added by The Cat on Fri, 05 Nov 2021 00:54:02 +0200