Netty in Practice

Basic introduction to RPC

Tip: for a description of Dubbo's RPC call flow, see https://www.cnblogs.com/caoxb/p/13140287.html

1) RPC (Remote Procedure Call) is a computer communication protocol. It allows a program running on one computer to call a subroutine on another computer without the programmer explicitly coding the details of that remote interaction

2) Two or more applications are distributed on different servers, and the calls between them are like local method calls (as shown in the figure)

3) Well-known RPC frameworks include Alibaba's Dubbo, Google's gRPC, rpcx (written in Go), Apache Thrift, and Spring Cloud

RPC call process

Description of calling process in the figure above:

1) The service consumer (client) invokes the service locally

2) After receiving the call, the client stub is responsible for encapsulating the methods and parameters into a message body capable of network transmission

3) The client stub encodes the message and sends it to the server

4) The server stub decodes the message after receiving it

5) The server stub calls the local service according to the decoding result

6) The local service executes and returns the result to the server stub

7) The server stub encodes the returned result and sends it to the consumer

8) The client stub receives the message and decodes it

9) The service consumer gets the result

Summary: the goal of RPC is to encapsulate steps 2-8, so that users do not need to care about these details and can call remote services as if they were calling local methods
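To make the nine steps concrete, here is a minimal in-process sketch. It is only an illustration: the class RpcStubSketch, the "method#argument" message format, and all method names are assumptions made for this example, and the network round trip is replaced by a direct method call.

```java
// Hypothetical in-process sketch of steps 1-9. A real framework would send the
// encoded message over the network instead of calling the server stub directly.
class RpcStubSketch {

    // The "local service" on the provider side (steps 5-6).
    static String helloService(String name) {
        return "Hello, " + name;
    }

    // Steps 2-3: the client stub packs the method name and argument into one message.
    static String encodeRequest(String method, String arg) {
        return method + "#" + arg;
    }

    // Steps 4-7: the server stub decodes the message, calls the local service,
    // and returns the result (already a plain string, so no extra encoding here).
    static String serverStub(String message) {
        int sep = message.indexOf('#');
        String method = message.substring(0, sep);
        String arg = message.substring(sep + 1);
        if (!"hello".equals(method)) {
            return "ERROR: unknown method " + method;
        }
        return helloService(arg);
    }

    // Steps 1, 8 and 9: from the consumer's point of view this is an ordinary call.
    static String hello(String name) {
        String request = encodeRequest("hello", name);
        String response = serverStub(request); // stands in for the network round trip
        return response; // decoding is a no-op for a plain string
    }

    public static void main(String[] args) {
        System.out.println(hello("RPC")); // prints "Hello, RPC"
    }
}
```

The consumer only ever sees hello(...); everything between encodeRequest and serverStub is what an RPC framework hides.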

Implement Dubbo RPC (based on Netty)

Code flow chart:

Code package correspondence

publicinterface package

package com.atguigu.netty.dubborpc.publicinterface;

//This is an interface required by both service providers and service consumers
public interface HelloService {

    String hello(String mes);
}

provider package

HelloServiceImpl class

package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService{

    private static int count = 0;
    //When a consumer calls this method, a result is returned
    @Override
    public String hello(String mes) {
        System.out.println("Client message received=" + mes);
        //Different results are returned according to mes
        if(mes != null) {
            return "Hello, client, I have received your message [" + mes + "], call number " + (++count);
        } else {
            return "Hello, client, I have received your message ";
        }
    }
}

ServerBootstrap class

package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.netty.NettyServer;

//Server bootstrap will start a service provider, that is, NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //Start the Netty server on 127.0.0.1:7000
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

netty package

Client services

Tip: for background on Java proxies, see https://www.cnblogs.com/cenyu/p/6289209.html
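As a quick refresher before the client code, the sketch below shows a JDK dynamic proxy in isolation. The Greeter interface, the ProxySketch class and the createGreeter method are hypothetical names used only for this illustration; Proxy.newProxyInstance and InvocationHandler are the standard java.lang.reflect API.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxySketch {

    // Hypothetical interface used only for this illustration.
    interface Greeter {
        String greet(String name);
    }

    // Builds a proxy whose method calls are all routed to the handler below.
    static Greeter createGreeter(String prefix) {
        InvocationHandler handler = (proxy, method, args) ->
                prefix + method.getName() + "(" + args[0] + ")";
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                handler);
    }

    public static void main(String[] args) {
        Greeter greeter = createGreeter("intercepted ");
        System.out.println(greeter.greet("Netty")); // prints "intercepted greet(Netty)"
    }
}
```

No class implements Greeter here; the handler decides what every call returns. The client uses the same mechanism, except that its handler forwards the call to the server.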

package com.atguigu.netty.dubborpc.netty;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {

    //Create thread pool
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //Use the proxy pattern to obtain a proxy object for the service interface

    public Object getBean(final Class<?> serviceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) entered, call number " + (++count));
                    //This lambda body runs every time the client calls hello
                    if (client == null) {
                        initClient();
                    }

                    //Set the message to send to the server:
                    //providerName is the protocol header; args[0] is the argument the client passed to hello(...)
                    client.setPara(providerName + args[0]);

                    //Run call() on the thread pool and block until it returns the server's result
                    return executor.submit(client).get();

                });
    }

    //Initialize client
    private static void initClient() {
        client = new NettyClientHandler();
        //Create EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Client Handler

Tip: the client handler has a call method. When the client invokes a method on the proxy, the proxy submits the handler to the thread pool, which runs call and sends the message to the server. The response does not come back immediately, so call blocks with wait(). When the server's reply reaches the handler's channelRead method, channelRead calls notify() to wake up call, which then continues and returns the result. Because channelRead and call coordinate through wait/notify on the same handler object, both methods must be synchronized

package com.atguigu.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    private ChannelHandlerContext context;//context
    private String result; //Returned results
    private String para; //The parameters passed in when the client calls the method


    //Called once the connection to the server is established; this is the first method to be called (1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive Called  ");
        context = ctx; //Because we will use ctx in other methods
    }

    //Called when data is received from the server (4)
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead Called  ");
        result = msg.toString();
        notify(); //Wake up waiting threads
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //Called by the proxy object: send data to the server -> wait -> be woken up by channelRead -> return the result (3) -> (5)
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 Called  ");
        result = null; //Clear the previous result so the loop below waits for a fresh one
        context.writeAndFlush(para);
        //Wait until channelRead stores the server's result; the loop guards against spurious wakeups
        while (result == null) {
            wait();
        }
        System.out.println(" call2 Called  ");
        return result; //Result returned by the service provider
    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}
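The wait/notify handoff between call and channelRead can be looked at in isolation. The sketch below is not the handler itself: HandoffSketch, awaitResult, deliver and roundTrip are hypothetical names, and a plain thread stands in for the Netty event loop that would deliver the server's reply.

```java
class HandoffSketch {

    private String result; // guarded by this object's monitor

    // Plays the role of call(): block until a result has been delivered.
    synchronized String awaitResult() throws InterruptedException {
        while (result == null) { // the loop guards against spurious wakeups
            wait();              // releases the monitor while waiting
        }
        String value = result;
        result = null;           // reset for the next round trip
        return value;
    }

    // Plays the role of channelRead(): store the result and wake the waiter.
    synchronized void deliver(String value) {
        result = value;
        notify();
    }

    // Runs one round trip with a second thread acting as the responder.
    static String roundTrip(String value) {
        HandoffSketch handoff = new HandoffSketch();
        Thread responder = new Thread(() -> handoff.deliver(value));
        responder.start();
        try {
            return handoff.awaitResult();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("pong")); // prints "pong"
    }
}
```

Both methods synchronize on the same object, which is what lets wait() and notify() pair up; checking the condition in a loop keeps the waiter correct even if it wakes up without a matching notify().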

Server

package com.atguigu.netty.dubborpc.netty;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {


    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //Write a method to complete the initialization and startup of NettyServer

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //Service processor

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("Service provider starts providing services~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

Server handler

package com.atguigu.netty.dubborpc.netty;


import com.atguigu.netty.dubborpc.customer.ClientBootstrap;
import com.atguigu.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

//The server handler is relatively simple
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Get the message sent by the client and call the service
        System.out.println("msg=" + msg);
        //When the client calls the server API, both sides must agree on a protocol
        //For example, we require every message to start with a fixed header string such as "HelloService#hello#", followed by the argument
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
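The header check in channelRead can be tried out on its own. In the sketch below, ProtocolSketch and extractArgument are hypothetical names; the header value matches the providerName constant defined in ClientBootstrap. Note one deliberate difference: the handler above cuts the message at the last "#", which would truncate an argument that itself contains "#", so this sketch strips the header by its length instead.

```java
class ProtocolSketch {

    // Same value as ClientBootstrap.providerName in the listing.
    static final String PROVIDER_NAME = "HelloService#hello#";

    // Returns the argument carried by the message, or null if the header does not match.
    static String extractArgument(String message) {
        if (message == null || !message.startsWith(PROVIDER_NAME)) {
            return null;
        }
        return message.substring(PROVIDER_NAME.length());
    }

    public static void main(String[] args) {
        System.out.println(extractArgument("HelloService#hello#Hello dubbo~")); // prints "Hello dubbo~"
        System.out.println(extractArgument("HelloService#hello#issue #42"));    // prints "issue #42"
        System.out.println(extractArgument("OtherService#ping#x"));             // prints "null"
    }
}
```

With lastIndexOf("#") the second message would yield only "42"; for the argument used in this article ("Hello dubbo~") both approaches give the same result.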

customer package

package com.atguigu.netty.dubborpc.customer;

import com.atguigu.netty.dubborpc.netty.NettyClient;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {


    //The protocol header is defined here
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //Create a consumer
        NettyClient customer = new NettyClient();

        //Create proxy object
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //Invoking a service provider's method (service) through a proxy object
            String res = service.hello("Hello dubbo~");
            System.out.println("Result of call res= " + res);
        }
    }
}

Code core description:

client:

Tip: return executor.submit(client).get() runs the handler's call method on the thread pool and returns the value that call produces.
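The submit(...).get() pattern works the same way with any Callable. A minimal sketch, in which SubmitSketch and runAndWait are hypothetical names and a lambda stands in for the client handler:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {

    // Runs the task on a pool thread and blocks until its call() method returns.
    static String runAndWait(Callable<String> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(task); // task.call() runs on the pool thread
            return future.get();                           // blocks, then yields call()'s return value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(() -> "result from call()")); // prints "result from call()"
    }
}
```

In the client, the Callable is the NettyClientHandler itself, so get() returns whatever call() returns once channelRead has delivered the server's reply.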

Client's handler

Server Handler:

Keywords: Java

Added by wardo on Tue, 04 Jan 2022 20:26:16 +0200