RocketMQ Retry Mechanism

Message retries fall into two types: retries when the Producer sends a message and retries when the Consumer consumes a message.

1. Producer-Side Retry

Producer-side retry applies when the Producer fails to send a message to MQ, for example because of a network problem.

Take a look at the code:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

@Slf4j
public class RocketMQTest {
    /**
     * Producer group
     */
    private static final String PRODUCER_GROUP = "test_producer";

    public static void main(String[] args) throws Exception {
        //1. Create the Producer object
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        //Set the number of retries for synchronous sends (default 2)
        producer.setRetryTimesWhenSendFailed(3000);
        //Bind the name server
        producer.setNamesrvAddr("74.49.203.55:9876");
        producer.start();
        //Create the message
        Message message = new Message("topic_family", "Young 3 years old".getBytes());
        //The send timeout is set to 5 milliseconds here, so every send fails
        SendResult sendResult = producer.send(message, 5);
        log.info("Output producer information={}", sendResult);
    }
}

Timeout retry: the claim repeated across the Internet that a timeout exception triggers a retry is wrong. It is alarming when you think about it: every article I checked says a timeout exception will be retried. Did none of those authors test it or read the source code?

I found this problem because I set the timeout above to 5 milliseconds, which reliably causes a timeout. Whether I set 1 retry or 3,000 retries, the send eventually failed with the timeout exception shown below.

Clearly, 1 retry and 3,000 retries should not take a comparable amount of time. But the test showed that no matter how many retries I set, the exception was reported after roughly the same delay.

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

Puzzled by this, I read the source code, and only then did I understand.

    /**
     * Excerpt of the relevant code (simplified)
     */
    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

        //1. Record the time at which the call started
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev;
        //2. Look up the publish (routing) information for the topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            //3. Only a synchronous send gets 1 + retryTimesWhenSendFailed attempts; any other mode gets a single attempt in this loop
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            //4. Loop once per attempt, selecting a message queue each time
            for (int times = 0; times < timesTotal; times++) {
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                beginTimestampPrev = System.currentTimeMillis();
                long costTime = beginTimestampPrev - beginTimestampFirst;
                //5. If the time elapsed since the call started exceeds the timeout, break out of the loop: a timeout ends the retries instead of triggering another one
                if (timeout < costTime) {
                    callTimeout = true;
                    break;
                }
                //(the actual send is omitted from this excerpt)
            }
            //6. After the loop, a timeout is reported as an exception
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
        }
        //(the remaining handling is omitted from this excerpt)
    }

From this source code you can clearly see the following points:

  1. If the message is not sent synchronously, this loop makes a single attempt, so nothing is retried here.
  2. For synchronous sends, a timeout exception is not retried.
  3. If a retry does occur, it happens inside the for loop, so it is retried immediately rather than after an interval.

Practice really is the best teacher!!!
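To make the second point concrete, here is a minimal sketch that imitates the shape of that loop with plain Java and no RocketMQ dependency. It is a toy model under stated assumptions, not the library's code: the class RetryLoopSketch, the method simulate and the fixed per-attempt cost are all made up for illustration, and every simulated attempt is assumed to fail.

```java
// Toy model of the retry loop discussed above; NOT RocketMQ code.
// All names here (RetryLoopSketch, Outcome, simulate) are hypothetical.
class RetryLoopSketch {

    /** How many attempts were made and whether the time budget ended the loop. */
    static final class Outcome {
        final int attempts;
        final boolean timedOut;

        Outcome(int attempts, boolean timedOut) {
            this.attempts = attempts;
            this.timedOut = timedOut;
        }
    }

    /**
     * @param retries       plays the role of retryTimesWhenSendFailed
     * @param timeoutMs     total time budget for the whole call
     * @param attemptCostMs assumed cost of each (always failing) attempt
     */
    static Outcome simulate(int retries, long timeoutMs, long attemptCostMs) {
        long elapsed = 0;             // stands in for "now - beginTimestampFirst"
        int attempts = 0;
        int timesTotal = 1 + retries; // synchronous mode
        for (int times = 0; times < timesTotal; times++) {
            // Same check as in the excerpt: the budget covers all attempts together
            if (timeoutMs < elapsed) {
                return new Outcome(attempts, true);
            }
            attempts++;
            elapsed += attemptCostMs; // the attempt fails and uses up time
        }
        return new Outcome(attempts, false);
    }

    public static void main(String[] args) {
        Outcome few = simulate(1, 5, 10);
        Outcome many = simulate(3000, 5, 10);
        // With a 5 ms budget and 10 ms per attempt, both stop after one attempt
        System.out.println(few.attempts + " " + many.attempts); // prints "1 1"
    }
}
```

In this model the retry count only matters while the total budget has not run out, which matches what the test showed: 1 retry and 3,000 retries fail after about the same time.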


2. Consumer Retry

The Consumer side is more interesting, and in actual development we also need to take Consumer-side retries into account.

There are two main types of Consumer-side failures: Exception and Timeout.

1) Exception

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer {
    /**
     * Consumer object
     */
    private DefaultMQPushConsumer consumer;
    /**
     * Consumer group
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * Set up and start the consumer in the constructor
     */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
        //Subscribe to the topic and tags (* means all tags)
        consumer.subscribe("topic_family", "*");
        //Register the listener; its return value tells RocketMQ the consumption status
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //1. Get the message
            MessageExt msg = msgs.get(0);
            try {
                //2. Decode the message body
                String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                //3. Get the number of times this message has already been retried
                int count = msg.getReconsumeTimes();
                log.info("Current consumption retries are = {}", count);
                //4. On the third delivery (two retries so far), stop retrying and fall back to saving the message to the database for manual handling
                if (count >= 2) {
                    log.info("The message has been consumed three times, save it to the database. topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //Throw an exception directly
                throw new Exception("=======Something went wrong here============");
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.error("Consumption failed, asking RocketMQ to redeliver later", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        //Start the consumer
        consumer.start();
    }
}

The intent of this code is clear: it deliberately throws an exception, and once the message has been delivered three times it stops retrying and saves the record to the database for manual handling.
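The retry-or-give-up decision in that listener can be pulled out into a small pure function, which makes the off-by-one easier to see. This is only a sketch: the class ConsumeDecisionSketch, its Status enum (a stand-in for ConsumeConcurrentlyStatus) and the maxDeliveries parameter are hypothetical names, not part of RocketMQ.

```java
// Sketch of the retry-or-give-up decision; NOT RocketMQ code.
// Status is a hypothetical stand-in for ConsumeConcurrentlyStatus.
class ConsumeDecisionSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /**
     * @param reconsumeTimes retries so far (0 on the first delivery)
     * @param maxDeliveries  how many deliveries to allow before giving up
     * @param handledOk      whether the business logic succeeded this time
     */
    static Status decide(int reconsumeTimes, int maxDeliveries, boolean handledOk) {
        if (handledOk) {
            return Status.CONSUME_SUCCESS;
        }
        // reconsumeTimes starts at 0, so this is delivery number reconsumeTimes + 1
        int deliveryNumber = reconsumeTimes + 1;
        if (deliveryNumber >= maxDeliveries) {
            // Give up: the caller would save the message for manual handling,
            // then acknowledge it so that it is not delivered again
            return Status.CONSUME_SUCCESS;
        }
        return Status.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        for (int count = 0; count < 3; count++) {
            System.out.println(count + " -> " + decide(count, 3, false));
        }
    }
}
```

With maxDeliveries set to 3 this gives up exactly when reconsumeTimes reaches 2, the same condition as count >= 2 in the listener.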

Look at the results

Note that there are two main differences between Consumer and Producer retries:

1. Default number of retries: the Producer default is 2, the Consumer default is 16.

2. Retry interval: the Producer retries immediately, while the Consumer waits between retries. The intervals increase step by step: 1s, 5s, 10s, 30s, 1m, 2m ... 2h.
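To get a feel for how those intervals grow, the sketch below parses a delay-level string into seconds. The string in ASSUMED_LEVELS is an assumption: it is what I understand the broker's default messageDelayLevel setting to be, so check your own broker configuration. The class and method names are made up, and the sketch does not model which level is applied to which retry.

```java
import java.util.ArrayList;
import java.util.List;

// Parses a delay-level string such as "1s 5s 10s" into seconds.
// DelayLevelSketch and ASSUMED_LEVELS are hypothetical names; the level
// string is an assumed broker default, not taken from the article.
class DelayLevelSketch {

    static final String ASSUMED_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static List<Long> parseSeconds(String levels) {
        List<Long> seconds = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            // Each token is a number followed by a one-letter unit
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': seconds.add(value); break;
                case 'm': seconds.add(value * 60); break;
                case 'h': seconds.add(value * 3600); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        List<Long> seconds = parseSeconds(ASSUMED_LEVELS);
        System.out.println(seconds.size() + " levels, longest = " + seconds.get(seconds.size() - 1) + "s");
    }
}
```

The point for consumer code is that a message that keeps failing may not come back for hours, so the handler should not depend on a quick redelivery.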

2) Timeout

The timeout here is not a timeout in the usual sense. It means that after receiving a message, the Consumer for some reason never returns a consumption status to RocketMQ, that is, it returns neither ConsumeConcurrentlyStatus.CONSUME_SUCCESS nor ConsumeConcurrentlyStatus.RECONSUME_LATER.

RocketMQ then treats the message as never delivered and keeps delivering it. Since in its view the message never reached the Consumer at all, it certainly was not consumed.

It's easy to do this test.

        //1. Decode the message body
        String body = new String(msg.getBody(), "utf-8");
        //2. Get the number of retries
        int count = ((MessageExt) msg).getReconsumeTimes();
        log.info("Current consumption retries are = {}", count);
        //3. Sleep here for 60 seconds
        Thread.sleep(60000);
        log.info("Slept for 60 seconds, check whether execution gets here. topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
        //Return success
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

Kill the process as soon as it logs that the current consumption retries = 0, that is, during the 60-second sleep and before any status is returned. Restart the process and you will still receive the same message:

Current consumption retries are = 0
Slept for 60 seconds, check whether execution gets here. topic=topic_family,keys=1a2b3c4d5f,msg=Young 3 years old
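The behavior in this test can be pictured with a toy in-memory model: the position in the queue only moves forward once the handler has actually returned a status. This is a simplification for illustration, not how the broker is implemented, and every name in it (RedeliverySketch, publish, deliverNext) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of "no status returned means the message is delivered again".
// NOT RocketMQ code; all names here are hypothetical.
class RedeliverySketch {

    private final List<String> messages = new ArrayList<>();
    private int committedOffset = 0; // index of the next unacknowledged message

    void publish(String body) {
        messages.add(body);
    }

    /**
     * Delivers the next unacknowledged message, or null if there is none.
     * The predicate returns true if the handler ran to completion and
     * returned a status, false if the process died before returning.
     */
    String deliverNext(Predicate<String> handlerReturned) {
        if (committedOffset >= messages.size()) {
            return null;
        }
        String body = messages.get(committedOffset);
        if (handlerReturned.test(body)) {
            committedOffset++; // only an answered delivery moves the offset
        }
        return body;
    }

    public static void main(String[] args) {
        RedeliverySketch queue = new RedeliverySketch();
        queue.publish("Young 3 years old");
        // First run: the process is killed during the sleep, no status returned
        System.out.println("first run:  " + queue.deliverNext(body -> false));
        // After the restart the same message arrives again and is acknowledged
        System.out.println("second run: " + queue.deliverNext(body -> true));
        System.out.println("third run:  " + queue.deliverNext(body -> true));
    }
}
```

Because the first delivery was never answered, nothing counted it as a failed attempt either, which is consistent with the retry count still being 0 after the restart.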




As long as you become better, everything else will follow (top 2)


Added by jrolands on Mon, 01 Jul 2019 19:28:42 +0300