1. RabbitMQ's built-in retry mechanism
1 Example code
RabbitMQ has its own message retry mechanism: when a consumer fails to process a message, it can ask the broker to requeue the message so that it is pushed to a consumer again, and this repeats until the message is consumed successfully.
To enable the built-in retry mechanism, the following configurations are required:
1. Enable the consumer's manual acknowledgement mode. The corresponding Spring Boot configuration item is:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2. When consumption fails, requeue the message:
boolean multiple = false; // Reject only this delivery
boolean requeue = true;   // Requeue the message. Set carefully!!! It can easily lead to an endless loop with 100% CPU
channel.basicNack(tag, multiple, requeue);
The following is a running example:
The consumer code is as follows:
package com.fmi110.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author fmi110
 * @description Message consumer
 * @date 2021/7/1 16:08
 */
@Component
@Slf4j
public class RabbitConsumer {

    AtomicInteger count = new AtomicInteger();

    @RabbitListener(queues = "my-queue")
    public void consumer1(String data, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info(">>>> consumer1 consumption tag = {},frequency count={},Message content : {}", tag, count.incrementAndGet(), data);
        try {
            Thread.sleep(1000);
            int i = 1 / 0; // Deliberately throws ArithmeticException
            channel.basicAck(tag, true); // Confirm that the message was consumed successfully
        } catch (Exception e) {
            log.error(">>>>Consumption is abnormal. The message re enters the queue and is consumed");
            boolean multiple = false; // Reject only this delivery
            boolean requeue = true;   // Requeue the message. Set carefully!!!
            channel.basicNack(tag, multiple, requeue);
        }
    }

    @RabbitListener(queues = "my-queue")
    public void consumer2(String data, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info(">>>> consumer2 consumption tag = {},frequency count={},Message content : {}", tag, count.incrementAndGet(), data);
        try {
            Thread.sleep(1000);
            int i = 1 / 0; // Deliberately throws ArithmeticException
            channel.basicAck(tag, true); // Confirm that the message was consumed successfully
        } catch (Exception e) {
            log.error(">>>>Consumption is abnormal. The message re enters the queue and is consumed");
            boolean multiple = false; // Reject only this delivery
            boolean requeue = true;   // Requeue the message
            channel.basicNack(tag, multiple, requeue);
        }
    }
}
Two consumers, consumer1 and consumer2, are simulated here. The statement int i = 1/0 deliberately throws an exception in the consumer logic, and the catch block calls
channel.basicNack(tag, false, true);
to put the message back in the queue, after which it is pushed to a consumer again. The results of running the example are as follows:
The log shows several things:
- Each consumer handles only one message at a time, because I set spring.rabbitmq.listener.simple.prefetch=1
- Messages are pushed to the consumers using a round-robin algorithm
- RabbitMQ has two consumption modes, push and pull. The consumers created by Spring Boot use push mode, i.e. channel.basicConsume()
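The round-robin behaviour can be pictured with a small in-memory simulation. This is only a sketch in plain Java with no broker involved: the class RoundRobinDemo and its dispatch method are hypothetical names, and it assumes every consumer is idle whenever a message arrives, which is roughly the situation with prefetch=1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinDemo {

    /** Hands each message to the next consumer in turn: 0, 1, 0, 1, ... */
    static List<String> dispatch(List<String> messages, List<String> consumers) {
        List<String> deliveries = new ArrayList<>();
        int next = 0; // index of the consumer that receives the next message
        for (String message : messages) {
            deliveries.add(consumers.get(next) + " <- " + message);
            next = (next + 1) % consumers.size();
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer1", "consumer2");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        dispatch(messages, consumers).forEach(System.out::println);
    }
}
```

In the example above there is only one message, so each requeue simply hands it to the other consumer.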
2 Potential problems
As the log shows, a message that re-enters the queue goes back to the head of the queue and is pushed straight to a consumer again. If the failure is caused by a bug in the code logic, consumption will fail every time, resulting in an endless loop!!!
A more reasonable approach is to retry a limited number of times and, if the message still fails, stop retrying, save the failed message, report the exception, and handle it manually.
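The endless loop can be reproduced without a broker. The following is a minimal sketch in plain Java, assuming a single consumer with prefetch=1; the class RequeueLoopDemo, the method run and the message names "poison" and "healthy" are made up for illustration. A Deque stands in for the queue, and addFirst mimics basicNack(tag, false, true).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RequeueLoopDemo {

    /**
     * Delivers messages from the head of the queue, at most maxDeliveries times.
     * The message "poison" always fails and is put back at the head.
     * Returns the messages in the order they were delivered.
     */
    static List<String> run(Deque<String> queue, int maxDeliveries) {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty() && delivered.size() < maxDeliveries) {
            String message = queue.pollFirst();
            delivered.add(message);
            if (message.equals("poison")) {
                queue.addFirst(message); // requeue = true: back to the head
            }
            // any other message counts as consumed successfully
        }
        return delivered;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("poison");
        queue.add("healthy");
        // Without the cap of 5 deliveries this loop would never end.
        System.out.println(run(queue, 5));            // [poison, poison, poison, poison, poison]
        System.out.println("still queued: " + queue); // still queued: [poison, healthy]
    }
}
```

The message behind the failing one is never reached, which is why a cap on the number of attempts is needed.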
2. Combine spring-retry and a dead letter queue to implement message retry
A reasonable retry mechanism is as follows:
- When message consumption fails, retry using the retry mechanism provided by Spring Boot.
  Because spring-retry is used, the exception must be thrown out of the listener method; otherwise spring-retry is never triggered!!!
- If the retries still fail, the message is forwarded to the dead letter queue, and the consumer of the dead letter queue records and reports the exception.
  For RabbitMQ to forward failed messages to the dead letter queue automatically, the dead letter exchange (and through it the dead letter queue) must be specified when the normal queue is created.
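The two steps above can be sketched in plain Java before looking at the Spring configuration. This is not how spring-retry is implemented internally; it is a minimal illustration under stated assumptions, with the hypothetical names RetryThenDeadLetterDemo and consume, and an ordinary List standing in for the dead letter queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryThenDeadLetterDemo {

    /**
     * Calls the handler up to maxAttempts times. The handler reports failure
     * by throwing; if every attempt throws, the message is added to deadLetters.
     * Returns the number of attempts that were made.
     */
    static int consume(String message, Consumer<String> handler,
                       int maxAttempts, List<String> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // success: stop retrying
            } catch (RuntimeException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        deadLetters.add(message); // retries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int attempts = consume("this is a message", m -> {
            throw new RuntimeException("simulated consumption failure");
        }, 3, deadLetters);
        System.out.println("attempts = " + attempts + ", dead letters = " + deadLetters);
    }
}
```

A handler that catches its own exception and returns normally looks like a success to this loop, so nothing is retried. That is the reason the listener has to let the exception propagate.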
The complete example code is as follows:
1 Configuration file application.properties:
Note that spring.rabbitmq.listener.simple.acknowledge-mode=manual is commented out here. With automatic acknowledgement, a message whose consumption fails is transferred to the dead letter queue automatically. If manual acknowledgement is enabled, channel.basicNack(tag, false, false) must be called for the message to enter the dead letter queue!!!
# Application name
spring.application.name=rabbitmq
server.port=8080
server.servlet.context-path=/

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# Specify the virtual host to connect to. You can view the name of the virtual host on the RabbitMQ console
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.prefetch=1

# Enable the publisher confirm mechanism and the return mechanism for messages that fail route matching
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated

# Enable the manual ack mechanism for consumers
# spring.rabbitmq.listener.simple.acknowledge-mode=manual

# Enable the retry provided by Spring
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000
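To see what these retry settings amount to in time, the waits between attempts can be worked out by hand. The sketch below is plain Java with hypothetical names (RetryScheduleDemo, waits). It assumes that max-attempts counts the first delivery, and that the unset properties retry.multiplier and retry.max-interval keep their defaults of 1.0 and 10000 ms; check both assumptions against the Spring Boot version in use.

```java
import java.util.ArrayList;
import java.util.List;

class RetryScheduleDemo {

    /**
     * Returns the waits (in ms) between consecutive attempts.
     * maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
     */
    static List<Long> waits(int maxAttempts, long initialInterval,
                            double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add((long) Math.min(interval, maxInterval)); // cap each wait
            interval *= multiplier; // grows only when multiplier > 1
        }
        return result;
    }

    public static void main(String[] args) {
        // max-attempts=3 and initial-interval=3000 come from the file above;
        // multiplier 1.0 and max-interval 10000 are assumed defaults.
        System.out.println(waits(3, 3000, 1.0, 10000)); // [3000, 3000]
    }
}
```

Under these assumptions a failing message is attempted three times, about three seconds apart, before the retries are exhausted.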
2 RabbitConfig
The following settings are mainly made when the program is started:
- Create a dead letter queue and a dead letter exchange, and bind the dead letter queue to the dead letter exchange.
- Create a normal queue and a normal exchange, bind the normal queue to the normal exchange, and associate the dead letter exchange with the normal queue. In this way, when message consumption fails, the message enters the dead letter queue (using the automatic ack mode).
package com.fmi110.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author fmi110
 * @description RabbitMQ configuration class
 * @date 2021/7/1 15:08
 */
@Configuration
@Slf4j
public class RabbitConfig {

    String dlQueueName    = "my-queue-dl";    // Dead letter queue name
    String dlExchangeName = "my-exchange-dl"; // Dead letter exchange name
    String dlRoutingKey   = "rabbit.test";
    String queueName      = "retry-queue";    // Normal queue name
    String exchangeName   = "my-exchange";    // Normal exchange name

    /**
     * Create the dead letter queue
     */
    @Bean
    public Queue queueDL() {
        return QueueBuilder
                .durable(dlQueueName) // Persistent queue
                .build();
    }

    /**
     * Create the dead letter exchange
     */
    @Bean
    public TopicExchange exchangeDL() {
        return new TopicExchange(dlExchangeName, true, false);
    }

    /**
     * Bind the dead letter queue to the dead letter exchange
     */
    @Bean
    public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
        log.info(">>>> Queue and switch binding");
        return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
    }

    /**
     * Create a persistent queue and associate the dead letter exchange with it
     */
    @Bean
    public Queue queue() {
        log.info(">>>> Create queue retry-queue");
        HashMap<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", dlExchangeName);
        params.put("x-dead-letter-routing-key", dlRoutingKey);
        return QueueBuilder
                .durable(queueName)    // Persistent queue
                .withArguments(params) // Associated dead letter exchange
                .build();
    }

    /**
     * Create the normal exchange
     */
    @Bean
    public TopicExchange exchange() {
        log.info(">>>> Create switch my-exchange");
        boolean durable = true;     // Persistent
        boolean autoDelete = false; // Not deleted automatically when all queues are unbound
        return new TopicExchange(exchangeName, durable, autoDelete);
    }

    /**
     * Bind the normal queue to the normal exchange
     */
    @Bean
    public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
        log.info(">>>> Queue and switch binding");
        return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
    }

//    /**
//     * spring-retry callback, invoked when the maximum number of retries is reached and consumption still fails.
//     * If this bean is enabled, the dead letter queue no longer takes effect: even if a dead letter queue
//     * is configured, the failed message will not enter it.
//     * The retry-failure callback and the dead letter queue are mutually exclusive, choose only one!!!
//     * The callback implementations provided by Spring are:
//     *   RejectAndDontRequeueRecoverer    : consumption fails and the message is not requeued. Spring uses it by default.
//     *   ImmediateRequeueMessageRecoverer : requeue the message
//     *   RepublishMessageRecoverer        : forward the message to the specified exchange
//     */
//    @Bean
//    public MessageRecoverer messageRecoverer() {
//        return new MessageRecoverer() {
//            @Override
//            public void recover(Message message, Throwable cause) {
//                log.info(message.toString());
//                log.info("spring-retry reached the maximum number of retries and the message still failed");
//                // TODO: record and report the error information
//            }
//        };
//    }
}
3 RabbitProducer
To make sure messages are actually delivered, the publisher confirm mechanism is configured
package com.fmi110.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author fmi110
 * @description Message producer
 * @date 2021/7/1 15:08
 */
@Component
@Slf4j
public class RabbitProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 1 Set the confirm callback, which is called back when the message is sent to the exchange
     * 2 Set the return callback, which is called back when the routing rule matches no message queue
     *
     * correlationData: passed when a message is sent; it has only one id attribute, which identifies the message
     */
    @PostConstruct
    public void enableConfirmCallback() {
        // #1
        // ack is false when the message did not reach the exchange,
        // e.g. the exchange is not connected or does not exist
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("Message sending failed");
                // TODO record logs and send notifications
            }
        });

        // #2
        // Called back only when delivery of the message to a queue fails
        //   message    : the message that was sent
        //   exchange   : the name of the exchange the message was sent to
        //   routingKey : the routing key carried by the message
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("Message routing failed");
            // TODO subsequent processing logic for routing failure
        });
    }

    public void send(String msg) {
        String exchangeName = "my-exchange";
        // String routingKey = "aaa.xxx";
        String routingKey = "rabbit.test";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}
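The confirm callback above only logs that some message failed. The correlation id mentioned in the comment is what lets a producer tell which one. A possible way to keep track is sketched below in plain Java; PendingConfirms, track and onConfirm are hypothetical names, not part of Spring AMQP, and the sketch assumes the application generates an id per message and passes it along with the send (in Spring AMQP that would typically be a CorrelationData argument to convertAndSend).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingConfirms {

    // correlation id -> message body still waiting for a broker confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before publishing the message. */
    void track(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    /**
     * Call from the confirm callback. Returns the body that should be
     * reported or re-sent when the broker nacked it, otherwise null.
     */
    String onConfirm(String correlationId, boolean ack) {
        String body = pending.remove(correlationId);
        return ack ? null : body;
    }

    /** Number of messages that have not been confirmed yet. */
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "first");
        confirms.track("id-2", "second");
        System.out.println(confirms.onConfirm("id-1", true));  // null
        System.out.println(confirms.onConfirm("id-2", false)); // second
        System.out.println(confirms.outstanding());            // 0
    }
}
```

A ConcurrentHashMap is used because confirms usually arrive on a different thread from the one that publishes.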
4 Message consumer RabbitConsumer
package com.fmi110.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author fmi110
 * @description Message consumer
 * @date 2021/7/1 16:08
 */
@Component
@Slf4j
public class RabbitConsumer {

    AtomicInteger count = new AtomicInteger();

    /**
     * Consumer of the normal queue
     */
    @RabbitListener(queues = "retry-queue")
    public void consumer(String data, Channel channel,
                         @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info(">>>> consumer consumption tag = {},frequency count={},Message content : {}", tag, count.incrementAndGet(), data);
        // TODO message processing logic
        throw new RuntimeException("Throw an exception, simulate consumption failure, trigger spring-retry");
    }

    /**
     * Consumer of the dead letter queue
     */
    @RabbitListener(queues = "my-queue-dl")
    public void consumeDL(String data, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info(">>>> Dead letter queue consumption tag = {},Message content : {}", tag, data);
        // channel.basicNack(tag, false, false);
    }
}
5 Controller
Used to trigger message sending
package com.fmi110.rabbitmq.controller;

import com.fmi110.rabbitmq.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class TestController {

    @Autowired
    RabbitProducer rabbitProducer;

    @GetMapping("/test")
    public Object test() {
        rabbitProducer.send("this is a message");

        HashMap<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("msg", "success");
        return result;
    }
}
6 Results
The operation log is as follows:
: >>>> consumer consumption tag = 1,frequency count=1,Message content : this is a message
: >>>> consumer consumption tag = 1,frequency count=2,Message content : this is a message
: >>>> consumer consumption tag = 1,frequency count=3,Message content : this is a message
o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'this is a message' MessageProperties [headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=my-exchange, receivedRoutingKey=rabbit.test, deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ, consumerQueue=retry-queue])
...
Caused by: java.lang.RuntimeException: Throw an exception, simulate consumption failure, trigger spring-retry
    at com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181]
....
: >>>> Dead letter queue consumption tag = 1,Message content : this is a message
The log shows that the consumer of the normal queue tries three times and still fails. The RejectAndDontRequeueRecoverer provided by Spring is then invoked, after which the message enters the dead letter queue and is consumed there.
7 pom dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>compile</scope>
    </dependency>
</dependencies>