Dead letter queue and delay queue of RabbitMQ

RabbitMQ dead letter queue and delay queue

RabbitMQ itself has the properties of dead letter queue and dead letter switch. Delay queue can be realized through dead letter queue and dead letter switch. In the e-commerce industry, there is usually a demand: if the order is not paid in time out, the order will be cancelled automatically. Then the delay queue implemented through RabbitMQ is a way to realize this requirement.

1. Dead letter queue

As the name suggests, Dead Letter is Dead Letter in English. Dead Letter switch (Dead Letter exchange) is no different from an ordinary switch. It can accept information and forward it to the queue bound to it and can be routed. The difference is that the Dead Letter switch forwards dead letters, and the queue bound to the Dead Letter switch is the Dead Letter queue. To put it more conventionally, the Dead Letter switch and the Dead Letter queue are actually just ordinary switches and queues, but they don't The information received and forwarded is Dead Letter, and other operations are no different.

1.1 conditions of dead letter

Information called dead letter requires the following conditions:

  • The message is rejected by the consumer (via basic.reject or back.nack) and request = false is set.
  • The message expired because the queue has a TTL (Time To Live) time set.
  • The message was discarded because the queue length limit was exceeded.

At this time, there are basically two methods for the above conditions: 1) setting the policy parameter on the rabbitmqctl command line; 2) hard coding, that is, setting it in the code.

1.2 consumer rejection

1.2. 1 coding method

Hard coding refers to the corresponding parameters when writing the business queue declaration in the code:

  • x-dead-letter-exchange: dead letter exchange, required
  • x-dead-letter-routing-key: the routing key that the dead letter switch forwards to the dead letter queue, optional

Producer:

// producer
public class RejectProducer {
	// Define service switch
    public static final String ORDER_X = "order.exchange";
	// main method 
    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Send messages, and the routing keys are: d.order 123,d.other.123,d
        // 1)d.order.123: the service consumer will receive the message, and the order dead letter queue will also receive the message
        // 2)d.other.123: business consumer s will receive messages, and other dead letter queues will also receive messages
        // 3) d: only business consumer s will receive messages
        channel.basicPublish(ORDER_X, "d.order.123", null, "hello my friend".getBytes(StandardCharsets.UTF_8));
        // close resource
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

/**
 * Dead letter queue condition: rejected by consumer
 * 1)basic.reject(tag, requeue) Indicates that the message is rejected. The second parameter indicates whether to rejoin the queue. If it is true, it may cause an endless loop, which needs attention
 * 2)basic.nack(tag, multi, requeue) multi Multiple requests can be rejected at the same time, and the request and reject are the same. nack refers to the data that is not ack ed and has a separate identification
 */
public class RejectConsumer {

    public static final String DEAD_LETTER_X = "dead.letter.exchange";
    public static final String DEAD_LETTER_Q_1 = "dead.letter.queue.order";
    public static final String DEAD_LETTER_Q_2 = "dead.letter.queue.other";
    public static final String ORDER_X = "order.exchange";
    public static final String ORDER_Q = "order.queue";

    public static void main(String[] args) throws IOException {
        // reject and nack methods
        rejectAndNack();
    }

    public static void rejectAndNack() throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare dead letter queue and dead letter switch
        declareOrderDLX(channel);
        declareOtherDLX(channel);
        
