Dead letter
What is a dead letter
A message that can no longer be consumed by any consumer becomes a dead letter.
A message becomes a dead letter in any of the following situations:
- The message is rejected (basic.reject/basic.nack) with requeue = false
- Message TTL expired
- Queue reaches maximum length
Dead Letter Exchange (DLX)
DLX
With a DLX, when a message in a queue becomes a dead letter, it can be republished to another exchange. That exchange is the DLX.
A DLX is an ordinary exchange, no different from any other. It can be specified on any queue; in practice, this just means setting an argument on that queue. When a message in the queue becomes a dead letter, RabbitMQ automatically republishes it to the configured DLX, which then routes it to another queue. You can consume from that queue for follow-up processing.
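To make the flow above concrete, here is a minimal in-memory sketch of a queue with a dead letter target. It is only a toy model and does not talk to RabbitMQ: the class `DeadLetterModel` and its methods are hypothetical names invented for illustration, and the reason strings mirror the values RabbitMQ uses for dead-lettered messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of dead-lettering (hypothetical; not the RabbitMQ API). */
public class DeadLetterModel {

    /** A message with an absolute expiry time in ms (Long.MAX_VALUE means no TTL). */
    public static final class Message {
        public final String body;
        public final long expiresAtMillis;

        public Message(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** A dead letter plus the reason it left the source queue. */
    public static final class DeadLetter {
        public final Message message;
        public final String reason;

        public DeadLetter(Message message, String reason) {
            this.message = message;
            this.reason = reason;
        }
    }

    private final Deque<Message> ready = new ArrayDeque<>();
    private final List<DeadLetter> deadLetterQueue = new ArrayList<>();
    private final int maxLength;

    public DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Enqueue a message; if the queue is full, the oldest message is dead-lettered. */
    public void publish(Message message) {
        if (ready.size() == maxLength) {
            deadLetterQueue.add(new DeadLetter(ready.pollFirst(), "maxlen"));
        }
        ready.addLast(message);
    }

    /** Dead-letter messages at the head of the queue whose expiry time has passed. */
    public void expire(long nowMillis) {
        while (!ready.isEmpty() && ready.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetterQueue.add(new DeadLetter(ready.pollFirst(), "expired"));
        }
    }

    /** Reject the head message; it is dead-lettered only when requeue is false. */
    public void reject(boolean requeue) {
        Message head = ready.pollFirst();
        if (head == null) {
            return;
        }
        if (requeue) {
            ready.addFirst(head);
        } else {
            deadLetterQueue.add(new DeadLetter(head, "rejected"));
        }
    }

    public List<DeadLetter> deadLetters() {
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        DeadLetterModel queue = new DeadLetterModel(2);
        queue.publish(new Message("m1", Long.MAX_VALUE));
        queue.reject(false);                              // m1 is rejected without requeue
        queue.publish(new Message("m2", 2_000));
        queue.expire(3_000);                              // m2 has expired by t = 3000 ms
        queue.publish(new Message("m3", Long.MAX_VALUE));
        queue.publish(new Message("m4", Long.MAX_VALUE));
        queue.publish(new Message("m5", Long.MAX_VALUE)); // queue is full, so m3 is pushed out
        for (DeadLetter d : queue.deadLetters()) {
            System.out.println(d.message.body + " -> " + d.reason);
        }
    }
}
```

Running `main` prints `m1 -> rejected`, `m2 -> expired`, and `m3 -> maxlen`, one for each of the three situations listed earlier. In a real broker the dead letter list would be a separate queue bound to the DLX.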
Dead letter queue settings
- Declare the exchange and queue used for dead letters, then bind them
- Declare the normal exchange and queue and bind them as usual, but add the dead letter argument when declaring the queue: arguments.put("x-dead-letter-exchange", "your-dlx");
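The second step comes down to building an arguments map for the normal queue. The sketch below uses only the JDK and assembles such a map; the class name `DlxQueueArguments` and the `build` helper are hypothetical, while the keys are standard RabbitMQ queue arguments. The optional keys show how the TTL and maximum length conditions can be configured on the same queue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles queue arguments for dead-lettering. */
public class DlxQueueArguments {

    /**
     * @param dlxName              name of the dead letter exchange (required)
     * @param deadLetterRoutingKey routing key to use when dead-lettering, or null to keep the original key
     * @param ttlMillis            per-queue message TTL in milliseconds, or null for no TTL
     * @param maxLength            maximum number of messages in the queue, or null for no limit
     */
    public static Map<String, Object> build(String dlxName, String deadLetterRoutingKey,
                                            Integer ttlMillis, Integer maxLength) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", dlxName);
        if (deadLetterRoutingKey != null) {
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        if (ttlMillis != null) {
            arguments.put("x-message-ttl", ttlMillis);
        }
        if (maxLength != null) {
            arguments.put("x-max-length", maxLength);
        }
        return arguments;
    }

    public static void main(String[] args) {
        // Example values only; adjust names and limits to your own setup.
        Map<String, Object> arguments = build("dlx.exchange", null, 2000, 100);
        System.out.println(arguments);
    }
}
```

The resulting map would be passed as the last parameter of `channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)`. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so the queue has to be deleted first if its settings change.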
Code implementation
Producer
    package com.wyg.rabbitmq.javaclient.dlx;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    /**
     * Producer that publishes one normal message and three messages with a TTL
     *
     * @author wyg0405@gmail.com
     * @date 2019-11-22 13:25
     * @since JDK1.8
     * @version V1.0
     */
    public class Producer {
        private static final String HOST = "localhost";
        private static final int PORT = 5672;
        private static final String USERNAME = "guest";
        private static final String PASSWORD = "guest";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setVirtualHost("/");
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.abc";
            // Declare the exchange (run Consumer first so that the queue and binding exist)
            channel.exchangeDeclare(exchangeName, "topic");

            // Message 1 has no expiration and is consumed normally
            String msg = "Normal message 1,routingKey:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8"));

            // Persistent messages that expire after 2000 ms
            AMQP.BasicProperties props =
                new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();

            // Messages 2 to 4 are not consumed in time, so they expire and become dead letters
            String msg2 = "Expired dead letter message 2, routingKey:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, false, props, msg2.getBytes("UTF-8"));
            String msg3 = "Expired dead letter message 3, routingKey:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, false, props, msg3.getBytes("UTF-8"));
            String msg4 = "Expired dead letter message 4, routingKey:" + routingKey;
            channel.basicPublish(exchangeName, routingKey, false, props, msg4.getBytes("UTF-8"));

            channel.close();
            connection.close();
        }
    }
The producer uses per-message expiration (TTL) to generate dead letters.
Normal consumer
    package com.wyg.rabbitmq.javaclient.dlx;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import com.rabbitmq.client.*;

    /**
     * Normal consumer; declares the dead letter exchange and queue, and the normal queue that points to the DLX
     *
     * @author wyg0405@gmail.com
     * @date 2019-11-22 14:07
     * @since JDK1.8
     * @version V1.0
     */
    public class Consumer {
        private static final String HOST = "localhost";
        private static final int PORT = 5672;
        private static final String USERNAME = "guest";
        private static final String PASSWORD = "guest";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setVirtualHost("/");
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Declare the dead letter exchange
            String dlxExchange = "dlx.exchange";
            channel.exchangeDeclare(dlxExchange, "topic");
            // Declare the dead letter queue
            String dlxQueue = "dlx.queue";
            channel.queueDeclare(dlxQueue, true, false, false, null);
            // "#" matches every routing key, so all dead letters are routed to the dead letter queue
            String dlxRoutingKey = "#";
            // Bind the dead letter queue to the dead letter exchange
            channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null);

            // Queue that the normal consumer listens on
            String queueName = "test_dlx_queue";
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            // Declare the exchange
            channel.exchangeDeclare(exchangeName, "topic");
            // Declare the queue; the DLX must be set in the arguments of the normal queue
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", dlxExchange);
            channel.queueDeclare(queueName, true, false, false, arguments);
            // Bind the queue to the exchange
            channel.queueBind(queueName, exchangeName, routingKey);

            // Deliver at most one unacknowledged message at a time
            channel.basicQos(1);

            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        // Processing takes 3 s, longer than the 2 s TTL of the queued messages
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("---Consumer-- " + new String(message.getBody(), "UTF-8"));
                    // Manual ack to the broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("---Consumer--: cancelCallback ");
                }
            };
            // Consume messages; autoAck is false, so messages are acked manually
            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        }
    }
Result: only the one normal message is consumed. The other three messages expire before they can be delivered and are not consumed here.
DLX consumer: listens to and consumes messages from the dead letter queue
    package com.wyg.rabbitmq.javaclient.dlx;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    import com.rabbitmq.client.*;

    /**
     * Listens to the dead letter queue
     *
     * @author wyg0405@gmail.com
     * @date 2019-11-22 21:52
     * @since JDK1.8
     * @version V1.0
     */
    public class DLXConsumer {
        private static final String HOST = "localhost";
        private static final int PORT = 5672;
        private static final String USERNAME = "guest";
        private static final String PASSWORD = "guest";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setVirtualHost("/");
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            String queueName = "dlx.queue";
            String exchangeName = "dlx.exchange";
            String routingKey = "#";
            // Declare the dead letter exchange
            channel.exchangeDeclare(exchangeName, "topic");
            // Declare the dead letter queue
            channel.queueDeclare(queueName, true, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(queueName, exchangeName, routingKey, null);

            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        System.out.println("---Dead letter queue consumer---");
                        System.out.println(new String(message.getBody(), "UTF-8"));
                    } finally {
                        // Manual ack to the broker
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    }
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("---Consumer: cancelCallback");
                }
            };
            // Consume messages; autoAck is false, so messages are acked manually
            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        }
    }
Result: the three expired messages enter the dead letter queue and are consumed there.
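A dead letter consumer often wants to know why a message was dead-lettered. RabbitMQ records this in the `x-death` message header, which the Java client exposes as a list of maps via `message.getProperties().getHeaders()`. The JDK-only sketch below extracts the reason from the first entry of such a header map; the class `DeathReason` and the method `firstReason` are hypothetical names, and the headers in `main` are hand-built sample data rather than output captured from a broker.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reads the dead-letter reason from message headers. */
public class DeathReason {

    /** Returns the "reason" of the first x-death entry, or null if the header is absent. */
    public static String firstReason(Map<String, Object> headers) {
        if (headers == null) {
            return null;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            return null;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (!(first instanceof Map)) {
            return null;
        }
        Object reason = ((Map<?, ?>) first).get("reason");
        // The client may deliver string values as a non-String type, so use toString()
        return reason == null ? null : reason.toString();
    }

    public static void main(String[] args) {
        // Sample data shaped like an x-death header (assumed values for illustration)
        Map<String, Object> death = new HashMap<>();
        death.put("reason", "expired");
        death.put("queue", "test_dlx_queue");
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(death));
        System.out.println(firstReason(headers)); // prints "expired"
    }
}
```

In the DLX consumer above, the helper could be called inside the delivery callback as `DeathReason.firstReason(message.getProperties().getHeaders())`. For the three messages in this example, the reason would be expected to be `expired`.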