RabbitMQ message retry

1. Rabbitmq has its own retry mechanism

1 example code

rabbitMQ has its own message retry mechanism: when the consumer fails to consume the message, he can choose to "push" the message to the consumer again until the message consumption is successful.

To enable the built-in retry mechanism, the following configurations are required:

1. Enable the consumer's manual response mechanism, and the corresponding springboot configuration item:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

2. When consumption is abnormal, set the message to be listed again

 boolean multiple = false; // Single confirmation
 boolean requeue  = true; // Re enter the queue, set carefully!!! It can easily lead to dead cycles, with 100% CPU
 channel.basicNack(tag, multiple, requeue);

The following is a running example:

The consumer code is as follows:

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.atomic.AtomicInteger;
​
/**
 * @author fmi110
 * @description Message consumer
 * @date 2021/7/1 16:08
 */
@Component
@Slf4j
public class RabbitConsumer {
​
    AtomicInteger count = new AtomicInteger();
​
    @RabbitListener(queues="my-queue")
    public void consumer1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
​
        log.info(">>>> consumer1 consumption tag = {},frequency count={},Message content : {}",tag, count.incrementAndGet(),data);
     
        try {
            Thread.currentThread().sleep(1000);
            int i = 1/0;
            channel.basicAck(tag,true); // Confirmation message consumption succeeded
        } catch (Exception e) {
            log.error(">>>>Consumption is abnormal. The message re enters the queue and is consumed");
            boolean multiple = false; // Single confirmation
            boolean requeue  = true; // Re enter the queue, set carefully!!!
            channel.basicNack(tag, multiple, requeue);
        }
    }
​
    @RabbitListener(queues="my-queue")
    public void consumer2(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
        log.info(">>>> consumer2 consumption tag = {},frequency count={},Message content : {}",tag, count.incrementAndGet(),data);
​
        try {
            Thread.currentThread().sleep(1000);
            int i = 1/0;
            channel.basicAck(tag,true); // Confirmation message consumption succeeded
        } catch (Exception e) {
            log.error(">>>>Consumption is abnormal. The message re enters the queue and is consumed");
            boolean multiple = false; // Single confirmation
            boolean requeue  = true; 
            channel.basicNack(tag, multiple, requeue);
        }
    }
}
​

Here, two consumers consumer1 and consumer2 are simulated, and the exception int 1/0 is artificially set in the logic, which is passed in the exception capture

 channel.basicNack(tag, false, true);

Set the message to re-enter the queue and finally push it to the consumer for consumption again. The operation results are as follows:

The log contains several information points:

  1. Consumers only consume one message at a time because I set spring rabbitmq. listener. simple. prefetch=1

  2. Round robin algorithm for message push

  3. rabbitMQ has two consumption modes: push and pull. The consumer mode created by springboot uses the push mode to consume this channel. basicConsume()

2 potential problems

As shown in the operation log, messages that re-enter the queue will be directly pushed to consumers at the head of the queue. If it is due to code logic problems, the message consumption will fail all the time, resulting in an endless loop!!!

It is reasonable to retry the consumption for a certain number of times, and if it still fails, terminate the retry, save the consumption exception message, report the exception, and handle it manually.

2. Combine spring retry and dead letter queue to realize message retry

A reasonable retry mechanism is as follows:

  1. When message consumption is abnormal, retry with the retry mechanism provided by springboot

    Because spring retry is used, an exception must be thrown in the method, otherwise spring retry will not be triggered!!!

  2. When the retry still fails, the message is forwarded to the dead letter queue, and the consumer of the dead letter queue records and reports the exception information

    To automatically forward the message consumption failure to the dead letter queue, rabbitmq needs to specify the dead letter queue bound to it when creating the message queue

The complete example code is as follows:

1. Configuration file application properties:

Spring. Com is commented out here rabbitmq. listener. simple. Acknowledge mode = manual. In this way, when the message consumption fails, it will be automatically transferred to the dead letter queue. If the manual confirmation mechanism is enabled, change must be called Basicnack (tag, false, false) messages will enter the dead letter queue!!!

