Source version
4.8.0
Message sending source code entry
The simplest message sending code
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
Today we focus on three parts of RocketMQ message sending: the send path itself, MessageQueue selection, and the high-availability mechanism.
We therefore skip producer.start() and go straight to the source code behind producer.send(msg).
Stepping into it leads to the core source code for sending messages.
Message sending core source code
The core method is org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
/**
 * Send a message.
 *
 * @param msg               the message
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback for asynchronous sends
 * @param timeout           timeout in milliseconds
 * @return
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Check the producer state
    this.makeSureStateOK();
    // Validate the message
    Validators.checkMessage(msg, this.defaultMQProducer);
    // Random id, used only for logging
    final long invokeID = random.nextLong();
    // Start time
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the topic route from the cached route table. If it is not there, look it up on the NameServer
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total number of attempts. A synchronous send is retried up to 2 more times by default
        // if it fails. The other modes get a single attempt here
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select which MessageQueue to send to
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    // Time already spent before this attempt
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // The actual send
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // Only a synchronous send returns a result
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // Not SEND_OK: try another Broker if that is configured
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));
        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
The core method is long. Our focus today is the single line below, which selects the MessageQueue (and therefore the Broker) the message is sent to
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
Retry mechanism for Producer message sending
In the send method above, we can see the following code
// DefaultMQProducer.retryTimesWhenSendFailed
private int retryTimesWhenSendFailed = 2;

// Total number of attempts. A synchronous send is retried up to 2 more times by default
// if it fails. The other modes get a single attempt here
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
    // message sending
}
As you can see, when the producer sends synchronously, a failed send is retried up to 2 more times by default (configured by retryTimesWhenSendFailed), for 3 attempts in total. ASYNC and ONEWAY sends get a single attempt in this loop. Retrying is the first line of defense against send failures
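To make the attempt budget concrete, here is a minimal sketch of the loop in isolation. It is not RocketMQ code: RetryLoopSketch, Mode and send are hypothetical names, and the send attempt is modelled as an IntPredicate that returns true on success.

```java
import java.util.function.IntPredicate;

// Hypothetical stand-in for the retry loop in sendDefaultImpl; not RocketMQ code.
final class RetryLoopSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Mirrors DefaultMQProducer.retryTimesWhenSendFailed, whose default is 2.
    static final int RETRY_TIMES_WHEN_SEND_FAILED = 2;

    /**
     * Calls attempt.test(n) for n = 0, 1, ... until it succeeds or the budget is used up.
     * Returns the number of attempts made, or -1 if every attempt failed.
     */
    static int send(Mode mode, IntPredicate attempt) {
        // Only SYNC gets extra attempts; the other modes get exactly one.
        int timesTotal = mode == Mode.SYNC ? 1 + RETRY_TIMES_WHEN_SEND_FAILED : 1;
        for (int times = 0; times < timesTotal; times++) {
            if (attempt.test(times)) {
                return times + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The simulated broker fails twice and succeeds on the third attempt.
        System.out.println(send(Mode.SYNC, n -> n == 2));  // prints 3
        System.out.println(send(Mode.ASYNC, n -> n == 2)); // prints -1
    }
}
```

With the default of 2 retries, a synchronous send survives two consecutive failures, while the same failure pattern makes an ASYNC or ONEWAY send give up after the first attempt in this loop.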
Broker failover mechanism
In the send source above, every catch block calls the following method. The RemotingException, MQClientException and MQBrokerException handlers pass isolation = true.
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

// -----------------------------------------------------
// org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // Faults are only recorded when latency fault tolerance is enabled
    if (this.sendLatencyFaultEnable) {
        // With isolation, the latency is treated as 30s when computing the unavailable duration
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

// ------------------------------------------------------
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

// org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // Look up the existing fault record for this Broker
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
This stores each Broker that failed a send in a ConcurrentHashMap (faultItemTable), together with the time from which it counts as available again. The table is consulted later to check whether a Broker is available
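The bookkeeping can be sketched in a few lines. This is a simplified model, not the RocketMQ classes: FaultTableSketch is a hypothetical name, the current time is passed in as a parameter so the behavior is deterministic, and the two threshold arrays are assumed to match the latencyMax and notAvailableDuration defaults in MQFaultStrategy, so check them against your version.

```java
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of the fault table; not the RocketMQ classes.
final class FaultTableSketch {
    // Assumed latency thresholds (ms) and the back-off (ms) applied at each threshold.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // brokerName -> timestamp (ms) from which the broker counts as available again
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    /** Maps a measured latency to a back-off: the highest threshold reached wins. */
    static long computeNotAvailableDuration(long latency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    /** Records a send result; with isolation the latency is treated as 30000 ms. */
    void updateFaultItem(String brokerName, long latency, boolean isolation, long now) {
        long duration = computeNotAvailableDuration(isolation ? 30000L : latency);
        availableFrom.put(brokerName, now + duration);
    }

    /** A broker with no record, or whose back-off has elapsed, is available. */
    boolean isAvailable(String brokerName, long now) {
        Long start = availableFrom.get(brokerName);
        return start == null || now >= start;
    }
}
```

Under these assumed thresholds, an isolated Broker is avoided for 600000 ms (10 minutes), because the 30000 ms stand-in latency falls into the highest bucket, while a fast successful send resets the Broker to available immediately.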
MessageQueue selection source code
Having covered the producer's retry and the Broker fault bookkeeping, let's analyze the core method that selects the MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
Stepping into selectOneMessageQueue, we see the following source code
/**
 *
 * @param tpInfo         routing information of the topic
 * @param lastBrokerName the Broker selected by the previous attempt, which failed to send the message
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Is Broker latency fault tolerance enabled?
    /**
     * If enabled, Brokers that are not available are filtered out on top of the
     * random-start, incrementing-modulo (round-robin) selection.
     * "Latency fault tolerance" means backing off for a certain period after earlier failures.
     * For example, if the latency of the last request exceeds 550 ms, back off for 30000 ms;
     * if it exceeds 1000 ms, back off for 60000 ms.
     * If disabled, a message queue is selected by the random-start, incrementing-modulo method alone.
     * The latencyFaultTolerance mechanism is the key to high availability of message sending
     */
    if (this.sendLatencyFaultEnable) {
        try {
            // Round-robin over a simple auto-incrementing index kept in a ThreadLocal
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index modulo the number of queues in the current routing table
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // The candidate queue
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Checking whether the queue is available really means checking whether its Broker is available
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            // No queue is on an available Broker: pick one of the avoided Brokers. Returns null if there is none
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Number of writable queues on that Broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // If it has writable queues, select one of them
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // No writable queues: remove the Broker from the fault table
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fall back to plain round-robin
        return tpInfo.selectOneMessageQueue();
    }
    // Fault tolerance disabled: use the simple strategy shown further down
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
There is one key check
private boolean sendLatencyFaultEnable = false;
if (this.sendLatencyFaultEnable)
sendLatencyFaultEnable is the key parameter for high availability in RocketMQ message sending. It is disabled by default. When enabled, the producer automatically avoids faulty Brokers.
Let's first look at the simplest case, where sendLatencyFaultEnable is not enabled
return tpInfo.selectOneMessageQueue(lastBrokerName);

// org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
This avoidance algorithm is very simple: take the next queue in round-robin order whose Broker is not the one that failed last time. If every queue is on that Broker, fall back to plain round-robin
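Stripped of the RocketMQ types, the simple strategy looks like the sketch below. SkipLastBrokerSketch is a hypothetical name, each queue is represented only by the name of its Broker, the method returns the position of the chosen queue, and the counter starts at 0 instead of the random per-thread start used by the real index. Math.floorMod is swapped in for the Math.abs(index) % size idiom of the original.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "round-robin, but skip the broker that just failed".
final class SkipLastBrokerSketch {
    // Broker name of each queue, in routing-table order.
    private final List<String> queueBrokers;
    private final AtomicInteger counter = new AtomicInteger(0);

    SkipLastBrokerSketch(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    /** Returns the position of the selected queue in the routing table. */
    int select(String lastBrokerName) {
        int size = queueBrokers.size();
        for (int i = 0; i < size; i++) {
            int pos = Math.floorMod(counter.getAndIncrement(), size);
            // First attempt (no failed broker yet) or a queue on a different broker: take it.
            if (lastBrokerName == null || !queueBrokers.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        // Every queue lives on the failed broker: plain round-robin is all that is left.
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

Note that this strategy only remembers the Broker of the immediately preceding attempt. A Broker that failed two attempts ago, or in an earlier send call, is a candidate again, which is the gap the latency fault tolerance path closes.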
Now let's focus on the code inside if (this.sendLatencyFaultEnable) {}
- Get the messageQueueList
- Pick a MessageQueue by simple round-robin, using an index kept in a ThreadLocal
- Use faultItemTable to check whether the Broker that owns the MessageQueue is available. If it is, return the queue; if not, move on to the next MessageQueue in round-robin order
- If no Broker is available, pickOneAtLeast chooses one of the faulty Brokers that is not among the worst. If that Broker has writable queues, a MessageQueue on it is returned; if it has none, the Broker is removed from the fault table and plain round-robin is used
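The steps above can be sketched as follows. This is a simplification under stated assumptions, not the RocketMQ implementation: LatencyAwareSelectSketch is a hypothetical name, the fault table is reduced to a set of currently isolated Broker names, and the pickOneAtLeast step is replaced by a plain round-robin fallback.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin selection that skips isolated brokers.
final class LatencyAwareSelectSketch {
    // Broker name of each queue, in routing-table order.
    private final List<String> queueBrokers;
    private final AtomicInteger counter = new AtomicInteger(0);

    LatencyAwareSelectSketch(List<String> queueBrokers) {
        this.queueBrokers = queueBrokers;
    }

    /** Returns the position of the selected queue in the routing table. */
    int select(Set<String> isolatedBrokers) {
        int size = queueBrokers.size();
        // The shared counter advances once per call; the scan uses a local copy.
        int index = counter.getAndIncrement();
        for (int i = 0; i < size; i++) {
            int pos = Math.floorMod(index++, size);
            if (!isolatedBrokers.contains(queueBrokers.get(pos))) {
                return pos;
            }
        }
        // All brokers are isolated. The real code asks pickOneAtLeast() for a less-bad
        // broker here; this sketch simply falls back to plain round-robin.
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

Unlike the simple strategy, the filter here does not depend on which Broker failed last: any Broker still inside its back-off window is skipped, on every attempt and on every send call.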
This concludes the source code analysis of RocketMQ message sending, MessageQueue selection and the high-availability mechanism
Summary
MessageQueue selection and high availability in RocketMQ message sending rest on two mechanisms
- The Producer retries when a send fails
- Faulty Brokers are remembered in memory and avoided
Why is the second mechanism needed? The NameServer detects an unavailable Broker only after a delay, at least one heartbeat detection interval (10s). In addition, the NameServer does not push route changes to producers when a Broker goes down; each producer refreshes its routing information every 30s, so it can take up to 30s before a producer sees the latest routing information. The avoidance mechanism fills this gap: once a send fails during Broker downtime, that Broker is temporarily excluded from message queue selection