The callback queue of RabbitMQ solves the reason why RabbitMQ cannot receive messages due to abnormal conditions, but it is generally used in enterprises
It is not practical to use the replay queue, but more uses the dead letter queue mechanism to ensure that the consumer can receive specific messages,
In fact, it is essentially a guarantee mechanism for the reliability of message consumers.
1, What is a dead letter queue
The full name of the dead letter queue is Dead Letter Exchange, so the private letter queue is abbreviated as DLX. When the producer sends a message
If the consumer fails to receive the message, the message will be sent to the dead letter queue to ensure the consumption of the message. Message consumption in RabbitMQ
In the processing mechanism, when there is a dead letter in the queue, RabbitMQ will automatically switch to the republished Exchange, so as to explain the reason
To the new queue mechanism to protect consumer consumption data.
2, Usage scenario of dead letter queue
Generally speaking, writing code requires a private message queue, so what is the situation that needs to consider the dead letter queue?
The following scenarios need to consider the situation of dead letter queue, which is summarized as follows:
- The message that the consumer needs to receive is rejected (this rejection is a non resource behavior)
- The lifetime of the message is expired
- When the queue reaches the maximum length
3, Dead letter queue usage
When using the dead letter Queue, you need to set the Exchange and Queue of the dead letter Queue. In fact, the dead letter Queue can
A simpler way is to send messages to a under normal circumstances. Of course, the Exchange and Queue of a are
There is no problem, but in the process of message sending, it may be because A's message queue reaches the maximum or the TTL expires
And the consumer is refused to receive the message, then in this case, the message that the consumer needs to receive will be switched to B, then B
It is a dead letter Queue. Of course, there is no problem with the mapping relationship between the Exchange and Queue of the middle dead letter Queue.
4, Dead letter queue case practice
4.1 producer case code
package com.example.rabbitmq.dlx; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ProducerDlx { private static final String exchangeName="test_dlx_exchange"; private static final String routyKey="dlx.save"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("101.**.***.84"); connectionFactory.setPort(5672); connectionFactory.setUsername("wuya"); connectionFactory.setPassword("java"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //Send specific data information through channel String msg = "Hello RabbitMQ QOS Message"; for(int i=0;i<3;i++) { AMQP.BasicProperties properties= new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes()); } } }
4.2. Consumer case code
package com.example.rabbitmq.dlx; import com.example.rabbitmq.MyConsumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class ConsumerDlx { private static final String EXCHANGE = "test_dlx_exchange"; private static final String queueName="test_dlx_queue"; private static final String routingKey="dlx.#"; public static void main(String[] args) throws Exception { try{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("101.**.***.84"); connectionFactory.setPort(5672); connectionFactory.setUsername("wuya"); connectionFactory.setPassword("java"); connectionFactory.setVirtualHost("/"); Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); channel.exchangeDeclare(EXCHANGE,"topic",true,false,null); Map<String,Object> arguments=new HashMap<String,Object>(); //To declare the dead letter queue arguments.put("x-dead-letter-exchange","dlx.exchange"); //The arguments attribute should be set to the declaration queue channel.queueDeclare(queueName,true,false,false,arguments); channel.queueBind(queueName,EXCHANGE,routingKey); //Declare the dead letter queue (in fact, it can be regarded as a normal queue declaration) channel.exchangeDeclare("dlx.exchange","topic",true,false,null); channel.queueDeclare("dlx.queue",true,false,false,null); channel.queueBind("dlx.queue","dlx.exchange","#"); //#Can match all routes //Set manual sign in method channel.basicConsume(queueName,true,new MyConsumer(channel=channel)); }catch (Exception e){ e.printStackTrace(); } } }
In the mechanism of using dead letter queue, we must declare the mechanism of dead letter queue, that is, the code:
arguments.put("x-dead-letter-exchange","dlx.exchange");
Under normal circumstances, the message should have been sent to test_dlx_exchange, and then send it to test_dlx_queue
But this is only a normal case. In an abnormal case, the message will be sent to the dead letter queue, that is, to the new message queue
Exchange, which is the declared Dlx Exchange, and the message will also be sent to Dlx Queue.
4.3. Customize received messages
package com.example.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; public class MyConsumer extends DefaultConsumer { private Channel channel; /** * Constructs a new instance and records its association to the passed-in channel. * * @param channel the channel to which this consumer is attached */ public MyConsumer(Channel channel) { super(channel); this.channel=channel; } @Override public void handleDelivery( String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------------consumer---------------\n"); System.err.println("consumerTag:"+consumerTag); System.err.println("envelope:"+envelope); System.err.println("properties:"+properties); System.err.println("the message received:"+new String(body)); } }
4.4 demonstration of dead letter queue
In the screenshot above, the message will be sent to the queue under normal circumstances_ dlx_ Queue, we can also see that it is a dead letter queue, but
Due to the exception, we can finally see the message from the queue_ dlx_ Queue switch to Dlx Queue, so that even in the exception
In this case, the message can still be received. Thank you for reading and updating! Pay attention to my official account.