RabbitMQ advanced features - dead letter queue

[TOC]

Dead letter

What is a dead letter

News becomes a dead letter without any consumers to consume

There are several situations when a message becomes a dead letter

  1. Message rejected (basic.reject/basic.nack), and request = false
  2. Message TTL expired
  3. Queue reaches maximum length

Dead Letter Exchange(DLX)

DLX

With DLX, when a message becomes a dead letter in a queue, it can be re publish ed to another exchange, which is DLX.

DLX is also a normal Exchange. It is no different from general Exchange. It can be specified on any queue. In fact, it is to set the properties of a queue. When there is dead letter in the queue, RabbitMQ will automatically send the dead letter message to the set DLX, and then it will be routed to another queue. You can listen to this queue for subsequent processing.

Dead letter queue settings

  1. Declare Exchange and queue of dead letter queue, and then bind
  2. Declare the normal queue Exchange and queue binding, but add the parameters arguments.put (x-dead-letter-Exchange "," you Dlx ") to the queue;

code implementation

producer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Consumer manual ack and nack
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-22 13:25
 * @since JDK1.8
 * @version V1.0
 */

public class Producer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.abc";
        // Declare exchange
        channel.exchangeDeclare(exchangeName, "topic");

        String msg = "Normal message 1,routingKey:" + routingKey;
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();
        channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8"));
        // No consumer consumption for this message
        String msg2 = "Expired dead letter message 2, routingKey:" + routingKey;
        channel.basicPublish(exchangeName, routingKey, false, props, msg2.getBytes("UTF-8"));
        String msg3 = "Expired dead letter message 3, routingKey:" + routingKey;
        channel.basicPublish(exchangeName, routingKey, false, props, msg3.getBytes("UTF-8"));
        String msg4 = "Expired dead letter message 4, routingKey:" + routingKey;
        channel.basicPublish(exchangeName, routingKey, false, props, msg4.getBytes("UTF-8"));
        channel.close();
        connection.close();
    }

}

producer can use message expiration to generate dead letter

Normal consumer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * Consumer manual ack and nack
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        // Define Exchange for dead letter
        String dlxExchange = "dlx.exchange";
        channel.exchangeDeclare(dlxExchange, "topic");
        // Dead letter
        String dlxQueue = "dlx.queue";
        channel.queueDeclare(dlxQueue, true, false, false, null);
        // #Indicates that all key s can be routed to the s dead letter queue
        String dlxRoutingKey = "#";
        // Bind dead letter queue and exchange
        channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null);

        // Define normal consumer j listening queue
        String queueName = "test_dlx_queue";
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        // Declare exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // Declare queue

        Map<String, Object> arguments = new HashMap<>();
        // Set dead letter queue, arguments to be set on the declared queue
        arguments.put("x-dead-letter-exchange", dlxExchange);
        channel.queueDeclare(queueName, true, false, false, arguments);
        // Queue bound to exchange
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.basicQos(1);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {

                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("---Consumer-- " + new String(message.getBody(), "UTF-8"));

            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---Consumer--: cancelCallback ");
            }
        };

        // For consumption message, autoAck must be set to false and ack manually
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

Operation result: only one normal message is consumed, and other expired ones are not consumed

Dlxconsmer, listening for messages in the consumption dead letter queue

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * Listen to private message queue
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-22 21:52
 * @since JDK1.8
 * @version V1.0
 */

public class DLXConusmer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "dlx.queue";
        String exchangeName = "dlx.exchange";
        String routingKey = "#";

        // Declare exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // Declare queue
        channel.queueDeclare(queueName, true, false, false, null);
        // Queue bound to exchange
        channel.queueBind(queueName, exchangeName, routingKey, null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    System.out.println("---Dead letter queue consumer---");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                } finally {
                    // consumer manual ack to broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---Consumer: cancelCallback");
            }
        };

        // Consumption message, autoAck must be set to false
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

Operation result: three expired messages enter the dead letter queue and are consumed

Keywords: Java RabbitMQ

Added by abcd1234 on Sat, 23 Nov 2019 22:08:58 +0200