Store for RocketMQ Source Analysis

Questions to Consider Before Reading the Store Source

  • Why does the TPS of a single Kafka broker drop by an order of magnitude once the number of topics grows past 100 or so, while RocketMQ keeps its throughput high even with a very large number of topics?

    Kafka reaches tens of thousands of TPS when there are few topics, but falls to 1,000-2,000 TPS when there are many. RocketMQ can sustain very high TPS with many topics, even tens of thousands of them.

  • How does random reading of the CommitLog affect performance?

    RocketMQ stores messages in files. The bodies of all messages are stored in the CommitLog. Messages are written sequentially, which is efficient, but they are consumed by topic. The messages of one topic are scattered across the CommitLog, so reading them is random access. How does this affect RocketMQ?

RocketMQ versus Kafka

Kafka lays its files out by topic/partition, with one physical directory per partition, so sequential writing is achieved at the level of a partition's files. If a Kafka cluster has hundreds or thousands of topics with hundreds of partitions each, then under highly concurrent writes the message IO is spread across many files (disk IO contention becomes the bottleneck because messages are persisted to scattered locations), which is effectively random IO. In other words, as the number of topics and partitions grows, Kafka's write performance first rises and then falls.

RocketMQ, by contrast, pursues strictly sequential writes. All messages, regardless of topic, are appended in order to the same commitlog files, so sequential writing is not affected by the number of topics or partitions.
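
The write path described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: `SharedLogSketch`, `append`, and `offsetsOf` are hypothetical names, the log lives in memory rather than in a file, and none of this is RocketMQ's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one append-only log shared by every topic. */
final class SharedLogSketch {
    // Stand-in for the commitlog file: bytes are only ever appended at the end.
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();
    // Per-topic list of the offsets at which that topic's messages start.
    private final Map<String, List<Long>> offsetsByTopic = new HashMap<>();

    /** Appends a message to the shared log and returns its start offset. */
    synchronized long append(String topic, String body) {
        long offset = log.size();
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        log.write(bytes, 0, bytes.length);
        offsetsByTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(offset);
        return offset;
    }

    /** Returns a copy of the start offsets recorded for one topic. */
    synchronized List<Long> offsetsOf(String topic) {
        return new ArrayList<>(offsetsByTopic.getOrDefault(topic, new ArrayList<>()));
    }

    public static void main(String[] args) {
        SharedLogSketch log = new SharedLogSketch();
        log.append("orders", "aaaa");
        log.append("payments", "bb");
        log.append("orders", "cccccc");
        // The write position only moves forward, whatever the topic is.
        System.out.println("orders offsets:   " + log.offsetsOf("orders"));
        System.out.println("payments offsets: " + log.offsetsOf("payments"));
    }
}
```

Adding more topics only adds entries to the per-topic offset lists; the log itself is still written at a single, ever-increasing position.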

When producers and consumers run at the same time, Kafka's throughput drops sharply as the number of topics grows, while RocketMQ stays stable. Kafka therefore suits workloads with few topics and few consumers (where its throughput is higher), while RocketMQ suits workloads with many topics and many consumers. (RocketMQ is the better fit for platform-style tools, where many topics are created; Huawei, for example, uses MQS, a packaged version of RocketMQ.)

Message Format for CommitLog

Store Architecture Design: Message Sending

The layering of the storage design is very clear, roughly as follows:

  1. The business layer, which can also be called the network layer. After a message is received, it is normally handed to SendMessageProcessor, which decides how it is processed.

  2. DefaultMessageStore, the core entry point into the storage layer.

  3. CommitLog, the other important class.

These are the three core classes.

Entry Point for Message Processing in the Store Layer

The store handles a message as an RPC request, so we start by looking for the entry point. It is set up when the Broker starts.

// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    //todo load the topic configuration (JSON) stored on the Broker
    boolean result = this.topicConfigManager.load();
    //todo load consumer offsets (consumption progress)
    result = result && this.consumerOffsetManager.load();
    //todo load subscription group information
    result = result && this.subscriptionGroupManager.load();
    //todo load consumer filter information
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            //Create Message Store Management Component
            this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                                        this.messageArrivingListener,
                                        this.brokerConfig);
            //If the multi-replica mechanism (DLedger) is enabled, add a roleChangeHandler to the leader elector; it handles the event raised after a node's role changes.
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = 
                    new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                .getdLedgerServer().getdLedgerLeaderElector()
                .addRoleChangeHandler(roleChangeHandler);
            }
...
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    result = result && this.messageStore.load();

    if (result) {
      ...
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
                                         new ThreadFactoryImpl("ConsumerManageThread_"));
        //Register the processors for the various request types
        this.registerProcessor(); // here
       ...
    }
    return result;
}

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        //todo core processing after a message is received
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
        //Register processors for a series of request types
        //todo register the processor for message sending
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
}
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}
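
The registration above boils down to a lookup table from request code to a handler plus the thread pool that runs it, with a shared pool as the fallback when no executor is given. A minimal sketch of that idea follows; `ProcessorTableSketch`, `Entry`, and `dispatch` are hypothetical names, and the handler is reduced to a `Function<String, String>` instead of a real `NettyRequestProcessor`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch of a request-code to (handler, executor) table. */
final class ProcessorTableSketch {
    /** Pairs a handler with the thread pool that should run it. */
    private static final class Entry {
        final Function<String, String> handler;
        final ExecutorService executor;

        Entry(Function<String, String> handler, ExecutorService executor) {
            this.handler = handler;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new ConcurrentHashMap<>();
    // Shared fallback pool, used when a processor brings no executor of its own.
    private final ExecutorService publicExecutor = Executors.newFixedThreadPool(2);

    void registerProcessor(int requestCode, Function<String, String> handler,
                           ExecutorService executor) {
        ExecutorService chosen = (executor != null) ? executor : publicExecutor;
        processorTable.put(requestCode, new Entry(handler, chosen));
    }

    /** Looks up the processor for the code and runs it on its own pool. */
    CompletableFuture<String> dispatch(int requestCode, String body) {
        Entry entry = processorTable.get(requestCode);
        if (entry == null) {
            throw new IllegalArgumentException("no processor for code " + requestCode);
        }
        return CompletableFuture.supplyAsync(() -> entry.handler.apply(body), entry.executor);
    }

    void shutdown() {
        publicExecutor.shutdown();
    }

    public static void main(String[] args) {
        ProcessorTableSketch server = new ProcessorTableSketch();
        ExecutorService sendPool = Executors.newFixedThreadPool(4);
        server.registerProcessor(10, body -> "stored:" + body, sendPool); // dedicated pool
        server.registerProcessor(11, body -> "pong:" + body, null);       // falls back to shared pool
        System.out.println(server.dispatch(10, "hello").join());
        System.out.println(server.dispatch(11, "ping").join());
        sendPool.shutdown();
        server.shutdown();
    }
}
```

Giving message sending its own pool, as the Broker does with sendMessageExecutor, keeps slow requests of one type from starving the others.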

// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    if (result) {
        //Build the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        //todo build a thread pool dedicated to message sending
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), 
                                         new ThreadFactoryImpl("ConsumerManageThread_"));
        //Register the processors for the various request types
        this.registerProcessor();
    }
       ...
}

SendMessageProcessor.processRequest

RocketMQ uses Netty for networking. When the framework receives a request that needs processing, it enters processRequest.

// SendMessageProcessor#processRequest
//todo handle an incoming request
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}
// SendMessageProcessor#asyncProcessRequest()
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                              RemotingCommand request) {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            //todo this part supports message tracing (not yet implemented)
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                //todo batch messages
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                //todo non-batch messages (the general case)
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); // here
            }
    }
}
// SendMessageProcessor#asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
    //todo pre-send processing: record the RPC SeqNumber, check the queue's read/write permissions, create the topic automatically, and so on
    final RemotingCommand response = preSend(ctx, request, requestHeader); // here
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }
...
    //todo if no queueId is specified, the system picks one at random
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    //todo build the Message used inside the Broker (one more layer of wrapping)
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic()); //topic
    msgInner.setQueueId(queueIdInt);          //queueId
    //todo dead-letter handling: if the number of consumer retries reaches the upper limit, the message goes to the dead-letter queue
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }
...
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            return CompletableFuture.completedFuture(response);
        }
        //todo the Prepare message of a transactional message
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        //todo normal messages, and the Commit/Rollback messages of transactional messages
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
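
processRequest above is a synchronous wrapper around the asynchronous path: it calls asyncProcessRequest and blocks on the returned future. The pattern in isolation looks roughly like this. `SyncOverAsyncSketch` and its string "requests" are hypothetical stand-ins for RemotingCommand, used only to keep the sketch self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch of a blocking wrapper around an asynchronous handler. */
final class SyncOverAsyncSketch {
    /** Asynchronous path: returns at once with a future of the response. */
    static CompletableFuture<String> asyncProcess(String request) {
        if (request == null) {
            // Mirrors the early return used when the request header cannot be parsed.
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.supplyAsync(() -> "ACK:" + request);
    }

    /** Synchronous path: blocks the calling thread until the future completes. */
    static String process(String request) {
        try {
            return asyncProcess(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } catch (ExecutionException e) {
            return null; // the listing above logs the error and returns null as well
        }
    }

    public static void main(String[] args) {
        System.out.println(process("msg-1"));
        System.out.println(process(null));
    }
}
```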

DefaultMessageStore.asyncPutMessage

// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }

    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }

    long beginTime = this.getSystemClock().now();
    //From here we enter the CommitLog's message processing logic
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); // here

    putResultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });

    return putResultFuture;
}
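
Two things in this method are easy to miss: validation failures are returned as already-completed futures, and the statistics are attached as a callback while the original future is what the caller gets back. A small sketch of that shape, with hypothetical names (`PutStatsSketch`, `put`) and a `Boolean` in place of PutMessageResult:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: early-completed futures plus a statistics callback. */
final class PutStatsSketch {
    final AtomicLong failedTimes = new AtomicLong();
    final AtomicLong maxElapsedMillis = new AtomicLong();

    /**
     * storeFuture stands in for the future returned by the lower layer;
     * true means the message was stored successfully.
     */
    CompletableFuture<Boolean> put(String body, CompletableFuture<Boolean> storeFuture) {
        if (body == null || body.isEmpty()) {
            // Validation failed: answer immediately without touching the store.
            return CompletableFuture.completedFuture(false);
        }
        long begin = System.currentTimeMillis();
        // The callback only records statistics; it does not change the result.
        storeFuture.thenAccept(ok -> {
            long elapsed = System.currentTimeMillis() - begin;
            maxElapsedMillis.accumulateAndGet(elapsed, Math::max);
            if (ok == null || !ok) {
                failedTimes.incrementAndGet();
            }
        });
        return storeFuture;
    }

    public static void main(String[] args) {
        PutStatsSketch store = new PutStatsSketch();
        store.put("hello", CompletableFuture.completedFuture(true));
        store.put("world", CompletableFuture.completedFuture(false));
        System.out.println("failed puts: " + store.failedTimes.get());
    }
}
```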

CommitLog.asyncPutMessage

// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    //Time at which the message is stored on the Broker
    msg.setStoreTimestamp(System.currentTimeMillis());
    //CRC checksum of the message body, used to detect tampering with or corruption of the content
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Result to return
    AppendMessageResult result = null;
    //Storage timing statistics, which can be collected and reported to a monitoring system
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    //Handle delayed (scheduled) messages
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.
                getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            ...
        }
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    //Get the memory-mapped file for the most recent CommitLog file (zero copy)
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    //todo putMessage is called by multiple threads in parallel, so a lock is needed. The broker can be configured to use a reentrant lock or a spin lock
    //The setting defaults to false, which selects the spin lock. A spin lock is recommended for asynchronous flushing, a reentrant lock for synchronous flushing
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        //Set the store timestamp again now that the lock is held, so that it is globally ordered
        msg.setStoreTimestamp(beginLockTimestamp);
        //No CommitLog file exists yet, or the latest one is full, so create a new one.
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        //todo 3.1 write the Broker-internal Message into the MappedFile's memory (note: nothing is flushed to disk at this point)
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        ...
    } finally {
        putMessageLock.unlock();
    }
    ...
}
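
The comments above mention that the put lock can be a spin lock or a reentrant lock. A spin lock of this kind can be built on a single atomic flag; the sketch below shows one way, assuming a compare-and-set loop. `SpinLockSketch` and `countWithLock` are hypothetical names and this is not RocketMQ's own lock class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a spin lock built on one atomic flag. */
final class SpinLockSketch {
    // true means the lock is free.
    private final AtomicBoolean free = new AtomicBoolean(true);

    void lock() {
        // Busy-wait until this thread is the one that flips the flag to "taken".
        while (!free.compareAndSet(true, false)) {
            // spin
        }
    }

    void unlock() {
        free.set(true);
    }

    /** Increments a plain counter from several threads, guarded by the lock. */
    static int countWithLock(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 10_000));
    }
}
```

A spin lock avoids parking and waking threads, which pays off when the critical section is very short, as with an in-memory append. When the lock is held for longer, spinning burns CPU, which is presumably why the comment above recommends a reentrant lock for synchronous flushing.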

MappedFile: Files Stored in the MappedFileQueue

//todo 3.1 write the Broker-internal Message into the MappedFile's memory (note: nothing is flushed to disk at this point)
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    //Current write position of this MappedFile
    int currentPos = this.wrotePosition.get(); // here
    if (currentPos < this.fileSize) {
        //With asynchronous flushing there are two modes to choose from: writeBuffer or mappedByteBuffer
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            //todo non-batch message
            result = cb.doAppend(this.getFileFromOffset(), 
                                 byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), 
                                 byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
// We will not go into doAppend here; it only serializes the data into the storage format.
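
Stripped of message formatting, the append above is: read the current write position, take a slice of the mapped buffer, position it, copy the bytes, and advance the position. The sketch below does the same against a real memory-mapped temporary file. `MappedAppendSketch` and its methods are hypothetical names, and it assumes the caller serializes appends, as the put lock does in the listing earlier.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: appending into a fixed-size memory-mapped file. */
final class MappedAppendSketch {
    private final int fileSize;
    private final MappedByteBuffer mapped;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    MappedAppendSketch(Path file, int fileSize) throws IOException {
        this.fileSize = fileSize;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        }
    }

    /** Convenience factory that maps a fresh temporary file. */
    static MappedAppendSketch openTemp(int fileSize) {
        try {
            Path file = Files.createTempFile("mapped-append", ".log");
            file.toFile().deleteOnExit();
            return new MappedAppendSketch(file, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the number of bytes written, or -1 if the message does not fit. */
    int append(byte[] message) {
        int currentPos = wrotePosition.get();
        if (currentPos + message.length > fileSize) {
            return -1;
        }
        ByteBuffer slice = mapped.slice(); // independent position, shared content
        slice.position(currentPos);
        slice.put(message);                // lands in the page cache, not yet on disk
        wrotePosition.addAndGet(message.length);
        return message.length;
    }

    /** Reads len bytes starting at pos and decodes them as UTF-8. */
    String read(int pos, int len) {
        byte[] out = new byte[len];
        ByteBuffer view = mapped.slice();
        view.position(pos);
        view.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    /** Forces the mapped content to the storage device. */
    void flush() {
        mapped.force();
    }

    public static void main(String[] args) {
        MappedAppendSketch file = openTemp(1024);
        file.append("hello".getBytes(StandardCharsets.UTF_8));
        file.append("world".getBytes(StandardCharsets.UTF_8));
        file.flush();
        System.out.println(file.read(0, file.getWrotePosition()));
    }
}
```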

Synchronous Flush: GroupCommitService (Separate Thread)

The flush service, which runs in its own thread, is created in the CommitLog constructor.

// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
...
    this.defaultMessageStore = defaultMessageStore;
    //todo synchronous flush
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //todo asynchronous flush
        //By default, the FlushRealTimeService thread flushes newly appended data from the MappedByteBuffer to disk every 500ms
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //The CommitRealTimeService thread commits newly appended data from the ByteBuffer to the MappedByteBuffer every 200ms
    this.commitLogService = new CommitRealTimeService();
...
}
// CommitLog.GroupCommitService#run
public void run() {
    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit(); // here
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    this.doCommit(); // here
}
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit                                           
private void doCommit() {
    //Acquire the lock
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                //todo the message may be in the next file (the current file filled up and a new one was created), so flush up to twice
                for (int i = 0; i < 2 && !flushOK; i++) {
                    //Flush to disk
                    CommitLog.this.mappedFileQueue.flush(0);
                    //todo the flushed position is greater than or equal to the request's next offset, which means the flush succeeded
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                }
                //Wake up the thread waiting on this send request
                req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            //Update the flush checkpoint
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
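
The core of doCommit is the "flush at most twice, then report" loop. The sketch below isolates it with a simulated flush that, by assumption, only ever covers the file containing the current flushed position, which is why data that continues in the next file needs a second call. `GroupCommitSketch`, `Request`, and `flushOneFile` are hypothetical names, and a `CompletableFuture<Boolean>` replaces the wake-up of the waiting sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the group-commit loop with a simulated flush. */
final class GroupCommitSketch {
    /** One waiting writer: completes with true once its data is flushed. */
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Boolean> done = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final long fileSize;
    private long writtenWhere;  // end of the data appended so far
    private long flushedWhere;  // end of the data known to be on disk
    private final List<Request> pending = new ArrayList<>();

    GroupCommitSketch(long fileSize) {
        this.fileSize = fileSize;
    }

    /** Records an append of the given size and queues a flush request for it. */
    synchronized Request write(int bytes) {
        writtenWhere += bytes;
        Request request = new Request(writtenWhere);
        pending.add(request);
        return request;
    }

    /** Assumption of this sketch: one flush call covers a single file only. */
    private void flushOneFile() {
        long endOfCurrentFile = (flushedWhere / fileSize + 1) * fileSize;
        flushedWhere = Math.min(writtenWhere, endOfCurrentFile);
    }

    /** Serves every queued request, flushing at most twice per request. */
    synchronized void doCommit() {
        for (Request request : pending) {
            boolean flushOk = flushedWhere >= request.nextOffset;
            for (int i = 0; i < 2 && !flushOk; i++) {
                flushOneFile();
                flushOk = flushedWhere >= request.nextOffset;
            }
            request.done.complete(flushOk); // false plays the role of a flush timeout
        }
        pending.clear();
    }

    synchronized long getFlushedWhere() {
        return flushedWhere;
    }

    public static void main(String[] args) {
        GroupCommitSketch log = new GroupCommitSketch(100);
        Request request = log.write(130); // data continues into the second file
        log.doCommit();
        System.out.println("flushed ok: " + request.done.join()
                + ", flushedWhere=" + log.getFlushedWhere());
    }
}
```

Because every request queued since the last run is served in one pass, a single physical flush often satisfies several waiting senders, which is the point of grouping commits.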

Asynchronous Flush: CommitRealTimeService/FlushRealTimeService (Separate Threads)

// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
...
    this.defaultMessageStore = defaultMessageStore;
    //todo synchronous flush
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //todo asynchronous flush
        //By default, the FlushRealTimeService thread flushes newly appended data from the MappedByteBuffer to disk every 500ms
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //The CommitRealTimeService thread commits newly appended data from the ByteBuffer to the MappedByteBuffer every 200ms
    this.commitLogService = new CommitRealTimeService();
...
}
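
The two comments in the constructor describe a two-stage pipeline: data is first appended to an in-memory buffer, a commit step moves it to the file, and a separate flush step forces it to disk. The sketch below models the three positions involved (written, committed, flushed). It is a simplification under stated assumptions: `TwoStageFlushSketch` is a hypothetical class, the commit target is a plain FileChannel, and the two steps are called by hand rather than by timer threads running every 200 ms and 500 ms.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch: write buffer -> commit to file -> flush to disk. */
final class TwoStageFlushSketch {
    private final ByteBuffer writeBuffer; // stage 1: in-memory staging area
    private final FileChannel channel;    // stage 2: the file behind it
    private int wrotePosition;
    private int committedPosition;
    private int flushedPosition;

    TwoStageFlushSketch(FileChannel channel, int capacity) {
        this.channel = channel;
        this.writeBuffer = ByteBuffer.allocateDirect(capacity);
    }

    /** Convenience factory backed by a fresh temporary file. */
    static TwoStageFlushSketch openTemp(int capacity) {
        try {
            Path file = Files.createTempFile("two-stage", ".log");
            file.toFile().deleteOnExit();
            FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE);
            return new TwoStageFlushSketch(channel, capacity);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stage 1: the data only reaches the in-memory buffer. */
    synchronized void append(byte[] data) {
        writeBuffer.put(data);
        wrotePosition += data.length;
    }

    /** Stage 2: copy everything not yet committed from the buffer to the file. */
    synchronized void commit() {
        if (wrotePosition == committedPosition) {
            return;
        }
        ByteBuffer uncommitted = writeBuffer.duplicate();
        uncommitted.position(committedPosition);
        uncommitted.limit(wrotePosition);
        try {
            long filePosition = committedPosition;
            while (uncommitted.hasRemaining()) {
                filePosition += channel.write(uncommitted, filePosition);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        committedPosition = wrotePosition;
    }

    /** Stage 3: force committed data to disk; uncommitted data is not covered. */
    synchronized void flush() {
        if (committedPosition == flushedPosition) {
            return;
        }
        try {
            channel.force(false);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        flushedPosition = committedPosition;
    }

    synchronized int committedPosition() { return committedPosition; }

    synchronized int flushedPosition() { return flushedPosition; }

    synchronized long fileSize() {
        try {
            return channel.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TwoStageFlushSketch log = openTemp(1024);
        log.append(new byte[] {1, 2, 3, 4, 5});
        log.commit();
        log.flush();
        System.out.println("committed=" + log.committedPosition()
                + " flushed=" + log.flushedPosition());
    }
}
```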

Store Architecture Design: Message Consumption

How does random reading of the CommitLog affect performance?

In RocketMQ, all queues share one file (the commitlog), so RocketMQ writes sequentially and reads randomly. Each read first fetches the metadata from the logical queue, the ConsumeQueue, and only then locates the message body in the commitlog, which adds overhead.
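
The two-step read can be sketched as follows. The 20-byte index entry (8-byte commitlog offset, 4-byte message size, 8-byte tag hash) follows the ConsumeQueue layout as I understand it and is not stated in this article, so treat it as an assumption. The class `ConsumeQueueLookupSketch` and its in-memory buffers are hypothetical stand-ins for the real files.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-topic index entries pointing into one shared log. */
final class ConsumeQueueLookupSketch {
    // Assumed entry layout: 8-byte offset + 4-byte size + 8-byte tag hash.
    static final int UNIT_SIZE = 20;

    private final ByteBuffer commitLog = ByteBuffer.allocate(4096);
    private final Map<String, ByteBuffer> consumeQueues = new HashMap<>();

    /** Appends the body to the shared log and one index entry to the topic's queue. */
    void put(String topic, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.position();
        commitLog.put(bytes);
        ByteBuffer queue = consumeQueues.computeIfAbsent(
                topic, t -> ByteBuffer.allocate(UNIT_SIZE * 64));
        queue.putLong(offset).putInt(bytes.length).putLong(0L); // tag hash unused here
    }

    /** Step 1: read the fixed-size index entry. Step 2: read the body from the log. */
    String get(String topic, int index) {
        ByteBuffer queue = consumeQueues.get(topic);
        if (queue == null || index < 0 || (index + 1) * UNIT_SIZE > queue.position()) {
            throw new IndexOutOfBoundsException("no entry " + index + " for topic " + topic);
        }
        int base = index * UNIT_SIZE;          // fixed-size entries allow direct addressing
        long offset = queue.getLong(base);
        int size = queue.getInt(base + 8);
        byte[] out = new byte[size];
        ByteBuffer view = commitLog.duplicate(); // separate position, shared content
        view.position((int) offset);
        view.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ConsumeQueueLookupSketch store = new ConsumeQueueLookupSketch();
        store.put("orders", "order-1");
        store.put("payments", "pay-1");
        store.put("orders", "order-2");
        System.out.println(store.get("orders", 1));
        System.out.println(store.get("payments", 0));
    }
}
```

The index read is sequential per topic and cheap; the random access is confined to the second step, the jump into the shared log.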

So how is it optimized in RocketMQ?

  1. Both the commitlog files and the ConsumeQueue files are memory-mapped with mmap. (A zero-copy technique.)

  2. The CommitLog keeps its commitlog files in a copy-on-write container, which separates reads from writes and so improves efficiency considerably. (Copy-on-write.)

Keywords: Java kafka RocketMQ

Added by fabby on Mon, 29 Nov 2021 23:46:15 +0200