This article follows RocketMQ Message Track Design Thought, from the following three aspects to interpret its source code:
- Send Message Track
- Message Track Format
- Storing message track data
This program catalog
1. Send Message Track Flow
First, let's look at how to enable message tracking on the message sender. The sample code is as follows:
public class TraceProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); // @1 producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
From the code above, you can see that the key point is to specify that message track tracking is turned on when creating the DefaultMQProducer.Let's take a look at the DefaultMQProducer constructor associated with enabling message trajectories:
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
The parameters are as follows:
- String producerGroup
Group name to which the producer belongs. - boolean enableMsgTrace
Whether to turn on tracking message track, defaults to false. - String customizedTraceTopic
If message track tracking is turned on, the name of the subject to which the message track data belongs is stored by default: RMQ_SYS_TRACE_TOPIC.
1.1 DefaultMQProducer constructor
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) { // @1 this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message trace feature if (enableMsgTrace) { // @2 try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); // @3 } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } } }
Code @1: Let's start with its local variables.
- String producerGroup
Group of producers. - RPCHook rpcHook
The producer sends the hook function. - boolean enableMsgTrace
Whether to turn on message track tracking. - String customizedTraceTopic
Customize the data used to store message tracks.
Code @2: Used to build an AsyncTraceDispatcher, see its name: Asynchronous forwarding of message track data, focus on it later.
Code @3: Build the SendMessageTraceHookImpl object and use AsyncTraceDispatcher for asynchronous forwarding.
1.2 SendMessageTraceHookImpl hook function
1.2.1 SendMessageTraceHookImpl class diagram
- SendMessageHook
The Message Sending Hook function, which executes certain business logic before and after the message is sent, is the best extension point for recording message trajectories. - TraceDispatcher
The message track forwarding processor, which implements the class AsyncTraceDispatcher by default, asynchronously sends message track data.Here is a brief introduction to its properties:- int queueSize
Asynchronous forwarding, queue length, default to 2048, current version cannot be modified. - int batchSize
Number of batch messages, the number of data bars contained in a message send request in a message track, defaults to 100, and cannot be modified in the current version. - int maxMsgSize
The maximum message size that a message track can send at one time, which defaults to 128K, cannot be modified by the current version. - DefaultMQProducer traceProducer
The sender of the message used to send the message track. - ThreadPoolExecutor traceExecuter
Thread pool for asynchronous execution of message sending. - AtomicLong discardCount
Record the number of dropped messages. - Thread worker
The woker thread, which takes care of getting a batch of message track data from the append queue and submits it to the thread pool for execution. - ArrayBlockingQueue< TraceContext> traceContextQueue
The message track TraceContext queue, which holds messages to be sent to the server. - ArrayBlockingQueue< Runnable> appenderQueue
Internal queue in thread pool, default length 1024. - DefaultMQPushConsumerImpl hostConsumer
Consumer information, track information when message consumption is recorded. - String traceTopicName
topic name used to track message tracks.
- int queueSize
1.2.2 Source Analysis SendMessageTraceHookImpl
1.2.2.1 sendMessageBefore
public void sendMessageBefore(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { // @1 return; } //build the context content of TuxeTraceContext TraceContext tuxeContext = new TraceContext(); tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1)); context.setMqTraceContext(tuxeContext); tuxeContext.setTraceType(TraceType.Pub); tuxeContext.setGroupName(context.getProducerGroup()); // @2 //build the data bean object of message trace TraceBean traceBean = new TraceBean(); // @3 traceBean.setTopic(context.getMessage().getTopic()); traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); traceBean.setBodyLength(context.getMessage().getBody().length); traceBean.setMsgType(context.getMsgType()); tuxeContext.getTraceBeans().add(traceBean); }
Code @1: Return directly if the topic is Topic of the message track.
Code @2: In the context of message sending, set up the up and down environments for tracking message tracks, which mainly contain a TraceBean collection, tracking type (TraceType.Pub), and the group to which the producer belongs.
Code @3: Build a tracking message, represented by a TraceBean, to record the original message's topic, tags, keys, broker address, message body length, and so on.
As you can see from the above, the main purpose of sendMessageBefore is to prepare a portion of the message tracking log when the message is sent, which is stored in the sending context without sending the message track data.
1.2.2.2 sendMessageAfter
public void sendMessageAfter(SendMessageContext context) { //if it is message trace data,then it doesn't recorded if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()) // @1 || context.getMqTraceContext() == null) { return; } if (context.getSendResult() == null) { return; } if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) { // if switch is false,skip it return; } TraceContext tuxeContext = (TraceContext) context.getMqTraceContext(); TraceBean traceBean = tuxeContext.getTraceBeans().get(0); // @2 int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size()); // @3 tuxeContext.setCostTime(costTime); // @4 if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) { tuxeContext.setSuccess(true); } else { tuxeContext.setSuccess(false); } tuxeContext.setRegionId(context.getSendResult().getRegionId()); traceBean.setMsgId(context.getSendResult().getMsgId()); traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId()); traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2); localDispatcher.append(tuxeContext); // @5 }
Code @1: Return directly if the topic is Topic of the message track.
Code @2: TraceBean retrieved from MqTraceContext is designed as a List structure, but in a messaging scenario, there will always be only one piece of data, even if it is sent in bulk.
Code @3: Time-consuming to get messages sent to receive response results.
Code @4: Set costTime, success, regionId (sent to broker's partition), msgId (message ID, globally unique), offsetMsgId (physical offset of message, last message if bulk message), storeTime, which is used here (when client sends)Interval + half of the time spent) represents the storage time of the message, which is an estimate here.
Code @5: Forward the information you want to track to the Broker server through TraceDispatcher.The code is as follows:
public boolean append(final Object ctx) { boolean result = traceContextQueue.offer((TraceContext) ctx); if (!result) { log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx); } return result; }
A key point here is the use of the offer method, which returns false immediately when the queue cannot accommodate new elements, without blocking them.
Next, turn your attention to the implementation of TraceDispatcher.
1.3 TraceDispatcher Implementation Principles
TraceDispatcher, which forwards client message track data to Broker, has a default implementation class: AsyncTraceDispatcher.
1.3.1 TraceDispatcher constructor
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; this.batchSize = 100; this.maxMsgSize = 128000; this.discardCount = new AtomicLong(0L); this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024); this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize); if (!UtilAll.isBlank(traceTopicName)) { this.traceTopicName = traceTopicName; } else { this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC; } // @1 this.traceExecuter = new ThreadPoolExecutor(// : 10, // 20, // 1000 * 60, // TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); traceProducer = getAndCreateTraceProducer(rpcHook); // @2 }
Code @1: Initialize the core properties, these values are "solidified" in this version and cannot be modified by the user.
- queueSize
Queue length, default to 2048, the number of message tracks that the asynchronous thread pool can backlog. - batchSize
Number of messages sent in bulk to Broker at one time, defaulting to 100. - maxMsgSize
When reporting a message track to Broker, the total size of the message body cannot exceed this value, defaulting to 128k. - discardCount
Throughout the run, message track data is discarded. It is important to note that if the message TPS is sent too large, the asynchronous forwarding thread will actively discard message track data when it cannot process it. - traceContextQueue
The traceContext backlog queue, where the client (message sender, message consumer) submits the message track to the queue immediately after receiving the processing result. - appenderQueue
Submit to a queue in the Broker thread pool. - traceTopicName
Topic for receiving message tracks, defaulting to RMQ_SYS_TRANS_HALF_TOPIC. - traceExecuter
Asynchronous thread pool for sending to Broker service, default number of core threads is 10, maximum thread pool is 20, queue stack length is 2048, thread name: MQTraceSendThread_., - traceProducer
Roducer that sends message tracks.
Code @2: Call the getAndCreateTraceProducer method to create a roducer (message sender) for sending a message track, which is detailed below.
1.3.2 getAndCreateTraceProducer Details
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) { DefaultMQProducer traceProducerInstance = this.traceProducer; if (traceProducerInstance == null) { //@1 traceProducerInstance = new DefaultMQProducer(rpcHook); traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME); traceProducerInstance.setSendMsgTimeout(5000); traceProducerInstance.setVipChannelEnabled(false); // The max size of message is 128K traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000); } return traceProducerInstance; }
Code @1: If a sender has not been created yet, create a message sender to send the message track with GroupName: _INNER_TRACE_PRODUCER, a message delivery timeout of 5s, and a maximum message size of 118K allowed to be sent.
1.3.3 start
public void start(String nameSrvAddr) throws MQClientException { if (isStarted.compareAndSet(false, true)) { // @1 traceProducer.setNamesrvAddr(nameSrvAddr); traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr); traceProducer.start(); } this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId); // @2 this.worker.setDaemon(true); this.worker.start(); this.registerShutDownHook(); }
Start, which is called when DefaultMQProducer is started, if Tracking Message Track is enabled.
Code @1: If the sender used to send the message track is not started, set the nameserver address and start.
Code @2: Start a thread to perform the AsyncRunnable task, which will be highlighted next.
1.3.4 AsyncRunnable
class AsyncRunnable implements Runnable { private boolean stopped; public void run() { while (!stopped) { List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize); // @1 for (int i = 0; i < batchSize; i++) { TraceContext context = null; try { //get trace data element from blocking Queue — traceContextQueue context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS); // @2 } catch (InterruptedException e) { } if (context != null) { contexts.add(context); } else { break; } } if (contexts.size() > 0) { : AsyncAppenderRequest request = new AsyncAppenderRequest(contexts); // @3 traceExecuter.submit(request); } else if (AsyncTraceDispatcher.this.stopped) { this.stopped = true; } } } }
Code @1: Build a message tracking Bean to submit, sending batchSize at most, defaulting to 100.
Code @2: Remove a pending TraceContext from the traceContextQueue and set the timeout to 5s, i.e. how to wait up to 5S if there are no pending TraceContext s in the queue.
Code @3: Submit the task AsyncAppenderRequest to the thread pool.
1.3.5 AsyncAppenderRequest#sendTraceData
public void sendTraceData(List<TraceContext> contextList) { Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>(); for (TraceContext context : contextList) { //@1 if (context.getTraceBeans().isEmpty()) { continue; } // Topic value corresponding to original message entity content String topic = context.getTraceBeans().get(0).getTopic(); // @2 // Use original message entity's topic as key String key = topic; List<TraceTransferBean> transBeanList = transBeanMap.get(key); if (transBeanList == null) { transBeanList = new ArrayList<TraceTransferBean>(); transBeanMap.put(key, transBeanList); } TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context); // @3 transBeanList.add(traceData); } for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) { // @4 flushData(entry.getValue()); } }
Code @1: Traverse collected message track data.
Code @2: Get the Topic that stores the message track.
Code @3: Encode the TraceContext, where is the transport data for the message track. Take a closer look at it later to understand the format of its upload.
Code @4: Send the encoded data to the Broker server.
1.3.6 TraceDataEncoder#encoderFromContextBean
Depending on the type of message trajectory tracking, there are some differences in its format, which are described below.
1.3.6.1 PUB (Message Sending)
case Pub: { TraceBean bean = ctx.getTraceBeans().get(0); //append the content of context and traceBean to transferBean's TransData sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR); }
The protocol for message track data uses string splicing, the field's separator symbol is 1, and the whole data ends with 2. This design still feels a bit "amazing". Why not use the json protocol directly?
1.3.6.2 SubBefore
for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);// } }
Tracks are stitched together in the order described above, with fields separated by one and records ending with two.
1.3.2.3 SubAfter (after message consumption)
case SubAfter: { for (TraceBean bean : ctx.getTraceBeans()) { sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); } } }
As with format encoding, don't repeat.
After the source tracking above, the message track tracking process and the message track data encoding protocol at the message sender are clear. Next, we use a sequence diagram to end this section.
In its implementation, only the message track tracking of message sending is concerned. What is the track tracking of message consumption?The implementation principle is the same, that is, to execute a specific hook function before and after message consumption. Its implementation class is ConsumeMessageTraceHookImpl. Because its implementation is similar to the idea of message sending, it is not detailed.
2. How message track data is stored
In fact, from the above analysis, we have already learned that the message track data of RocketMQ is stored on the Broker, so how can the subject name of the message track be specified?How can its routing information be allocated?Is it created on each broker or just one of them?RocketMQ supports topics for default and custom message trajectories.
2.1 Use the system default theme name
The default message track theme for RocketMQ is: RMQ_SYS_TRACE_TOPIC, does that Topic need to be created manually?What about its routing information?
{ if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { // @1 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); // @2 topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } }
The code above comes from the TopicConfigManager constructor, which creates a topicConfigManager object to manage the topic's routing information when the Broker starts.
Code @1: If Broker turns on message track tracking (traceTopicEnable=true), the top routing information for the default message track is automatically created, noting that the number of read-write queues is 1.
2.2 User-defined message trajectory theme
Topic for a specified message track that can be displayed when creating a message sender, message consumer, for example:
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)
The message track Topic is specified through customizedTraceTopic.
Warm Tip: Often in production environments, automatic creation will not be turned on, so the RocketMQ operations manager needs to create Topic in advance.
Okay, this is where we go. This paper details the implementation of RocktMQ message trajectory. Next, we will go into multi-copy learning.