[annual salary 60W watershed] advanced part of RPC framework based on Netty handwriting (with registration center and notes)

Before reading this article, it is recommended to read the content associated with this article.

1. Analyze the underlying implementation principle of network communication under distributed micro service architecture in detail (illustration)

2. (skills with an annual salary of 60W) after working for 5 years, do you really understand Netty and why to use it? (deep dry goods)

3. Deeply analyze the core components in Netty (illustration + example)

4. Details required for bat interview: detailed explanation of ByteBuf in Netty

5. Through a large number of practical cases, how to solve the problem of unpacking and sticking in Netty?

6. Implement custom message communication protocol based on Netty (protocol design and analysis application practice)

7. The most detailed and complete serialization technology, in-depth analysis and application practice in the whole network

8. Teach you how to implement a basic RPC framework based on Netty (easy to understand)

In this article, we continue to focus on Netty handwritten implementation of RPC Foundation For optimization, several points are mainly introduced

  • Integrating spring to realize annotation driven configuration
  • Integrate zookeeper to realize service registration
  • Increased load balancing implementation

Source code, add "follow Mic architecture" micro signal and reply to "rpc" for access.

Add annotation driven

Modification module mainly involved

  • netty-rpc-protocol
  • netty-rpc-provider

netty-rpc-protocol

The main modified classes of the current module are as follows.

Figure 7-1

The following modifications are made to the netty RPC protocol module

Add annotation driven

This annotation is used to specify some services as remote services

@Target(ElementType.TYPE)// Target describes the object scope modified by Annotation. TYPE: used to describe class, interface (including Annotation TYPE) or enum declaration
@Retention(RetentionPolicy.RUNTIME)// The function of Reteniton is to define how long the annotation annotated by Reteniton will be retained until runtime. So we can get annotation information through reflection.
@Component
public @interface GpRemoteService {

}

SpringRpcProviderBean

This class is mainly used to start NettyServer and save the mapping relationship of bean s

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress; 
    public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {
                new NettyServer(this.serverAddress,this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();
            }
        }).start();
    }

    //bean after instantiation calls
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ //Publish for the service where the annotation exists
            Method[] methods=bean.getClass().getDeclaredMethods();
            for(Method method: methods){ //Save the mapping of the bean s to be published
                String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
            }
        }
        return bean;
    }
}

Mediator

It mainly manages bean s and calls

BeanMethod

@Data
public class BeanMethod {
    private Object bean;
    private Method method;
}

Mediator

Responsible for the management of the publishing bean and the reflection call of the bean

public class Mediator {
    public static Map<String,BeanMethod> beanMethodMap=new ConcurrentHashMap<>();

    private volatile static Mediator instance=null;

    private Mediator(){
    }

    public static Mediator getInstance(){
        if(instance==null){
            synchronized (Mediator.class){
                if(instance==null){
                    instance=new Mediator();
                }
            }
        }
        return instance;
    }
    public Object processor(RpcRequest rpcRequest){
        String key=rpcRequest.getClassName()+"."+rpcRequest.getMethodName();
        BeanMethod beanMethod=beanMethodMap.get(key);
        if(beanMethod==null){
            return null;
        }
        Object bean=beanMethod.getBean();
        Method method=beanMethod.getMethod();
        try {
            return method.invoke(bean,rpcRequest.getParams());
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}

RpcServerProperties

Define configuration properties

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {

    private int servicePort;
}

RpcProviderAutoConfiguration

Define autoconfiguration classes

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
    }
}

Modify RpcServerHandler

Modify the calling method and directly use the call of Mediator.

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=Mediator.getInstance().processor(msg.getContent()); //Mainly modify this part
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }
}

netty-rpc-provider

This module mainly modifies two parts

  • application.properties
  • NettyRpcProviderMain

NettyRpcProviderMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.spring.service","com.example.service"})
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        //Remove the original instantiation part
    }
}

application.properties

Add a configuration attribute.

gp.rpc.servicePort=20880

UserServiceImpl

