Advanced features of RabbitMQ -- TTL, dead letter queue and delay queue

Contents

1. TTL mechanism

1.1 implementation scheme

1.2 native API implementation

1.3 SpringBoot implementation

2. Dead letter queue

2.1 native API implementation

2.2 SpringBoot implementation

3. Delay queue

3.1 use of delay queue

1. TTL mechanism

1.1 implementation scheme

In e-commerce, once an order is created, the system usually starts a countdown and waits a fixed time for payment. If the user does not pay within that time, the order is cancelled automatically.

How can this be implemented?

Regular polling (database, etc.)

When the user places an order, the order data and its payment status are stored in the database, and the status is updated when the user pays. A scheduled job polls the payment status and cancels any order that has been unpaid for more than 30 minutes.

Advantages: simple design and implementation.

Disadvantages: it requires a lot of I/O operations on the database, which is inefficient.
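The polling approach can be sketched roughly as follows. This is only an illustration in plain Java: an in-memory map stands in for the orders table, and the names `OrderPoller`, `createOrder`, `markPaid` and `cancelOverdue` are hypothetical, not taken from any real system.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an in-memory map stands in for the orders table.
class OrderPoller {
    static final long PAYMENT_TIMEOUT_MS = 30 * 60 * 1000L;

    // orderId -> creation time in milliseconds, for orders still awaiting payment
    private final Map<String, Long> unpaidOrders = new LinkedHashMap<>();

    void createOrder(String orderId, long createdAtMs) {
        unpaidOrders.put(orderId, createdAtMs);
    }

    void markPaid(String orderId) {
        unpaidOrders.remove(orderId);
    }

    // One polling pass: scan every unpaid order and cancel the overdue ones.
    List<String> cancelOverdue(long nowMs) {
        List<String> cancelled = new ArrayList<>();
        unpaidOrders.entrySet().removeIf(entry -> {
            boolean overdue = nowMs - entry.getValue() > PAYMENT_TIMEOUT_MS;
            if (overdue) {
                cancelled.add(entry.getKey());
            }
            return overdue;
        });
        return cancelled;
    }

    public static void main(String[] args) {
        OrderPoller poller = new OrderPoller();
        poller.createOrder("A", 0L);
        poller.createOrder("B", 10 * 60 * 1000L);
        // Poll 31 minutes after order A was created: only A is overdue
        System.out.println(poller.cancelOverdue(31 * 60 * 1000L)); // [A]
    }
}
```

Every pass has to look at every unpaid order, which is where the heavy database I/O comes from once the table is large.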

Timer

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
        System.out.println("The user has not paid, and the transaction is cancelled: "
                + simpleDateFormat.format(new Date()));
        // Stop the timer thread once the task has run
        timer.cancel();
    }
};
System.out.println("Waiting for user payment: " + simpleDateFormat.format(new Date()));
// Run timerTask once, 10 seconds from now
timer.schedule(timerTask, 10 * 1000);

        

Advantages: none that come to mind.

Disadvantages: there is no persistence mechanism.

It is inflexible (only the start time and repetition interval can be set, which does not suit other business needs).

It cannot use a thread pool: each Timer runs all of its tasks on a single thread.

It offers no real task management.

ScheduledExecutorService and other schedulers

SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
// Thread factory
ThreadFactory factory = Executors.defaultThreadFactory();
// Scheduled thread pool with 10 core threads
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("Start waiting for user payment for 10 seconds: " + format.format(new Date()));
service.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("The user has not paid, and the transaction is cancelled: "
                + format.format(new Date()));
    }
}, 10, TimeUnit.SECONDS); // run once after a 10-second delay
// By default, tasks that are already scheduled still run after shutdown(); the pool then terminates
service.shutdown();

Advantages: tasks can run on multiple threads, which keeps them from interfering with each other to some extent, and an exception in one task does not affect the others.

Disadvantages: under high concurrency, scheduled tasks are not recommended because they waste server resources.

TTL of RabbitMQ

TTL can be set at two levels: per message and per queue.

Advantages: the timing function can be realized without consuming too much server performance.

Disadvantages: the capacity and backlog capacity of any middleware are limited, and introducing message-oriented middleware brings additional problems of its own.

Because the capacity and backlog capacity of message middleware are limited, messages that are never consumed need a fallback mechanism, and TTL provides one.

There are currently two ways to set the TTL of a message. If both are used at the same time, the smaller of the two values takes effect.

1. Set it as a queue property, so that all messages in the queue have the same expiration time.

2. Set the TTL of each message separately.
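The "smaller value wins" rule can be written down as a small helper. This is an illustrative sketch in plain Java, not RabbitMQ code: the class and method names are hypothetical, and `null` is used here to mean "no TTL configured at that level".

```java
// Hypothetical helper illustrating how the two TTL settings combine.
class EffectiveTtl {
    // Returns the TTL in milliseconds that applies to a message,
    // or null if neither the queue nor the message sets one (the message never expires).
    static Long effectiveTtlMs(Long queueTtlMs, Long messageTtlMs) {
        if (queueTtlMs == null) {
            return messageTtlMs;
        }
        if (messageTtlMs == null) {
            return queueTtlMs;
        }
        return Math.min(queueTtlMs, messageTtlMs);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtlMs(30000L, 5000L)); // 5000
        System.out.println(effectiveTtlMs(10000L, null));  // 10000
        System.out.println(effectiveTtlMs(null, null));    // null
    }
}
```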

1.2 native API implementation

// factory is a ConnectionFactory configured elsewhere with the broker's host and credentials
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    // Queue arguments
    Map<String, Object> arguments = new HashMap<>();
    // TTL for every message in the queue, in milliseconds
    arguments.put("x-message-ttl", 30000);
    // Queue expiry in milliseconds: the queue is deleted once it has been unused (no consumers, not redeclared) for this long
    arguments.put("x-expires", 10000);

    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

    // Publish through the AMQP default exchange (a direct exchange), so the routing key is the queue name
    channel.basicPublish("",
                         QUEUE_NAME,
                         // Per-message TTL in milliseconds, passed as a string
                         new AMQP.BasicProperties.Builder().expiration("30000").build(),
                         message.getBytes());
} catch (IOException | TimeoutException e) {
    e.printStackTrace();
}

You can also apply a TTL to all queues with a policy from the command line:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

Default rules:

1. If x-message-ttl is not set (and the message carries no expiration of its own), the message will not expire.

2. If x-message-ttl is set to 0, the message is discarded immediately unless it can be delivered directly to a consumer.

1.3 SpringBoot implementation

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // Messages in this queue expire after waiting 10 s (value in milliseconds)
        props.put("x-message-ttl", 10000);
        // Queue expiry in milliseconds: the queue is deleted after being unused (no consumers, not redeclared) for this long
        props.put("x-expires", 1000);
        return new Queue("q.pay.ttl-waiting", false, false, false, props);
    }
}

// The exchange ex.pay.waiting and its binding to q.pay.ttl-waiting are assumed to be declared elsewhere.
@RestController
public class PayController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        // Per-message TTL in milliseconds, passed as a string
        properties.setExpiration("5000");
        Message message = new Message("Sent WAITINGMESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}

2. Dead letter queue

DLX stands for dead letter exchange.

In a food delivery system, the user places an order through the order service, and the order service then calls the delivery system to notify a courier to deliver it. The order system and the delivery system communicate asynchronously through MQ.

What should happen if the user cancels after a courier has already accepted the order? What if no courier accepts the order for a long time?

In this scenario, we can define a dead letter exchange and bind a dead letter queue to it. When a message becomes a dead letter, it is sent to the dead letter queue, where we can check why it failed and decide on the next step: cancel the order or assign another courier.

When a message in a queue becomes a dead letter, it is republished to a special exchange, the DLX. A queue bound to the DLX is called a "dead letter queue".

A message becomes a dead letter in the following situations:

1. The message is rejected (Basic.Reject / Basic.Nack) with the requeue parameter set to false.

2. The message has expired.

3. The queue has reached its maximum length.
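The three conditions can be summarized as a small decision function. This is a plain-Java illustration with hypothetical names (`DeadLetterRules`, `classify`), not broker code; the enum constants mirror the reason strings RabbitMQ records for dead-lettered messages (`rejected`, `expired`, `maxlen`).

```java
// Hypothetical sketch of when a message is dead-lettered and why.
class DeadLetterRules {
    enum Reason { REJECTED, EXPIRED, MAXLEN }

    // Returns the dead-letter reason, or null if the message is not dead-lettered.
    static Reason classify(boolean rejected, boolean requeue,
                           boolean ttlExpired, boolean queueOverLimit) {
        // A rejection only dead-letters the message when requeue is false
        if (rejected && !requeue) {
            return Reason.REJECTED;
        }
        if (ttlExpired) {
            return Reason.EXPIRED;
        }
        if (queueOverLimit) {
            return Reason.MAXLEN;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, false, false, false)); // REJECTED
        System.out.println(classify(true, true, false, false));  // null
        System.out.println(classify(false, false, true, false)); // EXPIRED
    }
}
```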

Dead letter queues also help with error handling: if a consumer cannot process a message normally, we can route it to the dead letter queue and analyze the failure later, so as to improve and optimize the system.

2.1 native API implementation

try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // Define the dead letter exchange (it is just an ordinary exchange)
    channel.exchangeDeclare("exchange.dlx", "direct", true);

    // Define the normal business exchange
    channel.exchangeDeclare("exchange.biz", "fanout", true);

    Map<String, Object> arguments = new HashMap<>();
    // Queue-level message TTL in milliseconds
    arguments.put("x-message-ttl", 10000);
    // Dead letter exchange for this queue: a message still unconsumed when its TTL expires is republished there
    arguments.put("x-dead-letter-exchange", "exchange.dlx");
    // Routing key used when dead-lettering; if not set, the message's original routing key is used
    arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");

    // Declare the queues and their bindings
    channel.queueDeclare("queue.biz", true, false, false, arguments);
    channel.queueBind("queue.biz", "exchange.biz", "");
    channel.queueDeclare("queue.dlx", true, false, false, null);
    // Bind the dead letter queue to the dead letter exchange
    channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");

    channel.basicPublish("exchange.biz", 
                         "",
                         MessageProperties.PERSISTENT_TEXT_PLAIN,
                         "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}

2.2 SpringBoot implementation

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Map<String, Object> props = new HashMap<>();
        // Messages in this queue live for 10 s (value in milliseconds)
        props.put("x-message-ttl", 10000);
        // Dead letter exchange for this queue: a message still unconsumed when its TTL expires is republished there
        props.put("x-dead-letter-exchange", "ex.go.dlx");
        // Routing key used when dead-lettering; if not set, the message's original routing key is used
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    }
}

In practice, the dead letter queue is usually combined with RabbitMQ's TTL: once a message times out, it is automatically routed to the dead letter queue.

3. Delay queue

In some business scenarios, we do not want the messages we send to the message queue to be consumed immediately. For example:

When you buy a train ticket on 12306 and select a seat, the system locks the seat while the order is unpaid. If payment has not been made when the time limit is reached, the system releases the seat automatically. How can this be implemented?

1. A scheduled task can scan once a minute and release any seat that has been locked for more than 15 minutes, but this wastes system resources.

2. The passive expiration of a distributed lock or distributed cache can be used, so that the lock is released automatically after 15 minutes, but this occupies system resources for a long time.

3. TTL combined with a dead letter queue can be used. However, expiry is only checked at the head of the queue: while the first message has not timed out, the messages behind it are not examined. If messages in the queue have different expiration times, a later message can expire and still sit unprocessed until everything ahead of it has left the queue.
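The head-of-queue limitation in option 3 is easy to see in a small simulation. The sketch below is plain Java with a simulated clock in whole seconds; it does not talk to RabbitMQ, and the class and method names are hypothetical. All messages are assumed to be published at time 0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation of a FIFO queue that only checks its head for expiry.
class HeadOnlyExpiry {
    // ttlsSeconds[i] is the TTL of the i-th published message.
    // Returns message index -> second at which it is actually dead-lettered.
    static Map<Integer, Integer> deadLetterTimes(int[] ttlsSeconds) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < ttlsSeconds.length; i++) {
            queue.addLast(i);
        }

        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (int now = 0; !queue.isEmpty(); now++) {
            // Remove expired messages from the head; stop at the first live one
            while (!queue.isEmpty() && ttlsSeconds[queue.peekFirst()] <= now) {
                result.put(queue.pollFirst(), now);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // First message: 10 s TTL, second message: 3 s TTL
        System.out.println(deadLetterTimes(new int[] {10, 3})); // {0=10, 1=10}
    }
}
```

The second message expired at second 3 but is only dead-lettered at second 10, once the message in front of it has gone.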

A delay queue avoids this problem. After the seat is locked successfully, a delayed message is sent to the delay exchange. The delay exchange tracks all of its messages, and each message is consumed only after its specified delay. The consumer then checks whether the seat is in the "paid" state.

RabbitMQ itself does not provide delay queues, but you can use the rabbitmq_delayed_message_exchange plug-in. The biggest difference from the TTL approach is where the waiting message is stored: with TTL it waits in a queue and reaches the dead letter queue when it expires, while with the plug-in it waits in the delay exchange.

1. The producer sends the message (msg) and a routing key to the specified delay exchange.
2. The delay exchange stores the message and continuously scans all messages. When a message's delay has elapsed, the exchange finds the queues bound to it by routing key and sends the message to them.
3. The queue delivers the message to the consumers listening on it.
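The steps above can be imitated in plain Java to show why per-message delays no longer block each other. This is only a simulation under a simulated clock, with hypothetical names (`DelayExchangeSketch`, `publish`, `releaseDue`); the real plug-in is implemented differently inside the broker.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative simulation only, not the plug-in's actual implementation.
class DelayExchangeSketch {
    private static final class Pending {
        final String body;
        final long dueAtMs;

        Pending(String body, long dueAtMs) {
            this.body = body;
            this.dueAtMs = dueAtMs;
        }
    }

    // Messages are ordered by due time, not by arrival order
    private final PriorityQueue<Pending> pending =
            new PriorityQueue<Pending>(Comparator.comparingLong((Pending p) -> p.dueAtMs));

    // Step 1: the producer publishes a message with its own delay
    void publish(String body, long nowMs, long delayMs) {
        pending.add(new Pending(body, nowMs + delayMs));
    }

    // Steps 2 and 3: release every message whose delay has elapsed
    List<String> releaseDue(long nowMs) {
        List<String> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().dueAtMs <= nowMs) {
            released.add(pending.poll().body);
        }
        return released;
    }

    public static void main(String[] args) {
        DelayExchangeSketch exchange = new DelayExchangeSketch();
        exchange.publish("first", 0, 10_000);
        exchange.publish("second", 0, 3_000);
        System.out.println(exchange.releaseDue(3_000));  // [second]
        System.out.println(exchange.releaseDue(10_000)); // [first]
    }
}
```

Unlike the TTL plus dead letter queue approach, the message with the 3-second delay is released on time even though it was published after the 10-second one.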

3.1 use of delay queue

Download plug-ins

Download address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

Install plug-ins

Copy the downloaded plug-in file into the plugins directory of the RabbitMQ server installation.

# List the plug-ins, then enable the delayed message exchange
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# Restart rabbitmq-server
systemctl restart rabbitmq-server

@Configuration
public class RabbitConfig {
    @Bean
    public Exchange exchange() {
        Map<String, Object> props = new HashMap<>();
        // Routing behaviour applied once the delay has elapsed
        props.put("x-delayed-type", ExchangeTypes.FANOUT);
        // "x-delayed-message" is the exchange type provided by the plug-in
        return new CustomExchange("ex.delayed",
                                  "x-delayed-message",
                                  true,
                                  false,
                                  props);
    }
}

@RestController
public class PublishController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/prepare/{seconds}")
    public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
        // With plain TTL, RabbitMQ only checks whether the message at the head of the queue has expired.
        // If the first message has a 10 s TTL and the second a 3 s TTL, the second is not moved to the DLX
        // until the first one expires, even though it expired long before.
        // The rabbitmq_delayed_message_exchange plug-in avoids this.
        MessageProperties properties = new MessageProperties();
        // Deliver 10 s before the meeting; the delay is in milliseconds and is clamped at 0
        properties.setHeader("x-delay", Math.max(0, seconds - 10) * 1000);
        Message message = new Message(("Sales department meeting in " + seconds + " seconds.").getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message);
        return "The reminder has been set. You will be notified in advance.";
    }
}

Keywords: Java RabbitMQ Distribution

Added by pdmiller on Fri, 04 Feb 2022 03:51:20 +0200