ConsumeQueue of RocketMQ storage

1, Function overview

As a core part of RocketMQ's storage implementation, ConsumeQueue provides an index over the messages stored in each Queue under each Topic. In other words, each Queue of each Topic has its own ConsumeQueue, which records where that Queue's messages sit in the CommitLog.
Like CommitLog, ConsumeQueue is implemented on top of MappedFileQueue, so the two implementations share many similarities. This article analyzes only the core functions of ConsumeQueue; for the rest, readers can first read the earlier article on CommitLog and then study the ConsumeQueue details on their own.

2, Source code analysis

1. Storage location

First, let's look at the construction method of ConsumeQueue:

    public ConsumeQueue(
        final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final DefaultMessageStore defaultMessageStore) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.defaultMessageStore = defaultMessageStore;

        this.topic = topic;
        this.queueId = queueId;
		// File storage location
        String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;

        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
        //Omit some contents
    }

As we can see, ConsumeQueue files are organized in a topic/queueId directory under the base storage path.
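The path concatenation in the constructor can be sketched as follows (a standalone snippet, not the RocketMQ source; the method name `queueDir` is illustrative):

```java
import java.io.File;

// A minimal sketch of how the ConsumeQueue directory is assembled:
// <storePath>/<topic>/<queueId>, mirroring the constructor above.
public class QueueDirExample {
    static String queueDir(String storePath, String topic, int queueId) {
        return storePath + File.separator + topic + File.separator + queueId;
    }

    public static void main(String[] args) {
        // With the default store root this resolves to something like
        // .../store/consumequeue/TopicTest/0
        System.out.println(queueDir("consumequeue", "TopicTest", 0));
    }
}
```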

2. Data storage

Unlike CommitLog records, each record stored in ConsumeQueue has a fixed length of 20 bytes. Let's look at the message-writing process:

    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                // Storage of extended information
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            // Store data
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                }
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

    /**
     *
     * @param offset offset of the message in the CommitLog
     * @param size size of the message
     * @param tagsCode hash code of the message tag
     * @param cqOffset the logical offset where the record is expected to be written in the ConsumeQueue
     * @return true if the record was written (or had already been written)
     */
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            // If this is the first mappedFile created in the queue, the space before cqOffset must be pre-filled with blank records
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                // The expected write position is before the current write pointer: the write is a repeat, so return success directly
                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    // The expected write position does not match the current write pointer: the queue order may be wrong
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset + size;
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

A ConsumeQueue write is retried up to 30 times. If extended-information storage is enabled, the extended data is written first and its address replaces the tagsCode in the ConsumeQueue record; otherwise only the tagsCode is saved. After a successful write, the checkpoint's LogicsMsgTimestamp is set to the store timestamp of the message, which indicates that all ConsumeQueue records up to that point in time have been stored successfully.
Details of the write: the latest MappedFile is obtained first. If it is the first file created in the queue and the expected offset is not 0, the flush and commit pointers of MappedFileQueue are set to expectLogicOffset and the space before that offset is filled with blank records. The record is then appended at the expected position. A record contains the following fields:

| Field | Description | Size |
| --- | --- | --- |
| offset | location of the message in the CommitLog | 8 bytes |
| size | size of the message | 4 bytes |
| tagsCode | hash code of the message tag | 8 bytes |
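The 8 + 4 + 8 = 20-byte layout can be sketched with a plain ByteBuffer, matching the putLong/putInt/putLong sequence in putMessagePositionInfo (a standalone snippet, not the RocketMQ source):

```java
import java.nio.ByteBuffer;

// A minimal sketch of the fixed 20-byte ConsumeQueue record layout:
// 8-byte CommitLog offset, 4-byte message size, 8-byte tagsCode.
public class CqUnitExample {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Encode one ConsumeQueue record into its 20-byte on-disk form.
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // where the message sits in the CommitLog
        buf.putInt(size);             // length of the message in the CommitLog
        buf.putLong(tagsCode);        // hash of the message tag (used for filtering)
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] unit = encode(123456L, 256, 98765L);
        ByteBuffer r = ByteBuffer.wrap(unit);
        // prints: 20 123456 256 98765
        System.out.println(unit.length + " " + r.getLong() + " " + r.getInt() + " " + r.getLong());
    }
}
```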

The size of a MappedFile file is calculated as follows:

    private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;

    int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
    return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);

The main reason for calculating the file size this way is to round it up to a multiple of ConsumeQueue.CQ_STORE_UNIT_SIZE, i.e. a multiple of the size of one ConsumeQueue record, so that no record straddles two files. One ConsumeQueue MappedFile can store 300,000 records.
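The rounding can be checked with a standalone snippet (not the RocketMQ source; the method name `mappedFileSize` is illustrative):

```java
// A minimal sketch of the file-size rounding: the configured size is rounded
// up to a multiple of CQ_STORE_UNIT_SIZE so no record straddles two files.
public class CqFileSizeExample {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static int mappedFileSize(int configuredSize) {
        int factor = (int) Math.ceil(configuredSize / (CQ_STORE_UNIT_SIZE * 1.0));
        return factor * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        int size = mappedFileSize(300000 * CQ_STORE_UNIT_SIZE); // default config
        System.out.println(size);                      // 6000000 bytes per file
        System.out.println(size / CQ_STORE_UNIT_SIZE); // holds 300000 records
        System.out.println(mappedFileSize(101));       // 101 rounds up to 120
    }
}
```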

3. Data acquisition

ConsumeQueue has only one method for reading data:

    /**
     * Get the buffer starting from the record at index startIndex (within one MappedFile)
     * @param startIndex index of the first record to read
     * @return the selected buffer, or null if the index is out of range
     */
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }

The read path is very simple. One detail is worth noting: startIndex is the index of a message record (the Nth record in the queue), not a byte offset within the file; it is multiplied by CQ_STORE_UNIT_SIZE to get the byte offset.
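The index arithmetic behind getIndexBuffer can be sketched like this (a standalone snippet, not the RocketMQ source; `locate` is an illustrative helper):

```java
// A minimal sketch of the index arithmetic: startIndex is multiplied by the
// 20-byte unit size, then split into a file number and an in-file position
// by the mapped file size, as findMappedFileByOffset and the modulo do above.
public class CqIndexExample {
    static final int CQ_STORE_UNIT_SIZE = 20;
    static final int MAPPED_FILE_SIZE = 300000 * CQ_STORE_UNIT_SIZE; // 6,000,000 bytes

    // Return {which MappedFile, byte position inside that file} for a record index.
    static long[] locate(long startIndex) {
        long offset = startIndex * CQ_STORE_UNIT_SIZE; // byte offset in the logical queue
        return new long[] {offset / MAPPED_FILE_SIZE, offset % MAPPED_FILE_SIZE};
    }

    public static void main(String[] args) {
        long[] loc = locate(300001); // the 300002nd record
        // record 300001 lands in the second file (file 1) at byte 20
        System.out.println(loc[0] + " " + loc[1]);
    }
}
```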

3, Summary

The implementation of ConsumeQueue is much simpler than that of CommitLog, so this article only analyzes message storage and message reading; readers can study the other details by themselves. The role of ConsumeQueue is to store, for each queueId under a topic, the positions of its messages in the CommitLog, so it can be understood as an index file. The benefit of this design is that when a message is delivered to the broker, both the CommitLog and the ConsumeQueue are written to disk sequentially. When a message is consumed, the ConsumeQueue is read first, again sequentially, to obtain the message's position in the CommitLog, and then the CommitLog itself is read. Since ConsumeQueue records are stored in arrival order, the CommitLog positions they point to are also in order, so the CommitLog reads are largely sequential as well. This design makes full use of the physical characteristics of mechanical hard disks and maximizes disk read/write throughput.

Added by Nandini on Fri, 31 Dec 2021 21:42:04 +0200