Key questions
1. What happens if a message fails to send (because of a network problem or a broker outage)? How does the sender achieve high availability?
2. How is the message queue selected, that is, which message queue does the producer send a message to?
3. Why is there a separate broker failure delay mechanism?
Producer message retry
The producer supports three communication modes: synchronous, asynchronous, and oneway. Synchronous and asynchronous sends can be retried on failure; oneway sends are never retried.
In principle, retrying helps ensure that a message is sent successfully and is not lost, but it can also deliver the same message more than once, which leads to repeated consumption. RocketMQ does not guarantee idempotency for messages, so developers need to make consumption idempotent themselves.
RocketMQ message retry configuration:
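One way to make consumption idempotent is to remember a business key for each message that has already been processed. The sketch below is a minimal in-memory illustration; `IdempotentHandler`, `handleOnce`, and the idea of a `businessKey` are hypothetical names introduced here, not RocketMQ APIs, and a real system would likely keep the keys in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of RocketMQ: remembers which business keys
// were already processed so that a redelivered message is handled only once.
final class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // Runs the action only the first time a key is seen.
    // Returns true if the action ran, false if the key was a duplicate.
    boolean handleOnce(String businessKey, Runnable action) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery, skip the work
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a later redelivery can retry the work.
            processedKeys.remove(businessKey);
            throw e;
        }
    }
}
```

A consumer would call `handleOnce` with a key that identifies the business operation (an order number, for example) rather than a broker-assigned ID, since a retried send may be stored as a separate message.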

    // Number of retries when sending a message fails, 2 by default
    producer.setRetryTimesWhenSendFailed(2);
    // Whether to retry on another broker when the message was not stored successfully
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);

1. Message retry in synchronous mode
When a synchronous send fails, it is retried, and the retry can basically be understood as trying another broker. This in-loop retry applies only to synchronous sending. The logic lives in sendDefaultImpl, the main method for sending messages.

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Omit code
        // Only synchronous sends get extra attempts: 1 initial send + configured retries
        int timesTotal = communicationMode == CommunicationMode.SYNC
            ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()
            : 1;
        int times = 0;
        // Records the name of the broker targeted by each attempt
        String[] brokersSent = new String[timesTotal];
        // Send in a loop bounded by the total number of attempts
        for (; times < timesTotal; times++) {
            // Select a message queue; on a retry, pick one with brokerName != lastBrokerName
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            // Send message
            // Omit code
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    // If the result is not SEND_OK and retrying another broker is enabled, try again
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
        }
        // Omit code
    }


    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // lastBrokerName is null on the first, normal send
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // lastBrokerName is not null, so this is a retry
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // Look for a queue whose brokerName differs from lastBrokerName
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // No queue on another broker was found, so select again without the filter
            return selectOneMessageQueue();
        }
    }

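The broker-avoiding rotation above can be tried out in isolation. The following is a simplified sketch, not the RocketMQ implementation: `RetryQueueSelector` and `QueueRef` are hypothetical stand-ins for TopicPublishInfo and MessageQueue, and `Math.floorMod` is used here in place of the `Math.abs(...) % size` idiom.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of round-robin queue selection that
// skips the broker used by the previous (failed) attempt.
final class RetryQueueSelector {
    // Stand-in for MessageQueue: just a broker name and a queue id.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<QueueRef> queues;
    private final AtomicInteger counter = new AtomicInteger();

    RetryQueueSelector(List<QueueRef> queues) {
        this.queues = queues;
    }

    // lastBrokerName is null on the first send and non-null on a retry.
    QueueRef select(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            QueueRef q = next();
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue lives on the failed broker: take the next one anyway.
        return next();
    }

    private QueueRef next() {
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }
}
```

With queues on two brokers, a retry after a failure on one broker lands on the other; with a single broker, the selector still returns a queue instead of giving up.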
2. Message retry in asynchronous mode
A failed asynchronous send is also retried. The number of retries is the retryTimesWhenSendAsyncFailed value set on the producer. The retry logic is in the onExceptionImpl method of MQClientAPIImpl.

    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        // Increment the current retry count
        int tmp = curTimes.incrementAndGet();
        // Retry only if requested and the current retry count <= the configured total
        if (needRetry && tmp <= timesTotal) {
            // By default, it is still sent to the same broker, that is, the one that just failed
            String retryBrokerName = brokerName; // by default, it will send to the same broker
            // If topic publish information is available
            if (topicPublishInfo != null) {
                // select one message queue accordingly, in order to determine which broker to send
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                // Retry against the broker that owns the chosen queue
                retryBrokerName = mqChosen.getBrokerName();
            }
            // Get the corresponding broker address
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}",
                tmp, msg.getTopic(), addr, retryBrokerName);
            try {
                request.setOpaque(RemotingCommand.createNewRequestId());
                // Send asynchronously again
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback,
                    topicPublishInfo, instance, timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback,
                    topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback,
                    topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback,
                    topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback,
                    topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer);
            }
        } else {
            try {
                // No retry is left (or allowed), so report the failure to the caller's callback
                sendCallback.onException(e);
            } catch (Exception ignored) {
            }
        }
    }

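The counting rule in onExceptionImpl (increment on each failure, retry while the count is at most timesTotal) means that a budget of 2 retries allows up to 3 attempts in total. The sketch below isolates just that rule. It is an assumption-laden simplification: `BoundedRetry` and `sendWithRetry` are hypothetical names, and the asynchronous re-send is flattened into a plain loop for readability.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of the retry budget check; the real client
// re-sends asynchronously instead of looping on the calling thread.
final class BoundedRetry {
    // attempt returns true on success. curTimes counts the failures seen so far.
    // Returns true if some attempt succeeded, false once the budget is used up.
    static boolean sendWithRetry(BooleanSupplier attempt, int timesTotal, AtomicInteger curTimes) {
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            int tmp = curTimes.incrementAndGet();
            if (tmp > timesTotal) {
                // Budget exhausted: this is the point where the callback would be notified.
                return false;
            }
        }
    }
}
```

Passing the counter in from outside mirrors the source, where the same AtomicInteger travels with the request across every re-send.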
Message queue selection
The logic for selecting a message queue is in selectOneMessageQueue. The general process is as follows.
- When fault tolerance is not enabled, queues are chosen in round-robin order. On a retry, queues on the broker that just failed are filtered out.
- When the fault tolerance policy is enabled, RocketMQ uses latencyFaultTolerance to predict whether a broker is available and skips queues on brokers that are predicted to be unavailable.
- If no suitable queue is found this way, a queue is picked without filtering as a last resort.

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Whether the broker failure delay mechanism is enabled. The default is false (not enabled)
        if (this.sendLatencyFaultEnable) {
            try {
                // Walk the queues in round-robin order
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Check whether the broker is available. latencyFaultTolerance internally maintains a Map of FaultItem
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        // If available, return
                        return mq;
                }
                // No broker is available: take one from the fault table
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // Number of write queues on that broker
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // If the broker has write queues
                if (writeQueueNums > 0) {
                    // Get a queue
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // Point the queue at the chosen broker
                        mq.setBrokerName(notBestBroker);
                        // Reset the queue id to one that exists on that broker
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Fall back to the next queue without any filtering
            return tpInfo.selectOneMessageQueue();
        }
        // Fault tolerance disabled: select a queue, avoiding lastBrokerName on a retry
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }


    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        // No fault record means the broker is considered available
        return true;
    }

    public boolean isAvailable() {
        // Available once the current time has reached startTimestamp
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }


    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        // Put all the elements in the faultItemTable into the list
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }
        if (!tmpList.isEmpty()) {
            // Shuffle, then sort
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                // There is only one element
                return tmpList.get(0).getName();
            } else {
                // Rotate through the first half of the sorted list
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }

Broker failure delay mechanism
1. Updating broker latency information: updateFaultItem
Recall the message sending process: when sending a message throws an exception, updateFaultItem is called to update the fault information for that broker. Let's look at it in detail.

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }


    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // Whether the failure delay mechanism is enabled, false by default
        if (this.sendLatencyFaultEnable) {
            // Derive how long the broker is treated as unavailable from the send latency (currentLatency)
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // Update the fault record
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    // private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    private long computeNotAvailableDuration(final long currentLatency) {
        // For example, a send latency of 4000L falls in the 3000L bucket, so the broker is unavailable for 180000L
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }

The delay mechanism therefore consists of two main steps:
- Calculate how long the broker is treated as unavailable from the send latency (currentLatency). The longer a send takes, the longer the broker is considered unavailable. The durations are empirical values. If isolation is true, the send latency is taken to be 30000L, which maps to an unavailability duration of 600000L.
- Call latencyFaultTolerance.updateFaultItem to update the fault information recorded for the broker.
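The mapping from latency to unavailability duration is easy to check by hand. The sketch below copies the two arrays and the lookup loop from the code above into a standalone class; the class name `LatencyTable` and the helper `durationFor` are hypothetical and exist only for this illustration.

```java
// Standalone copy of the latency-to-duration lookup, for experimentation only.
final class LatencyTable {
    private static final long[] LATENCY_MAX =
        {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION =
        {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Finds the highest latency bucket that currentLatency reaches and
    // returns the unavailability duration paired with that bucket.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // With isolation set, the measured latency is ignored and 30000 is used instead.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

For instance, a latency of 40 ms is below every bucket and yields 0, a latency of 550 ms yields 30000 ms, and any call with isolation set yields 600000 ms regardless of the measured latency.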
2. Latency fault tolerance
Latency fault tolerance determines whether a broker is available. The default implementation is LatencyFaultToleranceImpl, which maintains an internal map of FaultItem objects. When the fault delay mechanism is enabled, a record is added with brokerName as the key and a FaultItem as the value, indicating that the broker is marked as faulty until a certain time.
FaultItem is an inner class with three main attributes:
Field | Type | Description |
---|---|---|
name | String | brokerName |
currentLatency | long | Latency of the message send |
startTimestamp | long | The broker is marked as failed until this time |

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // Look up the existing fault record for this brokerName
        FaultItem old = this.faultItemTable.get(name);
        // If there is none yet
        if (null == old) {
            // Create a FaultItem object
            final FaultItem faultItem = new FaultItem(name);
            // Set the current send latency
            faultItem.setCurrentLatency(currentLatency);
            // Next available time = current time + unavailability duration
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Put into the map; if another thread inserted first, update that record instead
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

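To see how updateFaultItem and isAvailable interact over time, the following sketch models the fault table with an injectable clock so that time can be advanced by hand. It is a simplified illustration under stated assumptions, not the LatencyFaultToleranceImpl code: `FaultTable` is a hypothetical class, it stores only the next available time per broker, and the `LongSupplier` clock replaces System.currentTimeMillis().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, reduced model of the fault table: brokerName -> time from
// which the broker counts as available again.
final class FaultTable {
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    FaultTable(LongSupplier clock) {
        this.clock = clock;
    }

    // Marks the broker as unavailable for notAvailableDuration milliseconds from now.
    void updateFaultItem(String brokerName, long notAvailableDuration) {
        availableFrom.put(brokerName, clock.getAsLong() + notAvailableDuration);
    }

    // A broker with no record is available; otherwise it becomes available
    // once the clock has reached the recorded time.
    boolean isAvailable(String brokerName) {
        Long start = availableFrom.get(brokerName);
        return start == null || clock.getAsLong() - start >= 0;
    }
}
```

In this model, a broker marked with a duration of 0 stays available, which matches the first two latency buckets in the table above: fast sends leave a record but never take the broker out of rotation.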