[RabbitMQ series that laymen can understand] - dead letter queue of RabbitMQ advanced (including video demonstration service and service code)


preface

Congratulations to all the friends who read this article for successfully unlocking the advanced features of RabbitMQ series Contents of dead letter queue 🎁 Through this article, you will clearly understand: what is dead letter? What is a dead letter queue? How to use the dead letter queue? etc. 😄 Finally, Xiaoming will help you remember the dead letter queue through an example 😁

1, What is a dead letter queue

For some reasons, messages cannot be delivered correctly or consumed normally. In order to ensure that such messages will not be discarded for no reason, they will generally be stored in a queue with a special role, which is generally called dead letter queue.

If the dead letter queue information is configured, the message will be dropped into the dead letter queue. If it is not configured, the message will be dropped. Therefore, dead letter queue is a guarantee mechanism for exception handling messages in RabbitMQ.

2, What kind of news will become dead letter

  1. Message rejected, use channel Basicnack or channel Basicreject, and the request property is set to false.
  2. The lifetime of the message in the queue exceeds the set TTL time
  3. The number of messages in the message queue has exceeded the maximum queue length

3, What is a dead letter exchange

Dead letter exchange (DLX), called dead letter exchange. When a message becomes dead message in a queue, it is re sent to another switch, which is DLX. The queue bound to DLX is called dead message queue.

4, Dead letter processing process

DLX is also a normal switch. It is no different from a general switch. In fact, it is to set the properties of a queue, which can be specified on any queue. When there is a dead letter in this queue, RabbitMQ will automatically republish this message to the set DLX, and then it will be routed to another dead letter queue.

5, How to use dead letter switch

Configuring the dead letter queue is divided into the following steps:

  1. Configure the service queue and bind it to the service switch
  2. Configure dead letter switch and routing key for service queue
  3. Configure dead letter queue for dead letter switch

Configuration code:

//The order can exist for 10s at most
args.put("x-message-ttl", 10 * 1000);
//Declare the dead letter switch bound by the current queue
args.put("x-dead-letter-exchange", "ex.go.dlx");
//Declare the dead letter routing key bound by the current queue
args.put("x-dead-letter-routing-key", "go.dlx");

web interface:

Time To Live(TTL)

RabbitMQ can set x-message-ttl for the queue (set the message separately, and the TTL of each message can be different) to control the lifetime of the message. If it times out, the message will become dead letter

Dead Letter Exchanges(DLX)

The Queue of RabbitMQ can be configured with two parameters: x-dead-letter-exchange and x-dead-letter-routing-key (optional). If a dead letter appears in the Queue, it will be rerouted and forwarded to the specified Queue according to these two parameters.
x-dead-letter-exchange: resend the dead letter to the specified exchange after the dead letter appears.
x-dead-letter-routing-key: send the dead letter according to the specified routing key again after the dead letter appears.

6, Examples

Demo business:
Simulate two situations when users buy goods in the mall: 1 Successful order, 2 Timeout reminder

Because the video is recorded in a small name and split screen, the picture is a little long. Please bother the little partner who can't see the content clearly Click step Full screen viewing

Business scenario:

  1. User orders
  2. After placing an order, the user displays the waiting for payment page
  3. Click the payment button on the page. If it does not time out, it will jump to the payment success page
  4. If the timeout occurs, a message notification will be sent to the user to ask whether the user has not paid yet and whether it is still needed?

6.1 database table design

