Message queue learning notes

Message queues (MQ)

These are my first notes on this topic. Please leave a comment to point out anything that is wrong or missing; I will keep improving them.

1. Why MQ

1. Traffic peak shaving
2. Application decoupling

When a business operation is built from calls to several systems, the failure of any one subsystem makes the whole operation fail. Once the calls go through a message queue, the systems are decoupled: the information that still has to be processed is held in the queue, so a problem in one subsystem no longer causes an exception in the whole system.

3. Asynchronous processing

2. MQ classification

1. ActiveMQ

2. Kafka, commonly used for big data processing, with very high throughput (on the order of millions of messages per second)

Disadvantages:

3. RocketMQ

4. RabbitMQ

RabbitMQ

Working principle:

1. Hello World

2. Work queues

2.1 Concept

When a producer sends a large number of messages, several worker threads (consumers) read from the same queue so that the work is shared among them. (Note: each message is delivered to one worker only; it is processed once, not several times.)

2.2 Round-robin dispatch

By default the messages are handed to the consumers in turn, so each consumer receives roughly the same number of messages.
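Dispatching in turn can be sketched without a broker. The following is a plain-Java simulation, not RabbitMQ client code; the class RoundRobinDispatch and its dispatch method are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of dispatching in turn (hypothetical helper, no broker involved).
class RoundRobinDispatch {

    // Assigns message i to consumer (i mod consumerCount); returns one list per consumer.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<List<String>> result = dispatch(List.of("AA", "BB", "CC", "DD", "EE"), 2);
        System.out.println("C1: " + result.get(0)); // C1: [AA, CC, EE]
        System.out.println("C2: " + result.get(1)); // C2: [BB, DD]
    }
}
```

Each consumer gets every second message regardless of how long it takes to process one, which is why the consumer's speed plays no role in this mode.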

2.3 Message acknowledgement

To make sure that a message is not lost while it is being consumed, RabbitMQ uses an acknowledgement mechanism: after the consumer has received and processed a message, it tells RabbitMQ that the message has been handled, and only then does RabbitMQ delete it.

2.3.3 Automatic acknowledgement

A message is considered successfully processed as soon as it has been delivered to the consumer, so RabbitMQ can delete it immediately. If the consumer fails before it finishes processing, that message is lost.

2.3.4 Methods of message acknowledgement

2.3.5 Automatic requeueing of messages

If a consumer loses its connection without sending an ACK, the message is requeued and delivered to another consumer. In this way no message is lost even if a consumer dies unexpectedly.

2.3.6 Manual acknowledgement

With manual acknowledgement a message is not lost: if it has not been acknowledged, it is put back in the queue and consumed again.

Producer

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Producer {
    // Queue name
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        // RabbitmqUtils is a helper class (not shown) that returns an open channel
        Channel channel = RabbitmqUtils.getChannel();

        // Declare the queue (not durable, not exclusive, not auto-delete)
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

        // Read messages from the console, one whitespace-separated token per message
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Producer sent message: " + message);
        }
    }
}

Consumer

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Consumer1 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitmqUtils.getChannel();

        System.out.println("C1 is waiting for messages; its processing time is short");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // Simulate 1 s of work
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Received: " + new String(message.getBody(), StandardCharsets.UTF_8));

            // Manual acknowledgement:
            // 1. the delivery tag of the message
            // 2. multiple = false: acknowledge only this message, not every message up to this tag
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        // Called when the consumer is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption interrupted");
        };

        // autoAck = false: acknowledge manually
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

2.4 Persistence

Queue persistence

A durable queue still exists after RabbitMQ is restarted.

// Create a queue
channel.queueDeclare(QUEUE_NAME,
                     false, false, false, null);

If the second parameter (durable) is true, the queue is durable.

Note: if a non-durable queue with the same name already exists, it must be deleted and declared again as durable; redeclaring an existing queue with different parameters fails.

Message persistence

When publishing a message, mark it as persistent.

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

When the third parameter, props, is MessageProperties.PERSISTENT_TEXT_PLAIN, the message is persistent:

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());

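2.5 Unfair distribution

Consumers differ in how fast they can process messages. If messages are simply handed out in turn, a fast consumer sits idle while a slow one falls behind. Unfair distribution speeds up processing: a consumer receives a new message only after it has acknowledged the previous one.

Note: unfair distribution is configured in the consumer code.

channel.basicQos(1);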
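The effect of a limit of one unacknowledged message per consumer can be illustrated with a small simulation. This is a simplified sketch in plain Java, not client code: it assumes that all messages are already waiting in the queue, that each consumer acknowledges as soon as it finishes a message, and that processing costs are whole time units. The class PrefetchSketch and its parameters are hypothetical.

```java
import java.util.Arrays;

// Simulates dispatch when every consumer may hold only one unacknowledged message.
class PrefetchSketch {

    // cost[c] is the time consumer c needs per message.
    // Returns how many messages each consumer ends up processing.
    static int[] dispatch(int messageCount, int[] cost) {
        int[] freeAt = new int[cost.length];   // time at which each consumer is idle again
        int[] handled = new int[cost.length];
        for (int m = 0; m < messageCount; m++) {
            // The next message goes to the consumer that becomes idle first
            // (the lowest index wins a tie).
            int next = 0;
            for (int c = 1; c < cost.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            handled[next]++;
            freeAt[next] += cost[next];
        }
        return handled;
    }

    public static void main(String[] args) {
        // A fast consumer (1 time unit per message) and a slow one (3 time units).
        System.out.println(Arrays.toString(dispatch(8, new int[]{1, 3}))); // [6, 2]
    }
}
```

Handing the same eight messages out strictly in turn would give each consumer four, and the slow consumer would finish long after the fast one.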

2.6 Prefetch count

You can specify how many unacknowledged messages a consumer may hold at a time:

channel.basicQos(x);

x is the prefetch count.

3. Publisher confirms

Even with persistence enabled, a message can be lost if the broker goes down before the message has been written to disk. Publisher confirms close this gap: once the broker has taken responsibility for a published message (for a persistent message, once it has been written to disk), it sends a confirmation to the producer, so the producer knows whether the message was stored or has to be sent again.

3.1 Enabling publisher confirms

channel.confirmSelect();

3.2 Confirming messages one at a time

Publish one message, wait for its confirmation, and only then publish the next message.

Disadvantage: publishing is very slow.

Channel channel = RabbitmqUtils.getChannel();

// Declare a durable queue
channel.queueDeclare("q1", true, false, false, null);

// Enable publisher confirms
channel.confirmSelect();

// Start time
long begin = System.currentTimeMillis();

// Publish the messages one by one
for (int i = 0; i < 100; i++) {
    String msg = i + "";
    channel.basicPublish("", "q1", null, msg.getBytes(StandardCharsets.UTF_8));
    // Wait for the confirmation of this single message
    boolean confirmed = channel.waitForConfirms();
    if (confirmed) {
        System.out.println("Message sent successfully");
    }
}

// Elapsed time
long elapsed = System.currentTimeMillis() - begin;
System.out.println("Confirming messages individually took " + elapsed + " ms");

3.3 Batch confirmation

        // Publish the messages in a batch
        for (int i = 0; i < 100; i++) {
            String msg = i + "";
            channel.basicPublish("", "q1", null, msg.getBytes(StandardCharsets.UTF_8));

            // After every 100 published messages, wait once for all outstanding confirmations
            if ((i + 1) % 100 == 0) {
                channel.waitForConfirms();
            }
        }
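The batch boundary is easy to get wrong by one, so it helps to list where the waits would fall. The sketch below is plain Java with no broker; confirmPoints is a hypothetical helper that returns the zero-based loop indices after which a wait for confirmations would be issued, using full batches plus one final wait for any remainder.

```java
import java.util.ArrayList;
import java.util.List;

// Computes where a batching publisher would wait for confirmations.
class BatchConfirmPoints {

    static List<Integer> confirmPoints(int messageCount, int batchSize) {
        List<Integer> points = new ArrayList<>();
        int outstanding = 0;
        for (int i = 0; i < messageCount; i++) {
            outstanding++;                   // message i has been published
            if (outstanding == batchSize) {  // a full batch is outstanding
                points.add(i);
                outstanding = 0;
            }
        }
        if (outstanding > 0) {               // leftover messages after the last full batch
            points.add(messageCount - 1);
        }
        return points;
    }

    public static void main(String[] args) {
        System.out.println(confirmPoints(250, 100)); // [99, 199, 249]
        System.out.println(confirmPoints(100, 100)); // [99]
    }
}
```

The final wait matters whenever the number of messages is not a multiple of the batch size; without it the last messages would never be confirmed.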

3.4 Asynchronous confirmation

Publish all the messages first, without waiting. The broker confirms asynchronously: when it has accepted a message it invokes ackCallback, and when it could not handle a message it invokes nackCallback. The producer therefore learns asynchronously which messages failed and can publish them again.

 public static void m3() throws IOException, InterruptedException {
        Channel channel = RabbitmqUtils.getChannel();

        // Declare a durable queue
        channel.queueDeclare("q1", true, false, false, null);

        // Enable publisher confirms
        channel.confirmSelect();

        // Start time
        long begin = System.currentTimeMillis();

        // Register listeners that report which messages were confirmed and which were not.
        // Callback for confirmed messages
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println("Confirmed message " + deliveryTag);
        };

        // Callback for messages that were not confirmed
        // Parameter 1: the tag (sequence number) of the message
        // Parameter 2: whether the confirmation covers multiple messages
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println("Unconfirmed message " + deliveryTag);
        };
        channel.addConfirmListener(ackCallback, nackCallback);

        // Publish without waiting for each confirmation
        for (int i = 0; i < 1000; i++) {
            String msg = i + "";
            channel.basicPublish("", "q1", null, msg.getBytes(StandardCharsets.UTF_8));
        }

        // Elapsed time
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("Publishing with asynchronous confirmation took " + elapsed + " ms");
    }

How to handle messages that were not confirmed asynchronously

public static void m3() throws IOException, InterruptedException {
        Channel channel = RabbitmqUtils.getChannel();

        // Declare a durable queue
        channel.queueDeclare("q1", true, false, false, null);

        // Enable publisher confirms
        channel.confirmSelect();

        /*
         * Thread-safe sorted map, suitable for high concurrency:
         * 1. associates sequence numbers with messages
         * 2. allows a whole range of entries to be deleted at once
         * 3. supports concurrent access
         */
        ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();

        // Start time
        long begin = System.currentTimeMillis();

        // Callback for confirmed messages
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 2. Delete every confirmed message up to and including deliveryTag
                ConcurrentNavigableMap<Long, String> confirmed =
                        concurrentSkipListMap.headMap(deliveryTag, true);
                confirmed.clear();
            } else {
                concurrentSkipListMap.remove(deliveryTag);
            }
            System.out.println("Confirmed message " + deliveryTag);
        };

        // Callback for messages that were not confirmed
        // Parameter 1: the tag (sequence number) of the message
        // Parameter 2: whether the confirmation covers multiple messages
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            // 3. Look up and print the unconfirmed message
            String msg = concurrentSkipListMap.get(deliveryTag);
            System.out.println("Unconfirmed message: " + msg);
            System.out.println("Unconfirmed message tag: " + deliveryTag);
        };
        channel.addConfirmListener(ackCallback, nackCallback);

        // Publish without waiting for each confirmation
        for (int i = 0; i < 1000; i++) {
            String msg = i + "";
            // 1. Record the message under the sequence number it is about to receive;
            //    getNextPublishSeqNo() must be read before publishing
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(), msg);
            channel.basicPublish("", "q1", null, msg.getBytes(StandardCharsets.UTF_8));
        }

        // Elapsed time
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("Publishing with asynchronous confirmation took " + elapsed + " ms");
    }
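A confirmation with multiple = true covers every sequence number up to and including deliveryTag, so the boundary of the deleted range matters. The sketch below uses only the JDK to show how the two headMap variants of ConcurrentSkipListMap differ at that boundary; the sequence numbers and message bodies are invented for the illustration, and OutstandingConfirms is a hypothetical class name.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Tracks published-but-unconfirmed messages by sequence number.
class OutstandingConfirms {

    // Removes the entries covered by a confirmation and returns how many remain.
    static int ack(ConcurrentSkipListMap<Long, String> outstanding, long deliveryTag, boolean multiple) {
        if (multiple) {
            // inclusive = true: the entry for deliveryTag itself is confirmed as well
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
        for (long seq = 1; seq <= 5; seq++) {
            outstanding.put(seq, "msg" + seq);
        }
        System.out.println(outstanding.headMap(3L).keySet());       // [1, 2]  (exclusive upper bound)
        System.out.println(outstanding.headMap(3L, true).keySet()); // [1, 2, 3]

        ack(outstanding, 3L, true);
        System.out.println(outstanding.keySet());                   // [4, 5]
    }
}
```

With the one-argument headMap, the message carrying the confirmed tag itself would stay in the map and look unconfirmed forever.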

4. Exchanges

4.1 Publish/subscribe mode

4.2 Exchange concept

The core idea of RabbitMQ's messaging model is that the producer never sends a message directly to a queue. Usually the producer does not even know which queues the message will be delivered to.

The producer can only send messages to an exchange, which receives them from the producer and pushes them to queues.

4.3 Types of exchanges

There are the following exchange types:

direct (routing), topic, headers, and fanout (publish/subscribe)

Anonymous exchange

Publishing with an empty exchange name (""), as in the examples above, uses the default exchange, which routes a message to the queue whose name equals the routing key.

4.4 Temporary queues

A temporary queue is not durable; it is deleted automatically once the connection that declared it is closed.

Create a temporary queue with a random name:

String queueName = channel.queueDeclare().getQueue();

4.5 Bindings

A binding links an exchange to a queue. An exchange can be bound to several queues, and it uses the routing key to decide which of them receive a message.

4.6 Fanout (publish/subscribe)

This type of exchange broadcasts every message it receives to all queues bound to it.

Declare an exchange. The first parameter is the exchange name, the second is the exchange type.

 channel.exchangeDeclare("los", "fanout");

Bind a queue. A fanout exchange ignores the routing key, so an empty string is passed:

channel.queueBind(queue, "los", "");

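4.7 Direct

Specify a routing key when binding. A message is delivered to a queue only if its routing key equals the routing key of the binding.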

queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
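How a routing key selects queues can be modelled in a few lines. The sketch below is an in-memory toy, not the RabbitMQ client: InMemoryExchange, its Type enum, and the queue names are made up for the illustration. It contrasts direct routing (exact match on the routing key) with fanout (the key is ignored).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of an exchange that routes to queue names (no broker involved).
class InMemoryExchange {
    enum Type { DIRECT, FANOUT }

    private final Type type;
    // queue name -> routing keys it was bound with, in binding order
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    InMemoryExchange(Type type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, q -> new ArrayList<>()).add(bindingKey);
    }

    // Returns the names of the queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : bindings.entrySet()) {
            if (type == Type.FANOUT || e.getValue().contains(routingKey)) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        InMemoryExchange direct = new InMemoryExchange(Type.DIRECT);
        direct.bind("console", "info");
        direct.bind("console", "warning");
        direct.bind("disk", "error");
        System.out.println(direct.route("error")); // [disk]
        System.out.println(direct.route("info"));  // [console]
        System.out.println(direct.route("debug")); // []

        InMemoryExchange fanout = new InMemoryExchange(Type.FANOUT);
        fanout.bind("q1", "");
        fanout.bind("q2", "");
        System.out.println(fanout.route("anything")); // [q1, q2]
    }
}
```

A queue can be bound with several routing keys, as "console" is here, and a routing key that matches no binding reaches no queue at all.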

4.8 Topic

The topic type is more flexible than direct and fanout.

With a topic exchange, the routing key used in a binding may contain the wildcards * and #, which makes it possible to deliver a message to a single queue, to some of the queues, or to all of them.

Here * matches exactly one word and # matches zero or more words. Words are separated by dots.

Note: the routing key cannot exceed 255 bytes.
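The wildcard rules can be made concrete with a small matcher. This is a sketch of the matching rule only, written in plain Java under the assumption that words are separated by dots, that * stands for exactly one word, and that # stands for zero or more words; TopicMatcher is a hypothetical name and the code is not how the broker implements routing.

```java
// Matches a routing key against a topic binding key containing * and #.
class TopicMatcher {

    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;      // the pattern is used up: all words must be too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;                  // the pattern still expects a word
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit")); // false
        System.out.println(matches("lazy.#", "lazy.orange.male.rabbit"));      // true
        System.out.println(matches("lazy.#", "lazy"));                         // true
    }
}
```

A binding key of just # behaves like fanout, and a binding key without any wildcard behaves like direct.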

5. Dead letter queue

5.1 Concept of the dead letter queue

  • DLX stands for dead letter exchange; a queue bound to it is a dead letter queue
  • With a DLX configured, a message that becomes a dead letter in a queue is republished to another exchange, the DLX

5.2 Sources of dead letters
  • The message TTL has expired
  • The queue has reached its maximum length (it is full and cannot hold any more messages)
  • The message is rejected (basic.reject or basic.nack) with requeue = false
5.2 Dead letter example code
5.2.1 Message TTL expires

Producer code

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;

public class Producer {
    // Normal exchange
    public static String NORMAL_EXAHANGE = "normalExchange";

    // Dead letter exchange
    public static String DEAD_EXAHANGE = "deadExchange";

    // Normal queue
    public static String NORMAL_QUEUE = "normalQueue";

    // Dead letter queue
    public static String DEAD_QUEUE = "deadQueue";

    public static void main(String[] args) throws Exception {
        // Get a channel
        Channel channel = RabbitmqUtils.getChannel();

        // Per-message TTL of 10 s (the value is in milliseconds); expired messages become dead letters
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000").build();

        for (int i = 0; i <= 10; i++) {
            String msg = "info" + i;
            channel.basicPublish(NORMAL_EXAHANGE, "normal", properties, msg.getBytes(StandardCharsets.UTF_8));
        }
    }
}

Normal consumer code (messages that cannot be consumed are sent to the dead letter exchange):

    public static void main(String[] args) throws Exception {
        // Get a channel
        Channel channel = RabbitmqUtils.getChannel();

        // Declare the normal exchange
        channel.exchangeDeclare(NORMAL_EXAHANGE, BuiltinExchangeType.DIRECT);

        // Declare the dead letter exchange
        channel.exchangeDeclare(DEAD_EXAHANGE, BuiltinExchangeType.DIRECT);

        // Arguments of the normal queue
        Map<String, Object> arguments = new HashMap<>();
        // Set the dead letter exchange of the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXAHANGE);
        // Set the routing key used for dead letters
        arguments.put("x-dead-letter-routing-key", "dead");

        // Declare the normal queue
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        // Declare the dead letter queue
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        // Bind each queue to its exchange
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXAHANGE, "normal");
        channel.queueBind(DEAD_QUEUE, DEAD_EXAHANGE, "dead");

        // Called for each delivered message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer1 received message: " + new String(message.getBody()));
        };

        // Called when the consumer is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption interrupted");
        };

        // Consume from the normal queue with automatic acknowledgement
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    }

Dead letter consumer code (consumes the messages in the dead letter queue directly, by the name of that queue):

    public static void main(String[] args) throws Exception {
        // Get a channel
        Channel channel = RabbitmqUtils.getChannel();

        // Called for each delivered message
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Dead letter consumer received message: " + new String(message.getBody()));
        };

        // Called when the consumer is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption interrupted");
        };

        // Consume from the dead letter queue with automatic acknowledgement
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
    }
5.2.2 The queue has reached its maximum length

Add the maximum-length argument to the declaration of the normal queue:

        // Arguments of the normal queue
        Map<String, Object> arguments = new HashMap<>();
        // Set the dead letter exchange of the normal queue
        arguments.put("x-dead-letter-exchange", DEAD_EXAHANGE);
        // Set the routing key used for dead letters
        arguments.put("x-dead-letter-routing-key", "dead");

        // Set the maximum length of the normal queue
        arguments.put("x-max-length", 6);
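What happens once the limit is reached can be sketched in memory. The simulation below assumes RabbitMQ's default overflow behaviour, in which the oldest message is removed from the front of the queue and dead-lettered to make room for the new one. The class and method names are hypothetical, and nothing here talks to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a length-limited queue with a dead letter target.
class BoundedQueueSketch {
    private final int maxLength;
    private final Deque<String> normalQueue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    void publish(String message) {
        normalQueue.addLast(message);
        if (normalQueue.size() > maxLength) {
            // Over the limit: the oldest message leaves the queue and becomes a dead letter
            deadLetters.add(normalQueue.removeFirst());
        }
    }

    List<String> queued() {
        return new ArrayList<>(normalQueue);
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    public static void main(String[] args) {
        BoundedQueueSketch queue = new BoundedQueueSketch(6);
        for (int i = 0; i <= 10; i++) {      // 11 messages and no consumer running
            queue.publish("info" + i);
        }
        System.out.println(queue.queued());      // [info5, info6, info7, info8, info9, info10]
        System.out.println(queue.deadLetters()); // [info0, info1, info2, info3, info4]
    }
}
```

With a maximum length of 6 and eleven messages published while no consumer is running, the six newest messages stay in the normal queue and the five oldest end up as dead letters.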
5.2.3 Message rejected

Change the delivery callback so that it acknowledges manually and rejects selected messages.

// Reject without requeueing (requeue = false), so the message becomes a dead letter
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);

// Called for each delivered message
DeliverCallback deliverCallback = (consumerTag, message) -> {
    String msg1 = new String(message.getBody());
    if (msg1.equals("info5")) {
        System.out.println("Consumer1 received message " + msg1 + " and rejected it");
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    } else {
        System.out.println("Consumer1 received message " + msg1);
        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    }
};

// Consume with manual acknowledgement (autoAck = false)
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);

6. Delay queue

7. Publisher confirms, advanced

In production, RabbitMQ may go down or restart for reasons that are hard to foresee. While it is unavailable, message delivery from producers fails and messages are lost, and they then have to be recovered by hand. To deliver messages reliably, the result of each publish has to be handled through the ConfirmCallback interface. Publisher confirms are off by default, so they must be switched on in the configuration:

spring.rabbitmq.publisher-confirm-type=correlated

7.1 Example: exchange confirmation

Create a configuration class for publisher confirms that declares the exchange and the queue.

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    // Declare the business exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    // Declare the confirm queue
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Bind the confirm queue to the exchange
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}

In the producer, the ID of the message is set through CorrelationData. The callback interface receives this object together with the result of the confirmation and, on failure, the cause. Here the message is published with the routing key key2, to which no queue is bound:

@GetMapping("/sendMsg/{msg}")
public void sendMsg(@PathVariable(value = "msg") String msg) {
    // Carries the message ID into the confirm callback
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "key2",
            msg, correlationData);
    log.info("The content of the message sent is: {}", msg);
}

If no callback is registered, the message is lost without the producer noticing. Implement the confirm callback and register it on the RabbitTemplate:

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    // Constructor injection; registers this object as the confirm callback
    public MyCallBack(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Exchange confirmation callback.
     * 1. The exchange received the message:
     *     1.1 correlationData holds the ID and related information of the message
     *     1.2 ack is true
     *     1.3 cause is null
     * 2. The exchange did not receive the message:
     *     2.1 correlationData holds the ID and related information of the message
     *     2.2 ack is false
     *     2.3 cause is the reason for the failure
     * @param correlationData the ID and related information of the message
     * @param ack whether the exchange received the message
     * @param cause the reason for the failure
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if (ack) {
            log.info("The exchange received the message with ID {}", id);
        } else {
            log.info("The exchange did not receive the message with ID {}; cause: {}", id, cause);
        }
    }
}

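With this in place, the callback reports a failure when the message cannot reach the exchange, for example because the exchange does not exist. A message that reaches the exchange but cannot be routed to any queue is still confirmed; that case is covered in 7.2.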

7.2 Returned messages

7.2.1 The mandatory parameter

When only publisher confirms are enabled, the exchange sends a confirmation to the producer as soon as it receives the message. If the message then turns out to be unroutable, it is silently discarded, and the producer never learns that this happened. How can the producer get hold of messages that cannot be routed, or at least be told about them so that it can handle them itself? By setting the mandatory parameter: the message is then returned to the producer when its destination cannot be reached during delivery.

Modify the configuration file

spring:
  rabbitmq:
    host: 39.106.41.153
    port: 5672
    username: root
    password: root
    publisher-confirm-type: correlated
    publisher-returns: true

Callback method

package com.vleus.rabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author vleus
 * @date 2021-07-28 23:34
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // Register this object as both the confirm callback and the returns callback
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /*
    Exchange confirmation callback:
    1. The exchange received the message:
        1.1 correlationData holds the ID and related information of the message
        1.2 ack is true
        1.3 cause is null
    2. The exchange did not receive the message:
        2.1 correlationData holds the ID and related information of the message
        2.2 ack is false
        2.3 cause is the reason why the exchange did not receive the message
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange received the message with ID {}", id);
        } else {
            log.info("The exchange did not receive the message with ID {}; cause: {}", id, cause);
        }
    }

    // Returns a message to the producer when its destination cannot be reached during delivery.
    // Called only when the message is unroutable.
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        Message message = returned.getMessage();       // the message
        String exchange = returned.getExchange();      // the exchange
        String replyText = returned.getReplyText();    // the reason for the return
        String routingKey = returned.getRoutingKey();  // the routing key
        log.error("Message {} was returned by exchange {}; reason: {}; routing key: {}",
                new String(message.getBody()), exchange, replyText, routingKey);
    }
}

7.3 Alternate (backup) exchange

With the mandatory parameter and returned messages we can detect undeliverable messages and get a chance to handle a producer's message when it cannot be delivered. Sometimes, though, we do not know what to do with these unroutable messages. At most we can write a log entry, raise an alarm, and then deal with them by hand. Handling unrouted messages through logs is not elegant, especially when the producer's service runs on several machines: copying logs by hand is tedious and error prone. Moreover, the mandatory parameter makes the producer more complex, because it needs extra logic for the returned messages.

What if we want neither to lose messages nor to complicate the producer? In the dead letter queue section above, a dead letter exchange was set on a queue to keep failed messages. Unroutable messages, however, never enter a queue, so a dead letter queue cannot save them. RabbitMQ has an alternate exchange mechanism that handles this case well.

What is an alternate exchange? It can be thought of as the "spare tire" of an exchange. When an exchange that has an alternate exchange declared receives an unroutable message, it forwards the message to the alternate exchange, which then routes it. The alternate exchange is usually of type fanout, so that every message is delivered to all queues bound to it. We then bind a queue to the alternate exchange, and all messages that the original exchange cannot route end up in that queue. We can also set up an alarm queue with its own consumer for monitoring and alerting.

Declaration:

Declare the alternate exchange when declaring the exchange:

    // Declare the exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .alternate(BACKUP_EXCHANGE) // unroutable messages are forwarded to the alternate exchange
                .build();
    }


Full configuration class:

@Configuration
public class ConfirmConfig {

    // Exchange name
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    // Queue name
    public static final String CONFIRM_QUEUE = "confirm.queue";

    // Routing key
    public static final String ROUTING_KEY = "key1";

    // Alternate (backup) exchange
    public static final String BACKUP_EXCHANGE = "backup_exchange";

    // Backup queue
    public static final String BACKUP_QUEUE = "backup_queue";

    // Alarm queue
    public static final String WARNING_QUEUE = "warning_queue";

    // Declare the exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .alternate(BACKUP_EXCHANGE) // unroutable messages are forwarded to the alternate exchange
                .build();
    }

    // Declare the queue
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    // Bind the queue to the exchange
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }

    // Declare the alternate exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE);
    }

    // Declare the backup queue
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    // Declare the alarm queue
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    // Bind the backup queue to the alternate exchange
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                    @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    // Bind the alarm queue to the alternate exchange
    @Bean
    public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
                                                     @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

}

Consumer:

@Slf4j
@Component
public class WarningConsumer {

    //Receive alarm messages
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)
    public void consumeWarningQueue(Message message) {
        String msg = new String(message.getBody());

        log.error("The alarm found a non routable message: {}",msg);
    }
}

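Note: when both the mandatory parameter and an alternate exchange are configured, the alternate exchange takes precedence: the unroutable message is forwarded to it instead of being returned to the producer.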
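The order of precedence can be written down as a small decision function. This is only a summary of the rule in plain Java, with hypothetical names throughout, and it assumes that the alternate exchange itself can route the message (for example a fanout exchange with at least one bound queue).

```java
// Decision sketch: what happens to a published message, in order of precedence.
class UnroutableHandling {
    enum Outcome { ROUTED, SENT_TO_ALTERNATE_EXCHANGE, RETURNED_TO_PRODUCER, DROPPED }

    static Outcome handle(boolean routable, boolean hasAlternateExchange, boolean mandatory) {
        if (routable) {
            return Outcome.ROUTED;                      // normal delivery to a bound queue
        }
        if (hasAlternateExchange) {
            return Outcome.SENT_TO_ALTERNATE_EXCHANGE;  // checked before the mandatory flag
        }
        if (mandatory) {
            return Outcome.RETURNED_TO_PRODUCER;        // the returns callback is invoked
        }
        return Outcome.DROPPED;                         // silently discarded
    }

    public static void main(String[] args) {
        System.out.println(handle(false, true, true));   // SENT_TO_ALTERNATE_EXCHANGE
        System.out.println(handle(false, false, true));  // RETURNED_TO_PRODUCER
        System.out.println(handle(false, false, false)); // DROPPED
    }
}
```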

8. Priority queue

8.1 Usage scenario

Suppose our system has an order payment reminder scenario: Taobao promptly pushes the orders that our customers place on Tmall to us, and if a user has not paid within the configured time, a text message reminder is sent to that user. For the platform, however, merchants fall into large and small customers. Big merchants such as Apple or Xiaomi generate a lot of profit in a year, so their orders naturally have to be handled first. Our back-end used to store the orders in Redis and poll them periodically, but a Redis List only gives a simple message queue and cannot express priorities. Once the order volume grew, the system was reworked with RabbitMQ: orders from large customers get a relatively high priority, and all others get the default priority.

Note:

To make priorities work, three things are needed: the queue must be declared as a priority queue, each message must carry a priority, and the consumer should start consuming only after the messages have been sent to the queue, because the broker can only sort messages that are waiting in the queue.
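The ordering effect can be sketched with the JDK's own PriorityQueue. This is an in-memory illustration, not broker code: it assumes that all messages are already waiting before consumption starts, that a message without a priority counts as priority 0, and that messages of equal priority keep their publishing order. PriorityQueueSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of a priority queue: higher priority first, then publishing order.
class PriorityQueueSketch {

    private static final class Entry {
        final String body;
        final int priority;
        final long seq;   // publishing order, used to break ties

        Entry(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> e.priority).reversed()
                    .thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    void publish(String body, int priority) {
        queue.add(new Entry(body, priority, nextSeq++));
    }

    // Consumes everything that is waiting, in delivery order.
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityQueueSketch q = new PriorityQueueSketch();
        for (int i = 1; i < 11; i++) {
            q.publish("info" + i, i == 5 ? 5 : 0);  // only info5 carries a priority
        }
        System.out.println(q.drain());
        // [info5, info1, info2, info3, info4, info6, info7, info8, info9, info10]
    }
}
```

If the consumer were already draining the queue while the messages arrived one by one, there would be nothing to reorder, which is why the consumer has to start after the messages are in the queue.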

8.2 Example

Producer

public class Producer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            // Give the message a priority attribute
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().priority(5).build();
            for (int i = 1; i < 11; i++) {
                String message = "info" + i;
                if (i == 5) {
                    channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
                } else {
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                }
                System.out.println("Send message complete: " + message);
            }
        }
    }
}

Consumer

public class Consumer {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Set the maximum priority of the queue. The value can be as high as 255; the official
        // site recommends 1-10, because higher values cost more memory and CPU
        Map<String, Object> params = new HashMap<>();
        params.put("x-max-priority", 10);
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);
        System.out.println("Consumer is waiting for messages......");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("Message received: " + receivedMessage);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {
            System.out.println("Called when the consumer cannot consume the message, such as when the queue is deleted");
        });
    }
}

9. Lazy queues

9.1 Concept

A lazy queue keeps its messages on disk and loads them into memory only when a consumer needs them.

9.2 Usage scenario

When consumers go offline or cannot keep up, a lazy queue prevents a large backlog of messages from piling up in memory.

9.3 Declaration

Just add one argument when declaring the queue:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

10. Clustering

A single MQ node cannot cope with a large number of producers sending messages, so several nodes are combined into a cluster.

10.1 Mirrored queues

RabbitMQ's default cluster mode does not guarantee high availability of queues. Queue metadata, exchanges and bindings are replicated to every node in the cluster, but the queue contents are not. This mode eases the load on individual nodes, but when the node that hosts a queue goes down, the queue becomes unavailable until that node is restarted. To keep working when a queue's node goes down or fails, the queue contents have to be replicated to the other nodes in the cluster by creating a mirrored queue.

10.2 Federation plugin

The Federation plugin copies queue messages between different RabbitMQ clusters. The clusters can be on an internal network or on the public internet. This is transparent to applications: they are not aware of it, and no extra code has to be written.

Keywords: Java RabbitMQ

Added by Firestorm ZERO on Fri, 24 Dec 2021 10:29:19 +0200