        // Declarative service switch
        channel.exchangeDeclare(ORDER_X, "topic", false, true, null);
        // Set business parameters
        Map<String, Object> arguments = new HashMap<>();
        // Set the dead letter switch. One dead letter switch is bound to two dead letter queues to distinguish the forwarding of messages according to the routing key
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
        // Set dead letter queue routing key:
        // If the routing key is set here, when the message from the publish is forwarded to the dead letter switch, it will be forwarded to the dead letter queue with the routing key,
        // If this parameter is not set, it will be forwarded to the dead letter queue according to the routing key when publish ing
//        arguments.put("x-dead-letter-routing-key", "d.other");
        // Declare business queue
        channel.queueDeclare(ORDER_Q, false, false, true, arguments);
        // Bind service queue and service switch
        channel.queueBind(ORDER_Q, ORDER_X, "#");
        // Listening to service queue messages
        channel.basicConsume(ORDER_Q, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("order consumer: " + new String(body, StandardCharsets.UTF_8));
                // refuse
                channel.basicReject(envelope.getDeliveryTag(), false);
                System.out.println("order properties: " + envelope.toString());
            }
        });
    }

    // Declare order dead letter queue
    public static void declareOrderDLX(Channel channel) throws IOException {
        // The switch is declared only for simulation, and the setting of duriable=false and autodelete=true
        channel.exchangeDeclare(DEAD_LETTER_X, "topic", false, true, null);
        // Declaration queue
        channel.queueDeclare(DEAD_LETTER_Q_1, false, false, true, null);
        // Bind switches and queues, only * order.#  Rule message
        channel.queueBind(DEAD_LETTER_Q_1, DEAD_LETTER_X, "*.order.#");
        // Listen for messages
        channel.basicConsume(DEAD_LETTER_Q_1, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("dead letter consumer [order]: " + new String(body, StandardCharsets.UTF_8));
                System.out.println("dead letter properties [order]:" + envelope.toString());
            }
        });
    }

    // Declare other dead letter queue
    public static void declareOtherDLX(Channel channel) throws IOException {
        // Claim switch
        channel.exchangeDeclare(DEAD_LETTER_X, "topic", false, true, null);
        // Declaration queue
        channel.queueDeclare(DEAD_LETTER_Q_2, false, false, true, null);
        // Bind switches and queues, only * other.#  Rule message
        channel.queueBind(DEAD_LETTER_Q_2, DEAD_LETTER_X, "*.other.#");
        // Listen for messages
        channel.basicConsume(DEAD_LETTER_Q_2, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("dead letter consumer [other]: " + new String(body, StandardCharsets.UTF_8));
                System.out.println("dead letter properties [other]:" + envelope.toString());
            }
        });
    }
}

Summary:

1) The consumer side uses basic Reject or basic NACKs will forward messages to the matching dead letter queue (request = false). The difference is that basic.reject has one less parameter mutil than basic.nack, indicating whether to batch back. Moreover, the number of NACKs can be seen on the web.

2) Use x-letter-dead-exchange to set the dead letter switch, which must be set. x-letter-dead-routing-key sets the routing key that the dead letter switch forwards to the dead letter queue, which is equivalent to redefining the routing key of publish. This parameter is optional. You can judge whether it needs to be set according to specific services.

1.1. 2 strategy mode

For policy mode, execute the following commands on rabbitmq server:

rabbitmqctl set_policy {Policy name} ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

For example:

rabbitmqctl set_policy dlx "dead.*" '{"dead-letter-exchange":"test-dead-letter-exchange"}' --apply-to queues

Denotes all with dead Set dead letter switch test dead letter exchange at the beginning of the queue, and the policy name is dlx. Then we create a new exchange named test dead letter exchange in the web interface of rabbitmq, and create a new exchange named dead order. Queue and dead other. The queue of queue is bound with test dead letter exchange, and the routing keys are: order# And other. #.

Note: since we want to simulate the situation of dead letter forwarding to the dead letter queue, the ttl of these two new queues is set to 10000ms, that is, 10s.

We saw that the message successfully arrived at dead after 10s order. Queue, indicating that our configuration takes effect. Here I will draw a diagram of this process:

1.3 setting expiration time

file: https://www.rabbitmq.com/ttl.html

We can set the expiration time for queues or messages. The expiration time of the queue, similar to the autoDelete parameter, indicates that the queue will be deleted if it is not used within the specified time, and the queue has no users, The queue has not been redeclared recently (renew the lease), and basic.get is not called at least during the expiration period. For example, this can be used for RPC style reply queues, which can create many queues that may never be exhausted. We can set the expiration time of messages in the message body when sending messages, or set a message expiration time for this queue. In fact, there are two ways , one is set on the queue and the other is set on the message.

1.3. 1 coding method

1) Set message body expiration time

// Set message properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("2000") // Set the message expiration time, 2s, since the queue has no expiration time
    .build();
