Roll it up and take you to write a distributed RPC framework that middle and senior programmers will know

I summary

What is RPC?

  • Remote service call
  • Official: the idea of requesting services from remote computer programs through the network without understanding the underlying network technology
  • A little more popular: without knowing the details of the call, the client calls an object that exists on the remote computer, just like calling an object in the local application.
  • Common rpc frameworks on the market: dobbo, springCloud, gRPC

Then why RPC? Isn't HTTP good?

  • Because RPC and HTTP are not at the same level, they are not strictly comparable and should not be compared.
  • HTTP is only a transmission protocol, which only standardizes a certain communication format
  • Compared with local procedure calls, RPC is used for communication between distributed systems. It can be transmitted by HTTP or based on TCP custom protocol.
  • HTTP protocol is redundant, so RPC is mostly based on TCP custom protocol, and customization is the most suitable for yourself.

Overall structure of the project

Overall architecture

Next, explain the above processes respectively

II Custom annotation

Service providers and consumers share an interface, @ ServiceExpose is to expose the service and place it on an implementation class of the producer@ ServiceReference refers to a service, which is placed on the attribute that needs to be injected by the consumer. For novices, Xiaobai wants to learn java promotion, Java architecture, web development, big data, data analysis, artificial intelligence and other technologies more easily. Here we share the system teaching resources and expand my Wei (Tongying): 1253431195 [tutorials / tools / methods / troubleshooting]

  • Target: specify the location where the modified Annotation can be placed (modified target)

  • @Target(ElementType.TYPE) / / interface and class

  • @Target(ElementType.FIELD) / / attribute

  • @Target(ElementType.METHOD) / / method

  • @Target(ElementType.PARAMETER) / / method parameters

  • @Target(ElementType.CONSTRUCTOR) / / constructor

  • @Target(ElementType.LOCAL_VARIABLE) / / local variable

  • @Target(ElementType.ANNOTATION_TYPE) / / annotation

  • @Target(ElementType.PACKAGE) / / package

  • Retention: defines the retention policy for annotations

  • @Retention(RetentionPolicy.SOURCE) / / the annotation only exists in the source code, not in the class bytecode file

  • @/ / when the policy is run, the byte of retention.class will be reserved in the policy file, but cannot be obtained

  • @Retention(RetentionPolicy.RUNTIME) / / the annotation will exist in the class bytecode file and can be obtained through reflection at runtime

  • Documented: Specifies that the modified Annotation can be extracted into a document by javadoc tool

  • Inherited: Specifies that the decorated Annotation will be inherited

II Start configuration

It mainly loads some rpc related configuration classes and uses SpringBoot for automatic assembly. You can use SPI mechanism to add some custom classes and put them in the specified folder.

III rpc interface injection / rpc service scanning

Here, we mainly obtain the corresponding annotated attributes / classes through reflection to expose / reference services. What needs attention here is when to expose / reference services? As follows:

  • Client: there are generally two schemes

  • Hungry man style: hungry man style is to implement the afterPropertiesSet method in the InitializingBean interface of Spring, and the container introduces services by calling the afterPropertiesSet method of ReferenceBean. (when Spring starts, inject implementation classes into all attributes, including remote and local implementation classes) lazy: the introduction process is started only when this service is injected into other classes, that is, the service introduction will be started only when it is used. After the Spring IOC container of the application is refreshed (spring Context initialization), scan all beans, obtain the field with @ ServiceExpose/@ServiceReference annotation in the Bean, and then create a proxy object of field type. After creation, set the proxy object to this field. Subsequently, a server-side connection is created through the proxy object and a call is initiated. (dubbo default)

  • Server side: the same as lazy type.

So how do we know that the Spring IOC refresh is complete? Here we use a listener provided by Spring. When the Spring IOC refresh is completed, the listener will be triggered.

IV Service registration to / from Zk

Zookeeper adopts the data model of node tree, which is similar to linux file system. It is relatively simple for /, / node1, / node2. If you don't understand zookeeper, please move on: zookeeper principle

We create a persistent node for each service name. When registering a service, we actually create a temporary node under the persistent node in zookeeper, which stores the IP, port, serialization mode, etc. of the service. For novices, Xiaobai wants to learn java promotion, Java architecture, web development, big data, data analysis, artificial intelligence and other technologies more easily. Here we share the system teaching resources and expand my Wei (Tongying): 1253431195 [tutorials / tools / methods / troubleshooting]

