[RabbitMq04] delay queue

1, Delay queue understanding

1. Delay queue concept

A delay queue is an ordered queue whose defining feature is its delay attribute: each element is meant to be taken out and processed once a specified time has arrived, not as soon as it is enqueued. In short, a delay queue stores elements that need to be processed at a specified time.
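To make the concept concrete before turning to RabbitMQ, here is a minimal in-process sketch using the JDK's java.util.concurrent.DelayQueue. The class name DelayQueueSketch, the task names and the 100 ms / 300 ms delays are illustrative assumptions, not part of the original article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueSketch {

    // A task that becomes available delayMillis after it is created (hypothetical helper type).
    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the task may be taken.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Enqueues two tasks and returns their names in the order they become available.
    static List<String> drainInExpiryOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 300)); // inserted first, longer delay
        queue.put(new DelayedTask("fast", 100)); // inserted second, shorter delay

        List<String> order = new ArrayList<>();
        try {
            while (order.size() < 2) {
                order.add(queue.take().name); // take() blocks until the head's delay has elapsed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder()); // prints [fast, slow]
    }
}
```

The elements come out in order of expiry, not insertion. That is the behaviour the rest of this article tries to reproduce with RabbitMQ.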

In RabbitMQ, a delay queue is the TTL-expiration case of the dead letter queue.

For example

In the dead letter queue article, consumer 1 was started so that the normal queue and the dead letter queue were created (they show up in the RabbitMQ console), and then it was shut down. The producer then sent messages to the normal queue. Because a 10-second TTL applies to those messages and nothing was consuming the normal queue, each message expired after 10 seconds, entered the dead letter queue automatically, and was received by consumer 2. Seen from the producer and consumer 2, that is a 10-second delay.

2. Delay queue usage scenario

1. If an order is not paid within ten minutes, it is cancelled automatically.
2. If a newly created store has not uploaded any goods within ten days, a reminder message is sent automatically.
3. After successful registration, if the user does not log in within three days, an SMS reminder is sent.
4. When a user requests a refund and it is not handled within three days, the relevant operators are notified.
5. After a meeting is scheduled, all participants are notified ten minutes before the scheduled start time.

What these scenarios have in common is that a task must be carried out at a specified time after (or before) some event. For example, when an order is created, check its payment status ten minutes later and close the order if it is still unpaid. At first glance a scheduled task would do: poll the data, say once a second, pick out the records that are due, and process them. With a small amount of data this works. Take the requirement "settle automatically if the bill is not paid within one week": if the deadline is not strict and "a week" is meant loosely, running a job every night that checks all unpaid bills is a perfectly feasible scheme. But for scenarios with a large amount of data and tight timing, such as "close the order if it is not paid within 10 minutes", there may be a great many unpaid orders in a short period, reaching millions or even tens of millions during a promotion. Polling is clearly a poor fit for that volume: a full scan of all orders probably cannot finish within one second, and it puts heavy pressure on the database, so it neither meets the business requirement nor performs well.
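With a delay queue, each of the scenarios above reduces to computing one delay when the event happens, instead of scanning the data repeatedly. The following is a small sketch of that calculation with java.time; the class DelayCalculator, its method names and the sample timestamps are assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

class DelayCalculator {

    // Delay for "do something this long AFTER the event",
    // e.g. cancel an order 10 minutes after it was created.
    static long delayAfterMillis(Instant now, Instant eventTime, Duration after) {
        return clampToZero(Duration.between(now, eventTime.plus(after)));
    }

    // Delay for "do something this long BEFORE the event",
    // e.g. remind participants 10 minutes before a meeting starts.
    static long delayBeforeMillis(Instant now, Instant eventTime, Duration before) {
        return clampToZero(Duration.between(now, eventTime.minus(before)));
    }

    // A target time that has already passed means "process immediately".
    private static long clampToZero(Duration d) {
        return Math.max(0L, d.toMillis());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-07-29T11:00:00Z");
        Instant meetingStart = Instant.parse("2021-07-29T12:00:00Z");

        // Order created right now, to be checked in 10 minutes.
        System.out.println(delayAfterMillis(now, now, Duration.ofMinutes(10)));           // prints 600000
        // Meeting in one hour, reminder 10 minutes before it.
        System.out.println(delayBeforeMillis(now, meetingStart, Duration.ofMinutes(10))); // prints 3000000
    }
}
```

The resulting number of milliseconds is what would later be used as the TTL or delay of the message.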

3. TTL in RabbitMQ

What is TTL? TTL (time to live) is an attribute of a message or a queue in RabbitMQ that gives the maximum lifetime, in milliseconds, of a single message or of every message in the queue. In other words, if a message has a TTL set, or enters a queue that has a TTL set, and it is not consumed within that time, it becomes a "dead letter". If both the queue TTL and the message TTL are configured, the smaller value is used. There are two ways to set the TTL.
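The "smaller value wins" rule can be written down as a tiny function. This is only a sketch of the rule as stated above, not broker code; the class and method names are hypothetical, and null stands for "no TTL configured".

```java
class EffectiveTtl {

    // Returns the TTL that applies to a message, or null if neither side sets one
    // (in which case the message never expires).
    static Long effectiveTtlMillis(Long queueTtlMillis, Long messageTtlMillis) {
        if (queueTtlMillis == null) {
            return messageTtlMillis;
        }
        if (messageTtlMillis == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, messageTtlMillis);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtlMillis(10000L, 2000L)); // prints 2000
        System.out.println(effectiveTtlMillis(10000L, null));  // prints 10000
        System.out.println(effectiveTtlMillis(null, null));    // prints null
    }
}
```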

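Setting TTL on a message

Set the TTL on each message individually, through the expiration property

//Per-message TTL in milliseconds, passed as a string
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000")
                .build();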

Setting TTL on a queue

When creating the queue, set its "x-message-ttl" argument
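As a sketch of what that looks like in code, the queue arguments are an ordinary map. The helper class TtlQueueArguments below is hypothetical; the argument keys are the ones used by the configuration classes later in this article, where the dead letter arguments are combined with the TTL.

```java
import java.util.HashMap;
import java.util.Map;

class TtlQueueArguments {

    // Builds the arguments for a queue whose messages expire after ttlMillis
    // and are then republished to the given dead letter exchange.
    static Map<String, Object> build(int ttlMillis, String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = build(10000, "Y", "YD");
        // With the RabbitMQ Java client, this map would be the last parameter of
        // channel.queueDeclare("QA", true, false, false, arguments).
        System.out.println(arguments);
    }
}
```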

The difference between the two

With a queue-level TTL, a message is discarded by the queue as soon as it expires (or moved to the dead letter queue, if one is configured). With a per-message TTL, an expired message is not necessarily discarded right away, because expiry is only checked just before the message is delivered to a consumer, that is, when it reaches the head of the queue. If the queue has a serious backlog, an expired message may survive for a long time. Also note that if no TTL is set, the message never expires; if the TTL is set to 0, the message is discarded unless it can be delivered directly to a consumer at that moment.

Earlier we introduced the dead letter queue, and TTL has just been covered, so the two ingredients for building a delay queue with RabbitMQ are now in place. All that remains is to combine them. A delay queue is about how long to wait before a message is processed; TTL makes a message become a dead letter after exactly that long; and a message that becomes a dead letter is delivered to the dead letter queue. So the consumer only needs to keep consuming from the dead letter queue, because every message that arrives there is one whose delay has elapsed and that should be processed right away.

2, Delayed queue practice

1. Integrating Spring Boot

Add the dependencies

        <!--RabbitMQ dependency-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--Converts Java objects to JSON and JSON strings back to Java objects-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ Test dependency-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

Modify the configuration file

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Add Swagger configuration class

package com.lian.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@EnableSwagger2
@Configuration
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("RabbitMQ API documentation")
                .description("This document describes the RabbitMQ microservice API")
                .version("1.0")
                .contact(new Contact("muzhen", "http://lian.com",
                        "111111@qq.com"))
                .build();
    }
}

2. Queue TTL

a. Code architecture diagram

Create two normal queues, QA and QB, with TTLs of 10 s and 40 s respectively. Then create a normal exchange X and a dead letter exchange Y, both of type direct, and a dead letter queue QD. They are related as follows:

QA and QB both name Y as their dead letter exchange, with dead letter routing key YD; Y is bound to QD with routing key YD.

Two exchanges: X (normal exchange) and Y (dead letter exchange)
Three queues: QA and QB (normal queues) and QD (dead letter queue)
Bindings: X is bound to QA (routing key XA) and QB (routing key XB); Y is bound to QD (routing key YD)
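Since the diagram is not reproduced here, the path of a message through this topology can be traced with a few maps. This is a plain-Java model for illustration only: the class TopologyTrace is hypothetical, and the routing keys XA, XB and YD are the ones used in the configuration class.

```java
import java.util.Map;

class TopologyTrace {

    // exchange -> (routing key -> queue)
    static final Map<String, Map<String, String>> BINDINGS = Map.of(
            "X", Map.of("XA", "QA", "XB", "QB"),
            "Y", Map.of("YD", "QD"));

    // normal queue -> {dead letter exchange, dead letter routing key}
    static final Map<String, String[]> DEAD_LETTER = Map.of(
            "QA", new String[] {"Y", "YD"},
            "QB", new String[] {"Y", "YD"});

    // Direct-exchange routing: exact match on the routing key.
    static String route(String exchange, String routingKey) {
        Map<String, String> byKey = BINDINGS.get(exchange);
        return byKey == null ? null : byKey.get(routingKey);
    }

    // Follows a message published to X until it lands in the dead letter queue.
    static String trace(String routingKey) {
        String normalQueue = route("X", routingKey);
        if (normalQueue == null) {
            return "unroutable";
        }
        String[] deadLetter = DEAD_LETTER.get(normalQueue);
        String deadLetterQueue = route(deadLetter[0], deadLetter[1]);
        return "X -> " + normalQueue + " -> " + deadLetter[0] + " -> " + deadLetterQueue;
    }

    public static void main(String[] args) {
        System.out.println(trace("XA")); // prints X -> QA -> Y -> QD
        System.out.println(trace("XB")); // prints X -> QB -> Y -> QD
    }
}
```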

b. Configuration class code

package com.lian.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {

    //Name of the normal exchange
    public static final String X_EXCHANGE = "X";
    //Name of the dead letter exchange
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //Names of the normal queues
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //Name of the dead letter queue
    public static final String DEAD_LETTER_QUEUE = "QD";

    //Declare the normal exchange X; "xExchange" is the bean name
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //Declare the dead letter exchange Y; "yExchange" is the bean name
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //Declare normal queue QA with a TTL of 10 s
    @Bean("queueA")
    public Queue queueA(){
        //The normal queue carries the dead letter settings, because its messages will become dead letters
        Map<String, Object> arguments = new HashMap<>();
        //Dead letter exchange for this queue (the argument name is fixed)
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //Routing key used when dead-lettering (the argument name is fixed)
        arguments.put("x-dead-letter-routing-key", "YD");
        //Queue-level TTL of 10 s, in ms; a TTL can also be set per message by the producer
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    //Declare normal queue QB with a TTL of 40 s
    @Bean("queueB")
    public Queue queueB(){
        //The normal queue carries the dead letter settings, because its messages will become dead letters
        Map<String, Object> arguments = new HashMap<>();
        //Dead letter exchange for this queue (the argument name is fixed)
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //Routing key used when dead-lettering (the argument name is fixed)
        arguments.put("x-dead-letter-routing-key", "YD");
        //Queue-level TTL of 40 s, in ms; a TTL can also be set per message by the producer
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //Declare the dead letter queue QD
    @Bean("queueD")
    public Queue queueD(){
        //No extra arguments
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //With the two exchanges and three queues declared, bind them together
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //Bind queue QA to the normal exchange X
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //Bind queue QB to the normal exchange X
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        //Bind the dead letter queue QD to the dead letter exchange Y
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

c. Producer

package com.lian.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * Send delay message
 */
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("Current time:{},Send a message to two TTL queue:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X","XA","Message from ttl For 10 s Queue of"+message);
        rabbitTemplate.convertAndSend("X","XB","Message from ttl For 40 s Queue of"+message);
    }
}

d. Consumer

package com.lian.rabbitmq.config;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void received(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("Current time:{},Received dead letter queue information{}", new Date().toString(), msg);
    }

}

e. Test

Send a request to http://localhost:8080/ttl/sendMsg/Hee hee

Result

2021-07-29 19:20:09.641  INFO 15732 --- [nio-8080-exec-1] c.l.rabbitmq.config.SendMsgController    : Current time: Thu Jul 29 19:20:09 CST 2021,Send a message to two TTL queue:Hee hee
2021-07-29 19:20:19.893  INFO 15732 --- [ntContainer#0-1] c.l.r.config.DeadLetterQueueConsumer: current time: Thu Jul 29 19:20:19 CST 2021, the dead letter queue information message is received from the queue with ttl of 10s, hee hee
2021-07-29 19:20:49.756  INFO 15732 --- [ntContainer#0-1] c.l.r.config.DeadLetterQueueConsumer: current time: Thu Jul 29 19:20:49 CST 2021, the dead letter queue information message is received from the queue with ttl of 40s, hee hee

The first message becomes a dead letter after 10 s and is consumed; the second becomes a dead letter after 40 s and is consumed. That is a working delay queue. Used this way, however, every new delay requirement needs a new queue. Here there are only two options, 10 s and 40 s; processing after one hour would need another queue with a one-hour TTL. And for something like booking a meeting room and sending a reminder ahead of time, where the delay differs for every booking, an unbounded number of queues would be needed.

3. Delay queue optimization

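Add a new queue QC that has no queue-level TTL; the producer sets the TTL on each message instead. QC is bound to X with routing key XC and dead-letters to Y with routing key YD, like QA and QB. The class below declares the same bean names as TtlQueueConfig, so use it in place of that class, not alongside it.

a. Configuration class code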

package com.lian.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig2 {

    //Name of the normal exchange
    public static final String X_EXCHANGE = "X";
    //Name of the dead letter exchange
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //Names of the normal queues
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String QUEUE_C = "QC";
    //Name of the dead letter queue
    public static final String DEAD_LETTER_QUEUE = "QD";

    //Declare the normal exchange X; "xExchange" is the bean name
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //Declare the dead letter exchange Y; "yExchange" is the bean name
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //Declare normal queue QA with a TTL of 10 s
    @Bean("queueA")
    public Queue queueA(){
        //The normal queue carries the dead letter settings, because its messages will become dead letters
        Map<String, Object> arguments = new HashMap<>();
        //Dead letter exchange for this queue (the argument name is fixed)
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //Routing key used when dead-lettering (the argument name is fixed)
        arguments.put("x-dead-letter-routing-key", "YD");
        //Queue-level TTL of 10 s, in ms; a TTL can also be set per message by the producer
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    //Declare normal queue QB with a TTL of 40 s
    @Bean("queueB")
    public Queue queueB(){
        //The normal queue carries the dead letter settings, because its messages will become dead letters
        Map<String, Object> arguments = new HashMap<>();
        //Dead letter exchange for this queue (the argument name is fixed)
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //Routing key used when dead-lettering (the argument name is fixed)
        arguments.put("x-dead-letter-routing-key", "YD");
        //Queue-level TTL of 40 s, in ms; a TTL can also be set per message by the producer
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //Declare normal queue QC without a queue-level TTL; the producer sets the TTL on each message
    @Bean("queueC")
    public Queue queueC(){
        //The normal queue carries the dead letter settings, because its messages will become dead letters
        Map<String, Object> arguments = new HashMap<>();
        //Dead letter exchange for this queue (the argument name is fixed)
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //Routing key used when dead-lettering (the argument name is fixed)
        arguments.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
    //Declare the dead letter queue QD
    @Bean("queueD")
    public Queue queueD(){
        //No extra arguments
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    //With the two exchanges and four queues declared, bind them together
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //Bind queue QA to the normal exchange X
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //Bind queue QB to the normal exchange X
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        //Bind queue QC to the normal exchange X
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        //Bind the dead letter queue QD to the dead letter exchange Y
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

b. Producer

package com.lian.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * Send delay message
 */
@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController2 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
        log.info("Current time: {}, sending a message with a TTL of {} ms to queue QC: {}", new Date(), ttlTime, message);
        rabbitTemplate.convertAndSend("X","XA","Message from ttl For 10 s Queue of"+message);
        rabbitTemplate.convertAndSend("X","XC",message,msg -> {
            //Per-message TTL in ms, taken from the request path
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }
}

c. Testing

Send the requests
http://localhost:8080/ttl/sendExpirationMsg/Hello1/20000
http://localhost:8080/ttl/sendExpirationMsg/Hello2/2000

The message with the 2-second TTL should have been consumed first, but the 20-second message was consumed first: the 2-second message did not "die" on time.

This is the problem described earlier for TTLs set on message properties: messages may not "die" on time, because RabbitMQ only checks whether the first message in the queue has expired and, if so, moves it to the dead letter queue. If the first message has a long delay and the second a short one, the second message is not dead-lettered first; it has to wait behind the first.
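The effect can be reproduced on paper with a simplified model: all messages are published at time 0, nothing consumes the queue, and only the head of the queue is checked for expiry. The class HeadOnlyExpiry is a hypothetical illustration of that rule, not RabbitMQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

class HeadOnlyExpiry {

    // Takes the per-message TTLs in publish order (all published at t = 0 ms) and
    // returns the time at which each message is dead-lettered under the
    // "only the head is checked" rule.
    static List<Long> deadLetterTimes(long[] ttlsInPublishOrder) {
        List<Long> times = new ArrayList<>();
        long headFreeAt = 0; // time at which everything ahead of the current message has left
        for (long ttl : ttlsInPublishOrder) {
            // A message leaves only once it is at the head AND its own TTL has elapsed.
            headFreeAt = Math.max(headFreeAt, ttl);
            times.add(headFreeAt);
        }
        return times;
    }

    public static void main(String[] args) {
        // Hello1 (20000 ms) published before Hello2 (2000 ms), as in the test above.
        System.out.println(deadLetterTimes(new long[] {20000, 2000})); // prints [20000, 20000]
        // Reversed publish order: no blocking.
        System.out.println(deadLetterTimes(new long[] {2000, 20000})); // prints [2000, 20000]
    }
}
```

In the first case the 2-second message is stuck behind the 20-second one and both leave at 20 s, which matches what the test showed.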

4. Implementing delay queues with a RabbitMQ plugin

The problem above is a real one: if a per-message TTL cannot make a message die on time, this design cannot serve as a general-purpose delay queue. So how can it be solved? The rabbitmq_delayed_message_exchange plugin takes care of it.

Installing the delay queue plugin
Download the rabbitmq_delayed_message_exchange plugin from https://www.rabbitmq.com/community-plugins.html , unzip it, and place it in RabbitMQ's plugin directory. Then go to the plugins directory under the RabbitMQ installation directory, run the following commands to enable the plugin, and restart RabbitMQ

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

The plugin adds a new exchange type, x-delayed-message, which is declared here as a custom exchange. This type supports delayed delivery: after a message is published, it is not routed to the target queue immediately but stored in a Mnesia table (Mnesia is a distributed database). When the delivery time is reached, the message is delivered to the target queue
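The difference from the TTL approach is that held messages are released by delivery time, not by arrival order. The sketch below models that behaviour with a priority queue; DelayedExchangeModel is a hypothetical in-memory stand-in and says nothing about how the plugin is implemented internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class DelayedExchangeModel {

    // A message held back until its delivery time.
    private static final class Held {
        final String body;
        final long deliverAtMillis;

        Held(String body, long deliverAtMillis) {
            this.body = body;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    // Ordered by delivery time, so the earliest due message is always at the head.
    private final PriorityQueue<Held> held =
            new PriorityQueue<>(Comparator.comparingLong((Held h) -> h.deliverAtMillis));

    // Stores the message instead of routing it right away.
    void publish(String body, long nowMillis, long delayMillis) {
        held.add(new Held(body, nowMillis + delayMillis));
    }

    // Releases every message whose delivery time has been reached, earliest first.
    List<String> releaseDue(long nowMillis) {
        List<String> delivered = new ArrayList<>();
        while (!held.isEmpty() && held.peek().deliverAtMillis <= nowMillis) {
            delivered.add(held.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        DelayedExchangeModel exchange = new DelayedExchangeModel();
        exchange.publish("baby1", 0, 20000); // published first, long delay
        exchange.publish("baby2", 0, 2000);  // published second, short delay
        System.out.println(exchange.releaseDue(2000));  // prints [baby2]
        System.out.println(exchange.releaseDue(20000)); // prints [baby1]
    }
}
```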

Configuration class

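import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //Custom exchange: what we define here is a delayed exchange
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //How the exchange routes a message once its delay has elapsed
        args.put("x-delayed-type", "direct");
        //Arguments: name, type, durable, autoDelete, arguments
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}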

Message producer

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, msg -> {
        //Delay in milliseconds; it is sent to the broker as the x-delay header
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
    log.info("Current time: {}, sent a message with a delay of {} ms to queue delayed.queue: {}",
            new Date(), delayTime, message);
}

Message consumer

public static final String DELAYED_QUEUE_NAME = "delayed.queue";

@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
    String msg = new String(message.getBody());
    log.info("Current time:{},Message received from delay queue:{}", new Date().toString(), msg);
}

Initiate request:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000

The second message was consumed first, in line with expectations

Summary

Delay queues are very useful wherever processing has to be deferred. Implementing them with RabbitMQ makes good use of RabbitMQ features such as reliable publishing, reliable delivery, and dead letter queues, so that messages are consumed at least once and messages that are not processed correctly are not discarded. In addition, a RabbitMQ cluster addresses the single point of failure: the delay queue does not become unavailable, and messages are not lost, just because one node goes down.
There are of course many other ways to build a delay queue, such as Java's DelayQueue, a Redis zset, Quartz, or Kafka's timing wheel. Each has its own characteristics, and the right choice depends on the scenario.

Keywords: RabbitMQ

Added by squiggerz on Tue, 04 Jan 2022 23:03:50 +0200