# apply name
spring.application.name=rabbitmq
server.port=8080
server.servlet.context-path=/
​
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# Specify the connected virtual host. You can view the name of the corresponding virtual host on the rabbitMQ console
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
​
spring.rabbitmq.listener.simple.prefetch=1
​
# Enable the publish comfort mechanism and the message route matching failure return mechanism
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
# Enable the ack mechanism for consumers
# spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Open the retry provided by spring
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000

2 RabbitConfig

The following settings are mainly made when the program is started:

  1. Create a dead letter queue and a dead letter exchanger, and bind the dead letter queue to the dead letter exchanger.

  2. Create a normal queue and a normal switch, bind the normal queue to the normal switch, and associate the dead letter queue with the normal queue. In this way, when the message consumption fails, the message will enter the dead letter queue (using the automatic ack mode).

package com.fmi110.rabbitmq.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
import java.util.HashMap;
​
​
/**
 * @author fmi110
 * @description rabbitMQ Configuration class
 * @date 2021/7/1 15:08
 */
@Configuration
@Slf4j
public class RabbitConfig {
​
    String dlQueueName  = "my-queue-dl"; // Common queue name
    String dlExchangeName = "my-exchange-dl"; // Dead letter exchanger name
    String dlRoutingKey   = "rabbit.test";
​
    String queueName = "retry-queue";
    String exchangeName = "my-exchange"; // Name of common exchanger
​
    /**
     * Create dead letter queue
     *
     * @return
     */
    @Bean
    public Queue queueDL() {
​
        return QueueBuilder
                .durable(dlQueueName) // Persistent queue
                .build();
    }
​
    /**
     * Create dead letter switch
     *
     * @return
     */
    @Bean
    public TopicExchange exchangeDL() {
        return new TopicExchange(dlExchangeName, true, false);
    }
​
    /**
     * Binding operation
     */
    @Bean
    public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
        log.info(">>>> Queue and switch binding");
        return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
    }
​
    /**
     * Create a persistent queue and bind the dead letter exchange at the same time
     *
     * @return
     */
    @Bean
    public Queue queue() {
        log.info(">>>> Create queue retry-queue");
        HashMap<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", dlExchangeName);
        params.put("x-dead-letter-routing-key", dlRoutingKey);
​
        return QueueBuilder
                .durable(queueName) // Persistent queue
                .withArguments(params) // Associated dead letter exchanger
                .build();
    }
​
​
    /**
     * Create switch
     *
     * @return
     */
    @Bean
    public TopicExchange exchange() {
        log.info(">>>> Create switch my-exchange");
        boolean durable    = true; // Persistence
        boolean autoDelete = false; // Consumers will not be automatically deleted when they are all unbound
        return new TopicExchange(exchangeName, durable, autoDelete);
    }
​
    /**
     * Bind queue to switch
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
        log.info(">>>> Queue and switch binding");
        return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
    }
​
//    /**
//     * spring-retry Retry mechanism: callback when the maximum number of retries is reached and the message consumption fails.
//     * If this class is enabled, the dead letter queue becomes invalid and message consumption fails. Even if the dead letter queue is configured, the message will not enter the dead letter queue.
//     * Retry failed, callback and dead letter queue can only choose one!!! The callback implementation classes provided by spring are as follows:
//     * RejectAndDontRequeueRecoverer : Consumption fails, and the message is no longer listed. spring uses it by default.
//     * ImmediateRequeueMessageRecoverer : Reclassify messages
//     * RepublishMessageRecoverer: Forward the message to the specified queue,
//     * @return
//     */
//    @Bean
//    public MessageRecoverer messageRecoverer(){
//        return new MessageRecoverer() {
//            @Override
//            public void recover(Message message, Throwable cause) {
//                log.info(message.toString());
//                log. Info ("callback with the maximum number of spring retry retries and the message still fails");
// / / TODO: record and report the error information
//            }
//        };
//    }
}

3. RabbitProducer

