Microservice architecture | 3.5 Source code analysis of Nacos service registration and discovery

Preface

Reference material:
Spring Microservices in Action
Principles and Practice of Spring Cloud Alibaba Microservices
"Spring Cloud framework development tutorial" by Zhou Yang (Atguigu, on Bilibili)

To keep the discussion simple, the Nacos console and the Nacos registry are together called the Nacos server (that is, the side that serves the web interface), and the business services we write are called Nacos clients;

The Nacos client registers itself with the Nacos server. Section 1, "The client registers in the Nacos registry", explains from the client's perspective how the client sends its information to the Nacos server; Section 2, "Nacos server registration service", explains the registration principle from the server's perspective;

Section 3, "The client queries all service instances", explains how a service consumer obtains all instances of a provider. Both service consumers and providers are clients of Nacos;

Section 4, "The client listens to the Nacos server to dynamically obtain service instances", explains from the consumer's perspective how the client listens to the Nacos server to learn about changes to the providers;

1. The client registers in the Nacos Registry (client perspective)

1.1 Specifications and standards provided by Spring Cloud

  • To register a service with Nacos through Spring Cloud, the integration must first meet the specification that Spring Cloud defines;
  • The spring-cloud-commons package contains the interface org.springframework.cloud.client.serviceregistry.ServiceRegistry, which is Spring Cloud's standard for service registration. Any component that integrates with Spring Cloud to provide service registration implements this interface;
public interface ServiceRegistry<R extends Registration>{
    void register(R registration); 
    void deregister(R registration); 
    void close(); 
    void setStatus(R registration, String status);
    <T> T getStatus(R registration);
}
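
To make the contract more concrete, here is a minimal in-memory sketch of what an implementation has to provide. SimpleRegistration, SimpleServiceRegistry and InMemoryServiceRegistry are hypothetical stand-ins written for this illustration; they are not Spring Cloud or Nacos classes, and a real registry such as Nacos talks to a remote server instead of a local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Spring Cloud's Registration: a service ID plus host and port.
final class SimpleRegistration {
    final String serviceId;
    final String host;
    final int port;

    SimpleRegistration(String serviceId, String host, int port) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
    }

    // Key that identifies one instance of a service.
    String key() {
        return serviceId + "@" + host + ":" + port;
    }
}

// Hypothetical, trimmed-down mirror of the ServiceRegistry contract shown above.
interface SimpleServiceRegistry {
    void register(SimpleRegistration registration);
    void deregister(SimpleRegistration registration);
    void setStatus(SimpleRegistration registration, String status);
    String getStatus(SimpleRegistration registration);
}

// In-memory implementation: instances and their status live in a local map.
final class InMemoryServiceRegistry implements SimpleServiceRegistry {
    private final Map<String, String> statusByInstance = new ConcurrentHashMap<>();

    @Override
    public void register(SimpleRegistration registration) {
        statusByInstance.put(registration.key(), "UP");
    }

    @Override
    public void deregister(SimpleRegistration registration) {
        statusByInstance.remove(registration.key());
    }

    @Override
    public void setStatus(SimpleRegistration registration, String status) {
        // Only instances that are currently registered can change status.
        statusByInstance.computeIfPresent(registration.key(), (key, old) -> status);
    }

    @Override
    public String getStatus(SimpleRegistration registration) {
        return statusByInstance.get(registration.key()); // null when not registered
    }
}
```

In this sketch the status strings "UP" and "DOWN" are arbitrary; the point is only that registration, deregistration and status handling sit behind one interface that the framework can call without knowing which registry is used.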

1.2 Automatic configuration class of Nacos

  • The META-INF/spring.factories file of the spring-cloud-commons package contains the auto-configuration entries, that is, the classes that are automatically loaded into the container when Spring Cloud starts:

  • AutoServiceRegistrationAutoConfiguration (the service registration auto-configuration class) is the configuration class related to service registration. The source code is as follows:
@Configuration
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
    value = {"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
    //Automatic registration class
    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;
    //Configuration properties for automatic registration
    @Autowired
    private AutoServiceRegistrationProperties properties;

    public AutoServiceRegistrationAutoConfiguration() {
    }

    //Initialization function
    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}
  • The key point is the injected AutoServiceRegistration (service registrar) interface, for which Spring Cloud provides an abstract implementation class, AbstractAutoServiceRegistration;
  • A registry that wants to plug in extends AbstractAutoServiceRegistration;
  • For Nacos, the NacosAutoServiceRegistration (Nacos service registrar) class extends the abstract class AbstractAutoServiceRegistration;

1.3 Listening for the service initialization event: AbstractAutoServiceRegistration.bind()

  • AbstractAutoServiceRegistration also implements the ApplicationListener interface, which is Spring's event listening mechanism. The interface declaration is as follows:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}
  • AbstractAutoServiceRegistration implements this interface to listen for WebServerInitializedEvent (the web server initialization event) and handles the event by calling AbstractAutoServiceRegistration.bind();
@Override
@SuppressWarnings("deprecation")
//[breakpoint entry]
public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
  • We set a breakpoint in AbstractAutoServiceRegistration.bind() and start the service provider. When the breakpoint is hit, you can observe that:
    • There is no service instance registered on the Nacos server yet;
    • Service initialization has already completed;

  • [Conclusion] When registering with Nacos, the service is initialized first, and the initialization event is picked up through the event listening mechanism. Once initialization is complete, AbstractAutoServiceRegistration.bind() is called to register the service with the Nacos registry;
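
The "initialize first, register on the event" order can be sketched without Spring. The classes below (ServerReadyEvent, EventBus, AutoRegistrar) are hypothetical and only imitate the roles of WebServerInitializedEvent, the Spring event publisher and AbstractAutoServiceRegistration; they are not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event carrying the port the web server ended up listening on.
final class ServerReadyEvent {
    final int port;

    ServerReadyEvent(int port) {
        this.port = port;
    }
}

// Minimal publisher: listeners subscribe first and are called back on publish().
final class EventBus {
    private final List<Consumer<ServerReadyEvent>> listeners = new ArrayList<>();

    void addListener(Consumer<ServerReadyEvent> listener) {
        listeners.add(listener);
    }

    void publish(ServerReadyEvent event) {
        for (Consumer<ServerReadyEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Plays the role of the registrar: nothing is registered until the ready event arrives.
final class AutoRegistrar {
    final List<String> registered = new ArrayList<>();
    private final String serviceId;

    AutoRegistrar(String serviceId, EventBus bus) {
        this.serviceId = serviceId;
        bus.addListener(this::bind);
    }

    // Loosely corresponds to bind(event): take the port from the event, then register.
    private void bind(ServerReadyEvent event) {
        registered.add(serviceId + ":" + event.port);
    }
}
```

Creating an AutoRegistrar leaves its registered list empty; only a later publish(new ServerReadyEvent(8080)) triggers the registration, which mirrors what the breakpoint experiment above shows.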

1.4 Logic for registering service instances: NacosServiceRegistry.register()

This answers the question of when the service sends its own information to the Nacos server;

  • Stepping into AbstractAutoServiceRegistration.bind(), we find that the service instance appears on the Nacos server only after AbstractAutoServiceRegistration has called NacosServiceRegistry.register();

  • Stepping into NacosServiceRegistry.register(), the logic of the method is as follows:
@Override
public void register(Registration registration) {
    //Determine whether there is a service ID
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();  //Service ID (service provider)
	Instance instance = getNacosInstanceFromRegistration(registration);  //Service instance (with ip, port and other information)
	try {
	    //[breakpoint] registration method
		namingService.registerInstance(serviceId, instance);
		log.info("nacos registry, {} {}:{} register finished", serviceId,
				instance.getIp(), instance.getPort());
	}
	catch (Exception e) {
		log.error("nacos registry, {} register failed...{},", serviceId,
				registration.toString(), e);
	}
}

  • Stepping into namingService.registerInstance() (the implementation is NacosNamingService.registerInstance()), we get the actual registration logic:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        //Encapsulate service instance information with heartbeat BeatInfo
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        //[breakpoint 1.4.1] put the beatInfo heartbeat information into the beatReactor heartbeat transmitter (after sending the heartbeat, the Nacos server still has no service instance)
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    //[1.4.2] use the naming proxy to send the service instance information to the Nacos server as a POST request
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
  • That is, the logic of registering a service instance has two steps:

    • First, BeatReactor.addBeatInfo() sets up the heartbeat used for health detection;
    • Then the service instance information is sent as a POST request;

1.4.1 Heartbeat mechanism: BeatReactor.addBeatInfo()

  • Stepping into BeatReactor.addBeatInfo() to explore the heartbeat mechanism, the source code is as follows:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
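    //[core] schedule a BeatTask that sends the heartbeat packet beatInfo to the server (delay 0, so the first beat goes out immediately)
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
    //Record the current size of the heartbeat map in a metrics gauge
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());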
}
  • BeatReactor.addBeatInfo() mainly does two things:

    • It calls ScheduledExecutorService.schedule() (implemented by ScheduledThreadPoolExecutor) to run a BeatTask. schedule() itself runs the task only once; in the Nacos 1.x client the periodic heartbeat comes from the task scheduling itself again after each beat, using the period stored in beatInfo;
    • It updates a gauge obtained from MetricsMonitor.getDom2BeatSizeMonitor() with the size of the dom2Beat map, so the number of instances currently sending heartbeats can be monitored. The gauge is a metric only; it does not check responses from the server;
  • The Nacos server continuously updates the service status according to the client's heartbeat packets;
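
The self-rescheduling pattern described above can be sketched as follows. HeartbeatSender is a hypothetical class written for this article, and the sendBeat callback stands in for the real HTTP heartbeat call; only the scheduling idea is taken from the source.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a heartbeat sender that schedules its next run after every beat.
final class HeartbeatSender {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "heartbeat-sketch");
                thread.setDaemon(true); // do not keep the JVM alive just for heartbeats
                return thread;
            });
    private final Runnable sendBeat;  // hypothetical stand-in for the HTTP heartbeat request
    private final long periodMillis;
    private volatile boolean stopped;

    HeartbeatSender(Runnable sendBeat, long periodMillis) {
        this.sendBeat = sendBeat;
        this.periodMillis = periodMillis;
    }

    // The first beat fires with delay 0, like the schedule() call in addBeatInfo().
    void start() {
        executor.schedule(this::beatOnce, 0L, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        try {
            sendBeat.run();
        } catch (RuntimeException e) {
            // A failed beat must not end the cycle; the next one is still scheduled below.
        }
        if (!stopped) {
            executor.schedule(this::beatOnce, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }
}
```

Compared with scheduleAtFixedRate(), rescheduling from inside the task makes it easy to change the period between beats, at the cost of a little more code.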

1.4.2 Registering the service: NamingProxy.registerService()

  • Next, step into NamingProxy.registerService(). The source code is as follows:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
    Map<String, String> params = new HashMap(9);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    //[breakpoint entry] after this step is executed, the service instance information appears on the Nacos server
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
  • This method first packs the service instance fields into request parameters and then calls NamingProxy.reqAPI(), which picks a server and sends the request to the registration API. The source code of NamingProxy.reqAPI() is as follows:
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());

                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }
                    index = (index + 1) % servers.size();
                }

                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            } else {
                int i = 0;
                while(i < 3) {
                    try {
                        return this.callServer(api, params, this.nacosDomain);
                    } catch (Exception var13) {
                        exception = var13;
                        LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                        ++i;
                    }
                }
                throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            }
        }
    }
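
The server-selection part of reqAPI() (random starting index, then each server in turn until one answers) can be isolated in a small sketch. FailoverCaller and its call parameter are hypothetical; call stands in for callServer() and is expected to return the response body or throw.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

final class FailoverCaller {
    // Tries each server at most once, starting at a random index and wrapping around.
    static String callAny(List<String> servers, Function<String, String> call, Random random) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = random.nextInt(servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

Starting at a random index spreads requests across the cluster, while the wrap-around loop guarantees that every server is tried once before the call gives up.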

1.5 Sending the registration request through the Open API

  • Finally, the following request is sent to the Nacos server through the Open API:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=10.200.9.143&port=18082'
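
As a rough illustration of how such a URL is put together from the parameter map, the sketch below assembles the request line shown above. RegisterUrlBuilder is a hypothetical helper; it only builds the string and does not send anything, and it is not how NamingProxy encodes its parameters internally.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

final class RegisterUrlBuilder {
    // Builds http://<serverAddr>/nacos/v1/ns/instance?<key>=<value>&... in map iteration order.
    static String build(String serverAddr, Map<String, String> params) {
        StringBuilder url = new StringBuilder("http://")
                .append(serverAddr)
                .append("/nacos/v1/ns/instance");
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            url.append(separator)
               .append(encode(entry.getKey()))
               .append('=')
               .append(encode(entry.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }
}
```

Passing a LinkedHashMap keeps the parameters in insertion order, which makes the resulting URL easy to compare with the one above.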

1.6 Summary

  • The client first loads its configuration and initializes the service, and uses the ApplicationListener (event listener) mechanism to listen for the service initialization event. After initialization completes, execution enters the registration logic in NacosServiceRegistry.register();
  • Registration has two steps:
    • First, NacosNamingService sets up the heartbeat;
    • Then NamingProxy (naming proxy) sends the service instance to the Nacos server as a POST request;
  • That is, for an ephemeral instance the client sets up heartbeat reporting, which the server uses for health detection, before it sends the registration request;

2. Nacos server registration service (server perspective)

  • For downloading and starting the Nacos server source code, see Microservice architecture | 3.2 Alibaba Nacos registry;
  • As mentioned above, the last step of client registration is to send the following Open API request to the server:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=10.200.9.143&port=18082'
  • The source code analysis of the server therefore starts from this request;

2.1 The server receives the request: InstanceController.register()

  • The server defines a register method in InstanceController (the service instance controller) in the nacos-naming module to handle client registration. The source code is as follows:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    //Omit other codes

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //Resolve the namespaceId from the request parameters (public in this case)
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //Resolve the serviceName (DEFAULT_GROUP@@service-provider in this case: the group name, "@@", then the service name service-provider)
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //[breakpoint] method of registering service instance in server console
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

        //Omit other codes
}

2.2 Registering the service instance on the server: ServiceManager.registerInstance()

  • The source code of ServiceManager.registerInstance() is as follows:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //[2.3] create an empty service
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    //Add service instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
  • This code contains three main pieces of logic:
    • createEmptyService(): creates an empty service (used to display service information in the "service list" of the Nacos console) and stores it in serviceMap, which is a ConcurrentHashMap;
    • getService(): gets the Service object from serviceMap by namespaceId and serviceName;
    • addInstance(): saves the instance being registered into the Service;

2.3 Creating an empty service: ServiceManager.createEmptyService()

  • Following the calls down, we find that the method finally invoked is ServiceManager.createServiceIfAbsent(). The source code is as follows:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    //Obtain the service from the cache through the namespace ID and service name. For the first time, there is no service. Enter the if statement to create the service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //Validate the service. If it is invalid, an exception will be thrown
        service.validate();
        //[breakpoint] add and initialize services
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
  • When a service is registered for the first time, it cannot be in the cache yet, so we enter ServiceManager.putServiceAndInit() inside the if statement;
private void putServiceAndInit(Service service) throws NacosException {
    //[2.3.1] add service to cache
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    //[2.3.2] establish heartbeat mechanism
    service.init();
    //[2.3.3] monitor data consistency and synchronize service data
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

2.3.1 Adding the service to the cache: ServiceManager.putService()

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    //Save Service to serviceMap
    serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
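
The nested map that putService() maintains (namespaceId to serviceName to Service) can be sketched in a few lines. ServiceCache is a hypothetical class, the cached value is a plain String purely for illustration, and computeIfAbsent() is used here as an alternative to the double-checked locking above; it is not what the Nacos source does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ServiceCache {
    // namespaceId -> (serviceName -> service)
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    // Returns true when this call stored the service, false when one was already cached.
    boolean putIfAbsent(String namespaceId, String serviceName, String service) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentSkipListMap<>())
                .putIfAbsent(serviceName, service) == null;
    }

    // Returns the cached service, or null when the namespace or the service is unknown.
    String get(String namespaceId, String serviceName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }
}
```

Both variants ensure that the inner map for a namespace is created only once even when several registrations arrive concurrently.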

2.3.2 Establishing the heartbeat mechanism: Service.init()

public void init() {
    //[breakpoint] check heartbeat regularly
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
  • Step into HealthCheckReactor.scheduleCheck();
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

  • It creates a scheduled task (first run after 5 seconds, then every 5 seconds) that checks when each instance under the current service last sent a heartbeat packet. If an instance has timed out, its healthy flag is set to false, indicating that the instance is unhealthy, and a service change event is published;
  • The heartbeat detection mechanism can be summarized in the following figure:
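
In code, the core of such a check can be sketched as below. BeatingInstance and BeatChecker are hypothetical classes, and the timeout is a parameter because the actual timeout value is not shown in the source above; the current time is passed in so the behaviour is easy to reason about.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical view of an instance as the server sees it.
final class BeatingInstance {
    final String address;
    long lastBeatMillis;
    boolean healthy = true;

    BeatingInstance(String address, long lastBeatMillis) {
        this.address = address;
        this.lastBeatMillis = lastBeatMillis;
    }
}

final class BeatChecker {
    // Marks instances unhealthy when no heartbeat arrived within timeoutMillis.
    // Returns the instances whose state changed, so a change event could be published for them.
    static List<BeatingInstance> check(List<BeatingInstance> instances, long nowMillis, long timeoutMillis) {
        List<BeatingInstance> changed = new ArrayList<>();
        for (BeatingInstance instance : instances) {
            boolean timedOut = nowMillis - instance.lastBeatMillis > timeoutMillis;
            if (timedOut && instance.healthy) {
                instance.healthy = false;
                changed.add(instance);
            }
        }
        return changed;
    }
}
```

A scheduled task would call check() periodically with the current time; returning only the changed instances avoids publishing the same change event on every run.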

2.3.3 Listening for data consistency: DelegateConsistencyServiceImpl.listen()

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // This key will be monitored by two services
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        //Persistent consistency service
        persistentConsistencyService.listen(key, listener);
        //Transient consistency service
        ephemeralConsistencyService.listen(key, listener);
        return;
    }
    
    mapConsistencyService(key).listen(key, listener);
}

2.4 Summary

  • The Nacos client sends a service registration request through the Open API. After receiving the request, the server does three things:

    • Builds a Service object and saves it in a ConcurrentHashMap;
    • Uses a scheduled task to set up heartbeat detection for all instances under the current service;
    • Synchronizes service data through the data consistency protocol;
  • This concludes the analysis of the service registration principle on both the Nacos client and the server. The remaining sections cover supplementary topics;


3. The client queries all service instances

  • When making RPC calls, Nacos clients play one of two roles depending on how they use a service: service consumer or service provider;
  • Before a service consumer can use a provider's service, it must obtain all registered provider instances from the Nacos server. This section discusses how the client (consumer) obtains these instances (providers);

3.1 The consumer client sends a request to Nacos

  • Just as when registering a service, a consumer client that wants to query all instances of a provider sends a request to the Nacos server;

  • There are two ways to send this request, the Open API and the SDK. The SDK call ends up going through the Open API as well:

    • Open API: GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider
    • SDK: List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException;
  • We use Postman to send GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider to simulate a consumer client fetching the provider instances;

  • You may need to send the request several times, because the Nacos server may have cached the provider instance information and may answer from the cache instead of processing the request;

3.2 The Nacos server processes the request: InstanceController.list()

  • The controller that handles the query for all service instances is again InstanceController (the service instance controller) in the nacos-naming module;
  • InstanceController provides the InstanceController.list() endpoint to handle the request:

  • You can see that our breakpoint is hit here as well;
  • The source code is as follows:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    //Resolve namespace ID
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //Resolve service name
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    //Parse and get a pile of information
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //[3.3] obtain the instance information of the requested service
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

3.3 Getting all instance information of the service: InstanceController.doSrvIpxt()

  • InstanceController.doSrvIpxt() is very long. Some non-essential source code has been removed here for readability:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    //Omit some non core code
    
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //Obtain the Service object according to namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    List<Instance> srvedIPs;
    
    //Get all instances under the specified Service, and get the instance information of all Service providers from the Service instance based on srvIPs
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    //Filter some ip using selectors
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    //Group the instances by health status
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    //Traversal completes the encapsulation of JSON strings
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            //Skip disabled instances
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    //Attach the instance array built above to the result
    result.replace("hosts", hosts);
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
  • The main logic of this method consists of three steps:
    • First, get all instances of the service;
    • Then filter out some of the instances (by selector, health status and enabled flag);
    • Finally, iterate over the remaining instances, assemble the JSON result and return it;
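
The filtering step can be reduced to the sketch below. ProviderInstance and InstanceFilter are hypothetical classes; the sketch keeps only the enabled and healthyOnly rules from the listing above and does not reproduce the selector logic, the grouping by health status or the JSON assembly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal view of a provider instance.
final class ProviderInstance {
    final String ip;
    final boolean healthy;
    final boolean enabled;

    ProviderInstance(String ip, boolean healthy, boolean enabled) {
        this.ip = ip;
        this.healthy = healthy;
        this.enabled = enabled;
    }
}

final class InstanceFilter {
    // Returns the instances a consumer would receive, in their original order.
    static List<ProviderInstance> select(List<ProviderInstance> all, boolean healthyOnly) {
        List<ProviderInstance> result = new ArrayList<>();
        for (ProviderInstance instance : all) {
            if (!instance.enabled) {
                continue; // disabled instances are never returned
            }
            if (healthyOnly && !instance.healthy) {
                continue; // unhealthy instances are dropped only when the caller asks for it
            }
            result.add(instance);
        }
        return result;
    }
}
```

With healthyOnly set to false, an unhealthy but enabled instance is still returned, and the consumer can tell it apart through its healthy flag.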

4. The client listens to the Nacos server to dynamically obtain the service instance

  • From the perspective of the consumer client, monitor the Nacos server to dynamically know the changes of the provider;

4.1 The client sends a request

  • The client can subscribe in two ways:
    • void subscribe(String serviceName, EventListener listener) throws NacosException;
    • public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe), with subscribe set to true;

4.2 Principle of dynamic service awareness

  • The Nacos client has a HostReactor class whose job is to keep the service information up to date dynamically;
  • After the client subscribes, an UpdateTask thread in HostReactor sends a pull request every 10s to fetch the latest address list from the server;
  • On the server side, heartbeat detection is maintained with each service provider instance. Once a provider becomes abnormal, the server sends a push message to the Nacos client, that is, the service consumer;
  • After receiving the message, the service consumer parses it with processServiceJSON in HostReactor and updates the local service address list;
  • The principle can be summarized in the following figure:
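
The combination of periodic pull and server push can be sketched as a small cache with two entry points. InstanceListCache is a hypothetical class, and the Supplier stands in for the query request to the server; the timer that calls refresh() (every 10s in the description above) is left to the caller, for example a ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class InstanceListCache {
    private final Supplier<List<String>> pullFromServer; // hypothetical stand-in for the query request
    private volatile List<String> addresses = Collections.emptyList();

    InstanceListCache(Supplier<List<String>> pullFromServer) {
        this.pullFromServer = pullFromServer;
    }

    // Pull path: called periodically by a timer owned by the caller.
    void refresh() {
        addresses = snapshot(pullFromServer.get());
    }

    // Push path: called when the server notifies the client of a change.
    void onPush(List<String> pushed) {
        addresses = snapshot(pushed);
    }

    // Readers always see a complete, immutable list.
    List<String> current() {
        return addresses;
    }

    private static List<String> snapshot(List<String> source) {
        return Collections.unmodifiableList(new ArrayList<>(source));
    }
}
```

Replacing the whole list through a volatile field means a reader never observes a half-updated address list, whichever of the two paths performed the update.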

4.3 The Nacos server processes the request

  • The server-side handling is the same as in section 3 of this chapter; see 3.2, where the Nacos server processes the request;

5. Supplementary contents

5.1 Dubbo auto-configuration

  • When Spring Cloud Alibaba Dubbo integrates Nacos, service registration relies on Dubbo's auto-configuration mechanism;
  • The META-INF/spring.factories file of spring-cloud-alibaba-dubbo auto-configures a class related to service registration, DubboServiceRegistrationNonWebApplicationAutoConfiguration (the Dubbo registration auto-configuration class for non-web applications);

@Configuration
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {

	@Autowired
	private ServiceRegistry serviceRegistry; //The implementation class is NacosServiceRegistry
    ...

	//Listens for ApplicationStartedEvent, which is published after the context has been refreshed and before the application and command-line runners are called.
	@EventListener(ApplicationStartedEvent.class)
	public void onApplicationStarted() {
		setServerPort();
		//Finally, call NacosServiceRegistry.register() to perform the service registration
		register();
	}
	private void register() {
		if (registered) {
			return;
		}
		//This calls NacosServiceRegistry.register()
		serviceRegistry.register(registration);
		registered = true;
	}
	...
}
  • For how NacosServiceRegistry.register() works, see 1.4 Logic for registering service instances: NacosServiceRegistry.register();

  • It can be concluded that:

    • When Dubbo integrates Nacos, service registration depends on Dubbo's auto-configuration mechanism;
    • Dubbo's auto-configuration in turn relies on NacosServiceRegistry.register();

6. Summary of source code structure diagram

The structure diagrams below roughly follow the table of contents of this article;

6.1 structure diagram of service registration from the perspective of client

  • AbstractAutoServiceRegistration.bind(): listen for service initialization events;
    • NacosServiceRegistry.register(): register the service instance;
      • NacosNamingService.registerInstance(): register a service instance through the Nacos naming service;
        • BeatReactor.addBeatInfo(): heartbeat mechanism;
          • ScheduledThreadPoolExecutor.schedule(): execute scheduled tasks and send heartbeat packet beatInfo;
          • MetricsMonitor.getDom2BeatSizeMonitor(): record the size of the heartbeat map in a metrics gauge;
        • NamingProxy.registerService(): Send a registration request in the Open API mode;
          • NamingProxy.reqAPI(): pick a server and call the registration API;
            • NamingProxy.callServer(): call service and send request;

6.2 service registration structure from the perspective of server

  • InstanceController.register(): the server receives the request;
    • ServiceManager.registerInstance(): register the service instance on the server console;
      • ServiceManager.createEmptyService(): create an empty service;
        • ServiceManager.createServiceIfAbsent(): create the service if it does not exist yet;
          • ServiceManager.putServiceAndInit(): initializes the service;
            • ServiceManager.putService(): add the service to the cache;
            • Service.init(): establish heartbeat mechanism;
            • DelegateConsistencyServiceImpl.listen(): monitor data consistency;
      • ServiceManager.addInstance(): adds a service instance;
        • KeyBuilder.buildInstanceListKey(): build the consistency key for the service's instance list;
        • DelegateConsistencyServiceImpl.put(): put the key and service instance into the ConsistencyService for consistency maintenance;

6.3 the client queries the structure diagram of all service instances

  • InstanceController.list(): the server receives the request;
    • InstanceController.doSrvIpxt(): obtain the instance information of the requested service;
      • ServiceManager.getService(): obtain the Service object according to namespaceId and serviceName;
      • Service.srvIPs(): get all instances under the specified service;
      • JacksonUtils.createEmptyArrayNode(): iterate over the instances and assemble the JSON result;

Finally

This is a newcomer's write-up. If you find mistakes, please point them out; thank you very much! If you reprint this article, please credit the source!

Keywords: Distribution Spring Cloud Microservices

Added by shaun112 on Thu, 20 Jan 2022 18:46:32 +0200