entrance
stay Last In the built Spring Cloud Client project, Eureka client was successfully started after adding a little configuration to the configuration file. How did the client join Spring?
We know that if we want to register some beans into Spring, but these beans are not under the scanning path of Spring, there are only two schemes left:
- Use @ Configuration or @ Import
- Use the SpringBoot SPI mechanism to configure to spring In the factories file
Find spring cloud Netflix Eureka client Meta-inf / spring.jar Factories file
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\ org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\ org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
These configuration classes register a series of bean s required by Eureka, which is also the starting point for us to find some important classes.
EurekaDiscoveryClient
The most important thing for the client is to discover services from the server. Spring Cloud defines a top-level interface org. For service discovery springframework. cloud. client. discovery. DiscoveryClient
// Common service discovery operations public interface DiscoveryClient extends Ordered { // Omit Ordered interface method // Description of implementation class String description(); // Query the service instance information list through the service id List<ServiceInstance> getInstances(String serviceId); // Get all service instance IDS List<String> getServices(); }
Eureka also has corresponding implementation classes in spring The configuration class in the factories file is registered in EurekaDiscoveryClientConfiguration
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @ConditionalOnBlockingDiscoveryEnabled public class EurekaDiscoveryClientConfiguration { // Omit other configurations @Bean @ConditionalOnMissingBean public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client, clientConfig); } }
Two other beans eurekaclient and EurekaClientConfig are required to register this class
EurekaClientConfig
// Interface, specifying a default implementation @ImplementedBy(DefaultEurekaClientConfig.class) public interface EurekaClientConfig {...} // Eureka's own implementation class saves the configuration items in the configuration file in this class @ConfigurationProperties(EurekaClientConfigBean.PREFIX) public class EurekaClientConfigBean implements EurekaClientConfig, Ordered { // Prefix of configuration item in configuration file: Eureka client public static final String PREFIX = "eureka.client"; // Omit other }
This class is in spring Registered in EurekaClientAutoConfiguration in factories file
// Omit annotation on class public class EurekaClientAutoConfiguration { // Omit other configurations @Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { return new EurekaClientConfigBean(); } }
EurekaClient
The lowest implementation class of EurekaClient is CloudEurekaClient. This class only sends a heartbeat event when the method onCacheRefreshed is called. Its main work is com netflix. discovery. Discoveryclient completed.
public class DiscoveryClient implements EurekaClient { // Omit other methods and method bodies // Service registration boolean register(); // Service renewal boolean renew(); // Service offline, from Eureka client interface synchronized void shutdown(); // The following is from the LookupService interface // Query the service instance information of the same appName Application getApplication(String appName); // Find all service instance information Applications getApplications(); // Obtain the instance information according to the instance id List<InstanceInfo> getInstancesById(String id); }
Let's focus on DiscoveryClient
DiscoveryClient
Let's look at the constructor first
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // Some initialization work is omitted // Configuration item Eureka client. region logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); // Do not pull & & do not register if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { // Without starting any tasks, DiscoveryClient initialization is complete // Omit code return; } try { // Timer thread pool scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // Timed heartbeat heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // Cache refresh cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // Omit some code } // Pull registry if (clientConfig.shouldFetchRegistry()) { try { // Primary server pull, false: incremental pull boolean primaryFetchRegistryResult = fetchRegistry(false); if (!primaryFetchRegistryResult) { logger.info("Initial registry fetch from primary servers failed"); } // Failed to pull from the primary server. Pull from the backup server boolean backupFetchRegistryResult = true; if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { backupFetchRegistryResult = false; logger.info("Initial registry fetch from backup servers failed"); } // Failed twice & & if the registry is forcibly pulled during initialization, the initialization fails // Configuration item: Eureka cletn. should-enforce-registration-at-init if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed."); } } catch (Throwable th) { logger.error("Fetch registry error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // Register with Eureka server & & force registration during initialization if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { // Register yourself with Eureka Server if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // Initialization timing task: Service heartbeat, service pull, service information instance sending initScheduledTasks(); // Omit some code }
Constructor summary:
- Initialization part information
- Pull registry
- Register yourself
- Initialization timer: Service heartbeat, service pull, service instance information sending
Registry: Registry pull
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // Get existing registry Applications applications = getApplications(); if (clientConfig.shouldDisableDelta() // Incremental acquisition disables Eureka clent. Disable Delta, false by default // Eureka is configured client. registry-refresh-single-vip-address || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch // The existing registry is null || (applications == null) // The existing registry is not null but empty || (applications.getRegisteredApplications().size() == 0) // The client does not support incremental pull || (applications.getVersion() == -1)) { // Full acquisition getAndStoreFullRegistry(); } else { // Incremental acquisition getAndUpdateDelta(applications); } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}", appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e)); return false; } finally { if (tracer != null) { tracer.stop(); } } // Send registry refresh event onCacheRefreshed(); // Update the status of this instance to the server updateInstanceRemoteStatus(); // Pull successful return true; } @Override public Applications getApplications() { return localRegionApps.get(); }
getAndStoreFullRegistry: full registry
// Registry version counter to prevent the registry from updating to the old version in the multithreaded environment private final AtomicLong fetchRegistryGeneration; private void getAndStoreFullRegistry() throws Throwable { // Registry current version long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; // eurekaTransport.queryClient: AbstractJerseyEurekaHttpClient // Eureka is configured client. Registry refresh single VIP address, which is taken from the configuration address // remoteRegionsRef.get() takes Eureka client. Fetch remote regions registry configuration item EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) { // Not pulled to registry logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // Pull to the registry and the current version is consistent, then the version + 1 // Filter (only those with UP status) and disrupt the registry localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { // There are other threads that have been updated and later logger.warn("Not updating applications as another thread is updating it already"); } }
AbstractJerseyEurekaHttpClient
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient { @Override public EurekaHttpResponse<Applications> getApplications(String... regions) { return getApplicationsInternal("apps/", regions); } @Override public EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions) { return getApplicationsInternal("vips/" + vipAddress, regions); } @Override public EurekaHttpResponse<Applications> getDelta(String... regions) { return getApplicationsInternal("apps/delta", regions); } private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { // Simplify code // Send GET request, Jersey HTTP GET serviceUrl/urlPath?regions= } }
getAndUpdateDelta: incremental registry
private final Lock fetchRegistryUpdateLock = new ReentrantLock(); private void getAndUpdateDelta(Applications applications) throws Throwable { // current version long currentUpdateGeneration = fetchRegistryGeneration.get(); // Send HTTP request to get incremental information Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } if (delta == null) { // If incremental pull is not allowed on the server, full pull is required getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // Pull to the registry and the current version is consistent, then the version + 1 logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { // Get the lock try { // Update increment updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { // Release lock fetchRegistryUpdateLock.unlock(); } } else { // If you don't get the lock, you will give up this update logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { // If the registration information of the client and the server is inconsistent due to some reasons, pull the full amount again reconcileAndLogDifference(delta, reconcileHashCode); } } else { // There are other threads that have been updated and later logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } } private void updateDelta(Applications delta) { for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { Applications applications = getApplications(); // Omit some code // New service instance if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } // Changed service instance else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } // Deleted service instance else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); // If the instance list is empty, the service will be deleted if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } // Omit the code, set the version, and disrupt the instance order }
Register: register yourself
boolean register() throws Throwable { // Simplify code // instanceInfo current instance information eurekaTransport.registrationClient.register(instanceInfo); } public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient { @Override public EurekaHttpResponse<Void> register(InstanceInfo info) { // Simplify code String urlPath = "apps/" + info.getAppName(); // Send request Jersey HTTP POST // url: serviceUrl/urlPath // body: info, contentType: application/json } }
initScheduledTasks: initializes the timer
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // Registry pull timer // Frequency of pulling registry, Eureka client. Registry fetch interval seconds, default 30s int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, // Pull registry thread new CacheRefreshThread() ); scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { // Heartbeat timer // Heartbeat transmission interval, Eureka instance. Lease renewal interval in seconds: the default is 30s int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, // Heartbeat thread new HeartbeatThread() ); scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS); // The service instance information is refreshed. If there is any change, register with the server instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, // eureka. client. Instance info replication interval seconds, 30s by default clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // Status monitoring. If the status changes, instanceInfoReplicator will be triggered to execute statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) { logger.error("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; // eureka. client. On demand update status change, the default is true if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
CacheRefreshThread: pull registry thread
// DiscoveryClient internal class class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } @VisibleForTesting void refreshRegistry() { try { // Omit code, region processing // Pull registry boolean success = fetchRegistry(remoteRegionsModified); } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
HeartbeatThread: heartbeat thread
// DiscoveryClient internal class private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } // Renew the contract boolean renew() { // Simplify some codes EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); // The server returns 404, indicating that there is no such instance, and then re initiate the registration if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { long timestamp = instanceInfo.setIsDirtyWithTime(); // Re register yourself boolean success = register(); return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } } // AbstractJerseyEurekaHttpClient#sendHeartBeat public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { // Simplify code String urlPath = "apps/" + appName + '/' + id; // Send request Jersey HTTP PUT // url: serviceUrl/urlPath // query param: status = info.getStatus() // query param: lastDirtyTimestamp = info.getLastDirtyTimestamp() }
InstanceInfoReplicator: service instance information refresh thread
class InstanceInfoReplicator implements Runnable { public void run() { try { // Refresh service instance information discoveryClient.refreshInstanceInfo(); // If the instance information changes, return the data update time Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // Re initiate registration discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { // Hibernate replicationIntervalSeconds seconds and execute again Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } }
Timer summary
- Pull the registry, and the frequency is determined by Eureka client. Determined by registry fetch interval seconds, the default is 30s
- Heartbeat renewal, frequency by Eureka instance. It is determined by lease renewal interval in seconds. The default is 30s
- The service instance information is refreshed by Eureka client. Determined by instance info replication interval seconds, the default is 30s
- The status listener listens to the event of status change and triggers the execution of service instance information refresh. It belongs to the passive performance of service instance information refresh. Whether it is enabled or not is determined by Eureka client. On demand update status change. The default value is true
shutdown: service offline
@PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); // Delete status listening if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } // Close scheduled task cancelScheduledTasks(); if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { // Send offline status to the server applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } // Omit the code and close something logger.info("Completed shut down of DiscoveryClient"); } } void unregister() { // Simplify code eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); } public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient { @Override public EurekaHttpResponse<Void> cancel(String appName, String id) { // Simplify code String urlPath = "apps/" + appName + '/' + id; // Send request Jersey HTTP DELETE // url: serviceUrl/urlPath } }
summary
This article starts with the constructor of DiscoveryClient and browses the workflow of Eureka client, including pulling the registry, registering itself, heartbeat renewal and other tasks. You can also see the source code of some configuration items and the HTTP interface path to communicate with the server. After reading these source codes, I also have a certain understanding of the implementation principle of Eureka client. If you are interested, you can implement a client in other languages by yourself.