1, Nacos service discovery flow chart
The flow should become clear after reading the source code analysis below. See also: Nacos service registration source code analysis flow chart.
2, Find the source code entry
The spring-cloud-commons package defines a set of service discovery specifications, and the core logic lives in the DiscoveryClient interface.
Every component that integrates with Spring Cloud to provide service discovery implements the DiscoveryClient interface. In the Nacos discovery package, the NacosDiscoveryClient class is that implementation.
3, Client service discovery
1. When the Nacos client starts, it only performs service registration, configuration retrieval and similar operations; it does not request service instance information right away.
2. Service instance information is fetched only when the first request for a service is made, that is, it is lazily loaded.
1) First, look up the service instance information in the local cache serviceInfoMap. If it is not there, call the Nacos server through NamingProxy to obtain it. Finally, start a scheduled task that periodically pulls the instance list from the server and updates the local cache serviceInfoMap. The first delay is DEFAULT_DELAY; later delays come from the cacheMillis of the cached ServiceInfo, which the source comments describe as 1s.
// NacosDiscoveryClient#getInstances()
public List<ServiceInstance> getInstances(String serviceId) {
    try {
        // Obtain the instances of the service through NacosNamingService; step into selectInstances()
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, true);
        return hostToServiceInstanceList(instances, serviceId);
    } catch (Exception e) {
        throw new RuntimeException(
                "Can not get hosts from nacos server. serviceId: " + serviceId, e);
    }
}

// NacosNamingService#selectInstances()
public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
    return selectInstances(serviceName, new ArrayList<String>(), healthy);
}

public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
        throws NacosException {
    // Subscription mode is used by default
    return selectInstances(serviceName, clusters, healthy, true);
}

public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    // By default, query the service instances under DEFAULT_GROUP
    return selectInstances(serviceName, Constants.DEFAULT_GROUP, clusters, healthy, subscribe);
}

public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters,
        boolean healthy, boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    // Subscription mode is the default, that is, subscribe is true
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(
                NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
The HostReactor#getServiceInfo() method is where the service instance information is actually retrieved:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    // 1. Get instance information from the local cache serviceInfoMap
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    // 2. If the local cache has no entry, fetch it from the Nacos server via an HTTP call
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName
                            + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // 3. Start a scheduled task that periodically pulls the latest service instance
    //    information from the Nacos server and updates the local cache serviceInfoMap
    scheduleUpdateIfAbsent(serviceName, clusters);

    // 4. Return the service instance information from the local cache serviceInfoMap
    return serviceInfoMap.get(serviceObj.getKey());
}
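The cache-first, fetch-on-miss behaviour above can be illustrated with a minimal sketch. It is not Nacos code: LazyServiceCache, its remoteLookup function, and the string-based instance list are hypothetical stand-ins, and it uses ConcurrentHashMap#computeIfAbsent rather than the placeholder-plus-wait approach HostReactor takes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for a cache-first lookup; not part of Nacos.
final class LazyServiceCache {
    // Plays the role of serviceInfoMap: service key -> instance addresses.
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Plays the role of the remote call made through NamingProxy (assumption).
    private final Function<String, List<String>> remoteLookup;
    private final AtomicInteger remoteCalls = new AtomicInteger();

    LazyServiceCache(Function<String, List<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Returns the cached entry, calling the remote lookup only on a cache miss.
    List<String> getInstances(String serviceKey) {
        return cache.computeIfAbsent(serviceKey, key -> {
            remoteCalls.incrementAndGet();
            return remoteLookup.apply(key);
        });
    }

    // Number of times the remote lookup was actually invoked.
    int remoteCallCount() {
        return remoteCalls.get();
    }
}
```

Calling getInstances() twice with the same key triggers only one remote lookup, which is the lazy-loading effect described in this section.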
1. Get service instance information from local cache:
private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    return serviceInfoMap.get(key);
}
2. Then obtain the service instance information from the Nacos server via an HTTP call:
public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // Call the HTTP interface through NamingProxy to obtain the service instance information
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            // Update the local cache serviceInfoMap
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        // Wake up any threads waiting in getServiceInfo() for this update to finish
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
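3. Start a scheduled task that periodically pulls the latest service instance information from the Nacos server and updates the local cache serviceInfoMap. The task reschedules itself after each run, using the cacheMillis of the cached ServiceInfo as the delay: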
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        // Check again inside the lock so that only one task is created per key
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // Start the scheduled task
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

// Scheduled task execution logic, UpdateTask#run()
public void run() {
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (serviceObj == null) {
            updateServiceNow(serviceName, clusters);
            executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            return;
        }
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateServiceNow(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }
        // Reschedule this task; the delay is the cacheMillis of the cached ServiceInfo
        executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
        lastRefTime = serviceObj.getLastRefTime();
    } catch (Throwable e) {
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    }
}
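The "register at most one self-rescheduling task per key" pattern can be sketched independently of Nacos. RefreshScheduler and its method names are hypothetical. The sketch relies on ConcurrentHashMap#computeIfAbsent for the once-only guarantee, where the code above uses a double check around synchronized (futureMap), and it takes a fixed delay instead of reading cacheMillis.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of scheduling one refresh task per service key; not Nacos code.
final class RefreshScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "refresh-sketch");
                t.setDaemon(true);
                return t;
            });
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Returns true only for the call that actually registered the task for this key.
    boolean scheduleIfAbsent(String key, Runnable refresh, long delayMillis) {
        boolean[] created = {false};
        futureMap.computeIfAbsent(key, k -> {
            created[0] = true;
            return executor.schedule(new Task(refresh, delayMillis), delayMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Runs the refresh, then schedules itself again after the same delay.
    private final class Task implements Runnable {
        private final Runnable refresh;
        private final long delayMillis;

        Task(Runnable refresh, long delayMillis) {
            this.refresh = refresh;
            this.delayMillis = delayMillis;
        }

        @Override
        public void run() {
            try {
                refresh.run();
            } catch (RuntimeException e) {
                // Swallowed in this sketch so that one failed refresh does not stop the loop.
            }
            try {
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // The executor was shut down; stop rescheduling.
            }
        }
    }
}
```

A second scheduleIfAbsent() call for the same key returns false and does not create another task, so repeated lookups of the same service do not multiply the polling load.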
NamingProxy#queryList() queries the service instance list:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
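To make the request shape concrete, the following sketch builds the query string such a GET request would carry. InstanceListQuery is a hypothetical helper, not the NamingProxy implementation. The literal keys "namespaceId" and "serviceName" are assumed values for CommonParams.NAMESPACE_ID and CommonParams.SERVICE_NAME; the remaining keys are taken from the code above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that mirrors the parameters of queryList(); not Nacos code.
final class InstanceListQuery {

    // Collects the parameters in insertion order so the output is deterministic.
    static Map<String, String> params(String namespaceId, String serviceName, String clusters,
                                      int udpPort, String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);   // assumed value of CommonParams.NAMESPACE_ID
        params.put("serviceName", serviceName);   // assumed value of CommonParams.SERVICE_NAME
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return params;
    }

    // Joins the parameters into a URL-encoded query string.
    static String toQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
        }
        return joiner.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }
}
```

The udpPort parameter is the part that matters for the push mechanism: it tells the server which port the client's PushReceiver is listening on.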
2) When the HostReactor is instantiated, it also instantiates a PushReceiver, which starts a thread that loops on DatagramSocket#receive() to listen for the UDP notifications the Nacos server sends after service instance information changes.
public class PushReceiver implements Runnable {

    private DatagramSocket udpSocket;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();
            // Start a thread
            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // Block until the Nacos server sends a notification about changed service instance information
                udpSocket.receive(packet);
                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);
                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\""
                            + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                        ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
}
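The receive-then-acknowledge exchange can be tried in isolation with two loopback sockets. UdpAckSketch is a hypothetical class: one socket plays the client-side receiver, the other plays the server that pushes and then waits for the ack. The payload is sent uncompressed and not parsed, and the ack string copies the push-ack format shown above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical loopback demo of a UDP push followed by an ack; not Nacos code.
final class UdpAckSketch {

    // Same shape as the "push-ack" string built in PushReceiver#run().
    static String buildAck(long lastRefTime) {
        return "{\"type\": \"push-ack\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }

    // Sends one push datagram to a receiver thread and returns the ack it replies with.
    static String roundTrip(String pushJson, long lastRefTime) {
        try (DatagramSocket receiver = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            receiver.setSoTimeout(5000);
            server.setSoTimeout(5000);

            Thread receiverThread = new Thread(() -> {
                try {
                    byte[] buffer = new byte[64 * 1024];
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                    receiver.receive(packet); // blocks until the push arrives
                    byte[] ack = buildAck(lastRefTime).getBytes(StandardCharsets.UTF_8);
                    // Reply to whoever sent the push
                    receiver.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            receiverThread.setDaemon(true);
            receiverThread.start();

            byte[] push = pushJson.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, receiver.getLocalSocketAddress()));

            byte[] buffer = new byte[1024];
            DatagramPacket reply = new DatagramPacket(buffer, buffer.length);
            server.receive(reply);
            // Decode only the bytes actually received, not the whole buffer.
            return new String(reply.getData(), reply.getOffset(), reply.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because UDP gives no delivery guarantee, an ack of this kind is the only way the sender learns that a push arrived; the periodic pull described earlier covers the case where it did not.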
4, Server service discovery
Service discovery on the Nacos server mainly does two things:
1. Query the service instance list. It first finds the Cluster objects of the service from the cached serviceMap, and then collects all instances from the two sets held by each Cluster: persistentInstances and ephemeralInstances.
2. Add the IP and UDP port sent by the client to the clientMap, so that later changes to the service can be pushed to that client. clientMap belongs to NamingSubscriberServiceV1Impl, the implementation class of NamingSubscriberService. Its key is the service name and its value is the set of clients (IP + port) that have subscribed to the service.
See the list() method of InstanceController class under naming:
1) Get service instance list
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public Object list(HttpServletRequest request) throws Exception {
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId,
            serviceName, udpPort, clusters);
    // Step into InstanceOperatorServiceImpl#listInstance() to get the list of service instances
    return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

// InstanceOperatorServiceImpl#listInstance()
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber,
        String cluster, boolean healthOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
    String clientIP = subscriber.getIp();
    ServiceInfo result = new ServiceInfo(serviceName, cluster);
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        // Try to enable UDP push, that is, notify the Nacos client over UDP
        // when the service instance information changes
        if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
            subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
                    new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource,
                    StringUtils.EMPTY, StringUtils.EMPTY);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP,
                subscriber.getPort(), e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.setCacheMillis(cacheMillis);
        return result;
    }

    // Check if the service is disabled
    checkIfDisabled(service);

    // Key code: obtain the registered instances, both persistent and ephemeral
    List<com.alibaba.nacos.naming.core.Instance> srvedIps = service
            .srvIPs(Arrays.asList(StringUtils.split(cluster, StringUtils.COMMA)));

    // filter ips using selector
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIps = selectorManager.select(service.getSelector(), clientIP, srvedIps);
    }

    // If no instance is found, the result for the current service is returned
    if (CollectionUtils.isEmpty(srvedIps)) {
    .......
    return result;
}

// Service#srvIPs()
public List<Instance> srvIPs(List<String> clusters) {
    // An empty cluster list means "all clusters of this service"
    if (CollectionUtils.isEmpty(clusters)) {
        clusters = new ArrayList<>();
        clusters.addAll(clusterMap.keySet());
    }
    return allIPs(clusters);
}

// Service#allIPs()
public List<Instance> allIPs(List<String> clusters) {
    List<Instance> result = new ArrayList<>();
    for (String cluster : clusters) {
        // Instance information was written to clusterMap during service registration; read it back here
        Cluster clusterObj = clusterMap.get(cluster);
        if (clusterObj == null) {
            continue;
        }
        result.addAll(clusterObj.allIPs());
    }
    return result;
}

// Cluster#allIPs()
public List<Instance> allIPs() {
    List<Instance> allInstances = new ArrayList<>();
    // Get all persistent instances under the cluster
    allInstances.addAll(persistentInstances);
    // Get all ephemeral (temporary) instances under the cluster
    allInstances.addAll(ephemeralInstances);
    return allInstances;
}
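The lookup rules in srvIPs() and allIPs() (an empty cluster list selects every cluster, unknown cluster names are skipped, persistent and ephemeral instances are merged) can be exercised with a small stand-alone model. ServiceSketch and ClusterSketch are hypothetical simplifications that store instances as plain address strings.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Service/Cluster instance lookup; not Nacos code.
final class ServiceSketch {

    static final class ClusterSketch {
        final List<String> persistentInstances = new ArrayList<>();
        final List<String> ephemeralInstances = new ArrayList<>();

        // Persistent instances first, then ephemeral ones.
        List<String> allIPs() {
            List<String> all = new ArrayList<>(persistentInstances);
            all.addAll(ephemeralInstances);
            return all;
        }
    }

    private final Map<String, ClusterSketch> clusterMap = new LinkedHashMap<>();

    // Returns the named cluster, creating it on first use (stands in for registration).
    ClusterSketch cluster(String name) {
        return clusterMap.computeIfAbsent(name, k -> new ClusterSketch());
    }

    // An empty list means "all clusters"; names without a cluster are ignored.
    List<String> srvIPs(List<String> clusters) {
        Collection<String> names = clusters.isEmpty() ? clusterMap.keySet() : clusters;
        List<String> result = new ArrayList<>();
        for (String name : names) {
            ClusterSketch clusterObj = clusterMap.get(name);
            if (clusterObj != null) {
                result.addAll(clusterObj.allIPs());
            }
        }
        return result;
    }
}
```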
2) Register the client for UDP push of service instance changes
NamingSubscriberServiceV1Impl#addClient():
public void addClient(String namespaceId, String serviceName, String clusters, String agent,
        InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
    // Initialize the push client instance PushClient
    PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource,
            tenant, app);
    // Add the push target client
    addClient(client);
}

// Overloaded method addClient()
public void addClient(PushClient client) {
    // client is stored by key 'serviceName' because notify event is driven by serviceName change
    String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(),
            client.getServiceName());
    ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
    // If no push clients are registered yet for this service, create an empty map and cache it
    if (clients == null) {
        clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
        clients = clientMap.get(serviceKey);
    }

    PushClient oldClient = clients.get(client.toString());
    // If the PushClient already exists, refresh it
    if (oldClient != null) {
        oldClient.refresh();
    } else {
        // Otherwise cache the new PushClient
        PushClient res = clients.putIfAbsent(client.toString(), client);
        if (res != null) {
            Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res);
        }
        Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(),
                client.getServiceName());
    }
}
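The two-level clientMap (service key to subscribers, each subscriber keyed by its address) can be modelled with a short sketch. SubscriberRegistry and its Subscriber type are hypothetical, and the "##" separator in the service key is an assumption about what assembleFullServiceName() produces. computeIfAbsent replaces the get/putIfAbsent/get sequence used above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a per-service subscriber registry; not Nacos code.
final class SubscriberRegistry {

    static final class Subscriber {
        final String address;
        final AtomicInteger refreshCount = new AtomicInteger();

        Subscriber(String address) {
            this.address = address;
        }

        // Stands in for PushClient#refresh(): marks the subscription as still alive.
        void refresh() {
            refreshCount.incrementAndGet();
        }
    }

    // service key -> (client address -> subscriber)
    private final ConcurrentMap<String, ConcurrentMap<String, Subscriber>> clientMap =
            new ConcurrentHashMap<>();

    // Returns true if a new subscriber was stored, false if an existing one was refreshed.
    boolean addClient(String namespaceId, String serviceName, String ip, int udpPort) {
        String serviceKey = namespaceId + "##" + serviceName; // separator is an assumption
        String clientKey = ip + ":" + udpPort;
        ConcurrentMap<String, Subscriber> clients =
                clientMap.computeIfAbsent(serviceKey, key -> new ConcurrentHashMap<>());
        Subscriber existing = clients.putIfAbsent(clientKey, new Subscriber(clientKey));
        if (existing != null) {
            existing.refresh();
            return false;
        }
        return true;
    }

    int subscriberCount(String namespaceId, String serviceName) {
        ConcurrentMap<String, Subscriber> clients = clientMap.get(namespaceId + "##" + serviceName);
        return clients == null ? 0 : clients.size();
    }
}
```

Keying the outer map by service lets a change to one service be fanned out to exactly the clients that asked about it, which is what the original comment about notify events being driven by serviceName refers to.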
5, Summary
Client:
1. Get the service instance information from the local cache first; on a miss, fetch it from the Nacos server;
2. A scheduled task periodically pulls the service instance information from the Nacos server and refreshes the local cache;
Server:
1. Return to the client all persistent and ephemeral instances of the service held in the in-memory registry under the specified namespace;
2. Register the client's IP and UDP port so that changes to the service instance information can be pushed to it over UDP;