[RabbitMQ series that laymen can understand] - advanced part of RabbitMQ: implementing delay queue through plug-in (including implementation code and rabbitmq_delayed_message_exchange installation)


preface

Congratulations to all the friends who read this article for successfully unlocking the advanced feature plug-in version of RabbitMQ series Content of delay queue 🎁 Through this article, you will clearly understand: what is delay queue? Delay queue usage scenario? How to install the BBU # exchange # Q plug-in? 😄 At the end of this paper, Xiaoming will make some modifications to the example of the previous article to achieve the new effect 😁

1, What is a delay queue

What is a delay queue? As the name suggests: first, it is a queue, and then it is attached with a function of delaying the consumption of queue messages, that is, the elements in the delay queue have time attributes, and you can specify at which time point the messages in the queue are consumed.

In short, a delay queue is a queue used to store elements that need to be processed at a specified time.

2, Delay queue usage scenario

Our common delay queue application scenarios:
1. If no payment is made within 30 minutes after the order is successful, the order will be automatically cancelled and the user will be notified
2. After the user is registered successfully, if he does not log in within three days, he will be reminded by SMS.
3. If the newly-built merchants on the e-commerce platform have not uploaded commodity information within one month, the shops will be frozen
4. After the scheduled meeting, all participants need to be notified to attend the meeting ten minutes before the scheduled time point.
2.1 analyze the above scenarios
These scenes are common to us, so let's think about it:

Take the first scenario. After the system creates an order, it needs to cancel all orders that have not been paid for more than 30 minutes. Take the nickname of "severe choice phobia". Maybe it's OK to add a shopping cart and pay. Suddenly, I stopped after seeing the price on the payment interface. I found that I didn't need it so much, so I gave up paying. I believe there are many such small partners in one day. For example, Xiaoming gave up payment at 03:15, Xiaohong gave up payment at 03:16, Xiaogang gave up payment at 03:15, and Xiaowang gave up payment at 04:45... How can we let the system know that Xiaoming and Xiaogang were notified at 03:45, Xiaohong at 3:46, and Xiaowang at 05:15?

Another example is the following scenarios: when a new user registration event occurs, check the activity data of the newly registered user three days later, and then notify the user without any activity record; If the Merchant fails to upload commodity information within one month, the merchant's shop will be frozen; If a scheduled meeting event occurs, judge whether it is only ten minutes from the beginning of the meeting. If so, notify all participants.

2.2 time-saving and labor-saving solutions
In these scenarios, do you feel that you can use scheduled tasks, poll all data, check once per second, take out the data to be processed, and then run the corresponding business code for processing? Indeed, if the amount of data is relatively small, it saves time and effort.

For example: "after the user registers successfully, if he / she does not log in within three days, he / she will be reminded by SMS". This is not a strict time limit requirement,
We can run a scheduled task every night to check all users who haven't logged in three days after registration. This is really a feasible scheme.

2.3 disadvantages of the above methods
If the data volume is relatively large and the timeliness is strong, "orders that have not been paid within 30 minutes will be automatically cancelled and the user will be notified", the data of orders that have not been paid may be amazing in a very short time. If it is one million or even ten million during the event, the above polling method is still used for such a huge amount of data, Obviously, it is not desirable. It is likely that the inspection of all orders can not be completed in one second. At the same time, it will bring great pressure to the database, unable to meet the business requirements and low performance.

2.4 necessity of using delay queue
If the delayed message queue is used, we will delay the time for 30 minutes while creating the order, put it into the message middleware, and then take it out for consumption as soon as the time comes.

3, TTL in RabbitMQ

The nickname above has been explained. Here, let's briefly recall it

Requirements:

Simulate two situations when users buy goods in the mall: 1 Successful order, 2 Timeout reminder

  1. User orders
  2. After placing an order, the user displays the waiting for payment page
  3. Click the payment button on the page. If it does not time out, it will jump to the payment success page
  4. If the timeout occurs, a message notification will be sent to the user to ask whether the user has not paid yet and whether it is still needed?

Configuration code:

//The order can exist for 10s at most
args.put("x-message-ttl", 10 * 1000);
//Declare the dead letter switch bound by the current queue
args.put("x-dead-letter-exchange", "ex.go.dlx");
//Declare the dead letter routing key bound by the current queue
args.put("x-dead-letter-routing-key", "go.dlx");

Time To Live(TTL)
RabbitMQ can set x-message-ttl for the queue (set the message separately, and the TTL of each message can be different) to control the lifetime of the message. If it times out, the message becomes dead letter. In short, if the TTL attribute of the queue is set, the message will be discarded by the queue once it expires.

4, Install rabbitmq_delayed_message_exchange

Due to the external network, the download speed may be a little slow. The nickname here helps you prepare the installation package, which you can download and use directly

Address:

https://wwp.lanzouq.com/ifWwA007nwmf

password:

eamon

Step 1: after downloading, unzip the file to the local computer (the network disk requires that the unrecognized *. ez file cannot be uploaded, and the small name compresses the uploaded file)
Step 2: upload to the server's RabbitMQ plug-in directory (/ rabbitmq/plugins)
Step 3: enter the sbin directory under the RabbitMQ installation directory and execute the following command to make the plug-in effective

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Step 4: restart RabbitMQ

Shut down service:

rabbitmqctl stop

Start service:

rabbitmq-server -detached

5, An example of implementing the delay queue of the plug-in version

5.1 new scenarios

Assuming that the method in the previous article is just to push information in an app and add new requirements later, such as email in one minute, SMS reminder in one hour, etc., we need to create many queues to receive different messages, Moreover, we cannot guarantee that the reminders of these orders are in order (that is, there may be A time stamp of "A" in the queue that is longer than that of "B" in the queue). At this time, we need A more general way to send such messages. Here we use the delay queue plug-in rabbitmq_delayed_message_exchange mentioned above

5.2 adjustment demand

Requirements of the previous article:
Simulate two situations when users buy goods in the mall: 1 Successful order, 2 Timeout reminder

  1. User orders
  2. After placing an order, the user displays the waiting for payment page
  3. Click the payment button on the page. If it does not time out, it will jump to the payment success page
  4. If it exceeds 10s, a system message notification will be sent to the user to ask whether the user still needs to pay?
    The above middle and small names have implemented a function of sending messages to users over a timeout of 10s. Next, we will do the following to the code of the previous article

5.3 modify code according to new requirements

  1. New queue binding
@Configuration
public class DelayedConfig {
    public static final String DELAYED_QUEUE_NAME = "q.delay.plugin";
    public static final String DELAYED_EXCHANGE_NAME = "ex.delay.plugin";
    public static final String DELAYED_ROUTING_KEY = "delay.plugin";

    @Bean
    public Queue queueDelay() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange exchangeDelay() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayPlugin(@Qualifier("queueDelay") Queue queue,
                           @Qualifier("exchangeDelay") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}
  1. Make some changes to the listener
    The nickname first says the parts that need to be modified, turns over and compares them, and pastes the full version at the end of the article.

1) Add a new consumer

 //Plug in delay queue, listening
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("[※]Current time:"+new Date().toString()+",Delay queue received message:"+msg);
    }

2) New producers

@RabbitListener(queues = "q.go.dlx")
    public void dlxListener(Message message, Channel channel) throws IOException {
            //Omit
            //If the payment is not made, email the user 1 min later
            long t = System.currentTimeMillis();
            String delayOneMin  = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"[Email message]", delayOneMin);
            //No payment, send text message to the user after 1 hour
            String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"[[SMS message]", delayOneHour);

        }
}


public void sendDelayMsgByPlugin(String msg, String delayTime) {
    System.out.println("delay time "+delayTime);
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
        a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60 * 1000 and integer The difference between valueof (delaytime)
        return a;
    });
}

[full version code]

@Component
@Slf4j
public class MqListener {