CREATE TABLE `practice_dlx_order` (
  `id` bigint(20) NOT NULL,
  `name` varchar(255) DEFAULT NULL COMMENT 'Order name',
  `price` decimal(10,2) DEFAULT NULL COMMENT 'amount of money',
  `timeout` tinyint(1) DEFAULT '0' COMMENT 'Timed out: 1-Timed out, 0-No timeout',
  `pay` tinyint(1) DEFAULT NULL COMMENT 'Paid or not: 1-Paid, 0-Unpaid',
  `order_time` datetime DEFAULT NULL COMMENT 'Order time ',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

6.2 configuration file

spring:
  #thymeleaf configuration
  thymeleaf:
    #Template mode, such as HTML, XML, TEXT, JAVASCRIPT, etc
    mode: HTML5
    #Coding, no configuration required
    encoding: utf-8
    servlet:
      #Content category, no configuration required
      content-type: text/html
    #The development configuration is false to avoid modifying the template and restarting the server
    cache: false
    #Configure the template path. The default is templates. You don't need to configure it
    prefix: classpath:/templates/
  rabbitmq:
    host: deploy rabbitmq Server extranet ip
    port: 5672
    username: user name
    password: password
    publisher-confirms: true
    publisher-returns: true

6.3 page

  1. Order page
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
    <title></title>
    <script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>Order page</p>
<form id="form1" action="/mqConsumer/practice-dlx-order/payPage">
    <table border="1">
        <tr>
            <th>Name</th>
            <th>Price</th>
            <th>Pperate</th>
        </tr>
        <tr>
            <td>MacBook</td>
            <input name="form1name" value="MacBook" style="display:none">
            <td>$999</td>
            <input name="form1price" value="999" style="display:none">
            <td><button>Buy</button></td>
        </tr>
    </table>
</form>
</body>
</html>
  1. Checkout page
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
    <title></title>
    <script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>Checkout page</p>
<a id="out"></a>
<span id="id" th:text="${id}"></span>

<form id="form1" action="/mqConsumer/practice-dlx-order/paySuccessful">
    <input  th:name="form1id" th:value="${id}" style="display:none">
    <button>To pay</button>
</form>

</body>
</html>

<script type="text/javascript">
    window.onload = function () {
        var i = 10;
        var out = document.getElementById('out');
        var timer = setInterval(function () {
        //The setInterval() method can call a function or evaluate an expression for a specified period (in milliseconds).
            if (i == -1) {
                //The clearInterval() method cancels the timeout set by setInterval().
                clearInterval(timer);
                document.write("Payment timeout");
            } else {
                //document.body.innerHTML = i;
                out.innerHTML = i + "second";
                --i;
            }
        }, 1000);
    }
</script> 
  1. Payment success page
<html xmlns:th="http://www.thymeleaf.org" lang="zh-CN">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
    <title></title>
    <script type="text/javascript" src="http://code.jquery.com/jquery-1.10.0.js"></script>
</head>
<body>
<p>Payment succeeded! Waiting for delivery~</p>
</body>
</html>

6.4 queue binding

@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("ex.go", true, false);
    }

    @Bean
    public Queue queue() {
        Map<String, Object> args = new HashMap<>(3);
        //The order can exist for 10s at most
        args.put("x-message-ttl", 10 * 1000);
        //Declare the dead letter switch bound by the current queue
        args.put("x-dead-letter-exchange", "ex.go.dlx");
        //Declare the dead letter routing key bound by the current queue
        args.put("x-dead-letter-routing-key", "go.dlx");
        return new Queue("q.go", true, false, false, args);
    }

    @Bean
    public DirectExchange directExchangeDlx() {
        return new DirectExchange("ex.go.dlx", true, false);
    }

    @Bean
    public Queue queueDlx() {
        return new Queue("q.go.dlx", true, false, false);
    }

    /**
     * Delay queue switch
     *
     * @return
     */
    @Bean
    public DirectExchange delayExchange() {
        Map<String, Object> pros = new HashMap<>();
        //Set the switch to support delayed message push
        pros.put("x-delayed-message", "direct");
        DirectExchange directExchange = new DirectExchange("ex.delay", true, false, pros);
        directExchange.setDelayed(true);
        return directExchange;
    }

    /**
     * Delay queue
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>(3);
        return new Queue("q.delay");
    }

    /**
     * Bind switch to delay queue
     *
     * @return
     */
    @Bean
    public Binding bindingDelay(@Qualifier("delayQueue") Queue delayQueue,
                                @Qualifier("delayExchange") DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("q.delay");
    }

    @Bean
    @Resource
    public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("go").noargs();
    }

    @Bean
    @Resource
    public Binding bindingDlx(@Qualifier("queueDlx") Queue queueDlx, @Qualifier("directExchangeDlx") Exchange exchangeDlx) {
        return BindingBuilder.bind(queueDlx).to(exchangeDlx).with("go.dlx").noargs();
    }
}

6.5 creating producers

