1, Remoting communication mechanism in RocketMQ
A RocketMQ message queue cluster mainly consists of the NameServer, Broker (Master/Slave), Producer, and Consumer. The basic communication process is as follows:
- After startup, the Broker registers itself with the NameServer, and then reports its Topic routing information to the NameServer every 30s;
- When sending a message, the Producer looks up routing information in the locally cached TopicPublishInfoTable according to the Topic of the message (if there is no entry, it pulls the routing information from the NameServer);
- The Producer selects a message queue according to the routing information obtained in the previous step and sends the message to it; the Broker, as the receiver, accepts the message and stores it on disk;
- The Consumer obtains routing information in the same way and, after client-side load balancing, selects one or more message queues to pull messages from and consume.
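The cache-then-NameServer lookup and the queue selection described above can be sketched roughly as follows. This is only an illustration, not RocketMQ's code: the class RouteCacheSketch, the nameServerLookup function, the queue names, and the round-robin selection strategy are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch: a local topic-route cache with a NameServer fallback. */
final class RouteCacheSketch {
    // topic -> queue names; stands in for the Producer's local TopicPublishInfoTable
    private final Map<String, List<String>> topicPublishInfoTable = new ConcurrentHashMap<>();
    // Hypothetical function standing in for a network call to the NameServer
    private final Function<String, List<String>> nameServerLookup;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RouteCacheSketch(Function<String, List<String>> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    /** Returns the cached route, asking the NameServer only on a cache miss. */
    List<String> findRoute(String topic) {
        return topicPublishInfoTable.computeIfAbsent(topic, nameServerLookup);
    }

    /** Picks the next queue in round-robin order (an assumed selection strategy). */
    String selectQueue(String topic) {
        List<String> queues = findRoute(topic);
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("no route for topic " + topic);
        }
        int index = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        RouteCacheSketch cache = new RouteCacheSketch(topic -> {
            lookups.incrementAndGet(); // counts simulated NameServer round trips
            return Arrays.asList("broker-a:0", "broker-a:1", "broker-b:0");
        });
        System.out.println(cache.selectQueue("TopicTest"));
        System.out.println(cache.selectQueue("TopicTest"));
        System.out.println("NameServer lookups: " + lookups.get());
    }
}
```

Only the first send for a topic reaches the simulated NameServer; later sends are served from the local table.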
2, Remoting communication module API in RocketMQ
- RemotingService: the top-level interface. The main methods are:

    void start();

    void shutdown();

    void registerRPCHook(RPCHook rpcHook);

- RemotingServer/RemotingClient: the basic interfaces of the remoting server and client. The methods of the two are largely similar:

    /**
     * requestCode: the command code
     * processor: the RocketMQ request processor; for example, SendMessageProcessor handles
     *            message sending and PullMessageProcessor handles message pulling
     * executor: the thread pool in which the NettyRequestProcessor runs its business logic
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    // Look up the request processor and thread pool registered for the request code
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

RemotingClient is mainly used when the Producer sends messages and the Consumer pulls messages; RemotingServer is mainly used when the Broker calls back to the Consumer and obtains its status.
Here we need to focus on registerProcessor, the method that registers command processors. RocketMQ divides network operations by business logic, such as message sending and message pulling. Each network operation defines a request code, and each request code maps to a business processor, a NettyRequestProcessor. Different request codes can be assigned different thread pools, which isolates the thread pools of different request types.
- NettyRemotingAbstract: the abstract implementation class of the Netty remoting service, which defines the processing logic for remote calls, requests, and responses. Important attributes are:
  - Semaphore semaphoreOneway: the semaphore that limits the concurrency of oneway sends. The default is 65535 permits.
  - Semaphore semaphoreAsync: the semaphore that limits the concurrency of asynchronous sends. The default is 65535 permits.
  - ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable: the table of requests that are waiting for a response from the peer. opaque is the request number; it is globally unique and is usually generated by an atomic increment. The client usually sends network requests to the peer over a single long-lived connection, so after sending a request it immediately returns a ResponseFuture to the caller and puts the request into this table. When a response arrives (the response carries the opaque of its request), the corresponding ResponseFuture is looked up in the table and the caller is notified of the result. This is the classic application of the Future pattern in network programming.
  - HashMap<Integer /* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable: the registered request processors. RocketMQ lets different request commands use different thread pools, which isolates the business thread pools.
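To make the thread pool isolation behind processorTable concrete, here is a minimal sketch under simplifying assumptions. It does not use RocketMQ's types: the processor is modelled as a plain Function from String to String, the Pair class and the namedPool helper are hypothetical, and the request codes 10 and 11 in main are illustrative values only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of a request-code to (processor, thread pool) table. */
final class ProcessorTableSketch {
    /** Pairs a processor with the thread pool that runs it. */
    static final class Pair {
        final Function<String, String> processor;
        final ExecutorService executor;

        Pair(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>();
    private Pair defaultProcessor;

    void registerProcessor(int requestCode, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, ExecutorService executor) {
        defaultProcessor = new Pair(processor, executor);
    }

    /** Runs the request on the pool registered for its code, or on the default pool. */
    CompletableFuture<String> dispatch(int requestCode, String request) {
        Pair matched = processorTable.get(requestCode);
        Pair pair = matched != null ? matched : defaultProcessor;
        if (pair == null) {
            throw new IllegalStateException("request code not supported: " + requestCode);
        }
        return CompletableFuture.supplyAsync(() -> pair.processor.apply(request), pair.executor);
    }

    /** Helper (hypothetical): a fixed pool of named daemon threads. */
    static ExecutorService namedPool(String prefix, int threads) {
        AtomicInteger index = new AtomicInteger(0);
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, prefix + "_" + index.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        ProcessorTableSketch table = new ProcessorTableSketch();
        // Each processor reports the thread it ran on, which shows the isolation
        table.registerProcessor(10, req -> "send on " + Thread.currentThread().getName(), namedPool("SendPool", 2));
        table.registerProcessor(11, req -> "pull on " + Thread.currentThread().getName(), namedPool("PullPool", 2));
        table.registerDefaultProcessor(req -> "default on " + Thread.currentThread().getName(), namedPool("DefaultPool", 1));

        System.out.println(table.dispatch(10, "msg").join());
        System.out.println(table.dispatch(11, "msg").join());
        System.out.println(table.dispatch(99, "msg").join());
    }
}
```

Because each request code has its own pool, a burst of slow requests of one type cannot exhaust the threads that serve another type.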
- NettyRemotingClient: the Netty-based network client. It implements the RemotingClient interface and extends NettyRemotingAbstract.
Important attributes are as follows:
  - NettyClientConfig nettyClientConfig: network-related configuration.
  - Bootstrap bootstrap: the Netty client bootstrap helper class.
  - EventLoopGroup eventLoopGroupWorker: the Netty client Worker thread group, commonly known as the IO threads.
  - ConcurrentMap<String /* addr */, ChannelWrapper> channelTables: the connections (Netty Channels) created by the current client; each address has one long-lived connection.
  - ExecutorService publicExecutor: the default task thread pool.
  - ExecutorService callbackExecutor: the thread pool that executes callback-type requests.
  - DefaultEventExecutorGroup defaultEventExecutorGroup: the thread group in which the Netty ChannelHandlers execute.
- NettyRemotingServer: the Netty-based network server.
Its core attributes are as follows:
  - ServerBootstrap serverBootstrap: the Netty server bootstrap helper class.
  - EventLoopGroup eventLoopGroupSelector: the Netty server Worker thread group, i.e. the slave Reactor in the master-slave multi-threaded Reactor model; it is mainly responsible for handling read and write events.
  - EventLoopGroup eventLoopGroupBoss: the Netty Boss thread group, i.e. the master Reactor in the master-slave Reactor thread model; it is mainly responsible for OP_ACCEPT events (creating connections).
  - NettyServerConfig nettyServerConfig: the Netty server configuration.
  - Timer timer = new Timer("ServerHouseKeepingService", true): a periodic scanner that scans the responseTable in NettyRemotingAbstract and removes requests that have timed out.
  - DefaultEventExecutorGroup defaultEventExecutorGroup: the thread group in which the Netty ChannelHandlers execute.
  - int port: the port the server binds to.
  - NettyEncoder encoder: the RocketMQ communication protocol encoder.
  - NettyDecoder decoder: the RocketMQ communication protocol decoder.
  - NettyConnectManageHandler connectionManageHandler: the Netty connection management handler, which mainly tracks connection state.
  - NettyServerHandler serverHandler: the core business handler on the NettyServer side.
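The housekeeping scan over responseTable can be pictured with the following sketch. It is a simplified assumption of how such a scan might work, not the RocketMQ implementation: PendingRequest is a hypothetical stand-in for ResponseFuture that keeps only a start time and a timeout, and the expiry rule (start plus timeout has passed) is chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: removing timed-out entries from a response table. */
final class ResponseTableScanSketch {
    /** Stand-in for ResponseFuture with only the fields needed for expiry. */
    static final class PendingRequest {
        final long beginMillis;
        final long timeoutMillis;

        PendingRequest(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isExpired(long nowMillis) {
            return beginMillis + timeoutMillis <= nowMillis;
        }
    }

    // opaque -> request that is still waiting for its response
    private final ConcurrentMap<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>();

    void track(int opaque, long beginMillis, long timeoutMillis) {
        responseTable.put(opaque, new PendingRequest(beginMillis, timeoutMillis));
    }

    /** Removes every expired request and returns the opaque values that were removed. */
    List<Integer> scanResponseTable(long nowMillis) {
        List<Integer> removed = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, PendingRequest> entry = it.next();
            if (entry.getValue().isExpired(nowMillis)) {
                Integer opaque = entry.getKey();
                it.remove();
                removed.add(opaque);
            }
        }
        return removed;
    }

    int pending() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        ResponseTableScanSketch table = new ResponseTableScanSketch();
        table.track(1, 0L, 3000L);   // expires at t = 3000
        table.track(2, 0L, 10000L);  // expires at t = 10000
        System.out.println("removed: " + table.scanResponseTable(5000L));
        System.out.println("still pending: " + table.pending());
    }
}
```

In a long-running process, a method like scanResponseTable would be invoked periodically, for example from a daemon java.util.Timer via scheduleAtFixedRate, passing System.currentTimeMillis() as the current time.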
Following the call relationships between these classes, the flow of sending and consuming a message is as follows (see the figure):
1) When NettyRemotingClient needs to connect to an address, it first creates a Channel through the Netty APIs and caches it. If the next request goes to the same address, the Channel is reused.
2) It then calls a NettyRemotingClient method such as invokeAsync to send the request over the network. An important step when sending with Netty is encoding: the object to be sent, such as a RemotingCommand, is converted into a binary stream in a specific format.
3) When NettyRemotingServer receives the binary data, a read event fires and the request is processed. First, a complete request packet must be identified in the binary stream; this is decoding, which converts the binary stream into a RemotingCommand request object. The read event is then propagated to NettyServerHandler, which finally executes processRequestCommand in NettyRemotingAbstract. That method looks up the thread pool and the NettyRequestProcessor registered for the requestCode, executes the corresponding logic, and returns the result to the client over the network.
4) After receiving the response from the server, the client triggers a read event and decodes the data (NettyDecoder); the read event is then propagated to NettyClientHandler, which processes the response.
3, Implementation of Remoting communication module in RocketMQ
- Creation of the client. The client implementation class in RocketMQ is NettyRemotingClient. The core of its creation is encapsulated in the start method.

    public void start() {
        // Create the default event executor thread group; the event handlers added to the
        // ChannelPipeline with addLast below execute in this thread group
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // Configure the Netty client: specify the Worker thread group, in which all read and
        // write events execute (i.e. the IO threads), and the channel type (NIO here)
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            // Whether to disable Nagle's algorithm. true sends immediately; with false, small
            // packets may wait so that more packets can be sent together
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            // Connection timeout; an exception is thrown if the connection is not established in time
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            // This option and the next set the socket send and receive buffer sizes, 64kb
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            // Build the event handling chain through handler
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    // If no EventExecutorGroup is passed to addLast, the handlers execute in the
                    // Worker thread group by default. addLast is the core extension point of
                    // Netty: application business logic is added through event handlers
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(), // RocketMQ request encoder, i.e. the protocol encoder
                        new NettyDecoder(), // RocketMQ request decoder, i.e. the protocol decoder
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), // Idle detection
                        new NettyConnectManageHandler(), // Connection manager
                        new NettyClientHandler()); // Netty client business handler, which processes business logic
                }
            });

        // Scheduled task that removes expired requests
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

Specific steps:
1) Create the DefaultEventExecutorGroup, which is the default event executor thread group.
2) Call the group method of Bootstrap to specify the Worker thread group. By default, read and write events execute in this thread group, i.e. the IO threads. Also specify the channel type through the channel method; NIO is used here.
3) Specify the network parameters through the option method of Bootstrap.
4) Finally, build the event handling chain through the handler method of Bootstrap.
- Establish connection
The creation step above only creates the client and does not establish a connection. A connection is established only when a message is sent. The relevant code is as follows:

    if (createNewConnection) {
        ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
        cw = new ChannelWrapper(channelFuture);
        this.channelTables.put(addr, cw);
    }

After the connection is established, it will be cached for subsequent reuse.
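The create-once, reuse-afterwards behaviour of the connection cache can be sketched as below. This is a simplified assumption: Connection is a hypothetical placeholder for a Netty Channel wrapper, the address is an example value, and the sketch leaves out the liveness checks and locking that a real client needs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one cached connection per remote address. */
final class ChannelCacheSketch {
    /** Placeholder for a wrapper around a real network channel. */
    static final class Connection {
        final String addr;

        Connection(String addr) {
            this.addr = addr;
        }
    }

    // addr -> connection, playing the role of channelTables
    private final ConcurrentMap<String, Connection> channelTables = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger(0);

    /** Returns the cached connection for addr, creating it on first use. */
    Connection getOrCreate(String addr) {
        return channelTables.computeIfAbsent(addr, a -> {
            created.incrementAndGet(); // a real client would connect here
            return new Connection(a);
        });
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch();
        Connection first = cache.getOrCreate("127.0.0.1:10911");
        Connection second = cache.getOrCreate("127.0.0.1:10911");
        System.out.println("same connection reused: " + (first == second));
        System.out.println("connections created: " + cache.createdCount());
    }
}
```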
- Message sending, taking synchronous sending as an example

    final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
    this.responseTable.put(opaque, responseFuture);
    final SocketAddress addr = channel.remoteAddress();
    // The listener is invoked asynchronously once the write completes
    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (f.isSuccess()) {
                responseFuture.setSendRequestOK(true);
                return;
            } else {
                responseFuture.setSendRequestOK(false);
            }
            responseTable.remove(opaque);
            responseFuture.setCause(f.cause());
            responseFuture.putResponse(null); // Counts down the latch to wake up the waiting caller
            log.warn("send a request command to channel <" + addr + "> failed.");
        }
    });
    // Blocks on the latch until the response arrives or the timeout elapses
    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

1) First, create a ResponseFuture and store it in the ConcurrentHashMap (responseTable) keyed by the request number. When the client receives the response from the server, it finds the corresponding ResponseFuture by opaque and wakes up the waiting caller.
2) Then call the writeAndFlush method of the Channel to send the request. Internally, NettyEncoder encodes the RemotingCommand request.
3) When the response returns, the client is called back and the waiting caller is woken up.
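The three steps above amount to the Future pattern keyed by opaque. A minimal sketch without Netty follows; it is not RocketMQ's code. The Transport interface is hypothetical and stands in for the Channel, requests and responses are plain Strings, and returning null on timeout is a simplification chosen for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: synchronous invocation built on a response table. */
final class SyncInvokeSketch {
    /** Holds the response and a latch that the waiting caller blocks on. */
    static final class ResponseFuture {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String response) {
            this.response = response;
            latch.countDown(); // wakes up the caller blocked in waitResponse
        }

        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response; // still null if the timeout elapsed first
        }
    }

    /** Hypothetical transport standing in for the network channel. */
    interface Transport {
        void send(int opaque, String request);
    }

    private final AtomicInteger requestId = new AtomicInteger(0);
    private final ConcurrentMap<Integer, ResponseFuture> responseTable = new ConcurrentHashMap<>();

    /** Sends the request and blocks until its response arrives or the timeout elapses. */
    String invokeSync(Transport transport, String request, long timeoutMillis) {
        int opaque = requestId.getAndIncrement();
        ResponseFuture future = new ResponseFuture();
        responseTable.put(opaque, future);
        try {
            transport.send(opaque, request);
            return future.waitResponse(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            responseTable.remove(opaque);
        }
    }

    /** Called by the receiving thread when a response for opaque arrives. */
    void onResponse(int opaque, String response) {
        ResponseFuture future = responseTable.get(opaque);
        if (future != null) {
            future.putResponse(response);
        }
    }

    int pending() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        SyncInvokeSketch client = new SyncInvokeSketch();
        // Simulated peer: answers from another thread, like an IO thread would
        Transport echo = (opaque, request) ->
            new Thread(() -> client.onResponse(opaque, "echo:" + request)).start();
        System.out.println(client.invokeSync(echo, "ping", 1000));
    }
}
```

The caller thread and the receiving thread never share anything except the table entry, which is why a single long-lived connection can carry many outstanding requests at once.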
- Creation of the server. The server here refers to the Broker.
1) Create two thread groups, Boss and Worker. The Boss thread group is the master Reactor in the master-slave Reactor model and listens for connections. The Worker thread group is the slave Reactor and handles read and write events.

    this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
        }
    });

    this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
        private int threadTotal = nettyServerConfig.getServerSelectorThreads();

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
        }
    });

2) Create the default event executor thread group.

    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

3) Use the Netty ServerBootstrap startup class to build the server.

    ServerBootstrap childHandler =
        // Specify the two thread groups, Boss and Worker
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            // Specify the channel type
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            // Bind to the IP/PORT specified by the server
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyServerHandler()
                        );
                }
            });

4) Call the bind method of ServerBootstrap to bind to the specified port.

    ChannelFuture sync = this.serverBootstrap.bind().sync();

4, Communication protocol
When messages are sent between the Client and the Server, both sides must agree on a protocol for the transmitted data. A message consists of the following four parts:
(1) Message length: the total length, stored in four bytes (one int);
(2) Serialization type & message header length: also one int. The first byte holds the serialization type, and the remaining three bytes hold the message header length;
(3) Message header data: the serialized message header;
(4) Message body data: the binary content of the message body.
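The four-part layout can be illustrated with plain java.nio.ByteBuffer. The sketch below rests on stated assumptions rather than on the text above: the total length field is taken to count everything after itself (the 4-byte type-and-header-length int plus header plus body), the byte order is ByteBuffer's default big-endian, and the serialization type value 0 and the JSON-like header in main are illustrative. FrameCodecSketch and Frame are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: encoding and decoding the four-part frame. */
final class FrameCodecSketch {
    static final class Frame {
        final byte serializeType;
        final byte[] header;
        final byte[] body;

        Frame(byte serializeType, byte[] header, byte[] body) {
            this.serializeType = serializeType;
            this.header = header;
            this.body = body;
        }
    }

    static ByteBuffer encode(byte serializeType, byte[] header, byte[] body) {
        if (header.length > 0x00FFFFFF) {
            throw new IllegalArgumentException("header does not fit in three bytes of length");
        }
        // Assumption: the length field excludes its own four bytes
        int length = 4 + header.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);                                           // (1) total length
        buf.putInt(((serializeType & 0xFF) << 24) | header.length);   // (2) type byte + 3-byte header length
        buf.put(header);                                              // (3) header data
        buf.put(body);                                                // (4) body data
        buf.flip();
        return buf;
    }

    static Frame decode(ByteBuffer buf) {
        int length = buf.getInt();
        int mark = buf.getInt();
        byte serializeType = (byte) ((mark >>> 24) & 0xFF); // first byte
        int headerLength = mark & 0x00FFFFFF;               // remaining three bytes
        byte[] header = new byte[headerLength];
        buf.get(header);
        byte[] body = new byte[length - 4 - headerLength];
        buf.get(body);
        return new Frame(serializeType, header, body);
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = encode((byte) 0, header, body);
        System.out.println("bytes on the wire: " + wire.remaining());

        Frame frame = decode(wire);
        System.out.println("header: " + new String(frame.header, StandardCharsets.UTF_8));
        System.out.println("body: " + new String(frame.body, StandardCharsets.UTF_8));
    }
}
```

Packing the serialization type and the header length into one int keeps the fixed part of every frame at eight bytes, at the cost of limiting the header to 2^24 - 1 bytes.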
Contents to be supplemented:
Secondary encoding of Netty
Thread isolation