RabbitMQ switch, dead letter, SpringBoot integration and cluster construction

1. Switch Exchanges

1.1,Exchanges

What are Exchanges?

The core idea of RabbitMQ messaging model is that messages produced by producers are never sent directly to queues. In fact, usually producers don't even know which queues these messages are delivered to.

On the contrary, the producer can only send messages to the exchange. The work of the switch is very simple. On the one hand, it receives messages from the producer, on the other hand, it pushes them into the queue. The switch must know exactly how to process the received message. Should these messages be put in a specific queue, or should they be put in many queues, or should they be discarded. This depends on the type of switch.

Type of switches

  • Direct
  • Topic
  • Headers
  • Fan out

Anonymous exchange

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

The first parameter is the name of the switch. An empty string indicates the default or unnamed switch: messages can be routed to the queue, which is actually specified by the routing key (binding key) binding key, if it exists.

1.2 temporary queue

Whenever we connect to Rabbit, we need a new empty queue. Therefore, we can create a queue with random name, or let the server choose a random queue name for us. Secondly, once we disconnect the consumer, the queue will be deleted automatically.

Create a temporary queue as follows:

String queueName = channel.queueDeclare().getQueue();

1.3. bindings

binding is actually a bridge between exchange and queue. It tells us exchange and the queue
Columns are bound.

1.4 fan out

Fanout is a very simple type. As you can guess from the name, it broadcasts all received messages to all queues it knows. There are some exchange types in the system by default. The relationship between switch and temporary queue is as follows.

In short, a message can be consumed by multiple consumers after it is sent through the switch. A common example in life is the message sent by one person in wechat group or qq group, and other group members can receive it. The following is the code analysis. Here, two consumers need to bind to the switch to receive messages. The codes of the two consumers are consistent. Here's a new code. In addition, RabbitmqUtil here is a tool class for the production channel extracted before. You can refer to this blog: RabbitMQ installation and message sending and receiving of message middleware

    // Switch
    public static final String EXCHANGE = "log";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Claim switch
        channel.exchangeDeclare(EXCHANGE, "fanout");
        // Declare temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind
        channel.queueBind(queueName, EXCHANGE, "");
        System.out.println("Waiting to receive");

        // receive
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "Consumption interruption");
        };
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }

After that, the producer sends a message, which is bound to the switch in the same way.

    // Switch
    public static final String EXCHANGE = "log";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Claim switch
        channel.exchangeDeclare(EXCHANGE, "fanout");

        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            // Message content
            String message = sc.next();
            channel.basicPublish(EXCHANGE, "", null, message.getBytes());
            System.out.println("Sent successfully");
        }
    }

Then run it and send messages. You can see the effect. Messages will be received by both consumers, which is fanout.

1.5. Direct

The queue is only interested in the messages of the switch to which it is bound. Binding is represented by a parameter: routingKey, which can also be called binding key. We use code to create binding, and the meaning after binding is determined by its exchange type

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

For the direct type, that is, the bound key of the sending message is unique, that is, the sent message can only be received by a switch with a bound key value.

The consumer code is as follows: here, we can bind multiple key values to a queue, that is, bind info and warning to the console.

    public static final String EXCHANGE = "direct";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Claim switch
        channel.exchangeDeclare(EXCHANGE, "direct");
        // Declaration queue
        channel.queueDeclare("console", true, false, false, null);
        // Bind
        channel.queueBind("console", EXCHANGE, "info");
        channel.queueBind("console", EXCHANGE, "warning");
        System.out.println("Waiting to receive");

        // receive
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "Consumption interruption");
        };
        channel.basicConsume("console", true, deliverCallback, cancelCallback);
    }

Similarly, another consumer code is the same. You only need to modify the queue name and the bound key value. Finally, we can see it in the browser.

Finally, the producer. Here we send the message to the key value of info, that is, to the first consumer to check the effect.

    public static final String EXCHANGE = "direct";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            // Message content
            String message = sc.next();
            channel.basicPublish(EXCHANGE, "info", null, message.getBytes());
            System.out.println("Sent successfully");
        }
    }

1.6 Topic

Routing to messages of type topic switch_ The key cannot be written arbitrarily. It must meet certain requirements. It must be a word list separated by a dot. These words can be any word, such as "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" This type of. Of course, the word list cannot exceed 255 bytes at most.

In this rule list, there are two substitutes to be noted:

  • *(asterisk) can replace a word
  • #(pound sign) can replace zero or more words

First, we give some matching rules before writing the code, as shown in the figure below. (this is the switch information after the code is generated. Here, we put the screenshot first for understanding.)