Publish the current service.

@GpRemoteService //Indicates that the current service is published as a remote service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {
        log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

Modify the annotation driver of the client

The client also needs to reference the service through annotation, so that the details of remote communication can be completely shielded. The code structure is shown in Figure 7-2

Figure 7-2

Add client annotation

Create the following annotation in the annotation directory of the netty RPC protocol module.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface GpRemoteReference {
}

SpringRpcReferenceBean

Define a factory Bean to build a proxy for remote communication

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
    private String serviceAddress;
    private int servicePort;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init(){
        this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(this.serviceAddress,this.servicePort));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }
}

SpringRpcReferencePostProcessor

Dynamic proxy injection for remote beans:

  • BeanClassLoaderAware: get the class loader of the Bean

  • Beanfactoryprocessor: after the spring container loads the bean definition file, it is executed before the bean instantiation

  • ApplicationContextAware: get context object ApplicationContext

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    //Save published reference bean information
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader=classLoader;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context=applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
            //Traverse the bean definition, then get the loaded beans, traverse the fields in these beans, and whether to carry the GpRemoteReference annotation
            //If so, you need to build a dynamic proxy implementation
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){
                //Like the forName method, the internal is the forName method called directly
                Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                //Dynamically create a Bean for the specified field in the current class
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        //Construct a dynamic proxy object using the bean annotated with @ GpRemoteReference
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{
            if(context.containsBean(beanName)){
                log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            //Register the dynamically created bean into the container
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){
        GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {
            BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}

You need to add an init to the RpcConstant constant_ METHOD_ Name attribute

public class RpcConstant {
    //Total bytes of header section
    public final static int HEAD_TOTAL_LEN=16;
    //magic number
    public final static short MAGIC=0xca;

    public static final String INIT_METHOD_NAME = "init";
}

RpcClientProperties

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){
        String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment=environment;
    }
}

netty-rpc-consumer

Modify netty RPC consumer module

  • Turn the module into a spring boot project
  • Increase web dependency
  • Add test class

Figure 7-3 netty RPC consumer module

Introducing jar package dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

HelloController

@RestController
public class HelloController {

    @GpRemoteReference
    private IUserService userService;

    @GetMapping("/test")
    public String test(){
        return userService.saveUser("Mic");
    }
}

NettyConsumerMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.controller","com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {
    public static void main(String[] args) {
        SpringApplication.run(NettyConsumerMain.class, args);
    }
}

application.properties

gp.serviceAddress=192.168.1.102
servicePort.servicePort=20880

Access test

  • Start netty RPC server
  • Start netty RPC consumer

If there are no problems with the startup process, you can access HelloController to test access to the remote service.

Import registry

Create a netty RPC registry module. The code structure is shown in Figure 7-4.

Introduce related dependencies

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>

IRegistryService

public interface IRegistryService {

    /**
     * Registration service
     * @param serviceInfo
     * @throws Exception
     */
    void register(ServiceInfo serviceInfo) throws Exception;

    /**
     * Unregister
     * @param serviceInfo
     * @throws Exception
     */
    void unRegister(ServiceInfo serviceInfo) throws Exception;

    /**
     * Dynamic discovery service
     * @param serviceName
     * @return
     * @throws Exception
     */
    ServiceInfo discovery(String serviceName) throws Exception;
}

ServiceInfo

@Data
public class ServiceInfo {
    private String serviceName;
    private String serviceAddress;
    private int servicePort;
}

ZookeeperRegistryService

@Slf4j
public class ZookeeperRegistryService implements IRegistryService {

    private static final String REGISTRY_PATH="/registry";
    //The component encapsulation of service registration and discovery provided in cursor abstracts ServiceInstance
    // ServiceProvider and ServiceDiscovery are three interfaces through which we can easily implement Service Discovery
    private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

    private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

    public ZookeeperRegistryService(String registryAddress) throws Exception {
        CuratorFramework client= CuratorFrameworkFactory
                .newClient(registryAddress,new ExponentialBackoffRetry(1000,3));
        JsonInstanceSerializer<ServiceInfo> serializer=new JsonInstanceSerializer<>(ServiceInfo.class);
        this.serviceDiscovery= ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(serializer)
                .basePath(REGISTRY_PATH)
                .build();
        this.serviceDiscovery.start();
        loadBalance=new RandomLoadBalance();
    }

    @Override
    public void register(ServiceInfo serviceInfo) throws Exception {
        log.info("Start the registration service,{}",serviceInfo);
        ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance
                .<ServiceInfo>builder().name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceInfo serviceInfo) throws Exception {
        ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceInfo discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceInfo>> serviceInstances= serviceDiscovery
                .queryForInstances(serviceName);
        //Return a specific instance through load balancing
        ServiceInstance<ServiceInfo> serviceInstance=loadBalance.select((List<ServiceInstance<ServiceInfo>>)serviceInstances);
        if(serviceInstance!=null){
            return serviceInstance.getPayload();
        }
        return null;
    }
}

Load balancing algorithm is introduced

Since there may be more than one service found by the server, the load balancing algorithm needs to be used

ILoadBalance

public interface ILoadBalance<T> {

    T select(List<T> servers);
}

AbstractLoadBalance

public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers){
        if(servers==null||servers.size()==0){
            return null;
        }
        if(servers.size()==1){
            return servers.get(0);
        }
        return doSelect(servers);
    }

    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}

