RocketMQ Retry Mechanism
Message retries come in two types: retries when the Producer sends a message and retries when the Consumer consumes a message.
1. Producer End Retry
Producer-side retry happens when the Producer fails to send a message to the MQ broker, for example because of a network problem.
Take a look at the code:
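```java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

@Slf4j
public class RocketMQTest {
    /** Producer group */
    private static final String PRODUCER_GROUP = "test_producer";

    public static void main(String[] args) throws Exception {
        // 1. Create the producer
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        // Number of retries after a failed send (default 2); set very high on purpose for this test
        producer.setRetryTimesWhenSendFailed(3000);
        // Bind the name server
        producer.setNamesrvAddr("74.49.203.55:9876");
        producer.start();
        try {
            // Create the message
            Message message = new Message("topic_family", "Young 3 years old".getBytes(StandardCharsets.UTF_8));
            // The send timeout is only 5 milliseconds, so every send fails
            SendResult sendResult = producer.send(message, 5);
            log.info("Output producer information={}", sendResult);
        } finally {
            producer.shutdown();
        }
    }
}
```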
Timeout retry
Many articles online claim that a send that times out will be retried. That claim is wrong, and it is worrying: every article I checked repeats it. Did none of these authors test it or read the source code?
I noticed this because I set the send timeout above to 5 milliseconds, which should always cause a timeout. I then tried both 1 retry and 3,000 retries. In both cases the call eventually failed with the exception below.
If timeouts were retried, 3,000 retries should take far longer to fail than 1 retry. But no matter how many retries I configured, the exception was reported after roughly the same amount of time.
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
To clear up this confusion, I went to the source code, and then it made sense.
```java
/**
 * Excerpt from DefaultMQProducerImpl, partial code for explanation
 */
private SendResult sendDefaultImpl(Message msg,
                                   final CommunicationMode communicationMode,
                                   final SendCallback sendCallback,
                                   final long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 1. Record the time of the first attempt
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev;
    // 2. Look up the route information for the message's topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        String lastBrokerName = null;
        int times;
        // 3. Only synchronous sends get 1 + retryTimesWhenSendFailed attempts;
        //    any other mode is attempted exactly once by this loop
        int timesTotal = communicationMode == CommunicationMode.SYNC
                ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()
                : 1;
        // 4. Loop over the attempts, selecting a message queue each time
        for (times = 0; times < timesTotal; times++) {
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            beginTimestampPrev = System.currentTimeMillis();
            long costTime = beginTimestampPrev - beginTimestampFirst;
            // 5. If the time elapsed since the FIRST attempt exceeds the timeout, leave the loop:
            //    the timeout is a budget for all attempts together, so no further retries happen
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            // ... the actual send and its error handling are omitted from this excerpt ...
        }
        // 6. After the loop, report the timeout as an error
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // ...
    }
    // ...
}
```
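From this source you can clearly see the following points:
- Only synchronous sends go through this retry loop. For asynchronous (and one-way) sends timesTotal is 1, so the loop makes a single attempt and does not retry.
- For synchronous sends, the timeout is a budget measured from the first attempt, not per attempt. Once the elapsed time exceeds it, the loop exits and RemotingTooMuchRequestException is thrown, so a timeout is effectively not retried.
- When a retry does happen, it is simply the next iteration of the for loop, so it is sent immediately, with no interval between attempts.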
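To see these three points together, here is a small self-contained simulation of the loop. It is not RocketMQ code: it assumes every attempt fails and costs a fixed amount of simulated time, and the names SendRetrySimulation, Mode, attemptStartTimes and attemptCostMs are hypothetical. With a 5 ms budget and 10 ms per failed attempt, 1 retry and 3,000 retries both stop after a single attempt, which is consistent with the error time not depending on the retry count.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the retry loop in sendDefaultImpl (not RocketMQ code).
 * Time is simulated: each attempt is assumed to fail and to cost attemptCostMs.
 */
public class SendRetrySimulation {

    public enum Mode { SYNC, ASYNC, ONEWAY }

    /** Mirrors timesTotal in the excerpt: only SYNC gets 1 + retries attempts. */
    public static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Returns the simulated start time (ms) of every attempt made before the loop gives up. */
    public static List<Long> attemptStartTimes(Mode mode, int retries, long timeoutMs, long attemptCostMs) {
        List<Long> starts = new ArrayList<>();
        long clock = 0; // simulated time elapsed since the first attempt
        int total = timesTotal(mode, retries);
        for (int times = 0; times < total; times++) {
            long costTime = clock;
            if (timeoutMs < costTime) {
                break; // budget exhausted: the remaining retries are skipped
            }
            starts.add(clock);
            clock += attemptCostMs; // the failed attempt uses time; the next one starts right away
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5 ms budget, each failing attempt assumed to take 10 ms
        System.out.println(attemptStartTimes(Mode.SYNC, 1, 5, 10));     // [0]
        System.out.println(attemptStartTimes(Mode.SYNC, 3000, 5, 10));  // [0]
        // Generous budget: all 1 + 2 attempts happen, back to back
        System.out.println(attemptStartTimes(Mode.SYNC, 2, 1000, 10));  // [0, 10, 20]
        // Non-synchronous send: a single attempt
        System.out.println(attemptStartTimes(Mode.ASYNC, 2, 1000, 10)); // [0]
    }
}
```

With the generous budget the start times are 0, 10 and 20: each retry begins as soon as the previous attempt fails, with no back-off in between.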
Practice really is the only test of truth!
2. Consumer Retry
The consumer side is more interesting, and in real development it is the consumer-side retry that we need to think about carefully.
There are two main kinds of consumer-side failure: exceptions and timeouts.
1,Exception
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class Consumer {
    /** Consumer instance */
    private DefaultMQPushConsumer consumer;
    /** Consumer group */
    public static final String CONSUMER_GROUP = "test_consumer";

    /** Create and start the consumer in the constructor */
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
        // Subscribe to the topic; "*" matches all tags
        consumer.subscribe("topic_family", "*");
        // Register the listener; its return value tells RocketMQ the consumption status
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // 1. Take the message (one message per call by default)
            MessageExt msg = msgs.get(0);
            try {
                // 2. Read the body
                String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                // 3. Times this message has already been redelivered (0 on the first delivery)
                int count = msg.getReconsumeTimes();
                log.info("Current consumption retries are = {}", count);
                // 4. Third delivery (first attempt + 2 retries): stop retrying, save to the database instead
                if (count >= 2) {
                    log.info("The message has been delivered three times, save it to the database. topic={},keys={},msg={}",
                            msg.getTopic(), msg.getKeys(), body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                // Simulate a business failure
                throw new Exception("=======Something went wrong here============");
            } catch (Exception e) {
                log.error("Consumption failed, retry later", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        // Start the consumer
        consumer.start();
    }
}
```
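The intent of this code is simple: it deliberately throws an exception so the message is redelivered, and once the message has been delivered three times (getReconsumeTimes() reaches 2) it stops retrying, returns CONSUME_SUCCESS, and saves the record to the database for manual handling.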
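Because getReconsumeTimes() is 0 on the first delivery, it is easy to get an off-by-one between "three times" and `count >= 2`. A minimal sketch that keeps the rule in one place, assuming you want three deliveries in total; ReconsumePolicy, Status and MAX_DELIVERIES are hypothetical names, and Status only stands in for ConsumeConcurrentlyStatus so the example compiles without RocketMQ.

```java
/**
 * Hypothetical helper mirroring the listener's decision (not a RocketMQ API).
 * reconsumeTimes is 0 on the first delivery, 1 on the first retry, and so on.
 */
public class ReconsumePolicy {

    /** Stand-in for ConsumeConcurrentlyStatus. */
    public enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Total deliveries allowed: the first attempt plus 2 retries. */
    static final int MAX_DELIVERIES = 3;

    /** Decides what the listener should return after a failed attempt. */
    public static Status onFailure(int reconsumeTimes) {
        if (reconsumeTimes >= MAX_DELIVERIES - 1) {
            // Last allowed delivery: the caller saves the message for manual handling, then acknowledges it
            return Status.CONSUME_SUCCESS;
        }
        // Ask the broker to deliver the message again later
        return Status.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        for (int n = 0; n < 4; n++) {
            System.out.println("reconsumeTimes=" + n + " -> " + onFailure(n));
        }
    }
}
```

Naming the constant in terms of deliveries rather than retries makes the relationship to `count >= 2` explicit.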
Look at the results
Note that consumer retries differ from producer retries in two main ways:
1. Default number of retries: the Producer defaults to 2, the Consumer to 16.
2. Retry interval: the Producer retries immediately, while the Consumer waits between retries, following increasing delay levels: 1s, 5s, 10s, 30s, 1m, 2m … 2h.
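The intervals above come from the broker's delay-level table. The sketch below prints a consumer retry schedule under two assumptions that the text does not state: the broker uses the default messageDelayLevel table, and, as I understand the default RocketMQ 4.x broker behavior, the n-th retry of a concurrently consumed message uses delay level n + 2, so the first retry waits 10s rather than 1s. Check your broker version before relying on it; RetryDelaySchedule and delayBeforeRetry are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch of how consumer retries map onto broker delay levels.
 * ASSUMPTIONS: default messageDelayLevel table; the n-th retry (n = 1..16) uses level n + 2.
 */
public class RetryDelaySchedule {

    /** Default delay-level table: level 1 = "1s" ... level 18 = "2h". */
    static final List<String> DELAY_LEVELS = Arrays.asList(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    /** Consumer default number of retries. */
    static final int MAX_RECONSUME_TIMES = 16;

    /** Delay before the n-th retry (1-based), under the assumptions above. */
    public static String delayBeforeRetry(int n) {
        if (n < 1 || n > MAX_RECONSUME_TIMES) {
            throw new IllegalArgumentException("retry number out of range: " + n);
        }
        int level = n + 2;                  // assumed: the first retry uses level 3
        return DELAY_LEVELS.get(level - 1); // levels are 1-based, the list is 0-based
    }

    public static void main(String[] args) {
        for (int n = 1; n <= MAX_RECONSUME_TIMES; n++) {
            System.out.println("retry " + n + " after " + delayBeforeRetry(n));
        }
    }
}
```

Under these assumptions the 16 default retries use exactly levels 3 through 18, ending at the 2h level.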
2,Timeout
The "timeout" here is not really a timeout. It means that after receiving a message, the consumer never reports a consumption status back to RocketMQ for some reason, i.e. the listener returns neither ConsumeConcurrentlyStatus.CONSUME_SUCCESS nor ConsumeConcurrentlyStatus.RECONSUME_LATER.
RocketMQ then treats the message as never delivered and keeps delivering it: since no status came back, it assumes the message never reached the consumer and was certainly not consumed.
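A simplified model of why such a message comes back; it is not how RocketMQ actually tracks consume offsets. The "broker" moves the committed offset forward only once the listener returns a status, so a consumer killed before returning leaves the offset unchanged and the same message is delivered again. RedeliveryModel, Crash and deliverNext are hypothetical names. Since the message is redelivered from its original position rather than sent back through the retry path, its retry count plausibly stays at 0, which matches the test that follows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical model: the offset only advances when the listener returns a status.
 */
public class RedeliveryModel {

    /** Stand-in for the listener's return value. */
    public enum Status { CONSUME_SUCCESS }

    /** Thrown by a listener to simulate the process being killed before it returns. */
    public static class Crash extends RuntimeException {}

    private final List<String> queue = new ArrayList<>();
    private final List<String> deliveryLog = new ArrayList<>();
    private int committedOffset = 0; // index of the next message to deliver

    public RedeliveryModel(List<String> messages) {
        queue.addAll(messages);
    }

    /** Delivers the message at the committed offset; returns null if no status came back. */
    public Status deliverNext(Function<String, Status> listener) {
        String msg = queue.get(committedOffset);
        deliveryLog.add(msg);
        try {
            Status status = listener.apply(msg);
            committedOffset++; // status returned: move past this message
            return status;
        } catch (Crash crash) {
            return null;       // no status: offset unchanged, so the message comes back
        }
    }

    public int committedOffset() { return committedOffset; }

    public List<String> deliveryLog() { return deliveryLog; }

    public static void main(String[] args) {
        RedeliveryModel broker = new RedeliveryModel(Arrays.asList("3 years younger"));
        // First run: the process is killed during the listener's sleep
        broker.deliverNext(msg -> { throw new Crash(); });
        // After a restart the same message is delivered again and acknowledged
        broker.deliverNext(msg -> Status.CONSUME_SUCCESS);
        System.out.println(broker.deliveryLog());     // [3 years younger, 3 years younger]
        System.out.println(broker.committedOffset()); // 1
    }
}
```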
This is easy to test.
```java
// Inside the same listener and try block as above
// 1. Read the message body
String body = new String(msg.getBody(), "utf-8");
// 2. Current retry count
int count = ((MessageExt) msg).getReconsumeTimes();
log.info("Current consumption retries are = {}", count);
// 3. Sleep for 60 seconds before returning any status
Thread.sleep(60000);
log.info("Hibernate for 60 seconds to see if you can get here again. topic={},keys={},msg={}",
        msg.getTopic(), msg.getKeys(), body);
// Report success
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
```
Kill the process as soon as the log shows the current retry count = 0, i.e. during the 60-second sleep. Restart the process and the same message is delivered again:
```
Consumer: current consumption retries = 0
Hibernate for 60 seconds to see if you can get here again. topic=topic_family,keys=1a2b3c4d5f,msg=3 years younger
```
As long as you become better, everything else will follow.