Hand-writing an RPC service with Netty and ZooKeeper

  • Overview
    • Project link
    • What does the microservice framework include?
    • How to implement RPC remote call?
    • Open source RPC framework
      • Language-specific frameworks
      • Cross language RPC framework
  • Build ZooKeeper with local Docker
    • Download Image
    • Start container
    • View container log
  • RPC interface
  • Netty RPC server
    • Interface implementation
    • Service startup
    • Registration service
    • ZooKeeper implementation
  • Netty RPC Client
    • Create proxy
    • Remote call
  • Codec
    • RpcDecoder
    • RpcEncoder
  • RpcServerInboundHandler
  • Path of Server in ZooKeeper
  • Summary
  • Reference link

Overview

This article uses Netty, ZooKeeper and Spring Boot to build a simple microservice RPC framework.

Project link

GitHub source address

What does the microservice framework include?

For details, please refer to RPC practice and principle

The project is split into a caller (client) and a provider (server). The client only calls an interface; the call information is sent to the server over the network. The server decodes it, invokes the corresponding method through reflection, and returns the result to the client over the network. The client can ignore the network entirely and call the RPC service just like a local method.

The model structure of the whole project is as follows:

How to implement RPC remote call?

  • How to establish network connection between client and server: HTTP and Socket
  • How the server handles requests: NIO (using Netty)
  • What protocol is used for data transmission
  • How to serialize and deserialize data: JSON, PB, Thrift

Open source RPC framework

Language-specific frameworks

  • Dubbo: Java, Alibaba
  • Motan: Java, Weibo
  • Tars: C++, Tencent (multi-language support)
  • Spring Cloud: Java
    • Gateway: Zuul
    • Registry: Eureka
    • Service timeout: Hystrix
    • Call chain monitoring: Sleuth
    • Log analysis: ELK

Cross language RPC framework

  • gRPC: HTTP/2
  • Thrift: TCP

Build ZooKeeper with local Docker

Download Image

Pull the official image and start ZooKeeper. For details, see https://hub.docker.com/_/zookeeper

Start container

The startup command is as follows. The container is named zookeeper-rpc-demo, and ports 8080, 2181, 2888 and 3888 are published to the host:

docker run --name zookeeper-rpc-demo --restart always -p 8080:8080 -p 2181:2181 -p 2888:2888 -p 3888:3888 -d zookeeper

This image includes EXPOSE 2181 2888 3888 8080 (the zookeeper client port, follower port, election port, AdminServer port respectively), so standard container linking will make it automatically available to the linked containers. Since the Zookeeper "fails fast" it's better to always restart it.

View container log

You can enter the container with the following command, where fb6f95cde6ba is the ID of the ZooKeeper container on my machine.

docker exec -it fb6f95cde6ba /bin/bash

Inside the container, change to the directory /apache-zookeeper-3.7.0-bin/bin and run zkCli.sh -server 0.0.0.0:2181 to connect to the ZooKeeper service.

RPC interface

This example provides two interfaces, HelloService and HiService, each with a single method. The client only needs to reference the RPC sample API: it knows the interface definitions but nothing about the concrete implementations.

public interface HelloService {
    String hello(String msg);
}

public interface HiService {
    String hi(String msg);
}

Netty RPC server

Start a Server service, implement the above two RPC interfaces, and register the service with ZooKeeper.

Interface implementation

/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String msg) {
        return "hello echo: " + msg;
    }
}

/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HiService.class)
public class HiServiceImpl implements HiService {

    @Override
    public String hi(String msg) {
        return "hi echo: " + msg;
    }
}

This raises two questions:

  1. How should the server decide which interface implementations to register with ZooKeeper?
  2. What paths should HelloServiceImpl and HiServiceImpl have in ZooKeeper?

Service startup

The sample server uses Spring Boot, but it does not need to serve web requests; it only has to keep running in the background. The web application type is therefore set to WebApplicationType.NONE.

@SpringBootApplication
public class RpcServerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RpcServerApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }
}

Registration service

NettyApplicationContextAware implements ApplicationContextAware. When the application starts, it registers every implementation class carrying the @RpcServer annotation (explained below) with ZooKeeper.

@Component
public class NettyApplicationContextAware implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(NettyApplicationContextAware.class);

    @Value("${zk.address}")
    private String zookeeperAddress;

    @Value("${zk.port}")
    private int zookeeperPort;

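    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Key: ZooKeeper path ("/" + interface name); value: the bean implementing that interface
        Map<String, Object> rpcBeanMap = new HashMap<>();
        for (Object object : applicationContext.getBeansWithAnnotation(RpcServer.class).values()) {
            rpcBeanMap.put("/" + object.getClass().getAnnotation(RpcServer.class).cls().getName(), object);
        }
        try {
            // Despite their names, these two values are passed to NettyServer.start as the
            // address and port that the Netty server binds to and registers in ZooKeeper.
            NettyServer.start(zookeeperAddress, zookeeperPort, rpcBeanMap);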
        } catch (Exception e) {
            logger.error("register error !", e);
        }
    }
}

The RpcServer annotation is defined as follows:

@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.TYPE)
@Component
public @interface RpcServer {

    /**
     * Interface class for interface registration
     */
    Class<?> cls();

}

applicationContext.getBeansWithAnnotation(RpcServer.class).values() returns the beans annotated with @RpcServer. They are put into rpcBeanMap, whose key is the path to register in ZooKeeper. Note that the path uses the name of the interface, not the name of the implementation class.

The advantage of annotations is flexibility: Server A can provide only HelloService while Server B provides only HiService, and neither affects the other.
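The key derivation described above can be tried without Spring. The following is a minimal sketch, assuming a hypothetical @Exported annotation as a stand-in for @RpcServer and a hypothetical GreetingService interface; neither name comes from the project. It only shows how the "/" + interface name key is built and how unannotated beans are skipped.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RegistryKeySketch {

    // Hypothetical stand-in for the article's @RpcServer (no Spring @Component here).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Exported {
        Class<?> cls();
    }

    // Hypothetical service interface used only for this illustration.
    interface GreetingService {
        String greet(String name);
    }

    @Exported(cls = GreetingService.class)
    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "greet echo: " + name;
        }
    }

    // Not annotated, so it must not end up in the registry.
    static class InternalHelper {
    }

    /** Maps "/" + interface name to the bean, skipping beans without the annotation. */
    static Map<String, Object> buildRegistry(List<Object> beans) {
        Map<String, Object> registry = new LinkedHashMap<>();
        for (Object bean : beans) {
            Exported exported = bean.getClass().getAnnotation(Exported.class);
            if (exported == null) {
                continue;
            }
            // Guard against an annotation that names an interface the bean does not implement.
            if (!exported.cls().isInstance(bean)) {
                throw new IllegalStateException(bean.getClass() + " does not implement " + exported.cls());
            }
            registry.put("/" + exported.cls().getName(), bean);
        }
        return registry;
    }
}
```

Because the key comes from the interface rather than the implementation class, a client that knows only the interface can compute the same path.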

Service registration happens mainly in com.yano.server.NettyServer#start.

public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    public static void start(String ip, int port, Map<String, Object> params) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // SO_KEEPALIVE applies to accepted connections, so it is a child option
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class))
                                    .addLast(new RpcServerInboundHandler(params));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(ip, port).sync();
            if (future.isSuccess()) {
                params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
            }
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

This class does two things:

  1. Starts a socket server through Netty, with the address and port passed in as parameters
  2. Registers the interface implementations above in ZooKeeper once the bind succeeds

params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));

ZooKeeper implementation

ZooKeeperOp maintains the ZooKeeper connection and registers the server's IP and port under the corresponding service node. An ephemeral node is used so that ZooKeeper deletes it automatically once the server goes offline and its session ends, which keeps clients from obtaining the address of an offline server.

public class ZooKeeperOp {

    private static final String zkAddress = "localhost:2181";
    private static final ZkClient zkClient = new ZkClient(zkAddress);

    public static void register(String serviceName, String serviceAddress) {
        if (!zkClient.exists(serviceName)) {
            zkClient.createPersistent(serviceName);
        }
        zkClient.createEphemeral(serviceName + "/" + serviceAddress);
        System.out.printf("create node %s \n", serviceName + "/" + serviceAddress);
    }

    public static String discover(String serviceName) {
        List<String> children = zkClient.getChildren(serviceName);
        if (CollectionUtils.isEmpty(children)) {
            return "";
        }
        return children.get(ThreadLocalRandom.current().nextInt(children.size()));
    }
}
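To make the ephemeral-node behaviour concrete, here is a toy in-memory model. It is not the ZooKeeper or ZkClient API; the class and method names are hypothetical. It only imitates the rule that a node created as ephemeral belongs to a session and disappears when that session ends. In real ZooKeeper the removal happens when the session is closed or expires after its timeout, so a crashed server may stay visible for a short while.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy registry that imitates ephemeral nodes; an illustration only, not the ZooKeeper API. */
class EphemeralRegistrySketch {

    // Full child path -> id of the session that created it.
    private final Map<String, String> ephemeralOwners = new TreeMap<>();

    /** Registers servicePath/address as an ephemeral child owned by the given session. */
    void registerEphemeral(String sessionId, String servicePath, String address) {
        ephemeralOwners.put(servicePath + "/" + address, sessionId);
    }

    /** Models a session close or expiry: every node owned by the session is dropped. */
    void closeSession(String sessionId) {
        ephemeralOwners.values().removeIf(owner -> owner.equals(sessionId));
    }

    /** Lists the child names (the "ip:port" part) under a service path, in sorted order. */
    List<String> children(String servicePath) {
        String prefix = servicePath + "/";
        List<String> result = new ArrayList<>();
        for (String path : ephemeralOwners.keySet()) {
            if (path.startsWith(prefix)) {
                result.add(path.substring(prefix.length()));
            }
        }
        return result;
    }
}
```

With two providers registered from two sessions, closing one session leaves only the other address visible to discovery, which is the property the article relies on.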

Netty RPC Client

The Netty RPC client calls the two interfaces above as if they were local methods, to verify that results come back correctly.

public class RpcClientApplication {

    public static void main(String[] args) {
        HiService hiService = RpcProxy.create(HiService.class);
        String msg = hiService.hi("msg");
        System.out.println(msg);

        HelloService helloService = RpcProxy.create(HelloService.class);
        msg = helloService.hello("msg");
        System.out.println(msg);
    }
}

Run the above code, and the final console will output:

hi echo: msg
hello echo: msg

Create proxy

HiService hiService = RpcProxy.create(HiService.class);
String msg = hiService.hi("msg");

The client creates a proxy through com.yano.RpcProxy#create and can then call the hi method on hiService.

public class RpcProxy {

    @SuppressWarnings("unchecked")
    public static <T> T create(final Class<T> cls) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[] {cls}, (o, method, objects) -> {

            Request request = new Request();
            request.setInterfaceName("/" + cls.getName());
            request.setRequestId(UUID.randomUUID().toString());
            request.setParameter(objects);
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());

            Response response = new NettyClient().client(request);
            return response.getResult();
        });
    }
}

For the server to invoke the requested method through reflection, the request must carry at least:

  1. The class name: interfaceName
  2. The method name: methodName
  3. The parameter types: Class<?>[] parameterTypes
  4. The arguments: Object parameter[]

@Data
public class Request {

    private String requestId;
    private String interfaceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object parameter[];

}
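The way a JDK dynamic proxy collects these four pieces of information can be tried without any network. The sketch below is an illustration under assumptions: CallInfo is a hypothetical stand-in for Request, EchoService is a hypothetical interface, and the transport function is a placeholder for "send to the server and wait for the result".

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

class ProxyCaptureSketch {

    /** Hypothetical stand-in for the article's Request, without requestId. */
    static final class CallInfo {
        final String interfaceName;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] parameters;

        CallInfo(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.parameters = parameters;
        }
    }

    // Hypothetical service interface used only for this illustration.
    interface EchoService {
        String echo(String msg);
    }

    /** Creates a proxy whose every call is described as a CallInfo and handed to the transport. */
    static <T> T create(Class<T> cls, Function<CallInfo, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Note: toString, hashCode and equals are routed here too;
            // a real client would answer those locally instead of sending them.
            Object[] parameters = (args == null) ? new Object[0] : args; // args is null for no-arg methods
            CallInfo call = new CallInfo("/" + cls.getName(), method.getName(),
                    method.getParameterTypes(), parameters);
            return transport.apply(call);
        };
        return cls.cast(Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[] {cls}, handler));
    }
}
```

Passing a lambda such as call -> call.methodName + ":" + call.parameters[0] as the transport makes the captured data visible without starting a server.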

Remote call

Finally, it is called remotely through the following code, where request contains all the information of the calling method.

Response response = new NettyClient().client(request);

/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private Response response;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // Create and initialize the Netty client Bootstrap object
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new RpcDecoder(Response.class))
                                    .addLast(new RpcEncoder(Request.class))
                                    .addLast(NettyClient.this);
                        }
                    });

            // Connect to RPC server
            String[] discover = ZooKeeperOp.discover(request.getInterfaceName()).split(":");
            ChannelFuture future = bootstrap.connect(discover[0], Integer.parseInt(discover[1])).sync();

            // Write the RPC request, then wait until the server closes the connection after responding
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }
    }

}

This code is the core of the client and does two things:

  • Queries ZooKeeper for the server addresses under the corresponding node. If there are multiple providers, ZooKeeperOp.discover returns one of them at random
  • Opens a socket connection to that address, sends the request, and waits for the response
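One edge case in the discovery step is worth illustrating. When no provider is registered, ZooKeeperOp.discover returns an empty string, and splitting it on ":" yields a single element, so reading discover[1] fails with an ArrayIndexOutOfBoundsException. The following is a minimal sketch of a more defensive variant; the AddressPickSketch and HostPort names are hypothetical and not part of the project.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

class AddressPickSketch {

    /** Hypothetical holder for a parsed "host:port" child node name. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Picks one "host:port" entry at random; returns empty when no provider is registered. */
    static Optional<HostPort> pick(List<String> children) {
        if (children == null || children.isEmpty()) {
            return Optional.empty();
        }
        String address = children.get(ThreadLocalRandom.current().nextInt(children.size()));
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || colon == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + address);
        }
        int port = Integer.parseInt(address.substring(colon + 1));
        return Optional.of(new HostPort(address.substring(0, colon), port));
    }
}
```

A caller can then turn an empty result into a clear "no provider available" error instead of an index failure.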

Codec

channel.pipeline()
    .addLast(new RpcDecoder(Response.class))
    .addLast(new RpcEncoder(Request.class))
    .addLast(NettyClient.this);

Both the client and the server need to encode and decode Request and Response objects. This example uses JSON, the simplest format. Netty's message encoding and decoding are not explained in detail here; the code is as follows.

RpcDecoder

RpcDecoder is a ChannelInboundHandler that turns incoming bytes back into objects: the Response on the client side and the Request on the server side.

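// ByteToMessageDecoder accumulates bytes across reads, which the mark/reset logic below relies on;
// MessageToMessageDecoder would drop a partially received frame.
public class RpcDecoder extends ByteToMessageDecoder {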

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        if (msg.readableBytes() < 4) {
            return;
        }
        msg.markReaderIndex();
        int dataLength = msg.readInt();
        if (msg.readableBytes() < dataLength) {
            msg.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        msg.readBytes(data);

        out.add(JSON.parseObject(data, genericClass));
    }
}

RpcEncoder

RpcEncoder is a ChannelOutboundHandler that encodes outgoing objects: the Request on the client side and the Response on the server side.

public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        if (genericClass.isInstance(msg)) {
            byte[] data = JSON.toJSONBytes(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
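The wire format used by the two codec classes is a 4-byte length followed by the JSON bytes. The sketch below reproduces that framing with java.nio.ByteBuffer instead of Netty's ByteBuf, so it can be run on its own. It is an illustration of the technique, not the project's code, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFramingSketch {

    /** Frame layout: 4-byte big-endian length, then the UTF-8 payload bytes. */
    static byte[] encode(String json) {
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Reads every complete frame from a buffer that is in read mode and leaves the
     * position at the start of the first incomplete frame, if there is one.
     */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // body not fully received yet: rewind to the length header
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```

The mark and reset calls play the same role as markReaderIndex and resetReaderIndex in RpcDecoder: when only part of a frame has arrived, nothing is consumed and decoding resumes once more bytes are available.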

RpcServerInboundHandler

This handler is the core of the server-side reflective call, so it is explained separately. RpcServerInboundHandler is added to the pipeline when the Netty server starts.

socketChannel.pipeline()
    .addLast(new RpcDecoder(Request.class))
    .addLast(new RpcEncoder(Response.class))
    .addLast(new RpcServerInboundHandler(params));

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;
    logger.info("request data {}", JSON.toJSONString(request));

    // jdk reflection call
    Object bean = handle.get(request.getInterfaceName());
    Method method = bean.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
    method.setAccessible(true);
    Object result = method.invoke(bean, request.getParameter());

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult(result);

    // Close the connection once the response has been written; the client is waiting on closeFuture()
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
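The reflective lookup in channelRead assumes that the requested interface is registered and that the method exists. The sketch below isolates that dispatch step and adds explicit failures for the missing cases. It is a minimal illustration with hypothetical names (ReflectiveDispatchSketch, EchoService); it wraps checked reflection exceptions in unchecked ones purely to keep the example short.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReflectiveDispatchSketch {

    // Hypothetical service used only for this illustration.
    interface EchoService {
        String echo(String msg);
    }

    static class EchoServiceImpl implements EchoService {
        @Override
        public String echo(String msg) {
            return "echo: " + msg;
        }
    }

    // Registry key (for example "/" + interface name) -> bean implementing the interface.
    private final Map<String, Object> beans = new HashMap<>();

    void register(String interfaceName, Object bean) {
        beans.put(interfaceName, bean);
    }

    /** Looks up the bean by interface name and invokes the named method through reflection. */
    Object dispatch(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        Object bean = beans.get(interfaceName);
        if (bean == null) {
            throw new IllegalArgumentException("no provider registered for " + interfaceName);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            method.setAccessible(true); // the implementation class may not be public
            return method.invoke(bean, parameters);
        } catch (InvocationTargetException e) {
            // The service method itself threw; surface its exception as the cause.
            throw new IllegalStateException("service method threw an exception", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot invoke " + methodName + " on " + interfaceName, e);
        }
    }
}
```

In a handler like the one above, such failures could be reported back to the client in the Response rather than left to propagate through the pipeline.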

Path of Server in ZooKeeper

After the server starts, its output contains two log lines:

create node /com.yano.service.HelloService/127.0.0.1:3000 
create node /com.yano.service.HiService/127.0.0.1:3000 

Check the node in ZooKeeper and find that the service has been registered.

[zk: 0.0.0.0:2181(CONNECTED) 0] ls /com.yano.service.HelloService
[127.0.0.1:3000]
[zk: 0.0.0.0:2181(CONNECTED) 1] ls /com.yano.service.HiService
[127.0.0.1:3000]

Summary

This article used Netty, ZooKeeper and Spring Boot to build a microservice RPC framework. Implementing it by hand deepens understanding, but the demo is only an example and should not be used in a production environment.

The code for this article is available at the GitHub source address. Stars and forks are welcome.

Reference link

https://github.com/yanzhenyidai/netty-rpc-example

Added by VFRoland on Thu, 17 Feb 2022 12:30:06 +0200