Basic introduction to RPC
- RPC(Remote Procedure Call) -- remote procedure call is a computer communication protocol that allows a program running on one computer to call a subroutine of another computer without additional programming for this interactive procedure
- Two or more applications are distributed on different servers, and the calls between them are like local method calls (as shown in the figure)
- Common RPC frameworks include: Alibaba's Dubbo,Nacos,Google's gRpc, Go's rpcx, Apache's thrift, and spring's spring cloud (Eureka)
RPC call flow chart
Description of terms:
In RPC, the Client side is called a service consumer and the Server side is called a service provider
RPC call process description
- The service consumer (Client) invokes the service locally
- After receiving the call, ClientStub is responsible for encapsulating the method and parameters into a message body that can be transmitted over the network
- ClientStub encodes the message and sends it to the server
- ServerStub decodes the message after receiving it
- ServerStub calls the local service according to the decoding result
- The local service executes and returns the result to ServerStub
- ServerStub encodes the returned result and sends it to the consumer
- ClientSub receives the message and decodes it
- The service consumer gets the result
Summary: the goal of RPC is to encapsulate 2-8 these steps. Users do not need to care about these details. They can complete the service call of remote services like calling local methods
Implementation of Dubbo RPC based on Netty
Requirement description
- The bottom layer of Dubbo uses Netty as the network communication framework, and requires Netty to implement a simple RPC framework
- Imitating Dubbo, the consumer and the provider agree on the interface and protocol. The consumer remotely calls the provider service, the provider returns a string, the consumer prints the data returned by the provider, and the underlying communication uses Netty 4.1.20
Design description
- Create an interface and define abstract methods for the contract between consumers and providers
- Create a provider that needs to listen to consumer requests and return data as agreed
- To create a consumer, this class needs to transparently call methods that do not exist, and internally use Netty to request the provider to return data
- Development analysis diagram
code implementation
Define interface
package com.dance.netty.netty.dubbo.common.api; /** * Service interface for providing external services */ public interface HelloService { String printHello(String msg); }
Create a new interface implementation class (as a provider)
package com.dance.netty.netty.dubbo.provider.service.impl; import com.dance.netty.netty.dubbo.common.api.HelloService; public class HelloServiceImpl implements HelloService { private static final HelloService helloService = new HelloServiceImpl(); @Override public String printHello(String msg) { System.out.println("Parameter received: " + msg); return "hello msg!"; } private HelloServiceImpl(){} public static HelloService getInstance(){ return helloService; } }
New NettyServer
package com.dance.netty.netty.dubbo.common.netty; import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.nio.charset.StandardCharsets; public class NettyServer { public static void startServer0() { startServer0("127.0.0.1", 7000); } public static void startServer0(String hostname, int port) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // Use string codec ch.pipeline() .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new NettyServerHandler()); } }); ChannelFuture sync = serverBootstrap.bind(hostname, port).sync(); System.out.println("netty server is starting, ip: " + hostname + ", port: " + port); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
New NettyServerHandler
package com.dance.netty.netty.dubbo.common.netty; import com.dance.netty.netty.dubbo.provider.service.impl.HelloServiceImpl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Someone registered"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String msgStr = msg.toString(); // Get the data sent by the client System.out.println("msg is " + msgStr); // When the client calls the API of the server, we can customize a protocol // For example, we require that every message sent must start with a fixed format // Service name # method name # parameter // For example: HelloService#printHello#this is msg if(msgStr.startsWith("HelloService#printHello#")){ // Remove the protocol header when passing in parameters ctx.writeAndFlush(HelloServiceImpl.getInstance().printHello(msgStr.substring(msgStr.lastIndexOf('#') + 1))); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Create a new ServerBootstrap
package com.dance.netty.netty.dubbo.provider.server; import com.dance.netty.netty.dubbo.common.netty.NettyServer; /** * Service startup class */ public class ServerBootstrap { public static void main(String[] args) { NettyServer.startServer0(); } }
New NettyClientHandler
package com.dance.netty.netty.dubbo.common.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.SocketAddress; import java.util.concurrent.Callable; public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> { /** * Context object */ private ChannelHandlerContext context; /** * Return result of remote call */ private String result; public void setParams(String params) { this.params = params; } /** * Parameters passed in when the client calls the method */ private String params; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // It will be used in other methods context = ctx; } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { result = msg.toString(); // Wake up the thread waiting on this method notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } /** * Called by the proxy object, send data to the server = > wait = > wait for the read wake-up (that is, when the server has data return) = > return the result * In fact, it turns asynchronous wait and notify into synchronous wait * @return Return results * @throws Exception Thread exception */ @Override public synchronized String call() throws Exception { // Send parameters to server context.writeAndFlush(params); // Wait for read to read the data wait(); // Return the data of the response return result; } }
New NettyClient
package com.dance.netty.netty.dubbo.common.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NettyClient { /** * Create thread pool */ private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); /** * Processor for performing remote calls */ private static NettyClientHandler nettyClientHandler; // The writing method uses the proxy mode to obtain a proxy object public Object getBean(final Class<?> serviceClass, final String protocol){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> { if (nettyClientHandler == null) { initClient(); } // Set the information to be sent to the server // Adopt the format of protocol header + parameter [0] nettyClientHandler.setParams(protocol + args[0]); // Thread pool submits a task return executorService.submit(nettyClientHandler).get(); }); } private static void initClient(){ nettyClientHandler = new NettyClientHandler(); NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("stringDecoder", new StringDecoder()) .addLast("stringEncoder", new StringEncoder()) .addLast("nettyClientHandler", nettyClientHandler); } }); bootstrap.connect("127.0.0.1", 7000).sync(); // Note that the Client side cannot be blocked // sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // Don't close the key // eventExecutors.shutdownGracefully(); } } }
Create a new ClientBootstrap
package com.dance.netty.netty.dubbo.consumer.client; import com.dance.netty.netty.dubbo.common.api.HelloService; import com.dance.netty.netty.dubbo.common.netty.NettyClient; public class ClientBootstrap { private static final String PROTOCOL = "HelloService#printHello#"; public static void main(String[] args) { // Create a consumer NettyClient nettyClient = new NettyClient(); // Create proxy object HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, PROTOCOL); // Call the method (service) of the service provider through the proxy object String result = helloService.printHello("hi dubbo rpc"); System.out.println("Call method execution and return results: " + result); } }
test
Server side
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. netty server is starting, ip: 127.0.0.1, port: 7000 msg is HelloService#printHello#hi dubbo rpc Parameter received: hi dubbo rpc
Client side
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Call method execution and return results: hello msg!
We only call the interface provided by the provider through the interface, and realize the remote call of RPC through Netty
Netty has finished writing here. If you find a good video to explain netty's source code later, come back and write it. Next, let's take a look at the data structure and algorithm