Before reading this article, it is recommended that you read the earlier articles in this series:
[[1] Detailed analysis of the underlying principles of network communication in a distributed microservice architecture (illustrated)](https://mp.weixin.qq.com/s?__...)
[[2] After five years on the job, do you really understand Netty and why to use it? (in depth)](https://mp.weixin.qq.com/s?__...)
[[3] In-depth analysis of the core components in Netty (illustrations + examples)](https://mp.weixin.qq.com/s?__...)
[[4] Required for BAT interviews: a detailed explanation of ByteBuf in Netty](https://mp.weixin.qq.com/s?__...)
[[5] Solving the packet splitting and sticking problem in Netty through practical cases](https://mp.weixin.qq.com/s?__...)
[[6] Implementing a custom message communication protocol based on Netty (protocol design, analysis, and practice)](https://mp.weixin.qq.com/s?__...)
[[7] The most detailed and complete guide to serialization technology: in-depth analysis and practical application](https://mp.weixin.qq.com/s?__...)
In the previous articles, we worked through Netty's fundamentals and implementation principles step by step, so you should now have a fairly complete understanding of Netty. Next, we will walk through a practical case, a handwritten RPC communication framework, to see how Netty is applied in practice.
Why choose RPC for this exercise? Netty exists to solve communication problems, and RPC frameworks are the communication frameworks we work with most in real applications. This exercise therefore shows both how Netty is used in practice and how RPC works under the hood.
What is RPC
RPC (Remote Procedure Call) is a protocol for requesting services from a program on a remote computer over the network, without the caller having to understand the underlying network technology. Put simply, it lets developers call remote services the same way they call local ones.
Since it is a protocol, it must have a specification, as shown in Figure 6-1.
To let developers call remote services as if they were local, an RPC protocol has to implement the remote interaction shown in Figure 6-1:
- When the client invokes a remote service, the details of network communication must be hidden behind a local dynamic proxy module. The dynamic proxy module is responsible for assembling the request parameters, method, and other data into a data packet and sending it to the target server.
- When this packet is sent, it must follow the agreed message protocol and serialization protocol, and it is finally converted into a binary data stream for transmission.
- After receiving the data packet, the server decodes it according to the agreed message protocol to obtain the request information.
- The server then routes the call to the target service according to the request information, obtains the result, and returns it to the client.
<center>Figure 6-1</center>
Mainstream RPC framework in the industry
Any framework that implements the RPC protocol is called an RPC framework. In real projects, we can use mature open source RPC frameworks to solve remote communication under a microservice architecture. Common RPC frameworks include:
- Thrift: Thrift is a software framework for developing scalable, cross-language services. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.
- Dubbo: Dubbo is a distributed service framework and SOA governance solution. Its main features include high-performance NIO communication and multi-protocol integration, dynamic service addressing and routing, soft load balancing and fault tolerance, and dependency analysis and degradation. Dubbo is the core framework of Alibaba's internal SOA service governance solution, and it has been used by many companies outside Alibaba since it was open-sourced in 2011.
Notes for handwritten RPC
Based on this understanding of the RPC protocol, which technologies do we need to consider if we implement it ourselves? The flow in Figure 6-1 already gives a general idea.
- Communication protocol: an RPC framework has very high performance requirements, so the simpler the communication protocol, the better, because this reduces the cost of encoding and decoding. Most mainstream RPC frameworks build directly on TCP or HTTP.
- Serialization and deserialization: data must be serialized and deserialized for network transmission. As mentioned earlier, serialization and deserialization are the processes of converting objects into binary streams and converting binary streams back into objects. When choosing a serialization framework, we generally pick efficient, general-purpose ones such as fastjson, Protobuf, and Hessian. These are generally more efficient than Java native serialization and produce more compact output.
- Dynamic proxy: when a client invokes a remote service, the details of network communication need to be hidden behind a dynamic proxy. The proxy is generated at runtime, so the generation speed and bytecode size of the proxy class affect the performance and resource consumption of the whole RPC framework. Common dynamic proxy technologies include Javassist, Cglib, and the JDK dynamic proxy.
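To make the dynamic proxy point more concrete, here is a minimal sketch using the JDK's `java.lang.reflect.Proxy`. The names `GreetingService`, `CallDescription`, `Transport`, and `ProxySketch` are hypothetical and exist only for this illustration, and the "transport" is a plain function rather than a real network call. The idea is simply that the caller sees an ordinary interface while the proxy turns each call into data that could be serialized and sent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, standing in for a shared API module.
interface GreetingService {
    String greet(String name);
}

// Hypothetical description of one call: what a proxy would hand to the network layer.
final class CallDescription {
    final String className;
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    CallDescription(String className, String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

final class ProxySketch {
    // Stand-in for "serialize, send over the network, wait for the response".
    interface Transport {
        Object send(CallDescription call);
    }

    // Creates a proxy that converts every interface call into a CallDescription.
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, Transport transport) {
        InvocationHandler handler = (proxy, method, args) ->
                transport.send(new CallDescription(
                        iface.getName(), method.getName(), method.getParameterTypes(), args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        // The fake transport just echoes what it was asked to call.
        GreetingService service = create(GreetingService.class,
                call -> call.methodName + Arrays.toString(call.args));
        System.out.println(service.greet("Mic")); // prints greet[Mic]
    }
}
```

In a real framework the `Transport` would encode the call, write it to a channel, and block or return a future until the response arrives.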
Handwritten RPC implementation based on Netty
Now that we understand the RPC protocol, let's implement an RPC communication framework based on Netty.
See the attached netty-rpc-example project for the full code.
<center>Figure 6-2 Project module composition</center>
Dependencies to import:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.72</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
Module dependencies:
- The provider depends on netty-rpc-protocol and netty-rpc-api
- The consumer depends on netty-rpc-protocol and netty-rpc-api
netty-rpc-api module
<center>Figure 6-3 Composition of the netty-rpc-api module</center>
IUserService
public interface IUserService { String saveUser(String name); }
netty-rpc-provider module
<center>Figure 6-4 Composition of the netty-rpc-provider module</center>
UserServiceImpl
@Service @Slf4j public class UserServiceImpl implements IUserService { @Override public String saveUser(String name) { log.info("begin saveUser:"+name); return "Save User Success!"; } }
NettyRpcProviderMain
Note: the parts marked Case1 and Case2 below are not added at this step; they will be added later.
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) // Case1 (add later)
@SpringBootApplication
public class NettyRpcProviderMain {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer(); // Case2 (add later)
    }
}
netty-rpc-protocol
Now we write the communication protocol module, which mainly does the following:
- Define the message protocol
- Define the serialization and deserialization methods
- Establish Netty communication
<center>Figure 6-5</center>
Define message protocol
We covered custom message protocols earlier. Here we define ours using the following format.
/*
+-----------------------------------------------------------------------------+
| Magic number 2 bytes | Serialization algorithm 1 byte | Request type 1 byte |
+-----------------------------------------------------------------------------+
| Message ID 8 bytes   | Data length 4 bytes                                  |
+-----------------------------------------------------------------------------+
*/
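As a rough illustration of this layout, the sketch below writes and reads the 16-byte header with the JDK's `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. The class name `HeaderLayoutSketch` and its helper methods are hypothetical; the field order, the field sizes, and the magic value `0xca` follow the format above and the constants defined later in this article. Both `ByteBuffer` and `ByteBuf` default to big-endian byte order, so the offsets line up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class HeaderLayoutSketch {
    // magic (2) + serialization type (1) + request type (1) + message id (8) + data length (4)
    static final int HEAD_TOTAL_LEN = 2 + 1 + 1 + 8 + 4;
    static final short MAGIC = 0xca;

    // Writes the 16-byte header followed by the body bytes.
    static byte[] encode(byte serialType, byte reqType, long requestId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEAD_TOTAL_LEN + body.length);
        buf.putShort(MAGIC);      // offset 0
        buf.put(serialType);      // offset 2
        buf.put(reqType);         // offset 3
        buf.putLong(requestId);   // offset 4
        buf.putInt(body.length);  // offset 12
        buf.put(body);            // offset 16
        return buf.array();
    }

    // Absolute reads: each field sits at a fixed offset from the start of the frame.
    static short readMagic(byte[] frame) {
        return ByteBuffer.wrap(frame).getShort(0);
    }

    static long readRequestId(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(4);
    }

    static int readBodyLength(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(12);
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 0, (byte) 1, 7L, "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame bytes: " + frame.length);
        System.out.println("request id:  " + readRequestId(frame));
        System.out.println("body length: " + readBodyLength(frame));
    }
}
```

The fixed offset of the length field (12) and its size (4) are the same two numbers passed to the frame decoder in the pipeline later on.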
Header
@AllArgsConstructor
@Data
public class Header implements Serializable {
    /*
    +-----------------------------------------------------------------------------+
    | Magic number 2 bytes | Serialization algorithm 1 byte | Request type 1 byte |
    +-----------------------------------------------------------------------------+
    | Message ID 8 bytes   | Data length 4 bytes                                  |
    +-----------------------------------------------------------------------------+
    */
    private short magic;     // Magic number, used to verify the identity of the message (2 bytes)
    private byte serialType; // Serialization type (1 byte)
    private byte reqType;    // Request type (1 byte)
    private long requestId;  // Request id (8 bytes)
    private int length;      // Data length (4 bytes)
}
RpcRequest
@Data public class RpcRequest implements Serializable { private String className; private String methodName; private Object[] params; private Class<?>[] parameterTypes; }
RpcResponse
@Data public class RpcResponse implements Serializable { private Object data; private String msg; }
RpcProtocol
@Data public class RpcProtocol<T> implements Serializable { private Header header; private T content; }
Define related constants
The message protocol above relies on a few enum and constant classes, which are defined as follows.
ReqType
Message type
public enum ReqType { REQUEST((byte)1), RESPONSE((byte)2), HEARTBEAT((byte)3); private byte code; private ReqType(byte code) { this.code=code; } public byte code(){ return this.code; } public static ReqType findByCode(int code) { for (ReqType msgType : ReqType.values()) { if (msgType.code() == code) { return msgType; } } return null; } }
SerialType
Serialization type
public enum SerialType { JSON_SERIAL((byte)0), JAVA_SERIAL((byte)1); private byte code; SerialType(byte code) { this.code=code; } public byte code(){ return this.code; } }
RpcConstant
public class RpcConstant {
    // Total number of bytes in the header
    public final static int HEAD_TOTAL_LEN=16;
    // Magic number
    public final static short MAGIC=0xca;
}
Define serialization related implementations
Two implementations are shown here: one based on JSON and one based on Java native serialization.
ISerializer
public interface ISerializer { <T> byte[] serialize(T obj); <T> T deserialize(byte[] data,Class<T> clazz); byte getType(); }
JavaSerializer
public class JavaSerializer implements ISerializer{ @Override public <T> byte[] serialize(T obj) { ByteArrayOutputStream byteArrayOutputStream= new ByteArrayOutputStream(); try { ObjectOutputStream outputStream= new ObjectOutputStream(byteArrayOutputStream); outputStream.writeObject(obj); return byteArrayOutputStream.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return new byte[0]; } @Override public <T> T deserialize(byte[] data, Class<T> clazz) { ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data); try { ObjectInputStream objectInputStream= new ObjectInputStream(byteArrayInputStream); return (T) objectInputStream.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } @Override public byte getType() { return SerialType.JAVA_SERIAL.code(); } }
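For comparison, here is a minimal round-trip sketch of Java native serialization that closes its streams with try-with-resources and surfaces failures as unchecked exceptions instead of printing the stack trace and returning an empty array or `null`. The class name `NativeSerializationSketch` is hypothetical, and this is only one possible way to structure the same logic. Keep in mind that native deserialization of untrusted bytes is unsafe, so it should only be used between trusted peers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class NativeSerializationSketch {
    // Serializes an object; closing the ObjectOutputStream flushes everything into the byte array.
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes and checks the runtime type instead of using an unchecked cast.
    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of serialized object not found", e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize("Save User Success!");
        System.out.println(deserialize(data, String.class)); // prints Save User Success!
    }
}
```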
JsonSerializer
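public class JsonSerializer implements ISerializer{
    // Requires: import java.nio.charset.StandardCharsets;
    // An explicit charset keeps both sides consistent regardless of the platform default.
    @Override
    public <T> byte[] serialize(T obj) {
        return JSON.toJSONString(obj).getBytes(StandardCharsets.UTF_8);
    }
    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {
        return JSON.parseObject(new String(data, StandardCharsets.UTF_8),clazz);
    }
    @Override
    public byte getType() {
        return SerialType.JSON_SERIAL.code();
    }
}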
SerializerManager
Manages the available serializers and looks them up by type code.
public class SerializerManager { private final static ConcurrentHashMap<Byte, ISerializer> serializers=new ConcurrentHashMap<Byte, ISerializer>(); static { ISerializer jsonSerializer=new JsonSerializer(); ISerializer javaSerializer=new JavaSerializer(); serializers.put(jsonSerializer.getType(),jsonSerializer); serializers.put(javaSerializer.getType(),javaSerializer); } public static ISerializer getSerializer(byte key){ ISerializer serializer=serializers.get(key); if(serializer==null){ return new JavaSerializer(); } return serializer; } }
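SerializerManager silently falls back to JavaSerializer when the type code is unknown. One alternative, sketched below with JDK classes only, is to fail fast so that a corrupted or unsupported type code is reported instead of being decoded with the wrong algorithm. `CodecRegistrySketch` and its two string encoders are hypothetical stand-ins for the real serializers; whether fail-fast or fallback is the better choice depends on how the framework is meant to handle version mismatches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CodecRegistrySketch {
    // Type code -> encoder. Two trivial string encoders stand in for real serializers.
    private static final Map<Byte, Function<String, byte[]>> ENCODERS = new ConcurrentHashMap<>();

    static {
        ENCODERS.put((byte) 0, s -> s.getBytes(StandardCharsets.UTF_8));
        ENCODERS.put((byte) 1, s -> s.getBytes(StandardCharsets.UTF_16BE));
    }

    // Looks up the encoder for a type code and rejects unknown codes explicitly.
    static Function<String, byte[]> lookup(byte code) {
        Function<String, byte[]> encoder = ENCODERS.get(code);
        if (encoder == null) {
            throw new IllegalArgumentException("unknown serialization type: " + code);
        }
        return encoder;
    }

    public static void main(String[] args) {
        System.out.println(lookup((byte) 0).apply("hi").length); // UTF-8: 2 bytes
        System.out.println(lookup((byte) 1).apply("hi").length); // UTF-16BE: 4 bytes
        try {
            lookup((byte) 9);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```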
Define encoding and decoding implementation
Because the message protocol is custom, we need to implement encoding and decoding ourselves. The code is as follows.
RpcDecoder
@Slf4j
public class RpcDecoder extends ByteToMessageDecoder {
    // Header layout: magic number (2 bytes) | serialization algorithm (1 byte) | request type (1 byte)
    //                | message ID (8 bytes) | data length (4 bytes)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("==========begin RpcDecoder ==============");
        if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){
            // Not enough bytes for a complete header yet, so wait for more data
            return;
        }
        in.markReaderIndex(); // Mark the reader index so it can be reset later
        short magic=in.readShort(); // Read magic number
        if(magic!=RpcConstant.MAGIC){
            throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);
        }
        byte serialType=in.readByte(); // Read serialization algorithm type
        byte reqType=in.readByte(); // Request type
        long requestId=in.readLong(); // Request message id
        int dataLength=in.readInt(); // Request data length
        // Fewer readable bytes than the declared data length: reset and wait for the rest
        if(in.readableBytes()<dataLength){
            in.resetReaderIndex();
            return;
        }
        // Read message content
        byte[] content=new byte[dataLength];
        in.readBytes(content);
        // Build header information
        Header header=new Header(magic,serialType,reqType,requestId,dataLength);
        ISerializer serializer=SerializerManager.getSerializer(serialType);
        ReqType rt=ReqType.findByCode(reqType);
        if(rt==null){
            // findByCode returns null for unknown codes; switching on null would throw a NullPointerException
            log.warn("unknown request type: {}", reqType);
            return;
        }
        switch(rt){
            case REQUEST:
                RpcRequest request=serializer.deserialize(content, RpcRequest.class);
                RpcProtocol<RpcRequest> reqProtocol=new RpcProtocol<>();
                reqProtocol.setHeader(header);
                reqProtocol.setContent(request);
                out.add(reqProtocol);
                break;
            case RESPONSE:
                RpcResponse response=serializer.deserialize(content,RpcResponse.class);
                RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
                resProtocol.setHeader(header);
                resProtocol.setContent(response);
                out.add(resProtocol);
                break;
            case HEARTBEAT:
                break;
            default:
                break;
        }
    }
}
RpcEncoder
@Slf4j
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
    // Header layout: magic number (2 bytes) | serialization algorithm (1 byte) | request type (1 byte)
    //                | message ID (8 bytes) | data length (4 bytes)
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {
        log.info("=============begin RpcEncoder============");
        Header header=msg.getHeader();
        out.writeShort(header.getMagic()); // Write magic number
        out.writeByte(header.getSerialType()); // Write serialization type
        out.writeByte(header.getReqType()); // Write request type
        out.writeLong(header.getRequestId()); // Write request id
        ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());
        byte[] data=serializer.serialize(msg.getContent()); // Serialize the message body
        header.setLength(data.length);
        out.writeInt(data.length); // Write message length
        out.writeBytes(data);
    }
}
NettyServer
Build the Netty server.
@Slf4j
public class NettyServer{
    private String serverAddress; // Address
    private int serverPort; // Port
    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }
    public void startNettyServer() throws Exception {
        log.info("begin start Netty Server");
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new RpcServerInitializer());
            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            log.info("Server started Success on Port:{}", this.serverPort);
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("Rpc Server Exception",e);
        }finally {
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
RpcServerInitializer
public class RpcServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0)) .addLast(new RpcDecoder()) .addLast(new RpcEncoder()) .addLast(new RpcServerHandler()); } }
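The arguments `12` and `4` passed to `LengthFieldBasedFrameDecoder` say that the length field starts at byte offset 12 of the header and is 4 bytes long, so one complete frame is 16 header bytes plus the declared body length. The following sketch reproduces that idea with the JDK's `ByteBuffer` only, to show how sticky and half packets are handled; it is not Netty's implementation, and `FrameSplitterSketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameSplitterSketch {
    static final int LENGTH_FIELD_OFFSET = 12;
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int HEADER_LEN = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Accepts the next chunk from the "network" and returns every frame that is now complete.
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= HEADER_LEN) {
            int bodyLen = merged.getInt(merged.position() + LENGTH_FIELD_OFFSET);
            if (bodyLen < 0) {
                throw new IllegalArgumentException("negative body length: " + bodyLen);
            }
            if (merged.remaining() - HEADER_LEN < bodyLen) {
                break; // half packet: wait for more bytes
            }
            byte[] frame = new byte[HEADER_LEN + bodyLen];
            merged.get(frame);
            frames.add(frame);
        }
        pending = merged.slice(); // keep the unread tail for the next call
        return frames;
    }

    // Test helper: builds a frame whose first 12 header bytes are zero.
    static byte[] frame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + body.length);
        buf.position(LENGTH_FIELD_OFFSET);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        FrameSplitterSketch splitter = new FrameSplitterSketch();
        byte[] frame = frame(new byte[]{1, 2, 3});
        byte[] firstHalf = new byte[10];
        byte[] secondHalf = new byte[frame.length - 10];
        System.arraycopy(frame, 0, firstHalf, 0, 10);
        System.arraycopy(frame, 10, secondHalf, 0, secondHalf.length);
        System.out.println(splitter.feed(firstHalf).size());  // prints 0
        System.out.println(splitter.feed(secondHalf).size()); // prints 1
    }
}
```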
RpcServerHandler
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=invoke(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);
        ctx.writeAndFlush(resProtocol);
    }
    private Object invoke(RpcRequest request){
        try {
            Class<?> clazz=Class.forName(request.getClassName());
            Object bean= SpringBeansManager.getBean(clazz); // Get the instance object (CASE)
            Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());
            return declaredMethod.invoke(bean,request.getParams());
        } catch (ClassNotFoundException | NoSuchMethodException
                | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
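The `invoke` method above is the heart of the server side: it maps a class name, method name, and parameter types onto a real method call via reflection. The sketch below isolates that step without Spring or Netty. `EchoService`, `EchoServiceImpl`, and `ReflectiveDispatcherSketch` are hypothetical names, and the in-memory registry is an assumption standing in for the Spring container. Unlike the handler above, it rethrows failures rather than returning `null`, which is one way to keep errors from being reported to the client as "success".

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface and implementation.
interface EchoService {
    String echo(String text);
}

final class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text) {
        return "echo:" + text;
    }
}

final class ReflectiveDispatcherSketch {
    // Interface name -> interface class and implementing bean (stand-in for a bean container).
    private final Map<String, Class<?>> interfaces = new ConcurrentHashMap<>();
    private final Map<String, Object> beans = new ConcurrentHashMap<>();

    <T> void register(Class<T> iface, T bean) {
        interfaces.put(iface.getName(), iface);
        beans.put(iface.getName(), bean);
    }

    // Resolves the method on the interface and invokes it on the registered bean.
    Object dispatch(String className, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Class<?> iface = interfaces.get(className);
        Object bean = beans.get(className);
        if (iface == null || bean == null) {
            throw new IllegalArgumentException("no service registered for " + className);
        }
        try {
            Method method = iface.getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalStateException("cannot invoke " + className + "#" + methodName, e);
        } catch (InvocationTargetException e) {
            // The service method itself threw; expose the original cause.
            throw new IllegalStateException("service method failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatcherSketch dispatcher = new ReflectiveDispatcherSketch();
        dispatcher.register(EchoService.class, new EchoServiceImpl());
        Object result = dispatcher.dispatch(EchoService.class.getName(), "echo",
                new Class<?>[]{String.class}, new Object[]{"hi"});
        System.out.println(result); // prints echo:hi
    }
}
```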
SpringBeansManager
@Component public class SpringBeansManager implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringBeansManager.applicationContext=applicationContext; } public static <T> T getBean(Class<T> clazz){ return applicationContext.getBean(clazz); } }
Note: once this class is in place, add @ComponentScan to the main class of the netty-rpc-provider module so that it is scanned.
@ComponentScan(basePackages = {"com.example.spring","com.example.service"}) // Modify here
@SpringBootApplication
public class NettyRpcProviderMain {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer(); // Modify here
    }
}
netty-rpc-consumer
Next, implement the consumer side.
RpcClientProxy
public class RpcClientProxy { public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){ return (T) Proxy.newProxyInstance (interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RpcInvokerProxy(host,port)); } }
RpcInvokerProxy
@Slf4j
public class RpcInvokerProxy implements InvocationHandler {
    private String serviceAddress;
    private int servicePort;
    public RpcInvokerProxy(String serviceAddress, int servicePort) {
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.info("begin invoke target server");
        // Assemble the request
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // Send the request
        NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);
        // Register a future so the response can be matched to this request asynchronously
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol);
        // Block until the response arrives
        return future.getPromise().get().getData();
    }
}
Define client connections
In the protocol package of the netty-rpc-protocol module, create NettyClient.
@Slf4j public class NettyClient { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup(); private String serviceAddress; private int servicePort; public NettyClient(String serviceAddress,int servicePort){ log.info("begin init NettyClient"); bootstrap=new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new RpcClientInitializer()); this.serviceAddress=serviceAddress; this.servicePort=servicePort; } public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException { ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync(); future.addListener(listener->{ if(future.isSuccess()){ log.info("connect rpc server {} success.",this.serviceAddress); }else{ log.error("connect rpc server {} failed .",this.serviceAddress); future.cause().printStackTrace(); eventLoopGroup.shutdownGracefully(); } }); log.info("begin transfer data"); future.channel().writeAndFlush(protocol); } }
RpcClientInitializer
@Slf4j public class RpcClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { log.info("begin initChannel"); ch.pipeline() .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0)) .addLast(new LoggingHandler()) .addLast(new RpcEncoder()) .addLast(new RpcDecoder()) .addLast(new RpcClientHandler()); } }
RpcClientHandler
Note that Netty handles inbound and outbound data separately and asynchronously, so the caller needs a Future object to obtain the result of a request.
@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
        log.info("receive rpc server result");
        long requestId=msg.getHeader().getRequestId();
        RpcFuture<RpcResponse> future=RequestHolder.REQUEST_MAP.remove(requestId);
        if(future==null){
            // No pending request with this id (for example, a duplicate response)
            log.warn("no pending request for id {}", requestId);
            return;
        }
        future.getPromise().setSuccess(msg.getContent()); // Return the result to the waiting caller
    }
}
Implementation of Future
Add the RpcFuture implementation to the netty-rpc-protocol module.
RpcFuture
@Data
public class RpcFuture<T> {
    // Promise is a writable Future. Future itself has no write-related methods,
    // so Netty extends Future with Promise to set the result of an I/O operation.
    private Promise<T> promise;
    public RpcFuture(Promise<T> promise) {
        this.promise = promise;
    }
}
RequestHolder
Stores the mapping between each request id and its future.
public class RequestHolder { public static final AtomicLong REQUEST_ID=new AtomicLong(); public static final Map<Long,RpcFuture> REQUEST_MAP=new ConcurrentHashMap<>(); }
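The pattern behind RpcFuture and RequestHolder is request/response correlation: the caller registers a future under a request id, and whichever thread receives the response completes that future by id. The sketch below shows the same pattern with the JDK's `CompletableFuture` in place of Netty's `Promise`. The class `PendingRequestsSketch` and its method names are hypothetical, and the "response" is delivered by a plain thread rather than a channel handler.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Allocates a unique id for a new request.
    long nextRequestId() {
        return idGenerator.incrementAndGet();
    }

    // Called by the sender before writing the request: registers a future under the id.
    CompletableFuture<String> await(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the receiving side: completes and removes the matching future.
    // Returns false when no request with this id is waiting.
    boolean complete(long requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        return future.complete(result);
    }

    public static void main(String[] args) {
        PendingRequestsSketch requests = new PendingRequestsSketch();
        long id = requests.nextRequestId();
        CompletableFuture<String> future = requests.await(id);
        // Simulate the I/O thread delivering the response a little later.
        new Thread(() -> requests.complete(id, "Save User Success!")).start();
        System.out.println(future.join()); // prints Save User Success!
    }
}
```

Removing the entry when the response arrives keeps the map from growing without bound. A production version would also need a timeout so that requests whose responses never arrive are cleaned up.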
Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. When reprinting, please credit "Mic to take you to learn architecture".