RocketMQ source code analysis - message retry and latency fault tolerance in the Producer

Questions

1. What happens when a message fails to send (for example, because of a network problem or a broker going down)? How does the sender achieve high availability?
2. How is the message queue selected, that is, which message queue does the producer send a message to?
3. Why is there a separate broker failure delay mechanism?

Producer message retry

A producer can send a message in three communication modes: synchronous, asynchronous and oneway. Synchronous and asynchronous sends are retried on failure, up to a configurable number of times; oneway sends are never retried.
Retrying makes it much more likely that a message is eventually delivered rather than lost, but it can also deliver the same message more than once, which leads to repeated consumption. RocketMQ does not guarantee idempotency, so developers must make message handling idempotent themselves.
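
Because a retried message may arrive more than once, the receiving side usually deduplicates on a business key. The following is a minimal sketch of that idea and not RocketMQ code: the class IdempotentHandler, the businessKey parameter and the in-memory key set are all assumptions made for illustration. A real system would typically keep the processed keys in a durable store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical guard that runs the business logic at most once per business key. */
final class IdempotentHandler {
    // Keys of messages that have already been processed (in memory only, for illustration)
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handledCount = new AtomicInteger();

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String businessKey, String body) {
        // add() is atomic on this set and returns false when the key is already present
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        handledCount.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int handledCount() {
        return handledCount.get();
    }
}
```

Calling handle twice with the same key runs the business logic only once, so a message that was re-sent by the producer does no harm.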

The retry behaviour is configured on the producer:

    // Number of retries when a synchronous send fails, 2 by default
    producer.setRetryTimesWhenSendFailed(2);
    // Whether to retry on another broker when the send result is not SEND_OK (the message was not stored successfully)
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);

1. Message retry in synchronous mode

When a synchronous send fails, the producer retries. The retry can basically be understood as sending to another broker. This retry loop applies only to synchronous sending. The logic is in sendDefaultImpl, the main method of message sending.

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // Code omitted
        // Only synchronous sends loop: 1 initial attempt plus the configured number of retries
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        // Records the name of the broker targeted by each attempt
        String[] brokersSent = new String[timesTotal];
        // Send the message, looping at most timesTotal times
        for (; times < timesTotal; times++) {
            // Select a message queue; on a retry, prefer a queue with brokerName != lastBrokerName
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            // Send the message
            // Code omitted
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // If the result is not SEND_OK and retryAnotherBrokerWhenNotStoreOK is enabled, retry on another broker
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
        }
        // Code omitted
    }

When fault tolerance is disabled, the queue is chosen by selectOneMessageQueue in TopicPublishInfo:

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // lastBrokerName is null on the first attempt, so select normally
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // lastBrokerName is not null, so this is a retry
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // Poll for a queue whose brokerName != lastBrokerName
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // No queue on another broker was found, so fall back to the normal selection
            return selectOneMessageQueue();
        }
    }
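
The interplay of the retry loop and the broker-skipping selection can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the RocketMQ implementation: brokers are plain strings, the network call is replaced by a hypothetical sendOk predicate, and the class name SyncRetrySketch is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Simplified model of the synchronous retry loop; all names are illustrative. */
final class SyncRetrySketch {
    /**
     * Makes at most 1 + retryTimes attempts in polling order, avoiding the broker
     * that failed on the previous attempt. Returns the brokers tried, in order.
     */
    static List<String> send(List<String> brokers, int retryTimes, Predicate<String> sendOk) {
        int timesTotal = 1 + retryTimes;
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null;
        int cursor = 0;
        for (int times = 0; times < timesTotal; times++) {
            String chosen = null;
            // Look for the next broker that differs from the one that just failed
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get(cursor++ % brokers.size());
                if (!candidate.equals(lastBrokerName)) {
                    chosen = candidate;
                    break;
                }
            }
            if (chosen == null) {
                // Every queue is on the failed broker, so retry there anyway
                chosen = brokers.get(cursor++ % brokers.size());
            }
            brokersSent.add(chosen);
            if (sendOk.test(chosen)) {
                return brokersSent; // success, stop retrying
            }
            lastBrokerName = chosen;
        }
        return brokersSent; // every attempt failed
    }
}
```

With three brokers, two retries and only the third broker healthy, the attempts go to the first, second and third broker in turn. With a single broker, all three attempts go to that broker.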

2. Message retry in asynchronous mode

An asynchronous send that fails is also retried. The number of retries is the retryTimesWhenSendAsyncFailed configured on the producer. The retry logic is in the onExceptionImpl method of MQClientAPIImpl.

    private void onExceptionImpl(final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int timesTotal,
        final AtomicInteger curTimes,
        final Exception e,
        final SendMessageContext context,
        final boolean needRetry,
        final DefaultMQProducerImpl producer
    ) {
        // Increment the current retry count
        int tmp = curTimes.incrementAndGet();
        // Retry only if a retry is needed and the current count <= the configured retry count
        if (needRetry && tmp <= timesTotal) {
            // By default, resend to the same broker, that is, the broker that just failed
            String retryBrokerName = brokerName;//by default, it will send to the same broker
            // If the topic publish information is available
            if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
                // Select a MessageQueue, avoiding the broker that just failed
                MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
                // Retry on the broker that owns the selected queue
                retryBrokerName = mqChosen.getBrokerName();
            }
            // Look up the address of that broker
            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
                retryBrokerName);
            try {
                request.setOpaque(RemotingCommand.createNewRequestId());
                // Resend asynchronously
                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    timesTotal, curTimes, context, producer);
            } catch (InterruptedException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingConnectException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            } catch (RemotingTooMuchRequestException e1) {
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, false, producer);
            } catch (RemotingException e1) {
                producer.updateFaultItem(brokerName, 3000, true);
                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                    context, true, producer);
            }
        } else {
            try {
                // No retry is possible (not needed, or retries exhausted), so invoke the exception callback
                sendCallback.onException(e);
            } catch (Exception ignored) {
            }
        }
    }
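
The counting behaviour of onExceptionImpl can be illustrated with a small model. This is a single-threaded sketch under stated assumptions rather than the real client: the network call is replaced by a hypothetical sendSucceeds predicate, the callback is a plain Consumer, and the class AsyncRetrySketch and its helper nextBrokerOtherThan are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Simplified, single-threaded model of the asynchronous retry chain; names are illustrative. */
final class AsyncRetrySketch {
    final List<String> attempts = new ArrayList<>();  // brokers tried, in order
    private final List<String> brokers;
    private final Predicate<String> sendSucceeds;      // hypothetical stand-in for the network call

    AsyncRetrySketch(List<String> brokers, Predicate<String> sendSucceeds) {
        this.brokers = brokers;
        this.sendSucceeds = sendSucceeds;
    }

    void sendAsync(String broker, int timesTotal, AtomicInteger curTimes, Consumer<Exception> onException) {
        attempts.add(broker);
        if (!sendSucceeds.test(broker)) {
            Exception e = new IllegalStateException("send to " + broker + " failed");
            onExceptionImpl(broker, timesTotal, curTimes, e, true, onException);
        }
    }

    private void onExceptionImpl(String broker, int timesTotal, AtomicInteger curTimes,
                                 Exception e, boolean needRetry, Consumer<Exception> onException) {
        int tmp = curTimes.incrementAndGet();
        if (needRetry && tmp <= timesTotal) {
            // Retry, preferring a broker other than the one that just failed
            sendAsync(nextBrokerOtherThan(broker), timesTotal, curTimes, onException);
        } else {
            // Retries exhausted (or not allowed): report the failure to the caller
            onException.accept(e);
        }
    }

    private String nextBrokerOtherThan(String failed) {
        for (String b : brokers) {
            if (!b.equals(failed)) {
                return b;
            }
        }
        return failed; // only one broker exists
    }
}
```

With timesTotal set to 2 and every broker down, the model makes three attempts in total (the initial send plus two retries) and then invokes the exception callback exactly once.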

Message queue selection

The logic for selecting a message queue is in selectOneMessageQueue. The general process is as follows.

  • When fault tolerance is not enabled, queues are selected by polling. On a retry, queues on the broker that just failed are skipped
  • When fault tolerance is enabled, RocketMQ uses latencyFaultTolerance to predict whether a broker is available and skips brokers that are marked as faulty
  • If neither step yields a queue, the next queue in polling order is used

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Whether the broker failure delay mechanism is enabled. The default is false (not enabled)
        if (this.sendLatencyFaultEnable) {
            try {
            	//Select a message queue
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //Judge whether the brokerName is available. latencyFaultTolerance internally maintains a Map of FaultItem
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        //If available, return
                        return mq;
                }
                // No broker is currently available: take a broker from the FaultItem map anyway
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // Get the number of write queues on that broker
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // If the broker has write queues
                if (writeQueueNums > 0) {
                    // Take the next queue in polling order
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // Point the queue at notBestBroker
                        mq.setBrokerName(notBestBroker);
                        // Pick a queue id on that broker
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Fall back to the next queue in polling order
            return tpInfo.selectOneMessageQueue();
        }
        // Fault tolerance is disabled: poll, skipping queues on lastBrokerName
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
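    // LatencyFaultToleranceImpl: a broker without a fault record is available
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    // FaultItem: available once the current time has reached startTimestamp
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

    // LatencyFaultToleranceImpl: pick one broker from the fault records
    public String pickOneAtLeast() {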
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        // Put all the elements in the faultItemTable into the list
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            // Shuffle, then sort using FaultItem's own ordering
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                // There is only one element
                return tmpList.get(0).getName();
            } else {
                // Rotate through the first half of the sorted list using modulo
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }
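
The two selection modes described above can be condensed into one small model. This is a sketch under stated assumptions, not the RocketMQ code: availability is supplied by a hypothetical brokerAvailable predicate instead of latencyFaultTolerance, the pickOneAtLeast step is left out, and the class QueueSelectorSketch is invented for illustration. It also uses Math.floorMod rather than Math.abs so the index stays valid if the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Simplified model of queue selection; class and method names are illustrative. */
final class QueueSelectorSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    QueueSelectorSketch(List<Queue> queues) {
        this.queues = queues;
    }

    /** Next queue in polling order. */
    private Queue next() {
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    Queue select(boolean faultToleranceEnabled, Predicate<String> brokerAvailable, String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = next();
            boolean acceptable = faultToleranceEnabled
                ? brokerAvailable.test(q.brokerName)     // skip brokers marked as faulty
                : !q.brokerName.equals(lastBrokerName);  // skip the broker that just failed
            if (acceptable) {
                return q;
            }
        }
        // Nothing acceptable was found: fall back to plain polling
        return next();
    }
}
```

The difference between the modes is what gets skipped: without fault tolerance only the broker of the previous attempt is avoided, whereas with fault tolerance every broker currently marked as faulty is avoided, including on the very first attempt.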

Broker failure delay mechanism

1. Updating broker delay information: updateFaultItem

Recall the message sending process: when sending a message throws an exception, updateFaultItem is called to update the fault information of the broker. Let's analyze it in detail.

    // DefaultMQProducerImpl: delegates to mqFaultStrategy
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    // MQFaultStrategy
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // Only when the fault delay mechanism is enabled (false by default)
        if (this.sendLatencyFaultEnable) {
            // Compute how long the broker is unavailable from the send latency; with isolation, a latency of 30000 is assumed
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // Update the fault record
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
    
    // private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    private long computeNotAvailableDuration(final long currentLatency) {
        // Scan from the largest threshold down. For example, a send latency of 4000L matches the 3000L threshold, so the broker is unavailable for 180000L
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }

In summary, the delay mechanism has two main steps:

  1. Compute how long the broker is treated as unavailable from the send latency (currentLatency): the longer a send takes, the longer RocketMQ avoids that broker. The durations are empirical values. If isolation is true, the latency is taken to be 30000L, which maps to an unavailable duration of 600000L.
  2. Call latencyFaultTolerance.updateFaultItem to update the broker's fault tolerance information.
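
The mapping in step 1 is easy to check with a few sample values. The sketch below copies the two arrays and the lookup loop quoted above into a standalone class; the class name NotAvailableDurationSketch, the helper durationFor and the sample latencies are assumptions made for illustration.

```java
/** Standalone copy of the latency-to-duration lookup, for experimenting with sample values. */
final class NotAvailableDurationSketch {
    // Thresholds and durations as quoted in the source above, in milliseconds
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    static long computeNotAvailableDuration(long currentLatency) {
        // Scan from the largest threshold down and return the first one that is reached
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    /** With isolation, the measured latency is ignored and 30000 is used instead. */
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {40L, 120L, 600L, 4000L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + durationFor(latency, false) + " ms");
        }
        System.out.println("isolated -> " + durationFor(0L, true) + " ms");
    }
}
```

For the samples above the durations are 0, 0, 30000, 180000 and 600000 ms, and an isolated broker gets 600000 ms. Latencies below 550 ms therefore never mark a broker as unavailable.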

2. Delay fault tolerance

LatencyFaultTolerance is used to decide whether a broker is available. The default implementation is LatencyFaultToleranceImpl, which maintains an internal map of FaultItem objects. When the fault delay mechanism is enabled, a record is added with the brokerName as the key and a FaultItem as the value, indicating that the broker is treated as faulty until a certain time.

FaultItem is an inner class with three main attributes:

| Attribute | Type | Description |
| --- | --- | --- |
| name | String | The brokerName |
| currentLatency | long | Latency of the message send |
| startTimestamp | long | The broker is marked as faulty until this time |

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // Look up the existing fault record by brokerName
        FaultItem old = this.faultItemTable.get(name);
        // No record exists yet
        if (null == old) {
            // Create a FaultItem
            final FaultItem faultItem = new FaultItem(name);
            // Record the latency of the current send
            faultItem.setCurrentLatency(currentLatency);
            // Next available time = current time + unavailable duration
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Insert into the map; if another thread inserted first, update its record instead
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
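
How a fault record makes a broker unavailable and later available again can be seen in a reduced model. This is a sketch under stated assumptions: it keeps only the startTimestamp per broker, and it takes the clock as a LongSupplier so that time can be controlled; the class FaultTableSketch is invented for illustration and is not part of RocketMQ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Reduced model of the fault table: brokerName -> time from which the broker is available again. */
final class FaultTableSketch {
    private final ConcurrentHashMap<String, Long> startTimestamps = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injected so that time can be controlled in examples

    FaultTableSketch(LongSupplier clock) {
        this.clock = clock;
    }

    void updateFaultItem(String brokerName, long notAvailableDuration) {
        // Next available time = current time + unavailable duration
        startTimestamps.put(brokerName, clock.getAsLong() + notAvailableDuration);
    }

    boolean isAvailable(String brokerName) {
        Long startTimestamp = startTimestamps.get(brokerName);
        // A broker without a record is available; otherwise wait until startTimestamp is reached
        return startTimestamp == null || clock.getAsLong() - startTimestamp >= 0;
    }
}
```

A broker recorded with a duration of 30000 ms at time 1000 stays unavailable until the clock reaches 31000, after which isAvailable returns true again without the record being removed.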

Added by dmcdivitt on Thu, 03 Feb 2022 03:28:38 +0200