Raising questions
Whatever exists has a reason: so why does ConsumeQueue store the hashcode of each message's tag?
The answer is message filtering. A consumer can subscribe to messages by topic and by tag:
```java
consumer.subscribe("TopicTest", "TagA");
```
So does the filtering happen on the broker or on the Consumer? Intuitively it should be the broker, because filtering there reduces the traffic sent over the network. In fact, both filter: the broker removes most non-matching messages, and the Consumer removes the small remainder.
Structure of ConsumeQueue
ConsumeQueue stores the logical index of a topic's messages; the figure below shows one record. The message itself is stored in the CommitLog, at the position given by CommitLog Offset. Each record holds three fields:
Offset (8 bytes): the position of the message in the CommitLog
Size (4 bytes): the size of the message
HashCode (8 bytes): the hashcode of the message's tag, used to filter messages
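A minimal sketch of one ConsumeQueue record, assuming the 20-byte layout that getMessage reads further down (8-byte offset, 4-byte size, 8-byte tag hashcode). The class CqUnitSketch and its encode method are hypothetical illustrations built on java.nio.ByteBuffer, not RocketMQ code.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of one ConsumeQueue unit; not RocketMQ's own class.
class CqUnitSketch {
    // 8 (CommitLog offset) + 4 (message size) + 8 (tag hashcode) = 20 bytes
    static final int UNIT_SIZE = 20;

    // Packs one record in the order offset, size, tagsCode.
    static ByteBuffer encode(long commitLogOffset, int size, String tag) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        // Only the tag's String.hashCode() is stored, widened to a long
        buf.putLong(tag.hashCode());
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer unit = encode(1024L, 256, "TagA");
        // Read the fields back in the same order getMessage reads them
        long offsetPy = unit.getLong();
        int sizePy = unit.getInt();
        long tagsCode = unit.getLong();
        System.out.println(offsetPy + " " + sizePy + " " + (tagsCode == "TagA".hashCode())); // 1024 256 true
    }
}
```

Because only the hashcode is stored, every record has the same fixed size no matter how long the tag string is, which is what lets the broker step through the queue in CQ_STORE_UNIT_SIZE increments.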
Tracing the source code
Building SubscriptionData (on the Consumer side)
A Consumer normally subscribes to a topic and one or more tags:
```java
consumer.subscribe("TopicTest", "TagA");
```
Following this call leads to the buildSubscriptionData method of FilterAPI:
```java
public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // Add the tag to the tag set
                        subscriptionData.getTagsSet().add(trimString);
                        // Add the tag's hashcode to the hashcode set
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }
    return subscriptionData;
}
```
Summary: SubscriptionData holds both the set of subscribed tags and the set of their hashcodes.
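The parsing step can be sketched as a simplified stand-in for buildSubscriptionData. SubscriptionSketch and build are hypothetical names; the sketch assumes "*" is the subscribe-all marker (SubscriptionData.SUB_ALL) and, unlike the original, does not throw when the split yields no tags.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified mirror of FilterAPI.buildSubscriptionData:
// splits "TagA || TagB" into a tag set and a hashcode set.
class SubscriptionSketch {
    final Set<String> tagsSet = new HashSet<>();
    final Set<Integer> codeSet = new HashSet<>();
    boolean subAll;

    static SubscriptionSketch build(String subString) {
        SubscriptionSketch sub = new SubscriptionSketch();
        // null, empty, or "*" means "subscribe to every tag"
        if (subString == null || subString.isEmpty() || subString.equals("*")) {
            sub.subAll = true;
            return sub;
        }
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                sub.tagsSet.add(trimmed);            // later compared by the Consumer
                sub.codeSet.add(trimmed.hashCode()); // later compared by the broker
            }
        }
        return sub;
    }

    public static void main(String[] args) {
        SubscriptionSketch sub = build("TagA || TagB");
        System.out.println(sub.tagsSet.contains("TagA") + " " + sub.tagsSet.contains("TagB")
                + " " + sub.codeSet.contains("TagA".hashCode())); // true true true
    }
}
```

Storing both forms up front means neither side has to recompute anything per message: the broker gets cheap int comparisons, and the Consumer gets exact string comparisons.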
Broker filtering messages
The Consumer pulls messages by sending the broker a request with request code RequestCode.PULL_MESSAGE, so we look for the broker-side processor registered for that request code, which turns out to be PullMessageProcessor.
The PullMessageProcessor###processRequest method contains the following code:
```java
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
    requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
```
Following it leads to the DefaultMessageStore###getMessage method:
```java
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
                                   final long offset, final int maxMsgNums,
                                   final MessageFilter messageFilter) {
    // ellipsis
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        // Gets the offset of the message in the CommitLog
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        // Gets the size of the message
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        // Gets the hashcode of the message's tag
        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

        maxPhyOffsetPulling = offsetPy;

        if (nextPhyFileStartOffset != Long.MIN_VALUE) {
            if (offsetPy < nextPhyFileStartOffset)
                continue;
        }

        // ellipsis

        // Check whether the message tag matches: this is where the broker filters
        if (messageFilter != null
            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
            if (getResult.getBufferTotalSize() == 0) {
                status = GetMessageStatus.NO_MATCHED_MESSAGE;
            }
            continue;
        }

        // ellipsis
    }
    // ellipsis
    return getResult;
}
```
Following the matching method, we find that the broker simply checks whether the tagsCode is contained in the hashcode set of the subscription data:
```java
// ExpressionMessageFilter###isMatchedByConsumeQueue
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // ellipsis
    if (/* ellipsis */) {
        // ellipsis
        // Whether the tag's hashcode is included in the subscription
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    } else {
        // ellipsis
    }
}
```
Summary: the broker filters messages by checking whether the tag hashcode read from the ConsumeQueue is in the hashcode set of the SubscriptionData.
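A minimal sketch of the broker-side check, assuming a plain tag subscription (not "*") and the 20-byte ConsumeQueue unit. BrokerFilterSketch, queueOf and matchingOffsets are hypothetical; the real broker also handles wildcard subscriptions, extended units and other expression types, none of which is modeled here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: walk 20-byte ConsumeQueue units and keep the CommitLog
// offsets whose tagsCode is in the subscription's hashcode set.
class BrokerFilterSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static List<Long> matchingOffsets(ByteBuffer consumeQueue, Set<Integer> codeSet) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i + CQ_STORE_UNIT_SIZE <= consumeQueue.limit(); i += CQ_STORE_UNIT_SIZE) {
            long offsetPy = consumeQueue.getLong(i);      // CommitLog offset
            // bytes i+8 .. i+11 hold the message size, not needed for the tag check
            long tagsCode = consumeQueue.getLong(i + 12); // tag hashcode
            // Same test as subscriptionData.getCodeSet().contains(tagsCode.intValue())
            if (codeSet.contains((int) tagsCode)) {
                offsets.add(offsetPy);
            }
        }
        return offsets;
    }

    // Builds a fake ConsumeQueue: one 100-byte message per tag, laid out back to back.
    static ByteBuffer queueOf(String... tags) {
        ByteBuffer buf = ByteBuffer.allocate(tags.length * CQ_STORE_UNIT_SIZE);
        long offset = 0;
        for (String tag : tags) {
            buf.putLong(offset).putInt(100).putLong(tag.hashCode());
            offset += 100;
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer cq = queueOf("TagA", "TagB", "TagC", "TagA");
        System.out.println(matchingOffsets(cq, Set.of("TagA".hashCode()))); // [0, 300]
    }
}
```

Units that fail the check cost only a 20-byte read; the CommitLog is read only for offsets that pass, which is where the traffic saving mentioned at the start comes from.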
Consumer filtering messages
On the Consumer side, the DefaultMQPushConsumerImpl###pullMessage method defines a PullCallback, which is invoked once the pull request sent to the broker returns:
```java
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                pullRequest.getMessageQueue(), pullResult, subscriptionData);
            // ellipsis
        }
    }
    // ellipsis
};
```
Following the PullAPIWrapper###processPullResult method:
```java
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
                                    final SubscriptionData subscriptionData) {
    // ellipsis
    if (/* ellipsis */) {
        for (MessageExt msg : msgList) {
            if (msg.getTags() != null) {
                // Filter messages on the Consumer side
                if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                    msgListFilterAgain.add(msg);
                }
            }
        }
    }
    // ellipsis
    return pullResult;
}
```
Summary: the message filtering on the Consumer side checks whether the tag set in the SubscriptionData contains the tag string of the current message.
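The Consumer-side pass can be sketched as a plain string match. Msg is a hypothetical stand-in for MessageExt, and the sketch assumes a non-wildcard subscription (so the tag set is non-empty); messages without a tag are dropped, mirroring the msg.getTags() != null check.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the Consumer-side filter in processPullResult.
class ConsumerFilterSketch {
    // Stand-in for MessageExt: only the body and the tag matter here
    static final class Msg {
        final String body;
        final String tags;
        Msg(String body, String tags) { this.body = body; this.tags = tags; }
    }

    static List<Msg> filterAgain(List<Msg> pulled, Set<String> tagsSet) {
        List<Msg> kept = new ArrayList<>(pulled.size());
        for (Msg msg : pulled) {
            // Compare the full tag string, not its hashcode
            if (msg.tags != null && tagsSet.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = new ArrayList<>();
        pulled.add(new Msg("m1", "TagA"));
        pulled.add(new Msg("m2", "TagB"));
        pulled.add(new Msg("m3", null));
        Set<String> tagsSet = new HashSet<>();
        tagsSet.add("TagA");
        for (Msg m : filterAgain(pulled, tagsSet)) {
            System.out.println(m.body); // m1
        }
    }
}
```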
Summary: filtering happens on both the broker and the Consumer
(1) When the Consumer starts, it passes in the topic and tags; the tag strings and their hashcodes are stored in SubscriptionData.
(2) When the Consumer pulls messages from the broker, the broker checks whether the tag hashcode read from the ConsumeQueue is in the hashcode set of SubscriptionData. This first pass filters out most messages.
(3) The messages that pass (2) are sent to the Consumer, which checks whether each message's tag is in the tag set of SubscriptionData. This second pass catches messages whose tag differs from every subscribed tag but happens to share a hashcode with one of them.
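Why is the second pass needed at all? The broker compares only int hashcodes, and different strings can share one: "Aa" and "BB" both hash to 2112 under String.hashCode(). The hypothetical TwoStageFilterSketch below chains both passes over plain tag strings (no real messages, ConsumeQueue or network) to show a colliding tag slipping past the broker and being removed by the Consumer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical end-to-end sketch of the two filtering passes.
class TwoStageFilterSketch {
    // Pass 1 (broker): compare hashcodes only, since ConsumeQueue stores only tagsCode
    static List<String> brokerStage(List<String> storedTags, Set<Integer> codeSet) {
        List<String> passed = new ArrayList<>();
        for (String tag : storedTags) {
            if (codeSet.contains(tag.hashCode())) {
                passed.add(tag);
            }
        }
        return passed;
    }

    // Pass 2 (Consumer): compare the actual tag strings
    static List<String> consumerStage(List<String> pulled, Set<String> tagsSet) {
        List<String> kept = new ArrayList<>();
        for (String tag : pulled) {
            if (tagsSet.contains(tag)) {
                kept.add(tag);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Subscribe to "Aa"; note that "Aa".hashCode() == "BB".hashCode() == 2112
        Set<String> tagsSet = new HashSet<>(List.of("Aa"));
        Set<Integer> codeSet = new HashSet<>();
        for (String t : tagsSet) {
            codeSet.add(t.hashCode());
        }

        List<String> stored = List.of("Aa", "BB", "TagC", "Aa");
        List<String> afterBroker = brokerStage(stored, codeSet);
        List<String> afterConsumer = consumerStage(afterBroker, tagsSet);
        System.out.println(afterBroker);   // [Aa, BB, Aa]
        System.out.println(afterConsumer); // [Aa, Aa]
    }
}
```

The split of work follows from the data each side has: the broker sees only the fixed-size hashcode in ConsumeQueue, so it can reject most messages cheaply, while the Consumer has the full message and can make the exact comparison on the few that remain.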
References
https://www.bilibili.com/video/BV1fE411V7Ho?p=8 (from 1:05:00)