Teach you how to implement a basic RPC framework based on Netty (easy to understand)

Before reading this article, it is recommended to read the content associated with this article.

[[1] detailed analysis of the underlying implementation principle of network communication under distributed microservice architecture (illustration)]( https://mp.weixin.qq.com/s?__...)

[2 after working for 5 years, do you really understand Netty and why to use it? (deep dry goods)]( https://mp.weixin.qq.com/s?__...)

[[3] deep analysis of core components in Netty (illustration + example)]( https://mp.weixin.qq.com/s?__...)

[[4] details required for bat interview: detailed explanation of ByteBuf in Netty]( https://mp.weixin.qq.com/s?__...)

[[5] through a large number of actual combat cases, how to solve the problem of unpacking and sticking in Netty?]( https://mp.weixin.qq.com/s?__...)

[[6] implement custom message communication protocol based on Netty (protocol design and analysis application practice)]( https://mp.weixin.qq.com/s?__...)

[[7] the most detailed and complete serialization technology, in-depth analysis and application practice in the whole network]( https://mp.weixin.qq.com/s?__...)

In the previous content, we have understood the basic knowledge and implementation principle of Netty from simple to deep, and I believe you have a more comprehensive understanding of Netty. Then, we will take you through a practical case of handwritten RPC communication to understand the practical application of Netty.

Why choose RPC as the actual combat? Because Netty itself is to solve the communication problem, and in the practical application, the RPC Protocol Framework is the one we contact most. Therefore, this actual combat can let you understand that in addition to Netty's practical application, you can also understand the underlying principles of RPC.

What is RPC

The full name of RPC (Remote Procedure Call) is a protocol that requests services from remote computer programs through the network without understanding the underlying network technology. The simple understanding is to enable developers to call remote services like calling local services.

Since it is a protocol, it must have a protocol specification, as shown in Figure 6-1.

In order to achieve the goal of "enabling developers to call remote services like calling local services", RPC Protocol needs to realize remote interaction as shown in Figure 6-1.

  • When the client invokes the remote service, the details of network communication must be shielded through the local dynamic proxy module, so the dynamic proxy module needs to be responsible for assembling the request parameters, methods and other data into data packets and sending them to the target server
  • When sending this packet, it also needs to follow the agreed message protocol and serialization protocol, and finally convert it into binary data stream transmission
  • After receiving the data packet, the server decodes it according to the agreed message protocol to obtain the request information.
  • The server then routes the call to the target service according to the request information, obtains the result and returns it to the client.

< center > figure 6-1 < / center >

Mainstream RPC framework in the industry

For any framework that meets the RPC Protocol, we become the RPC framework. In actual development, we can use the open source and relatively mature RPC framework to solve the remote communication problem under the microservice architecture. The common RPC framework:

  1. Thrift: thrift is a software framework for developing extensible and cross language services. It combines a powerful software stack and code generation engine to build seamless and efficient services between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.
  2. Dubbo: Dubbo is a distributed service framework and SOA governance scheme. Its functions mainly include: high-performance NIO communication and multi protocol integration, service dynamic addressing and routing, soft load balancing and fault tolerance, dependency analysis and degradation, etc. Dubbo is the core framework of Alibaba's internal SOA Service governance scheme. Dubbo has been used by many non Alibaba companies since it was opened in 2011 Use.

Notes for handwritten RPC

Based on the above understanding of RPC Protocol, what technologies should we consider if we implement it ourselves? In fact, the whole process based on figure 6-1 should have a general understanding.

  • Communication protocol, RPC framework has very high performance requirements, so the simpler the communication protocol should be, the better, which can reduce the performance loss caused by encoding and decoding. Most mainstream RPC frameworks will directly choose TCP and HTTP protocols.
  • Serialization and deserialization: data needs to be serialized and deserialized for network transmission. As we said earlier, the so-called serialization and deserialization is the process of not converting objects into binary streams and converting binary streams into objects. In the selection of serialization framework, we generally choose efficient and general algorithms, such as fastjason, Protobuf Hessian et al. These serialization technologies are more efficient than native serialization operations and have higher compression ratio.
  • Dynamic proxy: when a client invokes a remote service, it needs to shield the details of network communication through the dynamic proxy. The dynamic proxy is generated during operation, so the generation speed and bytecode size of the dynamic proxy class will affect the performance and resource consumption of the overall RPC framework. Common dynamic proxy technologies: Javassist, Cglib, JDK dynamic proxy, etc.

Implementation of RPC based on Netty handwriting

After understanding the RPC Protocol, we implement an RPC communication framework based on Netty.

See Annex netty RPC example for code details

< center > figure 6-2 project module composition < / center >

jar packages to be imported:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.72</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

Module dependencies:

  • The provider relies on netty RPC Protocol and netty RPC API
  • cosumer relies on netty RPC Protocol and netty RPC API

Netty RPC API module

< center > figure 6-3 composition of netty RPC API module < / center >

IUserService

public interface IUserService {

    String saveUser(String name);
}

Netty RPC provider module

< center > figure 6-4 composition of netty RPC provider module < / center >

UserServiceImpl

@Service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {
        log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

NettyRpcProviderMain

Note that in the current step, the part describing the case is not added for the time being, and it will be added later

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //Case1 (add later)
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer();   //Case2 (add later)
    }
}

netty-rpc-protocol

Start writing the communication protocol module, which mainly does several things

  • Define message protocol
  • Define serialization and deserialization methods
  • Establish netty communication

< center > figure 6-5 < / center >

Define message protocol

We talked about the custom message protocol before. We can define it here according to the following protocol format.

    /*
    +----------------------------------------------+
    | Magic number 2byte | serialization algorithm 1byte | request type 1byte|
    +----------------------------------------------+
    | Message ID 8byte | data length 4byte|
    +----------------------------------------------+
    */

Header

@AllArgsConstructor
@Data
public class Header implements Serializable {
    /*
    +----------------------------------------------+
    | Magic number 2byte | serialization algorithm 1byte | request type 1byte|
    +----------------------------------------------+
    | Message ID 8byte | data length 4byte|
    +----------------------------------------------+
    */
    private short magic; //Magic number - used to verify the identity of the message (2 bytes)
    private byte serialType; //Serialization type (1 byte)
    private byte reqType; //Operation type (1 byte)
    private long requestId; //Request id (8 bytes)
    private int length; //Data length (4 bytes)

}

RpcRequest

@Data
public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] params;
    private Class<?>[] parameterTypes;
}

RpcResponse

@Data
public class RpcResponse implements Serializable {

    private Object data;
    private String msg;
}

RpcProtocol

@Data
public class RpcProtocol<T> implements Serializable {
    private Header header;
    private T content;
}

Define related constants

The above message protocol definition involves several enumeration related classes, which are defined as follows

ReqType

Message type

public enum ReqType {

    REQUEST((byte)1),
    RESPONSE((byte)2),
    HEARTBEAT((byte)3);

    private byte code;

    private ReqType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }
    public static ReqType findByCode(int code) {
        for (ReqType msgType : ReqType.values()) {
            if (msgType.code() == code) {
                return msgType;
            }
        }
        return null;
    }
}

SerialType

Serialization type

public enum SerialType {

    JSON_SERIAL((byte)0),
    JAVA_SERIAL((byte)1);

    private byte code;

    SerialType(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }
}

RpcConstant

public class RpcConstant {
    //Total bytes of header section
    public final static int HEAD_TOTAL_LEN=16;
    //magic number
    public final static short MAGIC=0xca;
}

Define serialization related implementations

Here are two ways to demonstrate, one is JSON, and the other is Java Native

ISerializer

public interface ISerializer {

    <T> byte[] serialize(T obj);

    <T> T deserialize(byte[] data,Class<T> clazz);

    byte getType();
}

JavaSerializer

public class JavaSerializer implements ISerializer{

    @Override
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream byteArrayOutputStream=
                new ByteArrayOutputStream();
        try {
            ObjectOutputStream outputStream=
                    new ObjectOutputStream(byteArrayOutputStream);

            outputStream.writeObject(obj);

            return  byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data);
        try {
            ObjectInputStream objectInputStream=
                    new ObjectInputStream(byteArrayInputStream);

            return (T) objectInputStream.readObject();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public byte getType() {
        return SerialType.JAVA_SERIAL.code();
    }
}

JsonSerializer

public class JsonSerializer implements ISerializer{
    @Override
    public <T> byte[] serialize(T obj) {
        return JSON.toJSONString(obj).getBytes();
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        return JSON.parseObject(new String(data),clazz);
    }

    @Override
    public byte getType() {
        return SerialType.JSON_SERIAL.code();
    }
}

SerializerManager

Realize the management of serialization mechanism

public class SerializerManager {

    private final static ConcurrentHashMap<Byte, ISerializer> serializers=new ConcurrentHashMap<Byte, ISerializer>();

    static {
        ISerializer jsonSerializer=new JsonSerializer();
        ISerializer javaSerializer=new JavaSerializer();
        serializers.put(jsonSerializer.getType(),jsonSerializer);
        serializers.put(javaSerializer.getType(),javaSerializer);
    }

    public static ISerializer getSerializer(byte key){
        ISerializer serializer=serializers.get(key);
        if(serializer==null){
            return new JavaSerializer();
        }
        return serializer;
    }
}

Define encoding and decoding implementation

Since the message protocol is customized, you need to implement encoding and decoding yourself. The code is as follows

RpcDecoder

@Slf4j
public class RpcDecoder extends ByteToMessageDecoder {


    /*
    +----------------------------------------------+
    | Magic number 2byte | serialization algorithm 1byte | request type 1byte|
    +----------------------------------------------+
    | Message ID 8byte | data length 4byte|
    +----------------------------------------------+
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("==========begin RpcDecoder ==============");
        if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){
            //The message is not long enough and does not need to be parsed
            return;
        }
        in.markReaderIndex();//Mark an index of read data for subsequent reset.
        short magic=in.readShort(); //Read magic
        if(magic!=RpcConstant.MAGIC){
            throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);
        }
        byte serialType=in.readByte(); //Read serialization algorithm type
        byte reqType=in.readByte(); //Request type
        long requestId=in.readLong(); //Request message id
        int dataLength=in.readInt(); //Request data length
        //The number of bytes in the readable area is less than the actual data length
        if(in.readableBytes()<dataLength){
            in.resetReaderIndex();
            return;
        }
        //Read message content
        byte[] content=new byte[dataLength];
        in.readBytes(content);

        //Build header information
        Header header=new Header(magic,serialType,reqType,requestId,dataLength);
        ISerializer serializer=SerializerManager.getSerializer(serialType);
        ReqType rt=ReqType.findByCode(reqType);
        switch(rt){
            case REQUEST:
                RpcRequest request=serializer.deserialize(content, RpcRequest.class);
                RpcProtocol<RpcRequest> reqProtocol=new RpcProtocol<>();
                reqProtocol.setHeader(header);
                reqProtocol.setContent(request);
                out.add(reqProtocol);
                break;
            case RESPONSE:
                RpcResponse response=serializer.deserialize(content,RpcResponse.class);
                RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
                resProtocol.setHeader(header);
                resProtocol.setContent(response);
                out.add(resProtocol);
                break;
            case HEARTBEAT:
                break;
            default:
                break;
        }

    }
}

RpcEncoder

@Slf4j
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {

    /*
    +----------------------------------------------+
    | Magic number 2byte | serialization algorithm 1byte | request type 1byte|
    +----------------------------------------------+
    | Message ID 8byte | data length 4byte|
    +----------------------------------------------+
    */
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {
        log.info("=============begin RpcEncoder============");
        Header header=msg.getHeader();
        out.writeShort(header.getMagic()); //Write magic number
        out.writeByte(header.getSerialType()); //Write serialization type
        out.writeByte(header.getReqType());//Write request type
        out.writeLong(header.getRequestId()); //Write request id
        ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());
        byte[] data=serializer.serialize(msg.getContent()); //serialize
        header.setLength(data.length);
        out.writeInt(data.length); //Write message length
        out.writeBytes(data);
    }
}

NettyServer

Implement NettyServer build.

@Slf4j
public class NettyServer{
    private String serverAddress; //address
    private int serverPort; //port

    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    public void startNettyServer() throws Exception {
        log.info("begin start Netty Server");
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new RpcServerInitializer());
            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            log.info("Server started Success on Port:{}", this.serverPort);
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("Rpc Server Exception",e);
        }finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

RpcServerInitializer

public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
            .addLast(new RpcDecoder())
            .addLast(new RpcEncoder())
            .addLast(new RpcServerHandler());
    }
}

RpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=invoke(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }

    private Object invoke(RpcRequest request){
        try {
            Class<?> clazz=Class.forName(request.getClassName());
            Object bean= SpringBeansManager.getBean(clazz); //Get instance object (CASE)
            Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());
            return declaredMethod.invoke(bean,request.getParams());
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

SpringBeansManager

@Component
public class SpringBeansManager implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringBeansManager.applicationContext=applicationContext;
    }

    public static <T> T getBean(Class<T> clazz){
        return applicationContext.getBean(clazz);
    }
}

