Preface
Earlier, we studied the principles of RPC. Many frameworks are built on RPC, such as Dubbo. Today, we will analyze Dubbo in depth, covering its SPI mechanism, the source code of service registration and discovery, and its network communication process.
Dubbo architecture
Summary
Dubbo is a high-performance open source service framework from Alibaba. It enables applications to expose and consume services through high-performance RPC and integrates seamlessly with the Spring framework.
Dubbo is a high-performance, lightweight open source Java RPC framework. It provides three core capabilities: interface-oriented remote method invocation, intelligent fault tolerance and load balancing, and automatic service registration and discovery.
Call process:
- The service container is responsible for starting, loading and running the service provider.
- When a service provider starts, it registers its services with the registry.
- When the service consumer starts, it subscribes to the service it needs from the registry.
- The registry returns the service provider address list to the consumer. If there is any change, the registry will push the change data to the consumer based on the long connection.
- From the provider address list, the service consumer selects one provider to call based on the soft load balancing algorithm. If the call fails, it selects another provider to call.
- Service consumers and providers accumulate call counts and call durations in memory and send the statistics to the monitoring center once a minute. A minimal bootstrap sketch illustrating this flow follows.
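To make the flow above concrete, here is a minimal sketch of a provider and a consumer using Dubbo's programmatic API. This assumes a Dubbo 2.7-style API and a local Zookeeper registry; DemoService and all addresses are placeholders, and in practice this is usually configured through Spring XML or annotations instead.

import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;

public class QuickStart {

    // Hypothetical service interface used only for this sketch.
    public interface DemoService {
        String sayHello(String name);
    }

    public static void main(String[] args) {
        RegistryConfig registry = new RegistryConfig("zookeeper://127.0.0.1:2181"); // assumed registry address

        // Provider side: wrap the implementation and register it with the registry (steps 1-2 above).
        ServiceConfig<DemoService> service = new ServiceConfig<>();
        service.setApplication(new ApplicationConfig("demo-provider"));
        service.setRegistry(registry);
        service.setInterface(DemoService.class);
        service.setRef(name -> "hello " + name);   // the service implementation
        service.export();                          // expose the service

        // Consumer side: subscribe to the service and call it through a proxy (steps 3-5 above).
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("demo-consumer"));
        reference.setRegistry(registry);
        reference.setInterface(DemoService.class);
        DemoService demoService = reference.get(); // proxy backed by the provider list from the registry
        System.out.println(demoService.sayHello("dubbo"));
    }
}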
Architecture system
Source code structure
- dubbo-common (common logic module): utility classes and common models
- dubbo-remoting (remote communication module): essentially the transport implementation of the dubbo protocol; if RPC uses the RMI protocol, this module is not needed
- dubbo-rpc (remote call module): abstracts the various protocols and dynamic proxies; it only covers one-to-one calls and is not concerned with cluster management
- dubbo-cluster (cluster module): disguises multiple service providers as a single provider, covering load balancing, fault tolerance, routing, etc.; the cluster address list can be configured statically or delivered by the registry
- dubbo-registry (registry module): the cluster mode in which addresses are delivered by a registry, and the abstraction over the various registries
- dubbo-monitor (monitoring module): counts service calls, call durations, and call-chain tracking
- dubbo-config (configuration module): Dubbo's external API; users use Dubbo through config, which hides all of Dubbo's internal details
- dubbo-container (container module): a standalone container, started by simply loading Spring in main; because services usually do not need the features of web containers such as Tomcat/JBoss, there is no need to deploy services inside a web container
Overall design
- In the figure, the light blue background on the left marks the interfaces used by the service consumer, the light green background on the right marks the interfaces used by the service provider, and the interfaces on the central axis are used by both sides.
- The figure has ten layers from bottom to top, with strictly unidirectional dependencies between layers; each layer can be stripped out and reused independently of the layers above it. Among them, the Service and Config layers are API, and the other layers are SPI.
- Green blocks are extension interfaces and blue blocks are implementation classes; only the implementation classes that tie the layers together are shown.
- Blue dashed lines are the initialization process, i.e. the assembly chain at startup; red solid lines are the method call process, i.e. the runtime call chain; purple triangular arrows denote inheritance, where a subclass can be regarded as the same node as its parent class; the text on a line is the method being called.
Description of each layer
- Config configuration layer: the external configuration interface, centered on ServiceConfig and ReferenceConfig; configuration classes can be initialized directly or generated by Spring when parsing the configuration
- Proxy service proxy layer: transparent proxies of service interfaces, generating the client Stub and server Skeleton of a service, centered on ServiceProxy, with ProxyFactory as the extension interface
- Registry registry layer: encapsulates registration and discovery of service addresses, centered on the service URL, with RegistryFactory, Registry and RegistryService as the extension interfaces
- Cluster routing layer: encapsulates routing and load balancing across multiple providers and bridges the registry, centered on Invoker, with Cluster, Directory, Router and LoadBalance as the extension interfaces
- Monitor monitoring layer: monitors RPC call counts and call durations, centered on Statistics, with MonitorFactory, Monitor and MonitorService as the extension interfaces
- Protocol remote call layer: encapsulates RPC calls, centered on Invocation and Result, with Protocol, Invoker and Exporter as the extension interfaces
- Exchange information exchange layer: encapsulates the request-response pattern and turns synchronous calls into asynchronous ones, centered on Request and Response, with Exchanger, ExchangeChannel, ExchangeClient and ExchangeServer as the extension interfaces
- Transport network transport layer: abstracts Mina and Netty into a unified interface, centered on Message, with Channel, Transporter, Client, Server and Codec as the extension interfaces
- Serialize data serialization layer: reusable serialization tools, with Serialization, ObjectInput, ObjectOutput and ThreadPool as the extension interfaces
Call process
Mapped onto the overall architecture diagram above, the call process can be roughly divided into the following steps:
1. The service provider starts, starts the Netty service, creates a Zookeeper client, and registers the service with the registry.
2. The service consumer starts, obtains the list of service providers from the registry through Zookeeper, and establishes a long connection with the service provider through Netty.
3. The service consumer starts to call the service remotely through the interface. ProxyFactory initializes the Proxy object and Proxy creates the dynamic Proxy object.
4. The dynamic proxy object is wrapped layer by layer through the invoke method to generate an Invoker object, which contains the proxy object.
5. Invoker selects the most appropriate service provider through routing and load balancing. By adding various filters, the protocol layer wrapper generates a new DubboInvoker object.
6. Then the DubboInvoker object is wrapped by the exchange layer into a Request object, which is serialized and transmitted by the NettyClient to the NettyServer side of the service provider.
7. On the service provider side, deserialization, protocol decoding and other operations produce a DubboExporter object, which is then passed layer by layer to produce the service provider's Invoker object.
8. This Invoker object will call the local service to obtain the result, and then return it to the service consumer through layers of callback. After the service consumer obtains the result, it will parse it to obtain the final result.
SPI mechanism in Dubbo
What is SPI
Summary
SPI is a very important module in Dubbo. Based on SPI, we can easily expand Dubbo. If you want to learn Dubbo's source code, you must understand the SPI mechanism. Next, let's first understand the usage of Java SPI and Dubbo SPI, and then analyze the source code of Dubbo SPI.
SPI is the abbreviation of Service Provider Interface. It is a service discovery mechanism. The essence of SPI is to record the fully qualified names of an interface's implementation classes in a configuration file; the service loader reads the configuration file and loads the implementation classes. In this way, implementation classes for the interface can be replaced dynamically at run time.
SPI in JDK
Java SPI is essentially a dynamic loading mechanism built from the combination of "interface-based programming + strategy pattern + configuration files".
Let's understand SPI through an example.
Define an interface:
package com.laowang;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public interface User {
    String showName();
}
Define two implementation classes
package com.laowang.impl;

import com.laowang.User;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class Student implements User {
    @Override
    public String showName() {
        System.out.println("my name is laowang");
        return null;
    }
}
package com.laowang.impl;

import com.laowang.User;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class Teacher implements User {
    @Override
    public String showName() {
        System.out.println("my name is zhangsan");
        return null;
    }
}
Create a META-INF/services folder under the resources directory, and inside it create a file named after the fully qualified name of User: com.laowang.User.
Write the fully qualified names of the two implementation classes in this file:
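The content of com.laowang.User is simply the two implementation class names, one per line:

com.laowang.impl.Student
com.laowang.impl.Teacher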
Write test class:
package com.laowang;

import java.util.ServiceLoader;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class SpiTest {
    public static void main(String[] args) {
        ServiceLoader<User> serviceLoader = ServiceLoader.load(User.class);
        serviceLoader.forEach(User::showName);
    }
}
Running result:
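Since ServiceLoader finds, instantiates and invokes both implementations listed in the file, the output is (in file order):

my name is laowang
my name is zhangsan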
We can see that the SPI mechanism automatically discovered and ran both implementation classes for us.
Check the ServiceLoader source code:
Internally, ServiceLoader reads the fully qualified class names of the implementation classes from the configuration file, creates objects through reflection, and puts them into the providers container.
Summary:
Call procedure
The application calls the ServiceLoader.load method, which creates a new ServiceLoader and instantiates its member variables.
The application then obtains object instances through the iterator interface. ServiceLoader first checks whether the providers member variable (a LinkedHashMap<String, S>) already caches an instance; if it does, the cached instance is returned directly, otherwise class loading and instantiation are performed.
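The core logic can be boiled down to the following simplified sketch. This is an assumption-laden approximation, not the actual JDK source: the real ServiceLoader is lazier, handles errors through ServiceConfigurationError, and caches per-iterator state.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of what ServiceLoader does under the hood.
public class SimpleServiceLoader {

    public static <S> Map<String, S> load(Class<S> service) throws Exception {
        Map<String, S> providers = new LinkedHashMap<>();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        // 1. Locate the configuration file named after the interface's fully qualified name.
        Enumeration<URL> configs = cl.getResources("META-INF/services/" + service.getName());
        while (configs.hasMoreElements()) {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(configs.nextElement().openStream(), StandardCharsets.UTF_8))) {
                String line;
                // 2. Each non-comment line is the fully qualified name of an implementation class.
                while ((line = reader.readLine()) != null) {
                    String className = line.trim();
                    if (className.isEmpty() || className.startsWith("#")) {
                        continue;
                    }
                    // 3. Load the class by reflection, instantiate it and cache it in "providers".
                    Class<?> clazz = Class.forName(className, false, cl);
                    providers.put(className, service.cast(clazz.getDeclaredConstructor().newInstance()));
                }
            }
        }
        return providers;
    }
}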
Advantages
The advantage of the Java SPI mechanism is decoupling: the definition of an interface is separated from the concrete business implementations rather than coupled to them, and an application can enable or replace concrete components according to the actual business situation.
Shortcomings
It cannot load on demand. Although ServiceLoader loads lazily, implementations can only be obtained by iterating over all of them, so every implementation class of the interface ends up being loaded and instantiated. If you do not want to use some implementation, or some class is expensive to instantiate, it still gets loaded and instantiated, which is wasteful.
The way of obtaining an implementation class is not flexible: implementations can only be obtained through the Iterator, and you cannot fetch a specific implementation class by a key or parameter.
It is not safe for multiple threads to use an instance of ServiceLoader concurrently.
When an implementation class fails to load, the exception thrown does not reflect the real cause, which makes errors hard to locate.
SPI in Dubbo
Dubbo does not use Java SPI, but re-implements a more powerful SPI mechanism of its own. The relevant logic of Dubbo SPI is encapsulated in the ExtensionLoader class; through ExtensionLoader, we can load a specified implementation class.
Example
Unlike the Java SPI configuration of implementation classes, Dubbo SPI is configured through key-value pairs, so that we can load a specified implementation class on demand. Let's demonstrate the usage of Dubbo SPI.
The configuration file required by Dubbo SPI needs to be placed under the META-INF/dubbo path, and its content looks like this:
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
When using Dubbo SPI, you need to annotate the interface with @SPI.
@SPI
public interface Robot {
    void sayHello();
}
Through ExtensionLoader, we can load the specified implementation class. The following is a demonstration of Dubbo SPI:
public class DubboSPITest {

    @Test
    public void sayHello() throws Exception {
        ExtensionLoader<Robot> extensionLoader = ExtensionLoader.getExtensionLoader(Robot.class);
        Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
        optimusPrime.sayHello();
        Robot bumblebee = extensionLoader.getExtension("bumblebee");
        bumblebee.sayHello();
    }
}
In addition to supporting on-demand loading of interface implementation classes, Dubbo SPI also adds features such as IOC and AOP, which will be introduced one by one in the source code analysis below.
Source code analysis
The getExtensionLoader method of ExtensionLoader obtains an ExtensionLoader instance, and the extension object is then obtained through the getExtension method of ExtensionLoader. Next, we take the getExtension method of ExtensionLoader as the entry point and analyze in detail how extension objects are obtained.
public T getExtension(String name) {
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    if ("true".equals(name)) {
        // Get the default extension implementation class
        return getDefaultExtension();
    }
    // Holder, as the name suggests, holds the target object; if there is none yet, a new Holder is created
    Holder<Object> holder = getOrCreateHolder(name);
    // Get the target object instance
    Object instance = holder.get();
    // If the target object instance is null, create the instance with double-checked locking
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                // Create the extension instance
                instance = createExtension(name);
                // Set the instance into the holder
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}
The logic of the above code is relatively simple. First, check the cache. If the cache misses, create an extension object. Let's take a look at the process of creating extension objects.
private T createExtension(String name) { // Load all extension classes from the configuration file to get the mapping relationship table from "configuration item name" to "configuration class" Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { //Get the corresponding instance object from the container. If it does not exist, it will be created through reflection T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { // Create instances by reflection EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } // Inject dependencies into the instance. The following is the implementation of IOC and AOP injectExtension(instance); Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { // Loop to create Wrapper instance for (Class<?> wrapperClass : wrapperClasses) { // Pass the current instance as a parameter to the constructor of the Wrapper, and create the Wrapper instance through reflection. // Then inject dependencies into the Wrapper instance, and finally assign the Wrapper instance to the instance variable again instance = injectExtension( (T) wrapperClass.getConstructor(type).newInstance(instance)); } }
The logic of the createExtension method is slightly more complex, including the following steps:
- Get all extension classes through getExtensionClasses
- Creating extended objects through reflection
- Injecting dependencies into extended objects
- Wrap the extension object in the corresponding Wrapper object
Among the above steps, the first step is the key to loading extension classes, and the third and fourth steps are the concrete implementation of Dubbo IOC and AOP. Since a lot of source code is involved, here is a brief summary of the overall execution logic of ExtensionLoader:
getExtension(String name)              # Get the extension object by key
  --> createExtension(String name)     # Create the extension instance
  --> getExtensionClasses              # Get all extension classes from the configured paths
  --> loadExtensionClasses             # Load the extension classes
  --> cacheDefaultExtensionName        # Parse the @SPI annotation
  --> loadDirectory                    # Load the configuration files in the specified directories
  --> loadResource                     # Load the resources
  --> loadClass                        # Load the classes and cache them
How does Dubbo's SPI implement IOC and AOP
Dubbo IOC
Dubbo IOC injects dependencies through setter methods. Dubbo first obtains all the methods of an instance through reflection, then traverses the method list to detect whether a method name matches the setter convention. If it does, the dependency object is obtained through ObjectFactory, and finally the setter is invoked through reflection to set the dependency into the target object. The corresponding code is as follows:
private T injectExtension(T instance) { try { if (objectFactory != null) { //Get all methods of the instance for (Method method : instance.getClass().getMethods()) { //What isSetter does: check whether the method starts with set, the method has only one parameter, and the method access level is public if (isSetter(method)) { /** * Check {@link DisableInject} to see if we need auto injection for this property */ if (method.getAnnotation(DisableInject.class) != null) { continue; } Class<?> pt = method.getParameterTypes()[0]; if (ReflectUtils.isPrimitives(pt)) { continue; } try { String property = getSetterProperty(method); //Get dependent objects Object object = objectFactory.getExtension(pt, property); if (object != null) { //set a property method.invoke(instance, object); } } catch (Exception e) { logger.error("Failed to inject via method " + method.getName() + " of interface " + type.getName() + ": " + e.getMessage(), e); } } } } } catch (Exception e) { logger.error(e.getMessage(), e); } return instance; }
Dubbo AOP
Before discussing this, we need to understand the decorator pattern.
Decorator pattern: dynamically attaches responsibilities to an object without changing the original class file or using inheritance, thereby dynamically extending an object's functionality. It wraps the real object by creating a wrapper (decorator) object around it.
When using Spring we often use its AOP capability to insert extra logic before and after a method of a target class; for example, Spring AOP is typically used to implement logging, monitoring, authentication and similar functions. Does Dubbo's extension mechanism support something similar? The answer is yes. Dubbo has a special kind of class called a Wrapper class. Using the decorator pattern, the original extension point instance is wrapped with a Wrapper class, and extra logic is inserted before and after the original extension point's implementation, thereby realizing AOP.
Generally speaking, the decorator pattern has the following participants:
Component: the common parent of the decorator and the decorated object. It is an interface or abstract class that defines the basic behavior.
ConcreteComponent: the concrete object, i.e. the object being decorated.
Decorator: the abstract decorator, which inherits from Component and extends the functionality of ConcreteComponent from the outside; ConcreteComponent does not need to know that the Decorator exists. Decorator is an interface or abstract class.
ConcreteDecorator: the concrete decorator, used to extend ConcreteComponent.
// Get all classes that need to be wrapped
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
Let's see what cachedWrapperClasses is:
private Set<Class<?>> cachedWrapperClasses;
It is a Set. When are elements added to this set?
/**
 * cache wrapper class
 * <p>
 * like: ProtocolFilterWrapper, ProtocolListenerWrapper
 */
private void cacheWrapperClass(Class<?> clazz) {
    if (cachedWrapperClasses == null) {
        cachedWrapperClasses = new ConcurrentHashSet<>();
    }
    cachedWrapperClasses.add(clazz);
}
Elements are added through this method. Now let's see who calls this private method:
/**
 * test if clazz is a wrapper class
 * <p>
 * which has Constructor with given class type as its only argument
 */
private boolean isWrapperClass(Class<?> clazz) {
    try {
        clazz.getConstructor(type);
        return true;
    } catch (NoSuchMethodException e) {
        return false;
    }
}
So the isWrapperClass method checks whether a class has a constructor whose only argument is the extension interface type. If it does, Dubbo treats it as a decorator (wrapper) class, calls that constructor, and returns the wrapper instance.
The instance actually returned is therefore the wrapper rather than the plain extension, so the methods executed are the wrapper's methods, which in turn delegate to the wrapped extension.
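To make this concrete, here is a hypothetical wrapper for the Robot extension point from the earlier example. The class name and log messages are made up for illustration; because it has a constructor whose only argument is the extension interface, Dubbo would treat it as a wrapper class, and it adds logic around sayHello in decorator style.

// Hypothetical wrapper class: Dubbo detects it via the single-argument constructor (Robot robot).
public class RobotWrapper implements Robot {

    private final Robot robot; // the real extension instance being decorated

    public RobotWrapper(Robot robot) {
        this.robot = robot;
    }

    @Override
    public void sayHello() {
        System.out.println("before sayHello");  // logic inserted before the original call
        robot.sayHello();                        // delegate to the wrapped extension
        System.out.println("after sayHello");   // logic inserted after the original call
    }
}

Like any other extension, such a wrapper would also need to be declared in the META-INF/dubbo configuration file; once createExtension runs, the instance handed back to callers is the RobotWrapper wrapping the real implementation.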
Dynamic compilation in Dubbo
We know that in Dubbo many extensions are loaded through the SPI mechanism, such as Protocol, Cluster, LoadBalance and ProxyFactory. Sometimes we do not want an extension to be loaded when the framework starts, but instead want it loaded according to runtime parameters when the extension method is called, that is, the implementation class is loaded dynamically based on the parameters.
Determining the concrete extension dynamically at runtime according to method parameters is what Dubbo calls the extension point adaptive instance. It is essentially an extension point proxy that defers the choice of extension from Dubbo startup to the time of the RPC call. Every extension point in Dubbo has an adaptive class; if one is not provided explicitly, Dubbo automatically creates it for us, using Javassist by default.
The implementation logic of the adaptive extension mechanism is as follows:
- First, Dubbo generates code with proxy logic for the extension interface;
- This code is compiled with Javassist or the JDK compiler to obtain a Class object;
- A proxy instance is created from it through reflection;
- Inside the proxy, the parameters of the URL object determine which implementation class to call.
Javassist
Javassist is an open source class library for analyzing, editing and creating Java bytecode. It was created by Shigeru Chiba of the Department of Mathematical and Computing Sciences at the Tokyo Institute of Technology, and has joined the open source JBoss application server project, where it is used to manipulate bytecode and implement JBoss's dynamic AOP framework. Javassist is a sub-project of JBoss. Its main advantage is that it is simple and fast: you can dynamically change a class's structure or dynamically generate classes by writing plain Java-style code, without having to understand JVM instructions.
/** * Javassist Is an open source class library for analyzing, editing and creating Java bytecode * Can dynamically change the structure of a class, or dynamically generate a class */ public class CompilerByJavassist { public static void main(String[] args) throws Exception { // ClassPool: class object container ClassPool pool = ClassPool.getDefault(); // Generate a User class through ClassPool CtClass ctClass = pool.makeClass("com.itheima.domain.User"); // Add attribute -- private String username CtField enameField = new CtField(pool.getCtClass("java.lang.String"), "username", ctClass); enameField.setModifiers(Modifier.PRIVATE); ctClass.addField(enameField); // Add attribute -- private int age CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass); enoField.setModifiers(Modifier.PRIVATE); ctClass.addField(enoField); //Add method ctClass.addMethod(CtNewMethod.getter("getUsername", enameField)); ctClass.addMethod(CtNewMethod.setter("setUsername", enameField)); ctClass.addMethod(CtNewMethod.getter("getAge", enoField)); ctClass.addMethod(CtNewMethod.setter("setAge", enoField)); // Parameterless constructor CtConstructor constructor = new CtConstructor(null, ctClass); constructor.setBody("{}"); ctClass.addConstructor(constructor); // Add constructor //ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass)); CtConstructor ctConstructor = new CtConstructor(new CtClass[] {pool.get(String.class.getName()),CtClass.intType}, ctClass); ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}"); ctClass.addConstructor(ctConstructor); // Add custom method CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser",new CtClass[] {}, ctClass); // Set modifiers for custom methods ctMethod.setModifiers(Modifier.PUBLIC); // Set function body for custom method StringBuffer buffer2 = new StringBuffer(); buffer2.append("{\nSystem.out.println(\"User information is as follows\");\n") .append("System.out.println(\"user name=\"+username);\n") .append("System.out.println(\"Age=\"+age);\n").append("}"); ctMethod.setBody(buffer2.toString()); ctClass.addMethod(ctMethod); //Generate a class Class<?> clazz = ctClass.toClass(); Constructor cons2 = clazz.getDeclaredConstructor(String.class,Integer.TYPE); Object obj = cons2.newInstance("itheima",20); //Reflection execution method obj.getClass().getMethod("printUser", new Class[] {}) .invoke(obj, new Object[] {}); // Write the generated class file to the file byte[] byteArr = ctClass.toBytecode(); FileOutputStream fos = new FileOutputStream(new File("D://User.class")); fos.write(byteArr); fos.close(); } }
Source code analysis
Adaptive annotation
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
    String[] value() default {};
}
@Adaptive can be placed on classes or on methods.
On a class: Dubbo will not generate a proxy class for this extension point; the annotated class itself serves as the adaptive extension.
On a method: Dubbo will generate proxy logic for the method, indicating that the method needs to select the corresponding extension implementation according to its URL parameter. A hypothetical example is sketched below.
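As an illustration, assume a hypothetical extension interface whose method carries a URL parameter (org.apache.dubbo.common.URL); the interface name, the "transporter" key and the "netty" default are all made up for this sketch.

@SPI("netty")   // assumed default extension name for this example
public interface HypotheticalTransporter {

    // The adaptive class generated for this method reads the extension name from the URL
    // (here the "transporter" parameter, falling back to the @SPI default) and dispatches to it.
    @Adaptive({"transporter"})
    void connect(org.apache.dubbo.common.URL url);
}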
Every extension point in Dubbo has an adaptive class. If one is not provided explicitly, Dubbo automatically creates it for us, using Javassist by default. Let's look at the code that creates the adaptive extension class:
//1. Let's take a look at the acquisition method of extensionLoader ExtensionLoader<Robot>extensionLoader=ExtensionLoader.getExtensionLoader(Robot.class); //2. The final call is the construction method of ExtensionLoader private ExtensionLoader(Class<?> type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); } //3. getAdaptiveExtension() to see what you've done public T getAdaptiveExtension() { //Get the adaptive extension class. If not, start initializing one Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if (createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { //An adaptive extension class is created here instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; } //Look at createadaptive extension() private T createAdaptiveExtension() { try { return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e); } } //Then go to getAdaptiveExtensionClass() private Class<?> getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); } //Continue to catch up with createadaptive extensionclass() private Class<?> createAdaptiveExtensionClass() { String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate(); ClassLoader classLoader = findClassLoader(); org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); } //Look at compiler @SPI("javassist") public interface Compiler { /** * Compile java source code. * * @param code Java source code * @param classLoader classloader * @return Compiled class */ Class<?> compile(String code, ClassLoader classLoader); } //In fact, I know from here that by generating a string of a class, and then generating an object through javassist
The AdaptiveClassCodeGenerator#generate() method uses a StringBuilder to build the Java source code of the adaptive class; the implementation is rather long, so it is not pasted here. This way of generating bytecode is interesting: you generate Java source, compile it, and load it into the JVM. It gives you better control over the generated class, and you do not need to learn the API of any particular bytecode generation framework, because a .java file is plain Java and is what we are most familiar with. The downside is readability: the content of the .java file has to be assembled bit by bit. A rough sketch of what such a generated class looks like is shown below.
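For the hypothetical HypotheticalTransporter interface sketched earlier, the generated source would look roughly like the following. This is a hand-written approximation for illustration only, not the actual generator output: the proxy simply extracts the extension name from the URL and then asks ExtensionLoader for the real implementation.

// Rough sketch of a generated adaptive class.
public class HypotheticalTransporter$Adaptive implements HypotheticalTransporter {

    @Override
    public void connect(org.apache.dubbo.common.URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // Pick the extension name from the URL parameter, falling back to the @SPI default.
        String extName = url.getParameter("transporter", "netty");
        if (extName == null) {
            throw new IllegalStateException("Failed to get extension name from url(" + url + ")");
        }
        // Load the concrete extension at call time and delegate to it.
        HypotheticalTransporter extension = ExtensionLoader
                .getExtensionLoader(HypotheticalTransporter.class)
                .getExtension(extName);
        extension.connect(url);
    }
}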
Service exposure and discovery
Service exposure
Noun interpretation
In Dubbo's core domain model:
- Invoker is the entity domain. It is the core model of Dubbo; the other models either lean towards it or convert into it. It represents an executable entity to which invoke calls can be made. It may be a local implementation, a remote implementation or a cluster implementation. On the service provider side, an Invoker is used to invoke the service implementation class; on the service consumer side, an Invoker is used to perform the remote call.
- Protocol is the service domain. It is the main entry point for exposing (export) and referencing (refer) Invokers, and it is responsible for the lifecycle management of Invokers.
  - export: expose a remote service
  - refer: reference a remote service
- ProxyFactory: obtains the proxy class of an interface (a simplified sketch of this and the Protocol interface follows this list)
  - getInvoker: on the server side, wraps the service object, such as DemoServiceImpl, into an Invoker object
  - getProxy: on the client side, creates a proxy object for the interface, such as DemoService
- Invocation is the session domain; it holds the variables of the calling process, such as the method name and parameters
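To relate these terms to code, here is a simplified sketch of the Protocol and ProxyFactory SPI interfaces. The real Dubbo interfaces carry @SPI/@Adaptive annotations and a few more methods (e.g. getDefaultPort, destroy) that are omitted here.

// Simplified sketch of Dubbo's Protocol SPI.
public interface Protocol {
    // export: expose a remote service so it can be invoked by consumers
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    // refer: reference a remote service and get back an Invoker for calling it
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
}

// Simplified sketch of Dubbo's ProxyFactory SPI.
public interface ProxyFactory {
    // getProxy: client side - create an interface proxy backed by an Invoker
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    // getInvoker: server side - wrap a service object (e.g. DemoServiceImpl) into an Invoker
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}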
Overall process
Before discussing the details of service exposure, let's look at Dubbo's overall service exposure mechanism.
On the whole, service exposure in the Dubbo framework is divided into two parts: the first step converts the held service instance into an Invoker through a proxy, and the second step converts the Invoker into an Exporter through a concrete protocol (such as dubbo). This abstraction in the framework also makes it much easier to extend functionality.
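In code terms, the two steps correspond roughly to the following two calls. This is a simplified sketch: DemoService and its ref are placeholders, and the real flow in ServiceConfig adds more wrapping, as the source below shows.

// Step 1: turn the service implementation (ref) into an Invoker via the ProxyFactory.
Invoker<DemoService> invoker = proxyFactory.getInvoker(ref, DemoService.class, url);

// Step 2: turn the Invoker into an Exporter via a concrete Protocol (e.g. DubboProtocol / RegistryProtocol).
Exporter<DemoService> exporter = protocol.export(invoker);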
The service provider exposes its service along the blue initialization chain in the overall design diagram. The sequence diagram is as follows:
Source code analysis
The entry method of service export is onApplicationEvent of ServiceBean. onApplicationEvent is an event response method that performs the service export operation after receiving the Spring context refresh event. The method code is as follows:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}
Following the export() call chain, we eventually arrive at the doExportUrls() method:
private void doExportUrls() {
    // Load all registries in the configuration and wrap them as Dubbo's internal list of URL objects
    List<URL> registryURLs = loadRegistries(true);
    // Loop over all protocol configurations and register with the registry for each protocol
    for (ProtocolConfig protocolConfig : protocols) {
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        // Service exposure method
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
The doExportUrlsFor1Protocol() method is quite long; we only focus on the parts related to the core flow:
... if (!SCOPE_NONE.equalsIgnoreCase(scope)) { //Local exposure, and record the service data to the local JVM if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } //Remote exposure, sending data to the registry if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (!isOnlyInJvm() && logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // Generate Invoker for service provider class (ref) Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker is used to hold Invoker and ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // Export the service and generate the Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { //No registry exists, only export services .... } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url);
The above code determines the service export method according to the scope parameter in the url, as follows:
scope = none, do not export service
scope != remote, export to local
scope != local, export to remote
Whether the service is exported locally or remotely, an Invoker needs to be created before exporting, and this is a very important step. Therefore, let's first analyze how the Invoker is created. Invokers are created by the ProxyFactory, and Dubbo's default ProxyFactory implementation is JavassistProxyFactory. Let's explore the Invoker creation process in the JavassistProxyFactory code:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // Create a Wrapper for the target class
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // Create an anonymous Invoker object and implement the doInvoke method
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
            // Call the invokeMethod method of the Wrapper, which eventually calls the target method
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
After the Invoker is created successfully, let's look at local export.
/**
 * always export injvm
 */
private void exportLocal(URL url) {
    URL local = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL)   // Set the protocol header to injvm
            .setHost(LOCALHOST_VALUE)      // Local ip: 127.0.0.1
            .setPort(0)
            .build();
    // Create an Invoker and export the service; here protocol will call the export method of InjvmProtocol at runtime
    Exporter<?> exporter = protocol.export(
            proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
    exporters.add(exporter);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
The exportLocal method is relatively simple. First, determine whether to export the service according to the URL protocol header. To export, create a new URL and set the protocol header, hostname, and port to the new values. Then create an Invoker and call the export method of InjvmProtocol to export the service. Let's take a look at what the export method of InjvmProtocol does.
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
As mentioned above, the export method of the InjvmProtocol creates only one InjvmExporter without any other logic. After exporting the service to the local, the analysis is finished.
Now let's look at exporting services to a remote registry.
Exporting a service remotely involves two processes: service export and service registration. First, we analyze the service export logic, turning our attention to the export method of RegistryProtocol.
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // Get registry URL URL registryUrl = getRegistryUrl(originInvoker); URL providerUrl = getProviderUrl(originInvoker); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //Export service final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // Load the Registry implementation class according to the URL, such as ZookeeperRegistry final Registry registry = getRegistry(originInvoker); //Get the registered service provider URL, final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // Register services with the registry register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Subscribe to the registry for override data registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); // Create and return to the DestroyableExporter return new DestroyableExporter<>(exporter); }
The above code looks complex. It mainly does the following operations:
- Call doLocalExport export service
- Register services with the registry
- Subscribe to the registry for override data
- Create and return the DestroyableExporter
Let's look at what doLocalExport does:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // protocol depends on the configured protocol (dubbo: DubboProtocol)
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
Next, we will focus on the export method of protocol. Assuming that the runtime protocol is dubbo, the protocol variable here will load DubboProtocol at runtime and call the export method of DubboProtocol.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. Obtain the service ID and interpret it as service coordinates. It consists of service group name, service name, service version number and port. For example: demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); //Create DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //key: interface (DemoService) //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //Start service openServer(url); //Optimized sequencer optimizeSerialization(url); return exporter; }
As mentioned above, we focus on the creation of DubboExporter and the openServer method. It doesn't matter if we don't understand other logic, and it doesn't affect the understanding of the service export process. The openServer method is analyzed below.
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { //Access cache ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //Create server instance serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }
Next, let's analyze how the server instance is created:
private ExchangeServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // Check whether the Transporter extension represented by the server parameter exists through SPI, and throw an exception if it does not exist if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { // Create exchange server server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // Get the client parameter. You can specify netty, mina str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { // Get the collection of all Transporter implementation class names, such as supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // Detect that the name of the Transporter implementation class currently supported by Dubbo is in the list, // Whether to include the Transporter represented by the client. If not, an exception will be thrown if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
As shown above, createServer contains three pieces of core logic.
The first is to check whether the Transporter extension represented by the server parameter exists. If it does not exist, an exception will be thrown.
The second is to create a server instance.
The third is to check whether the Transporter extension represented by the client parameter is supported; if it does not exist, an exception is thrown. The code for these two checks is straightforward and needs no further explanation, but the server creation step is not obvious yet, so let's keep looking.
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // Get the Exchanger; the default is HeaderExchanger.
    // Next, call the bind method of HeaderExchanger to create an exchange server instance
    return getExchanger(url).bind(url, handler);
}
The above code is relatively simple, so I won't say more. Let's take a look at the bind method of HeaderExchanger.
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    // Create a HeaderExchangeServer instance. This line contains multiple steps, as follows:
    // 1. new HeaderExchangeHandler(handler)
    // 2. new DecodeHandler(new HeaderExchangeHandler(handler))
    // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
}
HeaderExchanger's bind method packs in a lot of logic, but for now we only need to care about the bind method of Transporters. Its code is as follows:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    } else if (handlers != null && handlers.length != 0) {
        Object handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            // If there is more than one handler, create a ChannelHandler dispatcher
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // Get the adaptive Transporter instance and call its bind method
        return getTransporter().bind(url, (ChannelHandler) handler);
    } else {
        throw new IllegalArgumentException("handlers == null");
    }
}
As mentioned above, the Transporter obtained by the getTransporter() method is created dynamically at runtime; the generated class is Transporter$Adaptive, i.e. the adaptive extension class. Transporter$Adaptive determines, from the incoming URL parameters, which type of Transporter to load at runtime; the default is NettyTransporter. It then calls NettyTransporter.bind(URL, ChannelHandler), which creates a NettyServer instance. The NettyServer.doOpen() method starts the server, and the service is exposed.
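The end of this chain is small: NettyTransporter essentially just instantiates the Netty-based server and client. The sketch below is close to, but not verbatim, the Dubbo source.

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        // NettyServer's constructor ends up calling doOpen(), which starts the Netty bootstrap
        // and begins listening on the port in the URL - this is where the provider becomes reachable.
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        // The consumer side uses this to open the long-lived connection to the provider.
        return new NettyClient(url, listener);
    }
}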
Service registration
This section takes the Zookeeper registry as the target of analysis; other types of registries can be analyzed in the same way. We start the analysis from the entry method of service registration and turn our attention back to the export method of RegistryProtocol.
Enter the register() method
public void register(URL registryUrl, URL registeredProviderUrl) {
    // Get the registry instance
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register
    registry.register(registeredProviderUrl);
}
Look at the getRegistry() method
@Override public Registry getRegistry(URL url) { url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); String key = url.toServiceStringWithoutResolving(); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //create registry by spi/ioc registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
Enter the createRegistry() method
@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } ////Gets the group name, which defaults to dubbo String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(PATH_SEPARATOR)) { group = PATH_SEPARATOR + group; } this.root = group; // Create a Zookeeper client, which is curatorzookeeper transporter by default zkClient = zookeeperTransporter.connect(url); // Add status listener zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
In the above code, we focus on the ZookeeperTransporter.connect method call, which creates the Zookeeper client. Once the Zookeeper client is created, the creation of the registry is complete.
The essence of service registration is simply creating nodes in Zookeeper. With that in mind, we can read the registration code.
public void doRegister(URL url) {
    try {
        // Create a node through the Zookeeper client. The node path is generated by the toUrlPath method, in the format:
        // /${group}/${serviceInterface}/providers/${url}
        // For example: /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
@Override
public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        // If the node to be created is not ephemeral, first check whether it already exists
        if (checkExists(path)) {
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // Recursively create the parent path
        create(path.substring(0, i), false);
    }
    // Create an ephemeral or persistent node based on the value of ephemeral
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
    }
}
OK, now the process of service registration is analyzed. The whole process can be summarized as follows: first create a registry instance, and then register the service through the registry instance.
Summary
- When there is a registry and the provider address needs to be registered, the URL parsed by ServiceConfig has the form: registry://registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{service name}/{version}")
- Based on Dubbo SPI's adaptive mechanism, the registry:// protocol header of the URL is recognized and the RegistryProtocol#export() method is called
- The concrete service implementation class, such as DubboServiceRegistryImpl, is wrapped into an Invoker instance through ProxyFactory
- The doLocalExport method is called, DubboProtocol converts the Invoker into an Exporter instance, and the Netty server is started to listen for client requests
- A Registry instance is created, which connects to Zookeeper, writes the provider's URL address under the service node, and completes service registration
- Override data is subscribed to from the registry, and an Exporter instance is returned
- Based on the dubbo:// protocol header of the URL in the form dubbo://service-host/{service name}/{version}, the DubboProtocol#export() method is called to open the service port
- The Exporter instances returned by RegistryProtocol#export() are stored in the List<Exporter> exporters field of ServiceConfig