Nearly 20000 words of Dubbo principle analysis, thoroughly understand Dubbo

preface

Earlier, we studied the principle of RPC. There are many frameworks based on RPC, such as Dubbo. Today, we will deeply analyze Dubbo from its SPI mechanism, service registration and discovery source code and network communication process.

Dubbo architecture

summary

Dubbo is a high-performance and excellent open source service framework of Alibaba company, which enables applications to realize service output and input functions through high-performance RPC, and can be seamlessly integrated with Spring framework.
Dubbo is a high-performance and lightweight open source Java RPC framework. It provides three core capabilities: interface oriented remote method invocation, intelligent fault tolerance and load balancing, and automatic service registration and discovery.

Call process:

  1. The service container is responsible for starting, loading and running the service provider.
  2. When a service provider starts, it registers its services with the registry.
  3. When the service consumer starts, it subscribes to the service it needs from the registry.
  4. The registry returns the service provider address list to the consumer. If there is any change, the registry will push the change data to the consumer based on the long connection.
  5. From the provider address list, the service consumer selects one provider to call based on the soft load balancing algorithm. If the call fails, it selects another provider to call.
  6. Service consumers and providers accumulate call times and call times in memory, and regularly send statistical data to the monitoring center every minute.

Architecture system

Source code structure

  • Dubbo common: common logic module: including Util class and general model
  • dubbo remoting remote communication module: equivalent to the implementation of dubbo protocol. If RPC uses RMI protocol, this package is not required
  • Dubbo RPC remote call module: Abstract various protocols and dynamic agents, including one-to-one calls, and do not care about the principle of cluster.
  • Dubbo cluster cluster module: disguise multiple service providers as one provider, including load balancing, fault tolerance, routing, etc. the cluster address list can be statically configured or distributed by the registry
  • Dubbo registry registry module: cluster mode based on registry distribution and abstraction of various registries
  • Dubbo monitor monitoring module: count the number of service calls, call time, and services tracked by the call chain
  • dubbo config configuration module: it is the external api of dubbo. Users use dubbo through config to hide all details of dubbo
  • Dubbo container module: it is a standone container. It is started by simply loading spring in main. Because services usually do not need the characteristics of Web containers such as Tomcat/Jboss, it is not necessary to load services with web containers

Overall design

  • In the figure, the light blue background on the left is the interface used by the service consumer, the light green background on the right is the interface used by the service provider, and the interface located on the central axis is the interface used by both sides.
  • In the figure, there are ten layers from bottom to top. Each layer is unidirectional dependency. Each layer can be stripped and the upper layer can be reused. Among them, the Service and Config layers are API and the other layers are SPI.
  • In the figure, the green block is the extension interface, and the blue block is the implementation class. Only the implementation classes used to associate each layer are shown in the figure.
  • In the figure, the blue dotted line is the initialization process, that is, the assembly chain at startup, the red solid line is the method calling process, that is, the runtime timing chain, the purple triangular arrow is inheritance, the subclass can be regarded as the same node of the parent class, and the text on the line is the called method.

Description of each layer

  • config configuration layer: external configuration interface. Centered on serviceconfig and referenceconfig, configuration classes can be initialized directly or generated by spring parsing configuration
  • Proxy service proxy layer: transparent proxy of service interface, generating client Stub and server Skeleton of the service, with ServiceProxy as the center and ProxyFactory as the extension interface
  • Registry registry layer: encapsulates the registration and discovery of service addresses, takes the service URL as the center, and the extension interfaces are registryfactory, registry and registryservice
  • Cluster routing layer: encapsulates the routing and load balancing of multiple providers, bridges the registry, takes Invoker as the center, and extends the interfaces to cluster, directory, router, and loadbalance
  • Monitor monitoring layer: RPC call times and call time monitoring. Statistics is the center, and the extended interfaces are monitorfactory, monitor and monitorservice
  • Protocol remote call layer: encapsulates RPC calls, takes invocation and result as the center, and the extension interfaces are protocol, invoker and exporter
  • Exchange information exchange layer: encapsulates the request response mode, from synchronous to asynchronous, with request and response as the center, and the extension interfaces are exchange, exchangechannel, exchangeclient and exchangeserver
  • Transport network transport layer: Abstract mina and netty as unified interfaces, Message as the center, and extended interfaces as channel, transporter, client, server and codec
  • serialize data serialization layer: some reusable tools. The extension interfaces are serialization, objectinput, objectoutput and ThreadPool

