concept
The dead letter queue, as its name implies, is a message that cannot be consumed. Sometimes some messages in the queue cannot be consumed due to specific reasons. If there is no subsequent processing, such messages will become dead letters and be added to the private letter queue.
Source of dead letter
- Message TTL expired
- The queue has reached the maximum length (the queue is full and can no longer add data to mq)
- The message is rejected (basic.reject or basic.nack) and request = false
Code demonstration ttl expiration effect
Simulate the relationship below
Add dependency
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Set profile
# Service port server: port: 8080 # Configure rabbitmq service spring: rabbitmq: username: admin password: admin virtual-host: / host: 192.168.136.128 port: 9999 listener: type: simple simple: default-requeue-rejected: false acknowledge-mode: manual
The default request rejected should be changed to false, which means that after the consumer rejects the message, it will not be put back into the normal queue.
Create producer
@Component public class Producer{ @Autowired private RabbitTemplate rabbitTemplate; // 2: Routing key private String routeKey = "zhangsan"; public void makeOrder() { // Send message to all customers for(int i = 1;i<=10;i++){ String content = "news"+i; System.out.println("Producer sends message:"+i) r abbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE, routeKey, content, message -> { //Set the message expiration time of 10s. If the message is not accepted by the consumer within ten seconds, it will be automatically transferred to the dead letter queue message.getMessageProperties().setExpiration("10000"); retrun message; }); } } }
Create configuration class
@Configuration public class RabbitConfig { //Common switch name public static final String NORMAL_EXCHANGE = "normal_exchange"; //Dead letter switch name public static final String DEAD_EXCHANGE = "dead_exchange"; //Common queue name public static final String NORMAL_QUEUE = "normal_queue"; //Dead letter queue name public static final String DEAD_QUEUE = "dead_queue"; //Declaration queue @Bean public Queue normalQueueDeclared() { //Normal queue binding dead letter queue information Map<String, Object> params = new HashMap<>(); //The parameter key of normal queue setting dead letter switch is a fixed value params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //Set the dead letter routing key parameter for the normal queue. The key is a fixed value params.put("x-dead-letter-routing-key", "lisi"); return new Queue(NORMAL_QUEUE , true, false, false, params); } //Declare dead letter queue @Bean public Queue deadlQueueDeclared() { return new Queue(DEAD_QUEUE , true ,false ,false ,null); } //Declare common switch @Bean public DirectExchange myNormalExchange() { return new DirectExchange(NORMAL_EXCHANGE); } //Declare dead letter switch @Bean public DirectExchange myDeadExchange() { return new DirectExchange(DEAD_EXCHANGE); } //Bind queue to switch @Bean public Binding binding1() { return BindingBuilder.bind(normalQueueDeclared()).to(myNormalExchange()).with("zhangsan"); } @Bean public Binding binding2() { return BindingBuilder.bind(deadlQueueDeclared()).to(myDeadExchange()).with("lisi"); } }
The consumer establishes a separate boot project, only starts the producer, does not start the consumer, and simulates the effect of message expiration, c1
// bindings is used to determine the binding relationship between the queue and the switch @RabbitListener(bindings =@QueueBinding( //Specifies the queue to get information value = @Queue(value = RabbitConfig.NORMAL_QUEUE ,autoDelete = "false"), // designated switch exchange = @Exchange(value = RabbitConfig.NORMAL_EXCHANGE, // Here is the mode of the determined switch type = ExchangeTypes.DIRECT) )) @Component public class Consumer01 { // @RabbitHandler represents that this method is a message receiving method. This does not have a return value @RabbitHandler public void messagerevice(String message){ // The logic of sending email is omitted here System.out.println("c1-------------->" + message); } }
Consumer c2
// bindings is used to determine the binding relationship between the queue and the switch @RabbitListener(bindings =@QueueBinding( //Specifies the queue to get information value = @Queue(value = RabbitConfig.DEAD_QUEUE ,autoDelete = "false"), // designated switch exchange = @Exchange(value = RabbitConfig.DEAD_EXCHANGE, // Here is the mode of the determined switch type = ExchangeTypes.DIRECT) )) @Component public class Consumer02 { // @RabbitHandler represents that this method is a message receiving method. This does not have a return value @RabbitHandler public void messagerevice(String message){ // The logic of sending email is omitted here System.out.println("c2-------------->" + message); } }
Observation results
The producer sent 10 messages. Because the consumer did not start, 10 messages were not consumed
After 10 seconds, the message expires and is automatically added to the dead letter queue
Start consumer c2, and you can see that the data in the dead letter queue has been consumed by c2
c2--------------> Message 1 c2--------------> Message 2 c2--------------> Message 3 c2--------------> Message 4 c2--------------> Message 5 c2--------------> Message 6 c2--------------> Message 7 c2--------------> Message 8 c2--------------> Message 9 c2--------------> Message 10
Code demonstration of queue reaching maximum length
The producer cancels the ttl of sending the message
@Component public class Producer{ @Autowired private RabbitTemplate rabbitTemplate; // 2: Routing key private String routeKey = "zhangsan"; public void makeOrder() { // Send message to all customers for(int i = 1;i<=10;i++){ String content = "news"+i; System.out.println("Producer sends message:"+i) r abbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE, routeKey, content, null); } } }
Sets the maximum length of a normal queue
//Declaration queue @Bean public Queue normalQueueDeclared() { //Normal queue binding dead letter queue information Map<String, Object> params = new HashMap<>(); //The parameter key of normal queue setting dead letter switch is a fixed value params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //Set the dead letter routing key parameter for the normal queue. The key is a fixed value params.put("x-dead-letter-routing-key", "lisi"); //Sets the maximum length of the queue params.put("x-max-length", 6); return new Queue(NORMAL_QUEUE , true, false, false, params); }
//Set the maximum length of the queue. Once the maximum length is exceeded, all messages will be routed to the dead letter queue
params.put("x-max-length", 6);
Observation results
- Do not start the consumer, otherwise the messages in the queue will be consumed in time, and it is difficult to demonstrate the effect.
- We set a maximum length of 6 for normal queues, so we can see that 4 messages are routed to the dead letter queue.
Consumer reject message code demo
c1 set reject consumption
// bindings is used to determine the binding relationship between the queue and the switch @RabbitListener(bindings =@QueueBinding( //Specifies the queue to get information value = @Queue(value = RabbitConfig.NORMAL_QUEUE ,autoDelete = "false"), // designated switch exchange = @Exchange(value = RabbitConfig.NORMAL_EXCHANGE, // Here is the mode of the determined switch type = ExchangeTypes.DIRECT) )) @Component public class Consumer01 { // @RabbitHandler represents that this method is a message receiving method. This does not have a return value @RabbitHandler public void messagerevice(Channel channel, Message message){ String content = new String(message.getBody()) System.out.println("consumer: " + content ); if("Message 1".equals(content )){ //If the request is set to false, it means to refuse to re queue the queue. If dead letter is configured, the switch will send it to the dead letter queue System.out.println("C1 Message received" + content + "And refuse to sign the message"); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false) } // The logic of sending email is omitted here System.out.println("c1-------------->" + message); } }
Start the producer to send a message
Start c1 consumption message
Because c1 rejected a message, the message was routed to the dead letter queue.
Application scenario of dead letter queue
It is generally used in more important service queues to ensure that messages that have not been correctly consumed are not discarded. Generally, consumption exceptions may mainly occur due to processing exceptions caused by errors in the message information itself, parameter verification exceptions during processing, or query exceptions caused by network fluctuations. By configuring the dead letter queue, Messages that have not been properly processed can be temporarily stored in another queue. After the problem is found out later, write corresponding processing code to process dead letter messages, which is much better than manually recovering data.
summary
In fact, the dead letter queue is nothing special, but an ordinary queue bound to the dead letter switch, and the dead letter switch is just an ordinary switch, but a switch used to deal with dead letters.