Analysis of Nacos service discovery source code

1, Nacos service discovery flow chart

The flow should be easy to piece together after reading the source code analysis below; for the overall picture, see also the flow chart in the Nacos service registration source code analysis.

2, Find the source code entry

The spring-cloud-commons package defines a common service discovery abstraction, whose core is the DiscoveryClient interface.

Every component that plugs service discovery into Spring Cloud implements DiscoveryClient; for Nacos, this is the NacosDiscoveryClient class in the Nacos discovery package.
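To make the contract concrete, here is a minimal sketch of the shape DiscoveryClient gives its implementations. SimpleDiscoveryClient, SimpleInstance and InMemoryDiscoveryClient are hypothetical names for illustration, not Spring Cloud types. The real interface returns ServiceInstance objects, and NacosDiscoveryClient answers getInstances() by asking the Nacos server rather than reading an in-memory map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for Spring Cloud's ServiceInstance
final class SimpleInstance {
    final String serviceId;
    final String host;
    final int port;

    SimpleInstance(String serviceId, String host, int port) {
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical, trimmed-down version of the DiscoveryClient contract
interface SimpleDiscoveryClient {
    String description();

    List<SimpleInstance> getInstances(String serviceId);

    List<String> getServices();
}

// Toy implementation backed by a local map; NacosDiscoveryClient queries a Nacos server instead
class InMemoryDiscoveryClient implements SimpleDiscoveryClient {
    private final Map<String, List<SimpleInstance>> registry = new ConcurrentHashMap<>();

    void register(SimpleInstance instance) {
        registry.computeIfAbsent(instance.serviceId, k -> new CopyOnWriteArrayList<>()).add(instance);
    }

    @Override
    public String description() {
        return "In-memory discovery client (illustration only)";
    }

    @Override
    public List<SimpleInstance> getInstances(String serviceId) {
        // Return a copy so callers cannot mutate the registry
        return new ArrayList<>(registry.getOrDefault(serviceId, Collections.emptyList()));
    }

    @Override
    public List<String> getServices() {
        return new ArrayList<>(registry.keySet());
    }
}
```

Application code depends only on the interface. Swapping the discovery backend therefore means swapping the implementation bean, which is what the Nacos starter does.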

3, Client service discovery

1. When the Nacos client starts, it only performs service registration, configuration retrieval and similar operations. It does not fetch service instance information right away;
2. Service instances are fetched only when the first request for them is made, i.e. they are lazily loaded;

1) The client first looks up the service instance information in the local cache serviceInfoMap. On a miss, it queries the Nacos server through NamingProxy. Finally, it starts a scheduled task that periodically pulls the latest instance list from the server and updates serviceInfoMap. The delay between runs is the cacheMillis of the cached ServiceInfo.

// NacosDiscoveryClient#getInstances()
public List<ServiceInstance> getInstances(String serviceId) {
    try {
        // Obtain the service's instances through NacosNamingService; step into selectInstances()
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, true);
        return hostToServiceInstanceList(instances, serviceId);
    } catch (Exception e) {
        throw new RuntimeException(
                "Can not get hosts from nacos server. serviceId: " + serviceId, e);
    }
}

// NacosNamingService#selectInstances()
public List<Instance> selectInstances(String serviceName, boolean healthy) throws NacosException {
    return selectInstances(serviceName, new ArrayList<String>(), healthy);
}
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy)
    throws NacosException {
    // Default subscription mode
    return selectInstances(serviceName, clusters, healthy, true);
}
public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy,
                                      boolean subscribe) throws NacosException {
    // By default, query instances in DEFAULT_GROUP
    return selectInstances(serviceName, Constants.DEFAULT_GROUP, clusters, healthy, subscribe);
}
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // The default mode is subscription mode, that is, subscribe is TRUE
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
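HostReactor's cache key is built in two steps. First the service name is prefixed with its group, then the comma-joined cluster list is appended. The sketch below assumes the "@@" separator that Nacos 1.x uses for both steps (Constants.SERVICE_INFO_SPLITER). groupedName() and cacheKey() are hypothetical helpers meant to mirror NamingUtils.getGroupedName() and ServiceInfo.getKey(); they are not the library code itself.

```java
import java.util.List;

final class ServiceKeys {
    // Assumed separator, mirroring Constants.SERVICE_INFO_SPLITER in Nacos 1.x
    static final String SPLITTER = "@@";

    // Hypothetical helper mirroring NamingUtils.getGroupedName(serviceName, groupName)
    static String groupedName(String serviceName, String groupName) {
        return groupName + SPLITTER + serviceName;
    }

    // Hypothetical helper mirroring ServiceInfo.getKey(name, clusters):
    // the cluster list is appended only when it is non-empty
    static String cacheKey(String groupedName, List<String> clusters) {
        String joined = String.join(",", clusters);
        return joined.isEmpty() ? groupedName : groupedName + SPLITTER + joined;
    }
}
```

Because the cluster list is part of the key, a query for clusters "c1,c2" and a query for all clusters are cached, and refreshed, as separate entries.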

HostReactor#getServiceInfo() is where the service instance information is actually obtained:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    // 1. Get instance information from the local cache serviceInfoMap
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

    // 2. On a cache miss, fetch the instance information from the Nacos server over HTTP
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);

        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {

        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // 3. Start a scheduled task that periodically pulls the latest instance information from the Nacos server into the local cache serviceInfoMap
    scheduleUpdateIfAbsent(serviceName, clusters);

    // 4. Get the service instance information from the local cache serviceInfoMap
    return serviceInfoMap.get(serviceObj.getKey());
}
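The cache-first, fetch-on-miss behaviour of getServiceInfo() can be sketched with a ConcurrentHashMap. This is a simplification. LazyServiceCache and its fetcher function are hypothetical. HostReactor first stores an empty ServiceInfo and then calls updateServiceNow(); the sketch instead swaps in computeIfAbsent, so concurrent first requests for the same key trigger only one fetch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical lazy cache: nothing is fetched until the first lookup for a key
class LazyServiceCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> fetcher; // stands in for the HTTP query
    // Counts remote fetches so the lazy behaviour can be observed
    final AtomicInteger fetchCount = new AtomicInteger();

    LazyServiceCache(Function<String, List<String>> fetcher) {
        this.fetcher = fetcher;
    }

    List<String> getInstances(String key) {
        // Serve from the cache; on a miss, fetch once and store the result atomically
        return cache.computeIfAbsent(key, k -> {
            fetchCount.incrementAndGet();
            return fetcher.apply(k);
        });
    }
}
```

One trade-off: computeIfAbsent holds a lock on the map bin while the fetch runs. That is fine for a sketch, but a slow network call there can block other writers. This is one reason to prefer HostReactor's placeholder-plus-wait approach.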

1. Get service instance information from local cache:

private ServiceInfo getServiceInfo0(String serviceName, String clusters) {

    String key = ServiceInfo.getKey(serviceName, clusters);

    return serviceInfoMap.get(key);
}

2. On a cache miss, fetch the service instance information from the Nacos server over HTTP:

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {

        // Query the instance list over HTTP through NamingProxy (serverProxy)
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            // Update local cache serviceInfoMap
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
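updateServiceNow() wakes any thread that parked in getServiceInfo() while the first fetch was in flight. Waiters call wait(UPDATE_HOLD_INTERVAL) on the ServiceInfo object, and the updater calls notifyAll() on it in the finally block. The sketch below shows that handshake with a hypothetical PendingEntry class. It waits in a loop against a deadline, which is the idiomatic guard against spurious wake-ups; the HostReactor code above waits only once.

```java
// Hypothetical holder for a value that another thread is still fetching
class PendingEntry {
    private String value; // guarded by "this"

    // Called by the updater thread once the fetch completes (mirrors notifyAll() in updateServiceNow())
    synchronized void complete(String newValue) {
        value = newValue;
        notifyAll();
    }

    // Waits at most holdMillis for the value; returns null if it did not arrive in time
    synchronized String await(long holdMillis) {
        long deadline = System.currentTimeMillis() + holdMillis;
        while (value == null) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return null; // gave up: the caller proceeds with whatever it has
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return value;
            }
        }
        return value;
    }
}
```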

3. Start a scheduled task that periodically pulls the latest service instance information from the Nacos server and updates the local cache serviceInfoMap:

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        // Start scheduled task
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
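scheduleUpdateIfAbsent() uses double-checked locking on futureMap, so each service/cluster key gets exactly one refresh task however many lookups race. Below is a sketch with a hypothetical RefreshScheduler. For brevity it uses scheduleWithFixedDelay, whereas Nacos schedules a one-shot UpdateTask that reschedules itself. Its worker thread is a daemon, like the PushReceiver thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler that guarantees at most one refresh task per key
class RefreshScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "refresh-scheduler");
        thread.setDaemon(true);
        return thread;
    });
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    void scheduleIfAbsent(String key, Runnable refresh, long periodMillis) {
        // Fast path: a task already exists, no locking needed
        if (futureMap.get(key) != null) {
            return;
        }
        synchronized (futureMap) {
            // Re-check under the lock: another thread may have scheduled it in the meantime
            if (futureMap.get(key) != null) {
                return;
            }
            ScheduledFuture<?> future =
                    executor.scheduleWithFixedDelay(refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            futureMap.put(key, future);
        }
    }

    int taskCount() {
        return futureMap.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```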

// Scheduled task logic: UpdateTask#run()
public void run() {
    try {
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

        if (serviceObj == null) {
            updateServiceNow(serviceName, clusters);
            executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            return;
        }

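        // Cache not refreshed (e.g. by a UDP push) since the last run: pull the latest data from the server
        if (serviceObj.getLastRefTime() <= lastRefTime) {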
            updateServiceNow(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            refreshOnly(serviceName, clusters);
        }

        // Schedule the next run after cacheMillis milliseconds (the refresh interval carried by the service data)
        executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);

        lastRefTime = serviceObj.getLastRefTime();
    } catch (Throwable e) {
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    }

}
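UpdateTask does not use a fixed-rate schedule. Each run re-registers itself with executor.schedule(), taking the next delay from the freshly fetched ServiceInfo's cacheMillis, so the server can tune how often clients poll. Below is a sketch of this self-rescheduling pattern. SelfReschedulingTask and its delay supplier are hypothetical, and the latch exists only so the demo terminates. The sketch reschedules in a finally block. In the listing above, the reschedule sits inside the try, so a Throwable thrown before it would stop the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical task that re-registers itself after every run, in the style of UpdateTask#run()
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor;
    private final LongSupplier nextDelayMillis; // stands in for serviceObj.getCacheMillis()
    private final CountDownLatch runs;          // counted down once per execution

    SelfReschedulingTask(ScheduledExecutorService executor, LongSupplier nextDelayMillis, CountDownLatch runs) {
        this.executor = executor;
        this.nextDelayMillis = nextDelayMillis;
        this.runs = runs;
    }

    @Override
    public void run() {
        try {
            // A real task would pull from the server and update the cache here
            runs.countDown();
        } finally {
            // Reschedule even if the refresh failed; stop once the demo has run enough times
            if (runs.getCount() > 0 && !executor.isShutdown()) {
                executor.schedule(this, nextDelayMillis.getAsLong(), TimeUnit.MILLISECONDS);
            }
        }
    }

    // Runs the task `times` times; returns true if that happened within timeoutMillis
    static boolean runDemo(int times, long delayMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch runs = new CountDownLatch(times);
        try {
            executor.schedule(new SelfReschedulingTask(executor, () -> delayMillis, runs), 0, TimeUnit.MILLISECONDS);
            return runs.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```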

Querying the service instance list, NamingProxy#queryList():

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
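queryList() encodes these parameters into a GET request against the /instance/list endpoint. The sketch below shows the resulting URL. The /nacos/v1/ns prefix assumes the default context path behind UtilAndComs.NACOS_URL_BASE. The literal keys "namespaceId" and "serviceName" are assumed values of the CommonParams constants. InstanceListUrl is a hypothetical helper. The real reqAPI() also picks a server and handles failures.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class InstanceListUrl {
    // Assumed default path: "/nacos" context + "/v1/ns" + "/instance/list"
    static final String PATH = "/nacos/v1/ns/instance/list";

    // Same parameters as queryList(); insertion order is kept for readability
    static Map<String, String> params(String namespaceId, String serviceName, String clusters,
                                      int udpPort, String clientIp, boolean healthyOnly) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);  // assumed value of CommonParams.NAMESPACE_ID
        params.put("serviceName", serviceName);  // assumed value of CommonParams.SERVICE_NAME
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", clientIp);
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return params;
    }

    // Hypothetical helper: builds the GET URL that queryList() would request
    static String build(String server, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return "http://" + server + PATH + "?" + query;
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Note that the udpPort parameter carries the PushReceiver's port. This is how the server learns where to send change notifications.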

2) When HostReactor is instantiated, it also creates a PushReceiver. The PushReceiver starts a daemon thread that loops forever, using DatagramSocket#receive() to listen for the UDP notifications the Nacos server sends after a service's instance information changes. The client tells the server where to push by passing pushReceiver.getUDPPort() as the udpPort parameter of queryList().

public class PushReceiver implements Runnable {
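    private static final int UDP_MSS = 64 * 1024;
    private ScheduledExecutorService executorService;
    private DatagramSocket udpSocket;
    private HostReactor hostReactor;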

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();
            // Start a thread
            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });

            executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

    public void run() {
        while (true) {
            try {
                // A new byte[] is zero-filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                // Listen for the notification after the service instance information of the Nacos server is changed
                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

}
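The receive/ack exchange can be tried locally with two loopback sockets. One plays the Nacos server pushing a change; the other plays PushReceiver. This is a simplified sketch, and PushAckDemo is hypothetical. The push payload is just the type string, not a compressed JSON PushPacket. The ack's data field is left empty for every type, whereas the real dump-ack carries the serialized serviceInfoMap. Reading exactly packet.getLength() bytes avoids relying on trim() to drop the zero padding of the receive buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

final class PushAckDemo {
    static final int UDP_MSS = 64 * 1024;

    // Chooses the ack type the same way PushReceiver#run() does (data left empty in this sketch)
    static String ackFor(String type, String lastRefTime) {
        String ackType = ("dom".equals(type) || "service".equals(type)) ? "push-ack"
                : "dump".equals(type) ? "dump-ack" : "unknown-ack";
        return "{\"type\": \"" + ackType + "\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }

    // One push + ack round trip over loopback; returns the ack as seen by the "server"
    static String roundTrip(String pushType, String lastRefTime) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket receiver = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            receiver.setSoTimeout(2000);

            // "Server" pushes a change notification (simplified payload: just the type)
            byte[] push = pushType.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, receiver.getLocalSocketAddress()));

            // "PushReceiver" side: block in receive(), then read exactly the bytes received
            DatagramPacket packet = new DatagramPacket(new byte[UDP_MSS], UDP_MSS);
            receiver.receive(packet);
            String type = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);

            // Reply to the sender's address, as PushReceiver does with packet.getSocketAddress()
            byte[] ack = ackFor(type, lastRefTime).getBytes(StandardCharsets.UTF_8);
            receiver.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));

            DatagramPacket ackPacket = new DatagramPacket(new byte[UDP_MSS], UDP_MSS);
            server.receive(ackPacket);
            return new String(ackPacket.getData(), 0, ackPacket.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```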

4, Server service discovery

On the server side, service discovery mainly does two things:

1. Query the service instance list. The server looks up the Service in the cached serviceMap, then collects the full set of instances from each of its Clusters, which hold them in two sets, persistentInstances and ephemeralInstances;
2. Register the client's IP and UDP port in clientMap so that later changes to the service can be pushed to it. clientMap is a field of NamingSubscriberServiceV1Impl, an implementation of NamingSubscriberService. Its key is the full service name (namespace plus service name), and its value is a map of the PushClients (IP + UDP port) subscribed to that service.

The entry point is the list() method of InstanceController in the naming module:

1) Get service instance list

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public Object list(HttpServletRequest request) throws Exception {
    
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
    Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
            udpPort, clusters);
    // Go to the InstanceOperatorServiceImpl#listInstance() method to get the list of service instances
    return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

//InstanceOperatorServiceImpl#listInstance()
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
            boolean healthOnly) throws Exception {
        ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
        String clientIP = subscriber.getIp();
        ServiceInfo result = new ServiceInfo(serviceName, cluster);
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();
        
        // now try to enable the push
        try {
            // Try to enable UDP push (UdpPushService), i.e. notify the Nacos client over UDP when the service's instances change
            if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
                subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
                        new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
                        StringUtils.EMPTY);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP,
                    subscriber.getPort(), e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }
        
        if (service == null) {
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.setCacheMillis(cacheMillis);
            return result;
        }

        // Check if the service is disabled
        checkIfDisabled(service);

        // Key step: collect all persistent and ephemeral instances registered in the requested clusters
        List<com.alibaba.nacos.naming.core.Instance> srvedIps = service
                .srvIPs(Arrays.asList(StringUtils.split(cluster, StringUtils.COMMA)));
        
        // filter ips using selector
        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
            srvedIps = selectorManager.select(service.getSelector(), clientIP, srvedIps);
        }

        // If no instances are found, return the result without any hosts
        if (CollectionUtils.isEmpty(srvedIps)) {
            .......
            return result;
        }
        .......
    }

// Service#srvIPs()
public List<Instance> srvIPs(List<String> clusters) {
    if (CollectionUtils.isEmpty(clusters)) {
        clusters = new ArrayList<>();
        clusters.addAll(clusterMap.keySet());
    }
    return allIPs(clusters);
}

// Service#allIPs()
public List<Instance> allIPs(List<String> clusters) {
    List<Instance> result = new ArrayList<>();
    for (String cluster : clusters) {
        // When registering the service, the instance information will be written to the clusterMap, and now it will be retrieved from it
        Cluster clusterObj = clusterMap.get(cluster);
        if (clusterObj == null) {
            continue;
        }

        result.addAll(clusterObj.allIPs());
    }
    return result;
}

// Cluster#allIPs()
public List<Instance> allIPs() {
    List<Instance> allInstances = new ArrayList<>();
    // Get all persistent instances under the service
    allInstances.addAll(persistentInstances);
    // Get all ephemeral (temporary) instances under the service
    allInstances.addAll(ephemeralInstances);
    return allInstances;
}
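The server-side lookup is a two-level merge. An empty cluster list means "all clusters", and each Cluster contributes both its persistent and its ephemeral instances. The sketch below uses hypothetical ServiceSketch and ClusterSketch classes, with instances reduced to "ip:port" strings. The real Cluster holds Instance objects, and listInstance() applies further filtering afterwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified Cluster: instances are "ip:port" strings
class ClusterSketch {
    final Set<String> persistentInstances = new LinkedHashSet<>();
    final Set<String> ephemeralInstances = new LinkedHashSet<>();

    // Persistent first, then ephemeral, as in Cluster#allIPs()
    List<String> allIPs() {
        List<String> all = new ArrayList<>(persistentInstances);
        all.addAll(ephemeralInstances);
        return all;
    }
}

// Hypothetical, simplified Service holding its clusters by name
class ServiceSketch {
    final Map<String, ClusterSketch> clusterMap = new ConcurrentHashMap<>();

    List<String> srvIPs(List<String> clusters) {
        // An empty cluster filter means "every cluster of this service"
        List<String> names = clusters.isEmpty() ? new ArrayList<>(clusterMap.keySet()) : clusters;
        List<String> result = new ArrayList<>();
        for (String name : names) {
            ClusterSketch cluster = clusterMap.get(name);
            if (cluster != null) { // unknown cluster names are skipped, as in Service#allIPs()
                result.addAll(cluster.allIPs());
            }
        }
        return result;
    }
}
```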

2) Registering the client as a UDP push target

NamingSubscriberServiceV1Impl#addClient():

public void addClient(String namespaceId, String serviceName, String clusters, String agent,
        InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {

    // Initialize push client instance PushClient
    PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
            app);
    // Add push target client
    addClient(client);
}

// Overload method addClient()
public void addClient(PushClient client) {
    // Clients are stored by full service name (namespace + service name) because notify events are driven by service changes
    String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
    ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
    // If no client map exists for this service yet, create one (putIfAbsent copes with concurrent creation)
    if (clients == null) {
        clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
        clients = clientMap.get(serviceKey);
    }
    
    PushClient oldClient = clients.get(client.toString());
    // If there is an old PushClient, refresh
    if (oldClient != null) {
        oldClient.refresh();
    } else {
        // Otherwise cache PushClient
        PushClient res = clients.putIfAbsent(client.toString(), client);
        if (res != null) {
            Loggers.PUSH.warn("client: {} already associated with key {}", res.getAddrStr(), res);
        }
        Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
    }
}
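The clientMap bookkeeping is a map of maps. The outer key is the full service name; the inner map holds push clients keyed by each client's string form. putIfAbsent guards against concurrent inserts. The sketch below uses hypothetical PushClientRegistry and PushTarget classes. The "##" connector is an assumption about UtilsAndCommons.assembleFullServiceName(). The client key is simply "ip:port", and a counter stands in for PushClient#refresh(). Unlike the original, the sketch collapses the get/putIfAbsent pair into a single putIfAbsent.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical push target; refreshCount stands in for PushClient#refresh()
class PushTarget {
    final String ip;
    final int udpPort;
    final AtomicInteger refreshCount = new AtomicInteger();

    PushTarget(String ip, int udpPort) {
        this.ip = ip;
        this.udpPort = udpPort;
    }

    void refresh() {
        refreshCount.incrementAndGet();
    }

    @Override
    public String toString() {
        return ip + ":" + udpPort;
    }
}

class PushClientRegistry {
    // full service name -> (client key -> client)
    private final ConcurrentMap<String, ConcurrentMap<String, PushTarget>> clientMap = new ConcurrentHashMap<>();

    private static String serviceKey(String namespaceId, String serviceName) {
        return namespaceId + "##" + serviceName; // assumed connector
    }

    void addClient(String namespaceId, String serviceName, PushTarget client) {
        ConcurrentMap<String, PushTarget> clients =
                clientMap.computeIfAbsent(serviceKey(namespaceId, serviceName), k -> new ConcurrentHashMap<>());
        PushTarget old = clients.putIfAbsent(client.toString(), client);
        if (old != null) {
            old.refresh(); // already subscribed: just refresh the existing entry
        }
    }

    int clientCount(String namespaceId, String serviceName) {
        Map<String, PushTarget> clients = clientMap.get(serviceKey(namespaceId, serviceName));
        return clients == null ? 0 : clients.size();
    }
}
```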

5, Summary

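Client:

1. Look up service instance information in the local cache first, fetching it from the Nacos server on a miss;
2. A scheduled task periodically pulls the latest service instance information from the Nacos server and refreshes the cache, while PushReceiver applies the UDP change notifications pushed by the server;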

Server:

1. Return all persistent and ephemeral instances held in the in-memory registry for the requested namespace and service;
2. Register the client's IP and UDP port as a push target, so that later instance changes are pushed to it over UDP;

Keywords: Java Spring Cloud Microservices Nacos

Added by rossh on Mon, 20 Dec 2021 10:49:21 +0200