Call process

Compared with the overall architecture diagram above, it can be roughly divided into the following steps:

1. The service provider starts, starts the Netty service, creates a Zookeeper client, and registers the service with the registry.

2. The service consumer starts, obtains the list of service providers from the registry through Zookeeper, and establishes a long connection with the service provider through Netty.

3. The service consumer starts to call the service remotely through the interface. ProxyFactory initializes the Proxy object and Proxy creates the dynamic Proxy object.

4. The dynamic proxy object is wrapped layer by layer through the invoke method to generate an Invoker object, which contains the proxy object.

5. Invoker selects the most appropriate service provider through routing and load balancing. By adding various filters, the protocol layer wrapper generates a new DubboInvoker object.

6. Then, the DubboInvoker object is wrapped into a Reuqest object through exchange, which is transmitted to the NettyServer side of the service provider through serialization through NettyClient.

7. At the service provider side, a DubboExporter object is generated through deserialization, protocol decryption and other operations, and then passed layer by layer to generate an Invoker object at the service provider side

8. This Invoker object will call the local service to obtain the result, and then return it to the service consumer through layers of callback. After the service consumer obtains the result, it will parse it to obtain the final result.

SPI mechanism in Dubbo

What is SPI

summary

SPI is a very important module in Dubbo. Based on SPI, we can easily expand Dubbo. If you want to learn Dubbo's source code, you must understand the SPI mechanism. Next, let's first understand the usage of Java SPI and Dubbo SPI, and then analyze the source code of Dubbo SPI.

SPI is the abbreviation of Service Provider Interface. It is a service discovery mechanism. The essence of SPI is to define the fully qualified name of the implementation class of the interface in the configuration file, and the server reads the configuration file and loads the implementation class. In this way, you can dynamically replace the implementation class for the interface at run time.

SPI in JDK

Java SPI is actually a dynamic loading mechanism realized by the combination of "interface based programming + policy mode + configuration file".

Let's understand SPI through a case

Define an interface:

package com.laowang;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public interface User {

    String showName();
}

Define two implementation classes

package com.laowang.impl;

import com.laowang.User;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class Student implements User {
    @Override
    public String showName() {
        System.out.println("my name is laowang");
        return null;
    }
}
package com.laowang.impl;

import com.laowang.User;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class Teacher implements User {
    @Override
    public String showName() {
        System.out.println("my name is zhangsan");
        return null;
    }
}

Create a folder META-INF.services under the resources directory, and create a file com.laowang.User with the same name as the full path of User under this folder

Write the full pathnames of the two implementation classes in the file

Write test class:

package com.laowang;

import java.util.ServiceLoader;

/**
 * @author primary
 * @date 2021/3/27
 * @since 1.0
 **/
public class SpiTest {
    public static void main(String[] args) {
        ServiceLoader<User> serviceLoader = ServiceLoader.load(User.class);
        serviceLoader.forEach(User::showName);
    }
}

Operation results:

We found that the SPI mechanism helped us automatically run two implementation classes.

Check the ServiceLoader source code:

In fact, by reading the full path class name of the implementation class in the configuration file, create an object through reflection and put it into the providers container.

Summary:

Call procedure
The application calls the ServiceLoader.load method to create a new ServiceLoader and instantiate the member variables in the class
The application obtains the object instance through the iterator interface. ServiceLoader first determines whether there is a cache instance object in the member variable providers object (LinkedHashMap < string, s > type). If there is a cache, it returns directly. If there is no cache, perform class loading,
advantage
The advantage of using Java SPI mechanism is to realize decoupling, so that the definition of interface is separated from the specific business implementation, rather than coupled together. The application process can enable or replace specific components according to the actual business situation.
shortcoming
Cannot load on demand. Although ServiceLoader does delayed loading, it can only be obtained by traversing all, that is, the implementation classes of the interface have to be loaded and instantiated. If you don't want to use some implementation classes, or some class instantiation is time-consuming, it is also loaded and instantiated, which is a waste.
The method of obtaining an implementation class is not flexible enough. It can only be obtained in the form of Iterator, and the corresponding implementation class cannot be obtained according to a parameter.
It is not safe for multiple concurrent threads to use instances of the ServiceLoader class.
Throw an exception that is not the real reason when the implementation class cannot be loaded, and the error is difficult to locate.