// Send a message
channel.basicPublish("hello", "order.123", properties, "hello my friend".getBytes(StandardCharsets.UTF_8));

2) Set the message expiration time of the queue

// Set parameters
Map<String, Object> arguments = new HashMap<>();
// Set the expiration time of the queue, 5s
arguments.put("x-message-ttl", 5000);
// Set dead letter queue
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// Declaration queue
channel.queueDeclare(ORDER_Q, false, false, true, arguments);

We can see that after the queue is created, there will be a TTL identifier. x-message-ttl identifies that the message expiration time of the queue is 5s.

3) Set the expiration time of the queue

// Set queue parameters
Map<String, Object> arguments = new HashMap<>();
// Set the expiration time of messages in the queue, 5s
arguments.put("x-message-ttl", 5000);
// Set the expiration time of the queue. 10s if the queue is not used (not operated), delete the queue
arguments.put("x-expires", 10000);
// Declaration queue
channel.queueDeclare(ORDER_Q, true, false, false, arguments);

Note: no matter whether there are messages in the queue or not, if there is no operation queue, it will be deleted automatically.

1.3. 2 strategy mode

1) Set the message expiration time of the queue

rabbitmqctl set_policy --vhost /adu TTL ".*" '{"message-ttl":60000}' --apply-to queues

Means to add a TTL policy under / adu virtual host and set the expiration time of all queue message TTL messages to 60s.

2) Set the expiration time of the queue

rabbitmqctl set_policy --vhost /adu expiry ".*" '{"expires":1800000}' --apply-to queues

Indicates that a policy named expiration is added under the / adu virtual host, and the expiration time of all queues is set to 180s.

1.4 queue length exceeded

By default, The queue has no length limit (however, there are always hard disk and memory restrictions). We can set the length of the queue, the number of messages, the memory occupied by the total message content of the queue, or both. The maximum length of a queue can be set by policy or coding, or by the web interface when creating the queue. If If both the policy method and the encoding method are set, the lower value will take effect.

If the queue has a queue length limit set, when the message in the queue reaches the maximum length, The default overflow rule is to discard the oldest message (queue header). We can change this rule and use the overflow parameter to configure it. The optional values of overflow are x-reject-publish or x-reject-publish-dls. Both of them indicate that new messages are rejected. The difference is that reject publish Dlx will also lead to dead letter rejection message 1.

Here is a question: the difference between reject publish Dlx and reject publish. For the translation of the official website, I think reject publish Dlx is bound to the dead letter queue. Instead of receiving messages, reject publish will receive messages. But the experiment was just the opposite of what I understood? The familiar irons replied ha.

1.4. 1 coding method

Use the x-max-length and x-max-length-bytes parameter settings.

// Set parameters
Map<String, Object> arguments = new HashMap<>();
// Set the maximum queue length, 5 messages
arguments.put("x-max-length", 5);
// Strategies for queue overflow: drop head (default), reject publish, reject publish Dlx
//        arguments.put("x-overflow", "reject-publish-dlx");
arguments.put("x-overflow", "reject-publish");
// Set dead letter queue
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// Declaration queue
channel.queueDeclare(ORDER_Q, false, false, true, arguments);

1.4. 2 strategy mode

rabbitmqctl set_policy --vhost /adu limit "^five_msg" '{"dead-letter-exchange":"test-dead-letter-exchange","max-length":5,"overflow":"reject-publish-dlx"}' --apply-to queues

It means adding a policy named limit under / adu virtual host and setting all to five_ The maximum number of queue messages starting with MSG is 5, the message overflow policy is reject, and the dead letter switch is set.

Set max length bytes in the same way.

2. Delay queue

Delay queue, as its name suggests, is the queue for storing delay messages, that is, consumers will receive messages after a certain delay. A typical application scenario is that the order is automatically cancelled if it is not paid after timeout as described above.

2.1 with the help of dead letter queue

