- Articles may be updated in priority Github , including article error correction and content addition. Other platforms will be late.
- Reprint notice: please indicate the source of GitHub, let's maintain a good technical creation environment together.
- If you want to submit issue or pr, it is recommended to Github Submit. The author will update one after another. If it helps you, you might as well Github Order a Star ~. Your Star is the driving force of my creation.
Section V: Dubbo service registration (export) source code analysis
Overview of service export principle
- The entry of service export is the export() method in ServiceBean. After Spring is started, the execution of the export() method is triggered by receiving the ContextRefreshedEvent event of Spring.
- A ServiceBean object represents a Dubbo Service. The parameters in the ServiceBean object represent the parameters of the Service, such as timeout. The parameter values of the object come from the parameters defined in the @ Service annotation.
- There are two main things to do for service export:
- According to the parameter information of the service, start the corresponding network server (netty, tomcat, jetty, etc.) to receive the network request
- Register the information of the service with the registry
- However, before doing these two things, you must first determine the parameters of the Service, because the parameters of a Dubbo Service can not only be configured in the @ Service annotation, but also inherit the configuration on the Application to which the Dubbo Service belongs, but also configure the parameters of a Service in the configuration center or JVM environment variables. Therefore, the first thing to do is to determine the final parameters of the current Service (highest priority) parameter value.
- After determining the service parameters, start the corresponding network server according to the configured protocol. When starting the network server and when the network server receives the request, you can obtain information from the service parameters, such as maximum connections, threads, socket timeout, etc.
- After starting the network server, register the service information in the registry. At the same time, register a listener with the registry to listen to the dynamic configuration information changes in Dubbo.
Service export means service registration
Evolution of service concept
- The DemoService interface represents a service, and the service at this time represents a service definition
- DemoServiceImpl represents the specific implementation of DemoService service, and the service at this time represents the specific implementation of service
- DemoService+group+version represents a service. At this time, the concepts of grouping and version are added to the service
- http://192.168.31.211:80/cn.imlql.DemoService Indicates a service. The service adds machine IP and Port, indicating that the remote machine can access this URL to use cn.imlql.DemoService
- http://192.168.31.211:80/cn.imlql.DemoService ? timeout = 3000 & version = 1.0.1 & application = Dubbo demo provider application indicates a service. At this time, the service has parameters, such as timeout, version number and application
dubbo is the last way to represent services.
Service export idea
Several things to do for service export:
- Determine the parameters of the service
2. Determine the service support agreement
2. Construct the final URL of the service - According to different protocols supported by the service, start different servers to receive and process requests
- Register the service URL to the registry
- Because Dubbo supports dynamic configuration of service parameters, a Listener needs to be bound when exporting a service to monitor whether the parameters of the service have been modified. If any modification is found, it needs to be exported again
Determine the parameters of the service
Overview of determining service parameters
-
When executing ServiceConfig.export(), the ServiceConfig object represents a Service (or ServiceBena represents a Service because it is an inheritance relationship). We already know the name of the Service (that is, the name of the Service provider interface) At this time, the Service may already have some parameters, that is, the parameters defined on the * * @ Service annotation * *.
-
However, in Dubbo, in addition to configuring parameters for services in the @ Service annotation, there are many places where you can configure parameters for services, such as:
- dubbo.properties file. You can create this file. dubbo will read the contents of this file as service parameters. Dubob's source code is called PropertiesConfiguration
- Configuration center: Dubbo supports the distributed configuration center after version 2.7. You can operate the configuration center in Dubbo admin. The distributed configuration center is equivalent to a remote dubbo.properties file. You can modify the dubbo.properties file in Dubbo admin. Of course, the configuration center supports configuration by application or global, In Dubbo's source code, AppExternalConfiguration represents application configuration and ExternalConfiguration represents global configuration.
- System environment variable. You can specify parameters by - D when starting the application. It is called SystemConfiguration in Dubbo's source code
- In addition, the parameters configured through the @ Service annotation are called AbstractConfig in Dubbo's source code
-
Service parameters can come from these four locations. If the same parameter is configured in these four locations, the priority can be from high to low:
- SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration
- SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration
-
When exporting a service, you must first determine the parameters of the service. Of course, the parameters of the service can come from its superior in addition to its own configuration. For example, if the service itself does not have a timeout parameter configured, but if the application to which the service belongs has a timeout configured, the services under the application will inherit the timeout configuration. Therefore, when determining the service parameters , you need to get the parameters from the superior first. After that, if the service itself is configured with the same parameters, it will be overwritten.
Determine the parameters of the service
ServiceBean
//After Spring is started, the execution of the export() method is triggered by receiving the ContextRefreshedEvent event of Spring. @Override public void onApplicationEvent(ContextRefreshedEvent event) { // The service cannot be exported until the current service has not been exported and uninstalled if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // Service export (service registration) export(); } } @Override public void export() { //Call ServiceConfig#export() super.export(); // Publish ServiceBeanExportedEvent // After Spring starts publishing ContextRefreshedEvent event -- > service export -- > publishing ServiceBeanExportedEvent // Programmers can monitor whether the service export is completed through the ApplicationListener in Spring publishExportEvent(); } private void publishExportEvent() { //By listening to this event, you can know whether Dubbo's service has been registered ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this); applicationEventPublisher.publishEvent(exportEvent); }
ServiceConfig
public synchronized void export() { //Read service configuration checkAndUpdateSubConfigs(); // Check whether the service needs to be exported if (!shouldExport()) { return; } // Check if delayed publishing is required if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // Export service doExport(); } } public void checkAndUpdateSubConfigs() { // Use default configs defined explicitly on global configs // If some properties in ServiceConfig are empty, they are obtained from ProviderConfig, ModuleConfig and ApplicationConfig // Complete the properties in ServiceConfig completeCompoundConfigs(); // Config Center should always being started first. // Obtain the configuration from the configuration center, including application configuration and global configuration // Put the obtained configuration into externalConfigurationMap and appExternalConfigurationMap in the Environment // And refresh the properties of all XxConfig (except ServiceConfig). Refresh means to overwrite the configuration of the configuration center and call the properties in XxConfig // Call AbstractInterfaceConfig#startConfigCenter() startConfigCenter(); checkDefault(); checkProtocol(); checkApplication(); // if protocol is not injvm checkRegistry // If the protocol is not only the injvm protocol, which means that the service call is not only called in the local jvm, the registry is required if (!isOnlyInJvm()) { checkRegistry(); } // Refresh ServiceConfig and call AbstractConfig#refresh() this.refresh(); // If metadata reportconfig is configured, refresh the configuration checkMetadataReport(); if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } // The implementation class corresponding to the current service is a GenericService, indicating that there is no specific interface if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { // Load interface try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // Refresh MethodConfig and judge whether the corresponding method in MethodConfig exists in the interface checkInterfaceAndMethods(interfaceClass, methods); // Is the implementation class the interface type checkRef(); generic = Boolean.FALSE.toString(); } // local, like stub, is not recommended if (local != null) { // If the local stub is true, the stub class is interfaceName + "Local" if (Boolean.TRUE.toString().equals(local)) { local = interfaceName + "Local"; } // Load local stub class Class<?> localClass; try { localClass = ClassUtils.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } // Local stub if (stub != null) { // If the local stub is true, the stub class is interfaceName + "Stub" if (Boolean.TRUE.toString().equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName); } } // Check local and stub checkStubAndLocal(interfaceClass); // Check mock checkMock(interfaceClass); } /** * 1.In the last section, we wrote this configuration on the startup class * @PropertySource("classpath:/spring/dubbo-provider.properties") * 2.When Spring starts, it will load the configuration into some xxxConfig [mentioned when Spring integrates Dubbo] * 3.@Service The parameters configured in the annotation are first read into the ServiceBean * 4.Then, this method will be called to complete the configuration of ServiceBean. Where can I complete it? We configured it from above * dubbo-provider.properties Complete * 5.ServcieBean It inherits ServiceConfig, so it has the same meaning, which is emphasized here * In case you don't understand */ private void completeCompoundConfigs() { // If the provider is configured, get information from the provider and assign other attributes. If these attributes are empty if (provider != null) { if (application == null) { setApplication(provider.getApplication()); } if (module == null) { setModule(provider.getModule()); } if (registries == null) { setRegistries(provider.getRegistries()); } if (monitor == null) { setMonitor(provider.getMonitor()); } if (protocols == null) { setProtocols(provider.getProtocols()); } if (configCenter == null) { setConfigCenter(provider.getConfigCenter()); } } // If the module is configured, the information is obtained from the module and other attributes are assigned. When these attributes are empty if (module != null) { if (registries == null) { setRegistries(module.getRegistries()); } if (monitor == null) { setMonitor(module.getMonitor()); } } // If the application is configured, get information from the application and assign other attributes. If these attributes are empty if (application != null) { if (registries == null) { setRegistries(application.getRegistries()); } if (monitor == null) { setMonitor(application.getMonitor()); } } }
AbstractInterfaceConfig
void startConfigCenter() { if (configCenter == null) { ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc); } // If ConfigCenter is configured if (this.configCenter != null) { // Obtain the relevant attribute information of the configuration center from other locations, such as the configuration center address // TODO there may have duplicate refresh this.configCenter.refresh(); // After the attribute is updated, obtain data from the remote configuration center (application configuration, global configuration) prepareEnvironment(); } // After getting the configuration data from the configuration center, refresh the properties in all XxConfig except ServiceConfig ConfigManager.getInstance().refreshAll(); } private void prepareEnvironment() { if (configCenter.isValid()) { if (!configCenter.checkOrUpdateInited()) { return; } // Dynamic configuration center, configuration center on management console DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl()); // If it is zookeeper, you will get the contents in the / dubbo/config/dubbo/dubbo.properties node String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup()); String appGroup = application != null ? application.getName() : null; String appConfigContent = null; if (StringUtils.isNotEmpty(appGroup)) { // What you get is the content in the / Dubbo / config / Dubbo demo consumer application / dubbo.properties node // There are bug s here appConfigContent = dynamicConfiguration.getProperties (StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(), appGroup ); } try { Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority()); //This is the global, that is, the global in the configuration management on the web page Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent)); //This is the configuration of an application Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent)); } catch (IOException e) { throw new IllegalStateException("Failed to parse configurations from Config Center.", e); } } }
ConfigManager
public void refreshAll() { // refresh all configs here, getApplication().ifPresent(ApplicationConfig::refresh); getMonitor().ifPresent(MonitorConfig::refresh); getModule().ifPresent(ModuleConfig::refresh); getProtocols().values().forEach(ProtocolConfig::refresh); getRegistries().values().forEach(RegistryConfig::refresh); getProviders().values().forEach(ProviderConfig::refresh); getConsumers().values().forEach(ConsumerConfig::refresh); }
AbstractConfig
/** * 1.Refresh XxConfig * 2.The attribute of an XxConfig object may or may not have a value. In this case, you need to obtain the attribute value from other locations to overwrite the attribute * The priority of coverage, from large to small, is system variable - > configuration center application configuration - > configuration center global configuration - > annotation or definition in xml - > dubbo.properties file * 3.Take ServiceConfig as an example. ServiceConfig includes many attributes, such as timeout * However, when defining a Service, if timeout is not configured on the annotation, the timeout configuration will be obtained elsewhere * For example, you can define - > dubbo.properties file from system variable - > configuration center application configuration - > configuration center global configuration - > annotation or xml * refresh Yes, refresh to update the property corresponding to the set method on the current ServiceConfig to the value with the highest priority */ public void refresh() { try { /** * 1.The configuration priority determined here is from high to low * System environment variable [JVM environment variable - > operating system environment variable] - [configuration center application configuration - > configuration center global configuration - > dubbo.properties file * 2.Environment#getConfiguration() was called */ CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId()); // Represents the XxConfig object itself - AbstractConfig Configuration config = new ConfigConfigurationAdapter(this); // ServiceConfig if (Environment.getInstance().isConfigCenterFirst()) {//This is the default // Priority order: systemconfiguration - > appexternalconfiguration - > externalconfiguration - > abstractconfig - > propertiesconfiguration compositeConfiguration.addConfiguration(4, config); } else { // The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration compositeConfiguration.addConfiguration(2, config); } // loop methods, get override value and set the new value back to method Method[] methods = getClass().getMethods(); //ServiceBean for (Method method : methods) { // Is it the setXX() method if (MethodUtils.isSetter(method)) { // Get the value of xx configuration item String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); // isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig. if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) { method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value)); } // Is it the setParameters() method } else if (isParametersSetter(method)) { // Gets the value of the parameter configuration item String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); if (StringUtils.isNotEmpty(value)) { Map<String, String> map = invokeGetParameters(getClass(), this); map = map == null ? new HashMap<>() : map; map.putAll(convert(StringUtils.parseParameters(value), "")); invokeSetParameters(getClass(), this, map); } } } } catch (Exception e) { logger.error("Failed to override ", e); } }
Environment
public CompositeConfiguration getConfiguration(String prefix, String id) { CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); // Config center has the highest priority // JVM environment variables compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id)); // Operating system environment variables compositeConfiguration.addConfiguration(this.getEnvironmentConfig(prefix, id)); // Configuration center APP configuration compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id)); // Configuration center Global configuration compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id)); // Configuration in dubbo.properties compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id)); return compositeConfiguration; }
Determine the protocol supported by the service
It is relatively simple to determine the protocols supported by the service, which depends on how many protocols the user has configured. And the meaning of service parameters, Protocol can also be configured at various configuration points.
- First, the protocol may be configured in the application.properties file of SpringBoot
- The protocol may also be configured in the dubbo.properties file
- The protocol may also be configured in the configuration center
- The protocol may also be configured through - D
Therefore, when exporting services, you need to obtain protocols from the above places. The result may be one protocol or multiple protocols, so as to determine the protocol.
URL role
-
resources
- Registry URL: zookeeper://ip+port?dynamic=true
- Services: dubbo://ip+port / interface name? timeout=3000
- Services: http://ip+port / interface name? timeout=3000
-
Convenient expansion
Construct the final URL of the service
With a certain protocol, service name and service parameters, you can naturally assemble the URL of the service.
However, it is also very important to support dynamic service configuration in Dubbo. Note that this is not the same concept as the configuration center. Dynamic configuration can dynamically modify the service configuration after the service is exported, but the configuration center can not achieve this effect (I want to confirm this).
Dynamic configuration is actually adding some parameters to the service. Therefore, before registering the service URL in the registry, you must rewrite the URL according to the configuration added in the dynamic configuration, that is, the parameters in the dynamic configuration on the application.
Only after doing so can the URL obtained be the truly accurate service provider URL.
Start the service registration process
- According to different protocols supported by the service, start different servers to receive and process requests
- Register the service URL to the registry
- Because Dubbo supports dynamic configuration of service parameters, a Listener needs to be bound when exporting a service to monitor whether the parameters of the service have been modified. If any modification is found, it needs to be exported again
These three steps will be done in the ServiceConfig#export()#doExport() method. The process is complex. Just look at the code directly
Public source code
The source code of this part is common to the first three steps
ServiceConfig
public synchronized void export() { //Read service configuration checkAndUpdateSubConfigs(); // Check whether the Service needs to be exported. It can be configured in @ Service if (!shouldExport()) { return; } // Check whether it is necessary to delay publishing, which can be configured in @ Service if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // Export service doExport(); } } protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } // If it has been exported, it will not be exported again if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); } private void doExportUrls() { // The registry URL represents a registry List<URL> registryURLs = loadRegistries(true); //Each configured protocol will generate a dubbo service, so here is a circular configuration protocol, // Here, we assume that dubbo is configured, but two ports are configured, which is also two protocol s for (ProtocolConfig protocolConfig : protocols) { // pathKey = group/contextpath/path:version // Example: myGroup/user/org.apache.dubbo.demo.DemoService:1.0.1 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); // There are service provider access paths, implementation classes, interfaces, and providermethodmodels corresponding to each method in the interface in the ProviderModel // ProviderMethodModel represents a method, its name, and the name of the service to which it belongs, ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); // ApplicationModel indicates which service providers are in the application and which services are referenced ApplicationModel.initProviderModel(pathKey, providerModel); // The point is that each agreement will register a service doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // protocolConfig represents a protocol, and registryURLs represents all registries // If name is not configured for a configured protocol, it defaults to dubbo String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } // This map represents the parameters of the service url Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); // Monitoring center parameters appendParameters(map, metrics); // Application related parameters appendParameters(map, application); // Module related parameters appendParameters(map, module); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); // Provider related parameters appendParameters(map, provider); // Protocol related parameters appendParameters(map, protocolConfig); // Service related parameters appendParameters(map, this); // For some method parameters in the Service, some parameters can be configured for some methods in @ Service if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { // For the configuration parameters of a method, note the prefix appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; // If xx.retry=false exists in a method configuration, change it to xx.retry=0 if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if (Boolean.FALSE.toString().equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { // Traverse the parameter configuration in the current method configuration for (ArgumentConfig argument : arguments) { // If type is configured, you can traverse all the methods of the current interface, and then find the method with the same method name as the current method name. There may be multiple methods // If index is configured, check whether the parameter type at the corresponding position of index is equal to type. If so, store the parameters in the argument object into the map // If index is not configured, all parameter types of the method will be traversed. If it is equal to type, the parameters in the argument object will be stored in the map // If the type is not configured but the index is configured, put the argument at the corresponding position into the map // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // Get the names of all methods in the interface through the Wrapper corresponding to the interface String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // Token is used to prevent the Service from being directly called by consumers (forging http requests). It can be configured in @ Service // What is prevented here is that some consumers do not call the provider with the URL obtained from the registration center, but with the URL spelled out by the consumers themselves // A Tokenfilter filter is used to intercept the service call (to be described later) if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // export service // Access the service through the host and port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // Service url URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); /** * url: http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService * &bind.ip=192.168.40.17&bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true& * generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=285072 * &release=&side=provider×tamp=1585206500409 * * 1.You can configure the service url again through ConfiguratorFactory * 2.This means that you can implement an implementation class of ConfiguratorFactory and implement the corresponding methods to customize and modify the URL * 3.This implementation class is loaded through SPI */ if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // scope may be null, remote, local,none // don't export when none is configured if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // If the scope is none, there will be no service export, neither remote nor local // export to local if the config is not remote (export to remote only when config is remote) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // If the scope is not remote, it will be exported locally. The protocol of the current url will be changed to injvm, and then exported // In this case, only the local JVM can call exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { // If the scope is not local, it will be exported remotely if (CollectionUtils.isNotEmpty(registryURLs)) { // If there is a registry, register the service with the registry for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register // If it is an injvm, you do not need to register with the registry if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } // Whether the service is dynamic or not, the corresponding zookeeper indicates whether it is a temporary node, and the corresponding function in dubbo is a static service url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // Get the address of the monitoring center URL monitorUrl = loadMonitor(registryURL); // Which monitoring center is the current service connected to if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } // The register parameter of the service. If true, it means to register with the registry if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } } // For providers, this is used to enable custom proxy to generate invoker // The dynamic proxy mechanism used by the service. If it is empty, javassit is used String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } /** * 1.Generate a proxy object for the current service interface * 2.An Invoker is generated using a proxy. The Invoker represents the proxy of the service provider. You can use the invoke method of the Invoker to execute the service * That is to spell the URL of the registry and the URL of the service, registryURL + "export" + url, and the corresponding URL is: * <code> registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService? application=dubbo-demo-annotation-provider&dubbo=2.0.2&export= http://192.168.40.17:80/org.apache.dubbo.demo.DemoService? anyhost=true&application=dubbo-demo-annotation-provider&bean.name= ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17& bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&generic= false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello& pid=19472&release=&side=provider×tamp=1585207994860&pid=19472& registry=zookeeper×tamp=1585207994828 * <code/> * 3.This Invoker includes the implementer of the service, the service interface class, and the registered address of the service (for the current service, the parameter export) (current service specified) * 4.This invoker represents an executable service. Call the invoke() method of the invoker to execute the service. At the same time, this invoker can also be used to export * When the service is exported (registered), the invoker only exists in a certain place and will not be executed until the consumer invokes the service * 5.ref This is the service implementation class mentioned earlier * 6.The second parameter here is the URL (specifically registryURL), which will be used by the exporter soon */ Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // invoker.invoke(Invocation) // DelegateProviderMetaDataInvoker also represents a service provider, including the configuration of Invoker and service //Wrap this (that is, serviceconfig service parameter) and invoker service implementation class again DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); /** * A specific protocol is used to export the service. The protocol here is RegistryProtocol. After successful export, an Exporter is obtained * 1.exporter How does the exporter determine which class export method to use? The SPI mechanism will determine which invoker has the getURL method * [I don't know how to adjust which method of which class. Please see SPI] * 2.Because the previous invoker passes the registryURL, we will use RegistryProtocol for service registration here * registryURL It can be understood as the registration agreement of the registry. debug here, you will see this registry://127.0.0.1:2181...... * 3.After registration, export using DubboProtocol * 4.What has been done so far? ServiceBean. Export() -- > refresh the parameters of ServiceBean -- > get the registry URL and protocol URL -- > * Traverse each protocol URL -- > to form a service URL -- > to generate an executable service invoker -- > export service * 5.RegistryProtocol#export(Invoker) will be called here */ Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // Services are also exported when the registry is not configured if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ // According to the service url, the meta information of the service is stored in the metadata center MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
Start Netty,Tomcat and other Server source codes
RegistryProtocol
@SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); /** * 1.Here is the knowledge of SPI. Where does the value of the protocol attribute come from? It is injected into SPI. It is a proxy class * 2.InvokerDelegate The parent class InvokerWrapper of has a getURL method, so the final SPI decides which extension point to call * It is determined through the providerUrl, and the providerUrl here is basically DubboProtocol or HttpProtocol to export * 3.We use dubbo protocol here, so DubboProtocol will be called * 4.Why do I need an ExporterChangeableWrapper? It is convenient to log off services that have been exported */ return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
DubboProtocol
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // key that uniquely identifies a service String key = serviceKey(url); // Construct an Exporter to export services DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { // stub method of service stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // Turn on NettyServer // Request -- > invocation -- > service key -- > exportermap.get (key) - > exporter -- > invoker -- > invoker. Invoke (invocation) - > execute service openServer(url); // Some special serialization mechanisms, such as kryo, provide a registration mechanism to register classes to improve the speed of serialization and deserialization optimizeSerialization(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); // Get the ip address and port, 192.168.40.17:20880 // NettyClient, NettyServer //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // Cache Server object ExchangeServer server = serverMap.get(key); // DCL,Double Check Lock if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // Create a Server and cache it serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override // When the service is re exported, it will go here and call HeaderExchangeServer#reset server.reset(url); } } }
AbstractProtocol
protected static String serviceKey(URL url) { int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); // Path is matched with the @ Service annotation. If path is matched, it will be used. If not, it will not be used. These four parameters are used to generate key s that uniquely identify a Service // It can be seen from here that if the protocol is the same, as long as the port number is different, it is still a different service return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY)); }
Start Server summary
This is not the core of dubbo, so we don't post the source code
Protocols are specified in the service URL, such as Http protocol and Dubbo protocol. Start the corresponding Server according to different protocols.
For example, the Http protocol starts Tomcat and Jetty.
For example, the Dubbo protocol starts Netty.
You can't just start the Server. You also need to bind a RequestHandler to process requests.
For example, the Http protocol corresponds to the InternalHandler. The Dubbo protocol corresponds to the exchange handler.
Here is a detailed analysis of the Server started by the Dubbo protocol.
-
Call the OpenServer (URL) method of DubboProtocol to start the Server
-
Invoke the createServer(url) method of DubboProtocol, call **Exchangers.bind(url, requestHandler) * in createServer() method to get an ExchangeServer.
-
Where requestHandler represents the request processor and is used to process requests
-
In * * exchanges. Bind (URL, requesthandler) * *, you will first get an Exchanger according to the URL. The default is HeaderExchanger
-
HeaderExchangeClient and HeaderExchangeServer are included in HeaderExchanger
-
HeaderExchangeClient is responsible for sending heartbeat and HeaderExchangeServer is responsible for receiving heartbeat. If it times out, the channel will be closed
-
Before constructing HeaderExchangeServer, it will connect to a Server by calling the transporters. Bind (URL, new decodehandler (New headerexchangehandler (handler)) method
-
By default, getTransporter will be used to bind (URL, channelhandler, listener) to get a Servlet. At this time, the listener is the DecodeHandler passed in from the outside
-
In the bind method of NettyTransporter, you will go to new NettyServer(url, listener), so the Server returned above is NettyServer by default
-
When constructing NettyServer, it will call ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)) to construct a ChannelHandler.
-
The handler in wrap is the listener above
-
In the wrap method, new multimessagehandler (New heartbeathandler (extensionloader. Getextensionloader (dispatcher. Class). Getadaptiveextension(). Dispatch (handler, URL)) will be called); Construct a ChannelHandler.
-
After the ChannelHandler is constructed, the Server is really opened, and the Dopen method of the AbstractServer abstract class will be called.
-
In NettyServer, the doOpen method will be implemented. new NettyServerHandler(getUrl(), this) will be called to construct a NettyServerHandler and bind the address
-
At this point, the Server startup process of DubboProtocol protocol ends.
-
NettyServer Handler: the request processor directly binding to NettyServer is responsible for receiving requests from Netty, channelRead() method gets the request, and then calls the next (NettyServer) received() method to pass the request, and the request is Object msg at the same time.
-
NettyServer: received() exists in the parent class AbstractPeer of NettyServer. This method does nothing and directly passes msg to the next layer Handler (MultiMessageHandler)
-
MultiMessageHandler: this Handler will judge whether msg is a MultiMessage. If yes, the MultiMessage will be split, and the split msg will be passed to the lower Handler (HeartbeatHandler). If not, the msg will be directly passed to the lower Handler (HeartbeatHandler)
-
HeartbeatHandler: this Handler receives msg through the received() method, and then determines whether the msg is a heartbeat request or heartbeat Response. If it is a heartbeat request, this Handler returns a Response object (a very simple object). If it is a heartbeat Response, a log will be printed without other logic. If it is not, msg will be passed to the lower Handler (AllChannelHandler).
-
AllChannelHandler: this Handler receives the msg through the received() method, then encapsulates the msg as a ChannelEventRunnable object, and throws the ChannelEventRunnable into the thread pool to process the msg asynchronously. In the ChannelEventRunnable, the msg will be handed over to the next Handler (DecodeHandler)
-
DecodeHandler: this Handler receives msg through the received() method, decode s the msg, and then gives it to the next Handler(HeaderExchangeHandler)
-
HeaderExchangeHandler: this Handler receives msg through the received() method and will judge the type of msg
- If the Request is TwoWay, the reply method of the next handler (requesthandler in dubboprotocol) will be called to get a result, and then return
- If the Request is not TwoWay, the received method of the next handler (requesthandler in dubboprotocol) will be called to process the Request and no result will be returned
-
requestHandler: this Handler is the real processing request logic. In the received() method, if msg is Invocation, it will call the reply method, but it will not return the result returned by the reply method. In reply method, the msg is converted to Invocation type inv, then the corresponding service is obtained according to the Invocation, then the method is called to get the result.
Service registration source code
RegistryProtocol
public void register(URL registryUrl, URL registeredProviderUrl) { // Finally, the SPI mechanism is used to judge what is passed. We turned the registry into the URL of zookeeper Registry registry = registryFactory.getRegistry(registryUrl); // Therefore, the register method of ZookeeperRegistry will be called here. In fact, the parent class FailbackRegistry of ZookeeperRegistry will be called first registry.register(registeredProviderUrl); }
FailbackRegistry.java
public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Then call ZookeeperRegistry#doRegister here. This URL parameter is obviously a withdrawal of SPI doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } }
ZookeeperRegistry
@Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
Principle of service listener
Dynamic configuration cannot change the port
Principle summary of service listener
-
During the export process, the service needs to subscribe to the data in the dynamic configuration center, so that the service provider can make changes in time after the manager modifies the parameters of the corresponding service in the dynamic configuration center. This function involves version compatibility, because this function also existed before Dubbo 2.7, and Dubbo 2.7 began to adjust this function.
-
Prior to Dubbo 2.7, only dynamic configuration of multiple services was supported
-
After Dubbo 2.7, it supports not only the dynamic configuration of a single service, but also the dynamic configuration of an application (equivalent to taking effect for all services under the application)
-
In order to achieve this function, the Watcher mechanism of Zookeeper needs to be used. Therefore, for service providers, which Zookeeper node do I monitor for data changes?
-
This node is regular and different before and after Dubbo 2.7:
- Before Dubbo 2.7: the zk path monitored is: / Dubbo / org.apache.dubbo.demo.demoservice/configurators/ override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_ Config = true & dynamic = false & enabled = true & timeout = 6000. Note that you are listening for changes in node names, not node contents
- After Dubbo 2.7, the zk path monitored is:
- Services: contents of the / dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators node
- Apply the contents of the: / Dubbo / config / Dubbo / Dubbo demo provider application.configurators node
-
Note that to distinguish from the path of the configuration center, the path of the configuration center is:
- Apply the contents of the: / dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties node
- Global: / contents of dubbo/config/dubbo/dubbo.properties node
Therefore, when exporting a service, you need to generate a corresponding listener instance for the current service at the service provider. This listener instance is OverrideListener, which is responsible for monitoring the dynamic configuration changes of the corresponding service and rewriting the service URL according to the parameters of the dynamic configuration center.
In addition to OverrideListener, another two are added after Dubbo 2.7:
- ProviderConfigurationListener: it listens to the dynamic configuration data modification of an application, so it is an attribute in the RegistryProtocol class and is instantiated with the instantiation of RegistryProtocol. There is only one in an application
- ServiceConfigurationListener: it monitors the dynamic configuration data modification of a service. Similar to OverrideListener, it also corresponds to a service. Therefore, it will be generated when exporting each service. In fact, there is an internal attribute of ServiceConfigurationListener, OverrideListener. Therefore, when the monitoring data of ServiceConfigurationListener changes, The latest data of the configuration center will be handed over to the OverrideListener to rewrite the service URL.
- At the same time, the OverrideListener corresponding to all services is saved in the RegistryProtocol class, so in fact, when the ProviderConfigurationListener listens to data changes, it will call each OverrideListener in turn to rewrite the service URL corresponding to the service.
- ProviderConfigurationListener listens to the / Dubbo / config / Dubbo / Dubbo demo provider application.configurators node
- The ServiceConfigurationListener listens to the / dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators node
Sorting and modifying dynamic configuration trigger process:
-
Modify the service dynamic configuration, and the bottom layer will modify the data in Zookeeper,
- /Contents of the dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators node
-
ServiceConfigurationListener will listen to changes in node content and trigger the process (configchangeevent) method of the parent class AbstractConfiguratorListener of ServiceConfigurationListener
-
ConfigChangeEvent indicates an event. There are event types, event contents (node contents) and the name of the node triggering the event. There are three event types:
- ADDED
- MODIFIED
- DELETED
-
When a ConfigChangeEvent event is received, it will be processed according to the event type
- ADDED and MODIFIED: the URL of the override: / / protocol will be generated according to the node content, and then the Configurator will be generated according to the URL. The Configurator object is very important and represents a Configurator. The URL can be rewritten according to the Configurator
- DELETED: deletes all configurators in ServiceConfigurationListener
-
After Configurator is generated, the notifyOverrides() method is used to rewrite the service URL.
-
Note that each rewriting does not only use the Configurator generated above, but also uses all configurators, including the Configurator of this service, the Configurator of this application, and the Configurator of the old version management console. The logic of rewriting the URL is as follows:
-
Get the currently exported service URL currenturl from the exporter
-
Rewrite the service URL according to the Configurator of the old version management console
-
Rewrite the service URL according to the Configurator in the providerConfigurationListener
-
Rewrite the service URL according to the Configurator of the corresponding service in serviceConfigurationListeners
-
If newUrl and currentUrl are equal after rewriting, nothing needs to be done
-
If newUrl and currentUrl are not equal after rewriting, you need to re export the service:
- Export according to newUrl. Note that this is just to call DubboProtocol export to start NettyServer again
- Simplify newUrl to registeredProviderUrl
- Call the unregister() method of RegistryProtocol to delete the service provider URL before the current service from the registry
- Call the register() method of RegistryProtocol to register the new registeredProviderUrl with the registry
-
Service listener binding source code
RegistryProtocol
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // Export service // registry:// ---> RegistryProtocol // zookeeper:// ---> ZookeeperRegistry // dubbo:// ---> DubboProtocol /** * 1.registry://xxx?xx=xx®istry=zookeeper ---> zookeeper://xxx?xx=xx Represents the registry * Here is to replace registry with zookeeper * 2.Example: zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application= * dubbo-demo-provider-application&dubbo=2.0.2&export=dubbo://192.168.40.17:20880/ * org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService * &bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2& * dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService& * logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000& * timestamp=1590735956489&logger=log4j&pid=27656&release=2.7.0×tamp=1590735956479 */ URL registryUrl = getRegistryUrl(originInvoker); // Get the service provider url, which represents the service provider /** * 1.Here is the dubbo service url spelled after export * 2.Example: dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService& * bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true& * generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello * &pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489 * 3.The ultimate purpose of service export is to save the providerUrl to the registry, but there are some other operations in the middle */ URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // overrideSubscribeUrl is the dynamic configuration monitoring url of the old version, which indicates the services to be monitored and the type of monitoring (configurators, which is the dynamic configuration of the old version) // Based on the service provider url, generate an overrideSubscribeUrl with the protocol provider: / /, and add the parameter category = configurators & check = false final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // An overrideSubscribeUrl corresponds to an OverrideListener, which is used to listen to change events. After listening to the change of overrideSubscribeUrl, // The OverrideListener will handle the changes accordingly. See the implementation of OverrideListener for the specific processing logic final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); /** * In this method, providerConfigurationListener and serviceConfigurationListener will be used to rewrite providerUrl * providerConfigurationListener Represents an application level dynamic configuration listener. providerConfigurationListener is a property of RegistyProtocol * serviceConfigurationListener Represents a dynamic configuration listener of service level. serviceConfigurationListener generates one every time a service is exposed * Both listeners are listeners in the new version * The zk path of the new version listening is: * Services: contents of the / dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators node * Apply the contents of the: / Dubbo / config / Dubbo / Dubbo demo provider application.configurators node * Note that to distinguish from the path of the configuration center, the path of the configuration center is: * Apply the contents of the: / dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties node * Global: / contents of dubbo/config/dubbo/dubbo.properties node */ providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // export invoker // After rewriting the providerUrl according to the dynamic configuration, DubboProtocol or HttpProtocol will be called to export the service, // netty and tomcat will be started here final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry // Get the registry ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // The provider url stored in the registry will simplify the parameters in the service provider url, // Because it is useless to save some parameters to the registry final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // Save the current service provider Invoker, the registry address corresponding to the service, and the simplified service url into the ProviderConsumerRegTable ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish //Do you need to register with the registry boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // Register the service and register the simplified service provider url in registryUrl register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } /** * For the dynamic configuration of the old version, the overrideSubscribeListener needs to be bound to the overrideSubscribeUrl to listen * Compatible with the configuration modification of the old version, use the overrideSubscribeListener to monitor the dynamic configuration changes of the old version * Listen for overridesubscribeurl provider://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true& * application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService& * bind.ip=192.168.40.17&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2& * dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=416332& * release=&side=provider×tamp=1585318241955 * When are the new versions of providerConfigurationListener and serviceConfigurationListener subscribed? When these two classes are constructed * Deprecated! Subscribe to override rules in 2.6.x or before. * The zk path monitored by the old version is: / Dubbo / org.apache.dubbo.demo.demoservice/configurators/ override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_ config=true&dynamic=false&enabled=true&timeout=6000 * Listen to the contents of the path, not the contents of the node */ registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); } private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) { /** * 1.For application configuration, the providerConfigurationListener is directly initialized in the attribute, * providerConfigurationListener It will listen for changes in application configuration information in the configuration center * Each application has only one providerConfigurationListener * 2.First, the process here is: * 1.ProviderConfigurationListener Call the parent class AbstractConfiguratorListener through the constructor * #initWith method * 2.In the initWith method, listen to the registry through the path key passed in (commonly used is zookeeper) * key The node under the path will first get the current configuration from the registry and then convert it to configurators * 3.Next, call overrideUrl here and generate a new providerUrl with the previous configurators * 4.This is because the previous providerUrl is annotated with @ Service and the configuration center file (yml or properties) * There is also a URL composed of the configuration in the - D startup parameter. But this providerUrl has not been * The dynamic configuration of the web side, so you need to rewrite the URL here * 5.ServiceConfigurationListener Similarly, the ServiceConfigurationListener code is followed * Therefore, it is obvious that 'service configuration' will override 'application configuration' */ providerUrl = providerConfigurationListener.overrideUrl(providerUrl); // Service configuration. The new ServiceConfigurationListener will be initialized. The ServiceConfigurationListener will listen to the service information and configuration information changes in the configuration center // This is that each service will create a new ServiceConfigurationListener ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener); serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener); return serviceConfigurationListener.overrideUrl(providerUrl); } //RegistryProtocol internal class public ProviderConfigurationListener() { //Subscription application name + ". configurators" here is the listening path of the new version of ProviderConfigurationListener this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); }
AbstractConfiguratorListener
// This method will be called when constructing ProviderConfigurationListener and ServiceConfigurationListener // Complete the Listener's own subscription to the corresponding applications and services // After the subscription relationship is bound, take the initiative to obtain the corresponding configuration data from the dynamic configuration center to generate configurators, and then rewrite the providerUrl protected final void initWith(String key) { //What we get here is the registration center. We use zookeeper in most cases DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration(); // Added Listener and subscribed dynamicConfiguration.addListener(key, this); // Obtain the dynamic configuration data belonging to the current application from the configuration center ConfigCenter, and get the original data from zk (actively obtain data from the configuration center) String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP); // If there is application configuration information, the Configurator is generated according to the configuration information if (!StringUtils.isEmpty(rawConfig)) { genConfiguratorsFromRawRule(rawConfig); } } private boolean genConfiguratorsFromRawRule(String rawConfig) { boolean parseSuccess = true; try { // parseConfigurators will recognize app/service config automatically. // First convert the application or service configuration into a url, and then generate the corresponding Configurator according to the url configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig)) .orElse(configurators); } catch (Exception e) { logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " + rawConfig, e); parseSuccess = false; } return parseSuccess; }
Service listener listening source code
RegistryProtocol
private class ProviderConfigurationListener extends AbstractConfiguratorListener { public ProviderConfigurationListener() { //Subscription application name + ". configurators" here is the listening path of the new version of ProviderConfigurationListener this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); } /** * Get existing configuration rule and override provider url before exporting. * * @param providerUrl * @param <T> * @return */ private <T> URL overrideUrl(URL providerUrl) { // Modify / assemble providerUrl through configurators return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); } @Override protected void notifyOverrides() { overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary()); } } private class ServiceConfigurationListener extends AbstractConfiguratorListener { private URL providerUrl; private OverrideListener notifyListener; public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) { this.providerUrl = providerUrl; this.notifyListener = notifyListener; // Subscription service interface name + group+version+".configurators" this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX); } private <T> URL overrideUrl(URL providerUrl) { return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); } //This is the monitoring entrance @Override protected void notifyOverrides() { notifyListener.doOverrideIfNecessary(); } } public synchronized void doOverrideIfNecessary() { final Invoker<?> invoker; if (originInvoker instanceof InvokerDelegate) { invoker = ((InvokerDelegate<?>) originInvoker).getInvoker(); } else { invoker = originInvoker; } //The origin invoker is the original service provider URL of the current service, which has not been changed by any dynamic configuration URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); String key = getCacheKey(originInvoker); ExporterChangeableWrapper<?> exporter = bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); return; } //The current, may have been merged many times, the url of the current service to be exported before the event is triggered URL currentUrl = exporter.getInvoker().getUrl(); //Modify the url according to configurators. Configurators are full, not added or deleted, // Therefore, the modification is based on the original url, not the currentUrl. Here is the old version of configurators //Merged with this configuration URL newUrl = getConfigedInvokerUrl(configurators, originUrl); // This is a new version of configurators newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl); newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()) .getConfigurators(), newUrl); // If the modified url is different from the current url, re export it by newUrl if (!currentUrl.equals(newUrl)) { RegistryProtocol.this.reExport(originInvoker, newUrl); logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl); } } public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) { // Export according to newInvokerUrl // update local exporter ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl); // Get exact ProviderUrl // update registry URL registryUrl = getRegistryUrl(originInvoker); // For a service provider url, it will be simplified when registering in the registry final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl); //decide if we need to re-publish // Get ProviderInvokerWrapper according to getServiceKey ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker); // Generate a new ProviderInvokerWrapper ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); /** * Only if the new url going to Registry is different with the previous one should we do unregister and register. * If the simplified url of the new service provider url is not equal to the simplified url of the previous service provider url of the service, you need to register the new simplified service provider url with the registry */ if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) { unregister(registryUrl, providerInvokerWrapper.getProviderUrl()); register(registryUrl, registeredProviderUrl); newProviderInvokerWrapper.setReg(true); } exporter.setRegisterUrl(registeredProviderUrl); } private <T> ExporterChangeableWrapper doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) { String key = getCacheKey(originInvoker); final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); } else { // Only here can we really understand why InvokerDelegate is needed // InvokerDelegate represents a caller, which is composed of invoker+url. The invoker remains unchanged and the url is variable final Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl); // Finally, we will go to the logic of DubboProtocol#export. We've seen it before exporter.setExporter(protocol.export(invokerDelegate)); } return exporter; }
Q: Here leads to a question: does Netty and tomcat need to be restarted after the configuration is changed?
A: No, why? The reset logic of DubboProtocol#export mentioned earlier
Service re export source code
DubboProtocol
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // ... omitted // Turn on NettyServer // Request -- > invocation -- > service key -- > exportermap.get (key) - > exporter -- > invoker -- > invoker. Invoke (invocation) - > execute service openServer(url); // Some special serialization mechanisms, such as kryo, provide a registration mechanism to register classes to improve the speed of serialization and deserialization optimizeSerialization(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); // Get the ip address and port, 192.168.40.17:20880 // NettyClient, NettyServer //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // Cache Server object ExchangeServer server = serverMap.get(key); // DCL,Double Check Lock if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // Create a Server and cache it serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override // When the service is re exported, it will go here and call HeaderExchangeServer#reset server.reset(url); } } }
HeaderExchangeServer
//This will be called when starting netty public HeaderExchangeServer(Server server) { Assert.notNull(server, "server == null"); this.server = server; // Start the Task that defines closing the Channel(socket) startIdleCheckTask(getUrl()); } private void startIdleCheckTask(URL url) { if (!server.canHandleIdle()) { // The underlying NettyServer has its own heartbeat mechanism, so the upper exchange server does not need to start the heartbeat task AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); // Define the Task to close the Channel CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); this.closeTimerTask = closeTimerTask; // init task and start timer. // Run closeTimerTask regularly IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } } public void reset(URL url) { server.reset(url); try { int currHeartbeat = getHeartbeat(getUrl()); int currIdleTimeout = getIdleTimeout(getUrl()); int heartbeat = getHeartbeat(url); int idleTimeout = getIdleTimeout(url); /** * 1.Dynamically change the configuration. There is no need to restart netty,tomcat, etc. when re exporting the service * 2.Here, just close the channel task of the service and restart a task according to the new url */ if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) { cancelCloseTask(); startIdleCheckTask(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); } }
CloseTimerTask
@Override protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); Long now = now(); // check ping & pong at server // Indicates how long the Server has not read or written data. In other words, it has timed out if ((lastRead != null && now - lastRead > idleTimeout) || (lastWrite != null && now - lastWrite > idleTimeout)) { logger.warn("Close channel " + channel + ", because idleCheck timeout: " + idleTimeout + "ms"); channel.close(); } } catch (Throwable t) { logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t); } }
Service export source code process summary
-
ServiceBean.export() method is the export entry method. It will execute ServiceConfig.export() method to complete the service export. After the export, a Spring event servicebean exportedevent will be published
-
In the ServiceConfig.export() method, checkAndUpdateSubConfigs() will be called first. This method mainly completes the parameter refresh of AbstractConfig (obtaining parameters from the configuration center, etc.). AbstractConfig refers to ApplicationConfig, ProtocolConfig, ServiceConfig, etc. after the refresh, it will check whether the parameters such as stub, local, mock, etc. are configured correctly
-
After the parameter refresh and check are completed, the export service will be started. If delayed export is configured, the scheduled executorservice will be used for delayed export at the specified time
-
Otherwise, call doExport() to export the service
-
Continue calling doExportUrls() for service export
-
First, obtain the URL of the configured registry through the loadRegistries() method. There may be multiple configuration centers, so the currently exported services need to be registered in each configuration center. Here, the of the registry is expressed in the form of URL, what registry is used, the address and port of the registry, the parameters configured for the registry, etc, Will exist on the URL, which starts with * * registry: / / *
-
After obtaining the registry URLs from the registry, it will traverse all the protocolconfigs of the current service and call doexporturlsfor1protocol (protocol config, registry URLs); Method to export the current service to each registry according to each protocol
-
In the doExportUrlsFor1Protocol() method, a service URL will be constructed first, including
- Service agreement dubbo: / /,
- The IP and PORT of the service. If it is specified, the specified IP is taken. If it is not specified, the IP of the network card on the server is obtained,
- And the PATH of the service. If the PATH parameter is not specified, the interface name will be taken
- And the parameters of the service. The parameters include the parameters of the service and the parameters of a method in the service
- The resulting URL is similar: dubbo://192.168.1.110:20880/cn.imlql.DemoService?timeout=3000&&sayHello.loadbalance=random
-
After obtaining the service URL, the service URL will be added to the registryURL as a parameter, and then an Invoker proxy object will be generated from the registryURL, the service interface and the current service implementation class ref, and then the proxy object and the current ServiceConfig object will be wrapped into a DelegateProviderMetaDataInvoker object, DelegateProviderMetaDataInvoker represents a complete service
-
Next, you will use Protocol to export the service. After exporting, you will get an Exporter object (this Exporter object can be understood as mainly used to uninstall the service. When will the service be uninstalled? When will the Dubbo application be gracefully closed)
-
Next, let's take a detailed look at how Protocol exports services?
-
However, when calling the protocol. Export (wrappininvoker) method, because protocol is an Adaptive object of the protocol interface, a url will be obtained according to the genUrl method of wrappininvoker, and the corresponding extension point will be found according to the protocol of the url. At this time, the extension point is RegistryProtocol. However, because the protocol interface has two wrapper classes, one is ProtocolFilterWrapper ProtocolListenerWrapper. In fact, when calling the export method, it will go through the export methods of these two wrapper classes. However, the export methods of these two wrapper classes will be judged by the Registry protocol and will not do too much processing. Therefore, it will directly call the export (invoker < T > origininvoker) method of RegistryProtocol in the end
-
In the export (invoker < T > origininvoker) method of RegistryProtocol, the following tasks are completed:
-
Generate a listener to listen for changes in the parameter data of this service in the dynamic configuration center. Once a change is detected, rewrite the service URL, and rewrite the service URL first when exporting the service
-
After getting the URL after rewriting, call doLocalExport() to export the service. In this method, the export method of DubboProtocol will be used to export the service. After the export is successful, a ExporterChangeableWrapper will be obtained.
- In the export method of DubboProtocol, the main thing to do is to start NettyServer and set a series of requesthandlers so that they can be processed by these requesthandlers in turn when receiving requests
- These requesthandlers have been sorted out above
-
Get the implementation class of the registry from originInvoker, such as ZookeeperRegistry
-
Simplify the rewritten service URL and remove the parameters that do not need to be stored in the registry
-
Call the ZookeeperRegistry.registry() method to register the simplified service URL in the registry
-
Finally, the ExporterChangeableWrapper is encapsulated as a DestroyableExporter object and returned to complete the service export
-
Exporter architecture
After a service is exported successfully, the corresponding Exporter will be generated:
- DestroyableExporter: the outermost wrapper class of the Exporter. This class is mainly used to provide services corresponding to the nonexporter
- ExporterChangeableWrapper: this class is mainly responsible for removing the service URL from the registry and removing the dynamic configuration listener corresponding to the service before unexport ing the corresponding service
- ListenerExporterWrapper: this class is mainly responsible for removing the service export listener after the corresponding service is not exported
- DubboExporter: this class holds the Invoker object of the corresponding service and the unique flag of the current service. When NettyServer receives the request, it will find the DubboExporter object corresponding to the service according to the service information in the request, and then get the Invoker object from the object
Server Invoker architecture
- ProtocolFilterWrapper$CallbackRegistrationInvoker: it will call the lower level Invoker. After the lower level Invoker is executed, it will traverse the filter to see whether any filter implements the ListenableFilter interface. If so, it will call back the corresponding onResponse method, such as TimeoutFilter. After calling the lower level Invoker, the execution time of the service will be calculated
- The Invoker of the filter in ProtocolFilterWrapper$1:ProtocolFilterWrapper, using this Invoker, can execute the filter on the server side. After executing the filter, call the lower level Invoker.
- RegistryProtocol$InvokerDelegate: the delegate class of the service, which contains the DelegateProviderMetaDataInvoker object and the provider URL corresponding to the service. The underlying Invoker is called directly during execution
- DelegateProviderMetaDataInvoker: the delegate class of the service, which contains the AbstractProxyInvoker object and ServiceConfig object. The underlying Invoker is called directly during execution
- AbstractProxyInvoker: the proxy class of the service interface, which is bound with the corresponding implementation class. During execution, it will use reflection to call the specific method of the service implementation class instance and get the result