SPI in Dubbo

Dubbo does not use Java SPI, but re implements a more powerful SPI mechanism. The relevant logic of Dubbo SPI is encapsulated in the ExtensionLoader class. Through ExtensionLoader, we can load the specified implementation class.

Chestnuts

Different from Java SPI implementation class configuration, Dubbo SPI is configured through key value pairs, so that we can load the specified implementation class on demand. Let's demonstrate the usage of Dubbo SPI:
The configuration files required by Dubbo SPI need to be placed in the META-INF/dubbo path. Different from the Java SPI implementation class configuration, Dubbo SPI is configured through key value pairs. The configuration contents are as follows.

optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee

When using Dubbo SPI, you need to mark @ SPI annotation on the interface.

@SPI
public interface Robot {
void sayHello();
}

Through ExtensionLoader, we can load the specified implementation class. The following is a demonstration of Dubbo SPI:

public class DubboSPITest {
   @Test
   public void sayHello() throws Exception {
       ExtensionLoader<Robot> extensionLoader =
           ExtensionLoader.getExtensionLoader(Robot.class);
       Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
       optimusPrime.sayHello();
       Robot bumblebee = extensionLoader.getExtension("bumblebee");
       bumblebee.sayHello();
   }
}

In addition to supporting on-demand loading of interface implementation classes, Dubbo SPI also adds features such as IOC and AOP, which will be introduced one by one in the next chapter of source code analysis.

Source code analysis

The getExtensionLoader method of ExtensionLoader obtains an ExtensionLoader instance, and then obtains the extension class object through the getExtension method of ExtensionLoader. Next, we take the getExtension method of ExtensionLoader as the entry to analyze the acquisition process of extended class objects in detail.

public T getExtension(String name) {
       if (StringUtils.isEmpty(name)) {
           throw new IllegalArgumentException("Extension name == null");
       }
       if ("true".equals(name)) {
            // Get the default extension implementation class
           return getDefaultExtension();
       }
       // Holder, as the name suggests, is used to hold the target object from the container. If there is no direct new holder
       Holder<Object> holder = getOrCreateHolder(name);
       //Get target object instance
       Object instance = holder.get();
        // If the target object instance is null, you need to create an instance through double checking
       if (instance == null) {
           synchronized (holder) {
               instance = holder.get();
               if (instance == null) {
                   // Create extension instance
                   instance = createExtension(name);
                   // Set instance to holder
                   holder.set(instance);
               }
           }
       }
       return (T) instance;
   }

The logic of the above code is relatively simple. First, check the cache. If the cache misses, create an extension object. Let's take a look at the process of creating extension objects.

