21 - Handwritten Dubbo RPC based on Netty

Basic introduction to RPC

  1. RPC (Remote Procedure Call) is a communication protocol that allows a program running on one computer to call a subroutine on another computer without the programmer explicitly coding the details of the remote interaction
  2. Two or more applications are deployed on different servers, and calls between them look like local method calls (as shown in the figure)
  3. Common RPC frameworks include Alibaba's Dubbo, Google's gRPC, Go's rpcx, and Apache Thrift. Nacos and Eureka (used with Spring Cloud) are often mentioned alongside them, but they are service registries used for discovery rather than RPC frameworks themselves

RPC call flow chart

Description of terms:

In RPC, the Client side is called a service consumer and the Server side is called a service provider

RPC call process description

  1. The service consumer (Client) invokes the service locally
  2. After receiving the call, ClientStub is responsible for encapsulating the method and parameters into a message body that can be transmitted over the network
  3. ClientStub encodes the message and sends it to the server
  4. ServerStub decodes the message after receiving it
  5. ServerStub calls the local service according to the decoding result
  6. The local service executes and returns the result to ServerStub
  7. ServerStub encodes the returned result and sends it to the consumer
  8. ClientStub receives the message and decodes it
  9. The service consumer gets the result

Summary: the goal of RPC is to encapsulate steps 2 through 8, so that users do not need to care about these details and can call a remote service the same way they call a local method
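The nine steps above can be traced without any network at all. The following is a minimal in-process sketch, not part of the original implementation: the names RpcFlowSketch, clientStub, serverStub and the "method#argument" wire format are assumptions made for illustration, and a direct method call stands in for the network hop.

```java
import java.nio.charset.StandardCharsets;

/** In-process sketch of the RPC call flow; all names here are hypothetical. */
final class RpcFlowSketch {

    /** The local service on the provider side (steps 5 and 6). */
    static String greet(String name) {
        return "hello " + name;
    }

    /** Server stub: decodes the request, calls the service, encodes the reply (steps 4 to 7). */
    static byte[] serverStub(byte[] request) {
        String decoded = new String(request, StandardCharsets.UTF_8);
        int sep = decoded.indexOf('#');
        String method = decoded.substring(0, sep);
        String argument = decoded.substring(sep + 1);
        if (!"greet".equals(method)) {
            throw new IllegalArgumentException("unknown method: " + method);
        }
        return greet(argument).getBytes(StandardCharsets.UTF_8);
    }

    /** Client stub: encodes the call, "sends" it, decodes the reply (steps 2, 3 and 8). */
    static String clientStub(String method, String argument) {
        byte[] request = (method + "#" + argument).getBytes(StandardCharsets.UTF_8);
        byte[] response = serverStub(request); // stands in for the network round trip
        return new String(response, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Steps 1 and 9: the consumer calls and receives the result as if it were local
        System.out.println(clientStub("greet", "rpc"));
    }
}
```

The consumer only ever sees clientStub, which is the point of the summary: everything between the call and the result is hidden behind the stubs.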

Implementation of Dubbo RPC based on Netty

Requirement description

  1. Dubbo uses Netty as its underlying network communication framework; the task here is to implement a simple RPC framework with Netty
  2. Imitating Dubbo, the consumer and the provider agree on an interface and a protocol. The consumer remotely calls the provider's service, the provider returns a string, and the consumer prints it. The underlying communication uses Netty 4.1.20

Design description

  1. Create an interface that defines the abstract methods forming the contract between consumer and provider
  2. Create a provider that listens for consumer requests and returns data according to the contract
  3. Create a consumer. This class needs to transparently call methods that have no local implementation, and internally uses Netty to send the request to the provider and receive the returned data
  4. Development analysis diagram
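Item 3 of the design relies on JDK dynamic proxies: the consumer holds only the interface, and a handler decides what each call does. A minimal sketch of that idea follows. ProxySketch, Greeter and proxyFor are hypothetical names, and the handler here simply builds the request string instead of sending it over Netty.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Sketch of calling an interface that has no local implementation; names are hypothetical. */
final class ProxySketch {

    /** Hypothetical contract shared by consumer and provider. */
    interface Greeter {
        String greet(String name);
    }

    /** Builds a proxy whose handler turns each call into a request string. */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> contract) {
        // Sketch only: Object methods such as toString() also reach this handler
        // and are not handled here
        InvocationHandler handler = (proxy, method, args) ->
                contract.getSimpleName() + "#" + method.getName() + "#" + args[0];
        return (T) Proxy.newProxyInstance(
                contract.getClassLoader(), new Class<?>[]{contract}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = proxyFor(Greeter.class);
        // No class implements Greeter; the handler produces the return value
        System.out.println(greeter.greet("netty"));
    }
}
```

In a real consumer the handler would hand the request string to the network layer and return the provider's reply instead.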

Code implementation

Define interface

package com.dance.netty.netty.dubbo.common.api;

/**
 * Service interface for providing external services
 */
public interface HelloService {

    String printHello(String msg);

}

Create a new interface implementation class (as a provider)

package com.dance.netty.netty.dubbo.provider.service.impl;

import com.dance.netty.netty.dubbo.common.api.HelloService;

public class HelloServiceImpl implements HelloService {

    private static final HelloService helloService = new HelloServiceImpl();

    @Override
    public String printHello(String msg) {
        System.out.println("Parameter received: " + msg);
        return "hello msg!";
    }

    private HelloServiceImpl(){}

    public static HelloService getInstance(){
        return helloService;
    }
}

Create a new NettyServer

package com.dance.netty.netty.dubbo.common.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {

    public static void startServer0() {
        startServer0("127.0.0.1", 7000);
    }

    public static void startServer0(String hostname, int port) {
        // The boss group accepts incoming connections; the worker group handles I/O on accepted channels
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Use string codec
                            ch.pipeline()
                                    .addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new NettyServerHandler());
                        }
                    });
            // Bind and wait until the server socket is ready
            ChannelFuture sync = serverBootstrap.bind(hostname, port).sync();
            System.out.println("netty server is starting, ip: " + hostname + ", port: " + port);
            // Block until the server channel is closed
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Create a new NettyServerHandler

package com.dance.netty.netty.dubbo.common.netty;

import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Someone registered");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String msgStr = msg.toString();
        // Get the data sent by the client
        System.out.println("msg is " + msgStr);
        // When the client calls the server's API, both sides follow a custom protocol:
        // every message must start with a fixed header of the form
        // service name#method name#parameter
        // For example: HelloService#printHello#this is msg
        String header = "HelloService#printHello#";
        if (msgStr.startsWith(header)) {
            // Strip the protocol header by its length, so a '#' inside the parameter is preserved
            String param = msgStr.substring(header.length());
            ctx.writeAndFlush(HelloServiceImpl.getInstance().printHello(param));
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
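The handler's comments describe a "service name # method name # parameter" message format. If the provider had to dispatch to more than one service or method, the message would need to be split into its three parts. The sketch below shows one way to do that with String.split and a limit of 3, so that a '#' inside the parameter survives. RequestParser and RpcRequest are hypothetical helper names, not part of the original code.

```java
/** Sketch of parsing "service#method#parameter"; the type names are hypothetical. */
final class RequestParser {

    /** Parsed form of one request message. */
    static final class RpcRequest {
        final String service;
        final String method;
        final String parameter;

        RpcRequest(String service, String method, String parameter) {
            this.service = service;
            this.method = method;
            this.parameter = parameter;
        }
    }

    /** Splits on the first two '#' only, so the parameter may itself contain '#'. */
    static RpcRequest parse(String message) {
        String[] parts = message.split("#", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + message);
        }
        return new RpcRequest(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        RpcRequest request = parse("HelloService#printHello#issue #42");
        System.out.println(request.service + " / " + request.method + " / " + request.parameter);
    }
}
```

With the parts separated, the provider could look up the target service and method by name instead of matching one hard-coded prefix.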

Create a new ServerBootstrap

package com.dance.netty.netty.dubbo.provider.server;

import com.dance.netty.netty.dubbo.common.netty.NettyServer;

/**
 * Service startup class
 */
public class ServerBootstrap {

    public static void main(String[] args) {
        NettyServer.startServer0();
    }

}

Create a new NettyClientHandler

package com.dance.netty.netty.dubbo.common.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.SocketAddress;
import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> {

    /**
     * Context object; written by the Netty I/O thread and read by the calling thread, hence volatile
     */
    private volatile ChannelHandlerContext context;

    /**
     * Return result of remote call
     */
    private String result;

    public void setParams(String params) {
        this.params = params;
    }

    /**
     * Parameters passed in when the client calls the method
     */
    private String params;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // It will be used in other methods
        context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        // Wake up the thread that is waiting in call()
        notify();
    }

    @Override
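    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the failure instead of silently dropping it, then close the connection
        cause.printStackTrace();
        ctx.close();
    }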

    /**
     * Called by the proxy object: send data to the server => wait => be woken up by channelRead
     * (that is, when the server returns data) => return the result.
     * In effect, wait and notify turn the asynchronous response into a synchronous call.
     * @return the server's response
     * @throws Exception if the waiting thread is interrupted
     */
    @Override
    public synchronized String call() throws Exception {
        // Clear any result left over from a previous call
        result = null;
        // Send parameters to server
        context.writeAndFlush(params);
        // Wait for channelRead to store the response; loop to guard against spurious wakeups
        while (result == null) {
            wait();
        }
        // Return the data of the response
        return result;
    }
}
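The handler above parks the calling thread with wait() until channelRead calls notify(). The same hand-off between an I/O thread and a caller can also be expressed with java.util.concurrent.CompletableFuture. The sketch below is an alternative technique, not what the article's code does: ResponseSlot is a hypothetical name, and a plain thread stands in for the Netty event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of handing a reply from an I/O thread to a blocked caller; the name is hypothetical. */
final class ResponseSlot {

    private final CompletableFuture<String> response = new CompletableFuture<>();

    /** Called by the I/O side when the reply arrives (the role channelRead plays). */
    void complete(String reply) {
        response.complete(reply);
    }

    /** Called by the caller's thread; blocks until a reply arrives or the timeout expires. */
    String await(long timeout, TimeUnit unit) throws Exception {
        return response.get(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        ResponseSlot slot = new ResponseSlot();
        // A plain thread stands in for the Netty event loop delivering the reply
        Thread ioThread = new Thread(() -> slot.complete("hello msg!"));
        ioThread.start();
        System.out.println(slot.await(1, TimeUnit.SECONDS));
    }
}
```

Compared with wait and notify, a future carries its own completion state, so a reply that arrives before the caller starts waiting is not lost, and the timeout keeps the caller from blocking forever if the provider never answers.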

Create a new NettyClient

package com.dance.netty.netty.dubbo.common.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;



public class NettyClient {

    /**
     * Create thread pool
     */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Processor for performing remote calls
     */
    private static NettyClientHandler nettyClientHandler;

    // Uses the proxy pattern to create a proxy object for the given service interface
    public Object getBean(final Class<?> serviceClass, final String protocol){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
            if (nettyClientHandler == null) {
                initClient();
            }
            // Set the information to be sent to the server
            // Format: protocol header + first argument (args[0])
            nettyClientHandler.setParams(protocol + args[0]);
            // Submit the handler to the thread pool and block until the response arrives
            return executorService.submit(nettyClientHandler).get();
        });
    }



    private static void initClient(){
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("stringDecoder", new StringDecoder())
                                    .addLast("stringEncoder", new StringEncoder())
                                    .addLast("nettyClientHandler", nettyClientHandler);
                        }
                    });
            bootstrap.connect("127.0.0.1", 7000).sync();
            // Note: the client must not block here waiting for the channel to close
//            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Do not shut down the event loop group here; the connection is reused by later calls
//            eventExecutors.shutdownGracefully();
        }
    }



}
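The proxy in getBean submits the handler to a thread pool as a Callable and immediately blocks on get(), which is what makes the remote call look synchronous to the consumer. A stdlib-only sketch of that pattern follows. SubmitAndGetSketch and callSynchronously are hypothetical names, and a lambda stands in for NettyClientHandler.call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of the submit-then-get pattern; the names are hypothetical. */
final class SubmitAndGetSketch {

    /** Runs the task on a pool thread and blocks the caller until it finishes. */
    static String callSynchronously(ExecutorService pool, Callable<String> task) throws Exception {
        Future<String> pending = pool.submit(task);
        return pending.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // The lambda stands in for NettyClientHandler.call()
            System.out.println(callSynchronously(pool, () -> "hello msg!"));
        } finally {
            // Non-daemon pool threads would otherwise keep the JVM alive
            pool.shutdown();
        }
    }
}
```

The shutdown in the finally block matters for a command-line client: a fixed thread pool that is never shut down keeps the process running after main returns.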

Create a new ClientBootstrap

package com.dance.netty.netty.dubbo.consumer.client;

import com.dance.netty.netty.dubbo.common.api.HelloService;
import com.dance.netty.netty.dubbo.common.netty.NettyClient;

public class ClientBootstrap {

    private static final String PROTOCOL = "HelloService#printHello#";

    public static void main(String[] args) {

        // Create a consumer
        NettyClient nettyClient = new NettyClient();

        // Create proxy object
        HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, PROTOCOL);

        // Call the method (service) of the service provider through the proxy object
        String result = helloService.printHello("hi dubbo rpc");

        System.out.println("Call method execution and return results: " + result);
    }
}

Test

Server side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
netty server is starting, ip: 127.0.0.1, port: 7000
msg is HelloService#printHello#hi dubbo rpc
Parameter received: hi dubbo rpc

Client side

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Call method execution and return results: hello msg!

The consumer calls the provider's service only through the shared interface, and Netty carries out the remote RPC call underneath

That wraps up the Netty series. If I find a good video explaining Netty's source code later, I will come back and write about it. Next, let's look at data structures and algorithms

Added by arundathi on Fri, 18 Feb 2022 12:40:10 +0200