    @Autowired
    IPracticeDlxOrderService iPracticeDlxOrderService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "q.go.dlx")
    public void dlxListener(Message message, Channel channel) throws IOException {
        System.out.println("Payment timeout");
        Long id = Long.valueOf(new String(message.getBody(), "utf-8"));

        PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one();
        Boolean payStatue = order.getPay();
        //Determine whether to pay
        if (!payStatue) {//Unpaid, modification not timed out
            UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
            dlxOrder.eq("id", id);
            dlxOrder.set("timeout", 1);
            iPracticeDlxOrderService.update(dlxOrder);
            log.info("Current time:{},Upon receipt of the request, msg:{},delayTime:{}", new Date(), message, new Date().toString());
            //If not paid, send app information to the user after 10
            sendDelayMsg(id);
            //If the payment is not made, email the user 1 min later
            long t = System.currentTimeMillis();
            String delayOneMin  = String.valueOf(this.dateRoll(new Date(), Calendar.MINUTE, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"[Email message]", delayOneMin);
            //No payment, send text message to the user after 1 hour
            String delayOneHour = String.valueOf(this.dateRoll(new Date(), Calendar.HOUR, 1).getTime() - t);
            sendDelayMsgByPlugin(message.getBody()+"[[SMS message]", delayOneHour);
        }
    }

    public Date dateRoll(Date date, int i, int d) {
        // Get the Calendar object and subject to the time passed in
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        // Scroll the current time for a fixed length of time and convert it to Date type assignment
        calendar.add(i, d);
        // Conversion to Date type and re assignment
        date = calendar.getTime();
        return date;
    }

    //Dead letter queue listening
    @RabbitListener(queues = "q.delay")
    public void delayListener(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody()));
    }

    //Plug in delay queue, listening
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("[※]Current time:"+new Date().toString()+",Delay queue received message:"+msg);

    }

    /**
     * If the payment is not made, send a message to the user after 10s
     */
    public void sendDelayMsg(Long id){
        rabbitTemplate.setMandatory(true);
        //id + timestamp globally unique
        Date date = DateUtil.getDate(new Date(),1,10);
        CorrelationData correlationData = new CorrelationData(date.toString());

        //Specify the header delay time when sending a message
        rabbitTemplate.convertAndSend("ex.delay", "q.delay", "Your order number:" + id + "Not paid yet, do you still need it?",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //Set message persistence
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setDelay(10*1000);
                        return message;
                    }
                }, correlationData);
    }

    /**
     *
     * @param msg
     * @param delayTime
     */
    public void sendDelayMsgByPlugin(String msg, String delayTime) {
        System.out.println("delay time "+delayTime);
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
            a.getMessageProperties().setDelay(Integer.valueOf(delayTime));//60 * 1000 and integer The difference between valueof (delaytime)
            return a;
        });
    }
}
  1. Operation results:
Payment timeout
2022-02-17 17:28:10.650  INFO 18324 --- [ntContainer#2-1] e.mq.dlx.modules.listener.MqListener: current time: Thu Feb 17:28:10 CST 2022, received the request, msg:(Body:'1494242214543482881' MessageProperties [headers={x-first-death-exchange=ex.go, x-death=[{reason=expired, count=1, exchange=ex.go, time=Thu Feb 17 17:28:08 CST 2022, routing-keys=[go], queue=q.go}], x-first-death-reason=expired, x-first-death-queue=q.go}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex.go.dlx, receivedRoutingKey=go.dlx, deliveryTag=1, consumerTag=amq.ctag-SasPqfbiS6-pt-e54uV5Hw, consumerQueue=q.go.dlx]),delayTime:Thu Feb 17 17:28:10 CST 2022
 Delay time 60000
 Your order number:1494242214543482881 Not paid yet, do you still need it?
Delay time 3616616
2022-02-17 17:28:27.268  WARN 18324 --- [nectionFactory1] o.s.amqp.rabbit.core.RabbitTemplate      : Returned message but no callback available
[※]Current time: Thu Feb 17 17:29:27 CST 2022,Delay queue received message:[B@36c9cd1[Email message]
[※]Current time: Thu Feb 17 18:28:44 CST 2022,Delay queue received message:[B@36c9cd1[[SMS message]

Keywords: Java RabbitMQ Spring Boot Distribution Middleware

Added by larsojl on Wed, 02 Mar 2022 02:50:54 +0200