private T createExtension(String name) {
   // Load all extension classes from the configuration file to get the mapping relationship table from "configuration item name" to "configuration class"
   Class<?> clazz = getExtensionClasses().get(name);
   if (clazz == null) {
       throw findException(name);
   }
   try {
       //Get the corresponding instance object from the container. If it does not exist, it will be created through reflection
       T instance = (T) EXTENSION_INSTANCES.get(clazz);
       if (instance == null) {
           // Create instances by reflection
           EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
           instance = (T) EXTENSION_INSTANCES.get(clazz);
       }
       // Inject dependencies into the instance. The following is the implementation of IOC and AOP
       injectExtension(instance);
       Set<Class<?>> wrapperClasses = cachedWrapperClasses;
       if (CollectionUtils.isNotEmpty(wrapperClasses)) {
             // Loop to create Wrapper instance
           for (Class<?> wrapperClass : wrapperClasses) {
               // Pass the current instance as a parameter to the constructor of the Wrapper, and create the Wrapper instance through reflection.
               // Then inject dependencies into the Wrapper instance, and finally assign the Wrapper instance to the instance variable again
               instance = injectExtension(
                   (T)
wrapperClass.getConstructor(type).newInstance(instance));
           }
       }

The logic of the createExtension method is slightly more complex, including the following steps:

  1. Get all extension classes through getExtensionClasses
  2. Creating extended objects through reflection
  3. Injecting dependencies into extended objects
  4. Wrap the extension object in the corresponding Wrapper object

    Among the above steps, the first step is the key to loading the extension class, and the third and fourth steps are the specific implementation of Dubbo IOC and AOP. Due to the large number of design source codes, here is a brief summary of the whole execution logic of ExtensionLoader:

    getExtension(String name)  #Get the extension object according to the key
        -->createExtension(String name) #Create extension instance
            -->getExtensionClasses #Get all extension classes according to the path
                -->loadExtensionClasses #Load extension class
                    -->cacheDefaultExtensionName #Parse @ SPI annotation
                -->loadDirectory #Method to load the specified folder configuration file
                    -->loadResource #load resources
                        -->loadClass #Load the class and cache the class through the loadClass method

How does Dubbo's SPI implement IOC and AOP

Dubbo IOC

Dubbo IOC injects dependencies through the setter method. Dubbo first obtains all the methods of the instance through reflection, and then traverses the method list to detect whether the method name has the characteristics of setter method. If yes, obtain the dependent object through ObjectFactory, and finally call the setter method through reflection to set the dependency into the target object. The codes corresponding to the whole process are as follows:

    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                //Get all methods of the instance
                for (Method method : instance.getClass().getMethods()) {
                    //What isSetter does: check whether the method starts with set, the method has only one parameter, and the method access level is public
                    if (isSetter(method)) {
                        /**
                         * Check {@link DisableInject} to see if we need auto injection for this property
                         */
                        if (method.getAnnotation(DisableInject.class) != null) {
                            continue;
                        }
                        Class<?> pt = method.getParameterTypes()[0];
                        if (ReflectUtils.isPrimitives(pt)) {
                            continue;
                        }
                        try {
                            String property = getSetterProperty(method);
                            //Get dependent objects
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                //set a property
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("Failed to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

Dubbo Aop

Before we say this, we need to know the decorator mode

Decorator mode: dynamically attach responsibility to an object without changing the original class file and using inheritance, so as to dynamically expand the function of an object. It wraps the real object by creating a wrapper object, that is, decoration.

When using Spring, we often use the AOP function. Insert additional logic before and after the method of the target class. For example, Spring AOP is usually used to realize logging, monitoring, authentication and other functions. Does Dubbo's extension mechanism support similar functions? The answer is yes. In Dubbo, there is a special class called Wrapper class. Through decorator mode, wrap the original extension point instance with a Wrapper class. Insert other logic before and after the implementation of the original extension point to realize the AOP function.

Generally speaking, the decorator mode has the following participants:
Component: the common parent class of decorator and decoratee. It is an interface or abstract class used to define basic behavior
ConcreteComponent: defines a concrete object, that is, the decorator
Decorator: Abstract decorator, which inherits from Component and extends ConcreteComponent from external classes. For ConcreteComponent, you don't need to know the existence of decorator. Decorator is an interface or abstract class
ConcreteDecorator: concrete decorator, used to extend ConcreteComponent

//Get all classes that need to be wrapped
Set<Class<?>> wrapperClasses = cachedWrapperClasses;

Let's see what cachedWrapperClasses are?

private Set<Class<?>> cachedWrapperClasses;

Is a set set. When did the set add elements?

    /**
     * cache wrapper class
     * <p>
     * like: ProtocolFilterWrapper, ProtocolListenerWrapper
     */
    private void cacheWrapperClass(Class<?> clazz) {
        if (cachedWrapperClasses == null) {
            cachedWrapperClasses = new ConcurrentHashSet<>();
        }
        cachedWrapperClasses.add(clazz);
    }

Added through this method, and then see who called this private method:

    /**
     * test if clazz is a wrapper class
     * <p>
     * which has Constructor with given class type as its only argument
     */
    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

Originally, the isWrapperClass method was used to determine whether the object is held in the constructor of other objects. If so, dubbo considers it a decorator class, calls the constructor of the decorator class, and returns the instance object

Then replace the class to be loaded by instantiating the wrapper class. The method executed in this way is the method of the wrapper class.

Dynamic compilation in Dubbo

We know that in Dubbo, many extensions are loaded through SPI mechanism, such as Protocol, Cluster, LoadBalance, ProxyFactory, etc. Sometimes, some extensions do not want to be loaded at the framework startup stage, but want to be loaded according to the runtime parameters when the extension method is called, that is, dynamically load the implementation class according to the parameters.

At runtime, the specific extension is dynamically determined according to the method parameters. In Dubbo, it is called the extension point adaptive instance. It is actually an extension point proxy that delays the selection of extension from Dubbo startup to RPC call. Each extension point in Dubbo has an adaptive class. If it is not explicitly provided, Dubbo will automatically create one for us. Javaassist is used by default.

The implementation logic of adaptive expansion mechanism is as follows

  1. Firstly, Dubbo will generate code with proxy function for the extended interface;
  2. Compile this code through javassist or jdk to get the Class class;
  3. Create proxy classes through reflection;
  4. In the proxy class, determine which implementation class to call through the parameters of the URL object;

javassist

Javassist is an open source class library for analyzing, editing and creating Java bytecode. It was founded by Shigeru Chiba of the Department of mathematics and computer science of Tokyo University of technology. It has joined the open source JBoss application server project to implement a dynamic AOP framework for JBoss by using javassist to operate on bytecode. Javassist is a sub project of JBoss. Its main advantage is that it is simple and fast. Directly use the form of java coding without understanding the virtual machine instructions, you can dynamically change the class structure or dynamically generate classes.

/**
*  Javassist Is an open source class library for analyzing, editing and creating Java bytecode
*  Can dynamically change the structure of a class, or dynamically generate a class
*/
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool: class object container
ClassPool pool = ClassPool.getDefault();
// Generate a User class through ClassPool
CtClass ctClass = pool.makeClass("com.itheima.domain.User");
// Add attribute -- private String username
CtField enameField = new CtField(pool.getCtClass("java.lang.String"),
"username", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// Add attribute -- private int age
CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);
//Add method
ctClass.addMethod(CtNewMethod.getter("getUsername", enameField));
ctClass.addMethod(CtNewMethod.setter("setUsername", enameField));
ctClass.addMethod(CtNewMethod.getter("getAge", enoField));
ctClass.addMethod(CtNewMethod.setter("setAge", enoField));
// Parameterless constructor
CtConstructor constructor = new CtConstructor(null, ctClass);
constructor.setBody("{}");
ctClass.addConstructor(constructor);
// Add constructor
//ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass));
CtConstructor ctConstructor = new CtConstructor(new CtClass[]
{pool.get(String.class.getName()),CtClass.intType}, ctClass);
ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}");
ctClass.addConstructor(ctConstructor);
// Add custom method
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser",new
CtClass[] {}, ctClass);
// Set modifiers for custom methods
ctMethod.setModifiers(Modifier.PUBLIC);
// Set function body for custom method
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"User information is as follows\");\n")
.append("System.out.println(\"user name=\"+username);\n")
.append("System.out.println(\"Age=\"+age);\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//Generate a class
Class<?> clazz = ctClass.toClass();
Constructor cons2 =
clazz.getDeclaredConstructor(String.class,Integer.TYPE);
Object obj = cons2.newInstance("itheima",20);
//Reflection execution method
obj.getClass().getMethod("printUser", new Class[] {})
.invoke(obj, new Object[] {});
// Write the generated class file to the file
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("D://User.class"));
fos.write(byteArr);
fos.close();
}
}

Source code analysis

Adaptive annotation

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
   String[] value() default {};
}