Then we use the code for demonstration,

    public static final String EXCHANGE = "topic";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Claim switch
        channel.exchangeDeclare(EXCHANGE, "topic");
        // Declaration queue
        channel.queueDeclare("T1", true, false, false, null);
        // Bind
        channel.queueBind("T1", EXCHANGE, "*.error");
        channel.queueBind("T1", EXCHANGE, "*.info.*");
        System.out.println("T1 Waiting to receive");

        // receive
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
            System.out.println("Binding value:" + message.getEnvelope().getRoutingKey());
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "Consumption interruption");
        };
        channel.basicConsume("T1", true, deliverCallback, cancelCallback);
    }

For another consumer, we only need to change the topic topic matching rule, and then we add multiple messages to the producer to send and view. That consumer will receive the message for consumption.

    public static final String EXCHANGE = "topic";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        Map<String, String> map = new HashMap();
        map.put("info", "info");
        map.put("error", "error");
        map.put("error.info", "error.info");
        map.put("info.error", "info.error");
        map.put("a", "a");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String message = entry.getValue();
            channel.basicPublish(EXCHANGE, key, null, message.getBytes());
            System.out.println("Sent successfully");
        }
    }

2. Dead letter queue

2.1 what is dead letter?

First clarify the definition from the conceptual explanation. Dead letter, as the name suggests, is a message that cannot be consumed. The literal meaning can be understood as follows. Generally speaking, the producer delivers the message to the broker or directly to the queue. The consumer takes out the message from the queue for consumption, but sometimes some messages in the queue cannot be consumed due to specific reasons, If there is no subsequent processing for such a message, it will become a dead letter. If there is a dead letter, there will be a dead letter queue.

Application scenario: in order to ensure that the message data of the order business is not lost, the dead letter queue mechanism of RabbitMQ needs to be used. When the message consumption is abnormal, the message is put into the dead letter queue Another example: after the user successfully places an order in the mall and clicks to pay, it will automatically become invalid if it is not paid within the specified time.

2.2 source of dead letter

  • Message TTL expired
  • The queue has reached the maximum length (the queue is full and can no longer add data to mq)
  • The message is rejected (basic.reject or basic.nack) and request = false


2.3 message TTL expiration

First of all, it is demonstrated here that two consumers are used to receive messages, one of which forwards ordinary messages to dead letter messages, and the other receives messages from dead letter queue. First, consumer 1:

    // General switch
    public static final String NORMAL_EXCHANGE = "normalExhange";
    // Dead letter switch
    public static final String DEAD_EXCHANGE = "deadExchange";
    // Normal queue
    public static final String NORMAL_QUEUE = "normalQueue";
    // Dead letter queue
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Claim switch
        channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
        channel.exchangeDeclare(DEAD_EXCHANGE, "direct");
        // Declaration queue
        Map<String, Object> map = new HashMap<>();
        // Expiration time
        // map.put("x-message-ttl",100000);
        // Specify the corresponding dead letter switch
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Set the specified dead letter key
        map.put("x-dead-letter-routing-key", "dead");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        // Bind switches and queues
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
        // receive
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "Consumption interruption");
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    }

After that, a new generator is added to send messages to the general switch, and the message delay is set.

    public static final String NORMAL_EXCHANGE = "normalExhange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // Send a dead letter
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message = i + "";
            channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes());
        }
        System.out.println("Sent successfully");
    }

After that, run the consumer. The corresponding switch and queue are created on the web side. Then we stop the consumer to simulate the consumer's downtime, and then run the generator. Here, we can see that because the consumer's service is stopped, the ordinary switch will forward the message to the dead letter queue, Another consumer can be added to receive messages from the dead letter queue:

    // Dead letter queue
    public static final String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();
        // receive
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "Consumption interruption");
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
    }

2.4. The queue reaches the maximum length

First, we need to add a maximum length limit to the map among consumer 1,

	map.put("x-max-length",6);

In addition, there is no need to set the delay setting among producers. Then, similarly, start consumer 1 to generate the queue, stop consumer 1, and then start the producer to send messages. It can be seen that only 6 messages can be stored in the ordinary queue, and the additional 4 messages are stored in the dead letter queue.

2.5. Message rejected

We add a message reject to consumers. Here, we reject a message with message 5.



After that, we all run up and check the messages in the dead letter queue. We can see a message, that is, the rejected 5.

3. Delay queue

3.1 what is the delay queue?

The delay queue is orderly. The most important feature is reflected in its delay attribute. The elements in the delay queue want to be taken out and processed after or before the specified time. In short, the delay queue is a queue used to store the elements that need to be processed at the specified time.

