Dubbo Source Reading: Cluster (Fault-Tolerance Strategies)

Overview of the Dubbo cluster

The entry points of Dubbo's cluster feature are the ReferenceConfig.createProxy and Protocol.refer methods.
In ReferenceConfig.createProxy, if the user specifies multiple provider URLs or registry URLs, multiple Invokers are created. These Invokers are placed in a StaticDirectory, and the static service directory is then wrapped into a single Invoker by the corresponding Cluster implementation class. Each Cluster implementation corresponds to an Invoker wrapper class, such as FailoverClusterInvoker, FailbackClusterInvoker, FailfastClusterInvoker, FailsafeClusterInvoker, ForkingClusterInvoker and so on. These wrapper classes, which encapsulate the cluster logic, all inherit from the abstract class AbstractClusterInvoker. The abstract class implements state checking at invocation time, parameter setting on the Invocation, load balancing, provider availability checking and other shared logic, while the behavior after a failed invocation is implemented by the subclasses.

AbstractClusterInvoker.invoke

First, let's look at this method, which is the invocation entry point of the Invoker class.

@Override
// This method does the preparatory work for a call:
// it checks state, sets parameters, fetches the invoker list from the service directory, and obtains the load balancer named by the <method name>.loadbalance parameter.
// Finally, it calls the template method.
public Result invoke(final Invocation invocation) throws RpcException {
    // Check whether this Invoker has been destroyed
    // An Invoker that is no longer available may be destroyed when a registry change triggers a refresh of the Invoker list
    checkWhetherDestroyed();

    // binding attachments into invocation.
    // Bind the attachments held in RpcContext to the invocation
    // Users can pass different attachments with each call through RpcContext
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // List all service providers
    // This method directly calls the list method of the service directory
    List<Invoker<T>> invokers = list(invocation);
    // The load balancer is obtained from the loadbalance parameter in the URL. RandomLoadBalance is the default.
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // For asynchronous calls, attach an invocation id that uniquely identifies the call
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // Template method, implemented by subclasses
    return doInvoke(invocation, invokers, loadbalance);
}
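The split between invoke and doInvoke is the classic template-method pattern. The following is a minimal sketch of that structure only; SimpleInvoker, TemplateClusterInvoker and FirstOnlyClusterInvoker are hypothetical names invented for illustration and are not Dubbo classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Dubbo's Invoker: one remote call, string in, string out.
interface SimpleInvoker {
    String call(String request);
}

// Hypothetical stand-in for AbstractClusterInvoker.
abstract class TemplateClusterInvoker {
    private final List<SimpleInvoker> directory;
    private volatile boolean destroyed;

    TemplateClusterInvoker(List<SimpleInvoker> directory) {
        this.directory = directory;
    }

    void destroy() {
        destroyed = true;
    }

    // Template method: shared pre-work first, then the strategy-specific step.
    final String invoke(String request) {
        if (destroyed) {
            throw new IllegalStateException("cluster invoker is destroyed");
        }
        // Snapshot of the provider list, standing in for list(invocation).
        List<SimpleInvoker> invokers = new ArrayList<>(directory);
        return doInvoke(request, invokers);
    }

    // Each fault-handling strategy supplies its own implementation.
    protected abstract String doInvoke(String request, List<SimpleInvoker> invokers);
}

// Trivial strategy used only to show the hook: always call the first provider.
class FirstOnlyClusterInvoker extends TemplateClusterInvoker {
    FirstOnlyClusterInvoker(List<SimpleInvoker> directory) {
        super(directory);
    }

    @Override
    protected String doInvoke(String request, List<SimpleInvoker> invokers) {
        if (invokers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return invokers.get(0).call(request);
    }
}
```

Because invoke is final in this sketch, a subclass can change only what happens in doInvoke, never the shared checks that run before it.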

FailoverClusterInvoker.doInvoke

Let's take the default cluster class, FailoverClusterInvoker, as an example and analyze its doInvoke method.

// This method implements the retry logic, which is the defining feature of this class: failover.
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Copy a local reference, invokers may change
    List<Invoker<T>> copyInvokers = invokers;
    // Check if the list of providers is empty
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // Get the retries parameter of the invoked method; the total number of attempts is that value + 1, because the first call is not a retry
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // Records the last exception
    RpcException le = null; // last exception.
    // Records the Invokers that have already been invoked
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    // Records the addresses of the providers that have been invoked
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        // On every retry, re-check the state, re-list the available provider Invokers, and check that the list is not empty,
        // because this state and the provider list can change at any time
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        // Select one Invoker from the available list
        // The selection logic takes both "sticky" calls and load balancing into account.
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        // Add it to the list of Invokers already invoked
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + methodName
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // Business exceptions are rethrown to the caller as they are.
            // Non-business failures such as network problems, disconnections and providers going offline can be handled by failover and retry.
            // A business exception is not something the Dubbo framework can fix, so retrying it is pointless.
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

The logic of this method is straightforward: it retries. That is the main function of this class, failover: if a call fails with a non-business exception, the call is retried on another available provider. The select method is implemented in the abstract class AbstractClusterInvoker.
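Stripped of logging and Dubbo types, the retry loop can be sketched as follows. This is a simplified illustration under stated assumptions, not Dubbo's implementation: FailoverSketch, BizException and pick are hypothetical names, providers are modeled as plain java.util.function.Function values, and the load balancer is replaced by "first provider not yet tried".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified failover loop; not Dubbo's actual classes.
class FailoverSketch {

    // Models a business exception, which must not be retried.
    static class BizException extends RuntimeException {
        BizException(String message) {
            super(message);
        }
    }

    static <R> R invokeWithFailover(List<Function<String, R>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        // Total attempts = retries + 1, because the first call is not a retry.
        int attempts = Math.max(retries + 1, 1);
        List<Function<String, R>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Function<String, R> provider = pick(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(request);
            } catch (BizException e) {
                throw e; // retrying a business error is pointless
            } catch (RuntimeException e) {
                last = e; // remember the failure and try another provider
            }
        }
        throw new IllegalStateException("Failed after " + attempts + " attempts", last);
    }

    // Assumption: prefer a provider not yet tried, otherwise cycle through all of them.
    private static <R> Function<String, R> pick(List<Function<String, R>> providers,
                                                List<Function<String, R>> invoked) {
        for (Function<String, R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(invoked.size() % providers.size());
    }
}
```

With one failing and one healthy provider and retries set to 2, the first attempt fails, the second attempt lands on the healthy provider, and the caller never sees the first error.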

AbstractClusterInvoker.select

// This method mainly implements the logic of "sticky" calls.
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

    // You can decide whether to enable the "sticky" call feature by setting the sticky parameter value in the url
    // This feature is not enabled by default
    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    // If the cached sticky Invoker is no longer in the available list, clear it
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    // If sticky calls are enabled, a sticky Invoker is cached, and it is not in the list of Invokers already tried,
    // return the sticky Invoker directly, provided it passes the availability check.
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }

    // Choose an Invoker according to load balancing strategy
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    // Remember the chosen Invoker as the sticky Invoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

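In short, when sticky is enabled the consumer keeps calling the same provider for as long as that provider stays in the invoker list, passes the availability check, and has not already been tried for the current invocation. Otherwise selection falls through to doSelect.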
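The sticky behavior can be sketched in isolation. StickySelector is a hypothetical class written for illustration; the load balancer is passed in as a plain function, and the availability check of the real method is left out to keep the sketch short.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of sticky selection; the availability check is omitted.
class StickySelector<T> {
    private volatile T stickyTarget;

    T select(List<T> candidates, List<T> alreadyTried, boolean sticky,
             Function<List<T>, T> loadBalance) {
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        T cached = stickyTarget;
        // Forget the cached target once it has left the candidate list.
        if (cached != null && !candidates.contains(cached)) {
            stickyTarget = null;
            cached = null;
        }
        // Reuse the cached target unless it was already tried for this invocation.
        if (sticky && cached != null && (alreadyTried == null || !alreadyTried.contains(cached))) {
            return cached;
        }
        T chosen = loadBalance.apply(candidates);
        if (sticky) {
            stickyTarget = chosen;
        }
        return chosen;
    }
}
```

Once a target has been chosen, later calls with sticky enabled return it even if the load balancer would now pick something else, until it is tried and fails or disappears from the candidate list.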

AbstractClusterInvoker.doSelect

// Choose an Invoker according to load balancing strategy
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    // Choose an Invoker according to load balancing strategy
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // The selected Invoker must be checked again.
    // Reselect in either of the following cases:
    // 1. The selected Invoker is in the list of Invokers already tried
    // 2. availablecheck is true and the selected Invoker is not available
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // Reselect: first choose among Invokers not yet tried; only if none is left, look in the already-tried list for a provider that is available again.
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // If reselection found nothing, fall back to the next Invoker in the list.
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

The first selection does not take the list of already-tried Invokers into account, so the selected Invoker may be in that list, in which case a reselection is needed.

AbstractClusterInvoker.reselect

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }

        // Exclude Invokers that have already been tried
        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    // If any Invokers remain, choose one with the load balancer
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    // Nothing else is available, so look in the already-tried list for an available Invoker,
    // because a provider that failed earlier may have become available again by the time of the retry
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    // Choose again
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // There are no available providers, so return null.
    return null;
}
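The two-phase fallback in reselect can be sketched generically. ReselectSketch is a hypothetical helper; availability and load balancing are passed in as plain functions and, unlike the original, the availability check is always applied rather than being controlled by an availablecheck flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the two-phase reselection.
class ReselectSketch {
    static <T> T reselect(List<T> all, List<T> tried, Predicate<T> isAvailable,
                          Function<List<T>, T> loadBalance) {
        List<T> pool = new ArrayList<>();
        // Phase 1: candidates that are available and have not been tried yet.
        for (T candidate : all) {
            if (isAvailable.test(candidate) && (tried == null || !tried.contains(candidate))) {
                pool.add(candidate);
            }
        }
        if (!pool.isEmpty()) {
            return loadBalance.apply(pool);
        }
        // Phase 2: candidates tried earlier that are available again.
        if (tried != null) {
            for (T candidate : tried) {
                if (isAvailable.test(candidate) && !pool.contains(candidate)) {
                    pool.add(candidate);
                }
            }
        }
        // null tells the caller that nothing usable was found.
        return pool.isEmpty() ? null : loadBalance.apply(pool);
    }
}
```

The order of the two phases is the point of the design: an untried provider is always preferred, and a previously failed one is used only as a last resort.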

As these methods show, the Dubbo authors took great care to make a call succeed whenever possible.

FailfastClusterInvoker

Fail-fast: the call is made only once, and an exception is thrown immediately on failure. The code is simple and needs little explanation.

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                        + " select from all providers " + invokers + " for service " + getInterface().getName()
                        + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }
}

FailsafeClusterInvoker

Fail-safe fault-handling strategy: after a failed call, the error is logged and no exception is thrown.

@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        // Return an empty result; the caller has to check the returned result.
        return new RpcResult(); // ignore
    }
}
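The practical difference between fail-fast and fail-safe shows up on the caller's side. The sketch below contrasts the two with hypothetical helper methods; FailfastVsFailsafe is not a Dubbo class, and Optional is used here only to make the "empty result" explicit.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helpers contrasting the two strategies.
class FailfastVsFailsafe {

    // Fail-fast: one attempt, and any failure is reported to the caller at once.
    static <R> R failfast(Supplier<R> call) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException("Failfast invoke failed: " + e.getMessage(), e);
        }
    }

    // Fail-safe: one attempt, failures are logged and swallowed.
    static <R> Optional<R> failsafe(Supplier<R> call) {
        try {
            return Optional.ofNullable(call.get());
        } catch (RuntimeException e) {
            System.err.println("Failsafe ignore exception: " + e.getMessage());
            return Optional.empty(); // the caller must check for an empty result
        }
    }
}
```

Fail-safe therefore suits calls whose result is optional, such as writing an audit log, while fail-fast suits non-idempotent writes where a blind retry could do harm.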

FailbackClusterInvoker

Failback records failed calls and retries them at a fixed interval, which suits notification-type service calls. The retry interval is fixed at 5 seconds, and the number of retries can be set by a parameter; the default is 3.
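A minimal sketch of the idea, using a ScheduledExecutorService from the JDK as the timer. FailbackSketch is a hypothetical class; the interval and retry limit are constructor parameters here so that the behavior can be observed quickly, whereas the text above describes a fixed 5-second interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of failback: fail quietly now, retry in the background later.
class FailbackSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "failback-retry");
                t.setDaemon(true); // do not keep the JVM alive just for retries
                return t;
            });
    private final long retryIntervalMillis;
    private final int maxRetries;

    FailbackSketch(long retryIntervalMillis, int maxRetries) {
        this.retryIntervalMillis = retryIntervalMillis;
        this.maxRetries = maxRetries;
    }

    // Runs the call once; on failure, schedules retries and returns immediately.
    void invoke(Runnable call) {
        try {
            call.run();
        } catch (RuntimeException e) {
            scheduleRetry(call, 1);
        }
    }

    private void scheduleRetry(Runnable call, int attempt) {
        if (attempt > maxRetries) {
            System.err.println("Failback giving up after " + maxRetries + " retries");
            return;
        }
        timer.schedule(() -> {
            try {
                call.run();
            } catch (RuntimeException e) {
                scheduleRetry(call, attempt + 1);
            }
        }, retryIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The caller of invoke never blocks on the retries and never sees the failure, which is why this strategy fits notifications and not calls whose return value is needed.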

ForkingClusterInvoker

This strategy is more interesting: each call invokes several providers in parallel on multiple threads and uses whichever result comes back first. It is probably rarely used, since it consumes a lot of extra resources.
It does, however, resemble the speculative execution strategies of distributed computing frameworks: if a task runs slowly, the same task is launched on another node and the result of whichever copy finishes first is used, as in Spark's speculative execution mechanism.
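The "first result wins" idea can be sketched with the JDK's ExecutorService.invokeAny, which returns the result of one task that completed successfully and cancels the rest. This is a swapped-in technique chosen for brevity, not a description of how ForkingClusterInvoker is written; ForkingSketch and its forks parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of forking: call several providers at once, keep the first success.
class ForkingSketch {
    static <R> R invokeForked(List<Callable<R>> providers, int forks) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        // Assumption: simply take the first `forks` providers instead of load balancing.
        int count = Math.max(1, Math.min(forks, providers.size()));
        List<Callable<R>> chosen = new ArrayList<>(providers.subList(0, count));
        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            // Returns as soon as one task succeeds; fails only if all of them fail.
            return pool.invokeAny(chosen);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for forked calls", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("all forked calls failed", e.getCause());
        } finally {
            pool.shutdownNow(); // interrupt the calls that lost the race
        }
    }
}
```

Every forked call consumes a thread and a provider-side request even though only one result is used, which is the resource cost mentioned above.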

Summary

Different cluster wrapper classes implement different fault-handling strategies. The default is failover; other commonly used ones include fail-fast, fail-safe, timed retry (failback), merge calls and so on.

Keywords: Java Dubbo Load Balance network Spark

Added by tjhilder on Tue, 14 May 2019 22:13:18 +0300