rabbitmq private message queue

concept

The dead letter queue, as its name implies, is a message that cannot be consumed. Sometimes some messages in the queue cannot be consumed due to specific reasons. If there is no subsequent processing, such messages will become dead letters and be added to the private letter queue.

Source of dead letter

  • Message TTL expired
  • The queue has reached the maximum length (the queue is full and can no longer add data to mq)
  • The message is rejected (basic.reject or basic.nack) and request = false

Code demonstration ttl expiration effect

Simulate the relationship below
Add dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Set profile

# Service port
server:
  port: 8080
# Configure rabbitmq service
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.136.128
    port: 9999
    listener:
      type: simple
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual

The default request rejected should be changed to false, which means that after the consumer rejects the message, it will not be put back into the normal queue.

Create producer

@Component
public class Producer{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2: Routing key
    private String routeKey = "zhangsan";
    public void makeOrder() {
        // Send message to all customers
		for(int i = 1;i<=10;i++){
			String content = "news"+i;
			System.out.println("Producer sends message:"+i)
r				abbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE, 
		        routeKey, content, message -> {
					//Set the message expiration time of 10s. If the message is not accepted by the consumer within ten seconds, it will be automatically transferred to the dead letter queue
					message.getMessageProperties().setExpiration("10000");
					retrun message;
				});
		    }
		}      
  
}

Create configuration class

@Configuration
public class RabbitConfig {
	//Common switch name
	public static final String NORMAL_EXCHANGE = "normal_exchange";
	//Dead letter switch name
	public static final String DEAD_EXCHANGE = "dead_exchange";
	//Common queue name
	public static final String NORMAL_QUEUE = "normal_queue";
	//Dead letter queue name
	public static final String DEAD_QUEUE = "dead_queue";
	
    //Declaration queue
    @Bean
    public Queue normalQueueDeclared() {        
        //Normal queue binding dead letter queue information
		Map<String, Object> params = new HashMap<>();
		//The parameter key of normal queue setting dead letter switch is a fixed value
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//Set the dead letter routing key parameter for the normal queue. The key is a fixed value
		params.put("x-dead-letter-routing-key", "lisi");
        return new Queue(NORMAL_QUEUE , true, false, false, params);
    }
    //Declare dead letter queue
    @Bean
    public Queue deadlQueueDeclared() {
        return new Queue(DEAD_QUEUE , true ,false ,false ,null);
    }
    
    //Declare common switch
    @Bean
    public DirectExchange myNormalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    //Declare dead letter switch
    @Bean
    public DirectExchange myDeadExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }
    
    //Bind queue to switch
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(normalQueueDeclared()).to(myNormalExchange()).with("zhangsan");
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(deadlQueueDeclared()).to(myDeadExchange()).with("lisi");
    }
}

The consumer establishes a separate boot project, only starts the producer, does not start the consumer, and simulates the effect of message expiration, c1

// bindings is used to determine the binding relationship between the queue and the switch
@RabbitListener(bindings =@QueueBinding(
        //Specifies the queue to get information
        value = @Queue(value = RabbitConfig.NORMAL_QUEUE ,autoDelete = "false"),
        // designated switch 
        exchange = @Exchange(value = RabbitConfig.NORMAL_EXCHANGE,
                // Here is the mode of the determined switch
                type = ExchangeTypes.DIRECT)
))
@Component
public class Consumer01 {
    // @RabbitHandler represents that this method is a message receiving method. This does not have a return value
    @RabbitHandler
    public void messagerevice(String message){
        // The logic of sending email is omitted here
        System.out.println("c1-------------->" + message);
    }
}

Consumer c2

// bindings is used to determine the binding relationship between the queue and the switch
@RabbitListener(bindings =@QueueBinding(
        //Specifies the queue to get information
        value = @Queue(value = RabbitConfig.DEAD_QUEUE ,autoDelete = "false"),
        // designated switch 
        exchange = @Exchange(value = RabbitConfig.DEAD_EXCHANGE,
                // Here is the mode of the determined switch
                type = ExchangeTypes.DIRECT)
))
@Component
public class Consumer02 {
    // @RabbitHandler represents that this method is a message receiving method. This does not have a return value
    @RabbitHandler
    public void messagerevice(String message){
        // The logic of sending email is omitted here
        System.out.println("c2-------------->" + message);
    }
}

