RabbitMQ exchanges + dead letter queue + delay queue

5. Exchanges

In the previous section, we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one consumer (worker process). In this section, we will do something completely different: we will deliver a message to multiple consumers. This pattern is called publish/subscribe.

To illustrate this pattern, we will build a simple logging system. It consists of two programs: the first emits log messages, and the second is the consumer. We will start two consumers. One stores the log on disk after receiving a message, and the other prints the message on the screen. In effect, the log messages sent by the first program are broadcast to all consumers.

5.1. Exchanges

5.1.1. Exchange concept

The core idea of the RabbitMQ messaging model is that a producer never sends messages directly to a queue. In fact, a producer usually does not even know which queues its messages will be delivered to.

Instead, the producer can only send messages to an exchange. The job of an exchange is very simple: on one side it receives messages from producers, and on the other it pushes them into queues. The exchange must know exactly what to do with a message it receives: put it in a specific queue, put it in many queues, or discard it. This is determined by the exchange type.

5.1.2. Types of exchanges

There are the following types:
direct, topic, headers, fanout

5.1.3. Anonymous exchange

We didn't know anything about exchanges earlier in this tutorial, but we could still send messages to queues. That worked because we were using the default exchange, which is identified by the empty string ("").

The first parameter of basicPublish is the name of the exchange. An empty string indicates the default (unnamed) exchange: the message is routed to the queue whose name equals the routing key, if such a queue exists.

5.2. Temporary queue

In the previous chapter, we used queues with specific names (remember hello and ack_queue?). The queue name was crucial to us: we needed to tell our consumers which queue to consume from.

Whenever we connect to RabbitMQ, we want a fresh, empty queue. To get one, we can create a queue with a random name or, better, let the server choose a random queue name for us. In addition, once the consumer disconnects, the queue should be deleted automatically.

Create a temporary queue as follows:
String queueName = channel.queueDeclare().getQueue();
After creation, the queue shows up in the broker under a server-generated random name.

5.3. Bindings

What is a binding? A binding is the bridge between an exchange and a queue. It tells the exchange which queues it is bound to. For example, the following figure shows that X is bound to Q1 and Q2.

5.4. Fanout

5.4.1. Introduction to fanout

Fanout is a very simple type. As the name suggests, it broadcasts every message it receives to all the queues it knows about. The broker also predeclares some exchanges by default, such as amq.fanout.
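
The broadcast behaviour can be modelled without a broker. The following is a minimal in-memory sketch, not RabbitMQ client code: the class FanoutSketch and its methods bind, publish, and messagesIn are hypothetical names introduced only for illustration. It shows that every bound queue gets its own copy of a message and that the routing key plays no part.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange (no RabbitMQ involved; all names are hypothetical).
final class FanoutSketch {
    // Queue name -> messages currently held by that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binding a queue to a fanout exchange needs no meaningful key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Every bound queue receives its own copy; the routing key is ignored.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Messages delivered so far to the given queue (empty if the queue is unknown).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("consoleQueue");
        logs.bind("diskQueue");
        logs.publish("", "first log line");
        logs.publish("ignored.key", "second log line");
        System.out.println(logs.messagesIn("consoleQueue"));
        System.out.println(logs.messagesIn("diskQueue"));
    }
}
```

Both queues end up holding both log lines, which is the behaviour the two consumers in the next subsection rely on.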

5.4.2. Fanout in practice

The binding between the logs exchange and the temporary queues is shown in the following figure.

ReceiveLogs01 prints the received message on the console

package com.bcl.mq.five;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Fanout demo: consumer 1
 * A fanout exchange broadcasts every message it receives to all queues bound to it
 *
 * @author bcl
 * @date 2021/9/2
 */
public class ReceiveLogs01 {
    //Exchange name
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * Declare an exchange
         * 1. exchange name
         * 2. exchange type
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * Generate a temporary queue whose name is random
         * The queue is automatically deleted when the consumer disconnects from the queue
         */
        String queueName = channel.queueDeclare().getQueue();
        //Bind the temporary queue to our exchange, where the routingkey (also known as binding key) is an empty string
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("ReceiveLogs01 Waiting to receive message,Print the received message on the screen........... ");

        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ReceiveLogs01 The console prints the received message" + message);
        };

        //Consumer cancels message callback interface
        CancelCallback cancelCallback = consumerTag -> {};
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

ReceiveLogs02 prints the received message on the console

package com.bcl.mq.five;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Fanout demo: consumer 2
 * A fanout exchange broadcasts every message it receives to all queues bound to it
 *
 * @author bcl
 * @date 2021/9/2
 */
public class ReceiveLogs02 {
    //Exchange name
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * Declare an exchange
         * 1. exchange name
         * 2. exchange type
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * Generate a temporary queue whose name is random
         * The queue is automatically deleted when the consumer disconnects from the queue
         */
        String queueName = channel.queueDeclare().getQueue();
        //Bind the temporary queue to our exchange, where the routingkey (also known as binding key) is an empty string
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println("ReceiveLogs02 Waiting to receive message,Print the received message on the screen........... ");

        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ReceiveLogs02 The console prints the received message" + message);
        };

        //Consumer cancels message callback interface
        CancelCallback cancelCallback = consumerTag -> {
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

EmitLog sends messages for the two consumers to receive

package com.bcl.mq.five;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * Fanout demo: producer
 * A fanout exchange broadcasts every message it receives to all queues bound to it
 *
 * @author bcl
 * @date 2021/9/2
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * Declare an exchange
             * 1. exchange name
             * 2. exchange type
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            Scanner sc = new Scanner(System.in);
            System.out.println("Please enter information");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("Producer sends message" + message);
            }
        }
    }
}

5.5. Direct exchange

5.5.1. Review

In the previous section, we built a simple logging system that broadcasts log messages to many receivers. In this section, we will add a feature to it: letting a consumer subscribe to only a subset of the published messages. For example, we will write only serious error messages to the log file (to save disk space) while still printing all log messages on the console.

Let's review what a binding is. A binding is the bridge between an exchange and a queue. It can also be understood this way: the queue is only interested in messages from the exchange it is bound to. A binding takes a routingKey parameter, which is also called the binding key. We create a binding with the code: channel.queueBind(queueName, EXCHANGE_NAME, "routingKey"); The meaning of the binding key depends on the exchange type.

5.5.2. Introduction to direct exchange

In the previous section, our log system broadcast all messages to all consumers. We want to change that. For example, we want the program that writes log messages to disk to receive only serious errors (error), not warning or info messages, so that disk space is not wasted. A fanout exchange does not give us that flexibility: it can only broadcast indiscriminately. Here we replace it with a direct exchange. With this type, a message goes only to the queues whose binding key exactly matches the routing key of the message.

In the above figure, exchange X is of type direct and has two queues bound to it. Queue Q1 is bound with the key orange, and queue Q2 has two binding keys: black and green. In this case, when the producer publishes a message to the exchange, a message with the routing key orange is delivered to queue Q1, messages with the routing key black or green are delivered to queue Q2, and all other messages are discarded.

5.5.3. Multiple bindings

Of course, if the exchange type is direct but several queues are bound with the same key, the exchange behaves much like fanout for that key: the message is delivered to every one of those queues, as shown in the above figure.
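
Exact-match routing and the multiple-binding case can be sketched in plain Java. This is an in-memory illustration under stated assumptions, not RabbitMQ client code: DirectSketch, bind, publish, and messagesIn are hypothetical names. The queue names console and disk and the keys info, warning, and error mirror the consumers shown in this section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a direct exchange (no RabbitMQ involved; all names are hypothetical).
final class DirectSketch {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> delivered messages.
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        List<String> bound = bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>());
        if (!bound.contains(queueName)) {
            bound.add(queueName);
        }
    }

    // Delivers to every queue whose binding key equals the routing key.
    // Returns the number of receiving queues; 0 means the message was discarded.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectSketch directLogs = new DirectSketch();
        directLogs.bind("console", "info");
        directLogs.bind("console", "warning");
        directLogs.bind("disk", "error");
        directLogs.publish("error", "disk failure");
        directLogs.publish("info", "service started");
        directLogs.publish("debug", "no queue is bound with this key");
        System.out.println("console: " + directLogs.messagesIn("console"));
        System.out.println("disk: " + directLogs.messagesIn("disk"));
    }
}
```

Binding console with the key error as well would make error messages reach both queues, which is the fanout-like behaviour described above.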

Consumer 1

package com.bcl.mq.six;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Direct exchange demo: consumer 1
 * Receives messages whose routing key matches one of the queue's binding keys
 *
 * @author bcl
 * @date 2021/9/2
 */
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * Declare an exchange
         * 1. exchange name
         * 2. exchange type
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**
         * Declare a queue with the fixed name "console"
         * (non-durable, non-exclusive, not auto-deleted)
         */
        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);

        //Bind the queue to our exchange with the binding keys info and warning
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("ReceiveLogsDirect01 Waiting to receive message,Print the received message on the screen........... ");

        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ReceiveLogsDirect01 The console prints the received message" + message);
        };
        //Consumer cancels message callback interface
        CancelCallback cancelCallback = consumerTag -> {
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

Consumer 2

package com.bcl.mq.six;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Direct exchange demo: consumer 2
 * Receives messages whose routing key matches one of the queue's binding keys
 *
 * @author bcl
 * @date 2021/9/2
 */
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * Declare an exchange
         * 1. exchange name
         * 2. exchange type
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**
         * Declare a queue with the fixed name "disk"
         * (non-durable, non-exclusive, not auto-deleted)
         */
        String queueName = "disk";
        channel.queueDeclare(queueName, false, false, false, null);

        //Bind the queue to our exchange with the binding key error
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("ReceiveLogsDirect02 Waiting to receive message,Print the received message on the screen........... ");

        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("ReceiveLogsDirect02 The console prints the received message" + message);
        };
        //Consumer cancels message callback interface
        CancelCallback cancelCallback = consumerTag -> {
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

Producer

package com.bcl.mq.six;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * Direct exchange demo: producer
 * Publishes messages with a routing key; only queues bound with that key receive them
 *
 * @author bcl
 * @date 2021/9/2
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * The direct_logs exchange is declared by the consumers,
             * so start them before running this producer
             */
            Scanner sc = new Scanner(System.in);
            System.out.println("Please enter information: ");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                //Publish to the direct exchange with the routing key error
                channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
                System.out.println("Producer sends message" + message);
            }
        }
    }
}

5.6. Topics

5.6.1. Problems with the previous types

In the previous section, we improved the logging system. Instead of a fanout exchange that can only broadcast indiscriminately, we used a direct exchange to receive logs selectively.

Although the direct exchange improved our system, it still has limitations. For example, suppose the log types we want to receive are info.base and info.advance, and one queue wants only info.base. Direct cannot do this; here only the topic type can be used.

5.6.2. Topic requirements

The routing_key of a message sent to a topic exchange cannot be arbitrary. It must be a list of words separated by dots. The words can be anything, for example "stock.usd.nyse", "nyse.vmw", or "quick.orange.rabbit". The whole key may not exceed 255 bytes. In binding keys, two wildcards deserve attention:

  • * (asterisk) matches exactly one word
  • # (pound sign) matches zero or more words

5.6.3. Topic matching cases

The following figure shows the binding relationship

  • Q1 --> bound with

*.orange.* (three words, with orange in the middle)

  • Q2 --> bound with

*.*.rabbit (three words, the last of which is rabbit)
lazy.# (any number of words, the first of which is lazy)

The figure above is the queue binding diagram. Let's look at how messages are received under these bindings

  • quick.orange.rabbit is received by queues Q1 and Q2
  • lazy.orange.elephant is received by queues Q1 and Q2
  • quick.orange.fox is received by queue Q1
  • lazy.brown.fox is received by queue Q2
  • lazy.pink.rabbit satisfies two bindings of Q2 but is received by Q2 only once
  • quick.brown.fox matches no binding, is received by no queue, and is discarded
  • quick.orange.male.rabbit has four words, matches no binding, and is discarded
  • lazy.orange.male.rabbit has four words but matches Q2
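
The cases above can be checked with a small word-by-word matcher. This is a sketch of the two wildcard rules only, not the broker's actual algorithm: the class TopicMatchSketch and its methods matches and anyMatch are hypothetical names introduced for illustration.

```java
// Word-by-word matcher for topic binding keys (a sketch; all names are hypothetical).
final class TopicMatchSketch {
    // True if the routing key satisfies the binding key, where "*" stands for
    // exactly one word and "#" for zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // The pattern is used up: succeed only if the words are used up too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    // A queue receives a message at most once, however many of its bindings match.
    static boolean anyMatch(String routingKey, String... bindingKeys) {
        for (String bindingKey : bindingKeys) {
            if (matches(bindingKey, routingKey)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] routingKeys = {
            "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox",
            "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : routingKeys) {
            boolean q1 = anyMatch(key, "*.orange.*");
            boolean q2 = anyMatch(key, "*.*.rabbit", "lazy.#");
            System.out.println(key + " -> Q1: " + q1 + ", Q2: " + q2);
        }
    }
}
```

Because anyMatch returns a single boolean per queue, lazy.pink.rabbit counts once for Q2 even though both of its bindings match.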

Two special cases are worth noting. When a queue is bound with the key #, it receives every message, much like fanout. When a binding key contains neither # nor *, the binding behaves like one on a direct exchange.

5.6.4. In practice

1. Consumer 1

package com.bcl.mq.seven;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Topic exchange demo
 * Consumer 1
 *
 * @author bcl
 * @date 2021/9/3
 */
public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //Declare Q1 queue and binding relationship
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("Q1 Waiting to receive message........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Q1 Receive queue:" + queueName + ",Binding key:" + delivery.getEnvelope().getRoutingKey() + ",message:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

2. Consumer 2

package com.bcl.mq.seven;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Topic exchange demo
 * Consumer 2
 *
 * @author bcl
 * @date 2021/9/3
 */
public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //Declare Q2 queue and binding relationship
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("Q2 Waiting to receive message........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Q2 Receive queue:" + queueName + ",Binding key:" + delivery.getEnvelope().getRoutingKey() + ",message:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3. Producer

package com.bcl.mq.seven;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * Topic exchange demo
 * Producer
 *
 * @author bcl
 * @date 2021/9/3
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /**
             * Q1 --> bound with
             * *.orange.* (three words, with orange in the middle)
             * Q2 --> bound with
             * *.*.rabbit (three words, the last of which is rabbit)
             * lazy.# (any number of words, the first of which is lazy)
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "Received by queues Q1 and Q2");
            bindingKeyMap.put("lazy.orange.elephant", "Received by queues Q1 and Q2");
            bindingKeyMap.put("quick.orange.fox", "Received by queue Q1");
            bindingKeyMap.put("lazy.brown.fox", "Received by queue Q2");
            bindingKeyMap.put("lazy.pink.rabbit", "Satisfies two bindings but is received by queue Q2 only once");
            bindingKeyMap.put("quick.brown.fox", "Matches no binding, so it is received by no queue and discarded");
            bindingKeyMap.put("quick.orange.male.rabbit", "Four words, matches no binding, so it is discarded");
            bindingKeyMap.put("lazy.orange.male.rabbit", "Four words, but matches Q2");
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("Producer sends message" + message);
            }
        }
    }
}

Operation results:

Producer sends message: Four words, matches no binding, so it is discarded
Producer sends message: Matches no binding, so it is received by no queue and discarded
Producer sends message: Received by queues Q1 and Q2
Producer sends message: Received by queue Q2
Producer sends message: Received by queues Q1 and Q2
Producer sends message: Received by queue Q1
Producer sends message: Satisfies two bindings but is received by queue Q2 only once
Producer sends message: Four words, but matches Q2

Q1 waiting to receive message
Q1 receive queue: Q1, binding key: lazy.orange.elephant, message: Received by queues Q1 and Q2
Q1 receive queue: Q1, binding key: quick.orange.rabbit, message: Received by queues Q1 and Q2
Q1 receive queue: Q1, binding key: quick.orange.fox, message: Received by queue Q1

Q2 waiting to receive message
Q2 receive queue: Q2, binding key: lazy.orange.elephant, message: Received by queues Q1 and Q2
Q2 receive queue: Q2, binding key: lazy.brown.fox, message: Received by queue Q2
Q2 receive queue: Q2, binding key: quick.orange.rabbit, message: Received by queues Q1 and Q2
Q2 receive queue: Q2, binding key: lazy.pink.rabbit, message: Satisfies two bindings but is received by queue Q2 only once
Q2 receive queue: Q2, binding key: lazy.orange.male.rabbit, message: Four words, but matches Q2

6. Dead letter queue

6.1. Concept of dead letter

Let us first clarify the concept. A dead letter, as the name suggests, is a message that cannot be consumed. Normally the producer delivers a message to the broker (in effect, to a queue), and a consumer takes it from the queue and consumes it. Sometimes, however, a message in the queue cannot be consumed for some specific reason. If nothing more is done with such a message, it becomes a dead letter, and where there are dead letters there is a dead letter queue.

Application scenario: to ensure that the messages of an order service are not lost, we use RabbitMQ's dead letter queue mechanism. When consuming a message fails, the message is put into the dead letter queue. Another example: after a user places an order in a shop and clicks to pay, the order automatically becomes invalid if it is not paid within the specified time.

6.2. Source of dead letter

  1. Message TTL expired
  2. The queue has reached its maximum length (the queue is full and cannot accept more messages)
  3. The message is rejected (basic.reject or basic.nack) with requeue = false

6.3. Dead letter in practice

6.3.1. Code architecture diagram

6.3.2. Message TTL expires

Consumer C1 code (after starting it, shut the consumer down to simulate its being unable to receive messages)

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue
 * Consumer 1
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Consumer01 {
    //Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //Dead letter exchange name
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //Normal queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    //Dead letter queue name
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Declare the normal and dead letter exchanges, both of type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //Declare dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //Bind the dead letter queue to the dead letter exchange
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        //Declare normal queue
        Map<String, Object> arguments = new HashMap<>();
        //arguments.put("x-message-ttl", 100000);
        //Set the dead letter exchange for the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //Set the dead letter routing key for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        //Bind the normal queue to the normal exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("Waiting to receive message...");
        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 The message received is:" + message);
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }

}

Producer code

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;

/**
 * Dead letter queue
 * producer
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Producer {
    //Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Set a per-message TTL: 10000 ms = 10 s
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info: " + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
        }
    }
}

Consumer C2 code (after the above steps are completed, start C2 consumer to consume the messages in the dead letter queue)

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * Dead letter queue
 * Consumer 2
 * Consume messages from the dead letter queue
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Consumer02 {
    //Dead letter queue name
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Consumer02 Waiting to receive dead letter queue messages........... ");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer02 Message received" + message);
        };

        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }
}

Consumer02 waiting to receive dead letter queue messages
Consumer02 received message info: 1
Consumer02 received message info: 2
Consumer02 received message info: 3
Consumer02 received message info: 4
Consumer02 received message info: 5
Consumer02 received message info: 6
Consumer02 received message info: 7
Consumer02 received message info: 8
Consumer02 received message info: 9
Consumer02 received message info: 10
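
The flow that produces this output can be modelled in memory. The following is a sketch under stated assumptions, not RabbitMQ client code: TtlDeadLetterSketch and its methods are hypothetical names, time is passed in as explicit millisecond values instead of a real clock, and no consumer ever reads the normal queue (C1 is shut down).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory model of TTL-based dead-lettering (no RabbitMQ involved; all names are hypothetical).
final class TtlDeadLetterSketch {
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> normalQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    // Enqueue a message that expires ttlMillis after nowMillis.
    void publish(String body, long nowMillis, long ttlMillis) {
        normalQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    // Move every expired message at the head of the normal queue to the dead letter queue.
    void expire(long nowMillis) {
        while (!normalQueue.isEmpty() && normalQueue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetterQueue.add(normalQueue.pollFirst().body);
        }
    }

    int normalSize() {
        return normalQueue.size();
    }

    List<String> deadLetters() {
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        TtlDeadLetterSketch broker = new TtlDeadLetterSketch();
        for (int i = 1; i < 11; i++) {
            broker.publish("info: " + i, 0L, 10_000L);
        }
        broker.expire(9_999L);   // TTL not yet elapsed: nothing is dead-lettered
        System.out.println(broker.deadLetters().size());
        broker.expire(10_000L);  // TTL elapsed with no consumer: all ten messages move
        System.out.println(broker.deadLetters());
    }
}
```

Once the 10 s TTL has elapsed with nobody consuming, all ten messages sit in the dead letter queue in their original order, which is what Consumer02 then reads.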

6.3.3. The queue has reached the maximum length

1. Remove the TTL property from the producer code

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.AMQBasicProperties;

/**
 * Dead letter queue
 * producer
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Producer {
    //Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //The per-message TTL (10000 ms = 10 s) is disabled for this case
//        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String message = "info: " + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
        }
    }
}

2. Modify the C1 consumer as follows (after starting it, shut the consumer down to simulate its being unable to receive messages)

//Set the normal queue length limit
arguments.put("x-max-length", 6);

Full code:

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue
 * Consumer 1
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Consumer01 {
    //Normal exchange name
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //Dead letter exchange name
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //Normal queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    //Dead letter queue name
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Declare the normal and dead letter exchanges, both of type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //Declare dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //Bind the dead letter queue to the dead letter exchange
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        //Declare normal queue
        Map<String, Object> arguments = new HashMap<>();
        //arguments.put("x-message-ttl", 100000);
        //Set the dead letter exchange for the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //Set the dead letter routing key for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "lisi");
        //Set normal queue length limit
        arguments.put("x-max-length", 6);

        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        //Bind the normal queue to the normal exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("Waiting to receive message...");
        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 The message received is:" + message);
        };

        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
        });
    }

}

Note that the existing normal_queue must be deleted first, because its arguments have changed: RabbitMQ refuses to redeclare an existing queue with different arguments.

3. C2 consumer code remains unchanged (start C2 consumer)

Consumer02 waiting to receive dead letter queue messages
Consumer02 received message info: 1
Consumer02 received message info: 2
Consumer02 received message info: 3
Consumer02 received message info: 4
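
Why exactly info: 1 to info: 4? By default, a length-limited queue makes room for new messages by removing messages from the head of the queue, and with a dead letter switch configured those removed messages are dead-lettered. The following is a minimal plain-Java model of that behaviour, not RabbitMQ client code; the class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a queue with x-max-length; not RabbitMQ client code.
public class MaxLengthDeadLetterDemo {

    // Publishes each message to the tail of the queue. Whenever the queue grows past
    // maxLength, the oldest message (the head) is removed and recorded as a dead letter.
    public static List<String> publishAll(List<String> messages, int maxLength, Deque<String> queue) {
        List<String> deadLetters = new ArrayList<>();
        for (String message : messages) {
            queue.addLast(message);
            while (queue.size() > maxLength) {
                deadLetters.add(queue.removeFirst());
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i < 11; i++) {
            messages.add("info: " + i);
        }
        Deque<String> normalQueue = new ArrayDeque<>();
        List<String> deadLetters = publishAll(messages, 6, normalQueue);
        System.out.println("dead letters: " + deadLetters); // [info: 1, info: 2, info: 3, info: 4]
        System.out.println("still queued: " + normalQueue); // info: 5 up to info: 10
    }
}
```

With ten messages and a limit of 6, the four oldest messages overflow, which matches what Consumer02 printed above.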

6.3. 4. Message rejected

1. The message producer code is the same as the producer in the previous example

2. C1 consumer code (this time, leave the consumer running so that it can reject the message)

//Consumer receive message callback interface
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    //The producer sends "info: " + i, so the text must match exactly, including the space
    if (message.equals("info: 5")) {
        System.out.println("Consumer01 The message received is:" + message + ", and this message is rejected");
        //requeue = false: the message is not put back into the queue, so it becomes a dead letter
        channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("Consumer01 The message received is:" + message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
//Turn on manual acknowledgement (autoAck = false)
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
});

code

package com.bcl.mq.eight;

import com.bcl.mq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue
 * Consumer 1
 *
 * @author bcl
 * @date 2021/9/3
 */
public class Consumer01 {
    //Common switch name
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //Dead letter switch name
    public static final String DEAD_EXCHANGE = "dead_exchange";

    //Common queue name
    public static final String NORMAL_QUEUE = "normal_queue";
    //Dead letter queue name
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //Declare the type of dead letter and common switch as direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //Declare dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        //Bind dead letter switch and dead letter queue
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");

        //Declare normal queue
        Map<String, Object> arguments = new HashMap<>();
        //arguments.put("x-message-ttl", 100000);
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "lisi");
        //Set normal queue length limit
//        arguments.put("x-max-length", 6);

        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
        //Bind common switch and common queue
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("Consumer01 Waiting to receive message...");
        //Consumer receive message callback interface
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
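            //The producer sends "info: " + i, so the comparison must include the space
            if (message.equals("info: 5")) {
                System.out.println("Consumer01 The message received is:" + message + ",This message was rejected");
                //requeue = false: the message is not put back into the queue, so it becomes a dead letter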
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer01 The message received is:" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        //Turn on manual acknowledgement (autoAck = false)
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
        });
    }

}

3.C2 consumer code unchanged
Start consumer 1 and then consumer 2

Consumer02 waiting to receive dead letter queue messages
Consumer02 received message info:5

Consumer01 waiting to receive message
The message received by Consumer01 is: info:1
The message received by Consumer01 is: info:2
The message received by Consumer01 is: info:3
The message received by Consumer01 is: info:4
The message received by Consumer01 is: info:5. This message is rejected
The message received by Consumer01 is: info:6
The message received by Consumer01 is: info:7
The message received by Consumer01 is: info:8
The message received by Consumer01 is: info:9
The message received by Consumer01 is: info:10

7. Delay queue

7.1. Delay queue concept

A delay queue is an ordered queue whose defining feature is its delay attribute: each element is meant to be taken out and processed only once a specified time has arrived. In short, a delay queue stores elements that need to be processed at a specified time.

7.2. Delay queue usage scenario

  • 1. If the order is not paid within ten minutes, it will be automatically cancelled
  • 2. If the newly created store has not uploaded goods within ten days, it will automatically send a message reminder.
  • 3. After successful registration, if the user does not log in within three days, a short message reminder will be sent.
  • 4. The user initiates a refund, and if it is not handled within three days, notify the relevant operators.
  • 5. After a meeting is scheduled, all participants need to be notified ten minutes before the scheduled start time

What these scenarios have in common is that a task must be carried out at a specified point in time after (or before) an event occurs. For example, when an order is created, its payment status is checked ten minutes later and the order is closed if it is still unpaid. Could this not be done with a scheduled task that polls the data, say once a second, picks out the records that are due, and processes them? For small data volumes it can. Take the requirement "settle automatically if the bill is not paid within one week": if the deadline is a week only in a loose sense, running a scheduled task every night that checks all unpaid bills is a perfectly feasible scheme.

For large data volumes with strict timing, however, polling does not hold up. With "close the order if it is not paid within ten minutes", there can be a great many unpaid orders within a short period, reaching millions or even tens of millions during a promotion. Polling that much data is clearly undesirable: a full scan of all orders is unlikely to finish within one second, and it puts heavy pressure on the database, so it fails the business requirement and performs poorly.

7.3. TTL in rabbitmq

What is TTL? TTL is the attribute of a message or queue in RabbitMQ, indicating the maximum lifetime of a message or all messages in the queue, in milliseconds. In other words, if a message has the TTL attribute set or enters the queue with the TTL attribute set, the message will become a "dead letter" if it is not consumed within the time set by the TTL. If both the TTL of the queue and the TTL of the message are configured, the smaller value will be used. There are two ways to set the TTL.

7.3. 1. Queue setting TTL

The first is to set the "x-message-ttl" attribute of the queue when creating the queue
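
As a rough sketch, the queue-level TTL is just one more entry in the arguments map that is handed to queueDeclare, exactly like the dead letter arguments in the consumers above. The helper name withTtl is made up for this example, and the map is built with plain Java so that it can be run without a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Builds the optional-arguments map for a queue with a queue-level TTL.
public class QueueTtlArguments {

    // ttlMillis is the lifetime, in milliseconds, of every message that enters the queue.
    public static Map<String, Object> withTtl(int ttlMillis) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = withTtl(10000);
        System.out.println(arguments); // {x-message-ttl=10000}
        // With the RabbitMQ Java client, the map is passed as the last parameter, for example:
        // channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
    }
}
```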

7.3. 2. Message setting TTL

Another way is to set TTL for each message
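
With the Java client, the per-message TTL travels in the message's expiration property as a string of milliseconds (see the commented-out expiration("10000") line in the producer earlier). The sketch below only models the rule from section 7.3 that the smaller of the two TTLs applies when both are set; the class and method names are invented for the example, and returning -1 for "never expires" is my own convention, not something RabbitMQ defines.

```java
// Plain-Java model of how a queue TTL and a message TTL combine; not RabbitMQ client code.
public class EffectiveTtl {

    // queueTtlMillis: the queue's x-message-ttl, or null if the queue sets none.
    // messageExpiration: the message's expiration as a string of milliseconds, or null.
    // Returns the TTL that applies, or -1 if neither is set (the message never expires).
    public static long effectiveTtlMillis(Long queueTtlMillis, String messageExpiration) {
        Long messageTtl = (messageExpiration == null) ? null : Long.valueOf(messageExpiration);
        if (queueTtlMillis == null && messageTtl == null) {
            return -1;
        }
        if (queueTtlMillis == null) {
            return messageTtl;
        }
        if (messageTtl == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, messageTtl);
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtlMillis(10000L, "40000")); // 10000
        System.out.println(effectiveTtlMillis(null, "2000"));    // 2000
        System.out.println(effectiveTtlMillis(null, null));      // -1
    }
}
```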

7.3. 3. Difference between the two

If the TTL is set on the queue, a message is discarded by the queue as soon as it expires (or, if a dead letter queue is configured, thrown into the dead letter queue). If the TTL is set on the message instead, an expired message is not necessarily discarded immediately, because whether it has expired is only checked just before it is delivered to a consumer; if the queue has a serious backlog, expired messages may survive for a long time.

It should also be noted that if no TTL is set, the message never expires, and if the TTL is set to 0, the message is discarded unless it can be delivered directly to a consumer at that moment.

The previous section introduced the dead letter queue and this section introduced TTL, so both ingredients for building a delay queue with RabbitMQ are now in place; we only need to combine them. A delay queue is about how long to wait before a message is processed, and a TTL makes a message become a dead letter after exactly that long. Messages that become dead letters are delivered to the dead letter queue, so a consumer only has to keep consuming from the dead letter queue: every message that arrives there is one that should be processed right away.

7.4. Integrating springboot

7.4. 1. Create project

1. Create a Spring Boot project named springboot-rabbit

2.pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bcl.mq</groupId>
    <artifactId>springboot-rabbit</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbit</name>
    <description>com.bcl.mq</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--RabbitMQ rely on-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ Test dependency-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.application.properties

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

4. Add Swagger configuration class

package com.bcl.mq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq Interface documentation")
                .description("This document describes rabbitmq Microservice interface definition")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://atguigu.com", "1551388580@qq.com")).build();
    }
}

7.5. Queue TTL

7.5. 1. Code architecture diagram

  • Create two queues, QA and QB, with TTLs of 10 s and 40 s respectively,
  • Then create a switch X and a dead letter switch Y, both of type direct,
  • Create a dead letter queue QD. The bindings are: QA and QB are bound to X with the routing keys XA and XB, both dead-letter to Y with the routing key YD, and QD is bound to Y with YD

7.5. 2. Configuration file

package com.bcl.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    //Common switch name
    public static final String X_EXCHANGE = "X";
    //Dead letter switch name
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //Common queue name
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //Dead letter queue name
    public static final String DEAD_LETTER_QUEUE = "QD";

    //Declare xExchange alias
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //Declare yExchange alias
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //Declare queueA with a TTL of 10 s
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = new HashMap<>();
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "YD");
        //The expiration time is in ms
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //Declare queueB with a TTL of 40 s
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = new HashMap<>();
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "YD");
        //The expiration time is in ms
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    //queueA binding xExchange
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //queueB binding xExchange
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //Declare dead letter queue
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
//        return new Queue(DEAD_LETTER_QUEUE);
    }

    //queueD binding yExchange
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

7.5. 3. Message producer

package com.bcl.mq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@Slf4j
@RequestMapping("/tt1")
public class SendMsgController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("Current time: {}, sending a message to two TTL queues: {}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X", "XA", "Message from the queue with a TTL of 10 s: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "Message from the queue with a TTL of 40 s: " + message);
    }
}

7.5. 4. Message consumer (listener)

package com.bcl.mq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    /**
     * Listens to the dead letter queue
     * Messages that are not consumed before their TTL expires are thrown into the bound dead letter queue and received here
     * @author bcl
     * @date 2021/9/6 2:56 PM
     */
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("Current time:{},Received dead letter queue information:{}", new Date().toString(), msg);
    }
}

Test: initiate a request http://localhost:8080/tt1/sendMsg/Hahaha2

  • The first message becomes a dead letter after 10 s and is then consumed; the second becomes a dead letter after 40 s and is then consumed. This completes a working delay queue.
  • Used this way, however, every new delay requirement needs a new queue. Here there are only two options, 10 s and 40 s; a delay of one hour would need another queue with a TTL of one hour. For something like booking a meeting room and sending a reminder in advance, where the delay differs for every booking, countless queues would be needed to meet the demand.

7.6. Delay queue optimization

7.6. 1. Code architecture diagram

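A new queue QC is added here. It has no queue-level TTL; the producer sets the TTL on each message instead. QC is bound to switch X with the routing key XC and, like QA and QB, dead-letters to switch Y with the routing key YD.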

7.6. 2. Configuration file class code

package com.bcl.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    //Common switch name
    public static final String X_EXCHANGE = "X";
    //Dead letter switch name
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //Common queue name
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String QUEUE_C = "QC";
    //Dead letter queue name
    public static final String DEAD_LETTER_QUEUE = "QD";

    //Declare xExchange alias
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //Declare yExchange alias
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //Declare queueA with a TTL of 10 s
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = new HashMap<>(3);
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "YD");
        //The expiration time is in ms
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    //Declare queueB with a TTL of 40 s
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = new HashMap<>(3);
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "YD");
        //The expiration time is in ms
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }
    //Declare queueC without a queue-level TTL; the producer sets the TTL on each message
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> arguments = new HashMap<>(3);
        //Normal queue setting dead letter switch
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //Set the dead letter routing key parameter for the normal queue. The key is a fixed value
        arguments.put("x-dead-letter-routing-key", "YD");
        //No queue-level TTL (left commented out): the expiration is set per message by the producer
//        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }

    //queueA binding xExchange
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //queueB binding xExchange
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //queueC binding xExchange
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

    //Declare dead letter queue
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
//        return new Queue(DEAD_LETTER_QUEUE);
    }

    //queueD binding yExchange
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

7.6. 3. Message producer code

Initiate request
http://localhost:8080/tt1/sendExpirationMsg/Hello1/20000
http://localhost:8080/tt1/sendExpirationMsg/Hello2/2000

This looks fine, but as noted earlier for TTLs set on message properties, messages may not "die" on time: RabbitMQ only checks whether the message at the head of the queue has expired and, if so, throws it into the dead letter queue. If the first message has a long delay and the second a short one, the second message is not dead-lettered first; it has to wait until the first one expires.
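
To make the effect concrete, here is a small plain-Java model of head-only expiry, applied to the two requests above (a 20000 ms message followed by a 2000 ms message). It is a simplification written for this article, not RabbitMQ code: it assumes no consumer is attached to the queue and that a message can only leave once everything ahead of it has left. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a FIFO queue that only expires the message at its head.
public class HeadOnlyExpiryDemo {

    public static final class Msg {
        public final String body;
        public final long publishedAtMillis;
        public final long ttlMillis;

        public Msg(String body, long publishedAtMillis, long ttlMillis) {
            this.body = body;
            this.publishedAtMillis = publishedAtMillis;
            this.ttlMillis = ttlMillis;
        }
    }

    // For each message, in queue order, returns the time at which it is dead-lettered.
    // A message leaves at its own expiry time or when the message ahead of it leaves,
    // whichever is later.
    public static List<Long> deadLetterTimes(List<Msg> fifo) {
        List<Long> times = new ArrayList<>();
        long previousLeftAt = 0;
        for (Msg m : fifo) {
            long expiresAt = m.publishedAtMillis + m.ttlMillis;
            long leavesAt = Math.max(expiresAt, previousLeftAt);
            times.add(leavesAt);
            previousLeftAt = leavesAt;
        }
        return times;
    }

    public static void main(String[] args) {
        List<Msg> queueC = Arrays.asList(
                new Msg("Hello1", 0, 20000),
                new Msg("Hello2", 0, 2000));
        System.out.println(deadLetterTimes(queueC)); // [20000, 20000]
    }
}
```

In this model the 2000 ms message is stuck behind the 20000 ms one and only reaches the dead letter queue at the 20 second mark.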

7.7. The rabbitmq plug-in implements delay queues

This is a real limitation: if a message-level TTL cannot make each message die exactly at its own TTL, this approach cannot serve as a general-purpose delay queue. The delayed message plug-in described next solves the problem.

7.7. 1. Install the delay queue plug-in

1. General installation

Download the rabbitmq_delayed_message_exchange plug-in from the official website https://www.rabbitmq.com/community-plugins.html , then unzip it and place it in the plug-in directory of RabbitMQ. Enter the plugins directory under the RabbitMQ installation directory, execute the following command to enable the plug-in, and then restart RabbitMQ.

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2.docker installation

https://my.oschina.net/bocl/blog/5196731

7.7. 2. Code structure diagram

A new queue, delayed.queue, and a custom switch, delayed.exchange, are added here. The queue is bound to the switch with the routing key delayed.routingkey.

7.7. 3. Configuration file class code

The switch declared here uses a new exchange type, x-delayed-message, provided by the plug-in. This type supports delayed delivery: after a message is published it is not routed to the target queue immediately but is stored in a Mnesia table (Mnesia is a distributed database system), and it is delivered to the target queue once its delay has elapsed.

package com.bcl.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Delay plug-in queue configuration
 * @author bcl
 * @date 2021/9/6
 */
@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //Custom switch what we define here is a delay switch
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();//Custom switch type
        args.put("x-delayed-type", "direct");
        /**
         * 1.Switch name
         * 2.Switch type
         * 3.Whether it is durable
         * 4.Whether it is auto-deleted
         * 5.Other parameters
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

7.7. 4. Message producer code

    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * Plug in based message sending
     *
     * @author bcl
     * @date 2021/9/6
     */
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                correlationData -> {
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                });
        log.info("Current time: {}, sending a message with a delay of {} ms to the queue delayed.queue: {}", new Date(), delayTime, message);
    }

7.7. 5. Message consumer code

Initiate request:
http://localhost:8080/tt1/sendDelayMsg/baby1/20000
http://localhost:8080/tt1/sendDelayMsg/baby2/2000
The second message was consumed first, in line with expectations

Current time: Mon Sep 06 17:07:00 CST 2021, sending a message with a delay of 20000 milliseconds to the queue delayed queue:baby1
Current time: Mon Sep 06 17:07:02 CST 2021, sending a message with a delay of 2000 ms to the queue delayed queue:baby2
Current time: Mon Sep 06 17:07:04 CST 2021, received delay queue information: baby2
Current time: Mon Sep 06 17:07:20 CST 2021, received delay queue information: baby1
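
The ordering in this log can be reproduced with a tiny model in which pending messages are released by due time rather than by arrival order. This is only an illustration in plain Java and says nothing about how the plug-in is implemented internally; the class names are invented, and the send times (0 ms and 2000 ms) are taken from the timestamps in the log above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of a delayed switch: messages are released in order of due time.
public class DelayedExchangeModel {

    public static final class PendingMessage {
        public final String body;
        public final long dueAtMillis;

        public PendingMessage(String body, long dueAtMillis) {
            this.body = body;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // The message with the earliest due time is always at the head.
    private final PriorityQueue<PendingMessage> pending =
            new PriorityQueue<>(Comparator.comparingLong((PendingMessage m) -> m.dueAtMillis));

    public void publish(String body, long sentAtMillis, long delayMillis) {
        pending.add(new PendingMessage(body, sentAtMillis + delayMillis));
    }

    // Releases every pending message in due-time order, formatted as body@dueTime.
    public List<String> drainInDeliveryOrder() {
        List<String> delivered = new ArrayList<>();
        while (!pending.isEmpty()) {
            PendingMessage m = pending.poll();
            delivered.add(m.body + "@" + m.dueAtMillis);
        }
        return delivered;
    }

    public static void main(String[] args) {
        DelayedExchangeModel exchange = new DelayedExchangeModel();
        exchange.publish("baby1", 0, 20000);
        exchange.publish("baby2", 2000, 2000);
        System.out.println(exchange.drainInDeliveryOrder()); // [baby2@4000, baby1@20000]
    }
}
```

baby2 comes out 4 seconds after the first request and baby1 after 20 seconds, the same order and timing as in the log.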

7.8. Summary

A delay queue is very useful wherever processing has to be deferred. Implementing it with RabbitMQ makes good use of RabbitMQ's features: reliable publishing, reliable delivery and dead letter queues ensure that messages are consumed at least once and that messages which are not processed correctly are not discarded. In addition, a RabbitMQ cluster removes the single point of failure, so the delay queue does not become unavailable, and messages are not lost, because a single node goes down. There are of course many other options for a delay queue, such as Java's DelayQueue, Redis's zset, Quartz or Kafka's timing wheel. Each has its own characteristics, and the choice depends on the scenario.
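
For comparison, here is a minimal sketch of the first alternative, the JDK's java.util.concurrent.DelayQueue. It works inside a single JVM only, so it has none of the persistence or clustering discussed above. The DelayedTask class and the 200 ms and 50 ms delays are chosen purely for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// In-memory delay queue using the JDK's DelayQueue.
public class JdkDelayQueueDemo {

    public static final class DelayedTask implements Delayed {
        public final String name;
        private final long dueAtNanos;

        public DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        // Remaining delay; the queue hands the task out once this reaches zero or below.
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Orders tasks so that the one with the shortest remaining delay is at the head.
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Enqueues a slow task before a fast one and returns the order in which they come out.
    public static List<String> runDemo() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 200));
        queue.put(new DelayedTask("fast", 50));
        List<String> order = new ArrayList<>();
        try {
            order.add(queue.take().name); // blocks until the earliest task is due
            order.add(queue.take().name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [fast, slow]
    }
}
```

Although "slow" is enqueued first, "fast" is taken first, which is the same behaviour the delayed message plug-in provides on the broker side.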

Keywords: Java RabbitMQ Distribution message queue

Added by nileshkulkarni on Mon, 27 Dec 2021 01:21:58 +0200