RandomLoadBalance

public class RandomLoadBalance extends AbstractLoadBanalce {
    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {
        int length=servers.size();
        Random random=new Random();
        return servers.get(random.nextInt(length));
    }
}

RegistryType

public enum RegistryType {

    ZOOKEEPER((byte)0),
    EUREKA((byte)1);

    private byte code;

    RegistryType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }

    public static RegistryType findByCode(byte code) {
        for (RegistryType rt : RegistryType.values()) {
            if (rt.code() == code) {
                return rt;
            }
        }
        return null;
    }
}

RegistryFactory

public class RegistryFactory {

    public static IRegistryService createRegistryService(String address,RegistryType registryType){
        IRegistryService registryService=null;
        try {
            switch (registryType) {
                case ZOOKEEPER:
                    registryService = new ZookeeperRegistryService(address);
                    break;
                case EUREKA:
                    //TODO
                    break;
                default:
                    registryService = new ZookeeperRegistryService(address);
                    break;
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return registryService;
    }
}

Modify the server and add service registration

Modify the netty RPC protocol module and add the support of the registry

SpringRpcProviderBean

Follow the case annotation section below to indicate the content to be modified

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;
    private final IRegistryService registryService; //Modify the part and add the implementation of registration center
    public SpringRpcProviderBean(int serverPort,IRegistryService registryService) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();
        this.registryService=registryService; //Modify the part and add the implementation of registration center
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {
                new NettyServer(this.serverAddress,this.serverPort).startNettyServer();
            } catch (Exception e) {
                log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();
            }
        }).start();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ //Publish for the service where the annotation exists
            Method[] methods=bean.getClass().getDeclaredMethods();
            for(Method method: methods){
                String serviceName=bean.getClass().getInterfaces()[0].getName();
                String key=serviceName+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
                try {
                    //Modify the part and add the implementation of registration center
                    ServiceInfo serviceInfo = new ServiceInfo();
                    serviceInfo.setServiceAddress(this.serverAddress);
                    serviceInfo.setServicePort(this.serverPort);
                    serviceInfo.setServiceName(serviceName);
                    registryService.register(serviceInfo);//Modify the part and add the implementation of registration center
                }catch (Exception e){
                    log.error("register service {} faild",serviceName,e);
                }
            }
        }
        return bean;
    }
}

RpcServerProperties

Modify RpcServerProperties and add the configuration of the registry

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {

    private int servicePort;

    private byte registerType;

    private String registryAddress;
}

RpcProviderAutoConfiguration

