- Introduction
- Project link
- What does the microservice framework include?
- How to implement RPC remote call?
- Open source RPC framework
- Restricted language
- Cross language RPC framework
- Build ZooKeeper with local Docker
- Download Image
- Start container
- View container log
- RPC interface
- Netty RPC server
- Interface implementation
- Service startup
- Registration service
- ZooKeeper implementation
- Netty RPC Client
- Create proxy
- Remote call
- Codec
- RpcDecoder
- RpcEncoder
- RpcServerInboundHandler
- Path of Server in ZooKeeper
- Summary
- Reference link
Introduction
Use Netty, ZooKeeper and Spring Boot to create a microservice framework.
Project link
GitHub source address
What does the microservice framework include?
For details, please refer to RPC practice and principle
The project is divided into a caller (client) and a provider (server). The client only calls an interface. The call information is sent over the network to the server, which decodes it, invokes the corresponding method through reflection, and returns the result to the client over the network. From the client's point of view the network is invisible: calling an RPC service looks just like calling a local method.
[Figure: module structure of the whole project]
How to implement RPC remote call?
- How to establish network connection between client and server: HTTP and Socket
- How the server handles requests: NIO (using Netty)
- What protocol is used for data transmission
- How to serialize and deserialize data: JSON, PB, Thrift
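The first and third questions above can be illustrated with nothing but the JDK. The following is a minimal sketch, not the project's code: the class name SocketRoundTrip and the "echo: " reply are assumptions made for illustration. It opens a blocking socket, sends one length-prefixed string (writeUTF writes the length before the bytes) and waits for the reply.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class: one blocking request/response over a plain socket.
class SocketRoundTrip {

    /** Sends one request string to host:port and blocks until the reply arrives. */
    static String call(String host, int port, String request) throws IOException {
        try (Socket socket = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream());
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            out.writeUTF(request);   // writeUTF prefixes the payload with its length
            out.flush();
            return in.readUTF();
        }
    }

    /** Starts a one-shot echo server on a free port and performs a single call against it. */
    static String demo(String message) {
        try (ServerSocket server = new ServerSocket(0)) {   // port 0 = any free port
            Thread worker = new Thread(() -> {
                try (Socket peer = server.accept();
                     DataInputStream in = new DataInputStream(peer.getInputStream());
                     DataOutputStream out = new DataOutputStream(peer.getOutputStream())) {
                    out.writeUTF("echo: " + in.readUTF());
                    out.flush();
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            });
            worker.start();
            String reply = call("localhost", server.getLocalPort(), message);
            worker.join();
            return reply;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("msg"));
    }
}
```

A blocking socket like this ties up one thread per connection, which is the motivation for handling requests with NIO (Netty) on the server side.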
Open source RPC framework
Restricted language
- Dubbo: Java, Alibaba
- Motan: Java, Weibo
- Tars: C++, Tencent (multiple languages supported)
- Spring Cloud: Java
  - Gateway: Zuul
  - Registration center: Eureka
  - Service timeout: Hystrix
  - Call chain monitoring: Sleuth
  - Log analysis: ELK
Cross language RPC framework
- gRPC: HTTP/2
- Thrift: TCP
Build ZooKeeper with local Docker
Download Image
Download the official image (for example with docker pull zookeeper) and start ZooKeeper. For details, see https://hub.docker.com/_/zookeeper
Start container
The startup command is as follows. The container is named zookeeper-rpc-demo, and ports 8080, 2181, 2888 and 3888 are published to the host machine:
docker run --name zookeeper-rpc-demo --restart always -p 8080:8080 -p 2181:2181 -p 2888:2888 -p 3888:3888 -d zookeeper
This image includes EXPOSE 2181 2888 3888 8080 (the zookeeper client port, follower port, election port, AdminServer port respectively), so standard container linking will make it automatically available to the linked containers. Since the Zookeeper "fails fast" it's better to always restart it.
View container log
You can enter the container through the following command, where fb6f95cde6ba is the Docker ZooKeeper container id of my machine.
docker exec -it fb6f95cde6ba /bin/bash
In the container, enter the directory /apache-zookeeper-3.7.0-bin/bin and execute the command zkCli.sh -server 0.0.0.0:2181 to connect to the zk service.
RPC interface
This example provides two interfaces, HelloService and HiService, each with a single method. The client only needs to depend on rpc-sample-api: it knows the interface definitions but not the concrete implementations.
public interface HelloService {
    String hello(String msg);
}

public interface HiService {
    String hi(String msg);
}
Netty RPC server
Start a Server service, implement the above two RPC interfaces, and register the service with ZooKeeper.
Interface implementation
/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String msg) {
        return "hello echo: " + msg;
    }
}

/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HiService.class)
public class HiServiceImpl implements HiService {

    @Override
    public String hi(String msg) {
        return "hi echo: " + msg;
    }
}
There are two issues involved:
- How does the Server decide which interface implementations to register with ZooKeeper?
- What should the paths of HelloServiceImpl and HiServiceImpl be in ZooKeeper?
Service startup
This sample Server uses Spring Boot, but it does not need to start a web service. It only has to keep running in the background, so the web application type is set to WebApplicationType.NONE.
@SpringBootApplication
public class RpcServerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RpcServerApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }
}
Registration service
NettyApplicationContextAware implements ApplicationContextAware. When the program starts, it registers every implementation class annotated with @RpcServer (explained below) in ZooKeeper.
@Component
public class NettyApplicationContextAware implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(NettyApplicationContextAware.class);

    // Despite the zk.* property names, these two values are passed to
    // NettyServer.start as the address and port the Netty server binds to.
    @Value("${zk.address}")
    private String zookeeperAddress;

    @Value("${zk.port}")
    private int zookeeperPort;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Key: ZooKeeper path built from the interface name; value: the implementing bean
        Map<String, Object> rpcBeanMap = new HashMap<>();
        for (Object object : applicationContext.getBeansWithAnnotation(RpcServer.class).values()) {
            rpcBeanMap.put("/" + object.getClass().getAnnotation(RpcServer.class).cls().getName(), object);
        }
        try {
            NettyServer.start(zookeeperAddress, zookeeperPort, rpcBeanMap);
        } catch (Exception e) {
            logger.error("register error !", e);
        }
    }
}
The RpcServer annotation is defined as follows:
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.TYPE)
@Component
public @interface RpcServer {

    /**
     * Interface class for interface registration
     */
    Class<?> cls();
}
applicationContext.getBeansWithAnnotation(RpcServer.class).values() returns the beans annotated with @RpcServer in the project. They are put into rpcBeanMap, where the key is the path to be registered in ZooKeeper. Note that the path uses the name of the interface, not the name of the implementation class.
The advantage of using annotations is flexibility: Server A can provide only HelloService while Server B provides only HiService, and the two do not affect each other.
Service registration mainly happens in com.yano.server.NettyServer#start.
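The annotation lookup described above can be sketched without Spring. This is only an illustration under stated assumptions: the list of beans stands in for the Spring application context, the nested @RpcServer is a simplified copy of the article's annotation (without @Component), and AnnotationRegistrySketch and PlainBean are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class: builds the "path -> bean" map from annotated objects.
class AnnotationRegistrySketch {

    // Simplified stand-in for the article's @RpcServer (no Spring @Component here).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface RpcServer {
        Class<?> cls();
    }

    interface HelloService {
        String hello(String msg);
    }

    @RpcServer(cls = HelloService.class)
    static class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String msg) {
            return "hello echo: " + msg;
        }
    }

    // A bean without the annotation; it must not be registered.
    static class PlainBean { }

    /** Maps "/" + interface name to each annotated bean and skips everything else. */
    static Map<String, Object> buildRegistry(List<Object> beans) {
        Map<String, Object> registry = new LinkedHashMap<>();
        for (Object bean : beans) {
            RpcServer annotation = bean.getClass().getAnnotation(RpcServer.class);
            if (annotation != null) {
                registry.put("/" + annotation.cls().getName(), bean);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, Object> registry = buildRegistry(List.of(new HelloServiceImpl(), new PlainBean()));
        registry.keySet().forEach(System.out::println);
    }
}
```

Because the key is derived from the interface named in the annotation, a client that knows only the interface can compute the same path without knowing which class implements it.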
public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    public static void start(String ip, int port, Map<String, Object> params) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // SO_KEEPALIVE applies to accepted connections, so it is a child option
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class))
                                    .addLast(new RpcServerInboundHandler(params));
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(ip, port).sync();
            if (future.isSuccess()) {
                // Register every provided interface under this server's address
                params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
            }
            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
This class is used to:
- Start a Socket service through Netty, and the port number is passed in through parameters
- Register the above interface implementation in ZooKeeper
params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
ZooKeeper implementation
ZooKeeperOp maintains the ZooKeeper connection and registers the IP and port of the Server under the corresponding ZooKeeper node. An ephemeral node is used, so ZooKeeper automatically deletes the node once the Server goes offline and its session ends, and the Client will not get the address of an offline Server.
public class ZooKeeperOp {

    private static final String zkAddress = "localhost:2181";
    private static final ZkClient zkClient = new ZkClient(zkAddress);

    public static void register(String serviceName, String serviceAddress) {
        // The service node itself is persistent; each provider address is an ephemeral child
        if (!zkClient.exists(serviceName)) {
            zkClient.createPersistent(serviceName);
        }
        zkClient.createEphemeral(serviceName + "/" + serviceAddress);
        System.out.printf("create node %s%n", serviceName + "/" + serviceAddress);
    }

    public static String discover(String serviceName) {
        List<String> children = zkClient.getChildren(serviceName);
        if (CollectionUtils.isEmpty(children)) {
            return "";
        }
        // Pick one provider at random
        return children.get(ThreadLocalRandom.current().nextInt(children.size()));
    }
}
Netty RPC Client
The Netty RPC Client calls the two interfaces above as if they were local methods, to verify that the calls return correctly.
public class RpcClientApplication {

    public static void main(String[] args) {
        HiService hiService = RpcProxy.create(HiService.class);
        String msg = hiService.hi("msg");
        System.out.println(msg);

        HelloService helloService = RpcProxy.create(HelloService.class);
        msg = helloService.hello("msg");
        System.out.println(msg);
    }
}
Run the above code, and the console will output:
hi echo: msg
hello echo: msg
Create proxy
HiService hiService = RpcProxy.create(HiService.class);
String msg = hiService.hi("msg");
The Client first creates a proxy through com.yano.RpcProxy#create, and can then call the hi method of hiService.
public class RpcProxy {

    public static <T> T create(final Class<?> cls) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[] {cls}, (o, method, objects) -> {
            // Pack everything the server needs for the reflective call into a Request
            Request request = new Request();
            request.setInterfaceName("/" + cls.getName());
            request.setRequestId(UUID.randomUUID().toString());
            request.setParameter(objects);
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());

            Response response = new NettyClient().client(request);
            return response.getResult();
        });
    }
}
For the Server side to call the method requested by the Client through reflection, it needs at least the following information:
- Interface name: interfaceName
- Method name: methodName
- Parameter types: Class<?>[] parameterTypes
- Arguments passed in: Object[] parameter
@Data
public class Request {

    private String requestId;
    private String interfaceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object parameter[];
}
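To see where these four pieces of information come from, the sketch below uses a JDK dynamic proxy that records each call instead of sending it over the network. It is an illustration only: ProxyCaptureSketch, Call and recordingProxy are hypothetical names, and Call merely mirrors the fields of Request.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: shows what an InvocationHandler knows about each call.
class ProxyCaptureSketch {

    interface HiService {
        String hi(String msg);
    }

    /** The four pieces of call information, mirroring the fields of the article's Request. */
    static final class Call {
        final String interfaceName;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] parameter;

        Call(String interfaceName, String methodName, Class<?>[] parameterTypes, Object[] parameter) {
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.parameter = parameter;
        }
    }

    /** Creates a proxy that records every call into 'sink' instead of sending it anywhere. */
    @SuppressWarnings("unchecked")
    static <T> T recordingProxy(Class<T> cls, List<Call> sink) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[] {cls},
                (proxy, method, args) -> {
                    sink.add(new Call("/" + cls.getName(), method.getName(),
                            method.getParameterTypes(), args));
                    return null; // a real client would return the remote result here
                });
    }

    public static void main(String[] args) {
        List<Call> calls = new ArrayList<>();
        HiService hiService = recordingProxy(HiService.class, calls);
        hiService.hi("msg");

        Call call = calls.get(0);
        System.out.println(call.interfaceName + "#" + call.methodName
                + Arrays.toString(call.parameterTypes) + " " + Arrays.toString(call.parameter));
    }
}
```

The parameter types are sent along with the method name because overloaded methods share a name, so the name alone would not identify which method to invoke.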
Remote call
Finally, it is called remotely through the following code, where request contains all the information of the calling method.
Response response = new NettyClient().client(request);
/**
 * @author yano
 * GitHub Project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private Response response;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create and initialize the Netty client Bootstrap object
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new RpcDecoder(Response.class))
                                    .addLast(new RpcEncoder(Request.class))
                                    .addLast(NettyClient.this);
                        }
                    });

            // Connect to RPC server
            String[] discover = ZooKeeperOp.discover(request.getInterfaceName()).split(":");
            ChannelFuture future = bootstrap.connect(discover[0], Integer.parseInt(discover[1])).sync();

            // Write the RPC request and wait until the connection is closed
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
This code is the core and mainly does two things:
- Query ZooKeeper for the Server address under the corresponding node. If there are multiple service providers, ZooKeeperOp.discover returns one of the Server addresses at random
- Establish a Socket connection to the obtained Server address, send the request and wait for the response
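One detail of the lookup is worth spelling out. ZooKeeperOp.discover returns an empty string when no provider is registered, and "".split(":") yields a one-element array, so the discover[1] access in NettyClient would fail with an ArrayIndexOutOfBoundsException. The sketch below shows one possible way to make that case explicit. AddressPickSketch and its methods are hypothetical and are not part of the project.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: random provider selection plus defensive "host:port" parsing.
class AddressPickSketch {

    /** Picks one "host:port" entry at random and fails clearly when none is registered. */
    static String pick(List<String> providers) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("no provider registered");
        }
        return providers.get(ThreadLocalRandom.current().nextInt(providers.size()));
    }

    /** Splits "host:port" at the last colon without resolving the host name. */
    static InetSocketAddress parse(String address) {
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || colon == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + address + "'");
        }
        String host = address.substring(0, colon);
        int port = Integer.parseInt(address.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress target = parse(pick(List.of("127.0.0.1:3000")));
        System.out.println(target.getHostString() + " " + target.getPort());
    }
}
```

Random selection is the simplest form of client-side load balancing. It needs no shared state, at the cost of ignoring how busy each provider is.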
Codec
channel.pipeline()
        .addLast(new RpcDecoder(Response.class))
        .addLast(new RpcEncoder(Request.class))
        .addLast(NettyClient.this);
Both Client and Server need to encode and decode Request and Response. This example uses the simplest format, JSON. Netty's message encoding and decoding are not explained in detail here. The code is as follows.
RpcDecoder
RpcDecoder is a ChannelInboundHandler. It decodes the Response on the Client side and the Request on the Server side.
// ByteToMessageDecoder accumulates bytes across reads, which the length-prefix
// check below relies on (MessageToMessageDecoder would drop a partial frame).
public class RpcDecoder extends ByteToMessageDecoder {

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // Wait until the 4-byte length prefix is available
        if (msg.readableBytes() < 4) {
            return;
        }
        msg.markReaderIndex();
        int dataLength = msg.readInt();
        // Body not complete yet: rewind and wait for more bytes
        if (msg.readableBytes() < dataLength) {
            msg.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        msg.readBytes(data);
        out.add(JSON.parseObject(data, genericClass));
    }
}
RpcEncoder
RpcEncoder is a ChannelOutboundHandler. It encodes the Request on the Client side and the Response on the Server side.
public class RpcEncoder extends MessageToByteEncoder {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        if (genericClass.isInstance(msg)) {
            // Frame layout: 4-byte length followed by the JSON bytes
            byte[] data = JSON.toJSONBytes(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
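The frame layout used by the encoder and decoder (a 4-byte length followed by the payload) can be tried out with the JDK's ByteBuffer alone. The following is a minimal sketch with no Netty or JSON library involved. LengthPrefixFraming is a hypothetical name and the payloads are plain strings chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: length-prefixed framing with partial input.
class LengthPrefixFraming {

    /** Encodes a payload as a 4-byte big-endian length followed by the payload bytes. */
    static byte[] encode(String payload) {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + data.length).putInt(data.length).put(data).array();
    }

    /**
     * Extracts every complete frame from 'in' (which must be in read mode) and leaves
     * the position at the start of the first incomplete frame, if there is one.
     */
    static List<String> decode(ByteBuffer in) {
        List<String> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // not enough bytes yet: rewind to the length prefix
                break;
            }
            byte[] data = new byte[length];
            in.get(data);
            frames.add(new String(data, StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("{\"id\":1}");
        byte[] second = encode("{\"id\":2}");
        // Simulate a TCP read that delivers the first frame plus half of the second.
        ByteBuffer in = ByteBuffer.allocate(first.length + second.length / 2);
        in.put(first).put(second, 0, second.length / 2).flip();
        System.out.println(decode(in) + ", bytes left: " + in.remaining());
    }
}
```

The mark and reset calls play the same role as markReaderIndex and resetReaderIndex in RpcDecoder: TCP is a byte stream, so a read may end in the middle of a frame and the decoder has to wait for the rest.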
RpcServerInboundHandler
RpcServerInboundHandler is the core of the Server's reflective call, so it is explained separately here. It was added to the pipeline when the Netty Server started:
socketChannel.pipeline()
        .addLast(new RpcDecoder(Request.class))
        .addLast(new RpcEncoder(Response.class))
        .addLast(new RpcServerInboundHandler(params));
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;
    logger.info("request data {}", JSON.toJSONString(request));

    // jdk reflection call
    Object bean = handle.get(request.getInterfaceName());
    Method method = bean.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
    method.setAccessible(true);
    Object result = method.invoke(bean, request.getParameter());

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult(result);

    // Close the connection once the response has been written
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
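The reflective lookup in this handler can be exercised on its own, without Netty. The sketch below is an illustration under assumptions: ReflectionDispatchSketch and dispatch are hypothetical names, the handler map plays the role of the handle field, and the null check is an addition that the handler above does not have.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class: map lookup followed by a reflective method call.
class ReflectionDispatchSketch {

    public interface HiService {
        String hi(String msg);
    }

    public static class HiServiceImpl implements HiService {
        @Override
        public String hi(String msg) {
            return "hi echo: " + msg;
        }
    }

    /** Finds the bean registered for the interface and invokes the named method on it. */
    static Object dispatch(Map<String, Object> handlers, String interfaceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) {
        Object bean = handlers.get(interfaceName);
        if (bean == null) {
            throw new IllegalArgumentException("no provider for " + interfaceName);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (ReflectiveOperationException e) {
            // Covers NoSuchMethodException, IllegalAccessException and InvocationTargetException
            throw new IllegalStateException("call to " + interfaceName + "#" + methodName + " failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> handlers = new HashMap<>();
        handlers.put("/" + HiService.class.getName(), new HiServiceImpl());

        Object result = dispatch(handlers, "/" + HiService.class.getName(), "hi",
                new Class<?>[] {String.class}, new Object[] {"msg"});
        System.out.println(result);
    }
}
```

Without the null check, a request for an unregistered interface would surface as a NullPointerException inside the handler, which is harder to diagnose than an explicit error.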
Path of Server in ZooKeeper
After the Server starts, its output contains the following two log lines:
create node /com.yano.service.HelloService/127.0.0.1:3000
create node /com.yano.service.HiService/127.0.0.1:3000
Check the node in ZooKeeper and find that the service has been registered.
[zk: 0.0.0.0:2181(CONNECTED) 0] ls /com.yano.service.HelloService
[127.0.0.1:3000]
[zk: 0.0.0.0:2181(CONNECTED) 1] ls /com.yano.service.HiService
[127.0.0.1:3000]
Summary
This article used Netty, ZooKeeper and Spring Boot to build a microservice RPC framework. The demo serves only as an example: implementing it by hand deepens understanding, but it should not be used in a production environment.
The code for this article can be found at the GitHub source address. Stars and forks are welcome.
Reference link
https://github.com/yanzhenyidai/netty-rpc-example