@Controller
@RequestMapping("//practice-dlx-order")
public class PracticeDlxOrderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private IPracticeDlxOrderService iPracticeDlxOrderService;

    @RequestMapping("/orderPage")
    public String orderPage() {
        return "/dlx/orderPage";
    }

    @RequestMapping("/payPage")
    public String payPage(HttpServletRequest httpServletRequest, ModelMap map) {

        String form1name = httpServletRequest.getParameter("form1name");
        String form1price = httpServletRequest.getParameter("form1price");

        PracticeDlxOrder practiceDlxOrder = new PracticeDlxOrder();
        practiceDlxOrder.setPay(false);//Paid: 1-paid, 0-unpaid
        practiceDlxOrder.setPrice(new BigDecimal(form1price));
        practiceDlxOrder.setName(form1name);
        practiceDlxOrder.setOrderTime(new Date());
        iPracticeDlxOrderService.getBaseMapper().insert(practiceDlxOrder);

        //Get id
        Long id = practiceDlxOrder.getId();
        map.addAttribute("id", id);
        rabbitTemplate.convertAndSend("ex.go", "go", String.valueOf(id));
        return "dlx/payPage";
    }

    //Payment successful
    @RequestMapping("/paySuccessful")
    public String paySuccessful(HttpServletRequest httpServletRequest) {
        String id = httpServletRequest.getParameter("form1id");
        UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
        dlxOrder.eq("id", Long.valueOf(id));
        dlxOrder.set("pay",true);
        iPracticeDlxOrderService.update(dlxOrder);
        return "/dlx/paySuccessful";
    }
}

6.6 creating consumers

@Component
@Slf4j
public class MqListener {

    @Autowired
    IPracticeDlxOrderService iPracticeDlxOrderService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "q.go.dlx")
    public void dlxListener(Message message, Channel channel) throws IOException {
        System.out.println("Payment timeout");
        Long id = Long.valueOf(new String(message.getBody(), "utf-8"));
        PracticeDlxOrder order = iPracticeDlxOrderService.lambdaQuery().eq(PracticeDlxOrder::getId, id).one();
        Boolean payStatue = order.getPay();
        //Judge whether to pay
        if (!payStatue) {//Unpaid, modification not timed out
            UpdateWrapper<PracticeDlxOrder> dlxOrder = new UpdateWrapper<>();
            dlxOrder.eq("id", id);
            dlxOrder.set("timeout", 1);
            iPracticeDlxOrderService.update(dlxOrder);
            log.info("Current time:{},Upon receipt of the request, msg:{},delayTime:{}", new Date(), message, new Date().toString());
            //If the payment is not made, send the app information to the user after 10s
            sendDelayMsg(id);
        }
}

    public Date dateRoll(Date date, int i, int d) {
        // Get the Calendar object and subject to the time passed in
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        // Scroll the current time for a fixed length of time and convert it to Date type assignment
        calendar.add(i, d);
        // Conversion to Date type and re assignment
        date = calendar.getTime();
        return date;
    }

    //Dead letter queue listening
    @RabbitListener(queues = "q.delay")
    public void delayListener(Message message, Channel channel) throws IOException {
        System.out.println(new String(message.getBody()));
    }

     /**
     * If the payment is not made, send a message to the user after 10s
     */
    public void sendDelayMsg(Long id){
        rabbitTemplate.setMandatory(true);
        //id + timestamp globally unique
        Date date = DateUtil.getDate(new Date(),1,10);
        CorrelationData correlationData = new CorrelationData(date.toString());

        //Specify the header delay time when sending a message
        rabbitTemplate.convertAndSend("ex.delay", "q.delay", "Your order number:" + id + "Not paid yet, do you still need it?",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //Set message persistence
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setDelay(10*1000);
                        return message;
                    }
                }, correlationData);
    }
}

🎊 It's done. You can Point me View results 🎊

If you find any mistakes in the article, I hope you can give criticism and correction in the comment area 🤝 If you think the article of nickname has helped you, please follow the column of nickname [RabbitMQ] to support nickname 😄, Praise the article of nickname 👍, comment ✍, Collection 🤞 Thank you~ ♥♥♥

Keywords: Java CentOS RabbitMQ Spring Boot Middleware

Added by billcoker on Mon, 28 Feb 2022 02:39:48 +0200