3.2. Integration of RabbitMQ and SpringBoot

For SpringBoot, refer to: Detailed explanation of SpringBoot
Swagger can refer to: Swagger2 details

We integrate RabbitMQ in Springboot. First, we import related dependencies;

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ Test dependency-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Create two queues QA and QB, and set their TTL S to 10S and 40S respectively. Then create a switch X and a dead letter switch Y, both of which are direct. Create a dead letter queue QD. Their binding relationship is as follows: demonstrate the code architecture diagram:

First add the configuration class:

package com.lzq.rabbit.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    // General switch
    public static final String X_EXCHANGE = "xExchange";
    // Dead letter switch
    public static final String Y_DEAD_EXCHANGE = "yExchange";
    // Normal queue
    public static final String QUEUE_A = "Q_A";
    public static final String QUEUE_B = "Q_B";
    // Dead letter queue
    public static final String DEAD_QUEUE_C = "Q_C";

    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_EXCHANGE);
    }

    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //Declare the dead letter switch bound by the current queue
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //Declare the dead letter routing key of the current queue
        args.put("x-dead-letter-routing-key", "YC");
        //Declare the TTL of the queue
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //Declare the dead letter switch bound by the current queue
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //Declare the dead letter routing key of the current queue
        args.put("x-dead-letter-routing-key", "YC");
        //Declare the TTL of the queue
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    @Bean("queueC")
    public Queue queueC() {
        return QueueBuilder.durable(DEAD_QUEUE_C).build();
    }

    // Bind
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    @Bean
    public Binding queueCBindingY(@Qualifier("queueC") Queue queueC, @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueC).to(yExchange).with("YC");
    }
}

Then add a controller to send the request and simulate sending the message:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/{message}")
    public void sendMsg(@PathVariable String message){
        System.out.println(message+""+new Date().toString());
        // send message
        rabbitTemplate.convertAndSend("xExchange","XA","Message from 10 s :"+message);
        rabbitTemplate.convertAndSend("xExchange","XB","Message from 40 s :"+message);
    }

Finally, add a consumer

    @RabbitListener(queues = "Q_C")
    public void receiveC(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println("Dead letter:" + msg + "" + new Date().toString());
    }

Send request: http://localhost:8080/send/12345

The first message becomes a dead letter message after 10S and is consumed by the consumer. The second message becomes a dead letter message after 40S and is consumed. Such a delay queue is completed.

However, if it is used in this way, it is necessary to add a queue every time a new time demand is added. There are only two time options: 10S and 40S. If it needs to be processed after one hour, it is necessary to add a queue with TTL of one hour. If it is to book a meeting room and notify it in advance, Isn't it necessary to add countless queues to meet the demand?

Add a new queue QD to bind with X and Y switches. New code:

    public static final String QUEUE_D = "Q_D";
    
    @Bean("queueD")
    public Queue queueD() {
        Map<String, Object> args = new HashMap<>(3);
        //Declare the dead letter switch bound by the current queue
        args.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
        //Declare the dead letter routing key of the current queue
        args.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_D).withArguments(args).build();
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueD).to(xExchange).with("XC");
    }

Add an interface to test:

    @GetMapping("/sendLetter/{message}/{ttlTime}")
    public void sendLetterMsg(@PathVariable String message,@PathVariable String ttlTime){
        // send message
        rabbitTemplate.convertAndSend("xExchange","XC","Message from 10 s :"+message+", Duration:" + ttlTime,msg->{
            // Set duration
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
        log.info("Current time:{},Length of time to send a message{}millisecond TTL Message to queue C:{}", new Date(),ttlTime, message);
    }

4. Release confirmation - Advanced

In the production environment, RabbitMQ is restarted due to some unknown reasons. During RabbitMQ restart, the delivery of producer messages fails, resulting in message loss, which needs to be processed and recovered manually. So we began to think, how can we reliably deliver RabbitMQ messages? Especially in such extreme cases, when the RabbitMQ cluster is unavailable, how to deal with undeliverable messages?

4.1 release and confirm the springboot version

Here, a simple publish confirmation is used. First, add a configuration class:

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE = "confirmExchange";
    public static final String CONFRIM_QUEUE = "confirmQueue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFRIM_QUEUE).build();
    }
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

Request to open a new interface:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/confirm/{message}")
    public void confirm(@PathVariable String message){
        rabbitTemplate.convertAndSend("confirmExchange","key1","Message from:"+message);
    }

Add a consumer:

@Slf4j
@Component
public class ConfirmConsum {
    @RabbitListener(queues = ConfirmConfig.CONFRIM_QUEUE)
    public void receive(Message message, Channel channel) throws Exception {
        log.info("Current time:{},Receive the message{}", new Date().toString(), message);
    }
}

Add a callback interface: and in the configuration file:

spring.rabbitmq.publisher-confirm-type=correlated

Then write the interface

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * A callback method for the switch whether it receives a message or not
     * Message related data
     * ack Switch, receive message
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("Switch has received id by:{}News of", id);
        } else {
            log.info("The switch has not received id by:{}news,For reasons:{}", id, cause);
        }
    }
}

