Source Code Analysis: RocketMQ Message Track

Following the design of RocketMQ message tracks, this article interprets the source code from three aspects:

  1. Send Message Track
  2. Message Track Format
  3. Storing message track data


1. Send Message Track Flow

First, let's look at how to enable message tracking on the message sender. The sample code is as follows:

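import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The second argument (enableMsgTrace = true) turns on message track tracking
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);      // @1
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}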

As the code above shows, the key point is to enable message tracking when creating the DefaultMQProducer. Let's look at the DefaultMQProducer constructors related to message tracking:

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

The parameters are as follows:

  • String producerGroup
    Group name to which the producer belongs.
  • boolean enableMsgTrace
    Whether to enable message track tracking; defaults to false.
  • String customizedTraceTopic
    The topic that stores message track data when tracking is enabled; if not specified, it defaults to RMQ_SYS_TRACE_TOPIC.

1.1 DefaultMQProducer constructor

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {      // @1
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {                                        // @2
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));  // @3
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

Code @1: First, the constructor's parameters.

  • String producerGroup
    Group the producer belongs to.
  • RPCHook rpcHook
    RPC hook executed before and after each remoting request made by the client (used, for example, for ACL authentication).
  • boolean enableMsgTrace
    Whether to enable message track tracking.
  • String customizedTraceTopic
    Custom topic used to store message track data.

Code @2: If message tracking is enabled, build an AsyncTraceDispatcher. As its name suggests, it forwards message track data asynchronously; it is analyzed in detail later.

Code @3: Build a SendMessageTraceHookImpl that hands track data to the AsyncTraceDispatcher for asynchronous forwarding, and register it as a send-message hook on DefaultMQProducerImpl.
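To make the hook mechanism concrete, here is a minimal, self-contained Java sketch of the pattern. SendHook, SendContext, HookedProducer and TraceHook are hypothetical simplified types, not RocketMQ's real SendMessageHook API: the producer runs every registered hook before and after a simulated send, the trace hook prepares its data in before(), skips the trace topic itself, and only queues a record in after().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the send-hook pattern (not RocketMQ's real classes).
interface SendHook {
    void before(SendContext ctx);
    void after(SendContext ctx);
}

class SendContext {
    final String topic;
    Object traceContext; // prepared in before(), read back in after()
    boolean sendOk;

    SendContext(String topic) {
        this.topic = topic;
    }
}

class HookedProducer {
    private final List<SendHook> hooks = new ArrayList<>();

    void registerSendMessageHook(SendHook hook) {
        hooks.add(hook);
    }

    // Runs every registered hook before and after the simulated send.
    boolean send(String topic) {
        SendContext ctx = new SendContext(topic);
        for (SendHook h : hooks) {
            h.before(ctx);
        }
        ctx.sendOk = true; // stand-in for the real network call
        for (SendHook h : hooks) {
            h.after(ctx);
        }
        return ctx.sendOk;
    }
}

class TraceHook implements SendHook {
    static final String TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
    final List<String> dispatched = new ArrayList<>(); // stands in for the dispatcher queue

    public void before(SendContext ctx) {
        if (ctx.topic.startsWith(TRACE_TOPIC)) {
            return; // never trace the trace messages themselves
        }
        ctx.traceContext = "Pub:" + ctx.topic;
    }

    public void after(SendContext ctx) {
        if (ctx.traceContext == null) {
            return; // nothing was prepared in before()
        }
        dispatched.add(ctx.traceContext + (ctx.sendOk ? ":ok" : ":fail"));
    }
}
```

The design point the sketch illustrates is that the trace hook never touches the network itself; it only fills a context and hands it on, which keeps tracing off the send path.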

1.2 SendMessageTraceHookImpl hook function

1.2.1 SendMessageTraceHookImpl class diagram

  1. SendMessageHook
    The message-sending hook interface. It runs custom logic before and after a message is sent, which makes it the natural extension point for recording message tracks.
  2. TraceDispatcher
    The message track dispatcher. Its default implementation, AsyncTraceDispatcher, sends message track data asynchronously. Its main properties are:
    • int queueSize
      Length of the thread pool's work queue (appenderQueue); 2048 by default and not configurable in the current version.
    • int batchSize
      Batch size: the number of track records carried in one send request; 100 by default and not configurable in the current version.
    • int maxMsgSize
      Maximum size of the track data sent in one message; 128000 bytes (about 128K) by default and not configurable in the current version.
    • DefaultMQProducer traceProducer
      The producer used to send message track data.
    • ThreadPoolExecutor traceExecuter
      Thread pool that sends message track data asynchronously.
    • AtomicLong discardCount
      Number of discarded track records.
    • Thread worker
      The worker thread, which takes a batch of track data from traceContextQueue and submits it to the thread pool.
    • ArrayBlockingQueue<TraceContext> traceContextQueue
      Queue of TraceContext objects (capacity 1024) waiting to be sent to the Broker.
    • ArrayBlockingQueue<Runnable> appenderQueue
      Work queue of the thread pool; its capacity is queueSize (2048).
    • DefaultMQPushConsumerImpl hostConsumer
      The host consumer, used when recording message consumption tracks.
    • String traceTopicName
      Name of the topic that stores message track data.

1.2.2 Source Analysis SendMessageTraceHookImpl

1.2.2.1 sendMessageBefore
public void sendMessageBefore(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {   // @1
        return;
    }
    //build the context content of TuxeTraceContext
    TraceContext tuxeContext = new TraceContext();
    tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
    context.setMqTraceContext(tuxeContext);
    tuxeContext.setTraceType(TraceType.Pub);
    tuxeContext.setGroupName(context.getProducerGroup());     // @2
    //build the data bean object of message trace
    TraceBean traceBean = new TraceBean();                    // @3
    traceBean.setTopic(context.getMessage().getTopic());
    traceBean.setTags(context.getMessage().getTags());
    traceBean.setKeys(context.getMessage().getKeys());
    traceBean.setStoreHost(context.getBrokerAddr());
    traceBean.setBodyLength(context.getMessage().getBody().length);
    traceBean.setMsgType(context.getMsgType());
    tuxeContext.getTraceBeans().add(traceBean);
}

Code @1: Return immediately if the message's topic is the message track topic, so that track messages themselves are not traced.

Code @2: Create the trace context (TraceContext) and store it in the message sending context. It mainly contains a TraceBean list, the trace type (TraceType.Pub), and the group the producer belongs to.

Code @3: Build a TraceBean to record the original message's topic, tags, keys, broker address, body length, message type, and so on.

As you can see, sendMessageBefore only prepares part of the track data before the message is sent and stores it in the sending context; it does not send any track data.

1.2.2.2 sendMessageAfter
public void sendMessageAfter(SendMessageContext context) {
    //if it is message trace data,then it doesn't recorded
    if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())     // @1
        || context.getMqTraceContext() == null) {
        return;
    }
    if (context.getSendResult() == null) {
        return;
    }

    if (context.getSendResult().getRegionId() == null
        || !context.getSendResult().isTraceOn()) {
        // if switch is false,skip it
        return;
    }

    TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
    TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                                                              // @2
    int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());  // @3
    tuxeContext.setCostTime(costTime);                                                                                      // @4
    if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
        tuxeContext.setSuccess(true);
    } else {
        tuxeContext.setSuccess(false);
    }
    tuxeContext.setRegionId(context.getSendResult().getRegionId());
    traceBean.setMsgId(context.getSendResult().getMsgId());
    traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
    traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
    localDispatcher.append(tuxeContext);                                                                                    // @5
}

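Code @1: Return immediately if the topic is the message track topic or no trace context was prepared by sendMessageBefore. The method also returns when there is no send result, or when the result has no regionId or its trace switch (isTraceOn) is off.

Code @2: Get the TraceBean from the trace context. Although the context holds a List, for message sending it always contains exactly one element, even for batch sends.

Code @3: Compute the time elapsed from the trace context's timeStamp (set when the TraceContext is created in sendMessageBefore) to receiving the send result, divided by the number of TraceBeans (always 1 here).

Code @4: Set costTime, success (true only when the send status is SEND_OK), regionId (the region of the Broker the message was sent to), msgId (the globally unique message ID), offsetMsgId (the message ID based on the physical offset; for batch messages, that of the last message), and storeTime. storeTime is taken as the client-side timestamp plus half of the elapsed time, so it is only an estimate of when the Broker stored the message.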
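The timing arithmetic in @3 and @4 can be reproduced in isolation. TraceTiming is a hypothetical helper written for illustration; like the source, it assumes the request reaches the Broker roughly halfway through the round trip.

```java
// Hypothetical helper reproducing the arithmetic of sendMessageAfter (not a RocketMQ class).
class TraceTiming {
    // Average elapsed time per TraceBean; for message sending beanCount is always 1.
    static int costTime(long startMillis, long nowMillis, int beanCount) {
        return (int) ((nowMillis - startMillis) / beanCount);
    }

    // Estimated store time: start of the send plus half of the round trip.
    // Integer division truncates, so an odd costTime loses half a millisecond.
    static long estimatedStoreTime(long startMillis, int costTime) {
        return startMillis + costTime / 2;
    }
}
```

For example, a send that starts at t = 1000 ms and gets its result at t = 1030 ms yields costTime = 30 and an estimated storeTime of 1015.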

Code @5: Hand the track data to the TraceDispatcher, which forwards it to the Broker. append is implemented as follows:

public boolean append(final Object ctx) {
    boolean result = traceContextQueue.offer((TraceContext) ctx);
    if (!result) {
        log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
    }
    return result;
}

The key point here is the offer method: when the queue is full it returns false immediately instead of blocking the caller. The track record is then discarded and counted in discardCount, so trace collection never slows down message sending.
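The non-blocking hand-off can be sketched with a plain ArrayBlockingQueue and an AtomicLong. DroppingBuffer is a hypothetical stand-in for AsyncTraceDispatcher.append, generalized to any element type.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for AsyncTraceDispatcher.append (not RocketMQ's class).
class DroppingBuffer<T> {
    private final ArrayBlockingQueue<T> queue;
    private final AtomicLong discardCount = new AtomicLong();

    DroppingBuffer(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() never blocks: when the queue is full the element is dropped and counted.
    boolean append(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            discardCount.incrementAndGet();
        }
        return accepted;
    }

    long discarded() {
        return discardCount.get();
    }

    int size() {
        return queue.size();
    }
}
```

The trade-off is deliberate: under overload, track records are lost rather than making producers wait.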

Next, turn your attention to the implementation of TraceDispatcher.

1.3 TraceDispatcher Implementation Principles

TraceDispatcher, which forwards client message track data to Broker, has a default implementation class: AsyncTraceDispatcher.

1.3.1 AsyncTraceDispatcher constructor

public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
    // queueSize is greater than or equal to the n power of 2 of value
    this.queueSize = 2048;
    this.batchSize = 100;
    this.maxMsgSize = 128000;
    this.discardCount = new AtomicLong(0L);
    this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
    this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
    if (!UtilAll.isBlank(traceTopicName)) {
        this.traceTopicName = traceTopicName;
    } else {
        this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
    }                   // @1
    this.traceExecuter = new ThreadPoolExecutor(
        10,                       // core pool size
        20,                       // maximum pool size
        1000 * 60,                // keep-alive time
        TimeUnit.MILLISECONDS,
        this.appenderQueue,       // work queue
        new ThreadFactoryImpl("MQTraceSendThread_"));
    traceProducer = getAndCreateTraceProducer(rpcHook);      // @2
}

Code @1: Initialize the core properties. In this version these values are hard-coded and cannot be changed by the user.

  • queueSize
    Length of the thread pool's work queue (appenderQueue), 2048 by default: the number of send tasks the asynchronous thread pool can backlog.
  • batchSize
    Maximum number of track records sent to the Broker in one batch, 100 by default.
  • maxMsgSize
    When track data is reported to the Broker, the total message body size must not exceed this value, 128000 bytes (about 128K) by default.
  • discardCount
    Number of track records discarded during the run. Note that when the message TPS is too high for asynchronous forwarding to keep up, track data is actively discarded: append uses offer, which drops the record when traceContextQueue is full.
  • traceContextQueue
    Backlog queue of TraceContext objects (capacity 1024). The client (producer or consumer) submits a track record to it as soon as it receives the processing result.
  • appenderQueue
    Work queue of the traceExecuter thread pool, which sends track data to the Broker.
  • traceTopicName
    Topic that receives message track data, RMQ_SYS_TRACE_TOPIC by default.
  • traceExecuter
    Asynchronous thread pool for sending to the Broker: 10 core threads, at most 20 threads, an idle keep-alive time of 60 s, a work queue of length 2048, and thread names prefixed with MQTraceSendThread_.
  • traceProducer
    Producer that sends message track data.
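One consequence of this configuration follows from standard ThreadPoolExecutor semantics: threads beyond the 10 core threads are only created once the 2048-slot work queue is full, and when both the queue and all 20 threads are busy, the default AbortPolicy makes submit throw RejectedExecutionException. The scaled-down sketch below (core 1, max 2, queue 1; ExecutorSaturationDemo is a hypothetical name) shows that behavior with deliberately blocked tasks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Scaled-down model of traceExecuter: core 1 / max 2 / queue 1 instead of 10 / 20 / 2048.
class ExecutorSaturationDemo {
    // Submits `tasks` blocking tasks and returns how many submissions were rejected.
    static int countRejections(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 60_000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.submit(() -> {
                    try {
                        release.await(); // keep every worker busy until all submissions are done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // AbortPolicy: queue full and maximum threads busy
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }
}
```

Here the first task starts the core thread, the second waits in the queue, the third forces a second thread, and the fourth is rejected.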

Code @2: Call getAndCreateTraceProducer to create the producer that sends message track data, detailed below.

1.3.2 getAndCreateTraceProducer Details

private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
    DefaultMQProducer traceProducerInstance = this.traceProducer;
    if (traceProducerInstance == null) {  //@1
        traceProducerInstance = new DefaultMQProducer(rpcHook);
        traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
        traceProducerInstance.setSendMsgTimeout(5000);
        traceProducerInstance.setVipChannelEnabled(false);
        // The max size of message is 128K
        traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
    }
    return traceProducerInstance;
}

Code @1: If the trace producer has not been created yet, create one with group name TraceConstants.GROUP_NAME (_INNER_TRACE_PRODUCER), a send timeout of 5 s, the VIP channel disabled, and a maximum message size of maxMsgSize - 10 * 1000 = 118000 bytes (about 118K).

1.3.3 start

public void start(String nameSrvAddr) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {     // @1
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);   // @2
    this.worker.setDaemon(true);
    this.worker.start();                                                                                   
    this.registerShutDownHook();
}

start is called when DefaultMQProducer starts, provided message tracking is enabled.

Code @1: If the trace producer has not been started yet (isStarted is switched from false to true with a CAS), set its NameServer address and instance name, then start it.

Code @2: Start a daemon worker thread that runs the AsyncRunnable task, analyzed next; finally, a shutdown hook is registered.
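The start-once guard can be sketched with an AtomicBoolean. TraceDispatcherStartSketch is hypothetical and its counter stands in for traceProducer.start(); unlike the excerpt above, which creates a worker thread on every call to start, this sketch also creates the daemon worker inside the guard.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the start() guard; producerStarts stands in for traceProducer.start().
class TraceDispatcherStartSketch {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    final AtomicInteger producerStarts = new AtomicInteger();
    private Thread worker;

    void start(String nameSrvAddr) {
        // Only the first caller wins the compare-and-set, so this body runs at most once.
        if (isStarted.compareAndSet(false, true)) {
            producerStarts.incrementAndGet();
            worker = new Thread(() -> { /* drain the trace queue here */ },
                "MQ-AsyncTraceDispatcher-Thread-sketch");
            worker.setDaemon(true); // a daemon thread does not keep the JVM alive
            worker.start();
        }
    }
}
```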

1.3.4 AsyncRunnable

class AsyncRunnable implements Runnable {
    private boolean stopped;

    public void run() {
        while (!stopped) {
            List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);     // @1
            for (int i = 0; i < batchSize; i++) {
                TraceContext context = null;
                try {
                    //get trace data element from blocking Queue: traceContextQueue
                    context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);        // @2
                } catch (InterruptedException e) {
                }
                if (context != null) {
                    contexts.add(context);
                } else {
                    break;
                }
            }
            if (contexts.size() > 0) {
                AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);  // @3
                traceExecuter.submit(request);
            } else if (AsyncTraceDispatcher.this.stopped) {
                this.stopped = true;
            }
        }
    }
}

Code @1: Create a list for one batch of TraceContext objects; a batch holds at most batchSize (100 by default) records.

Code @2: Take a pending TraceContext from traceContextQueue with a timeout of 5 ms (not 5 s): if the queue is empty, wait at most 5 ms, and if nothing arrives, stop filling the current batch.

Code @3: Wrap the collected batch in an AsyncAppenderRequest and submit it to the thread pool. If nothing was collected and the dispatcher has been stopped, the loop exits.
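One iteration of this loop can be modelled as a generic helper that fills a batch from a BlockingQueue and stops at the first poll timeout. BatchDrainer.drainBatch is a hypothetical name; unlike the excerpt, it restores the interrupt flag instead of silently swallowing InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of one pass of AsyncRunnable.run().
class BatchDrainer {
    // Collects up to batchSize items, waiting at most pollMillis for each one;
    // the first timeout ends the batch early, so a partial batch may be returned.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int batchSize, long pollMillis) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item;
            try {
                item = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            if (item == null) {
                break; // queue stayed empty for pollMillis
            }
            batch.add(item);
        }
        return batch;
    }
}
```

With 250 queued items and a batch size of 100, successive calls return batches of 100, 100 and 50, then an empty list.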

1.3.5 AsyncAppenderRequest#sendTraceData

public void sendTraceData(List<TraceContext> contextList) {
    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
    for (TraceContext context : contextList) {        //@1
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();     // @2
        // Use  original message entity's topic as key
        String key = topic;
        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<TraceTransferBean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);    // @3
        transBeanList.add(traceData);
    }
    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {       // @4
        flushData(entry.getValue());
    }
}

Code @1: Traverse collected message track data.

Code @2: Get the topic of the original message and use it as the key for grouping track data.

Code @3: Encode the TraceContext into a TraceTransferBean, the transport format of the track data. Its encoding format is examined below.

Code @4: Send each group of encoded data to the Broker via flushData.
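The grouping step can be sketched with a Map keyed by the original topic. TraceGrouping is hypothetical and represents each record as an {originalTopic, encodedData} pair instead of TraceContext and TraceTransferBean; computeIfAbsent replaces the get / null-check / put sequence of the excerpt.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the grouping done in sendTraceData.
class TraceGrouping {
    // Each record is {originalTopic, encodedData}; records are grouped by originalTopic.
    static Map<String, List<String>> groupByTopic(List<String[]> records) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (String[] r : records) {
            byTopic.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        return byTopic;
    }
}
```

As in sendTraceData, each map entry would then be flushed separately, so a single flush never mixes records from different original topics.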

1.3.6 TraceDataEncoder#encoderFromContextBean

The encoding format differs slightly by trace type, as described below.

1.3.6.1 PUB (Message Sending)
case Pub: {
    TraceBean bean = ctx.getTraceBeans().get(0);
    //append the content of context and traceBean to transferBean's TransData
    sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
      .append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}

Track data is encoded by string concatenation: fields are separated by TraceConstants.CONTENT_SPLITOR, the control character (char) 1, and each record ends with TraceConstants.FIELD_SPLITOR, the control character (char) 2. This design feels a bit surprising; why not use JSON directly?
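The record layout can be reproduced with a small encoder and decoder. PubTraceCodec is a hypothetical helper; it only assumes the two separator values described above and treats every field as a string.

```java
// Hypothetical encoder/decoder for the Pub record layout (not a RocketMQ class).
class PubTraceCodec {
    static final char CONTENT_SPLITOR = (char) 1; // separates fields
    static final char FIELD_SPLITOR = (char) 2;   // terminates a record

    // Joins fields in order; the last field is followed by FIELD_SPLITOR instead of CONTENT_SPLITOR.
    static String encode(String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            sb.append(fields[i]).append(i == fields.length - 1 ? FIELD_SPLITOR : CONTENT_SPLITOR);
        }
        return sb.toString();
    }

    // Splits one record back into its fields; limit -1 keeps empty fields.
    static String[] decode(String record) {
        String body = record.endsWith(String.valueOf(FIELD_SPLITOR))
            ? record.substring(0, record.length() - 1)
            : record;
        return body.split(String.valueOf(CONTENT_SPLITOR), -1);
    }
}
```

A side effect of this format is that a field value containing (char) 1 or (char) 2 would break the split, so the scheme relies on those control characters never appearing in topics, tags, or keys. In exchange, it is compact and trivial to parse without a JSON library.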

1.3.6.2 SubBefore (before message consumption)
case SubBefore: {
    for (TraceBean bean : ctx.getTraceBeans()) {
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
    }
}

For consumption, one record is produced per TraceBean (one per consumed message). The fields are concatenated in the order above, separated by (char) 1, and each record ends with (char) 2.
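Because one consumption trace context can carry several records, a reader first splits the payload on (char) 2 and then splits each record on (char) 1. SubBeforeParser is a hypothetical sketch that extracts the msgId (the sixth field in the SubBefore order above) from every record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for a payload of SubBefore records (not a RocketMQ class).
class SubBeforeParser {
    static final String CONTENT_SPLITOR = String.valueOf((char) 1);
    static final String FIELD_SPLITOR = String.valueOf((char) 2);

    // Field order: traceType, timeStamp, regionId, groupName, requestId, msgId, retryTimes, keys.
    static List<String> msgIds(String payload) {
        List<String> ids = new ArrayList<>();
        for (String record : payload.split(FIELD_SPLITOR)) {
            if (record.isEmpty()) {
                continue;
            }
            String[] fields = record.split(CONTENT_SPLITOR, -1);
            if (fields.length > 5) {
                ids.add(fields[5]); // msgId is the sixth field
            }
        }
        return ids;
    }
}
```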

1.3.6.3 SubAfter (after message consumption)
case SubAfter: {
    for (TraceBean bean : ctx.getTraceBeans()) {
        sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
          .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
    }
}

The encoding follows the same scheme as above, so it is not repeated here.

With the source analysis above, the message track flow on the sender side and the encoding protocol of the track data are now clear.

The analysis above covers only message sending. How is message consumption tracked? The principle is the same: specific hook functions are executed before and after message consumption, implemented by ConsumeMessageTraceHookImpl. Since the approach mirrors the sending side, it is not detailed here.

2. How message track data is stored

From the analysis above, we already know that RocketMQ stores message track data on the Broker. How, then, is the topic for track data specified? How is its routing information allocated? Is the topic created on every Broker or only on one of them? RocketMQ supports both a default and a custom message track topic.

2.1 Use the system default topic name

The default message track topic of RocketMQ is RMQ_SYS_TRACE_TOPIC. Does this topic need to be created manually? What about its routing information?

{
    if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {    // @1
        String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(1);                                              // @2
        topicConfig.setWriteQueueNums(1);
        this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    }
}

The code above comes from the TopicConfigManager constructor; the Broker creates a TopicConfigManager at startup to manage topic routing information.

Code @1: If the Broker enables message track tracking (traceTopicEnable=true), the routing information for the default message track topic is created automatically and the topic is registered as a system topic. Note that the numbers of read and write queues are both 1.
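The Broker-side behavior can be modelled in a few lines. TraceTopicRegistry and TopicConf are hypothetical simplifications of TopicConfigManager and TopicConfig; they keep only the enable flag, the topic table, and the system-topic list from the excerpt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the TopicConfigManager excerpt.
class TraceTopicRegistry {
    static final class TopicConf {
        final String name;
        final int readQueueNums;
        final int writeQueueNums;

        TopicConf(String name, int read, int write) {
            this.name = name;
            this.readQueueNums = read;
            this.writeQueueNums = write;
        }
    }

    final Map<String, TopicConf> topicConfigTable = new HashMap<>();
    final Set<String> systemTopicList = new HashSet<>();

    TraceTopicRegistry(boolean traceTopicEnable, String msgTraceTopicName) {
        if (traceTopicEnable) {
            systemTopicList.add(msgTraceTopicName);
            // a single read queue and a single write queue on this Broker
            topicConfigTable.put(msgTraceTopicName, new TopicConf(msgTraceTopicName, 1, 1));
        }
    }
}
```

Since the registration happens in the constructor, every Broker started with traceTopicEnable=true creates the topic locally, each with one read queue and one write queue.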

2.2 User-defined message track topic

The message track topic can be specified explicitly when creating a message producer or consumer, for example:

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

The message track Topic is specified through customizedTraceTopic.
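When customizedTraceTopic is not given, the AsyncTraceDispatcher constructor shown in 1.3.1 falls back to the default topic. TraceTopicResolver is a hypothetical helper that mirrors that choice; its trim() check approximates UtilAll.isBlank.

```java
// Hypothetical helper mirroring the topic choice in the AsyncTraceDispatcher constructor.
class TraceTopicResolver {
    static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    // A null, empty, or whitespace-only customizedTraceTopic falls back to the default topic.
    static String resolve(String customizedTraceTopic) {
        if (customizedTraceTopic == null || customizedTraceTopic.trim().isEmpty()) {
            return RMQ_SYS_TRACE_TOPIC;
        }
        return customizedTraceTopic;
    }
}
```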

Tip: production environments usually disable automatic topic creation, so the RocketMQ administrator needs to create the custom trace topic in advance.

That is all for this article, which has walked through the implementation of RocketMQ message tracks. Next, we will move on to RocketMQ's multi-replica mechanism.


Added by Lyleyboy on Mon, 05 Aug 2019 05:07:26 +0300