Basic introduction to RPC
Friendly note: Dubbo's RPC calling process document: https://www.cnblogs.com/caoxb/p/13140287.html
1) RPC (Remote Procedure Call) is a computer communication protocol. It allows a program running on one computer to call a subroutine on another computer without the programmer having to code the details of that remote interaction
2) Two or more applications are distributed on different servers, and the calls between them are like local method calls (as shown in the figure)
3) Well-known RPC frameworks include Alibaba's Dubbo, Google's gRPC, rpcx for Go, Apache Thrift, and Spring Cloud
RPC call process
Description of calling process in the figure above:
1) The service consumer (client) invokes the service locally
2) After receiving the call, the client stub is responsible for encapsulating the methods and parameters into a message body capable of network transmission
3) The client stub encodes the message and sends it to the server
4) The server stub decodes the message after receiving it
5) The server stub calls the local service according to the decoding result
6) The local service executes and returns the result to the server stub
7) The server stub encodes the returned result and sends it to the consumer
8) The client stub receives the message and decodes it
9) The service consumer gets the result
Summary: the goal of RPC is to encapsulate steps 2 through 8. Users do not need to care about these details and can call a remote service as if it were a local method
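To make steps 2 through 8 concrete, here is a minimal in-process sketch of a client stub and a server stub. It is only an illustration: no network is involved (the "transport" is a plain function call), and the class name StubSketch, the "method#argument" wire format, and the "OK#" / "ERR#" reply prefixes are assumptions made for this example, not part of Dubbo or any real framework.

```java
import java.util.function.Function;

// In-process sketch of the stub steps; the transport is just a function call.
final class StubSketch {

    // Step 2: pack method name and argument into a transmittable message.
    // Hypothetical wire format: "<method>#<argument>".
    static String encodeRequest(String method, String arg) {
        return method + "#" + arg;
    }

    // Steps 4-7: decode the message, call the local service, encode the result.
    static String serverStub(String message, Function<String, String> localService) {
        int sep = message.indexOf('#');
        String method = message.substring(0, sep);
        String arg = message.substring(sep + 1);
        if (!method.equals("hello")) {
            return "ERR#unknown method " + method;
        }
        return "OK#" + localService.apply(arg);
    }

    // Steps 2, 3 and 8: encode, "send", then decode the reply.
    static String clientStub(String arg, Function<String, String> transport) {
        String reply = transport.apply(encodeRequest("hello", arg));
        if (!reply.startsWith("OK#")) {
            throw new IllegalStateException(reply);
        }
        return reply.substring("OK#".length());
    }

    public static void main(String[] args) {
        Function<String, String> service = name -> "Hello, " + name;
        Function<String, String> transport = msg -> serverStub(msg, service);
        // Steps 1 and 9: the consumer only sees an ordinary method call.
        System.out.println(clientStub("dubbo", transport)); // prints "Hello, dubbo"
    }
}
```

In a real framework the transport function would be replaced by a network connection, which is the part Netty handles in the implementation that follows.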
Implement Dubbo RPC (based on Netty)
Code flow chart:
Code package correspondence
publicinterface package
package com.atguigu.netty.dubborpc.publicinterface;

// This interface is needed by both the service provider and the service consumer
public interface HelloService {
    String hello(String mes);
}
provider package
HelloServiceImpl class
package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService {

    // Static because the server handler creates a new HelloServiceImpl for every message
    private static int count = 0;

    // Returns a result whenever a consumer calls this method
    @Override
    public String hello(String mes) {
        System.out.println("Client message received=" + mes);
        // The reply depends on whether mes is null
        if (mes != null) {
            return "Hello, client, I have received your message [" + mes + "], call #" + (++count);
        } else {
            return "Hello, client, I have received your message ";
        }
    }
}
ServerBootstrap class
package com.atguigu.netty.dubborpc.provider;

import com.atguigu.netty.dubborpc.netty.NettyServer;

// ServerBootstrap starts a service provider, that is, NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1", 7000);
    }
}
netty package
Client services
Tip: for background on Java proxies, see https://www.cnblogs.com/cenyu/p/6289209.html
package com.atguigu.netty.dubborpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {

    // Create the thread pool
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    // Uses the proxy pattern to obtain a proxy object
    public Object getBean(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    System.out.println("(proxy, method, args) entered, call #" + (++count));
                    // This block runs every time the client calls hello
                    if (client == null) {
                        initClient();
                    }
                    // Set the message to send to the server:
                    // providerName is the protocol header, args[0] is the argument the client passed to hello(...)
                    client.setPara(providerName + args[0]);
                    // Run call() on the thread pool and block until it returns the result
                    return executor.submit(client).get();
                });
    }

    // Initialize the client
    private static void initClient() {
        client = new NettyClientHandler();
        // Create the EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );
        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Client Handler
Friendly note: the handler has a call method. When the client invokes a service method, the proxy submits the handler to the thread pool, and the pool runs call, which sends the message to the server. The reply does not come back immediately, so call blocks with wait(). When the server responds, Netty invokes the handler's channelRead method, which stores the result and calls notify() to wake up call so that it can continue and return. wait() and notify() may only be called while holding the object's monitor, so both channelRead and call are declared synchronized
package com.atguigu.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    private ChannelHandlerContext context; // Channel context
    private String result; // Result returned by the server
    private String para;   // Argument passed in when the client calls the method

    // Called once the connection to the server is established; this is the first method invoked (1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive Called ");
        context = ctx; // Saved because ctx is needed in other methods
    }

    // Called when data arrives from the server (4)
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead Called ");
        result = msg.toString();
        notify(); // Wake up the waiting thread
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    // Called via the proxy object: send data to the server -> wait -> be woken by channelRead -> return the result (3) -> (5)
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 Called ");
        result = null; // Clear the previous reply before sending
        context.writeAndFlush(para);
        // Wait until channelRead has stored the server's reply; the loop guards against spurious wakeups
        while (result == null) {
            wait();
        }
        System.out.println(" call2 Called ");
        return result; // Result returned by the service provider
    }

    // (2)
    void setPara(String para) {
        System.out.println(" setPara ");
        this.para = para;
    }
}
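The handoff between call and channelRead can be studied without Netty. The following is a minimal sketch under assumptions: the class ResponseSlot and its methods complete and await are hypothetical names invented for this example, with complete playing the role of channelRead and await playing the role of the waiting part of call. The sketch waits in a loop on a flag, which covers both a spurious wakeup and a reply that arrives before the caller starts waiting.

```java
// One-shot reply slot shared between an I/O thread and a caller thread.
final class ResponseSlot {
    private String result;
    private boolean done;

    // I/O thread side: store the reply and wake up any waiting caller.
    synchronized void complete(String value) {
        result = value;
        done = true;
        notifyAll();
    }

    // Caller side: block until a reply has been stored, then return it.
    synchronized String await() {
        try {
            while (!done) { // re-check the flag after every wakeup
                wait();     // releases the monitor while blocked
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        ResponseSlot slot = new ResponseSlot();
        Thread io = new Thread(() -> slot.complete("pong"));
        io.start();
        System.out.println(slot.await()); // prints "pong"
        io.join();
    }
}
```

Because wait() releases the monitor while the caller is blocked, the I/O thread can enter complete even though both methods are synchronized on the same object.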
Server
package com.atguigu.netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {

    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    // Initializes and starts NettyServer
    private static void startServer0(String hostname, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler()); // Service handler
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("Service provider starts providing services~~");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Server handler
package com.atguigu.netty.dubborpc.netty;

import com.atguigu.netty.dubborpc.customer.ClientBootstrap;
import com.atguigu.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// The server handler is relatively simple
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Get the message sent by the client and call the service
        System.out.println("msg=" + msg);
        // The client and server need an agreed protocol for calling the server API.
        // Here every message must start with a fixed header, for example "HelloService#hello#hello"
        if (msg.toString().startsWith(ClientBootstrap.providerName)) {
            // Everything after the last '#' is the argument for hello
            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
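The header convention used by the server handler ("HelloService#hello#" followed by the argument) can also be parsed into its three parts. The sketch below is one possible way to do that; RpcMessage and its fields are hypothetical names introduced for this example and are not part of the tutorial code. It splits on the first two '#' characters only, so an argument that itself contains '#' stays intact, whereas taking everything after the last '#' would cut such an argument short.

```java
// Hypothetical value class for the "Service#method#argument" convention.
final class RpcMessage {
    final String service;
    final String method;
    final String argument;

    private RpcMessage(String service, String method, String argument) {
        this.service = service;
        this.method = method;
        this.argument = argument;
    }

    // Splits into at most three parts; the third part keeps any further '#'.
    static RpcMessage parse(String raw) {
        String[] parts = raw.split("#", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("malformed request: " + raw);
        }
        return new RpcMessage(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        RpcMessage m = RpcMessage.parse("HelloService#hello#Hello dubbo~");
        System.out.println(m.service);  // prints "HelloService"
        System.out.println(m.method);   // prints "hello"
        System.out.println(m.argument); // prints "Hello dubbo~"
    }
}
```

With the service and method names available separately, a server could route requests to different implementations instead of checking a single hard-coded prefix.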
customer package
package com.atguigu.netty.dubborpc.customer;

import com.atguigu.netty.dubborpc.netty.NettyClient;
import com.atguigu.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {

    // The protocol header is defined here
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws Exception {
        // Create a consumer
        NettyClient customer = new NettyClient();
        // Create the proxy object
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
        for (;;) {
            Thread.sleep(2 * 1000);
            // Call the service provider's method through the proxy object
            String res = service.hello("Hello dubbo~");
            System.out.println("Result of call res= " + res);
        }
    }
}
Code core description:
client:
Friendly tip: return executor.submit(client).get() runs the handler's call method on the thread pool and blocks until its return value is available.
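The combination of a dynamic proxy with executor.submit(...).get() can be tried in isolation. The following is a minimal sketch under assumptions: Greeter is a hypothetical stand-in for HelloService, ProxySketch and newGreeter are names invented for this example, and the Callable simply echoes the message instead of talking to a server.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the HelloService interface.
interface Greeter {
    String hello(String msg);
}

final class ProxySketch {

    // Every call on the returned proxy is packaged as a Callable, run on the
    // executor, and the calling thread blocks in get() until call() returns.
    static Greeter newGreeter(ExecutorService executor, String header) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                (proxy, method, args) -> {
                    // Stand-in for the remote round trip: echo header + argument.
                    Callable<Object> task = () -> "echo:" + header + args[0];
                    return executor.submit(task).get();
                });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Greeter greeter = newGreeter(executor, "HelloService#hello#");
            System.out.println(greeter.hello("hi")); // prints "echo:HelloService#hello#hi"
        } finally {
            executor.shutdown(); // let the JVM exit once the task is done
        }
    }
}
```

The value returned by the Callable becomes the return value of the proxied method, which is how the reply from the server reaches the line String res = service.hello(...) in the consumer.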
Client's handler
Server Handler: