God, is updating a small registry really this complicated? [Tearing Apart the Eureka Source Code, No. 2]

From the previous article, we know that the Eureka client actively reports its communication address to the Eureka server by calling the service registration interface.

So the Eureka server now has the communication addresses of all Eureka clients. But how does a Eureka client get the addresses of the other Eureka clients? This article answers that question.

How is the Eureka client initialized

Let's look at what happens during Eureka client initialization. You may remember that the last article mentioned one step in the Eureka server's initialization: creating a Eureka client object.

Why does the Eureka server need to create a Eureka client object during its own initialization?

If we deploy only a single Eureka server instance, we do not need a Eureka client. Once the Eureka server has to be deployed as a cluster, things get more complicated.

In a cluster, the Eureka servers register with one another, so each Eureka server is also a Eureka client.

This is only a brief mention. The details will come later, when we cover Eureka server clusters. For now, we simply use the Eureka server's startup as a way to look at the Eureka client's initialization logic.

First, here is a short piece of code to give you a general impression. A flow chart then explains it.

//Create eurekaClient object
if (eurekaClient == null) {
    //Read the configuration in eureka-client.properties
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        : new MyDataCenterInstanceConfig();

    //Construct Manager objects through instanceConfig and InstanceInfo
    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    //Create eurekaClient
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
}

Flow chart:

As mentioned in the previous article, InstanceInfo contains the service's IP address, port number, service name, instance ID, and other information. During service registration, the Eureka client sends the information in InstanceInfo to the Eureka server.

Now you know where InstanceInfo is created. It is built from our own configuration file plus some default configuration.
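To make "configuration file + defaults" concrete, here is a minimal Java sketch of the layering idea. The `SimpleInstanceInfo` record and the property keys (`name`, `ipAddr`, `port`) are hypothetical illustrations, not Eureka's real InstanceInfo builder or its property names. The point is that a configured value overrides a built-in default, and an omitted key falls back to that default.

```java
import java.util.Properties;

// Sketch: "our own configuration file + some default configurations" -> instance info.
// SimpleInstanceInfo and the property keys are hypothetical, not Eureka's real API.
class InstanceInfoFactory {
    record SimpleInstanceInfo(String appName, String ipAddr, int port, String instanceId) {}

    // Built-in defaults, consulted only when the user's config lacks a key.
    static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("name", "UNKNOWN");
        d.setProperty("ipAddr", "127.0.0.1");
        d.setProperty("port", "80");
        return d;
    }

    // User-supplied values override the defaults; missing keys fall back to them.
    static SimpleInstanceInfo fromConfig(Properties userConfig) {
        Properties merged = new Properties(defaults());
        for (String key : userConfig.stringPropertyNames()) {
            merged.setProperty(key, userConfig.getProperty(key));
        }
        String app = merged.getProperty("name");
        String ip = merged.getProperty("ipAddr");
        int port = Integer.parseInt(merged.getProperty("port"));
        // Hypothetical instance id format, for illustration only.
        return new SimpleInstanceInfo(app, ip, port, ip + ":" + app + ":" + port);
    }

    public static void main(String[] args) {
        Properties fromFile = new Properties();   // stands in for eureka-client.properties
        fromFile.setProperty("name", "order-service");
        fromFile.setProperty("port", "8080");
        System.out.println(fromConfig(fromFile));
    }
}
```

`java.util.Properties` with a defaults table provides this fallback directly: `getProperty()` checks the defaults only when the key is missing from the merged properties.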

How does the Eureka client get the registry

A DiscoveryClient object was created above. While creating it, the Eureka client sends an HTTP request to the Eureka server to fetch the full registry.

As usual, here is the source code diagram of how DiscoveryClient is created.

The main logic for pulling the registry is in the getAndStoreFullRegistry() method (part of the code is omitted):

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    //Build the http request and send it
    EurekaHttpResponse<Applications> httpResponse = slightly...;
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        //Get the list of services from the response
        apps = httpResponse.getEntity();
    }
    if (apps == null) {
        logger.error("slightly...");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        //Save the service list
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("slightly...");
    }
}

The service list is saved in AtomicReference<Applications> localRegionApps. Later we will use a breakpoint to see what Applications contains.
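The fetchRegistryGeneration check above works as an optimistic concurrency guard. The client records the generation before the slow HTTP call, and it stores the result only if no other update has bumped the generation in the meantime. That prevents an older full fetch from overwriting newer data. Below is a minimal sketch of the pattern. `String` stands in for Applications, and the `fetch` supplier is a hypothetical replacement for the HTTP request.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of the generation-guarded store used in getAndStoreFullRegistry().
// String replaces Applications; fetch is a hypothetical stand-in for the HTTP call.
class GenerationGuardedStore {
    private final AtomicLong fetchGeneration = new AtomicLong();
    // Plays the role of localRegionApps.
    private final AtomicReference<String> localApps = new AtomicReference<>("empty");

    /** Returns true if the fetched data was stored, false if it was discarded. */
    boolean fetchAndStore(Supplier<String> fetch) {
        long generationBefore = fetchGeneration.get();
        String apps = fetch.get();                 // the (possibly slow) request
        if (apps == null) {
            return false;                          // nothing usable came back
        }
        // Store only if no other update happened while we waited on the network.
        if (fetchGeneration.compareAndSet(generationBefore, generationBefore + 1)) {
            localApps.set(apps);
            return true;
        }
        return false;                              // a newer update won; drop stale data
    }

    String current() {
        return localApps.get();
    }

    public static void main(String[] args) {
        GenerationGuardedStore store = new GenerationGuardedStore();
        // Simulate a concurrent update landing while the first fetch is "in flight".
        boolean stored = store.fetchAndStore(() -> {
            store.fetchAndStore(() -> "newer");
            return "older";
        });
        System.out.println(stored + " " + store.current());   // false newer
    }
}
```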

Before that, let's look at what the Eureka server does after it receives the client's HTTP request. You might think: what is so hard about that? Just return the registry data directly. Hold on. Let's check the source code and see whether that guess holds up.

How does the Eureka server return the registry

On the Eureka server, the code that receives this HTTP request is in ApplicationsResource.java in the eureka-core project. The directory structure is as follows:

The method that returns the service list is getContainers() (parameters omitted). Let's look at its main logic through a code diagram.

The Eureka server's processing logic uses a multi-level caching mechanism. When returning the service list, it does not read the registry directly. It first reads from the read-only cache. If there is no cached data there, it reads from the read-write cache. If the read-write cache has nothing either, the data is loaded from the registry.

Read-only cache: ConcurrentMap<Key, Value> readOnlyCacheMap

Read-write cache: LoadingCache<Key, Value> readWriteCacheMap, based on Guava's com.google.common.cache.LoadingCache.

The cache reading process is shown in the figure below:

Using a cache introduces a new question: when does the cached data expire?

1. When readWriteCacheMap is created, it is configured to expire entries automatically 180 s after they are written (timed expiration).

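public class ResponseCacheImpl implements ResponseCache {
    private final LoadingCache<Key, Value> readWriteCacheMap;
    ResponseCacheImpl(slightly...) {
        //Create the read-write cache
        this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000)
                //Read the expiration time from the configuration; the time unit is seconds
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                //On a cache miss, the loader builds the value via generatePayload() (simplified)
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        return generatePayload(key);
                    }
                });
    }
}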

The expiration time is configured in DefaultEurekaServerConfig.java:

public class DefaultEurekaServerConfig implements EurekaServerConfig {
	@Override
    public long getResponseCacheAutoExpirationInSeconds() {
        //Default configuration 180
        return configInstance.getIntProperty(
                namespace + "responseCacheAutoExpirationInSeconds", 180).get();
    }
}

2. When a new service registers, the related cache entries are invalidated (active expiration).

When we covered service registration earlier, we saw that it ends up calling the register() method of AbstractInstanceRegistry.java on the Eureka server. At the time we skipped some details, including a short section that invalidates the cache. Let's look at it now.

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    public void register(slightly...) {
        //Call the internal invalidateCache() method
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),
                        registrant.getSecureVipAddress());
    }

    private void invalidateCache(slightly...) {
        //Call the cache invalidation method in ResponseCacheImpl.java
        responseCache.invalidate(appName, vipAddress, secureVipAddress);
    }
}

As invalidateCache() shows, the actual work is done by the invalidate() method defined in ResponseCacheImpl.java.

public void invalidate(Key... keys) {
    for (Key key : keys) {
        //Clear the cache by using the self-contained clearing method of the cache framework
        readWriteCacheMap.invalidate(key);
        //Slightly
    }
}

3. When ResponseCacheImpl is created, it starts a scheduled task. Every 30 seconds, that task overwrites stale read-only cache entries with the data in the read-write cache (passive expiration).

public class ResponseCacheImpl implements ResponseCache {
    //Constructor
    ResponseCacheImpl(slightly...) {
        //Whether to use the read-only cache; the default value is true
        if (shouldUseReadOnlyResponseCache) {
            //The task to run: getCacheUpdateTask()
            timer.schedule(getCacheUpdateTask(),
                    //Time of the first run: the next multiple of responseCacheUpdateIntervalMs
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs),
                    //Period between runs, in milliseconds
                    //config: "responseCacheUpdateIntervalMs", (30 * 1000)
                    responseCacheUpdateIntervalMs);
        }
    }

    //Cache refresh logic
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                //Traverse all keys in readOnlyCacheMap
                for (Key key : readOnlyCacheMap.keySet()) {
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        //LoadingCache.get() reloads the value if it was invalidated or expired
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        //If the read-only data differs from the read-write data, overwrite the read-only cache
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        //get() can throw ExecutionException; log it and move on to the next key
                        logger.error("slightly...", th);
                    }
                }
            }
        };
    }
}
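The three expiration paths are easiest to see side by side. Below is a simplified model of ResponseCacheImpl. It assumes String keys and values, a hand-rolled timestamped map in place of Guava's LoadingCache, and an injectable clock. `ResponseCacheSketch` and its methods are hypothetical. The model also shows one consequence of this design: invalidate() clears only the read-write layer, so clients served from the read-only layer can see stale data until the next timer run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Simplified model of ResponseCacheImpl's two cache layers and three expiration paths.
// Hypothetical names; Strings replace Key/Value and a plain map replaces Guava's LoadingCache.
class ResponseCacheSketch {
    private record Timed(String value, long writtenAtMs) {}

    final Map<String, String> readOnlyCacheMap = new ConcurrentHashMap<>();
    private final Map<String, Timed> readWriteCacheMap = new ConcurrentHashMap<>();
    private final Function<String, String> registry;   // stands in for generatePayload()
    private final LongSupplier clock;
    private final long ttlMs;

    ResponseCacheSketch(Function<String, String> registry, LongSupplier clock, long ttlSeconds) {
        this.registry = registry;
        this.clock = clock;
        this.ttlMs = ttlSeconds * 1000;
    }

    // Path 1 (timed expiration): reload when missing or older than the TTL.
    private String readWriteGet(String key) {
        long now = clock.getAsLong();
        Timed t = readWriteCacheMap.get(key);
        if (t == null || now - t.writtenAtMs() >= ttlMs) {
            t = new Timed(registry.apply(key), now);
            readWriteCacheMap.put(key, t);
        }
        return t.value();
    }

    // Client-facing read: read-only cache first, then the read-write cache.
    String get(String key) {
        String v = readOnlyCacheMap.get(key);
        if (v == null) {
            v = readWriteGet(key);
            readOnlyCacheMap.put(key, v);
        }
        return v;
    }

    // Path 2 (active expiration): only the read-write layer is cleared.
    void invalidate(String... keys) {
        for (String key : keys) {
            readWriteCacheMap.remove(key);
        }
    }

    // Path 3 (the timer body): copy fresh read-write values over stale read-only ones.
    void syncReadOnly() {
        for (String key : readOnlyCacheMap.keySet()) {
            String fresh = readWriteGet(key);
            if (!fresh.equals(readOnlyCacheMap.get(key))) {
                readOnlyCacheMap.put(key, fresh);
            }
        }
    }

    // First timer run: the next multiple of the interval after "now".
    static long firstRunAt(long nowMs, long intervalMs) {
        return (nowMs / intervalMs) * intervalMs + intervalMs;
    }

    public static void main(String[] args) {
        long[] now = {0};
        int[] version = {1};
        ResponseCacheSketch cache = new ResponseCacheSketch(k -> k + "@v" + version[0], () -> now[0], 180);
        System.out.println(cache.get("ALL_APPS"));       // ALL_APPS@v1
        version[0] = 2;
        cache.invalidate("ALL_APPS");                     // a new service registered
        System.out.println(cache.get("ALL_APPS"));       // still ALL_APPS@v1: read-only layer is stale
        cache.syncReadOnly();                             // the 30 s timer fires
        System.out.println(cache.get("ALL_APPS"));       // ALL_APPS@v2
        System.out.println(firstRunAt(65_000, 30_000));  // 90000
    }
}
```

firstRunAt() reproduces the Date arithmetic in the constructor. With a 30 000 ms interval and a current time of 65 000 ms, integer division rounds the time down to 60 000, so the first run lands on 90 000, the next interval boundary.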

A chart to summarize:

Finally, let's look at the registry data that the Eureka server returns.

What Applications looks like:

So far, we have covered half of the mechanism the Eureka client uses to obtain the registry.

Why only half? Hold your breath and keep reading.

The Eureka client can now get the registry, and everything seems fine. Think a little further, though, and a problem appears. Once a new service registers with the Eureka server, how does a Eureka client that has already fetched the registry pick up the new data?

How does the Eureka client update the registry

To pick up registry changes from the Eureka server, the Eureka client starts a scheduled task during initialization. Every 30 seconds, that task requests the changed registry data from the Eureka server.

Earlier in this article, the code diagram walked through how DiscoveryClient is created. One of the steps there is initScheduledTasks(), which initializes the scheduled tasks. We skipped it at the time.

initScheduledTasks() sets up several scheduled tasks. For now, we focus only on the one that refreshes the registry. The others will come up later.

The main logic is as follows:

@Singleton
public class DiscoveryClient implements EurekaClient {
	private final ScheduledExecutorService scheduler;
    private void initScheduledTasks() {
        //Slightly
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                //How often to execute scheduled tasks? 30 by default (obtained from the default configuration)
                registryFetchIntervalSeconds,
                //Unit of interval time
                TimeUnit.SECONDS,
                expBackOffBound,
                //Specific task logic
                new CacheRefreshThread()
            ),registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}
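Note that scheduler.schedule(...) fires only once. The repetition, and the expBackOffBound argument, come from TimedSupervisorTask. My understanding, which is an assumption worth checking against the Eureka source, is that the task reschedules itself after every run. When a run times out, the delay doubles, capped at the interval times expBackOffBound. After a successful run, the delay returns to the normal interval. Here is just the delay rule, as a hypothetical `nextDelay()` helper:

```java
// Sketch of the delay rule assumed for TimedSupervisorTask: after a timed-out run the
// next delay doubles, capped at baseDelay * expBackOffBound; after a successful run it
// resets to baseDelay. nextDelay() is a hypothetical helper, not Eureka's API.
class BackoffDelay {
    static long nextDelay(long currentDelay, long baseDelay, int expBackOffBound, boolean timedOut) {
        long maxDelay = baseDelay * expBackOffBound;
        if (timedOut) {
            return Math.min(maxDelay, currentDelay * 2);
        }
        return baseDelay;   // healthy run: back to the normal interval
    }

    public static void main(String[] args) {
        long delay = 30;    // seconds; 30 is the default registryFetchIntervalSeconds
        // Four consecutive timeouts with an example bound of 10: 60, 120, 240, 300
        for (int i = 0; i < 4; i++) {
            delay = nextDelay(delay, 30, 10, true);
            System.out.println("next refresh in " + delay + "s");
        }
        // One healthy run brings the interval back to 30
        delay = nextDelay(delay, 30, 10, false);
        System.out.println("next refresh in " + delay + "s");
    }
}
```

The cap keeps a struggling Eureka server from being polled ever more slowly without limit, and the reset means the normal 30-second rhythm resumes as soon as one request succeeds.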

Let's see where the Eureka client's default configuration is defined. It lives in DefaultEurekaClientConfig.java:

public class DefaultEurekaClientConfig implements EurekaClientConfig {
    @Override
    public int getRegistryFetchIntervalSeconds() {
        //Default configuration: how long is the timer interval
        return configInstance.getIntProperty(
                namespace + REGISTRY_REFRESH_INTERVAL_KEY, 30).get();
        //Configured key definition: PropertyBasedClientConfigConstants.java
        //String REGISTRY_REFRESH_INTERVAL_KEY = "client.refresh.interval";
    }
}

Now that we know the default configuration, let's return to the scheduled task that refreshes the registry.

The method calls inside are too convoluted to paste here, so a code diagram is used instead.

The main method is getAndUpdateDelta(). Here is its source:

private void getAndUpdateDelta(Applications applications) throws Throwable {
    Applications delta = null;
    //Send a request to get the incremental registry
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient
        .getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    //If the returned data is empty, pull the full registry again
    if (delta == null) {
        getAndStoreFullRegistry();
    } else if (slightly...) {
        //Merge the incremental data into the local registry
        updateDelta(delta);
        //Compute a hash code from the merged result (analyzed later)
        String reconcileHashCode = getReconcileHashCode(applications);
        //Compare the hash code returned by the server with the locally computed one
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            //If they differ, pull the full registry again
            reconcileAndLogDifference(delta, reconcileHashCode);
        }
    } else {
        //log.err(..);
    }
}

The source code shows that after the Eureka client receives the data, it merges and verifies it locally. Let's look at each step.

Registry data merging: after the Eureka client receives the registry data, it uses each instance's ActionType to decide what kind of change it is. The instance may need to be added to the local registry, deleted from it, or used to update an existing local instance.

Here is the source code first. The flow chart afterwards ties it together:

private void updateDelta(Applications delta) {
    //Traverse the obtained registry information
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            if (ActionType.ADDED.equals(instance.getActionType())) {
                //Newly added
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                //Modified
                applications.***.addInstance(instance);
            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                //Deleted
                applications.***.removeInstance(instance);
            }
        }
    }
}
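Below is a self-contained sketch of the same ActionType dispatch. It makes a simplifying assumption: the local registry is modeled as a map of appName to (instanceId to status), and `Instance` is a hypothetical record. Neither matches Eureka's Applications, Application, or InstanceInfo classes. As in the code above, ADDED and MODIFIED both upsert, and DELETED removes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of merging an incremental update by ActionType. The local registry is modelled
// as appName -> (instanceId -> status); Instance and this layout are simplifications,
// not Eureka's Applications/Application/InstanceInfo classes.
class DeltaMerge {
    enum ActionType { ADDED, MODIFIED, DELETED }

    record Instance(String appName, String instanceId, String status, ActionType actionType) {}

    static void updateDelta(Map<String, Map<String, String>> local, List<Instance> delta) {
        for (Instance inst : delta) {
            switch (inst.actionType()) {
                // ADDED and MODIFIED both upsert, like addInstance() in the code above
                case ADDED, MODIFIED ->
                    local.computeIfAbsent(inst.appName(), k -> new TreeMap<>())
                         .put(inst.instanceId(), inst.status());
                case DELETED -> {
                    Map<String, String> app = local.get(inst.appName());
                    if (app != null) {
                        app.remove(inst.instanceId());
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> local = new TreeMap<>();
        local.put("order", new TreeMap<>(Map.of("i1", "UP", "i2", "UP")));
        updateDelta(local, List.of(
                new Instance("order", "i3", "UP", ActionType.ADDED),
                new Instance("order", "i1", "DOWN", ActionType.MODIFIED),
                new Instance("order", "i2", "UP", ActionType.DELETED)));
        System.out.println(local);   // {order={i1=DOWN, i3=UP}}
    }
}
```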

Data verification: after the Eureka client updates its local registry, it computes a hash value from the local registry information. The Eureka server also includes a hash value in the data it returns.

In theory, after a round of updates, the registry data on the Eureka server and on the Eureka client are identical, so the two hash values should match.

If they differ, data synchronization between the Eureka client and the Eureka server has gone wrong. The Eureka client then requests the full registry from the Eureka server again and overwrites its local registry with it. This keeps its data consistent with the Eureka server's.
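The word "hash" is a little misleading here. As far as I can tell from Eureka's Applications class, the reconcile hash code is a plain string that lists how many instances are in each status, in sorted status order (something like `DOWN_1_UP_2_`). Treat that exact format as an assumption. The sketch below uses hypothetical helpers `reconcileHashCode()` and `consistent()` to show the compute-and-compare step that decides whether a full re-fetch is needed.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the reconcile check. The STATUS_count_ format is an assumption modelled
// on Eureka's Applications class; the helper names are hypothetical.
class ReconcileHash {
    // Counts instances per status, in sorted status order, joined as STATUS_count_.
    static String reconcileHashCode(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }

    // True when the local view agrees with the server; false means "fetch the full registry".
    static boolean consistent(List<String> localStatuses, String serverHash) {
        return reconcileHashCode(localStatuses).equals(serverHash);
    }

    public static void main(String[] args) {
        List<String> local = List.of("UP", "UP", "DOWN");
        String serverHash = "DOWN_1_UP_3_";   // the server knows about one more UP instance
        if (!consistent(local, serverHash)) {
            System.out.println("mismatch: local=" + reconcileHashCode(local) + ", fetching full registry");
        }
    }
}
```

A status-count string is cheap to compute and compare. The trade-off is that it cannot detect every kind of drift; two registries with the same counts per status but different instances would still match.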

The merge and verification process as a flow chart:

We have now covered how the Eureka client periodically sends HTTP requests, and how it merges and verifies the data it gets back.

Next, how does the Eureka server handle this request? Does it still return the data through the multi-level cache?

How does the Eureka server maintain incremental registry data

For incremental registry data, the Eureka server still returns the data through the multi-level cache. Because the registry keeps changing, however, the Eureka server does not resend the whole registry every time. Instead, it records the changed part of the registry in a queue.

Whenever a service registers or actively goes offline, the changed registry information is added to recentlyChangedQueue. When incremental data is needed, it is read directly from this queue.

The flow chart below helps illustrate this:

recentlyChangedQueue is defined in AbstractInstanceRegistry.java. You may remember that the ConcurrentHashMap holding the registry is defined there too.

Let's take a quick look at recentlyChangedQueue:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    //Variable definition
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue 
        = new ConcurrentLinkedQueue<RecentlyChangedItem>();
    //Data type stored in the queue (an internal class)
    private static final class RecentlyChangedItem {
        private long lastUpdateTime;
        private Lease<InstanceInfo> leaseInfo;
    }
}

Lease and InstanceInfo represent lease information and service instance information respectively, as mentioned earlier.

Next, let's look at how readWriteCacheMap gets its data from the queue.

public class ResponseCacheImpl implements ResponseCache {
    private final AbstractInstanceRegistry registry;
    //Executed when readWriteCacheMap has no data for the given key
    private Value generatePayload(Key key) {
        String payload = null;
        //Get all registry data (full data)
        if (ALL_APPS.equals(key.getName())) {slightly..}
        //Get the changed registry data (incremental data)
        else if (ALL_APPS_DELTA.equals(key.getName())) {
            //Calls getApplicationDeltasFromMultipleRegions() in AbstractInstanceRegistry.java,
            //which reads the queue data
            payload = getPayLoad(key,
                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
        }
        return new Value(payload);
    }
}

What new problem does storing the data in a queue introduce? As time goes by, recentlyChangedQueue keeps growing. As a result, a Eureka client that fetches data periodically receives the same changes over and over.

Fetching data that has already been fetched is pure waste. After receiving incremental data, the Eureka client also has to merge and verify it locally. As the data grows, network transfer, merging, and verification all become less efficient.

Therefore, the Eureka server starts a scheduled task that cleans up recentlyChangedQueue every 30 seconds. This ensures the queue holds only service instances that changed within the last 180 seconds.

Flow chart of the scheduled task that cleans up the queue:

Here is a short, simplified piece of the source code:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    //Construction method
    protected AbstractInstanceRegistry(slightly..){
        //Start scheduled task
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                //Default configuration 30s
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }
    
    //Timed task execution logic
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
            @Override
            public void run() {
                //Traverse the data in the queue
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    //Delete the service that has not changed in 180s from the queue
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - 
                        //Default 180s
                        serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }
        };
    }
}
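Here is a minimal sketch that puts the queue pieces together: it records changes, serves deltas, and runs the retention pass above. `ChangeItem`, `recordChange()`, `delta()` and `evictOlderThan()` are hypothetical simplifications; the real RecentlyChangedItem wraps a Lease<InstanceInfo>. The sketch also makes the earlier problem visible: until an item ages out, every delta request returns it again. The early `break` assumes items are appended in time order, which is also what the retention loop above relies on.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model of recentlyChangedQueue: changes are appended, a delta request
// snapshots the queue, and a retention pass evicts old records. Hypothetical names.
class DeltaQueueSketch {
    record ChangeItem(String instanceId, String action, long lastUpdateTime) {}

    final ConcurrentLinkedQueue<ChangeItem> recentlyChangedQueue = new ConcurrentLinkedQueue<>();

    // Called on register/cancel in this sketch.
    void recordChange(String instanceId, String action, long nowMs) {
        recentlyChangedQueue.add(new ChangeItem(instanceId, action, nowMs));
    }

    // Incremental data for a client: everything still in the queue.
    List<ChangeItem> delta() {
        return new ArrayList<>(recentlyChangedQueue);
    }

    // Retention pass: drop items older than retentionMs, stopping at the first recent one.
    int evictOlderThan(long nowMs, long retentionMs) {
        int removed = 0;
        Iterator<ChangeItem> it = recentlyChangedQueue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime() < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                break;   // items are in time order, so the rest are newer
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        DeltaQueueSketch q = new DeltaQueueSketch();
        q.recordChange("order-1", "ADDED", 0);
        q.recordChange("order-2", "ADDED", 100_000);
        q.recordChange("order-1", "DELETED", 200_000);
        System.out.println(q.delta().size());                    // 3: every poll sees all three
        System.out.println(q.evictOlderThan(250_000, 180_000));  // 1: only the t=0 item is too old
        System.out.println(q.delta().size());                    // 2
    }
}
```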

We have now covered the core process of service registration and service discovery. Next, let's look at how Eureka's heartbeat mechanism is implemented.

Keywords: Java Spring Cloud eureka

Added by xexmple on Wed, 17 Nov 2021 07:14:46 +0200