In fact, after introducing the dead letter queue, you can roughly see how to use the dead letter queue to realize the delay queue. Just use the TTL attribute of the message to forward expired messages to the dead letter queue, and the business can listen to the messages in the dead letter queue. This situation is suitable for setting the message expiration time for the queue, that is, all messages in the queue have the same expiration time, and they will be forwarded to the dead letter queue in order.

If the expiration time of the message is set on the message body when sending the message, there may be a problem. For example, msg1 and msg2 messages are sent sequentially. The expiration time of msg1 is 5s and that of msg2 is 2s. Under normal understanding, the result must be that msg2 first arrives at the dead letter queue and is consumed, but the result is that both messages are forwarded to the dead letter queue and consumed at 5s. In fact, it's easy to understand, because the characteristic of the queue is first in first out. Even if msg2 reaches the expiration time first, msg1 is blocked before it. Only msg1 is consumed can msg2 be consumed at the head of the queue. Let's draw a picture:

2.2 implementation with RabbitMQ plug-in

Rabbitmq provides a plug-in rabbitmq_delayed_message_exchange enables us to achieve the effect of delay queue and solve the message blocking problem of delay queue through dead letter queue. The plug-in is from rabbitmq 3.6 12. To start support, confirm whether your current rabbitmq version supports the plug-in.

2.2. 1 download plug-ins

Download address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

After downloading the plug-in, you will rabbitmq_delayed_message_exchange-3.9.0.ez package is placed in the plugins directory of RabbitMQ installation directory:

2.2. 2 enable plug-ins

Execute the console command to restart rabbitmq service:

# 1. List all plug-ins
rabbitmq-plugins list
# 2. Enable rabbitmq_delayed_message_exchange
rabbitmq-plubins enable rabbitmq_delayed_message_exchange
# 3. Restart the service (it seems that there is no need to restart)
systemctl restart rabbitmq-server.service

After that, the exchanges of the web interface can create a switch of type x-delayed-message or declare it in the code. If you use its delay function, you need to add a header: x-delay=xxx when sending messages, indicating a delay of xxx milliseconds.

// Declare the delay switch, type=x-delayed-message, x-delayed-type=direct|fanout|topic
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic"); // Equivalent to the previous exchange type
channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", false, true, args);

// Send message, x-delay, value is expiration time
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000); // 5s
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    .headers(headers)
    .build();
// send message
channel.basicPublish(EXCHANGE_NAME, "other.save", props, "i am 5s message".getBytes(StandardCharsets.UTF_8));

3. Summary

Above, we mentioned the scheme of using RabbitMQ to realize the delay queue function: 1) listen to the dead letter queue with the help of the capable dead letter queue; 2) With the help of plug-ins. The advantages and disadvantages are as follows:

  • The implementation of dead letter queue requires setting the message expiration time on the queue, which is not flexible; Another dead letter queue needs to be used to occupy space; rabbitmq has its own dead letter queue, which is easy to implement.
  • The plug-in implementation method requires downloading and installing plug-ins, and version compatibility should be considered; The code logic is simple and easy to use.

Back to our initial demand: the order payment timeout is automatically cancelled. This function mainly requires a delay queue. It is only one way to implement the delay queue through rabbitmq. It can also be implemented in other ways, such as DelayQueue in Java, Quartz scheduled task, zset in Redis, time wheel, etc. the specific scheme should be selected in combination with the advantages and disadvantages of the project and the specific method. For example, if rabbitmq is used in the project, it is a better way to use rabbitmq to implement delay queue. The specific selection of plug-in mode or dead letter queue mode depends on the flexibility of this function in the project.

reference resources:

  1. https://www.rabbitmq.com/dlx.html
  2. https://www.cnblogs.com/williamwsj/p/8108970.html
  3. https://www.jianshu.com/p/256d2eaf1786
  4. https://www.rabbitmq.com/community-plugins.html
  5. https://blog.csdn.net/zhenghongcs/article/details/106700446

For more articles: A programmer who asked not to be named

Finally: due to the lack of talent and shallow learning, if you have any questions, please point them out. Thank you ~

Keywords: Java RabbitMQ Distribution Microservices

Added by Aurasia on Sun, 26 Dec 2021 19:41:36 +0200