rabbitMq implements delay queue

Business scenario:

1. If the order is not paid for 30 minutes, it will be automatically cancelled. How can we achieve this?
2. Send a text message to the user 60 seconds after the order is generated

1 install rabbitMq

windows installer
Installed in ubuntu

2 add maven dependency

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3. Configure in application.properties

spring.application.name=rabbitmq-hello
# Configure rabbbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=springCloud
spring.rabbitmq.password=123456

4 specific implementation

Rabbitmq itself does not have the function of delaying sending, but we implement it through the TTL (Time To Live) of messages. The so-called TTL refers to the survival time of messages. Rabbitmq can set TTL for queues and messages respectively.
Setting the queue is the retention time of the queue without consumers. You can also set each individual message separately. After this time, we think the news is dead, which is called dead letter. If the queue is set and the message is set, the smaller value will be taken. Therefore, if a message is routed to different queues, the time of death of the message may be different (different queue settings). The TTL of a single message is discussed here, because it is the key to realize the delay task.
We can set the time by setting the expiration field of the message or the x-message-ttl attribute. Both have the same effect. Only the expiration field is a string parameter, so you need to write a string of type int:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60*1000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

When the above message is thrown into the queue, after 60 seconds, if it is not consumed, it will die. Will not be consumed by consumers.
Behind this message, there is no "dead" message on the top, which is consumed by consumers. Dead letter will not be deleted or released in the queue. It will be counted into the number of messages in the queue. Dead letter alone can not achieve delayed tasks, but also Dead Letter Exchange.
Let me briefly explain Dead Letter Exchanges

4.1 Dead Letter Exchanges

A message will enter the dead letter route if it meets the following conditions. Remember that this is a route rather than a queue. A route can correspond to many queues.

  • 1. A message is rejected by the Consumer, and the request in the parameter of the reject method is false. In other words, it will not be put in the queue again and used by other consumers.
  • 2. The TTL of the above message has arrived and the message has expired.
  • 3. The queue length limit is full. The messages in the front row will be discarded or thrown on the dead letter route.

Dead Letter Exchange is actually an ordinary exchange, just like creating other exchanges. Only when a message expires in a queue set up for Dead Letter Exchange, the message forwarding will be automatically triggered and sent to Dead Letter Exchange.

4.2 implementation of delay queue

The delay task is implemented through TTL and Dead Letter Exchange of messages. We need to establish two queues, one for sending messages and the other for forwarding after message expiration. The general principle is shown in the figure below.

The producer outputs a message to Queue1, and the message is set with an effective time, such as 60s. The message will wait for 60s in Queue1. If no consumer receives it, it will be forwarded to Queue2. Queue2 has consumers who receive it and process the delayed task.
Next, we officially enter the code phase

code

  • Declare switches, queues, and their binding relationships:
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";//Ordinary switch
    public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";//Declare two queues a and B
    public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";//Dead Letter Exchanges
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";//Dead letter switch
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";

    // Claim delay Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // Declare dead letter Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // Claim delay queue A delay 10s
    // And bind to the corresponding dead letter switch
    @Bean("delayQueueA")
    public Queue delayQueueA() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange here declares the dead letter switch bound to the current queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key here, the dead letter routing key of the current queue is declared
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        // x-message-ttl declares the TTL of the queue
        args.put("x-message-ttl", 1000 * 10);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    // Claim delay queue B delay 60s
    // And bind to the corresponding dead letter switch
    @Bean("delayQueueB")
    public Queue delayQueueB() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange here declares the dead letter switch bound to the current queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key here, the dead letter routing key of the current queue is declared
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        // x-message-ttl declares the TTL of the queue
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    // Declare that dead letter queue A is used to receive messages processed with A delay of 10s
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // Declare that dead letter queue B is used to receive messages processed with a delay of 60s
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // Declare delay queue A binding relationship
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    // Declare business queue B binding relationship
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    // Declare the binding relationship of dead letter queue A
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // Declare the binding relationship of dead letter queue B
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}
  • Message producer
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import static com.talent.infocenter.rabbitmq.RabbitMQConfig.*;

@Service
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public enum DelayTypeEnum {
        DELAY_10s, DELAY_60s;
    }

    public static DelayTypeEnum getByIntValue(int value) {
        switch (value) {
            case 10:
                return DelayTypeEnum.DELAY_10s;
            case 60:
                return DelayTypeEnum.DELAY_60s;
            default:
                return null;
        }
    }

    public void sendMsg(String msg, DelayTypeEnum type) {
        switch (type) {
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }
    }
}
  • consumer
    We create two consumers who consume 10s and 60s orders respectively
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

