RocketMQ series: NameServer source code analysis

1. RocketMQ component overview

NameServer
NameServer acts as a configuration center: it maintains Broker cluster information, Broker information, Broker liveness information, and topic and queue information. NameServers do not communicate with each other. Each Broker keeps a long-lived connection to every NameServer in the cluster.

2. NameServer source code analysis

2.1 NamesrvController source code analysis

NamesrvController is the core control class of the NameServer module.

2.1.1 NamesrvConfig

NamesrvConfig mainly specifies the configuration properties of NameServer:

kvConfigPath: path of the KV configuration file (kvConfig.json).
mqhome/namesrv/namesrv.properties.
orderMessageEnable: whether to enable ordered messages. The default value is false.

2.1.2 ScheduledExecutorService

private final ScheduledExecutorService scheduledExecutorService = Executors.
This is the thread pool that runs NameServer's scheduled tasks. By default, two tasks run periodically:

Task 1. Scan brokers every 10 seconds (after an initial 5-second delay) and remove those that are no longer alive.
Task 2. Print the KVConfig information every 10 minutes (after an initial 1-minute delay).

2.1.3 KVConfigManager

KVConfigManager reads and changes the configuration properties of NameServer and loads the configuration file specified in NamesrvConfig into memory. One highlight of this class is that it uses a lightweight, non-thread-safe container protected by a read-write lock, so concurrent reads do not block one another while writes stay exclusive.
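The pattern described above (a plain HashMap guarded by a read-write lock) can be sketched roughly as follows. This is a minimal illustration, not the RocketMQ class: the name KvConfigSketch, its methods, and the sample namespace and values are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for a KV config store; not the RocketMQ class.
class KvConfigSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // namespace -> (key -> value); plain HashMaps, only touched while holding the lock
    private final Map<String, Map<String, String>> configTable = new HashMap<>();

    // Writers take the exclusive write lock.
    void put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers take the shared read lock, so several reads can proceed in parallel.
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = configTable.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        KvConfigSketch config = new KvConfigSketch();
        config.put("SAMPLE_NAMESPACE", "TopicTest", "broker-a:4"); // illustrative values only
        System.out.println(config.get("SAMPLE_NAMESPACE", "TopicTest"));
    }
}
```

The unlock calls sit in finally blocks so that an exception inside the critical section cannot leave the lock held.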

2.1.4 RouteInfoManager

RouteInfoManager is the data carrier of NameServer: it records Broker, Topic, and other routing information.

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;                        // @1
private final ReadWriteLock lock = new ReentrantReadWriteLock();                              // @2
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;                    // @3
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;                    // @4
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // @5
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;                // @6

Code @1: the maximum idle time between NameServer and a Broker, 2 minutes by default. If NameServer receives no heartbeat packet from a Broker within 2 minutes, it closes the connection.

Code @2: the read-write lock that protects the non-thread-safe HashMap containers.

Code @3: topicQueueTable, the mapping from topic to queues. It records which brokers the queues of a topic are distributed on and how many queues the topic has on each Broker. QueueData describes the queues and has the following attributes:

private String brokerName; // The name of the broker

private int readQueueNums; // Number of read queues

private int writeQueueNums; // Number of write queues

private int perm; // Read/write permission

Code @4: brokerAddrTable, information about all brokers, keyed by brokerName. Each BrokerData value describes one broker:

//broker cluster
private String cluster;

// broker name
private String brokerName;

// Broker addresses (IP:port) keyed by broker id. Broker id 0 means Master; greater than 0 means Slave.
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

Code @5: clusterAddrTable, Broker cluster information: which brokers each cluster contains.

Code @6: brokerLiveTable, the brokers currently considered alive. This information is not real-time: NameServer scans all brokers every 10 seconds and judges each broker's status from the time of its last heartbeat packet. As a consequence, when a broker process hangs, message producers cannot detect it immediately and may keep sending messages to it, and those sends fail (so this mechanism alone does not give high availability). How message sending is kept highly available is covered in follow-up articles in this series.
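To make the liveness check concrete, here is a minimal sketch of a heartbeat table with a periodic expiry scan. It is an illustration under assumptions, not RouteInfoManager itself: the class BrokerLivenessSketch and its methods are hypothetical, only the last heartbeat timestamp is stored, and the locking and channel closing done by the real code are left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical single-threaded sketch of a broker liveness table; locking is omitted.
class BrokerLivenessSketch {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // 2 minutes, as in the text

    // brokerAddr -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    // Removes every broker whose last heartbeat is older than the expiry window
    // and returns how many entries were removed.
    int scanNotActiveBroker(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        BrokerLivenessSketch table = new BrokerLivenessSketch();
        table.onHeartbeat("10.0.0.1:10911", 0L);       // illustrative addresses
        table.onHeartbeat("10.0.0.2:10911", 100_000L);
        System.out.println("removed: " + table.scanNotActiveBroker(121_000L));
    }
}
```

Under these assumptions, a broker that stops right after sending a heartbeat can remain in the table for about two minutes plus up to one scan interval, which is the staleness window the paragraph above describes.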

2.1.5 BrokerHousekeepingService

BrokerHousekeepingService implements the ChannelEventListener interface. Its methods are callbacks invoked on channel events: when the connection channel between NameServer and a Broker is closed, throws an exception, or becomes idle, the stopped Broker is removed from the data structures above.

public interface ChannelEventListener {
    void onChannelConnect(final String remoteAddr, final Channel channel);

    void onChannelClose(final String remoteAddr, final Channel channel);

    void onChannelException(final String remoteAddr, final Channel channel);

    void onChannelIdle(final String remoteAddr, final Channel channel);
}
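The cleanup behaviour can be sketched without Netty as follows. This is a simplified illustration: the interface SimpleChannelListener drops the Channel argument, and HousekeepingSketch with its set of live brokers is hypothetical, standing in for the route tables that the real service cleans up.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical listener: the real ChannelEventListener also passes the Netty Channel.
interface SimpleChannelListener {
    void onChannelConnect(String remoteAddr);
    void onChannelClose(String remoteAddr);
    void onChannelException(String remoteAddr);
    void onChannelIdle(String remoteAddr);
}

// Hypothetical housekeeping listener that forgets a broker once its channel goes away.
class HousekeepingSketch implements SimpleChannelListener {
    private final Set<String> liveBrokers = ConcurrentHashMap.newKeySet();

    void registerBroker(String remoteAddr) {
        liveBrokers.add(remoteAddr);
    }

    boolean isLive(String remoteAddr) {
        return liveBrokers.contains(remoteAddr);
    }

    @Override
    public void onChannelConnect(String remoteAddr) {
        // Nothing to clean up when a channel connects.
    }

    @Override
    public void onChannelClose(String remoteAddr) {
        liveBrokers.remove(remoteAddr);
    }

    @Override
    public void onChannelException(String remoteAddr) {
        liveBrokers.remove(remoteAddr);
    }

    @Override
    public void onChannelIdle(String remoteAddr) {
        liveBrokers.remove(remoteAddr);
    }

    public static void main(String[] args) {
        HousekeepingSketch service = new HousekeepingSketch();
        service.registerBroker("10.0.0.1:10911"); // illustrative address
        service.onChannelIdle("10.0.0.1:10911");
        System.out.println("still live: " + service.isLive("10.0.0.1:10911"));
    }
}
```

Close, exception, and idle all lead to the same cleanup here, which matches the description above; connect is a no-op.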

2.1.6 NettyServerConfig, RemotingServer, ExecutorService

These three attributes are related to network communication. Network communication between NameServer and Brokers, Producers, and Consumers is implemented on top of Netty. This article takes the opportunity to revisit the Netty thread model and practical Netty techniques.

Before analyzing network communication, we should pay attention to the following issues:

The meaning of each configuration property in NettyServerConfig
The differences between EventLoopGroup and EventExecutorGroup in the Netty thread model, and what each is for
How, over the whole life cycle of a Channel, its read and write events are guaranteed to be processed by the same thread from beginning to end

First, let's look at the configuration properties in NettyServerConfig:

private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

With the above questions in mind, we start the source code analysis with org.apache.rocketmq.remoting.netty.NettyRemotingServer.

1. serverWorkerThreads

Meaning: the number of threads in the business thread pool. RocketMQ has a dedicated thread pool for each task type, such as sending messages and consuming messages, plus one more thread pool, the default business thread pool. The default business thread pool is a fixed-size pool, and its thread names start with RemotingExecutorThread_.

Scope: this parameter is currently used mainly for NameServer's default business thread pool, which processes all commands that brokers, producers, and consumers send to NameServer.

Source code: org.apache.rocketmq.namesrv.NamesrvController

public boolean initialize() {

    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));   // @1

    this.registerProcessor();                 // @2

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}

private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {

        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {

        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}

Code @1 creates a fixed-size thread pool with serverWorkerThreads threads; DefaultRequestProcessor runs on it to handle the default request commands.

Code @2 binds DefaultRequestProcessor to the thread pool created at code @1.
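As a rough illustration of a fixed-size pool whose threads carry a name prefix, consider the sketch below. NamedThreadFactorySketch and runOnce are hypothetical names introduced for this example; they stand in for the ThreadFactoryImpl used in the listing and are not its actual implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread factory that names threads prefix + running index.
class NamedThreadFactorySketch implements ThreadFactory {
    private final AtomicInteger index = new AtomicInteger(0);
    private final String prefix;

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + index.incrementAndGet());
    }

    // Creates a fixed-size pool, runs one task on it, and returns the worker thread's name.
    static String runOnce(int threads, String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(threads, new NamedThreadFactorySketch(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown(); // let the JVM exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(8, "RemotingExecutorThread_"));
    }
}
```

Naming the threads this way makes it easy to tell from a thread dump which pool a thread belongs to.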

The class that actually invokes the commands: org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.

/**
 * Process incoming request command issued by remote peer.
 * @param ctx channel handler context.
 * @param cmd request command.
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }

                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    PLOG.error("process request exception", e);
                    PLOG.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

This method is relatively simple. It is in effect a template for command processing (template method): the concrete command handling is left to each NettyRequestProcessor implementation. The main responsibility of this class is to wrap the command in a Runnable task and submit it to the thread pool for execution.
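The dispatch logic can be condensed into the following sketch: look up a processor by request code, fall back to the default one, and hand the work to the executor bound to it. All names here (DispatchSketch, Binding, the string result codes) are hypothetical. Unlike the real method, which writes the response from inside the task, this sketch blocks for the result so it stays short and testable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical, simplified dispatcher; request and response bodies are plain strings.
class DispatchSketch {
    // A processor together with the executor that runs it.
    static final class Binding {
        final Function<String, String> processor;
        final ExecutorService executor;

        Binding(Function<String, String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Binding> processorTable = new HashMap<>();
    private Binding defaultBinding;

    void register(int code, Function<String, String> processor, ExecutorService executor) {
        processorTable.put(code, new Binding(processor, executor));
    }

    void registerDefault(Function<String, String> processor, ExecutorService executor) {
        defaultBinding = new Binding(processor, executor);
    }

    String dispatch(int code, String body) {
        Binding matched = processorTable.get(code);
        Binding binding = matched == null ? defaultBinding : matched;
        if (binding == null) {
            return "REQUEST_CODE_NOT_SUPPORTED";
        }
        try {
            Callable<String> task = () -> binding.processor.apply(body);
            return binding.executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            return "SYSTEM_BUSY";  // the pool refused the task
        } catch (Exception e) {
            return "SYSTEM_ERROR"; // the processor failed or timed out
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.registerDefault(body -> "default:" + body, pool);
        dispatcher.register(1, body -> "one:" + body, pool);
        System.out.println(dispatcher.dispatch(1, "ping"));
        System.out.println(dispatcher.dispatch(42, "ping"));
        pool.shutdown();
    }
}
```

Binding each processor to its own executor is what lets different request types run on separate pools, while unregistered codes share the default one.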

2. serverCallbackExecutorThreads

Meaning: the number of threads in Netty's public task thread pool. Thread name: NettyServerPublicExecutor_.

Source code: org.apache.rocketmq.remoting.netty.NettyRemotingServer.

3. serverSelectorThreads

Meaning: the number of Netty IO threads, that is, the number of threads that own a Selector, or the number of sub-Reactor threads in the main/sub Reactor model.

Thread name: NettyServerNIOSelector_.

Scope: the number of IO threads on the server side that serve brokers, producers, and consumers.

Source code: org.apache.rocketmq.remoting.netty.NettyRemotingServer.

4. serverOnewaySemaphoreValue, serverAsyncSemaphoreValue

Meaning: the semaphore values (maximum concurrency) for oneway (one-way) calls and asynchronous calls on the server.

Source code: org.apache.rocketmq.remoting.netty.NettyRemotingServer.

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract:

Note: a oneway call only sends the request. It does not wait for a response from the server and does not trigger a callback function.

Application scenario: calls that must take very little time and do not need high reliability, such as log collection.
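How a semaphore caps the number of in-flight oneway calls can be sketched as below. This is an assumption-laden illustration rather than the NettyRemotingAbstract code: OnewayLimiterSketch, tryBegin, and complete are hypothetical names, and the caller is expected to call complete once the write has finished.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical limiter: at most maxInFlight oneway requests may be outstanding at a time.
class OnewayLimiterSketch {
    private final Semaphore permits;

    OnewayLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true); // fair, so waiters are served in order
    }

    // Tries to reserve a slot for one request; returns false if none frees up in time.
    boolean tryBegin(long timeoutMillis) {
        try {
            return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Must be called exactly once per successful tryBegin, when the send has finished.
    void complete() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        OnewayLimiterSketch limiter = new OnewayLimiterSketch(256); // 256 as in serverOnewaySemaphoreValue
        if (limiter.tryBegin(100)) {
            try {
                System.out.println("sending oneway request, permits left: " + limiter.available());
            } finally {
                limiter.complete();
            }
        }
    }
}
```

Because a oneway caller never waits for a reply, the semaphore is the only back-pressure in this sketch: when all permits are taken, new requests fail fast instead of piling up.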

5. Other configuration parameters

// Maximum channel idle time, 120 seconds by default, enforced through Netty's IdleStateHandler
private int serverChannelMaxIdleTimeSeconds = 120;

//socket send buffer size
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;

//socket receive buffer size
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

// Whether to use PooledByteBufAllocator (pooled, reusable ByteBuf instances)
private boolean serverPooledByteBufAllocatorEnable = true;


Added by OnePlus on Thu, 30 Sep 2021 21:18:37 +0300