Adaptive can be annotated on classes or methods.
Label on class: Dubbo does not generate proxy classes for this class.
Marked on the method: Dubbo will generate proxy logic for the method, indicating that the current method needs to call the corresponding extension point implementation according to the parameter URL.

Each extension point in Dubbo has an adaptive class. If it is not explicitly provided, Dubbo will automatically create one for us. Javaassist is used by default. Let's take a look at the code to create an adaptive extension class

//1. Let's take a look at the acquisition method of extensionLoader
ExtensionLoader<Robot>extensionLoader=ExtensionLoader.getExtensionLoader(Robot.class);
//2. The final call is the construction method of ExtensionLoader
private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
//3. getAdaptiveExtension() to see what you've done
    public T getAdaptiveExtension() {
        //Get the adaptive extension class. If not, start initializing one
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            //An adaptive extension class is created here
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            } else {
                throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }
//Look at createadaptive extension()
 private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
//Then go to getAdaptiveExtensionClass()
    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
//Continue to catch up with createadaptive extensionclass()
private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

//Look at compiler
@SPI("javassist")
public interface Compiler {

    /**
     * Compile java source code.
     *
     * @param code        Java source code
     * @param classLoader classloader
     * @return Compiled class
     */
    Class<?> compile(String code, ClassLoader classLoader);

}
//In fact, I know from here that by generating a string of a class, and then generating an object through javassist

