A hands-on guide to writing an RPC framework

Preface

I've been reading the Dubbo source code recently, with a book as a companion (I keep misreading the author's name, Yiji, as Jiyi).

Dubbo 3.0 has 300,000 lines of code, and reading the code of even one core feature is exhausting. To deepen my understanding, I think implementing a small RPC framework myself is a very good approach.

This article covers only the basic RPC call itself. I will explain the implementation of SPI, the registry, load balancing, and Netty-based transport in the next article.

Opening

There are many RPC frameworks on the market. Although they differ in detail, they share the same core idea. Let's look at the following figure:

This figure shows the RPC calling process: the client transfers the information needed for the call to the server. Every service call carries the same basic information: the service interface, the method name, the method parameter types, and the method parameter values. To transmit the parameter values, the client serializes the objects and sends them over the network. The server deserializes them in the same order the client serialized them, assembles a request object, invokes the service through reflection, and finally returns the result to the client.
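The serialize, transmit, deserialize step can be sketched with Java's built-in object serialization. This is only an illustration under stated assumptions: `CallInfo` and `SerializationSketch` are hypothetical names, and a byte array stands in for the network.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical request holder, used only for this sketch.
class CallInfo implements Serializable {
    private static final long serialVersionUID = 1L;
    final String interfaceName;
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] params;

    CallInfo(String interfaceName, String methodName, Class<?>[] paramTypes, Object[] params) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.params = params;
    }
}

class SerializationSketch {
    // Client side: turn the request into bytes that could be written to a socket.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Server side: rebuild the request object from the received bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CallInfo sent = new CallInfo("DemoService", "sendAndGetMessage",
                new Class<?>[]{String.class}, new Object[]{"hello"});
        CallInfo received = (CallInfo) deserialize(serialize(sent));
        System.out.println(received.methodName + " " + received.params[0]);
    }
}
```

Both the parameter types and the parameter values survive the round trip because `Class` and `String` are themselves serializable; any custom parameter type would need to implement `Serializable` as well.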

So to sum up, the whole RPC call process can be divided into four parts:

  1. Service provider: the server side, which registers the services it offers in the registry for consumers to use.

  2. Service consumer: the client, which obtains the services it needs from the registry. Note that provider and consumer are relative roles: a service can be both a provider and a consumer.

  3. Registry: stores the services offered by providers. For example, the default registry in Dubbo is ZooKeeper, but other middleware such as Redis and Nacos is also supported. In the next article, I'll use ZooKeeper as the registry.

  4. Monitoring center: monitors interface response times and counts requests. It is optional and can be omitted.
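To make the registry's role concrete, here is a rough in-memory stand-in. It is an assumption-laden sketch, not how Dubbo or ZooKeeper work: `InMemoryRegistrySketch`, `register`, and `lookup` are hypothetical names, and addresses are plain "host:port" strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class InMemoryRegistrySketch {
    // Service interface name -> addresses ("host:port") of the providers offering it.
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();

    // Called by a provider at startup to announce a service.
    void register(String serviceName, String address) {
        providers.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // Called by a consumer to find out where a service lives.
    List<String> lookup(String serviceName) {
        return providers.getOrDefault(serviceName, Collections.emptyList());
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        registry.register("DemoService", "127.0.0.1:7777");
        System.out.println(registry.lookup("DemoService"));
    }
}
```

A real registry also has to handle providers going offline and notify consumers of changes, which this sketch leaves out.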

Implementation

In fact, the above is enough to implement basic RPC, so let's go straight to the implementation. To make each part of the call easier to follow, I split the project into three modules: api, common, and framework.

If you only want to see the result of an RPC call, you don't need to split the code into modules as I did; just copy the code below.

api module: holds the service interface, so that the provider can implement it and the consumer can reference it.

public interface DemoService {
    String sendAndGetMessage(String message);
}

common module: holds the entity classes used throughout the call:

Invocation: the client has to send call information to the server, so we encapsulate that information in one class:

import java.io.Serializable;

// Implements Serializable because instances are sent over the network.
public class Invocation implements Serializable {
    // Fully qualified name of the interface
    private String className;
    // Method name
    private String methodName;
    // Method parameter types
    private Class<?>[] paramTypes;
    // Method arguments
    private Object[] params;

    /* getters and setters */

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }
}
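The parameter types are carried alongside the method name because the name alone cannot identify an overloaded method. The sketch below shows the reflective lookup the server will do with these fields; `ReflectionLookupSketch`, `Greeter`, and `call` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;

class ReflectionLookupSketch {
    // Hypothetical service with two overloads of the same method name.
    public static class Greeter {
        public String greet(String name) {
            return "Hello, " + name;
        }

        public String greet(String name, Integer times) {
            return "Hello x" + times + ", " + name;
        }
    }

    // Resolve a public method by name and parameter types, then invoke it on the target.
    static Object call(Object target, String methodName, Class<?>[] paramTypes, Object[] params)
            throws ReflectiveOperationException {
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return method.invoke(target, params);
    }

    public static void main(String[] args) throws Exception {
        Greeter greeter = new Greeter();
        System.out.println(call(greeter, "greet",
                new Class<?>[]{String.class}, new Object[]{"RPC"}));
        System.out.println(call(greeter, "greet",
                new Class<?>[]{String.class, Integer.class}, new Object[]{"RPC", 2}));
    }
}
```

The same method name resolves to different overloads depending on the `Class<?>[]` passed to `getMethod`, which is why `Invocation` stores both.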

URL: because the request goes over the network, the target host and port need to be stored:

import java.io.Serializable;

public class URL implements Serializable {
    private String hostname;
    private Integer port;

    public URL(String hostname, Integer port) {
        this.hostname = hostname;
        this.port = port;
    }

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }
}

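RpcResponse: defines the information returned to the client:

import java.io.Serializable;

public class RpcResponse implements Serializable {
    // Exception raised while handling the call, if any
    private Throwable error;
    // Result of the call
    private Object result;

    /* getters and setters */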

    public Throwable getError() {
        return error;
    }

    public void setError(Throwable error) {
        this.error = error;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

framework module: the classes that carry out the call.

RpcConsumer: consumer processing logic

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class RpcConsumer {
    public Object execute(Invocation invocation, String host, int port) throws Throwable {
        // try-with-resources closes the socket, and with it both streams, even on failure
        try (Socket server = new Socket(host, port)) {
            // Write the request to the output stream of the socket connected to the server
            ObjectOutputStream oos = new ObjectOutputStream(server.getOutputStream());
            oos.writeObject(invocation);
            oos.flush();

            // Read the response from the input stream
            ObjectInputStream ois = new ObjectInputStream(server.getInputStream());
            RpcResponse response = (RpcResponse) ois.readObject();

            // Surface a server-side failure instead of silently returning null
            if (response.getError() != null) {
                throw response.getError();
            }
            return response.getResult();
        }
    }
}

RpcProvider: provider processing logic

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcProvider {
    public void start(int port, Map<String, Object> services) {
        // 1. Open the listening socket; try-with-resources closes it on exit
        try (ServerSocket server = new ServerSocket(port)) {
            // 2. Create the thread pool that runs the handlers
            Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
            while (true) {
                // 3. Wait for a client connection
                Socket client = server.accept();
                // 4. Hand the connection to the pool so the call runs asynchronously
                executor.execute(new RpcProviderHandler(client, services));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
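One detail of the thread pool above is worth checking. With an unbounded LinkedBlockingQueue, ThreadPoolExecutor only starts threads beyond the core size when the queue refuses a task, which an unbounded queue never does, so the maximum of 10 is never reached. The sketch below measures this; `PoolSizingSketch` and `largestPoolSize` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {
    // Submits `tasks` blocking jobs and reports the largest number of threads the pool ever held.
    static int largestPoolSize(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return largest;
    }

    public static void main(String[] args) throws InterruptedException {
        // 20 simultaneous tasks, yet the pool never grows past its core size of 5
        System.out.println(largestPoolSize(20));
    }
}
```

If more than 5 concurrent handlers are wanted, one option is a bounded queue (for example `new LinkedBlockingQueue<>(100)`), which lets the pool grow toward the maximum once the queue fills up.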

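import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

public class RpcProviderHandler implements Runnable {
    private final Socket clientSocket;
    private final Map<String, Object> serviceMap;

    public RpcProviderHandler(Socket client, Map<String, Object> services) {
        this.clientSocket = client;
        this.serviceMap = services;
    }

    @Override
    public void run() {
        ObjectInputStream ois = null;
        ObjectOutputStream oos = null;
        RpcResponse response = new RpcResponse();
        try {
            ois = new ObjectInputStream(clientSocket.getInputStream());
            oos = new ObjectOutputStream(clientSocket.getOutputStream());

            // Deserialize the request
            Invocation invocation = (Invocation) ois.readObject();

            // Look up the implementation class registered under the interface name
            Class<?> clazz = (Class<?>) serviceMap.get(invocation.getClassName());
            if (clazz == null) {
                throw new ClassNotFoundException("No service registered for " + invocation.getClassName());
            }
            // Resolve the method and invoke it on a fresh instance of the implementation
            Method method = clazz.getMethod(invocation.getMethodName(), invocation.getParamTypes());
            Object result = method.invoke(clazz.getDeclaredConstructor().newInstance(), invocation.getParams());

            response.setResult(result);
            oos.writeObject(response);
            oos.flush();
        } catch (Exception e) {
            if (oos != null) {
                // Report the service's own exception rather than the reflection wrapper
                Throwable cause = (e instanceof InvocationTargetException && e.getCause() != null)
                        ? e.getCause() : e;
                response.setError(cause);
                try {
                    oos.writeObject(response);
                    oos.flush();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        } finally {
            try {
                if (ois != null) ois.close();
                if (oos != null) oos.close();
                if (clientSocket != null) clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}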

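With a client proxy class, callers don't have to write the method name, parameter types, and so on by hand:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
// Logger and LoggerFactory are assumed to come from SLF4J; adjust if you use another logging API
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcProviderProxy {
    private static final Logger log = LoggerFactory.getLogger(RpcProviderProxy.class);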
    private URL url;

    public RpcProviderProxy(URL url) {
        this.url=url;
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz){
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                log.info("Some pre-processing of execution methods can be done");
                Invocation invocation = new Invocation();
                invocation.setClassName(method.getDeclaringClass().getName());
                invocation.setMethodName(method.getName());
                invocation.setParamTypes(method.getParameterTypes());
                invocation.setParams(args);
                Object result = new RpcConsumer().execute(invocation, url.getHostname(), url.getPort());
                //Extension point, e.g. recording a log of each call
                log.info("After the method in reflection is executed, the returned result is:"+result);
                log.info("You can do some post-processing of execution methods");
                return result;
            }
        });
    }

}
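If JDK dynamic proxies are unfamiliar, the mechanism can be seen in isolation. In this sketch the handler only describes the intercepted call instead of sending it over a socket; `ProxySketch`, `Echo`, and `describeCalls` are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {
    // Hypothetical interface, used only for this sketch.
    public interface Echo {
        String say(String text);
    }

    // Builds a proxy whose handler describes each call instead of forwarding it anywhere.
    @SuppressWarnings("unchecked")
    static <T> T describeCalls(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) ->
                method.getDeclaringClass().getSimpleName() + "." + method.getName()
                        + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        Echo echo = describeCalls(Echo.class);
        System.out.println(echo.say("hi")); // prints "Echo.say[hi]"
    }
}
```

Every call on the proxy lands in the handler together with its `Method` and arguments, which is exactly the information an RPC client needs to build a request.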

OK, now all we need is a client test class, a server test class, and an implementation of the service:

public class ConsumerMain {
    public static void main(String[] args) {
        URL url=new URL("127.0.0.1", 7777);
        RpcProviderProxy clientProxy = new RpcProviderProxy(url);

        //proxy class
        DemoService proxy = clientProxy.getProxy(DemoService.class);
        System.out.println(proxy.sendAndGetMessage("Test send message"));
    }
}

import java.util.HashMap;
import java.util.Map;

public class ProviderMain {
    public static void main(String[] args) {
        // Map the interface name to the implementation class that serves it
        Map<String,Object> services = new HashMap<>();
        services.put(DemoService.class.getName(), DemoServiceImpl.class);

        RpcProvider server= new RpcProvider();
        server.start(7777,services);
    }
}

public class DemoServiceImpl implements DemoService {
    Logger log=LoggerFactory.getLogger(DemoServiceImpl.class);
    @Override
    public String sendAndGetMessage(String message) {
        log.info("The method of the client is executed. The parameters are:"+message);
        return message;
    }
}

With that, we have completed a simple RPC. Run it to see the effect. Remember to start the server (ProviderMain) first and then the client (ConsumerMain).


OK, perfect. You can run this example by copying the code above directly. As for SPI and the registry, we will implement them in the next article, which will probably be as long as my earlier walkthrough of the Spring Boot startup process.

Keywords: Dubbo rpc Middleware

Added by Elhcay on Wed, 24 Nov 2021 03:18:24 +0200