Eureka Server source code analysis

Eureka Server provides the following functions to support its interaction with Eureka Client:

  • Service registration
  • Receiving service heartbeats (lease renewal)
  • Service eviction
  • Service offline
  • Cluster synchronization
  • Querying service instances in the registry

1. Basic design of the Eureka Server side

First, let's look at the class structure diagram of the registry:

From the class diagram, we can see that the two top-level interfaces are LookupService and LeaseManager. The LookupService interface was described in the previous article; its main function is to query the service list (Eureka Server is itself also an Eureka Client and can register with its peers). As its name suggests, the LeaseManager interface is responsible for lease management, that is, the registration, offline, eviction, and renewal of services. The two interfaces are defined as follows:

public interface LeaseManager<T> {

    // Register a new service instance lease
    void register(T r, int leaseDuration, boolean isReplication);

    // Cancel a lease (service offline)
    boolean cancel(String appName, String id, boolean isReplication);

    // Renew a lease (process a heartbeat)
    boolean renew(String appName, String id, boolean isReplication);

    // Evict expired leases
    void evict();
}


public interface LookupService<T> {

    // Look up an application by name
    Application getApplication(String appName);

    // Get all registered applications
    Applications getApplications();

    // Get all instances registered under the given instance id
    List<InstanceInfo> getInstancesById(String id);

    // Pick the next server for the given virtual host name (round-robin)
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

The object managed by LeaseManager is Lease, which represents the lease of a service instance registered by an Eureka Client. Lease wraps the class it holds, InstanceInfo (the service instance information), and manages the time validity of that instance. It defines the action types that can be applied to a lease, such as register, renew, and cancel, as well as various operations on the lease's time attributes (the default lease duration is 90 s).
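To make the lease semantics concrete, here is a simplified sketch modeled on com.netflix.eureka.lease.Lease; fields and methods are trimmed for illustration, so treat it as a sketch rather than the full Netflix source:

// Simplified sketch of com.netflix.eureka.lease.Lease (not the complete class)
public class Lease<T> {

    public static final int DEFAULT_DURATION_IN_SECS = 90;

    public enum Action { Register, Cancel, Renew }

    private final T holder;                    // the InstanceInfo being leased
    private final long duration;               // lease validity window in ms
    private volatile long lastUpdateTimestamp; // refreshed on each renewal
    private volatile long evictionTimestamp;   // set when the lease is cancelled
    private volatile long serviceUpTimestamp;  // first time the instance was marked UP

    public Lease(T holder, int durationInSecs) {
        this.holder = holder;
        this.duration = durationInSecs * 1000L;
        this.lastUpdateTimestamp = System.currentTimeMillis();
    }

    // Renewal pushes the expiry window forward. Note the quirk in the Netflix
    // source: duration is added here AND again in isExpired, so a lease can in
    // practice survive up to twice its nominal duration.
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

    public void cancel() {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = System.currentTimeMillis();
        }
    }

    public void serviceUp() {
        if (serviceUpTimestamp == 0) {
            serviceUpTimestamp = System.currentTimeMillis();
        }
    }

    public boolean isExpired(long additionalLeaseMs) {
        return evictionTimestamp > 0
                || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs);
    }

    public T getHolder() {
        return holder;
    }
}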

The InstanceRegistry interface is the core registry-management interface of Eureka Server. It manages, in memory, the service instances registered with Eureka Server. It extends both LookupService and LeaseManager and adds further methods that make it easier to manage service instance leases and query service instance information. The AbstractInstanceRegistry abstract class provides the concrete implementation of the InstanceRegistry interface.
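For reference, here is a trimmed excerpt of the com.netflix.eureka.registry.InstanceRegistry interface, showing how it unifies the two top-level interfaces; only a few representative methods are reproduced here, and the selection is illustrative:

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;

public interface InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> {

    // Open the registry for client traffic after startup synchronization
    void openForTraffic(ApplicationInfoManager applicationInfoManager, int count);

    // Manage the overridden status of an instance
    void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus);

    boolean statusUpdate(String appName, String id, InstanceStatus newStatus,
                         String lastDirtyTimestamp, boolean isReplication);

    // Statistics used by the self-preservation mechanism
    long getNumOfRenewsInLastMin();

    // ... many more methods omitted
}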

The PeerAwareInstanceRegistry interface extends InstanceRegistry, adding operations related to the Eureka Server cluster. Its implementation class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry: on top of the operations on the local registry, it adds synchronous replication to peer nodes so that registry information remains consistent across the Eureka Server cluster.

At the bottom of the hierarchy, Spring Cloud's InstanceRegistry class extends PeerAwareInstanceRegistryImpl, mainly to adapt it to the Spring Cloud environment.

2. Service registration

When an Eureka Client initiates service registration, it packages its own information into an InstanceInfo and sends it to Eureka Server. On receiving the InstanceInfo, Eureka Server stores it in the local registry so that other Eureka Clients can discover the service later. The main implementation of service registration is in AbstractInstanceRegistry#register, as follows:

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        // Acquire read lock
        read.lock();
        try {
            // Service instance leases are grouped by appName
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // putIfAbsent guards against overwriting a lease map added concurrently by another thread:
                // if the key already exists its value is returned, otherwise the new map is stored and null is returned
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // Obtain the lease of the instance according to the instanceId
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
      
                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // If a lease already exists, compare the lastDirtyTimestamp values and keep the registration information with the larger one
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        // Update the self-preservation statistics
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            // Create a new lease
            Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
            if (existingLease != null) {
                // If a lease already existed, carry over the service-up timestamp of the existing lease
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Save lease
            gMap.put(registrant.getId(), lease);
            // Record the registration in the recently registered queue
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            // Look up the overridden status for this instance id; if present, apply it to the service instance
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            // Compute the final status of the service instance according to the overridden status rules, and set it as the current status
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            // If the instance status is UP, record the lease's service-up time; only the first setting takes effect
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            // Add the lease to the recently changed queue with action type ADDED;
            // Eureka Clients use this queue for incremental registry fetches
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Set the update time of the service instance
            registrant.setLastUpdatedTimestamp();
            // Invalidate the response cache, which serves Eureka Clients' registry fetches
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // Release lock
            read.unlock();
        }
    }

The registry used above (registry.get(registrant.getAppName())) is a thread-safe ConcurrentHashMap whose key is the appName and whose value is a Map<String, Lease<InstanceInfo>> keyed by instance id; each Lease in the inner map holds the InstanceInfo (service instance) being managed. During registration, the read lock is acquired first. The inner map for the appName is then looked up; if it does not exist, it is created via putIfAbsent (see the code comments). Next, the lease for the given instance id is fetched. If a lease already exists, the lastDirtyTimestamp values of the two InstanceInfos are compared and the one with the larger timestamp wins. If no lease exists, this is a new registration, and the self-preservation statistics are updated. Finally, a new Lease is created and stored in the registry.
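The following minimal sketch shows the nested-map structure and the putIfAbsent pattern described above. It is an illustration, not the Netflix source; the class name RegistrySketch is made up, while Lease and InstanceInfo are the Netflix types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.eureka.lease.Lease;

public class RegistrySketch {
    // outer key: appName; inner key: instance id
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry =
            new ConcurrentHashMap<>();

    public void put(String appName, String instanceId, Lease<InstanceInfo> lease) {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        if (gMap == null) {
            Map<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<>();
            // If another thread created the inner map first, keep that one
            gMap = registry.putIfAbsent(appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        gMap.put(instanceId, lease);
    }
}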

After that, a series of cache operations are performed and the instance status is set according to the overridden status rules. The cache operations include adding the lease to the recentlyChangedQueue (which backs Eureka Clients' incremental registry fetches) and invalidating the responseCache. Finally, the service-up time of the lease is set (used when computing the lease's validity) and the read lock is released, completing the registration.

3. Service renewal

After registration completes, the Eureka Client must also send heartbeat requests to Eureka Server periodically (every 30 s by default) to keep its lease on the Eureka Server valid.

The core code for processing heartbeats on the Eureka Server side is in the AbstractInstanceRegistry#renew method, whose parameters are the service name and the service instance id:

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // Get the lease map for this appName
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
     // If the lease does not exist, return false directly
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // Compute the final status of the service instance according to the overridden status rules
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                // If the computed status is UNKNOWN, reject the renewal; the client must re-register
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // If the instance's current status differs from the overridden status, update the instance status
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                overriddenInstanceStatus.name(),
                                instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        // Count the number of lease renewals per minute for self-protection mechanism
        renewsLastMin.increment();
        // Update the effective time in the lease
        leaseToRenew.renew();
        return true;
    }
}
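For context, the heartbeat arrives over HTTP as PUT /eureka/apps/{appName}/{id}. Below is a heavily trimmed, hedged sketch of the Jersey entry point, modeled on com.netflix.eureka.resources.InstanceResource#renewLease; registry, app, and id are fields of the resource class, and the real method performs additional lastDirtyTimestamp validation that is omitted here:

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // A failed renewal (lease not found or status UNKNOWN) returns 404,
    // which tells the Eureka Client to re-register
    if (!isSuccess) {
        return Response.status(Response.Status.NOT_FOUND).build();
    }
    return Response.ok().build();
}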

4. Service eviction

After registering, an Eureka Client may stop renewing without going offline cleanly (because of a crash or a network failure). Its status is then unknown and its instance data can no longer be trusted, so it must be cleaned up. The eviction code is in AbstractInstanceRegistry#evict, which processes all expired leases in batch:

@Override
public void evict() {
    evict(0L);
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // Related to self-preservation: when lease expiration is disabled, nothing is evicted
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // Traverse the registry collection to get all expired leases at once
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // Get the total number of leases currently in the registry
    int registrySize = (int) getLocalRegistrySize();
    // Compute the size threshold from the renewal percent threshold (self-preservation)
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    // Number of leases to actually evict this round
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        // Evict randomly, one at a time
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Evict this lease
            internalCancel(appName, id, false);
        }
    }
}

The eviction operation traverses the registry to find the expired leases, then computes the maximum number of leases allowed to be evicted from the renewal percent threshold in the configuration and the total number of leases currently in the registry (the registry size minus the size threshold), and evicts expired service instance leases in batch (see the worked example after the list below). The actual removal happens in the AbstractInstanceRegistry#internalCancel method.

The AbstractInstanceRegistry#evict method imposes several restrictions, all intended to preserve the availability of Eureka Server:

  • No eviction is performed while self-preservation is active
  • Expired leases are collected and evicted in batch
  • Leases are evicted randomly, one by one, so evictions are spread evenly across all applications; this prevents every expired instance of a single service cluster from being removed at once and the application being wiped out before self-preservation kicks in
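As a worked example of the eviction limit, assume the default renewalPercentThreshold of 0.85 and a registry of 100 leases:

int registrySize = 100;
double renewalPercentThreshold = 0.85;                                       // default value
int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold); // 85
int evictionLimit = registrySize - registrySizeThreshold;                   // 15

// Even if 40 leases have expired, only min(40, 15) = 15 are evicted this round
int toEvict = Math.min(40, evictionLimit);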

Service eviction itself is a scheduled task: an EvictionTask defined in the AbstractInstanceRegistry class performs eviction periodically (every 60 s by default). The task is started after AbstractInstanceRegistry is initialized, and its frequency is configured via evictionIntervalTimerInMs, so expired service instance leases are cleared periodically.
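Here is a hedged sketch of how this timer is wired up, modeled on the scheduling done in AbstractInstanceRegistry#postInit; names are simplified, and the real EvictionTask also computes a compensation time for GC pauses and clock drift:

// Wiring of the eviction timer (sketch; uses java.util.Timer / TimerTask)
protected void postInit() {
    Timer evictionTimer = new Timer("Eureka-EvictionTimer", true);
    long intervalMs = serverConfig.getEvictionIntervalTimerInMs(); // 60 000 ms by default

    evictionTimer.schedule(new TimerTask() {
        @Override
        public void run() {
            // the real EvictionTask passes the computed compensation time
            // to evict(additionalLeaseMs); 0L is used here for simplicity
            evict(0L);
        }
    }, intervalMs, intervalMs);
}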

The self-preservation mechanism mainly deals with network partitions between Eureka Server and Eureka Client, and involves both the server and the client. Under certain conditions (such as network failures), Eureka Clients cannot reach Eureka Server and therefore cannot send registration or renewal requests. Eureka Server would then see a large number of expired service instance leases, even though the Eureka Clients behind them may still be healthy, so evicting those leases outright would be unreasonable. Eureka therefore designed a self-preservation mechanism: on the Eureka Server side, when renewals drop sharply enough to cause mass evictions, self-preservation is triggered and the service instances in the registry are protected from eviction; once communication stabilizes, the server exits this mode. On the client side, if registration with one Eureka Server fails, the client times out quickly and tries to communicate with other Eureka Servers. This design considerably improves Eureka's availability.
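A sketch of how the self-preservation threshold is maintained, modeled on PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold in recent Eureka versions; the config accessors shown follow EurekaServerConfig, but treat the exact names as assumptions:

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) // renewals per client per minute
            * serverConfig.getRenewalPercentThreshold());                     // 0.85 by default
}
// Example: 10 clients renewing every 30 s -> 10 * 2 * 0.85 = 17 renews/min.
// When the actual renewals per minute drop below this threshold,
// isLeaseExpirationEnabled() returns false and eviction is suspended.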

5. Service offline

When an application is shut down, the Eureka Client sends a service-offline (cancel) request to Eureka Server, and Eureka Server removes the corresponding lease from the service instance registry to avoid invalid service calls. Service eviction reuses the same path: each expired lease of a single service instance is cleared through the offline logic.

The main code for service offline is in the AbstractInstanceRegistry#cancel method, which receives the application name and the service instance id:

@Override
public boolean cancel(String appName, String id, boolean isReplication) {
    return internalCancel(appName, id, isReplication);
}

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    // Acquire the read lock
    read.lock();
    try {
        CANCEL.increment(isReplication);
        // Obtain the corresponding service instance cluster according to appName
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        // Remove service instance lease
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        // Record the instance in the recently canceled queue
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // If the lease does not exist, return false directly
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // Record the cancellation time on the lease
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // Add the lease to the recently changed queue with action type DELETED;
                // Eureka Clients use this queue for incremental registry fetches
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // Set response cache expiration
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }

    synchronized (lock) {
        // Check whether the expected number of clients sending renewals is greater than 0
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews.
            // Decrease the expected number of clients sending renewals
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            // Recompute the per-minute renewal threshold
            updateRenewsPerMinThreshold();
        }
    }
    
    // Offline successful
    return true;
}
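From the client side, a graceful offline is usually triggered by shutting down the discovery client. A minimal, hedged example, assuming an initialized com.netflix.discovery.EurekaClient and default unregister-on-shutdown behavior:

import com.netflix.discovery.EurekaClient;

public class GracefulShutdownExample {

    public static void goOffline(EurekaClient eurekaClient) {
        // shutdown() unregisters the instance: the client sends
        // DELETE /eureka/apps/{appName}/{id}, which the server routes to
        // AbstractInstanceRegistry#cancel / internalCancel shown above
        eurekaClient.shutdown();
    }
}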

6. Cluster synchronization

When Eureka Server is deployed as a cluster, a synchronization mechanism is needed to keep the registry data consistent across all Eureka Servers in the cluster. Eureka Server synchronization consists of two parts:

  • On startup, Eureka Server pulls registry information from its peer nodes and synchronizes the service instances into its local registry
  • Whenever Eureka Server performs an operation on its local registry, it replicates the operation to its peer nodes

1. Eureka Server initializes local registry information

During Eureka Server startup (see the EurekaServerBootstrap#initEurekaServerContext method), registry information is pulled from the peer nodes and synchronized into the local registry. This is mainly implemented in the PeerAwareInstanceRegistryImpl#syncUp method:

@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    // Retry several times; if no instances were fetched, wait and try again
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // Get all service instances
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // Check whether the instance is registerable; this mainly matters in AWS, in other environments it simply returns true
                    if (isRegisterable(instance)) {
                        // Register in its own registry
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

Eureka Server is itself also an Eureka Client: on startup it initializes a DiscoveryClient and pulls the full registry from the Eureka Servers it knows about. In a cluster deployment, Eureka Server pulls registry information from its peer nodes this way, then traverses the entire Applications and registers every service instance into its own registry via the AbstractInstanceRegistry#register method.

While the local registry is being initialized, Eureka Server does not accept requests from Eureka Clients (such as registrations or registry fetches). Only after synchronization finishes does the server open itself to traffic, via the InstanceRegistry#openForTraffic method:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // Initialize statistical parameters of self-protection mechanism
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    // If the number of synchronized instances is 0, the Client will be denied access to registry information for a period of time
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    // Check whether we are running in an AWS environment; ignored here
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // Set the instance status to UP, marking the server healthy and ready to accept requests
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

2. Synchronous replication of registry information between Eureka Servers

To keep registry data consistent while the Eureka cluster is running, each Eureka Server must replicate operations on its own registry to its peer nodes. The register, renew, and cancel methods of PeerAwareInstanceRegistryImpl therefore add peer-node operations on top of the local ones, keeping the registry information consistent across the Server cluster:

@Override
public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        // Synchronous offline status
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);

        return true;
    }
    return false;
}

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    // Synchronous registration status
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        // Synchronous renewal status
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

From this we can see that the key method is PeerAwareInstanceRegistryImpl#replicateToPeers, which iterates over all the peer nodes of this Eureka Server and sends a synchronization request to each:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // If this request is itself a replication, or there are no peers, do not replicate again; this prevents infinite replication loops
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Send synchronization requests to each peer in the peer cluster
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            // Skip our own URL; there is no need to replicate to ourselves
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Call different synchronization requests according to the action
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

PeerEurekaNode represents a peer Eureka Server with which data can be synchronized. PeerEurekaNode provides operations for replicating registry information to a peer node, such as register, cancel, heartbeat, and statusUpdate. PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers is implemented as follows:

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                 // Synchronous offline 
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // Synchronous heartbeat
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // Synchronous registration
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // Synchronization status update
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // Synchronously delete status overrides
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    } finally {
        CurrentRequestVersion.remove();
    }
}

Each replication operation in PeerEurekaNode is executed through a task stream: operations on the same service instance within the same time window use the same task id, and tasks with the same id are merged before being replicated. This reduces the number of replication requests and the network overhead, but it also introduces replication delay, so Eureka does not satisfy the C (strong consistency) of CAP.
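For reference, here is a lightly trimmed excerpt modeled on PeerEurekaNode#register, showing the batching dispatcher and the task id used for merging; treat the exact shape as a sketch of the Netflix source rather than a verbatim copy:

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Tasks submitted with the same id within the same time window are merged
    // by the batching dispatcher, so only the latest state is replicated
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}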