The createadaptive extensionclasscode () method uses a StringBuilder to build the Java source code of the adaptive class. The method implementation is relatively long, so the code will not be pasted here. This way of generating bytecode is also very interesting. You can generate java source code, compile it, and load it into the jvm. In this way, you can better control the generated Java classes. Moreover, there is no need to care about the api of each bytecode generation framework. Because the xxx.java file is common to Java and we are most familiar with it. But the readability of the code is not strong. You need to build the content of xx.java bit by bit.

Service exposure and discovery

Service exposure

Noun interpretation

In Dubbo's core domain model:

  • Invoker is an entity domain. It is the core model of Dubbo. Other models rely on it or convert it. It represents an executable and can initiate invoke calls to it. It may be a local implementation, a remote implementation or a cluster implementation. At the service provider, invoker is used to call the service provider class. On the service consumer, invoker is used to perform remote calls.
  • Protocol is a service domain. It is the main function entrance exposed and referenced by Invoker. It is responsible for the lifecycle management of Invoker.
    export: expose remote services
    refer: refers to a remote service
  • proxyFactory: get the proxy class of an interface
    getInvoker: for the server side, wrap the service object, such as DemoServiceImpl, into an Invoker object
    getProxy: for the client side, create a proxy object of the interface, such as the interface of DemoService.
  • Invocation is a session domain that holds variables in the calling process, such as method name, parameters, etc

Overall process

Before discussing the details of service exposure in detail, let's take a look at the overall service exposure principle of duubo

On the whole, the service exposure of Dubbo framework is divided into two parts. The first step is to convert the held service instances into invokers through agents, and the second step is to convert the invokers into exporters through specific protocols (such as Dubbo). This abstraction of the framework also greatly facilitates function expansion.

The service provider exposes the blue initialization chain of the service. The sequence diagram is as follows:

Source code analysis

The entry method of service export is onApplicationEvent of ServiceBean. onApplicationEvent is an event response method that performs the service export operation after receiving the Spring context refresh event. The method code is as follows:

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

Finally find the doExportUrls() method through export

