Implementation of delayed tasks based on message queue (RabbitMQ)

1, Preface

Delayed tasks are widely used. Typical application scenarios of delayed tasks include automatic cancellation of orders when they timeout; Payment callback retry. Among them, the order overtime cancellation has idempotent property, and there is no need to consider the problem of repeated consumption; Payment callback retry needs to consider the problem of repeated consumption.

Delayed tasks have the following characteristics: execute at a certain time point in the future; Generally, it is executed only once.

1. Implementation principle

The producer sends the message with delay information to the RabbitMQ switch, and the party waiting for the end of the delay time forwards the message to the bound queue. The consumer consumes the message by listening to the queue. The key to delaying the task is that messages stay in the switch.

Obviously, the implementation of delayed tasks based on RabbitMQ requires very high reliability of the server. There is no persistence mechanism for internal messages of the switch, such as single machine mode service restart, and the non started delayed tasks are lost.

2. Component selection

2, Scheme design

(1) Server

The RabbitMQ service needs to install the x-delayed-message plug-in to handle delayed messages.

(2) Producer

The implementation of the delayed task requires the producer to deliver the message to the switch reliably, so the confirm confirmation mechanism can be used.

After the order is generated, it is received first, then the order details are stored in Redis (persistent) with the order ID as the key, and an asynchronous confirm confirmation request is sent to RabbitMQ. If you receive the return of normal delivery, delete the data with the order ID of key in Redis and recover the memory. Otherwise, with the order ID of key, query the order data from Redis and resend it.

(3) Consumer

The realization of delayed tasks requires consumers to consume messages in a way that information is not lost. The specific performance is as follows: manually confirm the consumption of messages to prevent message loss; The consumer side continues to be stable to prevent message accumulation; Message consumption fails. There is a retry mechanism.

Considering that order delay cancellation belongs to idempotent operation, it is unnecessary to consider the repeated consumption of messages.

3, SpringBoot implementation

Only part of the core source code is pasted in the implementation part. Please visit for the complete project GitHub.

(1) Producer

Considering that placing an order is a very important operation, first drop and save the order, and then carry out subsequent operations.

for (long i = 1; i <= 10; i++) {
    /* 1.Simulated order generation */
    BuOrder order = createOrder(i);
    /* 2.Order warehousing */
    orderService.removeById(order);
    orderService.saveOrUpdate(order);
    /* 3.Store the order information in Redis */
    RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order);
    /* 4.Asynchronous delivery of messages to RabbitMQ */
    rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}

Producers deliver messages reliably

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (correlationData == null) {
        return;
    }
    String key = ORDER_PREFIX + correlationData.getId();
    if (ack) {
        /* If the message is delivered successfully, the order data in Redis will be deleted and memory will be recovered */
        RedisUtils.deleteObject(key);
    } else {
        /* Read the order data from Redis and deliver it again */
        BuOrder order = RedisUtils.getObject(key, BuOrder.class);
        /* Redelivery message */
        rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
    }
}

(2) Consumer

The consumer side confirms manually to avoid message loss; Failed to retry automatically.

@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME)
public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException {
    if (Objects.equals(0, order.getOrderStatus())) {
        /* Modify the order status and set it to closed status */
        orderService.updateById(new BuOrder(order.getOrderId(), -1));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info(String.format("Consumer node 01 consumption number is[%s]News of", order.getOrderId()));
    }
}

For reliable consumption, consumers should open at least two or more applications to ensure that there is no backlog of messages in the message queue.

(3) General Toolkit

The above code involves a tool class RabbitUtils, which exists in the following dependencies and mainly encapsulates the tools and methods commonly used by RabbitMQ.

<dependency>
  <groupId>xin.altitude.cms</groupId>
  <artifactId>ucode-cms-common</artifactId>
  <version>1.4.3.1</version>
</dependency>

Keywords: Java JavaEE RabbitMQ message queue

Added by kye on Tue, 08 Mar 2022 02:42:02 +0200