RabbitMQ dead letter queue mechanism

The callback queue of RabbitMQ solves the reason why RabbitMQ cannot receive messages due to abnormal conditions, but it is generally used in enterprises

It is not practical to use the replay queue, but more uses the dead letter queue mechanism to ensure that the consumer can receive specific messages,

In fact, it is essentially a guarantee mechanism for the reliability of message consumers.

1, What is a dead letter queue

The full name of the dead letter queue is Dead Letter Exchange, so the private letter queue is abbreviated as DLX. When the producer sends a message

If the consumer fails to receive the message, the message will be sent to the dead letter queue to ensure the consumption of the message. Message consumption in RabbitMQ

In the processing mechanism, when there is a dead letter in the queue, RabbitMQ will automatically switch to the republished Exchange, so as to explain the reason

To the new queue mechanism to protect consumer consumption data.

2, Usage scenario of dead letter queue

Generally speaking, writing code requires a private message queue, so what is the situation that needs to consider the dead letter queue?

The following scenarios need to consider the situation of dead letter queue, which is summarized as follows:

  • The message that the consumer needs to receive is rejected (this rejection is a non resource behavior)
  • The lifetime of the message is expired
  • When the queue reaches the maximum length

3, Dead letter queue usage

When using the dead letter Queue, you need to set the Exchange and Queue of the dead letter Queue. In fact, the dead letter Queue can

A simpler way is to send messages to a under normal circumstances. Of course, the Exchange and Queue of a are

There is no problem, but in the process of message sending, it may be because A's message queue reaches the maximum or the TTL expires

And the consumer is refused to receive the message, then in this case, the message that the consumer needs to receive will be switched to B, then B

It is a dead letter Queue. Of course, there is no problem with the mapping relationship between the Exchange and Queue of the middle dead letter Queue.

4, Dead letter queue case practice

4.1 producer case code

        

package com.example.rabbitmq.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ProducerDlx
{
    private  static  final  String exchangeName="test_dlx_exchange";
    private  static  final  String routyKey="dlx.save";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        //Send specific data information through channel
        String msg = "Hello RabbitMQ QOS Message";

        for(int i=0;i<3;i++)
        {

            AMQP.BasicProperties properties= new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();

            channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
        }

    }
}

4.2. Consumer case code

package com.example.rabbitmq.dlx;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerDlx
{
    private static final String EXCHANGE = "test_dlx_exchange";
    private  static  final String queueName="test_dlx_queue";
    private  static  final  String routingKey="dlx.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);

            Map<String,Object> arguments=new HashMap<String,Object>();

            //To declare the dead letter queue
            arguments.put("x-dead-letter-exchange","dlx.exchange");

            //The arguments attribute should be set to the declaration queue
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,EXCHANGE,routingKey);

            //Declare the dead letter queue (in fact, it can be regarded as a normal queue declaration)
            channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
            channel.queueDeclare("dlx.queue",true,false,false,null);
            channel.queueBind("dlx.queue","dlx.exchange","#"); //#Can match all routes

            //Set manual sign in method
            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

In the mechanism of using dead letter queue, we must declare the mechanism of dead letter queue, that is, the code:

arguments.put("x-dead-letter-exchange","dlx.exchange");

Under normal circumstances, the message should have been sent to test_dlx_exchange, and then send it to test_dlx_queue

But this is only a normal case. In an abnormal case, the message will be sent to the dead letter queue, that is, to the new message queue

Exchange, which is the declared Dlx Exchange, and the message will also be sent to Dlx Queue.

4.3. Customize received messages

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("consumerTag:"+consumerTag);
        System.err.println("envelope:"+envelope);
        System.err.println("properties:"+properties);
        System.err.println("the message received:"+new String(body));

    }
}

4.4 demonstration of dead letter queue

In the screenshot above, the message will be sent to the queue under normal circumstances_ dlx_ Queue, we can also see that it is a dead letter queue, but

Due to the exception, we can finally see the message from the queue_ dlx_ Queue switch to Dlx Queue, so that even in the exception

In this case, the message can still be received. Thank you for reading and updating! Pay attention to my official account.

 

Added by mmcb on Thu, 27 Jan 2022 19:55:57 +0200