Send messages. When the name of the switch in the controller is modified to send messages, the switch does not exist, which leads to the loss of messages.

4.2 message fallback

Mandatory parameter
When only the producer confirmation mechanism is enabled, the switch will directly send a confirmation message to the message producer after receiving the message. If it is found that the message is not routable, the message will be directly discarded. At this time, the producer does not know the event that the message is discarded. So how can I get messages that can't be routed to help me find a way to deal with them? At least let me know. I can handle it myself. By setting the mandatory parameter, the message can be returned to the producer when the destination is unreachable during message delivery.

Override the returnedMessage method in the callback interface,

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setMandatory(true);
        //Set who will handle the fallback message
        rabbitTemplate.setReturnsCallback(this);

    }

    /**
     * A callback method for the switch whether it receives a message or not
     * Message related data
     * ack Switch, receive message
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("Switch has received id by:{}News of", id);
        } else {
            log.info("The switch has not received id by:{}news,For reasons:{}", id, cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println(returnedMessage.getMessage());
        log.error(" News {}, Switched {} Return, return reason :{}, route key:{}",
                returnedMessage.getMessage().getBody(), returnedMessage.getExchange(),
                returnedMessage.getReplyText(), returnedMessage.getRoutingKey());
    }
}

Modify the routingkey value in the message sending interface to simulate that the switch cannot send messages to the queue. Run the code to view the results:

4.3 backup switch

With the mandatory parameter and fallback message, we gain the perception of undeliverable messages, and have the opportunity to find and process producer messages when they cannot be delivered. But sometimes, we don't know how to handle these messages that can't be routed. We can make a log at most, then trigger an alarm, and then handle them manually. It is not elegant to handle these unrouted messages through logs, especially when the producer's service has multiple machines, manual copying logs will be more troublesome and error prone. Moreover, setting the mandatory parameter will increase the complexity of producers and need to add logic to deal with these returned messages. What if you don't want to lose messages and increase the complexity of producers? In the previous article on setting the dead letter queue, we mentioned that the dead letter switch can be set for the queue to store those failed messages, but these non routable messages have no chance to enter the queue, so the dead letter queue cannot be used to save messages.

In RabbitMQ, there is a backup switch mechanism, which can deal with this problem well. What is a backup switch? The backup switch can be understood as the "spare tire" of the switch in RabbitMQ. When we declare a corresponding backup switch for a switch, we create a spare tire for it. When the switch receives a non routable message, it will forward the message to the backup switch for forwarding and processing, Usually, the backup switch is Fanout, so that all messages can be delivered to the bound queue. Then we bind a queue under the backup switch, so that all messages that cannot be routed by the original switch will enter the queue. Of course, we can also establish an alarm queue to monitor and alarm with independent consumers.

5. RabbitMQ -- idempotency, priority, inertia

5.1 idempotency

What is idempotency?

The results of one request or multiple requests initiated by the user for the same operation are consistent, and there will be no side effects due to multiple clicks. The simplest example is payment. The user pays after purchasing goods, and the payment deduction is successful. However, when the result is returned, the network is abnormal. At this time, the money has been deducted. The user clicks the button again, and the second deduction will be made. The result is returned successfully. The user queries the balance and finds that more money has been deducted, and the daily record has become two. In the previous single application system, we only need to put the data operation into the transaction and roll back immediately when an error occurs, but there may be network interruption or exceptions when responding to the client.

Message repeat consumption

When a consumer consumes a message in MQ, MQ has sent the message to the consumer. When the consumer returns an ack to MQ, the network is interrupted, so MQ does not receive a confirmation message. The message will be re sent to other consumers, or sent to the consumer again after the network is reconnected. In fact, the consumer has successfully consumed the message, Causing consumers to consume repeated messages.

Solution ideas

The idempotency of MQ consumers is generally solved by using the global id or writing a unique id, such as timestamp or UUID or order. Consumers can also use the id of MQ to judge whether they consume messages in MQ, or they can generate a global unique id according to their own rules. Each time they consume a message, they use the id to judge whether the message has been consumed.

Idempotency guarantee on the consumer side

During the business peak of massive order generation, messages may occur repeatedly at the production end. At this time, the consumer must realize idempotency, which means that our messages will never be consumed many times, even if we receive the same message. The mainstream idempotency in the industry has two operations

  • The unique ID + fingerprint code mechanism uses the database primary key to remove duplication
  • Using the atomicity of redis to realize

Unique ID + fingerprint code mechanism

Fingerprint code: the unique information code given by some of our rules or timestamps plus other services. It is not necessarily generated by our system. It is basically spliced from our business rules, but we must ensure the uniqueness. Then we use the query statement to judge whether this id exists in the database. The advantage is to realize a simple splicing, Then query and judge whether it is repeated; The disadvantage is that in the case of high concurrency, if it is a single database, there will be a write performance bottleneck. Of course, we can also use database and table to improve the performance, but it is not our most recommended way.

Redis atomicity

Using redis to execute setnx command is naturally idempotent. So as to realize non repeated consumption

5.2 priority queue

Usage scenario

In our system, there is a scene of urging payment of orders. Taobao will push the orders placed by our customers on tmall to us in time. If they fail to pay within the time set by the user, they will push a SMS reminder to the user. It's a simple function, right? However, tmall businesses must be divided into large customers and small customers for us, right, For example, big businesses such as apple and Xiaomi can at least make us a lot of profits a year, so of course, their orders must be given priority. In the past, our back-end system used redis to store regular polling. We all know that redis can only use List as a simple message queue and can not achieve a priority scenario, Therefore, after the order quantity is large, RabbitMQ is used for transformation and optimization. If it is found that the order of a large customer is given a relatively high priority, otherwise it is the default priority.

Add priority

It can be specified among consumers in the code, that is, it can be set through map:

Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

actual combat

Add consumer:

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        //Set the maximum priority of the queue. The maximum priority can be set to 255. The official website recommends 1-10. If the setting is too high, compare memory and CPU
        Map<String, Object> params = new HashMap();
        params.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

        //How to perform interface callback for consumption of pushed messages
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println(message);
        };
        //A callback interface for canceling consumption, such as when the queue is deleted during consumption
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("Message consumption interrupted");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

Add producer:

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitmqUtil.getChannel();

        //Give the message a priority attribute
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();

        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            if (i == 5) {
                channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
            } else {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
            System.out.println("Send message complete:" + message);
        }
    }

After the producer sends all messages, run the consumer to consume the message. You can see that the message of 5 has priority, that is, 5 will be consumed first.

5.3. Inert queue

Usage scenario

RabbitMQ from 3.6 The concept of inert queue was introduced in version 0. Inert queue will store messages to disk as much as possible, and they will be loaded into memory when consumers consume the corresponding messages. One of its important design goals is to support longer queues, that is, to support more message storage. When consumers cannot consume messages for a long time due to various reasons (such as offline, downtime, or shutdown due to maintenance, etc.), it is necessary to have an inert queue.

By default, when the producer sends messages to RabbitMQ, the messages in the queue will be stored in memory as much as possible, so that messages can be sent to consumers more quickly. Even for persistent messages, a backup will reside in memory while being written to disk. When RabbitMQ needs to free memory, it will page the messages in memory to disk. This operation will take a long time, block the queue operation, and then fail to receive new messages. Although RabbitMQ developers have been upgrading relevant algorithms, the effect is not ideal, especially when the message volume is very large.

Two modes

Queues have two modes: default and lazy. The default mode is default, which is in 3.6 No changes are required for versions prior to 0. Lazy mode is the mode of inert queue. You can call channel The queuedeclare method can be set in the parameter or by Policy. If a queue is set by both methods, the Policy method has higher priority. If you want to change the mode of an existing queue by declaration, you can only delete the queue first and then declare a new queue again.

When the queue is declared, you can set the mode of the queue through the "x-queue-mode" parameter, with the values of "default" and "lazy". The following example demonstrates the declaration details of an inert queue:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Memory overhead comparison


When 1 million messages are sent and each message occupies about 1KB, the memory occupied by the ordinary queue is 1.2GB, while the inert queue only occupies 1.5MB. This is because the data of the inert queue exists on the disk.

6. RabbitMQ cluster construction

We already have a virtual machine that can be used. We only need to clone this virtual machine and two virtual machines as nodes.

Keywords: RabbitMQ Spring Boot cluster

Added by nylex on Wed, 29 Dec 2021 04:10:33 +0200