private void doExportUrls() {
        //All registries in the configuration file are loaded and encapsulated as a list of URL objects inside dubbo
        List<URL> registryURLs = loadRegistries(true);
        //Cycle all protocol configurations and initiate registration in the registry according to different protocols
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            //Service exposure method
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

The doExportUrlsFor1Protocol() method code is much older. We only focus on the core of the relationship

...
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            //Local exposure, and record the service data to the local JVM
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
           //Remote exposure, sending data to the registry
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (!isOnlyInJvm() && logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // Generate Invoker for service provider class (ref)
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                       // DelegateProviderMetaDataInvoker is used to hold Invoker and ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // Export the service and generate the Exporter
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    //No registry exists, only export services
                    ....
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);

The above code determines the service export method according to the scope parameter in the url, as follows:
scope = none, do not export service
scope != remote, export to local
scope != local, export to remote

Whether exported locally or remotely. Before exporting services, you need to create an invoker, which is a very important step. Therefore, let's first analyze the creation process of invoker. Invoker is created from ProxyFactory. Dubbo's default ProxyFactory implementation class is JavassistProxyFactory. Let's explore the creation process of invoker in the JavassistProxyFactory code. As follows:

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // Create a warpper for the target class
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //Create an anonymous invoker object and implement the doinvoke method
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // Call the invokeMethod method of the Wrapper, and the invokeMethod will eventually call the target method
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

After the Invoke is created successfully, let's look at the local export

    /**
     * always export injvm
     */
    private void exportLocal(URL url) {
        URL local = URLBuilder.from(url)
                .setProtocol(LOCAL_PROTOCOL) // Set the protocol header to injvm
                .setHost(LOCALHOST_VALUE)//Local ip:127.0.0.1
                .setPort(0)
                .build();
        // Create an Invoker and export the service. The protocol here will call the export method of InjvmProtocol at runtime
        Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
        exporters.add(exporter);
        logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
    }

The exportLocal method is relatively simple. First, determine whether to export the service according to the URL protocol header. To export, create a new URL and set the protocol header, hostname, and port to the new values. Then create an Invoker and call the export method of InjvmProtocol to export the service. Let's take a look at what the export method of InjvmProtocol does.

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

As mentioned above, the export method of the InjvmProtocol creates only one InjvmExporter without any other logic. After exporting the service to the local, the analysis is finished.

Take another look at exporting services to remote

Next, we continue to analyze the process of exporting services to remote. Exporting services to remote includes two processes: service export and service registration. First, analyze the service export logic. We turn our attention to the export method of RegistryProtocol.

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // Get registry URL
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

        //Export service
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // Load the Registry implementation class according to the URL, such as ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);

        //Get the registered service provider URL,
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            // Register services with the registry
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }

        //  Subscribe to the registry for override data
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        // Create and return to the DestroyableExporter
        return new DestroyableExporter<>(exporter);
    }

The above code looks complex. It mainly does the following operations:

  1. Call doLocalExport export service
  2. Register services with the registry
  3. Subscribe to the registry for override data
  4. Create and return to the DestroyableExporter

Look what doLocalExport did

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            //Protocol is related to the configured protocol (dubbo: DubboProtocol)
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

Next, we will focus on the export method of protocol. Assuming that the runtime protocol is dubbo, the protocol variable here will load DubboProtocol at runtime and call the export method of DubboProtocol.

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service. Obtain the service ID and interpret it as service coordinates. It consists of service group name, service name, service version number and port. For example: demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        String key = serviceKey(url);
        //Create DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter); //key: interface (DemoService)

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        //Start service
        openServer(url);
        //Optimized sequencer
        optimizeSerialization(url);

        return exporter;
    }

As mentioned above, we focus on the creation of DubboExporter and the openServer method. It doesn't matter if we don't understand other logic, and it doesn't affect the understanding of the service export process. The openServer method is analyzed below.

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            //Access cache
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        //Create server instance
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

Next, analyze the creation process of the server instance. as follows

    private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        
        // Check whether the Transporter extension represented by the server parameter exists through SPI, and throw an exception if it does not exist
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // Create exchange server
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        
        // Get the client parameter. You can specify netty, mina
        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // Get the collection of all Transporter implementation class names, such as supportedTypes = [netty, mina]
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            // Detect that the name of the Transporter implementation class currently supported by Dubbo is in the list,
               // Whether to include the Transporter represented by the client. If not, an exception will be thrown
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

As mentioned above, createServer contains three core logic.

The first is to check whether the Transporter extension represented by the server parameter exists. If it does not exist, an exception will be thrown.

The second is to create a server instance.

The third is to check whether the Transporter extension represented by the client parameter is supported. If it does not exist, an exception is thrown. The code corresponding to the two detection operations is more straightforward, so there is no need to say more. However, the operation of creating a server is not very clear. Let's continue to look at it.

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // Get the exchange. The default is HeaderExchanger.
           // Next, call the bind method of HeaderExchanger to create an exchange server instance
        return getExchanger(url).bind(url, handler);
    }

The above code is relatively simple, so I won't say more. Let's take a look at the bind method of HeaderExchanger.

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // Create a HeaderExchangeServer instance. This method contains multiple logic, as follows:
        //   1. new HeaderExchangeHandler(handler)
        //   2. new DecodeHandler(new HeaderExchangeHandler(handler))
        //   3. Transporters.bind(url, new DecodeHandler(new
HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
    }

HeaderExchanger's bind method contains a lot of logic, but at present, we only need to care about the logic of Transporters' bind method
Just edit it. The code of this method is as follows:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        } else if (handlers != null && handlers.length != 0) {
            Object handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                // If the number of handlers elements is greater than 1, a ChannelHandler distributor is created
                handler = new ChannelHandlerDispatcher(handlers);
            }
            // Get the adaptive Transporter instance and call the instance method
            return getTransporter().bind(url, (ChannelHandler)handler);
        } else {
            throw new IllegalArgumentException("handlers == null");
        }
    }

As mentioned above, the Transporter obtained by the getTransporter() method is dynamically created at runtime. The class name is TransporterAdaptive, that is, the adaptive extension class. TransporterAdaptive will determine what type of Transporter to load at runtime according to the incoming URL parameters. The default is NettyTransporter. Call the NettyTransporter.bind(URL,ChannelHandler) method. Create a NettyServer instance. By calling the NettyServer.doOPen() method, the server is turned on and the service is exposed.

Service registration

The content of this section takes Zookeeper registration center as the analysis target, and other types of registration centers can be analyzed by yourself. Now register from the service
Starting with the analysis of the entry method of, we turn our attention to the export method of RegistryProtocol again. As follows:

Enter the register() method

    public void register(URL registryUrl, URL registeredProviderUrl) {
        //Get registry instance
        Registry registry = registryFactory.getRegistry(registryUrl);
        //Register
        registry.register(registeredProviderUrl);
    }

Look at the getRegistry() method

    @Override
    public Registry getRegistry(URL url) {
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = url.toServiceStringWithoutResolving();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

Enter the createRegistry() method

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        ////Gets the group name, which defaults to dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        // Create a Zookeeper client, which is curatorzookeeper transporter by default
        zkClient = zookeeperTransporter.connect(url);
        // Add status listener
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

In the above code, we focus on the connect method call of ZookeeperTransporter, which is used to create
Zookeeper client. Creating a zookeeper client means that the creation process of the registry is over.

After understanding the essence of service registration, we can read the code of service registration.

    public void doRegister(URL url) {
        try {
            // Create a node through the Zookeeper client. The node path is generated by the toUrlPath method. The path format is as follows:
       //  /${group}/${serviceInterface}/providers/${url}
       // For example / dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    @Override
    public void create(String path, boolean ephemeral) {
        if (!ephemeral) {
            // If the node type to be created is not a temporary node, check whether the node exists here
            if (checkExists(path)) {
                return;
            }
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // Recursively create the upper level path
            create(path.substring(0, i), false);
        }
        // Create temporary or persistent nodes based on the value of ephemeral
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }

OK, now the process of service registration is analyzed. The whole process can be summarized as follows: first create a registry instance, and then register the service through the registry instance.

summary

  1. When there is a registry and the provider address needs to be registered, the URL format parsed by ServiceConfig is: Registry: / / registry host / org. Apache. Dubbo. Registry. Registryservice? Export = URL. Encode(“ dubbo://service-host/ {service name} / {version number} ")
  2. Based on the adaptive mechanism of Dubbo SPI, the RegistryProtocol#export() method is called after the URL registry: / / protocol header is identified
  3. Wrap the specific service class name, such as DubboServiceRegistryImpl, into an Invoker instance through ProxyFactory
  4. Call doLocalExport method, use DubboProtocol to convert Invoker into Exporter instance, and open Netty server to listen to customer requests
  5. Create a Registry instance, connect to Zookeeper, write the URL address of the provider under the service node, and register the service
  6. Subscribe to the registry for override data and return an Exporter instance
  7. According to the in the URL format“ dubbo://service-host/ The {service name} / {version number} "is identified by the negotiation header Dubbo: / / and calls the DubboProtocol#export() method to develop the service port
  8. The Exporter instance returned by RegistryProtocol#export() is stored in the list < Exporter > exporters of ServiceConfig

Keywords: Java Dubbo Load Balance Spring Algorithm

Added by greengo on Tue, 23 Nov 2021 21:47:21 +0200