Observation results

The producer sent 10 messages. Because the consumer did not start, 10 messages were not consumed
After 10 seconds, the message expires and is automatically added to the dead letter queue

Start consumer c2, and you can see that the data in the dead letter queue has been consumed by c2

c2--------------> Message 1
c2--------------> Message 2
c2--------------> Message 3
c2--------------> Message 4
c2--------------> Message 5
c2--------------> Message 6
c2--------------> Message 7
c2--------------> Message 8
c2--------------> Message 9
c2--------------> Message 10

Code demonstration of queue reaching maximum length

The producer cancels the ttl of sending the message

@Component
public class Producer{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2: Routing key
    private String routeKey = "zhangsan";
    public void makeOrder() {
        // Send message to all customers
		for(int i = 1;i<=10;i++){
			String content = "news"+i;
			System.out.println("Producer sends message:"+i)
r				abbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE, 
		        routeKey, content, null);
		    }
		}      
  
}

Sets the maximum length of a normal queue

//Declaration queue
    @Bean
    public Queue normalQueueDeclared() {        
        //Normal queue binding dead letter queue information
		Map<String, Object> params = new HashMap<>();
		//The parameter key of normal queue setting dead letter switch is a fixed value
		params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		//Set the dead letter routing key parameter for the normal queue. The key is a fixed value
		params.put("x-dead-letter-routing-key", "lisi");
		//Sets the maximum length of the queue
		params.put("x-max-length", 6);
        return new Queue(NORMAL_QUEUE , true, false, false, params);
    }

//Set the maximum length of the queue. Once the maximum length is exceeded, all messages will be routed to the dead letter queue
params.put("x-max-length", 6);

Observation results

  • Do not start the consumer, otherwise the messages in the queue will be consumed in time, and it is difficult to demonstrate the effect.
  • We set a maximum length of 6 for normal queues, so we can see that 4 messages are routed to the dead letter queue.

Consumer reject message code demo

c1 set reject consumption

// bindings is used to determine the binding relationship between the queue and the switch
@RabbitListener(bindings =@QueueBinding(
        //Specifies the queue to get information
        value = @Queue(value = RabbitConfig.NORMAL_QUEUE ,autoDelete = "false"),
        // designated switch 
        exchange = @Exchange(value = RabbitConfig.NORMAL_EXCHANGE,
                // Here is the mode of the determined switch
                type = ExchangeTypes.DIRECT)
))
@Component
public class Consumer01 {
    // @RabbitHandler represents that this method is a message receiving method. This does not have a return value
    @RabbitHandler
    public void messagerevice(Channel channel, Message message){
    	String content = new String(message.getBody())
    	System.out.println("consumer: " + content );
    	if("Message 1".equals(content )){
    		//If the request is set to false, it means to refuse to re queue the queue. If dead letter is configured, the switch will send it to the dead letter queue
    		System.out.println("C1 Message received" + content + "And refuse to sign the message");
    		channel.basicReject(message.getMessageProperties().getDeliveryTag(),false)
    		
    	}
        // The logic of sending email is omitted here
        System.out.println("c1-------------->" + message);
    }
}

Start the producer to send a message

Start c1 consumption message

Because c1 rejected a message, the message was routed to the dead letter queue.

Application scenario of dead letter queue

It is generally used in more important service queues to ensure that messages that have not been correctly consumed are not discarded. Generally, consumption exceptions may mainly occur due to processing exceptions caused by errors in the message information itself, parameter verification exceptions during processing, or query exceptions caused by network fluctuations. By configuring the dead letter queue, Messages that have not been properly processed can be temporarily stored in another queue. After the problem is found out later, write corresponding processing code to process dead letter messages, which is much better than manually recovering data.

summary

In fact, the dead letter queue is nothing special, but an ordinary queue bound to the dead letter switch, and the dead letter switch is just an ordinary switch, but a switch used to deal with dead letters.

Keywords: Java RabbitMQ MQ

Added by cherubrock74 on Fri, 17 Dec 2021 07:36:20 +0200