In order to ensure that the message can be sent, the confirm confirmation mechanism is configured

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/**
 * @author fmi110
 * @description Message producer
 * @date 2021/7/1 15:08
 */
@Component
@Slf4j
public class RabbitProducer {
    @Autowired
    RabbitTemplate rabbitTemplate;
​
    /**
     * 1 Set the confirm callback, which is called back when the message is sent to exchange
     * 2 Set return callback to call back when the routing rule cannot match the message queue
     *
     * correlationData: When a message is sent, there is only one id attribute in the parameter passed, which is used to identify the message
     */
    @PostConstruct
    public void enableConfirmCallback(){
        // #1
        /**
         * Callback when exchange is not connected or does not exist
         */
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
            if (!ack) {
                log.error("Message sending failed");
                // TODO records logs and sends notifications
            }
        });
​
        // #2
        /**
         * This method is called back only when the message delivery to the queue fails
         * message:Messages sent
         * exchange:The name of the switch to which the message was sent
         * routingKey:Routing keyword information carried by the message
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->{
            log.error("Message routing failed");
            // TODO routing failure subsequent processing logic
        });
    }
​
    public void send(String msg){
        String exchangeName = "my-exchange";
        // String routingKey   = "aaa.xxx";
        String routingKey   = "rabbit.test";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

4 message consumer RabbitConsumer

package com.fmi110.rabbitmq;
​
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
​
import java.util.concurrent.atomic.AtomicInteger;
​
​
/**
 * @author fmi110
 * @description Message consumer
 * @date 2021/7/1 16:08
 */
@Component
@Slf4j
public class RabbitConsumer {
​
    AtomicInteger count = new AtomicInteger();
​
    /**
     * Ordinary queue consumers
     * @param data
     * @param channel
     * @param tag
     * @throws Exception
     */
    @RabbitListener(queues="retry-queue")
    public void consumer(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
​
        log.info(">>>> consumer consumption tag = {},frequency count={},Message content : {}",tag, count.incrementAndGet(),data);
        // TODO message processing logic
        throw new RuntimeException("Throw an exception, simulate consumption failure, trigger spring-retry");
    }
​
    /**
     * Dead letter queue consumer
     * @param data
     * @param channel
     * @param tag
     * @throws Exception
     */
    @RabbitListener(queues="my-queue-dl")
    public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
        log.info(">>>> Dead letter queue consumption tag = {},Message content : {}",tag,data);
//        channel.basicNack(tag, false, false);
    }
}

5 Controller

Used to trigger sending messages

package com.fmi110.rabbitmq.controller;
​
​
import com.fmi110.rabbitmq.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
​
import java.util.HashMap;
​
@RestController
public class TestController {
    @Autowired
    RabbitProducer rabbitProducer;
​
    @GetMapping("/test")
    public Object test() {
​
        rabbitProducer.send("this is a message");
​
        HashMap<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("msg", "success");
        return result;
    }
}

6 operation results

The operation log is as follows:

: >>>> consumer consumption tag = 1,frequency count=1,Message content : this is a message
: >>>> consumer consumption tag = 1,frequency count=2,Message content : this is a message
: >>>> consumer consumption tag = 1,frequency count=3,Message content : this is a message
o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message 
(Body:'this is a message' MessageProperties 
[headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729}, 
contentType=text/plain, contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
receivedExchange=my-exchange, receivedRoutingKey=rabbit.test, 
deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ, 
consumerQueue=retry-queue])
​
...
Caused by: java.lang.RuntimeException: Throw an exception, simulate consumption failure, trigger spring-retry
    at com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
....
​
: >>>> Dead letter queue consumption tag = 1,Message content : this is a message

It can be seen from the log that consumers in the normal queue consume three times and still fail. Finally, they call back spring to provide RejectAndDontRequeueRecoverer, and then the message enters the dead letter queue for consumption.

7 pom dependency

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

       
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>compile</scope>
        </dependency>

</dependencies>

Keywords: RabbitMQ Spring Spring Boot

Added by sashi34u on Sun, 23 Jan 2022 10:08:53 +0200