When the client obtains the service, it parses the service address data by obtaining the temporary node list under the persistent node:

Client monitoring service changes:

V Generate proxy class object

The dynamic proxy of JDK is used here. cglib or Javassist(dobbo) can also be used.

public class ClientProxyFactory 
{   
 /**     * Get proxy object, bind invoke behavior     *  
   * @param clazz Interface class object     * @param <T>   type     * 
@return Proxy object     *
/public <T> T getProxyInstance(Class<T> clazz) 
{        
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, 
new InvocationHandler() 
{            
final Random random = new Random();            
@Override            
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable 
{                
// Step 1: select a service exposed by a service provider through the service discovery mechanism                
String serviceName = clazz.getName();                
final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName);                
logger.info("Rpc server instance list: {}", serviceInfos);                
if (CollectionUtils.isEmpty(serviceInfos)) 
{                    
throw new RpcException("No rpc servers found.");                
}                
// TODO: load balancing is simulated here. One of the services exposed by multiple service providers is randomly selected, and the later writing method is used to realize load balancing                
final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));                
// Step 2: construct rpc request object                
final RpcRequest rpcRequest = new RpcRequest();                
rpcRequest.setServiceName(serviceName);                
rpcRequest.setMethod(method.getName());                rpcRequest.setParameterTypes(method.getParameterTypes());                
rpcRequest.setParameters(args);                
// Step 3: encode the request message. TODO: multiple encoding methods can be configured here                
byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);                
// Step 4: call rpc client to start sending messages                
byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);                
// Step 5: decode the response message                
final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);                
// Step 6: parse the returned results for processing                
if (rpcResponse.getException() != null) 
{                    
throw rpcResponse.getException();                
}                
return rpcResponse.getRetValue();            
}        
});    
}
}

Vi load balancing

This implementation supports two main load balancing strategies, random and polling. Both of them support weighted random and polling, which are actually four strategies.

VII Netty communication

The server is basically the same as the client. Only the code of the server is shown here. The proxy object is generated when Spring starts, but there is no call. Every call (request) will generate a Netty connection. For novices, Xiaobai wants to learn java promotion, Java architecture, web development, big data, data analysis, artificial intelligence and other technologies more easily. Here we share the system teaching resources and expand my Wei (Tongying): 1253431195 [tutorials / tools / methods / troubleshooting]

public class NettyRpcServer extends RpcServer
 {       
@Override    public void start() 
{        
// Create two thread groups        
EventLoopGroup bossGroup = new NioEventLoopGroup();        
EventLoopGroup workerGroup = new NioEventLoopGroup();        
try {            
// Create the startup object of the server            
ServerBootstrap serverBootstrap = new ServerBootstrap()                    
// Set up two thread groups                    
.group(bossGroup, workerGroup)                    
// Set the server channel implementation type                    
.channel(NioServerSocketChannel.class)                    
// The server is used to receive incoming connections, that is, boosGroup threads, and the thread queue size                    
.option(ChannelOption.SO_BACKLOG, 100)                    
.childOption(ChannelOption.SO_KEEPALIVE, true)                    
// child channel, worker thread processor                    
.childHandler(new ChannelInitializer<SocketChannel>() {                        
// Set a custom processor for the pipeline                        
@Override                        
public void initChannel(SocketChannel channel) 
{                            
ChannelPipeline pipeline = channel.pipeline();                            
pipeline.addLast(new NettyServerHandler());                        
}                    
});            
// Bind the port number and start the service synchronously            
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();           
 channel = channelFuture.channel();            
// Monitor the closed channel and change to synchronization            
channelFuture.channel().closeFuture().sync();        
} 
catch (Exception e) 
{            
logger.error("server error.", e);        
} 
finally 
{           
 // Free thread group resources            
bossGroup.shutdownGracefully();            
workerGroup.shutdownGracefully();        
}    
}

Implement specific handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter 
{    
//This method is triggered when the channel is ready    
@Override    
public void channelActive(ChannelHandlerContext ctx) 
throws Exception 
{        
//Record        
logger.info("channel active: {}", ctx);    
}    
//Read the actual data (here we can read the messages sent by the client)    
@Override    
public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception 
{        
//Read data into buffer        
final ByteBuf msgBuf = (ByteBuf) msg;        
final byte[] reqBytes = new byte[msgBuf.readableBytes()];        
msgBuf.readBytes(reqBytes);    
}    
//Data reading completed    
@Override    
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception 
{        
//Use reflection to find the target method to return        
final byte[] respBytes = requestHandler.handleRequest(reqBytes);        
ctx.writeAndFlush(respBytes);    
}    
//To handle exceptions, it is generally necessary to close the channel    
@Override    
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception 
{        
ctx.close();    
}
}

VIII Serialization protocol

Students who have a little knowledge of computer network know that data transmission in the network is binary: 01010101010101010. Similar to this, only binary data can be transmitted in the network. However, we usually serialize before encoding in order to optimize the amount of data transmitted. Because some data are too large, space optimization is needed.

So let's distinguish between serialization and coding: I draw a picture, and everyone understands it

Define a serialization protocol and put it into the pipeline as a handler.

Netty supports a variety of serialization, such as jdk, Jason, ProtoBuf, etc. ProtoBuf is used here. After serialization, it has a small code stream and high performance, which is very suitable for RPC calls. Next, how to use ProtoBuf?

  • 1. Write the class XXX to be serialized Proto: ProtoBuf has its own grammar rules (Baidu)

  • 2. Protocol provided through the official website Exe generates the corresponding Java code
  • 3. The code generated by the tool (AnimalProto) has helped us encapsulate the serialization and deserialization methods. We only need to call the corresponding methods

Introducing Protobuf dependency

<dependency>    
<groupId>com.google.protobuf</groupId>    
<artifactId>protobuf-java</artifactId>    
<version>2.4.1</version></dependency>

Serialization:

/** * Call the Builder constructed by the object to complete attribute assignment and serialization* 
@return */public static byte[] protobufSerializer()
{    
AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();    
builder.setId(1L);    
builder.setName("Little pig");   
 List<String> actions = new ArrayList<>();    
actions.add("eat");    
actions.add("run");    
builder.addAllActions(actions);    
return builder.build().toByteArray();}

Deserialization:

/** * By calling parseFrom Deserialization is complete * 
@param bytes * 
@return * @throws InvalidProtocolBufferException *
/public static Animal deserialize(byte[] bytes) throws Exception 
{    
AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);    
Animal animal = new Animal();    
animal.setId(pAnimal.getId());    
animal.setName(pAnimal.getName());    
animal.setActions(pAnimal.getActionsList());    
return animal;
}

Test:

public static void main(String[] args) 
throws Exception 
{    
byte[] bytes = serializer();    
Animal animal = deserialize(bytes);    
System.out.println(animal);
}

The following can be seen for normal serialization and deserialization:

IX communication protocol

Communication protocol is mainly used to solve network transmission problems, such as TCP unpacking and sticking.

TCP problem:

  • TCP unpacking and sticking is mainly to combine or separate some data for transmission. At this time, some data are incomplete and some data are more than a part, which will cause problems. Generally, the problem of unpacking and sticking packets should be considered when using TCP protocol
  • tcp sticky packets and half packets are caused by sliding windows. Because no matter how long your data is, how to segment each piece of data. But tcp is only sent according to the length of my sliding window. For novices, Xiaobai wants to learn java promotion, Java architecture, web development, big data, data analysis, artificial intelligence and other technologies more easily. Here we share the system teaching resources and expand my Wei (Tongying): 1253431195 [tutorials / tools / methods / troubleshooting]
  • The essence is that TCP is a streaming protocol and messages have no boundaries.

Solution: the solutions of the mainstream protocols in the industry can be summarized as follows

  • Fixed message length: for example, the size of each message is a fixed length of 100 bytes. If it is not enough, make up with spaces. (fixed length decoder)
  • Add a special terminator at the end of the package for segmentation. (separator encoder)

  • Message length + message: the message is divided into message header and message body. The message header contains a field indicating the total length of the message (or the length of the message body). Netty comes with:

  • Custom codec

Here is just a listing of the encoding process. Decoding is an inverse process. (to put it bluntly, encoding is to write in a fixed format, and decoding is to read in a fixed format)

Congratulations, you have learned to write RPC framework. Friends who want to know more can refer to the source code. Learn and upgrade.

Keywords: Java Database MySQL Back-end Distribution

Added by donald on Sun, 06 Mar 2022 16:46:07 +0200