RabbitMQ retry mechanism

A consumer may report an error while processing a message. How do we reprocess that message? There are two solutions:

  • Record the number of retries in Redis or a database; once the maximum number of retries is reached, route the message to a dead letter queue or another queue, and then process those messages separately (a rough sketch follows this list);

  • Use the retry feature built into Spring Rabbit.
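
For reference, here is a rough, hedged sketch of the first approach. It assumes a Spring Data Redis StringRedisTemplate and a RabbitTemplate are injected, manual acknowledgements are configured, the producer sets a messageId, and handle(), "dead.exchange" and "dead.routing-key" are placeholder names:

@RabbitListener(queues = "${platform.queue-name}")
public void consumeWithRedisCounter(String msg, Channel channel, Message message) throws IOException {
	// the key assumes the producer sets a unique messageId on each message
	String key = "retry:" + message.getMessageProperties().getMessageId();
	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	try {
		handle(msg); // placeholder for the business logic, may throw
		channel.basicAck(deliveryTag, false);
	} catch (Exception e) {
		Long attempts = stringRedisTemplate.opsForValue().increment(key);
		if (attempts != null && attempts >= 5) {
			// give up: forward to another exchange for separate handling, then ack
			rabbitTemplate.convertAndSend("dead.exchange", "dead.routing-key", msg);
			channel.basicAck(deliveryTag, false);
		} else {
			// requeue = true so the broker redelivers the message for another attempt
			channel.basicNack(deliveryTag, false, true);
		}
	}
}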

Beyond that sketch, we won't cover the first scheme in detail; let's focus on the second. As usual, we start with the configuration:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # Automatic ack
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000   # Maximum retry interval
          initial-interval: 2000  # Retry initial interval
        multiplier: 2 # Interval multiplier: next interval = previous interval * multiplier, capped at max-interval

At this point, our consumer code is as follows:

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"}, concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
	log.info("Message received>>>{}", msg);
	int temp = 10 / 0; // deliberately throw an ArithmeticException to trigger retry
	log.info("Message {} consumed successfully", msg);
}

Now start the application. After sending a message, you can see the console output as follows:

You can see that there are 5 attempts in total (including the original delivery), with retry intervals of 2 s, 4 s, 8 s and 10 s. The last interval should theoretically be 16 s (previous interval * multiplier), but because the maximum interval is 10 s it is capped at 10 s, which matches the configuration.
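
As a quick illustration (plain Java, not part of the project), the intervals above can be reproduced from the configured values:

public class RetryIntervalDemo {
	public static void main(String[] args) {
		long interval = 2000;            // initial-interval
		final long maxInterval = 10000;  // max-interval
		final double multiplier = 2;     // multiplier
		// 5 attempts in total means 4 waits between them
		for (int attempt = 2; attempt <= 5; attempt++) {
			System.out.println("Wait before attempt " + attempt + ": " + interval + " ms");
			interval = Math.min((long) (interval * multiplier), maxInterval);
		}
	}
}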

Note:

Retry does not mean that RabbitMQ redelivers the message; it is purely an internal retry on the consumer side. In other words, the retry has nothing to do with MQ;

Therefore, the consumer code above must not be wrapped in try{}catch(){}. Once the exception is caught, under automatic ack mode the message counts as processed correctly, it is acknowledged immediately, and the retry is never triggered;
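
For instance, a consumer written like this hypothetical sketch would never retry: the catch block swallows the exception, so under automatic ack the container treats the invocation as successful and confirms the message.

@RabbitListener(queues = {"${platform.queue-name}"}, concurrency = "1")
public void msgConsumerNoRetry(String msg) {
	try {
		log.info("Message received>>>{}", msg);
		int temp = 10 / 0; // still fails here...
	} catch (Exception e) {
		// ...but the exception never reaches the container, so no retry happens and the message is acked
		log.error("Swallowed exception, retry will not be triggered", e);
	}
}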

MessageRecoverer

In the test above we also noticed a problem: after the 5 attempts, the console prints an exception stack trace, and then the message in the queue is acked (automatic ack mode). First, let's look at the exception log:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

The reason the message is consumed and the above exception appears is that a MessageRecoverer is applied when SimpleRabbitListenerContainerFactoryConfigurer configures the listener container factory. This interface has a recover method that processes the message once the retries are exhausted. The relevant source code is as follows:

ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
	RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
			: RetryInterceptorBuilder.stateful();
	RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
			.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
	builder.retryOperations(retryTemplate);
	MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
			: new RejectAndDontRequeueRecoverer(); // <1>
	builder.recoverer(recoverer);
	factory.setAdviceChain(builder.build());
}

Note that the code at <1> falls back to the RejectAndDontRequeueRecoverer implementation by default. As its name suggests, this implementation rejects the message and does not send it back to the queue. Let's look at its source:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
	protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
	@Override
	public void recover(Message message, Throwable cause) {
		if (this.logger.isWarnEnabled()) {
			this.logger.warn("Retries exhausted for message " + message, cause);
		}
		throw new ListenerExecutionFailedException("Retry Policy Exhausted",
					new AmqpRejectAndDontRequeueException(cause), message);
	}
}

The source above shows where the exception comes from, but not the code that actually rejects the message. The cause is wrapped in an AmqpRejectAndDontRequeueException, which the listener container (reached through the retry advice, i.e. AOP) treats as "reject and do not requeue". We won't go further into it here.

The MessageRecoverer interface has two other implementations, RepublishMessageRecoverer and ImmediateRequeueMessageRecoverer. As their names suggest, one republishes the message and the other requeues it immediately. Let's test each of them:

RepublishMessageRecoverer

First create an error exchange and an error queue and bind them:

@Bean
public DirectExchange errorExchange(){
	return new DirectExchange("error-exchange",true,false);
}

@Bean
public Queue errorQueue(){
	return new Queue("error-queue", true);
}

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
	return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
}

Then declare a RepublishMessageRecoverer bean:

@Bean
public MessageRecoverer messageRecoverer(){
	return new RepublishMessageRecoverer(rabbitTemplate,"error-exchange","error-routing-key");
}

Start the service and view the processing results:

The console shows that after the five attempts the message is republished to the configured exchange with the new routing key. Checking the management page, the original queue no longer contains the message, while the configured error queue now holds one message.

ImmediateRequeueMessageRecoverer

Now test ImmediateRequeueMessageRecoverer:

@Bean
public MessageRecoverer messageRecoverer(){
	return new ImmediateRequeueMessageRecoverer();
}

After the 5 attempts the message is requeued, then retried another 5 times, requeued again, and so on until it is finally consumed without an exception. Note that this still blocks the consumption of subsequent messages.

Summary:

Based on the tests above, for messages that still fail after the retries, you can use RepublishMessageRecoverer to forward them to another queue and then handle that queue separately.
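
As a sketch of that "handle it separately" step (assuming the error-queue declared above), a separate listener can consume the republished messages; RepublishMessageRecoverer also copies the failure details into headers such as x-exception-message and x-exception-stacktrace, which are useful for diagnosis:

@RabbitListener(queues = "error-queue")
public void errorQueueConsumer(Message failed) {
	// header added by RepublishMessageRecoverer with the original exception message
	Object reason = failed.getMessageProperties().getHeaders().get("x-exception-message");
	log.error("Failed message: {}, cause: {}", new String(failed.getBody()), reason);
	// compensate, raise an alert, or persist for manual follow-up
}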

Dead letter queue

Besides the RepublishMessageRecoverer described above, a dead letter queue can also be used to handle messages that still fail after retrying.

First, create the dead letter exchange, the dead letter queue and their binding:

/**
 * Dead letter exchange
 * @return
 */
@Bean
public DirectExchange dlxExchange(){
	return new DirectExchange(dlxExchangeName);
}

/**
 * Dead letter queue
 * @return
 */
@Bean
public Queue dlxQueue(){
	return new Queue(dlxQueueName);
}

/**
 * Bind the dead letter queue to the dead letter exchange
 * @param dlxQueue
 * @param dlxExchange
 * @return
 */
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
	return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}

The business queue also needs some changes: the dead letter exchange and dead letter routing key must be added to its arguments.

/**
 * Business queue, bound to the dead letter exchange via its arguments
 * @return
 */
@Bean
public Queue queue(){
	Map<String,Object> params = new HashMap<>();
	params.put("x-dead-letter-exchange", dlxExchangeName);   // the dead letter exchange bound to this queue
	params.put("x-dead-letter-routing-key", dlxRoutingKey);  // the dead letter routing key of this queue
	return QueueBuilder.durable(queueName).withArguments(params).build();
	//return new Queue(queueName,true);
}

When the service starts, you can see that the business queue and the dead letter queue are created at the same time.

The DLX and DLK labels now appear on the business queue, showing that it is bound to the dead letter exchange and dead letter routing key. Now let the producer send a message. After the consumer retries 5 times, because the default MessageRecoverer implementation is RejectAndDontRequeueRecoverer (i.e. requeue = false), and because the business queue is bound to a dead letter exchange, the message is removed from the business queue and routed to the dead letter queue at the same time.

Note:

If the ack mode is manual, you need to call channel.basicNack with requeue = false to send the failed message to the dead letter queue.
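
A minimal sketch of that manual-ack variant, assuming the business queue carries the x-dead-letter-* arguments above and handle() is a placeholder for the business logic (note that catching the exception here means the retry advice never sees it, so the message is dead-lettered immediately):

@RabbitListener(queues = "${platform.queue-name}")
public void msgConsumerManualAck(String msg, Channel channel, Message message) throws IOException {
	long deliveryTag = message.getMessageProperties().getDeliveryTag();
	try {
		handle(msg); // placeholder for the business logic, may throw
		channel.basicAck(deliveryTag, false);
	} catch (Exception e) {
		// requeue = false -> the broker routes the message to the configured dead letter exchange
		channel.basicNack(deliveryTag, false, false);
	}
}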

retry usage scenario

We have covered what retry is and how to avoid losing messages that still fail after retrying; so when should retry actually be used?

Should the consumer retry whenever an exception occurs? Not necessarily. Consider the following two scenarios:

  • Downloading a video or image over HTTP, or calling a third-party interface;
  • A null pointer exception or type conversion exception (or other unchecked runtime exceptions).

Obviously, retrying makes sense in the first case but not in the second.

In the first case, the request fails because of network jitter or similar transient causes, so retrying is meaningful;

In the second case, the problem can only be fixed by changing the code, so retrying is pointless; what is needed is to log the failure and handle it manually or via a scheduled polling task.
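
A rough sketch of that distinction (callThirdPartyApi() is a placeholder): transient failures propagate so the retry mechanism kicks in, while code bugs are only logged (under automatic ack the message is then confirmed) and left for manual follow-up.

@RabbitListener(queues = "${platform.queue-name}")
public void msgConsumerSelective(String msg) {
	try {
		callThirdPartyApi(msg); // placeholder; may fail transiently (network jitter) -> worth retrying
	} catch (NullPointerException | ClassCastException e) {
		// a code bug: retrying cannot help, so log it and handle it manually or via a scheduled task
		log.error("Non-retryable failure for message {}", msg, e);
	}
	// any other exception propagates and is retried according to the configuration above
}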

retry best practices

For failing messages on the consumer side, the best outcome is that they are eventually consumed successfully within the limited number of retries. For messages that still fail after those retries, you can use the default RejectAndDontRequeueRecoverer together with a dead letter queue; alternatively, as a compromise, ack the message off the business queue, send it to another queue (which is what RepublishMessageRecoverer does), and then handle that exception data separately.

In addition, some people say that retry can only be used in automatic ack mode. After testing, retry also works in manual ack mode, but the exception must not be caught; even in automatic ack mode, catching the exception prevents retry from being triggered. Of course, in manual ack mode, remember to confirm the message whether consumption succeeds or fails; otherwise the message stays unacked until the consumer process is restarted or stopped.

If you must use retry in manual ack mode, make sure the message can be consumed successfully within the limited number of retries. Otherwise, once the retries are exhausted there is no chance to execute a nack, and the message stays unacked forever; I think this is why people say retry should only be used in automatic ack mode. The test code is as follows:

@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"}, concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
	log.info("Message received>>>{}", msg);
	if (msg.contains("0")) {
		// messages containing "0" fail and are retried; no ack/nack is ever sent for them
		throw new RuntimeException("Throw exception");
	}
	log.info("Message {} consumed successfully", msg);
	channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
