Chapter 04: Core Process of the Nacos Client Service Subscription Mechanism

Learning doesn't have to be utilitarian. Second Senior Brother will guide you through the source code with ease, from a higher vantage point.

To readers unfamiliar with it, Nacos's service subscription mechanism may seem rather mysterious. This article explains, in simple terms, how the Nacos 2.0 client implements subscription. Because there is a lot of ground to cover, this is the first of several articles on the topic.

Nacos subscription overview

In one sentence, the Nacos subscription mechanism works like this: the Nacos client uses a scheduled task to fetch the instance list from the registry every 6 seconds. When it detects that the instances have changed, it publishes a change event, and subscribers handle that event as their business requires, for example by updating their instances or their local cache.
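As a rough illustration of that one-sentence summary, here is a minimal Java sketch of the poll, compare, publish loop. It is not Nacos code: the class PollingSubscriber, the Supplier standing in for the registry query, and the plain Consumer listeners are all hypothetical, and it uses scheduleWithFixedDelay for brevity, whereas UpdateTask (analyzed below) reschedules itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of "poll the registry, compare, publish on change". Not Nacos code.
class PollingSubscriber {
    private final Supplier<List<String>> registryQuery;       // stand-in for the registry call
    private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();
    private volatile List<String> cachedInstances = new ArrayList<>();

    PollingSubscriber(Supplier<List<String>> registryQuery) {
        this.registryQuery = registryQuery;
    }

    void subscribe(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    // One polling round: fetch, compare with the local cache, publish only on change.
    boolean pollOnce() {
        List<String> latest = new ArrayList<>(registryQuery.get());
        if (latest.equals(cachedInstances)) {
            return false;                                     // no change, no event
        }
        cachedInstances = latest;                             // update the local cache
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(latest);                          // "publish" the change event
        }
        return true;
    }

    List<String> cached() {
        return cachedInstances;
    }

    // Poll every 6 seconds, mirroring the default interval described in the text.
    void start(ScheduledExecutorService executor) {
        executor.scheduleWithFixedDelay(this::pollOnce, 0, 6, TimeUnit.SECONDS);
    }
}
```

Polling twice against an unchanged registry notifies listeners only the first time; adding an instance to the registry triggers a new event on the next poll.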

The figure above shows the main flow of the subscription method. It involves many components and complex processing details; here we only need to grasp the core. Let's analyze the flow step by step through the code and flowcharts.

From subscription to starting the scheduled task

The subscription mechanism discussed here is really near-real-time awareness for service discovery. As described above, executing the subscription method triggers a scheduled task that periodically pulls data from the server. In essence, then, subscription is one way to implement service discovery; the alternative is to query the interface directly.

NacosNamingService exposes many overloaded subscribe methods. The overloads let you pass fewer parameters, and Nacos fills in defaults for the ones you omit. All of these overloads eventually call the following method:

// NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
        throws NacosException {
    if (null == listener) {
        return;
    }
    String clusterString = StringUtils.join(clusters, ",");
    changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
    clientProxy.subscribe(serviceName, groupName, clusterString);
}
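The overload-to-full-method pattern can be sketched as follows. This is a hypothetical SubscribeApi, not NacosNamingService: Runnable stands in for EventListener, and the calls list only records what the full method received. The one Nacos-specific value assumed is the default group name, DEFAULT_GROUP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: each overload fills in defaults and funnels into one full method.
class SubscribeApi {
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";      // Nacos's default group name
    final List<String> calls = new ArrayList<>();             // what the full method received

    void subscribe(String serviceName, Runnable listener) {
        subscribe(serviceName, DEFAULT_GROUP, Collections.emptyList(), listener);
    }

    void subscribe(String serviceName, String groupName, Runnable listener) {
        subscribe(serviceName, groupName, Collections.emptyList(), listener);
    }

    void subscribe(String serviceName, List<String> clusters, Runnable listener) {
        subscribe(serviceName, DEFAULT_GROUP, clusters, listener);
    }

    // The "full" method every overload ends up calling.
    void subscribe(String serviceName, String groupName, List<String> clusters, Runnable listener) {
        if (listener == null) {
            return;                                           // same early exit as the method above
        }
        String clusterString = String.join(",", clusters);    // like StringUtils.join(clusters, ",")
        calls.add(groupName + "/" + serviceName + "/" + clusterString);  // illustrative format
    }
}
```

The design keeps all real logic in one place, so adding a new convenience overload cannot drift from the behavior of the full method.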

Let's set aside the event-listener registration for now and look directly at the clientProxy.subscribe call. Here, clientProxy is of type NamingClientProxyDelegate, which is instantiated when NacosNamingService is instantiated. This was covered in the previous chapter, so we won't repeat it.

The clientProxy.subscribe method is implemented in NamingClientProxyDelegate:

// NamingClientProxyDelegate
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
    String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
    String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
    // Get ServiceInfo in cache
    ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
    if (null == result) {
        // Cache miss: handle the subscription through the gRPC client proxy
        result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
    }
    // Schedule UpdateTask regularly
    serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
    // ServiceInfo local cache processing
    serviceInfoHolder.processServiceInfo(result);
    return result;
}

Does this method look familiar? Yes, we already covered it in the previous article, "Nacos Client Service Discovery". Different paths lead to the same destination: querying the service list and subscribing both end up calling this same method.
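A stripped-down sketch of that cache-first flow, assuming a plain map in place of ServiceInfoHolder and a Function in place of grpcClientProxy (both hypothetical stand-ins, as is the class name CacheFirstLookup). Scheduling is left out, and "processing" is reduced to storing the result, which is far simpler than the real processServiceInfo.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of the cache-first subscribe flow. Not Nacos code.
class CacheFirstLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();  // like the serviceInfoMap
    private final Function<String, List<String>> remoteQuery;                // stand-in for the gRPC proxy
    final AtomicInteger remoteCalls = new AtomicInteger();

    CacheFirstLookup(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    List<String> subscribe(String serviceKey) {
        List<String> result = cache.get(serviceKey);          // 1. try the local cache first
        if (result == null) {
            remoteCalls.incrementAndGet();
            result = remoteQuery.apply(serviceKey);           // 2. cache miss: ask the server
        }
        // 3. scheduling the UpdateTask is omitted in this sketch
        cache.put(serviceKey, result);                        // 4. simplified "processing"
        return result;
    }
}
```

Subscribing to the same key twice hits the server only once; the second call is served from the cache.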

The previous chapter covered the rest of this flow; here we focus on task scheduling:

// ServiceInfoUpdateService
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
    String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    if (futureMap.get(serviceKey) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(serviceKey) != null) {
            return;
        }
        // Build UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
        futureMap.put(serviceKey, future);
    }
}

This method builds a serviceKey, uses it to check whether a task already exists for that key (once without the lock and once again inside synchronized (futureMap)), and, only if none exists, adds an UpdateTask.
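That duplicate check is a double-checked lock: a cheap lookup on the fast path, then a second lookup under the lock so that two threads subscribing to the same service cannot both create a task. The hypothetical UpdateScheduler below shows just that pattern. It uses a ConcurrentHashMap for futureMap and a daemon executor; ConcurrentHashMap#computeIfAbsent would be a more compact alternative, but that is a different technique from the one in the code above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "schedule at most one task per serviceKey".
class UpdateScheduler {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);                                    // do not keep the JVM alive
        return t;
    });
    final AtomicInteger tasksCreated = new AtomicInteger();

    void scheduleUpdateIfAbsent(String serviceKey, Runnable task) {
        if (futureMap.get(serviceKey) != null) {
            return;                                           // fast path, no locking
        }
        synchronized (futureMap) {
            if (futureMap.get(serviceKey) != null) {
                return;                                       // another thread won the race
            }
            tasksCreated.incrementAndGet();
            futureMap.put(serviceKey, executor.schedule(task, 1000, TimeUnit.MILLISECONDS));
        }
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```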

addTask simply submits a delayed task to the executor:

// ServiceInfoUpdateService
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

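The task runs after a delay of DEFAULT_DELAY, that is, 1 second. Note that executor.schedule runs a task only once; the periodic refresh comes from UpdateTask rescheduling itself at the end of each run, as its run method shows.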
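Because ScheduledExecutorService#schedule is one-shot, UpdateTask keeps itself alive by calling executor.schedule(this, ...) in its finally block. A minimal sketch of that self-rescheduling pattern, using a hypothetical SelfReschedulingTask whose "work" is just counting down a latch, with a 10 ms delay so it is quick to observe:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a one-shot task that re-schedules itself in finally,
// the same shape as UpdateTask#run.
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final long delayMillis;
    final CountDownLatch runs;                                // counts completed rounds

    SelfReschedulingTask(ScheduledExecutorService executor, long delayMillis, int expectedRuns) {
        this.executor = executor;
        this.delayMillis = delayMillis;
        this.runs = new CountDownLatch(expectedRuns);
    }

    @Override
    public void run() {
        try {
            runs.countDown();                                 // the "work" of one round
        } finally {
            if (!executor.isShutdown()) {
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);  // next round
            }
        }
    }
}
```

A single executor.schedule(task, 10, TimeUnit.MILLISECONDS) call, like the one in addTask, is enough to keep it running until the executor is shut down.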

That's the end of this trace. The subscription entry point does only two core things: it calls the subscription method and it starts the scheduled task.

What the scheduled task does

UpdateTask encapsulates the core business logic of the subscription mechanism. First, let's look at a flowchart of what it does.

The flowchart above gives a clear picture of what UpdateTask does. Here is the complete run method:

// ServiceInfoUpdateService.UpdateTask
public void run() {
    long delayTime = DEFAULT_DELAY;

    try {
        // Stop if the service is no longer subscribed and no task is registered for its serviceKey
        if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
            NAMING_LOGGER
                    .info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);
            return;
        }

        // Get cached service information
        ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
        if (serviceObj == null) {
            // Obtain Service information from the registry server according to serviceName
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            serviceInfoHolder.processServiceInfo(serviceObj);
            lastRefTime = serviceObj.getLastRefTime();
            return;
        }

        // Stale cache (its lastRefTime is not newer than the one this task recorded): re-query the registry
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
            // Process the returned ServiceInfo
            serviceInfoHolder.processServiceInfo(serviceObj);
        }
        // Record the latest refresh time
        lastRefTime = serviceObj.getLastRefTime();
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // Delay before the next update: cacheMillis * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE, 6 seconds by default
        // TODO multiple time can be configured.
        delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
        // Reset the failure count to 0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);
    } finally {
        // Schedule the next run; the delay grows with failCount
        // After a successful run (failCount = 0) the delay is 6 seconds; the maximum is 1 minute
        // In other words, when nothing goes wrong, the cached instances are refreshed every 6 seconds
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

The task first checks whether the service is still subscribed, via ChangeNotifier#isSubscribed:

public boolean isSubscribed(String groupName, String serviceName, String clusters) {
    String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    return CollectionUtils.isNotEmpty(eventListeners);
}

Looking at this method's source, you can see that listenerMap holds the EventListeners registered through registerListener in the original subscribe method.
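In other words, "subscribed" just means "at least one listener is registered for this key". A hypothetical ListenerRegistry sketching that bookkeeping is below. It uses ConcurrentHashMap.newKeySet() in place of Nacos's ConcurrentHashSet and an illustrative key format (Nacos builds its key with ServiceInfo.getKey); the deregister method is included only to show how a task would observe a subscription ending.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the listener map behind registerListener / isSubscribed.
class ListenerRegistry<L> {
    private final Map<String, Set<L>> listenerMap = new ConcurrentHashMap<>();

    // Illustrative key format only; not the exact Nacos key.
    private static String key(String group, String service, String clusters) {
        return group + "/" + service + "/" + clusters;
    }

    void registerListener(String group, String service, String clusters, L listener) {
        listenerMap.computeIfAbsent(key(group, service, clusters), k -> ConcurrentHashMap.newKeySet())
                   .add(listener);
    }

    void deregisterListener(String group, String service, String clusters, L listener) {
        Set<L> listeners = listenerMap.get(key(group, service, clusters));
        if (listeners != null) {
            listeners.remove(listener);
        }
    }

    // Subscribed means: at least one listener is registered for this key.
    boolean isSubscribed(String group, String service, String clusters) {
        Set<L> listeners = listenerMap.get(key(group, service, clusters));
        return listeners != null && !listeners.isEmpty();
    }
}
```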

The rest of the run method follows a consistent pattern. It first checks whether the cache contains a ServiceInfo. If it does not, the task queries the registry, processes the ServiceInfo, and records the last refresh time.

Next, it decides whether the cached ServiceInfo is stale by comparing the lastRefTime recorded by the task with the lastRefTime of the cached ServiceInfo. If the cached value is not newer, the task likewise queries the registry, processes the ServiceInfo, and updates the recorded refresh time.
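Those two checks can be condensed into a small decision function. This is a hypothetical sketch, not Nacos code: cachedRefTime is null when the cache has no entry for the key, and taskLastRefTime is the value the task recorded on its previous run.

```java
// Hypothetical sketch of UpdateTask's refresh decision, reduced to plain values.
class RefreshDecision {
    enum Action { QUERY_BECAUSE_MISSING, QUERY_BECAUSE_STALE, USE_CACHE }

    static Action decide(Long cachedRefTime, long taskLastRefTime) {
        if (cachedRefTime == null) {
            return Action.QUERY_BECAUSE_MISSING;              // nothing cached yet
        }
        // Same comparison as serviceObj.getLastRefTime() <= lastRefTime:
        // the cache has not been refreshed since the task last looked.
        if (cachedRefTime <= taskLastRefTime) {
            return Action.QUERY_BECAUSE_STALE;
        }
        return Action.USE_CACHE;                              // something newer is already cached
    }
}
```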

Finally, the task computes the delay before its next run. On the normal path, delayTime is cacheMillis × DEFAULT_UPDATE_CACHE_TIME_MULTIPLE, which is 1000 ms × 6 = 6 seconds by default. The next run is actually scheduled in the finally block. When a run fails (an exception, or an empty host list), delayTime stays at DEFAULT_DELAY (1 second) and is shifted left by failCount, so the delay grows with the number of failures but never exceeds 1 minute (DEFAULT_DELAY * 60).
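The delay formula from the finally block is easy to check by hand. With DEFAULT_DELAY = 1000 ms, consecutive failures give 2 s, 4 s, 8 s, 16 s, 32 s, and then the 60 s cap (64 s would exceed it), while a successful run with a cacheMillis of 1000 ms gives 6 s. The sketch below reproduces the formula; UpdateDelay is a hypothetical name, and clamping failCount at 6 is an assumption added here so the shift cannot overflow.

```java
// Hypothetical sketch of the next-delay formula used in UpdateTask's finally block.
class UpdateDelay {
    static final long DEFAULT_DELAY = 1000L;                  // 1 second
    static final int DEFAULT_UPDATE_CACHE_TIME_MULTIPLE = 6;
    static final int MAX_FAIL_COUNT = 6;                      // assumption: clamp to avoid shift overflow

    // Delay after a successful run: cacheMillis * multiple.
    static long successDelay(long cacheMillis) {
        return cacheMillis * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
    }

    // Same shape as Math.min(delayTime << failCount, DEFAULT_DELAY * 60).
    static long nextDelay(long delayTime, int failCount) {
        int shift = Math.min(failCount, MAX_FAIL_COUNT);
        return Math.min(delayTime << shift, DEFAULT_DELAY * 60);
    }
}
```

The exponential shift backs off quickly while the registry is unreachable, and the 1-minute cap bounds how stale the client can get once the registry recovers.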

Summary

This article walked through the source code of the Nacos client's service subscription mechanism, which consists mainly of the following steps:

Step 1: call the subscription method and register the EventListener, which UpdateTask later uses to decide whether it should keep running;

Step 2: handle the subscription logic through the proxy class, which calls the same method used to fetch the instance list;

Step 3: run UpdateTask through a scheduled task. The default interval is 6 seconds; after failures the interval grows, but never beyond 1 minute;

Step 4: UpdateTask checks whether the local cache exists and whether it has expired. If it is missing or expired, UpdateTask queries the registry for the latest instances, updates the last refresh time, and processes the ServiceInfo;

Step 5: recalculate the delay for the next run and repeat the process above.

The next article will explain how the ServiceInfoHolder#processServiceInfo method processes the retrieved instance information.

About the blogger: author of the technical book "Inside SpringBoot Technology", who loves researching technology and writing in-depth technical articles.

Official account: "New Horizon of Programs", the blogger's WeChat official account. Follows are welcome.

Technical exchange: please contact the blogger on WeChat: zhuan2quan

Keywords: Spring Cloud, source code analysis, Nacos

Added by rline101 on Fri, 24 Dec 2021 22:22:16 +0200