A First Look at Dubbo's Workflow

flow chart

  • When a service provider starts, it writes its own metadata to the registry and subscribes to configuration metadata.
  • When a consumer starts, it also writes its own metadata to the registry and subscribes to service provider, routing, and configuration metadata.
  • If a monitor is configured, it subscribes to all consumer, service provider, routing, and configuration metadata at the same time.
  • When the set of service providers changes (for example, a provider goes offline or a new one joins), the registry notifies the consumers and the service governance center of the change.
  • When a consumer initiates a service call, it reports the call and its statistics to the monitoring center (monitor).
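
The register, subscribe, and notify steps above can be sketched with a small in-memory registry. This is only an illustration under stated assumptions: ToyRegistry and its methods are hypothetical names and are not Dubbo's Registry API, and a real registry (ZooKeeper, Nacos, and so on) works over the network.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry; NOT Dubbo's Registry interface.
class ToyRegistry {
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // A provider writes its address under the service name; subscribers are then notified.
    void register(String service, String address) {
        providers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(address);
        notifyListeners(service);
    }

    // A provider leaves; subscribers are notified only if something actually changed.
    void unregister(String service, String address) {
        List<String> list = providers.get(service);
        if (list != null && list.remove(address)) {
            notifyListeners(service);
        }
    }

    // A consumer subscribes and immediately receives the current provider list.
    void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(snapshot(service));
    }

    private void notifyListeners(String service) {
        List<String> current = snapshot(service);
        for (Consumer<List<String>> listener
                : listeners.getOrDefault(service, Collections.emptyList())) {
            listener.accept(current);
        }
    }

    private List<String> snapshot(String service) {
        return new ArrayList<>(providers.getOrDefault(service, Collections.emptyList()));
    }
}
```

A consumer that subscribes before any provider exists first sees an empty list, and then receives a fresh list on every provider change.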

Service provider registration service

When the service starts, the service instance is initialized first. The Proxy component then calls Protocol to wrap the interface to be exposed on the server side into an Invoker, which is in turn converted into an Exporter. At this point the framework opens the service port and records the service instance in memory. Finally, the service metadata is registered with the registry through the Registry component.

  • Proxy: the service proxy layer. When using Dubbo, neither the server nor the consumer has to expose and listen on an address and port, or connect to one, the way we would when writing a socket program. This is because Proxy generates a proxy layer for us from the pre-agreed configuration rules.
  • Invoker: an executable entity that can be invoked inside Dubbo. It may be a local interface implementation, a remote implementation, or a cluster implementation.
  • Protocol: the remote call layer, which encapsulates the concrete process of an RPC call. Protocol is the main entry point for exposing an Invoker (publishing a service so that others can call it) and for referring to one (bringing a remote service into the local process). It is responsible for managing the whole life cycle of the Invoker.
  • Exporter: the object that represents an exposed service. It holds the Invoker internally, so we can think of it as a wrapper around the Invoker.
  • Registry: this module registers the Exporter with the registry.
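
The relationship between Invoker, Exporter, and Protocol described above can be sketched with a few toy types. All names here (ToyInvoker, ToyExporter, ToyProtocol, dispatch) are hypothetical simplifications for illustration; Dubbo's real interfaces carry URLs, generics, and invocation objects that are omitted here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an Invoker: something that can be invoked.
interface ToyInvoker {
    Object invoke(String argument);
}

// Hypothetical stand-in for an Exporter: it wraps the Invoker and can withdraw it.
class ToyExporter {
    private final String serviceKey;
    private final ToyInvoker invoker;
    private final Map<String, ToyExporter> table;

    ToyExporter(String serviceKey, ToyInvoker invoker, Map<String, ToyExporter> table) {
        this.serviceKey = serviceKey;
        this.invoker = invoker;
        this.table = table;
    }

    ToyInvoker getInvoker() {
        return invoker;
    }

    // Removes the service from the protocol's table of exported services.
    void unexport() {
        table.remove(serviceKey);
    }
}

// Hypothetical stand-in for a Protocol: it records exported services in memory.
class ToyProtocol {
    private final Map<String, ToyExporter> exported = new ConcurrentHashMap<>();

    ToyExporter export(String serviceKey, ToyInvoker invoker) {
        ToyExporter exporter = new ToyExporter(serviceKey, invoker, exported);
        exported.put(serviceKey, exporter);
        return exporter;
    }

    // Simulates an incoming request being routed to the exported Invoker.
    Object dispatch(String serviceKey, String argument) {
        ToyExporter exporter = exported.get(serviceKey);
        if (exporter == null) {
            throw new IllegalStateException("Service not exported: " + serviceKey);
        }
        return exporter.getInvoker().invoke(argument);
    }
}
```

The point of the wrapper is life-cycle management: the Exporter is the handle through which an exposed Invoker can later be withdrawn.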

The export starts in the ServiceConfig#doExportUrls method:

private void doExportUrls() {
        // Load the registry URLs
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            // If there are multiple protocols (dubbo, rest), they will be exposed in turn
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

The doExportUrlsFor1Protocol source code (only part of it is listed here):

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);
        // Put additional information into the map; it is used later to build the URL
        appendRuntimeParameters(map);
        appendParameters(map, metrics);
        /**
          Further parameters are added depending on the configuration. For example, if method-level settings are specified, the method name and related information are added to the map
          ...
        **/
        // export service
        // Get address and port
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
        // If a ConfiguratorFactory extension exists for this protocol, let it reconfigure the URL
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url); // Local service exposure
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        // If the monitoring address is not empty, relevant information will be reported
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // Convert the service into an Invoker through the proxy factory. The registryURL holds the address of the registry, and the service metadata is attached to it under the export key
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // Expose the service and register its information with the registry
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    // No registry is configured: expose the service directly
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }
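
The nested scope checks in the method above are easy to misread, so here is a minimal sketch of just that decision. ExportScopeSketch and plan are hypothetical names, and the literal values "none", "local", and "remote" are assumed to match the SCOPE_NONE, SCOPE_LOCAL, and SCOPE_REMOTE constants.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the scope decision, not the export itself.
class ExportScopeSketch {

    // Returns which exports would happen for a given scope value:
    // "none" exports nothing, "remote" skips the local export,
    // "local" skips the remote export, anything else (including null) does both.
    static List<String> plan(String scope) {
        List<String> actions = new ArrayList<>();
        if (!"none".equalsIgnoreCase(scope)) {
            if (!"remote".equalsIgnoreCase(scope)) {
                actions.add("local");
            }
            if (!"local".equalsIgnoreCase(scope)) {
                actions.add("remote");
            }
        }
        return actions;
    }
}
```

With no scope configured, the service is therefore exposed both inside the JVM and remotely.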

Service consumer subscription service

  • When a consumer starts, it first builds an Invoker that holds the remote service instance. This Invoker is the core remote proxy object on the client.
  • The Invoker is then turned, through a dynamic proxy, into a reference that implements the user's interface. The Invoker here carries the network connection, service call, and retry functions. On the client it may be a remote implementation or a cluster implementation.
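
The second step can be illustrated with the JDK's own dynamic proxy. This is a sketch under assumptions, not Dubbo's ProxyFactory: GreetingService, SimpleInvoker, and ProxySketch are hypothetical names, and the "remote call" is whatever the supplied invoker does.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical user interface that the consumer wants to call.
interface GreetingService {
    String greet(String name);
}

// Hypothetical, simplified invoker: receives the method name and arguments.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] arguments) throws Exception;
}

class ProxySketch {

    // Wraps the invoker in a JDK dynamic proxy that implements the given interface.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, SimpleInvoker invoker) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            // Methods inherited from Object are answered locally, not sent to the invoker.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "proxy of " + iface.getName();
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default: // equals
                        return proxy == methodArgs[0];
                }
            }
            // Every interface method is forwarded to the invoker.
            return invoker.invoke(method.getName(), methodArgs);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

The caller only sees GreetingService; everything about connections and retries stays hidden behind the invoker.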

In the ReferenceConfig#createProxy method:

 private T createProxy(Map<String, String> map) {
       // Check whether it is an internal reference of the same JVM
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear(); // reference retry init will add url to urls, lead to OOM
            // If a URL is specified explicitly
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())){
                    checkRegistry();
                    List<URL> us = loadRegistries(false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = loadMonitor(u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            } 
                            // Append the consumer's metadata to the registry URL and add it to the urls collection
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }

            if (urls.size() == 1) {
                // Handling of a single registry
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                // Handling of multiple registries
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // Refer the service through each URL in turn and add the resulting Invoker to the invokers list
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's CLUSTER is available
                    URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    // Merge the multiple invokers into one
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        // Turn the invoker into a proxy of the service interface
        return (T) PROXY_FACTORY.getProxy(invoker);
    }
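
One detail of createProxy worth isolating is how the consumer's parameters travel inside the registry URL: they are flattened to a query string, URL-encoded, and attached as a single parameter. The sketch below reproduces that idea with plain strings. ReferParamSketch and its methods are hypothetical, the key name "refer" is assumed to be the value of REFER_KEY, and the example URL is made up.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper working on plain strings instead of Dubbo's URL class.
class ReferParamSketch {
    static final String REFER_KEY = "refer"; // assumed value of Dubbo's REFER_KEY

    // Flattens the consumer parameters into key=value pairs joined by '&'.
    static String toQueryString(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // Attaches the encoded consumer parameters to the registry URL as one parameter.
    static String attachRefer(String registryUrl, Map<String, String> consumerParams) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + REFER_KEY + "=" + encode(toQueryString(consumerParams));
    }

    // Reads the refer parameter back and decodes it into a map.
    static Map<String, String> extractRefer(String registryUrl) {
        Map<String, String> result = new LinkedHashMap<>();
        int q = registryUrl.indexOf('?');
        if (q < 0) {
            return result;
        }
        for (String pair : registryUrl.substring(q + 1).split("&")) {
            if (pair.startsWith(REFER_KEY + "=")) {
                String decoded = decode(pair.substring(REFER_KEY.length() + 1));
                for (String kv : decoded.split("&")) {
                    int eq = kv.indexOf('=');
                    if (eq > 0) {
                        result.put(kv.substring(0, eq), kv.substring(eq + 1));
                    }
                }
            }
        }
        return result;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Encoding matters because the consumer parameters themselves contain '&' and '=', which would otherwise be confused with the registry URL's own parameters.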

The creation of the registry instance, the registration of metadata with the registry, and the subscription are mainly done in the RegistryProtocol#refer method:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Set the concrete registry protocol
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
        // Create an instance of a specific registry
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // Extract the metadata saved under the refer key by the consumer. If the group contains multiple values, the results of the calls will be merged
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // Process subscription data and merge multiple invokers through cluster
        return doRefer(cluster, registry, type, url);
    }
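
The group check in refer decides whether a mergeable cluster is needed. A minimal sketch of that test follows; GroupCheckSketch is a hypothetical name, and the split pattern is an assumption about what COMMA_SPLIT_PATTERN looks like (commas with optional surrounding whitespace).

```java
import java.util.regex.Pattern;

// Hypothetical helper that isolates the group test from RegistryProtocol#refer.
class GroupCheckSketch {
    // Assumed equivalent of COMMA_SPLIT_PATTERN: commas with optional whitespace.
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    // True when the group names several groups ("a,b") or all groups ("*").
    static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT.split(group).length > 1 || "*".equals(group);
    }
}
```

A single group name, or no group at all, falls through to the ordinary cluster.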

doRefer method:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // RegistryDirectory is the core on the consumer side: it holds the actual Invokers and receives subscription notifications. A service change triggers a callback to its notify method, which re-references the service
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            // Register the consumer's information with the registry
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // Subscribe to service providers, dynamic configuration, and routing rules
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        // Merge the invokers through the Cluster; the FailoverCluster policy is used by default and retries failed service calls
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
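
The interplay in doRefer between a directory that is refreshed by notifications and a cluster invoker that retries on failure can be sketched as follows. This is a toy under stated assumptions: ToyDirectory and ToyFailoverInvoker are hypothetical, providers are modelled as plain functions, and candidates are tried in list order, whereas the real FailoverCluster selects them through a load balancer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical directory: holds the current provider list, replaced on each notification.
class ToyDirectory {
    private volatile List<Function<String, String>> invokers = Collections.emptyList();

    // Called when the registry reports a change; the whole list is swapped atomically.
    void notifyProviders(List<Function<String, String>> latest) {
        invokers = Collections.unmodifiableList(new ArrayList<>(latest));
    }

    List<Function<String, String>> list() {
        return invokers;
    }
}

// Hypothetical failover invoker: on failure, retries against the next provider.
class ToyFailoverInvoker {
    private final ToyDirectory directory;
    private final int retries;

    ToyFailoverInvoker(ToyDirectory directory, int retries) {
        this.directory = directory;
        this.retries = retries;
    }

    String invoke(String request) {
        // Read the list once so a concurrent notification cannot change it mid-call.
        List<Function<String, String>> candidates = directory.list();
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        RuntimeException last = null;
        // One initial attempt plus up to 'retries' further attempts.
        for (int i = 0; i <= retries; i++) {
            Function<String, String> target = candidates.get(i % candidates.size());
            try {
                return target.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw new IllegalStateException("All " + (retries + 1) + " attempts failed", last);
    }
}
```

Because the invoker asks the directory for the list on every call, a notification takes effect on the next invocation without rebuilding the invoker.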

(unfinished, to be continued...)

Keywords: Dubbo source code

Added by prometheuzz on Tue, 08 Mar 2022 15:38:16 +0200