Add registry injection.

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        //Add registry
        IRegistryService registryService=RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegisterType()));
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort(),registryService);
    }
}

application.properties

Modify application.properties in netty RPC provider.

gp.rpc.servicePort=20880
gp.rpc.registerType=0
gp.rpc.registryAddress=192.168.221.128:2181

Modify the client and add service discovery

The client needs to modify many places. The following modified codes are all classes in the netty RPC protocol module.

RpcClientProperties

Add options for registry type and registry address

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;

    private byte registryType;

    private String registryAddress;

}

Modify NettyClient

It was originally a static address, but now it has been modified to obtain the address from the registry

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
   /* private String serviceAddress;
    private int servicePort;*/
    public NettyClient(){
        log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
       /* this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;*/
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {
        ServiceInfo serviceInfo=registryService.discovery(protocol.getContent().getClassName());
        ChannelFuture future=bootstrap.connect(serviceInfo.getServiceAddress(),serviceInfo.getServicePort()).sync();
        future.addListener(listener->{
            if(future.isSuccess()){
                log.info("connect rpc server {} success.",serviceInfo.getServiceAddress());
            }else{
                log.error("connect rpc server {} failed .",serviceInfo.getServiceAddress());
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

Modify RpcInvokerProxy

Change the static ip and address to IRegistryService

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

   /* private String serviceAddress;
    private int servicePort;*/

    IRegistryService registryService;

    public RpcInvokerProxy(IRegistryService registryService) {
       /* this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;*/
        this.registryService=registryService;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        //Assembly parameters
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        //Send request
        NettyClient nettyClient=new NettyClient();
        //Building asynchronous data processing
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol,this.registryService);
        return future.getPromise().get().getData();
    }
}

SpringRpcReferenceBean

Modify the reference bean and add the registry configuration

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
   /* private String serviceAddress;
    private int servicePort;*/
    //Modify and add Registration Center
    private byte registryType;
    private String registryAddress;

    @Override
    public Object getObject() throws Exception {
        return object;
    }

    public void init(){
        //Modify and add Registration Center
        IRegistryService registryService= RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType));
        this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(registryService));
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

   /* public void setServiceAddress(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    public void setServicePort(int servicePort) {
        this.servicePort = servicePort;
    }*/

    public void setRegistryType(byte registryType) {
        this.registryType = registryType;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

SpringRpcReferencePostProcessor

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {
        this.clientProperties = clientProperties;
    }

    //Save published reference bean information
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader=classLoader;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context=applicationContext;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
            //Traverse the bean definition, then get the loaded beans, traverse the fields in these beans, and whether to carry the GpRemoteReference annotation
            //If so, you need to build a dynamic proxy implementation
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){
                Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        //Construct a dynamic proxy object using the bean annotated with @ GpRemoteReference
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{
            if(context.containsBean(beanName)){
                log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){
        GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {
            BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            /*builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());*/
            builder.addPropertyValue("registryType",clientProperties.getRegistryType());
            builder.addPropertyValue("registryAddress",clientProperties.getRegistryAddress());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){
        String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
        rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment=environment;
    }
}

application.properties

Modify the configuration in the netty RPC consumer module

gp.serviceAddress=192.168.1.102
gp.servicePort=20880

gp.registryType=0
gp.registryAddress=192.168.221.128:2181

Load balancing test

Add a server startup class and modify the port. Then refresh the browser without restarting the client to see the effect of load balancing.

Figure 7-5

Students who need source code, please pay attention to the official account [following the Mic learning framework], and reply to the keyword [rpc].

Copyright notice: unless otherwise stated, all articles on this blog adopt CC BY-NC-SA 4.0 license agreement. Reprint please indicate from mic to take you to learn architecture! If this article is helpful to you, please pay attention and praise. Your persistence is the driving force of my continuous creation. Welcome to official account official account of "Mic learning architecture" to get more dry cargo!

Keywords: Java

Added by hyngvesson on Thu, 18 Nov 2021 07:08:16 +0200