RabbitMQ Dead Letter Queue

1. The concept of dead letter queues

First, the definition. A dead letter, as the name suggests, is a message that cannot be consumed. Normally a producer publishes a message to the broker (or directly to a queue), and a consumer takes it from the queue and processes it. Sometimes, however, a message in the queue cannot be consumed for a specific reason. If nothing further is done with such a message, it becomes a dead letter, and a queue that collects dead letters is a dead letter queue.
Application scenario: to make sure that the message data of an order business is not lost, use RabbitMQ's dead letter mechanism so that a message whose consumption fails is placed in the dead letter queue. Another example: a user places an order in a shop and clicks to pay; if the payment is not completed within the specified time, the order automatically expires.

2. Sources of Dead Letters

  • The message TTL expires
  • The queue reaches its maximum length (the queue is full and cannot hold more messages)
  • The message is rejected (basic.reject or basic.nack) with requeue=false

3. Dead Letters in Practice

3.1 Code Architecture Diagram


There is one producer and two consumers. A message is sent to the dead letter queue when it meets one of the dead letter criteria.
zhangsan is the routing key that binds the normal exchange (normal_exchange) to the normal queue (normal_queue).
lisi is the routing key that binds the dead letter exchange (dead_exchange) to the dead letter queue (dead_queue).
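
The link between the normal queue and the dead letter exchange is expressed purely through queue arguments. As a minimal sketch, the two arguments can be built by a small helper. The class and method names here (DeadLetterArgs, deadLetterArgs) are hypothetical and not part of the tutorial's code; only the two argument keys are RabbitMQ's own. The sketch uses nothing but the JDK, so it runs without a broker.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterArgs {
    /**
     * Builds the optional-arguments map for a queue whose dead letters
     * should be republished to deadExchange using deadRoutingKey.
     * (Hypothetical helper, for illustration only.)
     */
    public static Map<String, Object> deadLetterArgs(String deadExchange, String deadRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        // Exchange that receives messages dead-lettered from this queue
        arguments.put("x-dead-letter-exchange", deadExchange);
        // Routing key used when the dead letter is republished
        arguments.put("x-dead-letter-routing-key", deadRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = deadLetterArgs("dead_exchange", "lisi");
        System.out.println(arguments.get("x-dead-letter-exchange"));
        System.out.println(arguments.get("x-dead-letter-routing-key"));
    }
}
```

The resulting map would be passed as the last parameter of channel.queueDeclare when the normal queue is declared.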

3.2 Simulating message TTL expiry

Producer Code

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

public class Producer {
    // Name of the normal exchange
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        // RabbitMqUtils is a helper defined elsewhere (not shown here) that returns a channel
        Channel channel = RabbitMqUtils.getChannel();
        // Per-message TTL (time to live) in milliseconds: 10000 ms = 10 s
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000")
                .build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;    // info1 ... info10
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

Consumer 01:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    // Name of the normal exchange
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // Name of the dead letter exchange
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // Name of the normal queue
    public static final String NORMAL_QUEUE = "normal_queue";
    // Name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        // Obtain a channel
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the normal and dead letter exchanges, both of type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Arguments for the normal queue
        Map<String, Object> arguments = new HashMap<>();
        // Queue-level expiration time: 10 s = 10000 ms (unused; the producer sets a per-message TTL)
//        arguments.put("x-message-ttl",10000);
        // Dead letter exchange for the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Routing key used for dead letters
        arguments.put("x-dead-letter-routing-key", "lisi");
        // Limit on the length of the normal queue (used in section 3.3)
//        arguments.put("x-max-length",6);

        // Declare the normal queue
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // Declare the dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // Bind the normal queue to the normal exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // Bind the dead letter queue to the dead letter exchange
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("Waiting to receive messages......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("Consumer01 The message received is:" + msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck=false: acknowledge manually
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}

Consumer 02: this is the simplest consumer; it is only responsible for receiving messages from the dead letter queue.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Consumer02 {
    // Name of the dead letter queue (declared by Consumer01)
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        // Obtain a channel
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("Waiting to receive messages......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02 The message received is:"
                    + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        // autoAck=true: messages are acknowledged automatically on delivery
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

Start consumer C1 first

Start C1 so that it declares the exchanges and queues and binds them (this code lives in Consumer01), then shut C1 down. With C1 stopped, nothing consumes from normal_queue once the producer starts sending.
Now start the producer, which sends 10 messages, each with a TTL of 10 seconds. Because consumer C1 is stopped, the messages stay in normal_queue until their TTL expires. Since normal_queue is configured with a dead letter exchange, every expired message is then routed to dead_queue. The result can be checked in the RabbitMQ management console.

As the console screenshot shows, all 10 messages from normal_queue have moved to dead_queue.
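
The behaviour observed in this test can be reproduced with a toy model that needs no broker. This is only a sketch of the idea and not RabbitMQ's implementation: the class TtlExpiryModel and its methods are hypothetical, time is passed in explicitly so the run is deterministic, and messages are assumed to expire at the head of the queue once their TTL has elapsed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TtlExpiryModel {
    /** A queued message together with the absolute time (ms) at which it expires. */
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> normalQueue = new ArrayDeque<>();
    private final List<String> deadQueue = new ArrayList<>();

    /** Publishes a message; expiration is a millisecond string, as in the producer. */
    public void publish(String body, String expiration, long nowMillis) {
        long ttlMillis = Long.parseLong(expiration);
        normalQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Moves messages at the head of the queue whose TTL has elapsed to the dead queue. */
    public void expire(long nowMillis) {
        while (!normalQueue.isEmpty() && normalQueue.peekFirst().expiresAtMillis <= nowMillis) {
            deadQueue.add(normalQueue.pollFirst().body);
        }
    }

    public int normalSize() {
        return normalQueue.size();
    }

    public List<String> deadQueue() {
        return deadQueue;
    }

    public static void main(String[] args) {
        TtlExpiryModel model = new TtlExpiryModel();
        for (int i = 1; i <= 10; i++) {
            model.publish("info" + i, "10000", 0L);   // all published at t = 0
        }
        model.expire(9_999L);    // just before the TTL: nothing has expired yet
        System.out.println(model.normalSize() + " normal, " + model.deadQueue().size() + " dead");
        model.expire(10_000L);   // TTL reached: every message is dead-lettered
        System.out.println(model.normalSize() + " normal, " + model.deadQueue().size() + " dead");
    }
}
```

With no consumer removing messages, the only way out of the normal queue in this model is expiry, which matches what the console shows after ten seconds.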

As the architecture diagram shows, messages in the dead letter queue are consumed by consumer C2. So now start C2 (the Consumer02 class listed above); it should consume all 10 messages in the dead letter queue.

As you can see from the image below, the messages in the dead letter queue were consumed successfully by C2 and the test was successful.

3.3 Simulating the queue reaching its maximum length

Consumer C2 is unchanged. Now limit normal_queue to at most six messages and have the producer send 10; four messages will then overflow into the dead letter queue. With RabbitMQ's default overflow behaviour (drop-head), these are the four oldest messages in the queue, not the last four published. Keep consumer C1 stopped here; otherwise it consumes the messages so quickly that the limit has no visible effect.
Producer code modification (TTL commented out):

import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

public class Producer {
    // Name of the normal exchange
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Per-message TTL (time to live) in milliseconds: 10000 ms = 10 s (disabled for this test)
//        AMQP.BasicProperties properties=
//                new AMQP.BasicProperties()
//                .builder().expiration("10000")
//                .build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;    // info1 ... info10
            // No properties, so the messages carry no TTL
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null,
                    message.getBytes(StandardCharsets.UTF_8));
//            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

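C1 code modification
In C1's declaration of normal_queue, enable the length limit (the line that is commented out in the Consumer01 listing above):

        arguments.put("x-max-length",6);

A queue cannot be redeclared with different arguments, so delete the existing normal_queue first (for example in the management console); otherwise the declaration fails with a PRECONDITION_FAILED channel error.

Then start C1 and shut it down once the queue has been declared.

Start the producer, which sends 10 messages.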

The message counts of the dead letter queue and the normal queue in the console match the expectation. Because consumer C1 is stopped, the messages in normal_queue are not consumed, and because the queue accepts at most six messages, the other four go into the dead letter queue.
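
Which four messages are dead-lettered depends on the queue's overflow behaviour. With RabbitMQ's default, drop-head, a full queue makes room by removing messages from the front, so the oldest ones are dead-lettered. The following toy model illustrates this under that assumption; MaxLengthModel is a hypothetical name, and the class imitates the behaviour with plain JDK collections rather than talking to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MaxLengthModel {
    private final int maxLength;
    private final Deque<String> normalQueue = new ArrayDeque<>();
    private final List<String> deadQueue = new ArrayList<>();

    public MaxLengthModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Enqueues a message; if the limit is exceeded, dead-letters from the head (drop-head). */
    public void publish(String body) {
        normalQueue.addLast(body);
        while (normalQueue.size() > maxLength) {
            deadQueue.add(normalQueue.pollFirst());
        }
    }

    public List<String> normalQueue() {
        return new ArrayList<>(normalQueue);
    }

    public List<String> deadQueue() {
        return deadQueue;
    }

    public static void main(String[] args) {
        MaxLengthModel model = new MaxLengthModel(6);   // mirrors x-max-length = 6
        for (int i = 1; i <= 10; i++) {
            model.publish("info" + i);
        }
        System.out.println("normal: " + model.normalQueue());  // info5 ... info10
        System.out.println("dead:   " + model.deadQueue());    // info1 ... info4
    }
}
```

The counts (six and four) agree with the console, and the model makes visible that info1 to info4 are the ones that end up in the dead letter queue.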

3.4 Simulating message rejection

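Consumer C1 (start it once so that it declares the exchanges and queues, then shut it down so that the producer's messages accumulate in normal_queue; if normal_queue still carries the length limit from 3.3, delete it first):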

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    // Name of the normal exchange
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    // Name of the dead letter exchange
    public static final String DEAD_EXCHANGE = "dead_exchange";
    // Name of the normal queue
    public static final String NORMAL_QUEUE = "normal_queue";
    // Name of the dead letter queue
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        // Obtain a channel
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the normal and dead letter exchanges, both of type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        // Arguments for the normal queue
        Map<String, Object> arguments = new HashMap<>();
        // Queue-level expiration time: 10 s = 10000 ms (disabled)
//        arguments.put("x-message-ttl",10000);
        // Dead letter exchange for the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Routing key used for dead letters
        arguments.put("x-dead-letter-routing-key", "lisi");
        // Limit on the length of the normal queue (disabled for this test)
//        arguments.put("x-max-length",6);

        // Declare the normal queue
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // Declare the dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // Bind the normal queue to the normal exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        // Bind the dead letter queue to the dead letter exchange
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("Waiting to receive messages......");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            if ("info5".equals(msg)) { // Reject info5 so that it becomes a dead letter
                System.out.println("Consumer01 The message received is:" + msg + ":This message was rejected by C1");
                // requeue=false: do not put it back into the normal queue, so it is dead-lettered
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 The message received is:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck=false: acknowledge (or reject) manually
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}

Start the producer (same code as in 3.3).
The queues at this point:

Now start consumer C1 and then consumer C2. C1 is written to reject info5 without requeueing it, which sends that message to the dead letter queue. Once C2 is running, it consumes the rejected message.
C1:

C2:

Test Successful
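
The ack-or-reject choice made inside C1's callback can also be isolated as a pure function, which makes it possible to check the rule without a running broker. This is a sketch only: RejectDecision, Outcome and decide are hypothetical names that do not appear in the tutorial's code, and the rule (reject exactly "info5") simply mirrors the callback in Consumer01.

```java
import java.nio.charset.StandardCharsets;

public class RejectDecision {
    /** What the consumer should tell the broker about a delivery. */
    public enum Outcome { ACK, REJECT_NO_REQUEUE }

    /** Decides the outcome for a message body, mirroring the rule used by C1. */
    public static Outcome decide(byte[] body) {
        String msg = new String(body, StandardCharsets.UTF_8);
        // Only info5 is rejected; with requeue=false it would be dead-lettered
        return "info5".equals(msg) ? Outcome.REJECT_NO_REQUEUE : Outcome.ACK;
    }

    public static void main(String[] args) {
        for (int i = 1; i <= 10; i++) {
            String msg = "info" + i;
            System.out.println(msg + " -> " + decide(msg.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```

In the real consumer, ACK would map to channel.basicAck(deliveryTag, false) and REJECT_NO_REQUEUE to channel.basicReject(deliveryTag, false).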

Keywords: Java RabbitMQ Distribution message queue

Added by D_tunisia on Mon, 27 Dec 2021 16:55:00 +0200