import static com.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Current time:{},Dead letter queue A Message received:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Current time:{},Dead letter queue B Message received:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

  • Create an interface to test
import com.talent.infocenter.rabbitmq.DelayMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Objects;

@Slf4j
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;

    @RequestMapping(value = "sendmsg", method = RequestMethod.GET)
    public void sendMsg(@RequestParam(value = "msg") String msg, @RequestParam(value = "delayType") Integer delayType) {
        log.info("Current time:{},Upon receipt of the request, msg:{},delayType:{}", new Date(), msg, delayType);
        sender.sendMsg(msg, Objects.requireNonNull(DelayMessageSender.getByIntValue(delayType)));
    }
}

Next, start the test. I use swagger. You can use other methods such as postman to test yourself

Open our rabbitmq background to see our switch and queue information

In the same way, we create an order that can only be consumed after 60s

The above implementation can only set two specified times of 10s and 60s. Next, we set the delay queue at any time

4.3 RabbitMq optimization

We need a more general scheme to meet the requirements, so we can only set TTL in the message attribute. Only in this way can we more flexibly realize the specific business development of delay queue. The method is also very simple. We only need to add a delay queue to receive messages set to any delay time, At the same time, a corresponding dead letter queue and routingkey are added, but this method has a great disadvantage that if TTL is set on the message attribute, the message may not "die" on time ", because rabbitmq only checks whether the first message has expired. If it has expired, it will be thrown into the dead letter queue. Therefore, if the delay time of the first message is very long and the delay time of the second message is very short, the second message will not be executed first. No code will be written here. However, to solve this problem, we will use rabbitmq plug-in to implement delay Queue.

4.4 using plug-in to realize delay queue

4.4.1 downloading plug-ins

Click here to download

Unzip after downloading. It is recommended to unzip bandzip and put the unzipped folder in the plugins directory under the rabbitmq installation directory

Enter the sbin directory and use cmd to execute the following instructions to enable the plug-in

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

After performing the above steps, restart our rabbitmq service

  • 1. Access to services

  • 2. Enter the sbin directory and double-click rabbitmq-server.bat

Verify that access is restarted successfully http://localhost:15672
If the access is successful, the restart is successful

4.4.2 coding

Recreate a configuration class

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedRabbitMQConfig {
    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

Create a new message sender

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_EXCHANGE_NAME;
import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_ROUTING_KEY;

@Service
public class Provider {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendDelayMsg(String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -> {
            a.getMessageProperties().setDelay(delayTime*1000);
            return a;
        });
    }
}

Create a new message consumer

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

import static com.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_QUEUE_NAME;

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Current time:{},Delay queue received message:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

Modify our previous interface

import com.talent.api.utils.RedisUtils;
import com.talent.infocenter.rabbitMq.Provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Objects;

@Slf4j
@RestController
public class RabbitMQMsgController {

    @Autowired
    private Provider provider;

    @RequestMapping(value = "sendmsg", method = RequestMethod.GET)
    public void sendMsg(@RequestParam(value = "msg") String msg, @RequestParam(value = "delayTime") Integer delayTime) {
        log.info("Current time:{},Upon receipt of the request, msg:{},delayTime:{}", new Date(), msg, delayTime);
        provider.sendDelayMsg(msg, delayTime);
    }
}

Next, start the test

Then test whether our orders in different orders are consumed in chronological order
We set order 0002 to 60s and order 0003 to 15s to see if order 0003 can be consumed before 0002

As a result, it is obvious that order 0003 was indeed consumed at the 15th s

5. Summary

Delay queue is very useful and stable in situations requiring delay processing. Using RabbitMQ to implement delay queue can make good use of RabbitMQ's characteristics, such as reliable message sending, reliable message delivery and dead letter queue to ensure that messages are consumed at least once and messages that are not processed correctly will not be discarded. In addition, through the characteristics of RabbitMQ cluster, the single point of failure problem can be well solved, and the delay queue will not be unavailable or messages will not be lost because a single node hangs up.
Of course, there are many other options for delay queue, such as using Java's delayqueue, Redis's zset, Quartz or kafka's time wheel. These methods have their own characteristics.

Keywords: Java RabbitMQ Spring Boot Distribution

Added by badre on Tue, 07 Dec 2021 12:00:42 +0200