Note that after this class is built, you need to add compone scan to the main method of the netty RPC provider module for scanning

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //Modify here
@SpringBootApplication
public class NettyRpcProviderMain {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer();  // Modify here
    }
}

netty-rpc-consumer

Next, start to realize the consumer side

RpcClientProxy

public class RpcClientProxy {
    
    public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){
        return (T) Proxy.newProxyInstance
                (interfaceCls.getClassLoader(),
                        new Class<?>[]{interfaceCls},
                        new RpcInvokerProxy(host,port));
    }
}

RpcInvokerProxy

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

    private String serviceAddress;
    private int servicePort;

    public RpcInvokerProxy(String serviceAddress, int servicePort) {
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        //Assembly parameters
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        //Send request
        NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);
        //Building asynchronous data processing
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol);
        return future.getPromise().get().getData();
    }
}

Define client connections

In the protocol package path of netty RPC protocol module, create NettyClient

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    private String serviceAddress;
    private int servicePort;
    public NettyClient(String serviceAddress,int servicePort){
        log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
        this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException {
        ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync();
        future.addListener(listener->{
            if(future.isSuccess()){
                log.info("connect rpc server {} success.",this.serviceAddress);
            }else{
                log.error("connect rpc server {} failed .",this.serviceAddress);
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

RpcClientInitializer

@Slf4j
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        log.info("begin initChannel");
        ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
                .addLast(new LoggingHandler())
                .addLast(new RpcEncoder())
                .addLast(new RpcDecoder())
                .addLast(new RpcClientHandler());
    }
}

RpcClientHandler

It should be noted that the communication process of Netty is based on inbound and outbound separation, so we need to complete it with a Future object when obtaining results.

@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
        log.info("receive rpc server result");
        long requestId=msg.getHeader().getRequestId();
        RpcFuture<RpcResponse> future=RequestHolder.REQUEST_MAP.remove(requestId);
        future.getPromise().setSuccess(msg.getContent()); //Return results
    }
}

Implementation of Future

Add the rpcFuture implementation in the netty RPC protocol module

RpcFuture

@Data
public class RpcFuture<T> {
    //Promise is a writable future. Future itself does not have an interface related to write operations,
    // Netty extends Future through Promise to set the results of IO operations
    private Promise<T> promise;

    public RpcFuture(Promise<T> promise) {
        this.promise = promise;
    }
}

RequestHolder

Save the corresponding results of requestid and future

public class RequestHolder {

    public static final AtomicLong REQUEST_ID=new AtomicLong();

    public static final Map<Long,RpcFuture> REQUEST_MAP=new ConcurrentHashMap<>();
}

Students who need source code, please pay attention to the official account [following the Mic learning framework], and reply to the keyword [rpc].

Copyright notice: unless otherwise stated, all articles on this blog adopt CC BY-NC-SA 4.0 license agreement. Reprint please indicate from Mic to take you to learn architecture!
If this article is helpful to you, please pay attention and praise. Your persistence is the driving force of my continuous creation. Welcome to WeChat public official account for more dry cargo.

Keywords: Java

Added by dekeb55 on Wed, 17 Nov 2021 04:21:20 +0200