Talking to the interviewer about RocketMQ source code: message sending, MessageQueue selection and the high availability mechanism

Source version

4.8.0

Message sending source code entry

The simplest message sending code

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();

Today we focus on RocketMQ message sending, MessageQueue selection and the high availability mechanism.

We therefore skip

producer.start();

and focus on the source code behind producer.send(msg), which leads to the core message-sending code.

Message sending core source code

The core method is org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

/**
     * Send a message (core implementation)
     * @param msg the message to send
     * @param communicationMode SYNC, ASYNC or ONEWAY
     * @param sendCallback callback for asynchronous sends
     * @param timeout overall send timeout in milliseconds
     * @return the send result for SYNC; null for ASYNC and ONEWAY
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Producer status check
        this.makeSureStateOK();
        // Validate the message (topic, body and size)
        Validators.checkMessage(msg, this.defaultMQProducer);
        // Random ID that ties together the log lines of this invocation
        final long invokeID = random.nextLong();
        // start time
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Get the topic route from the cached topic route table. If it does not exist, initiate a lookup to the NameServer
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // Total number of attempts: 1 + retryTimesWhenSendFailed (3 by default) for SYNC sends; ASYNC and ONEWAY get a single attempt in this loop
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Select which MessageQueue to send to
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // Time already spent since the first attempt; give up if it exceeds the timeout
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        // The actual send to the broker
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ASYNC and ONEWAY return null at once; SYNC returns the result
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // Not SEND_OK: retry on another broker only if retryAnotherBrokerWhenNotStoreOK is set
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

The whole core flow is long. Today we focus on the line that decides which message queue, and therefore which Broker, the message is sent to:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

Retry mechanism for Producer message sending

In the sending code we find the following:

// DefaultMQProducer.retryTimesWhenSendFailed
private int retryTimesWhenSendFailed = 2;

// Total number of attempts: 1 + retryTimesWhenSendFailed (3 by default) for SYNC sends; ASYNC and ONEWAY get a single attempt in this loop
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
    // message sending
}

So for synchronous sends, a failed message is retried twice by default (controlled by retryTimesWhenSendFailed), i.e. up to three attempts in total. Retrying is the first line of defense against failed sends.
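To make the retry rule concrete, here is a minimal, self-contained sketch of the same loop shape. It is not RocketMQ code: the Mode enum, the SendAttempt interface and RetryLoopSketch.send are hypothetical stand-ins for CommunicationMode, sendKernelImpl and sendDefaultImpl, and any thrown exception plays the role of a RemotingException. The real loop additionally stops on timeout and treats some MQBrokerException codes as fatal; the sketch ignores both.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryLoopSketch {
    // Mirrors the default value of DefaultMQProducer.retryTimesWhenSendFailed.
    static final int RETRY_TIMES_WHEN_SEND_FAILED = 2;

    // Same rule as sendDefaultImpl: only SYNC sends get 1 + retryTimesWhenSendFailed attempts.
    static int timesTotal(Mode mode) {
        return mode == Mode.SYNC ? 1 + RETRY_TIMES_WHEN_SEND_FAILED : 1;
    }

    // Returns the first successful result; records each attempt number in triedAttempts.
    static String send(Mode mode, SendAttempt attempt, List<Integer> triedAttempts) throws Exception {
        int total = timesTotal(mode);
        Exception last = null;
        for (int times = 0; times < total; times++) {
            triedAttempts.add(times);
            try {
                return attempt.send(times);
            } catch (Exception e) {
                last = e; // remember the failure and go round again, like the catch blocks above
            }
        }
        throw new Exception("Send [" + total + "] times, still failed", last);
    }

    public static void main(String[] args) throws Exception {
        List<Integer> tried = new ArrayList<>();
        // Fails on attempts 0 and 1, succeeds on attempt 2.
        String result = send(Mode.SYNC, n -> {
            if (n < 2) {
                throw new Exception("broker down");
            }
            return "SEND_OK";
        }, tried);
        System.out.println(result + " after attempts " + tried); // SEND_OK after attempts [0, 1, 2]
    }
}

// Hypothetical stand-in for CommunicationMode.
enum Mode { SYNC, ASYNC, ONEWAY }

// Hypothetical stand-in for one sendKernelImpl call; throws on failure.
interface SendAttempt {
    String send(int attempt) throws Exception;
}
```

With the default setting, a SYNC send that fails twice still succeeds on its third attempt, while an ASYNC send gets exactly one attempt in this loop.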

Broker failover mechanism

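Looking at the send code above, every catch block calls updateFaultItem (so does the success path right after sendKernelImpl, with isolation = false). For RemotingException, MQClientException and MQBrokerException the call is: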

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

// -----------------------------------------------------

// org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // Only active when sendLatencyFaultEnable is true
        if (this.sendLatencyFaultEnable) {
            // isolation == true is treated as a 30000 ms latency, which maps to 600000 ms (10 min) of unavailability by default
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

// ------------------------------------------------------
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

// org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // Existing fault record for this broker, if any
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            // The broker counts as unavailable until this timestamp
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

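This records, for each broker a send was attempted against, its latest latency and the time until which it should be avoided, in a ConcurrentHashMap (faultItemTable). The selection logic later consults this table to decide whether a broker is available. Nothing is recorded unless sendLatencyFaultEnable is true.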
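The mapping from measured latency to isolation time is done by MQFaultStrategy#computeNotAvailableDuration. The sketch below reproduces it as a standalone class, assuming the 4.x default tables latencyMax = {50, 100, 550, 1000, 2000, 3000, 15000} and notAvailableDuration = {0, 0, 30000, 60000, 120000, 180000, 600000} (milliseconds); check them against your version. The fault table is a simplified, hypothetical stand-in for faultItemTable: it stores only the "available again at" timestamp, and the current time is passed in explicitly instead of calling System.currentTimeMillis(), so the behavior is deterministic.

```java
import java.util.concurrent.ConcurrentHashMap;

final class FaultBackoffSketch {
    // Assumed 4.x defaults from MQFaultStrategy (milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Walk the buckets from the largest threshold down; the first threshold the latency reaches wins.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // brokerName -> timestamp (ms) from which the broker may be used again (hypothetical simplification).
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Same shape as MQFaultStrategy#updateFaultItem: isolation forces the 30000 ms latency bucket.
    void updateFaultItem(String brokerName, long currentLatency, boolean isolation, long nowMillis) {
        long duration = computeNotAvailableDuration(isolation ? 30000L : currentLatency);
        availableFrom.put(brokerName, nowMillis + duration);
    }

    // Brokers without a record are available; others only once their back-off has expired.
    boolean isAvailable(String brokerName, long nowMillis) {
        Long from = availableFrom.get(brokerName);
        return from == null || nowMillis >= from;
    }

    public static void main(String[] args) {
        FaultBackoffSketch table = new FaultBackoffSketch();
        long now = 1_000_000L;
        table.updateFaultItem("broker-a", 120L, false, now); // 100 <= 120 < 550 -> 0 ms back-off
        table.updateFaultItem("broker-b", 800L, false, now); // 550 <= 800 < 1000 -> 30000 ms back-off
        table.updateFaultItem("broker-c", 5L, true, now);    // isolation -> 600000 ms back-off
        System.out.println(table.isAvailable("broker-a", now));            // true
        System.out.println(table.isAvailable("broker-b", now + 29_999L));  // false
        System.out.println(table.isAvailable("broker-b", now + 30_000L));  // true
        System.out.println(table.isAvailable("broker-c", now + 599_999L)); // false
    }
}
```

Under these defaults a fast successful send (below 100 ms) never blocks a broker, while an exception (isolation = true) sidelines it for ten minutes, not 30 seconds.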

Source code for selecting a MessageQueue

Having covered the producer's retry loop and its fault bookkeeping for Brokers, let's analyze the core method that selects a MessageQueue:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

DefaultMQProducerImpl#selectOneMessageQueue delegates to MQFaultStrategy#selectOneMessageQueue, whose source is:

/**
     *
     * @param tpInfo topic routing information
     * @param lastBrokerName broker of the queue used in the previous failed attempt (null on the first attempt)
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Broker fault-latency mode
        /**
         * If enabled, brokers that are currently unavailable are filtered out on top of round-robin selection.
         * "Latency fault tolerance" means backing off from a broker for some time after a slow or failed send.
         * For example, if the latency of the last request was at least 550 ms, back off 30000 ms; if at least 1000 ms, back off 60000 ms.
         * If disabled, a queue is chosen by round-robin, skipping only the broker that failed last time.
         * The latencyFaultTolerance mechanism is the key to high availability of message sending.
         */
        if (this.sendLatencyFaultEnable) {
            try {
                // Round-robin counter kept per thread (ThreadLocalIndex)
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // Round-robin: index modulo the number of queues in the route
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    // Get the current corresponding queue to be sent
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Verifying whether the queue is available is actually detecting whether the Broker is available
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // No available broker: fall back to the least-bad broker in the fault table (null if the table is empty)
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // Get the number of writable queues in the broker
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // If the number of writable queues > 0, select a queue
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // No writable queues on that broker: remove it from the fault table
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
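            // Fallback: plain round-robin over all queues
            return tpInfo.selectOneMessageQueue();
        }

        // Fault-latency mode disabled: only avoid the broker that failed last time
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }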

Everything hinges on one flag:

private boolean sendLatencyFaultEnable = false;
if (this.sendLatencyFaultEnable)

sendLatencyFaultEnable is the key parameter for high availability of message sending in RocketMQ. It is disabled by default; once enabled (for example with producer.setSendLatencyFaultEnable(true)), the producer automatically avoids faulty Brokers.

Let's first look at the simpler path, taken when sendLatencyFaultEnable is not enabled:

return tpInfo.selectOneMessageQueue(lastBrokerName);

// org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // First attempt: plain round-robin
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Accept any queue that is not on the broker that failed last time
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // Every queue is on lastBrokerName: fall back to plain round-robin
            return selectOneMessageQueue();
        }
    }

This avoidance algorithm is very simple: any queue whose broker differs from the broker that failed last time is acceptable.
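A minimal standalone sketch of this "avoid the last broker" round-robin follows. Queue is a hypothetical stand-in for MessageQueue, and the counter is a plain field starting at 0, whereas RocketMQ keeps a per-thread ThreadLocalIndex with a random start; Math.floorMod replaces the Math.abs plus "pos < 0" guard of the original.

```java
import java.util.Arrays;
import java.util.List;

final class AvoidLastBrokerSketch {
    // Hypothetical stand-in for MessageQueue.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final List<Queue> queues;
    // Simplified shared counter (RocketMQ: per-thread ThreadLocalIndex with a random start).
    private int sendWhichQueue = 0;

    AvoidLastBrokerSketch(List<Queue> queues) {
        this.queues = queues;
    }

    // Plain round-robin, like TopicPublishInfo#selectOneMessageQueue().
    // floorMod keeps the position non-negative even after the counter overflows.
    Queue selectOne() {
        return queues.get(Math.floorMod(sendWhichQueue++, queues.size()));
    }

    // Like selectOneMessageQueue(lastBrokerName): skip queues on the broker that just failed.
    Queue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = selectOne();
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne(); // every queue lives on lastBrokerName, so take any
    }

    public static void main(String[] args) {
        AvoidLastBrokerSketch s = new AvoidLastBrokerSketch(Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-a", 1),
                new Queue("broker-b", 0), new Queue("broker-b", 1)));
        System.out.println(s.selectOne(null));       // broker-a#0
        System.out.println(s.selectOne("broker-a")); // skips broker-a#1, prints broker-b#0
    }
}
```

The weakness is visible here: the producer only remembers one failed broker, and only for the next attempt, which is what the fault-latency path improves on.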

Now let's focus on the code inside if (this.sendLatencyFaultEnable) { }:

  1. Get the messageQueueList of the topic.
  2. Pick a MessageQueue with a simple round-robin counter kept per thread (ThreadLocal).
  3. Check in the faultItemTable whether the Broker hosting that MessageQueue is available. If it is, return the queue; otherwise move on to the next MessageQueue in the rotation.
  4. If no queue's Broker is available, pickOneAtLeast chooses a Broker from the better half of the fault table (not the worst, but not necessarily the best). If that Broker has writable queues, a queue on it is returned; otherwise the Broker is removed from the fault table and a plain round-robin queue is returned.
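The four steps can be condensed into one self-contained sketch. Everything in it is hypothetical: Queue and Fault stand in for MessageQueue and FaultItem, the counters are plain fields instead of ThreadLocalIndex, time is passed in explicitly, the writable-queue count is derived from the queue list, and pickOneAtLeast is a deterministic approximation (no shuffle) of LatencyFaultToleranceImpl#pickOneAtLeast. Unlike the code above, which calls setBrokerName/setQueueId on the object returned by tpInfo.selectOneMessageQueue(), the sketch returns a fresh Queue so the route list is never mutated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FaultAwareSelectorSketch {
    static final class Queue { // hypothetical stand-in for MessageQueue
        final String brokerName;
        final int queueId;
        Queue(String brokerName, int queueId) { this.brokerName = brokerName; this.queueId = queueId; }
        @Override public String toString() { return brokerName + "#" + queueId; }
    }

    static final class Fault { // hypothetical stand-in for FaultItem
        final String brokerName;
        final long latency;
        final long availableFrom;
        Fault(String brokerName, long latency, long availableFrom) {
            this.brokerName = brokerName; this.latency = latency; this.availableFrom = availableFrom;
        }
    }

    private final List<Queue> queues;
    private final Map<String, Fault> faults = new HashMap<>();
    private int sendWhichQueue = 0; // RocketMQ uses a per-thread ThreadLocalIndex instead
    private int whichItemWorst = 0;

    FaultAwareSelectorSketch(List<Queue> queues) { this.queues = queues; }

    void recordFault(String brokerName, long latency, long availableFrom) {
        faults.put(brokerName, new Fault(brokerName, latency, availableFrom));
    }

    boolean isAvailable(String brokerName, long now) {
        Fault f = faults.get(brokerName);
        return f == null || now >= f.availableFrom;
    }

    // Approximation of pickOneAtLeast: rank faults (available first, then lower latency), rotate through the better half.
    String pickOneAtLeast(long now) {
        if (faults.isEmpty()) {
            return null;
        }
        List<Fault> ranked = new ArrayList<>(faults.values());
        ranked.sort(Comparator.comparing((Fault f) -> !isAvailable(f.brokerName, now))
                .thenComparingLong(f -> f.latency)
                .thenComparingLong(f -> f.availableFrom));
        int half = ranked.size() / 2;
        if (half <= 0) {
            return ranked.get(0).brokerName;
        }
        return ranked.get(Math.floorMod(whichItemWorst++, half)).brokerName;
    }

    Queue select(long now) {
        // Steps 1-3: round-robin, returning the first queue whose broker is available.
        int index = sendWhichQueue++;
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (isAvailable(q.brokerName, now)) {
                return q;
            }
        }
        // Step 4: fall back to the least-bad broker if it still has writable queues.
        String notBest = pickOneAtLeast(now);
        int writeQueueNums = (int) queues.stream().filter(q -> q.brokerName.equals(notBest)).count();
        if (writeQueueNums > 0) {
            return new Queue(notBest, Math.floorMod(sendWhichQueue++, writeQueueNums));
        }
        faults.remove(notBest);
        return queues.get(Math.floorMod(sendWhichQueue++, queues.size()));
    }

    public static void main(String[] args) {
        FaultAwareSelectorSketch s = new FaultAwareSelectorSketch(Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-a", 1),
                new Queue("broker-b", 0), new Queue("broker-b", 1)));
        s.recordFault("broker-a", 800L, 30_000L);
        s.recordFault("broker-b", 2_500L, 120_000L);
        System.out.println(s.select(0L));      // both backed off; lower-latency broker-a wins: broker-a#1
        System.out.println(s.select(30_000L)); // broker-a's back-off has expired: broker-a#0
    }
}
```

Even when every broker is backed off, the producer still returns a queue on the broker that looked least bad, rather than refusing to send.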

That concludes the source code analysis of RocketMQ message sending, MessageQueue selection and the high availability mechanism.

Summary

RocketMQ keeps message sending highly available through MessageQueue selection built on two mechanisms:

  1. Producer-side retries of failed sends
  2. In-memory avoidance of faulty Brokers

Client-side avoidance is needed because the NameServer is slow to notice an unavailable Broker. First, detection takes at least one heartbeat detection interval (10 s). Second, the NameServer does not push route changes to producers when a Broker goes down; producers refresh routing information every 30 s, so it can take up to another 30 s before a producer sees the Broker's latest routing information. During that window, sends may keep hitting the dead Broker. Retrying on a different Broker, and temporarily excluding a Broker from MessageQueue selection after a failed send, closes this gap.


Added by Langridge on Sun, 05 Dec 2021 19:18:36 +0200