The role of tag in ConsumeQueue

Raising the question

Everything stored in ConsumeQueue is there for a reason, so what is the hashcode of the message tag for?
It turns out to be used for message filtering, because a consumer can subscribe to messages by topic and by tag:

```java
consumer.subscribe("TopicTest", "TagA");
```

So does message filtering happen in the broker or in the consumer? Common sense says the broker, because filtering there reduces network traffic. In fact, both take part: the broker filters out most non-matching messages, and the consumer filters out the small remainder.

Structure of ConsumeQueue

ConsumeQueue stores the logical index of a topic's queue. Each record, as shown in the figure below, points to a message whose actual content is stored in the CommitLog at the recorded CommitLog Offset. Every record has a fixed size of 20 bytes (ConsumeQueue.CQ_STORE_UNIT_SIZE):

CommitLog Offset (8 bytes): the position of the message in the CommitLog
Size (4 bytes): the size of the message
Tag HashCode (8 bytes): the hashcode of the message tag, used to filter messages
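To make the record layout concrete, here is a minimal sketch that writes and reads one entry with java.nio.ByteBuffer. The class CqEntry is hypothetical and not part of RocketMQ; the field order and widths follow the getLong/getInt/getLong reads in DefaultMessageStore###getMessage, and the tag code is assumed to be the tag's String.hashCode() stored as a long.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not RocketMQ's API: encodes/decodes one ConsumeQueue
// entry using the 8 + 4 + 8 byte layout that getMessage reads back.
final class CqEntry {
    // 8 (CommitLog offset) + 4 (size) + 8 (tag hashcode) = 20 bytes per entry
    static final int UNIT_SIZE = 20;

    final long commitLogOffset;
    final int size;
    final long tagsCode;

    CqEntry(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    // Append this entry at the buffer's current position.
    void writeTo(ByteBuffer buf) {
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
    }

    // Read one entry from the buffer's current position, in the same field order.
    static CqEntry readFrom(ByteBuffer buf) {
        long offset = buf.getLong();
        int size = buf.getInt();
        long tagsCode = buf.getLong();
        return new CqEntry(offset, size, tagsCode);
    }
}
```

Because every entry has the same size, the n-th entry of a queue starts at byte n * 20, which is what lets the broker jump straight to a consume offset.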

Tracing the source code

Building SubscriptionData (on the consumer side)

A consumer normally subscribes to a topic and a tag:

```java
consumer.subscribe("TopicTest", "TagA");
```

Following this call leads to the buildSubscriptionData method of FilterAPI:

```java
public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // Add the tag to the tag set
                        subscriptionData.getTagsSet().add(trimString);

                        // Add the tag's hashcode to the hashcode set
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}
```

Summary: SubscriptionData holds both the set of tag strings (tagsSet) and the set of their hashcodes (codeSet).
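The parsing step can be reduced to the following simplified sketch. SimpleSubscription is a hypothetical stand-in for SubscriptionData, not RocketMQ code; it assumes "*" is the match-all expression and, unlike the original, quietly returns empty sets instead of throwing when an expression splits into nothing.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified re-creation of FilterAPI.buildSubscriptionData:
// split "TagA || TagB" and keep both the tag strings and their hashcodes.
final class SimpleSubscription {
    static final String SUB_ALL = "*"; // assumed match-all expression

    final String topic;
    String subString;
    final Set<String> tagsSet = new HashSet<>();  // compared by the consumer
    final Set<Integer> codeSet = new HashSet<>(); // compared by the broker

    private SimpleSubscription(String topic) {
        this.topic = topic;
    }

    static SimpleSubscription build(String topic, String subString) {
        SimpleSubscription sub = new SimpleSubscription(topic);
        if (subString == null || subString.isEmpty() || subString.equals(SUB_ALL)) {
            sub.subString = SUB_ALL; // everything matches, so both sets stay empty
            return sub;
        }
        sub.subString = subString;
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                sub.tagsSet.add(trimmed);
                sub.codeSet.add(trimmed.hashCode());
            }
        }
        return sub;
    }

    boolean isSubAll() {
        return SUB_ALL.equals(subString);
    }
}
```

For "TagA || TagB" each set gets two entries; for "*", null, or an empty string both sets stay empty and the subscription is treated as match-all.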

Broker filtering messages

First, the consumer sends a pull request to the broker with request code RequestCode.PULL_MESSAGE. Looking for the broker-side processor that handles this request code leads to the PullMessageProcessor###processRequest method, which contains the following code:

```java
final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
```

This calls the DefaultMessageStore###getMessage method:

```java
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    // ...

            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                // Offset of the message in the CommitLog
                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                // Size of the message
                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                // Hashcode of the message tag
                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                maxPhyOffsetPulling = offsetPy;

                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                    if (offsetPy < nextPhyFileStartOffset)
                        continue;
                }

                // ...

                // Check whether the message tag matches: this is where the broker filters
                if (messageFilter != null
                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                    if (getResult.getBufferTotalSize() == 0) {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                    }

                    continue;
                }

                // ...
    return getResult;
}
```

Following the matching method shows that the filter simply checks whether the tagsCode is contained in the hashcode set of the subscription data:

```java
// ExpressionMessageFilter
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // ...

        // Is the tag's hashcode in the subscription's hashcode set?
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    } else {
        // ...
}
```

Summary: the broker filters messages by hashcode: it checks whether the tag hashcode read from the ConsumeQueue is contained in the hashcode set (codeSet) of the SubscriptionData.
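The broker's check can be sketched in isolation as follows. BrokerTagFilter and matchingOffsets are hypothetical names, not RocketMQ APIs; the sketch assumes a buffer of back-to-back 20-byte entries and a subAll flag standing in for the "*" subscription, and it narrows the stored long to int before the lookup, as tagsCode.intValue() does above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the broker-side check: walk 20-byte ConsumeQueue
// entries and keep the CommitLog offsets whose tag hashcode is subscribed.
final class BrokerTagFilter {
    static final int CQ_UNIT_SIZE = 20; // 8-byte offset + 4-byte size + 8-byte tagsCode

    static List<Long> matchingOffsets(ByteBuffer consumeQueue, Set<Integer> codeSet, boolean subAll) {
        List<Long> offsets = new ArrayList<>();
        while (consumeQueue.remaining() >= CQ_UNIT_SIZE) {
            long offsetPy = consumeQueue.getLong(); // position in the CommitLog
            int sizePy = consumeQueue.getInt();     // message size (not needed for the check)
            long tagsCode = consumeQueue.getLong(); // hashcode of the message tag
            // Compare the int hashcode against the subscription's codeSet.
            if (subAll || codeSet.contains((int) tagsCode)) {
                offsets.add(offsetPy);
            }
        }
        return offsets;
    }
}
```

In this sketch only the index entries are inspected, so a non-matching message is skipped without reading its body from the CommitLog; that is what makes filtering on the broker cheap.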

Consumer filtering messages

On the consumer side, the DefaultMQPushConsumerImpl###pullMessage method defines a PullCallback, which is invoked after the broker responds to a pull request:

```java
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);

            // ...
}
```

Follow it into the PullAPIWrapper###processPullResult method:

```java
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    // ...

            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // Filter messages on the consumer side by tag string
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }

    // ...

    return pullResult;
}
```

Summary: on the consumer side, filtering checks whether the tag set (tagsSet) in the SubscriptionData contains the tag string of the current message.
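The sketch below shows why the consumer compares strings rather than hashcodes: "Aa" and "BB" both have String.hashCode() equal to 2112, so a hashcode-only check cannot tell them apart. ConsumerTagFilter is hypothetical, not RocketMQ code; messages are reduced to their tag strings, and an empty tag set is assumed to mean a "*" subscription that needs no re-filtering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the consumer-side second pass: compare tag strings.
final class ConsumerTagFilter {
    static List<String> filterAgain(List<String> messageTags, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return new ArrayList<>(messageTags); // assumed "*" subscription: keep everything
        }
        List<String> kept = new ArrayList<>();
        for (String tag : messageTags) {
            // Messages without a tag, or with a tag string not subscribed, are dropped.
            if (tag != null && tagsSet.contains(tag)) {
                kept.add(tag);
            }
        }
        return kept;
    }

    // What a hashcode-only comparison (like the broker's) would decide.
    static boolean brokerWouldAccept(String messageTag, String subscribedTag) {
        return messageTag.hashCode() == subscribedTag.hashCode();
    }
}
```

With a subscription to "Aa", a message tagged "BB" passes the hashcode comparison but is dropped by filterAgain, which is exactly the case the second pass exists for.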

Summary: both the broker and the consumer filter messages

(1) When the consumer starts, it passes in the topic and tag expression, and the tag strings and their hashcodes are packaged into SubscriptionData.
(2) When the consumer pulls messages from the broker, the broker checks whether the tag hashcode read from the ConsumeQueue is in the hashcode set of the SubscriptionData. This first pass filters out most non-matching messages.
(3) When the messages that passed (2) reach the consumer, the consumer checks whether each message's tag is in the tag set of the SubscriptionData. This is the second pass; it is needed because two different tags can have the same hashcode, so the broker's check alone can let a non-matching message through.



Added by flunn on Thu, 30 Dec 2021 00:02:59 +0200