Brief introduction
The remoting module of RocketMQ is the basis of communication among the RocketMQ producer, consumer, NameServer and Broker. It is implemented on top of the Netty framework. The inheritance structure of the core classes is as follows:
The RemotingService interface defines the start(), shutdown() and registerRPCHook() methods; registerRPCHook() allows RPC hooks to be registered with the service.
RemotingServer provides the basic functions of the server: registering request processors and sending messages (synchronous, asynchronous and oneway).
RemotingClient provides the basic functions of the client: setting the NameServer addresses, registering request processors, and sending messages (synchronous, asynchronous and oneway).
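A simplified sketch of this hierarchy is shown below (signatures abridged and throws clauses shortened for readability; in the real source each interface lives in its own file under org.apache.rocketmq.remoting):

```java
// Simplified sketch of the remoting interface hierarchy (abridged).
public interface RemotingService {
    void start();
    void shutdown();
    void registerRPCHook(RPCHook rpcHook);
}

interface RemotingServer extends RemotingService {
    void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor);

    RemotingCommand invokeSync(Channel channel, RemotingCommand request, long timeoutMillis) throws Exception;
    void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws Exception;
    void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws Exception;
}

interface RemotingClient extends RemotingService {
    void updateNameServerAddressList(List<String> addrs);
    void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor);

    // The client-side invoke* methods mirror the server's,
    // but target an "ip:port" address string instead of a Channel
    RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws Exception;
}
```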
The above functions are implemented by NettyRemotingAbstract, NettyRemotingClient and NettyRemotingServer. Next, let's analyze the concrete implementation of RocketMQ's communication functions.
Server implementation
We mainly focus on how the server implements request processing and message sending. The constructor of NettyRemotingServer shows the server creation process:
```java
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    // Create the ServerBootstrap used to start the Netty server
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    // Register the ChannelEventListener
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    // Create the public thread pool, used as the default executor for request processing
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    // Decide whether to use the epoll transport based on the platform
    if (useEpoll()) {
        // Create the boss thread (accepts connections) of the Netty server
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        // Create the selector threads for the child channels
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}
```
The constructor creates the ServerBootstrap, the boss EventLoopGroup and the selector (child) EventLoopGroup required by the Netty server.
The Netty server pipeline is assembled in the start() method:
```java
@Override
public void start() {
    // Default worker pool
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // Initialize the sharable handlers needed by the Netty server
    prepareSharableHandlers();

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Handshake, encoding/decoding, idle detection, connection management
                    // and task dispatch are all executed on the defaultEventExecutorGroup
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // Periodically scan the responseTable to clean up timed-out ResponseFutures
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
```
We can see that five handlers are added to the Netty server pipeline: HandshakeHandler, NettyEncoder, NettyDecoder, NettyConnectManageHandler and NettyServerHandler.
HandshakeHandler inspects the first byte of each new connection to detect a TLS handshake. The specific logic is as follows:
```java
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    // Mark the current position so that we can peek the first byte to determine
    // whether the content starts with a TLS handshake
    msg.markReaderIndex();

    byte b = msg.getByte(0);

    // The first byte of a TLS handshake record is 0x16
    if (b == HANDSHAKE_MAGIC_CODE) {
        switch (tlsMode) {
            // The server does not support TLS; reject the connection
            case DISABLED:
                ctx.close();
                log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
                break;
            case PERMISSIVE:
            case ENFORCING:
                if (null != sslContext) {
                    // Proceed with the connection: insert the SSL handlers into the pipeline
                    ctx.pipeline()
                        .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                        .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                    log.info("Handlers prepended to channel pipeline to establish SSL connection");
                } else {
                    // The server supports TLS but has no sslContext; reject the connection
                    ctx.close();
                    log.error("Trying to establish an SSL connection but sslContext is null");
                }
                break;
            default:
                log.warn("Unknown TLS mode");
                break;
        }
    } else if (tlsMode == TlsMode.ENFORCING) {
        // The server enforces TLS but the first byte is not the 0x16 TLS handshake
        // header, so the connection is rejected
        ctx.close();
        log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
    }

    // Reset the reader index so that handshake negotiation may proceed as normal
    msg.resetReaderIndex();

    try {
        // The handshake has been handled; remove this handler from the pipeline
        ctx.pipeline().remove(this);
    } catch (NoSuchElementException e) {
        log.error("Error while removing HandshakeHandler", e);
    }

    // Hand this message over to the next handler
    ctx.fireChannelRead(msg.retain());
}
```
NettyEncoder and NettyDecoder are responsible for encoding and decoding requests; the specific logic is implemented by the RemotingCommand class. The request protocol and message format are described in the official documentation, so they are not analyzed here.
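For orientation, the transport frame layout described in the official protocol documentation looks roughly like this (a sketch from memory, not an authoritative definition):

```java
// Sketch of the RemotingCommand transport frame as described in the official
// protocol documentation (recalled here for orientation only):
//
//   +-------------+--------------------------+--------------------+-----------+
//   | length (4B) | serialize type (1B)      | header data        | body      |
//   |             | + header length (3B)     | (e.g. JSON fields) | (payload) |
//   +-------------+--------------------------+--------------------+-----------+
//
// NettyDecoder extends Netty's LengthFieldBasedFrameDecoder, so each frame is
// cut by the leading length field before RemotingCommand.decode() parses it.
```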
NettyConnectManageHandler handles connect, disconnect, close, idle, exception and other channel events. The logic is relatively simple, so we will not analyze it here.
NettyServerHandler dispatches requests and responses; this is where the main logic lives. Let's analyze the specific process:
```java
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        // Delegates to the parent class NettyRemotingAbstract
        processMessageReceived(ctx, msg);
    }
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                // Request processing logic
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                // Response processing logic
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
```
The request processing process is as follows:
```java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Look up the NettyRequestProcessor registered for this request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        // Define the request processing logic
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // Hooks executed before the RPC (i.e. before the request is processed)
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    // Define the callback invoked when processing completes
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            // Hooks executed after the RPC (i.e. after the request is
                            // processed, before the response is written back)
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        // An AsyncNettyRequestProcessor performs everything asynchronously
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        // A plain NettyRequestProcessor executes synchronously; once the result
                        // is obtained, the response is written back via the callback defined above
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            // The processor rejects the request; write back a system-busy response directly
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            // Submit the task to the processor's thread pool
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // The thread pool triggered its rejection policy; sample the log to avoid flooding
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                // For non-oneway requests, respond that the system is busy
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        // No NettyRequestProcessor matches this request code; respond that it is unsupported
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
```
We can see that the request processing logic is simple: first look up the request's processor and thread pool in the processorTable (falling back to the defaultRequestProcessor), then submit the processing task to that thread pool for execution. Note that AsyncNettyRequestProcessor is an abstract class that implements NettyRequestProcessor:
```java
public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor {

    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request,
            RemotingResponseCallback responseCallback) throws Exception {
        RemotingCommand response = processRequest(ctx, request);
        responseCallback.callback(response);
    }
}
```
Although it is abstract, it declares no abstract methods. It provides a default, synchronous implementation of asyncProcessRequest; the intent is for subclasses to override it with genuinely asynchronous logic.
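As a hypothetical illustration (the class name, executor sizing and business logic below are made up), a subclass could hand the work to its own executor and write the response back through the callback:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;

// Hypothetical subclass that makes asyncProcessRequest genuinely asynchronous
public class MyAsyncProcessor extends AsyncNettyRequestProcessor {

    private final ExecutorService bizExecutor = Executors.newFixedThreadPool(4);

    @Override
    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request,
            RemotingResponseCallback responseCallback) {
        // Do not block the calling thread; the callback writes the response back
        bizExecutor.submit(() -> responseCallback.callback(doBusinessLogic(request)));
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        return doBusinessLogic(request); // synchronous fallback path
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    private RemotingCommand doBusinessLogic(RemotingCommand request) {
        // Placeholder business logic
        return RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, null);
    }
}
```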
This analysis shows that we can implement the NettyRequestProcessor interface and register it in the processorTable to plug in server-side processing logic. As noted above, NettyRemotingServer provides a method to register a NettyRequestProcessor:
```java
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}
```
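For example, registering a custom processor might look like this (a sketch to be placed in some setup method; MY_REQUEST_CODE, the executor sizing and the "pong" payload are made up for illustration, and remotingServer is an already-constructed NettyRemotingServer):

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;

// Hypothetical usage: register a processor for an illustrative request code
final int MY_REQUEST_CODE = 9999;
ExecutorService myExecutor = Executors.newFixedThreadPool(8);

remotingServer.registerProcessor(MY_REQUEST_CODE, new NettyRequestProcessor() {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, null);
        response.setBody("pong".getBytes(StandardCharsets.UTF_8));
        return response;
    }

    @Override
    public boolean rejectRequest() {
        return false; // never ask the server to flow-control this code
    }
}, myExecutor);
```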
Having analyzed the request processing logic, note that the response processing logic is tied to message sending, so we first look at how messages are sent and then come back to how responses are handled.
The message sending logic lives in the NettyRemotingAbstract abstract class, which implements synchronous, asynchronous and oneway sending in the invokeSyncImpl, invokeAsyncImpl and invokeOnewayImpl methods respectively.
invokeSyncImpl:
```java
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        // Create a ResponseFuture for this request
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        // Save the ResponseFuture in the responseTable
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // Send the request and register a write-completion listener
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                // The write failed: remove the ResponseFuture from the responseTable
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // Unblock the waitResponse() call below (implemented internally with a CountDownLatch)
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // Block until putResponse() is called -- by processResponseCommand when the response
        // arrives, or by the listener above on send failure -- or the timeout expires
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
```
We can see that the synchronous sending logic is also very simple: before sending, create a ResponseFuture and register it in the responseTable, then write the request and attach a write-completion listener. The calling thread blocks in ResponseFuture.waitResponse() until the response arrives, the write fails, or the timeout expires, which is how a synchronous call is realized.
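The blocking itself is a plain CountDownLatch handoff inside ResponseFuture. Stripped to its essentials, it works like this (a minimal sketch, not the full class; the real class also tracks timeouts, the callback and the semaphore):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Minimal sketch of the CountDownLatch handoff inside ResponseFuture
class ResponseFutureSketch {
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile RemotingCommand responseCommand;

    // Called by the sending thread: block until a response (or a send failure) arrives
    public RemotingCommand waitResponse(long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    // Called from the I/O path: processResponseCommand on success,
    // or the write-failure listener with null
    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}
```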
invokeOnewayImpl:
```java
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    // Try to acquire a permit; the semaphore caps the number of oneway requests in flight
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        // SemaphoreReleaseOnlyOnce guarantees the permit is released only once,
        // preventing thread-safety problems caused by concurrent release paths
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // Release the permit in the write-completion callback
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        // Failed to acquire a permit
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
```
We can see that oneway sending uses a semaphore to cap the number of requests in flight; the process is very simple.
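SemaphoreReleaseOnlyOnce guards the release with a CAS flag so that a double release (for example from both the write listener and an exception path) cannot inflate the permit count. A minimal sketch of the idea (the real class in the remoting module is similarly small):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the idea behind SemaphoreReleaseOnlyOnce
class ReleaseOnlyOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnlyOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        // compareAndSet guarantees the permit is returned at most once, no matter
        // how many code paths (write listener, exception handler) call release()
        if (this.semaphore != null && this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}
```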
invokeAsyncImpl:
```java
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    // The semaphore caps the number of asynchronous requests in flight
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        // Register the ResponseFuture in the responseTable
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreAsync.getQueueLength(),
                this.semaphoreAsync.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
```
We can see that asynchronous sending also limits concurrency with a semaphore. Before sending, the InvokeCallback is stored in the ResponseFuture, and the ResponseFuture is registered in the responseTable; invokeAsyncImpl itself never executes the InvokeCallback. For that, let's return to the processResponseCommand method mentioned above, i.e. the response handling logic of NettyRemotingServer:
```java
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    // Get the ResponseFuture registered in the responseTable when the request was sent
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        // Fill the response into the ResponseFuture
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            // Asynchronous invocation: execute the callback registered when the request was sent
            executeInvokeCallback(responseFuture);
        } else {
            // Synchronous invocation: release the thread blocked in waitResponse()
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
```
We can see that response processing mainly executes the callback registered during asynchronous sending or, for a synchronous call, releases the blocked sender thread via putResponse().
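From the caller's perspective, the asynchronous round trip therefore looks roughly like this (a hypothetical client-side sketch; the address, request code and an already-started remotingClient are assumptions of the example):

```java
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Hypothetical caller-side view: the InvokeCallback registered here is the one
// processResponseCommand fires via executeInvokeCallback
RemotingCommand request = RemotingCommand.createRequestCommand(myRequestCode, null);
remotingClient.invokeAsync("127.0.0.1:9876", request, 3000, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            // handle the response
        } else if (!responseFuture.isSendRequestOK()) {
            // the write to the channel failed
        } else {
            // no response arrived before the timeout
        }
    }
});
```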
Client implementation
The client mainly sets the NameServer addresses, registers request processors, and sends messages (synchronous, asynchronous, oneway). Its implementation largely mirrors the server's, so we do not analyze it line by line here.
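A minimal sketch of typical client usage (the NameServer address and the request code are illustrative values, not taken from the source):

```java
import java.util.Collections;

import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Minimal sketch: bring up the remoting client and issue a synchronous call
RemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
remotingClient.start();
remotingClient.updateNameServerAddressList(Collections.singletonList("127.0.0.1:9876"));

RemotingCommand request = RemotingCommand.createRequestCommand(myRequestCode, null);
RemotingCommand response = remotingClient.invokeSync("127.0.0.1:9876", request, 3000);
```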
Thread model
We can see that on the server side, Netty's parentGroup (the boss) listens for TCP connection requests, establishes connections, creates SocketChannels and registers them with the childGroup; the childGroup watches for and reads network data, then hands it to the defaultEventExecutorGroup; the defaultEventExecutorGroup performs SSL authentication, encoding and decoding, idle detection, connection management and task dispatch; the business logic itself runs in the thread pool registered with each processor. The client's thread model is similar, except that it has no boss thread. The official thread-model architecture diagram is reproduced here:
Summary
As our analysis shows, the remoting module of RocketMQ implements a general-purpose server and client on top of Netty. Both sides support synchronous, asynchronous and oneway sending, and custom message processing logic can be plugged in by registering a custom NettyRequestProcessor. In RocketMQ, the communication among NameServer, Broker, producer and consumer all relies on the components we analyzed today.