RocketMQ network communication

Brief introduction

The remoting module of RocketMQ is the foundation for communication among RocketMQ producers, consumers, NameServer and Broker. It is built on the Netty framework. The inheritance structure of the core classes is as follows:

The RemotingService interface defines the start(), shutdown() and registerRPCHook() methods. registerRPCHook() registers hooks that the service runs before and after each RPC.

RemotingServer provides the basic functions of the server: registering request processors and sending messages (synchronous, asynchronous and one-way).

RemotingClient provides the basic functions of the client: setting the NameServer addresses, registering request processors, and sending messages (synchronous, asynchronous and one-way).

The above functions are implemented by NettyRemotingAbstract, NettyRemotingClient and NettyRemotingServer. Next, let's analyze the specific implementation of RocketMQ's communication functions.

Server implementation

We mainly focus on how the server processes requests and sends messages. The constructor of NettyRemotingServer shows how the server is created:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        // Create a Bootstrap object started by Netty Server
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        // Add ChannelEventListener event listener
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
		// Create a common thread pool to be used as the thread pool for default request processing
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
		
		// Decide whether to use Epoll model according to the server platform
        if (useEpoll()) {
            // Boss group: a single thread that accepts incoming connections
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
			// Create selector thread for childChannel
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        loadSslContext();
    }

The constructor creates the ServerBootstrap, the boss group (eventLoopGroupBoss) and the child group (eventLoopGroupSelector) required by the Netty server.
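The constructor repeats the same anonymous ThreadFactory five times, differing only in the name prefix. As a minimal sketch of that naming pattern, the hypothetical NamedThreadFactory below (not a class shown in this article) produces threads named with a prefix and a 1-based counter, using only the JDK:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration: one reusable factory instead of
// several near-identical anonymous classes.
final class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadIndex = new AtomicInteger(0);
    private final String prefix;

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Same scheme as the constructor above: prefix plus a 1-based counter.
        return new Thread(r, prefix + threadIndex.incrementAndGet());
    }
}

final class NamedThreadFactoryDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService publicExecutor =
            Executors.newFixedThreadPool(4, new NamedThreadFactory("NettyServerPublicExecutor_"));
        // Print the name of the pool thread that runs the task.
        publicExecutor.execute(() -> System.out.println(Thread.currentThread().getName()));
        publicExecutor.shutdown();
        publicExecutor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Distinct prefixes per pool make it easy to tell boss, selector, codec and business threads apart in a thread dump.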
The Netty server logic itself is wired up in the start() method:

@Override
    public void start() {
    	//Default worker pool
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

		// handler needed to initialize netty server
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                    	//Handshake, encoding and decoding, idle detection, connection management and task distribution are all handed over to the defaultEventExecutorGroup
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

We can see that six handlers are added to the pipeline of each accepted channel: HandshakeHandler, NettyEncoder, NettyDecoder, Netty's built-in IdleStateHandler, NettyConnectManageHandler and NettyServerHandler. The five RocketMQ handlers are described below.

HandshakeHandler handles handshake information. The specific logic is as follows:

@Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

            // mark the current position so that we can peek the first byte to determine if the content is starting with
            // TLS handshake
            msg.markReaderIndex();

            byte b = msg.getByte(0);
            // The first byte of a TLS handshake request is 0x16
            if (b == HANDSHAKE_MAGIC_CODE) {
                switch (tlsMode) {
                    // The server has TLS disabled, so the connection is rejected
                    case DISABLED:
                        ctx.close();
                        log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
                        break;
                    case PERMISSIVE:
                    case ENFORCING:
                        if (null != sslContext) {
                        	// Connect normally
                            ctx.pipeline()
                                .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                                .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                            log.info("Handlers prepended to channel pipeline to establish SSL connection");
                        } else {
                            // The server supports TLS, but there is no sslContext, so the connection is rejected
                            ctx.close();
                            log.error("Trying to establish an SSL connection but sslContext is null");
                        }
                        break;

                    default:
                        log.warn("Unknown TLS mode");
                        break;
                }
            } else if (tlsMode == TlsMode.ENFORCING) {
                // The server enforces TLS, but the first byte of the request is not the 0x16 TLS handshake header, so the connection is rejected
                ctx.close();
                log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
            }

            // reset the reader index so that handshake negotiation may proceed as normal.
            msg.resetReaderIndex();

            try {
                // Protocol detection is finished, so remove this HandshakeHandler from the pipeline
                ctx.pipeline().remove(this);
            } catch (NoSuchElementException e) {
                log.error("Error while removing HandshakeHandler", e);
            }

            // Hand this message over to the next handler.
            ctx.fireChannelRead(msg.retain());
        }
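The branching in channelRead0 boils down to a small decision table over the first byte, the configured TLS mode and whether an sslContext exists. The sketch below restates only that table in plain Java. HandshakeDecision and HandshakeDecider are hypothetical names introduced for illustration, and the pipeline manipulation of the real handler is left out:

```java
// Mirrors the three modes used by the handler above.
enum TlsMode { DISABLED, PERMISSIVE, ENFORCING }

// Hypothetical result type: what the handler ends up doing with the connection.
enum HandshakeDecision { UPGRADE_TO_TLS, CONTINUE_PLAINTEXT, CLOSE }

final class HandshakeDecider {
    // 0x16 is the TLS record content type for "handshake".
    static final byte HANDSHAKE_MAGIC_CODE = 0x16;

    static HandshakeDecision decide(byte firstByte, TlsMode mode, boolean sslContextAvailable) {
        if (firstByte == HANDSHAKE_MAGIC_CODE) {
            switch (mode) {
                case DISABLED:
                    // Client wants TLS, server has it switched off.
                    return HandshakeDecision.CLOSE;
                case PERMISSIVE:
                case ENFORCING:
                    // TLS is allowed, but only usable when an SSL context was loaded.
                    return sslContextAvailable
                        ? HandshakeDecision.UPGRADE_TO_TLS
                        : HandshakeDecision.CLOSE;
                default:
                    // The original only logs a warning for an unknown mode.
                    return HandshakeDecision.CONTINUE_PLAINTEXT;
            }
        }
        // Plaintext client: rejected only when TLS is mandatory.
        return mode == TlsMode.ENFORCING
            ? HandshakeDecision.CLOSE
            : HandshakeDecision.CONTINUE_PLAINTEXT;
    }

    public static void main(String[] args) {
        System.out.println(decide((byte) 0x16, TlsMode.PERMISSIVE, true));
        System.out.println(decide((byte) 0x00, TlsMode.ENFORCING, true));
    }
}
```

PERMISSIVE is the only mode that accepts both kinds of client on the same port, which is why the handler has to peek at the first byte instead of installing the TLS handler unconditionally.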

NettyEncoder and NettyDecoder are respectively responsible for the encoding and decoding of the request. The specific logic is implemented by the RemotingCommand class. The request protocol and message format are described in the official document, which will not be analyzed here.

NettyConnectManageHandler is responsible for handling connection events such as connect, disconnect, close, idle and exception. The specific logic is relatively simple, and we will not analyze it here.

NettyServerHandler is responsible for forwarding requests and responses. Here is the main logic. Let's analyze the specific process:

    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // Implemented in the parent class NettyRemotingAbstract
            processMessageReceived(ctx, msg);
        }
    }

	
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                	// Request processing logic
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                	// Response processing logic
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

The request processing process is as follows:

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // Find the corresponding processing logic according to the request type: NettyRequestProcessor
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
        	// Define request processing logic
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                    	// Pre hook before rpc execution (actually hook before request processing)
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // Define the callback after task processing is completed
                        final RemotingResponseCallback callback = new RemotingResponseCallback() {
                            @Override
                            public void callback(RemotingCommand response) {
                                // Run doAfterRpcHooks (after the request is processed and before the response is written back)
                                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                                if (!cmd.isOnewayRPC()) {
                                    if (response != null) {
                                        response.setOpaque(opaque);
                                        response.markResponseType();
                                        try {
                                            ctx.writeAndFlush(response);
                                        } catch (Throwable e) {
                                            log.error("process request over, but response failed", e);
                                            log.error(cmd.toString());
                                            log.error(response.toString());
                                        }
                                    } else {
                                    }
                                }
                            }
                        };
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        	// If it is a processor of type AsyncNettyRequestProcessor, all operations are performed asynchronously
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                        	// NettyRequestProcessor performs all operations synchronously. After obtaining the execution results, write back the response through the callback defined above
                            NettyRequestProcessor processor = pair.getObject1();
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
            	// If you need to reject the request, write back the system busy response directly
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
            	// Submit the task to the corresponding thread pool
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
            	// Thread pool triggered reject policy
                if ((System.currentTimeMillis() % 10000) == 0) {
                	// Sampling print log
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }
	
                if (!cmd.isOnewayRPC()) {
                	// If the request is not a OneWay type request, the response system is busy
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
        	// The NettyRequestProcessor processing the request was not found, responding to unsupported requests
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }
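The RejectedExecutionException branch in the method above only fires when the processor's thread pool can neither run nor queue another task. The following sketch reproduces that situation with a deliberately tiny JDK ThreadPoolExecutor. The class OverloadSketch and the string results standing in for response codes are assumptions made for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class OverloadSketch {
    // Returns a string stand-in for what the server would do with the request.
    static String trySubmit(ExecutorService pool, Runnable task, boolean oneway) {
        try {
            pool.submit(task);
            return "ACCEPTED";
        } catch (RejectedExecutionException e) {
            // A one-way caller expects no reply, so nothing is written back.
            return oneway ? "DROPPED" : "SYSTEM_BUSY";
        }
    }

    static String[] demo() {
        final CountDownLatch gate = new CountDownLatch(1);
        // One worker thread and a queue of one: a third pending task cannot be accepted.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable blocked = () -> {
            try {
                gate.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            return new String[] {
                trySubmit(pool, blocked, false), // taken by the worker
                trySubmit(pool, blocked, false), // parked in the queue
                trySubmit(pool, blocked, false), // rejected, caller is told "busy"
                trySubmit(pool, blocked, true)   // rejected, one-way gets no reply
            };
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String result : demo()) {
            System.out.println(result);
        }
    }
}
```

The sketch relies on the executor's default abort policy. A pool configured with a different rejection policy would never reach the catch block.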

We can see that the logic of processing a request is also simple. First, the processor and thread pool for the request code are looked up in the processorTable, falling back to the defaultRequestProcessor. Once found, the processing task is submitted to that thread pool for execution. The code also refers to AsyncNettyRequestProcessor. Note that AsyncNettyRequestProcessor is an abstract class that implements the NettyRequestProcessor interface:

public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor {

    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        RemotingCommand response = processRequest(ctx, request);
        responseCallback.callback(response);
    }
}

Although it is an abstract class, it declares no abstract methods of its own. It provides a default implementation of asyncProcessRequest that simply calls processRequest synchronously and then invokes the callback. The intent is for subclasses to override it with truly asynchronous logic.
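To make that intent concrete, here is a minimal sketch of a subclass that overrides the asynchronous entry point. All types are simplified stand-ins introduced for illustration (requests and responses are plain strings, and ResponseCallback, RequestProcessor, AsyncRequestProcessor and SlowStoreProcessor are hypothetical names), so this shows the pattern only, not the RocketMQ classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified stand-ins for the callback and processor types.
interface ResponseCallback {
    void callback(String response);
}

interface RequestProcessor {
    String processRequest(String request) throws Exception;
}

abstract class AsyncRequestProcessor implements RequestProcessor {
    // Default behaviour: process synchronously, then invoke the callback.
    public void asyncProcessRequest(String request, ResponseCallback callback) throws Exception {
        callback.callback(processRequest(request));
    }
}

// Hypothetical subclass: hands the work to its own executor and returns at once.
final class SlowStoreProcessor extends AsyncRequestProcessor {
    private final ExecutorService storeExecutor;

    SlowStoreProcessor(ExecutorService storeExecutor) {
        this.storeExecutor = storeExecutor;
    }

    @Override
    public String processRequest(String request) {
        return "stored:" + request;
    }

    @Override
    public void asyncProcessRequest(String request, ResponseCallback callback) {
        // The calling thread is released immediately; the callback fires later
        // on a storeExecutor thread.
        CompletableFuture.supplyAsync(() -> processRequest(request), storeExecutor)
            .thenAccept(callback::callback);
    }
}

final class AsyncProcessorDemo {
    static String run(String request) {
        ExecutorService storeExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> written = new CompletableFuture<>();
            new SlowStoreProcessor(storeExecutor).asyncProcessRequest(request, written::complete);
            return written.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            storeExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("msg-1"));
    }
}
```

Because the response is written from inside the callback, the thread that picked up the request does not have to wait for slow work such as disk I/O.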

From this analysis, we can implement the NettyRequestProcessor interface and register the implementation in the processorTable to add server-side processing logic. NettyRemotingServer provides a method for registering a NettyRequestProcessor:

    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }

        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
        this.processorTable.put(requestCode, pair);
    }
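Registration and lookup together form a small dispatch table keyed by request code. The sketch below models it with JDK types only. ProcessorRegistry, its nested Processor interface and the string result for an unsupported code are hypothetical simplifications, assuming string request bodies:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ProcessorRegistry {
    interface Processor {
        String process(String body);
    }

    // Pairs a processor with the thread pool its requests run on.
    private static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();
    private final ExecutorService publicExecutor;
    private Entry defaultEntry;

    ProcessorRegistry(ExecutorService publicExecutor) {
        this.publicExecutor = publicExecutor;
    }

    void registerProcessor(int requestCode, Processor processor, ExecutorService executor) {
        // A null executor falls back to the shared public pool.
        processorTable.put(requestCode, new Entry(processor, executor != null ? executor : publicExecutor));
    }

    void registerDefaultProcessor(Processor processor) {
        defaultEntry = new Entry(processor, publicExecutor);
    }

    Future<String> dispatch(int requestCode, final String body) {
        Entry matched = processorTable.get(requestCode);
        final Entry entry = matched != null ? matched : defaultEntry;
        if (entry == null) {
            return CompletableFuture.completedFuture("REQUEST_CODE_NOT_SUPPORTED");
        }
        Callable<String> task = () -> entry.processor.process(body);
        return entry.executor.submit(task);
    }

    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ProcessorRegistry registry = new ProcessorRegistry(pool);
            registry.registerProcessor(10, body -> "send:" + body, null);
            String known = registry.dispatch(10, "hello").get(2, TimeUnit.SECONDS);
            String unknown = registry.dispatch(99, "hello").get(2, TimeUnit.SECONDS);
            registry.registerDefaultProcessor(body -> "default:" + body);
            String fallback = registry.dispatch(99, "hello").get(2, TimeUnit.SECONDS);
            return new String[] {known, unknown, fallback};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Giving each request code its own executor keeps one slow request type from starving the others, while codes registered without an executor share the public pool.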

That covers request processing. Response processing is tied to message sending, so we first look at the sending logic and then come back to how responses are handled.

The logic of message sending is in the NettyRemotingAbstract abstract class, which implements synchronous, asynchronous and one-way sending in the invokeSyncImpl, invokeAsyncImpl and invokeOnewayImpl methods respectively.

invokeSyncImpl:

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
        	// Create a custom ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // Save ResponseFuture object to responseTable
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // Send the request and register the write success callback
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {                
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
					//Remove the responseFuture object from the responseTable
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    //Release the blocking of the waitResponse method of responseFuture (implemented internally with CountDownLatch)
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // Block until the response arrives, the write fails, or the timeout expires (waitResponse uses a CountDownLatch internally)
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

We can see that the logic of synchronous sending is also very simple. Before sending the request, a ResponseFuture is created and stored in the responseTable. The request is then written and a write-completion listener is registered. The calling thread blocks in responseFuture.waitResponse() until one of three things happens: the response arrives and is put into the future, the write fails and the listener calls putResponse(null), or the timeout expires.
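The blocking behaviour can be modelled with a CountDownLatch, as the comment in the code indicates. The following is a minimal sketch under simplifying assumptions: responses are plain strings, the send step is an arbitrary Runnable, and SimpleResponseFuture and SyncInvokeSketch are hypothetical names, not the RocketMQ classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal future: one latch, one slot for the response.
final class SimpleResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    void putResponse(String response) {
        this.response = response;
        latch.countDown(); // wakes up the thread blocked in waitResponse
    }

    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response; // still null if the wait timed out
    }
}

final class SyncInvokeSketch {
    private final ConcurrentHashMap<Integer, SimpleResponseFuture> responseTable =
        new ConcurrentHashMap<>();

    // Called from the I/O side when a response carrying this opaque arrives.
    void onResponse(int opaque, String response) {
        SimpleResponseFuture future = responseTable.remove(opaque);
        if (future != null) {
            future.putResponse(response);
        }
    }

    String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        SimpleResponseFuture future = new SimpleResponseFuture();
        responseTable.put(opaque, future);
        try {
            send.run();
            String response = future.waitResponse(timeoutMillis);
            return response != null ? response : "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } finally {
            responseTable.remove(opaque); // never leak an entry
        }
    }

    static String[] demo() {
        SyncInvokeSketch client = new SyncInvokeSketch();
        // The "server" answers from another thread, as an I/O thread would.
        String answered = client.invokeSync(
            1, () -> new Thread(() -> client.onResponse(1, "pong")).start(), 2000);
        // Nobody answers the second request, so the wait times out.
        String unanswered = client.invokeSync(2, () -> { }, 50);
        return new String[] {answered, unanswered};
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The finally block mirrors the original: whatever the outcome, the entry for this opaque is removed so that a late response finds no matching request.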

invokeOnewayImpl:

	public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        //Use semaphores to try to obtain execution permission. Here, semaphores are used to control the number of messages sent concurrently
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
        	//SemaphoreReleaseOnlyOnce ensures that semaphores are released only once to prevent thread safety problems caused by concurrent operations
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                    	// Release semaphore in write request completion callback
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
        	//Cannot get semaphore resource
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

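One-way sending uses a semaphore to cap the number of in-flight one-way requests. The permit is released in the write-completion callback, or in the catch block if the write throws. No ResponseFuture is created because no response is expected, so the process is also very simple.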
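The permit handling can be sketched in isolation. The code below assumes the release-once wrapper is a compare-and-set guard around Semaphore.release(); ReleaseOnce and OnewayLimiter are hypothetical names used only for this illustration:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Guards a permit so that it goes back to the semaphore at most once,
// even if both a callback and an error path try to release it.
final class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

final class OnewayLimiter {
    private final Semaphore permits;

    OnewayLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    // Returns a release handle, or null when no permit was free in time.
    ReleaseOnce tryBegin(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)
            ? new ReleaseOnce(permits)
            : null;
    }

    int available() {
        return permits.availablePermits();
    }

    static String demo() {
        OnewayLimiter limiter = new OnewayLimiter(1);
        try {
            ReleaseOnce first = limiter.tryBegin(0);
            int whileHeld = limiter.available();
            ReleaseOnce second = limiter.tryBegin(10); // no permit left
            first.release();
            first.release(); // second call is ignored
            return "whileHeld=" + whileHeld
                + " secondAcquired=" + (second != null)
                + " afterRelease=" + limiter.available();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the guard, a double release would silently raise the permit count above its configured maximum and weaken the concurrency limit.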

invokeAsyncImpl:

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        //Control the number of concurrent messages sent asynchronously through semaphores
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            //Register responseFuture into responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

We can see that asynchronous sending also limits the number of in-flight requests with a semaphore. Before sending, the response callback InvokeCallback is stored in the ResponseFuture object, and the ResponseFuture is stored in the responseTable. Nothing in invokeAsyncImpl itself executes the InvokeCallback. For that, let's go back to the processResponseCommand method mentioned above, which handles incoming responses:

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        //Gets the ResponseFuture object registered in the responseTable when sending the request
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
        	//Fill the response of the request into ResponseFuture
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
            	//Callback method registered when sending request
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

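We can see that response processing looks up the ResponseFuture by the opaque value of the response. If a callback was registered (asynchronous sending), it is executed; otherwise (synchronous sending), putResponse() wakes up the thread blocked in waitResponse().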
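The correlation between a request and its response rests entirely on the opaque value. As a minimal sketch of that idea, the code below keeps a table from opaque to callback and completes requests in a different order than they were sent. AsyncInvokeSketch is a hypothetical name, strings stand in for commands, and the callback runs inline here rather than on a separate executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class AsyncInvokeSketch {
    private final AtomicInteger opaqueGenerator = new AtomicInteger(0);
    private final ConcurrentHashMap<Integer, Consumer<String>> responseTable =
        new ConcurrentHashMap<>();

    // Registers the callback under a fresh opaque and returns that opaque.
    // A real client would also write the request to the channel here.
    int invokeAsync(String request, Consumer<String> invokeCallback) {
        int opaque = opaqueGenerator.incrementAndGet();
        responseTable.put(opaque, invokeCallback);
        return opaque;
    }

    // Returns false when no pending request matches the opaque.
    boolean processResponse(int opaque, String response) {
        Consumer<String> callback = responseTable.remove(opaque);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    static List<String> demo() {
        AsyncInvokeSketch client = new AsyncInvokeSketch();
        List<String> seen = new ArrayList<>();
        int first = client.invokeAsync("A", r -> seen.add("A<-" + r));
        int second = client.invokeAsync("B", r -> seen.add("B<-" + r));
        // Responses may arrive in any order; the opaque finds the right callback.
        client.processResponse(second, "resp-B");
        client.processResponse(first, "resp-A");
        seen.add("unmatched=" + !client.processResponse(42, "late"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the entry before running the callback ensures that a duplicate response for the same opaque is treated as unmatched instead of firing the callback twice.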

Client implementation

The main functions of the client are setting the NameServer addresses, registering request processors, and sending messages (synchronous, asynchronous and one-way). Its implementation is largely the same as the server logic, so it is not analyzed here.

Thread model

We can see that on the server side, Netty's parent group (eventLoopGroupBoss) listens for TCP connection requests, establishes connections, creates SocketChannels and registers them with the child group. The child group (eventLoopGroupSelector) monitors the channels and reads network data, then hands the data over to the defaultEventExecutorGroup. The defaultEventExecutorGroup is responsible for TLS handshaking, encoding and decoding, idle detection, connection management and task distribution. Business logic runs on the thread pool registered with each processor. Apart from having no boss thread, the client's thread model is similar to the server's. The official thread model architecture diagram is quoted here:

Summary

From our analysis, the remoting module of RocketMQ implements a general-purpose server and client on top of Netty. Both the client and the server support synchronous, asynchronous and one-way sending, and both let you register a custom NettyRequestProcessor to implement custom message processing logic. In RocketMQ, the communication among NameServer, Broker, producer and consumer depends on the components we analyzed here.

Keywords: Java Netty

Added by Technex on Sat, 19 Feb 2022 11:57:32 +0200