Advanced features of RabbitMQ: solving common problems when using RabbitMQ

I Problem analysis of RabbitMQ

1. After the message is sent, has RabbitMQ really received it?

  • After the message is sent, the sender does not know whether RabbitMQ really received the message
  • If RabbitMQ is abnormal and the message is lost, the order processing process stops and the business is abnormal
  • You need to use the RabbitMQ sender confirmation mechanism to confirm whether the message is sent

2. After the message is sent, is the message really routed?

  • After the message is sent, the sender does not know whether the message is routed correctly. If the routing is abnormal, the message will be discarded
  • After the message is discarded, the order processing process stops and the business is abnormal
  • You need to use the RabbitMQ message return mechanism to confirm that the message is routed correctly

3. After the message is sent, the message volume is too large. Can the consumer handle it?

  • During business peaks, the sender may produce messages faster than the receiver can process them. A large number of messages is then pushed to the receiver at once, which can bring down the receiving service
  • Use the RabbitMQ consumer-side flow control mechanism to limit the message push rate and keep the receiving service stable

4. How to handle exceptions on the consumer side?

  • In automatic acknowledgement mode (autoAck = true), a message counts as acknowledged (ACK) as soon as it is delivered to the consumer
  • If the consumer then fails while processing the message, neither the sender nor the message middleware finds out
  • You need to use the RabbitMQ consumer confirmation mechanism (manual ACK) to confirm that the message is processed correctly

5. What if the queue is full?

  • By default, messages enter the queue and will exist forever until consumed
  • A large number of messages will put a lot of pressure on RabbitMQ
  • You need to use the RabbitMQ message expiration time to prevent a large backlog of messages

6. How to transfer expired messages?

  • Once a message's expiration time has passed, the message is discarded directly
  • Messages that are discarded directly leave no trace, so the system cannot raise an alert about the abnormal situation
  • You need to use a RabbitMQ dead letter queue to collect expired messages for analysis

7. Summary

  • Sender acknowledgement mechanism
  • Message return mechanism
  • Consumer-side flow control mechanism
  • Consumer confirmation mechanism
  • Message expiration mechanism
  • Dead letter queue

II Advanced features of RabbitMQ

1. Principle of sender confirmation mechanism

1.1 problem analysis: has the message really been sent?

  • After the message is sent, the sender does not know whether RabbitMQ really received the message

  • If RabbitMQ is abnormal and the message is lost, the order processing process stops and the business is abnormal

  • You need to use the RabbitMQ sender confirmation mechanism to confirm the message sending

1.2 what is the sender acknowledgement mechanism

  • After the message is sent, if the middleware receives the message, it will give a reply to the sender
  • The producer (sender) receives the response to confirm whether the message is sent to the middleware normally

1.3 three confirmation mechanisms

  • Single synchronization confirmation
  • Multiple synchronization confirmation
  • Asynchronous acknowledgement

1.4 implementation method of single synchronization confirmation mechanism

  • Configure the channel and enable confirmation mode: channel.confirmSelect()

  • After every message sent, call channel.waitForConfirms(). It blocks until the broker responds and returns a boolean: true if the message was acknowledged, false if it was rejected (nack)

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
        //Enable confirmation mode
        channel.confirmSelect();
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
        //Block until the broker confirms this message
        if (channel.waitForConfirms()) {
            log.info("RabbitMQ confirm OK");
        } else {
            log.info("RabbitMQ confirm Failed");
        }
    }

1.5 implementation method of multiple synchronization confirmation mechanism: not recommended

  • Configure the channel and enable confirmation mode: channel.confirmSelect()
  • After sending several messages, call channel.waitForConfirms() once. It blocks until all of them have been confirmed and returns a boolean

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            channel.confirmSelect();
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
                log.info("message sent");
            }
            //One confirmation for the whole batch of 10 messages
            if (channel.waitForConfirms()) {
                log.info("RabbitMQ confirm OK");
            } else {
                log.info("RabbitMQ confirm Failed");
            }
        }

Note: channel.waitForConfirms() returns a boolean. If it is true, every message published since the previous call was acknowledged. If it is false, at least one of those messages was rejected (nack), but the return value does not say which one, so the whole batch has to be treated as suspect. This is why batch confirmation is not recommended.

1.6 implementation method of asynchronous confirmation mechanism: not recommended here. The callbacks run on a different thread from the publisher, so shared state must be handled in a thread-safe way

  • Configure the channel and enable confirmation mode: channel.confirmSelect()
  • Add a listener on the channel: addConfirmListener. After a message is sent, the listener is called back to report whether the broker accepted it
  • An asynchronous confirmation may cover a single message or several at once (multiple = true); RabbitMQ decides which
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            //Asynchronous confirmation: enable confirmation mode, then register a listener
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                //handleAck is called when the broker acknowledges
                //deliveryTag: sequence number of the published message on this channel
                //multiple: true if this confirms every message up to and including deliveryTag
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("Ack, deliveryTag: {}, multiple: {}",  deliveryTag, multiple);
                }
                //handleNack is called when the broker rejects the message(s)
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("Nack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
                }
            });
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            //Note: leaving this try block closes the channel, so keep it open
            //long enough for the callback to arrive
        }
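
The listener above only logs the delivery tag. To know which messages a callback actually covers, the publisher has to remember what it sent. The following is a minimal sketch of that bookkeeping, written against plain Java collections so that it runs without a broker. The class ConfirmTracker and its method names are hypothetical and not part of the RabbitMQ client; in real code the sequence number would presumably come from channel.getNextPublishSeqNo() before each basicPublish, and handleAck / handleNack would delegate to onAck / onNack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
class ConfirmTracker {
    // Sorted by sequence number; concurrent because callbacks run on another thread.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private final List<String> failed = new ArrayList<>();

    /** Call right before publishing, with the sequence number of the message. */
    void onPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Ack: drop one entry, or every entry up to and including deliveryTag if multiple is true. */
    void onAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    /** Nack: same range rule, but keep the bodies so they can be retried or logged. */
    synchronized void onNack(long deliveryTag, boolean multiple) {
        if (multiple) {
            ConcurrentNavigableMap<Long, String> range = outstanding.headMap(deliveryTag, true);
            failed.addAll(range.values());
            range.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) {
                failed.add(body);
            }
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    synchronized List<String> failedMessages() {
        return new ArrayList<>(failed);
    }
}
```

Using a sorted concurrent map keeps the multiple = true case to a single range operation, which is the main reason for choosing it over a plain HashMap in this sketch.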

[Figure: flow chart of the confirmation mechanism]

2. Message return mechanism

2.1 problem analysis: after the message is sent, is the message really routed?

  • After the message is sent, the sender does not know whether the message is routed correctly. If the routing is abnormal, the message will be discarded
  • After the message is discarded, the order processing process stops and the business is abnormal
  • You need to use the RabbitMQ message return mechanism to confirm that the message is routed correctly

2.2 what is the principle of message return mechanism?  

  • After the message is sent, the middleware will route the message
  • If the target queue is not found, the middleware will notify the sender
  • The sender's ReturnListener is then called back

2.3 opening method of message return mechanism

  • basicPublish takes a key parameter: mandatory

  • If mandatory is false, RabbitMQ silently discards messages that cannot be routed

  • If mandatory is true, RabbitMQ returns messages that cannot be routed to the sender, and the handleReturn method is called

    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

//Enable message return mechanism  
channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());

 ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.addReturnListener(new ReturnListener() {
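                    //replyCode: numeric reply code from the broker
                    //replyText: reply text describing why the message was returned
                    //exchange: the exchange the message was published to
                    //routingKey: the routing key used when publishing
                    //properties: the message properties (AMQP.BasicProperties)
                    //body: the message content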
                    @Override
                    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        log.info("Message Return: " +
                                "replyCode:{}, replyText:{}, exchange:{}, routingKey:{}, properties:{}, body:{}",
                                replyCode, replyText, exchange, routingKey, properties, new String(body));
                        //In addition to printing log, other business operations can be added
                    }
                });
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", true,null, messageToSend.getBytes());

            }

 ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.addReturnListener(new ReturnCallback() {
                    @Override
                    public void handle(Return returnMessage) {
                        log.info("Message Return: returnMessage: {}", returnMessage);

                        //In addition to printing log, other business operations can be added
                    }
                });
                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());
            }

Note: channel.addReturnListener(new ReturnListener() {...}) and channel.addReturnListener(new ReturnCallback() {...}) do essentially the same thing. The former receives each value as a separate parameter, while the latter receives a single Return object whose getXxx() methods (for example returnMessage.getReplyCode()) expose the same values.

3. Consumer confirmation mechanism

3.1 problem analysis: after the message is sent, what should the consumer do to handle exceptions?

  • In automatic acknowledgement mode (autoAck = true), a message counts as acknowledged (ACK) as soon as it is delivered to the consumer
  • If the consumer then fails while processing the message, neither the sender nor the message middleware finds out
  • You need to use the RabbitMQ consumer confirmation mechanism (manual ACK) to confirm that the message is processed correctly

3.2 consumer ACK type

  • Automatic ACK: the message is acknowledged automatically as soon as the consumer receives it
  • Manual ACK: the message is not acknowledged automatically; the business code must acknowledge it explicitly

3.3 manual ACK type

  • Single manual ACK: multiple=false

  • Multiple manual acks: multiple=true

  • Single ACK is recommended

3.4 requeuing

  • If requeue is set, a NACKed message goes back into the queue (at or near its original position, when possible) and waits to be delivered again
  • Requeuing is generally not recommended, because a message that failed on the first attempt will most likely fail again when it is reprocessed

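1. Set autoAck to false in channel.basicConsume

channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
            });

2. Acknowledge the message

//multiple: if true, acknowledges every message up to and including deliveryTag
void basicAck(long deliveryTag, boolean multiple) throws IOException;

DeliverCallback deliverCallback = (consumerTag, message) -> {
    long deliveryTag = message.getEnvelope().getDeliveryTag();
    //Single manual ACK (recommended): acknowledge each message once it is processed
    channel.basicAck(deliveryTag, false);
    //Multiple manual ACK alternative: acknowledge every 10th delivery with multiple = true
    //if (deliveryTag % 10 == 0) {
    //    channel.basicAck(deliveryTag, true);
    //}
};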

3. Reject manually and return to the queue: not recommended

    @Bean
    Channel rabbitChannel() throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
//Reject a single message and requeue it
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
//Reject in batches: every 10th delivery rejects all messages up to that delivery tag and requeues them
if (message.getEnvelope().getDeliveryTag()%10 == 0){
  channel.basicNack(message.getEnvelope().getDeliveryTag(),true,true);
}
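
Since a message that failed once tends to fail again, one possible compromise is to requeue only on the first failure and give up afterwards. The sketch below captures that decision as a pure function so it can be tested without a broker. AckPolicy, Action and decide are hypothetical names chosen for illustration; in a real consumer the redelivered flag would presumably be read from message.getEnvelope().isRedeliver(), and each action would map to the basicAck / basicNack call named in the comments.

```java
/** Hypothetical policy helper; the names are illustrative, not RabbitMQ API. */
class AckPolicy {
    enum Action { ACK, NACK_REQUEUE, NACK_DISCARD }

    /**
     * @param processedOk whether the business logic finished without an exception
     * @param redelivered whether the broker marked this delivery as a redelivery
     */
    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;           // maps to basicAck(tag, false)
        }
        if (!redelivered) {
            return Action.NACK_REQUEUE;  // maps to basicNack(tag, false, true): one more try
        }
        return Action.NACK_DISCARD;      // maps to basicNack(tag, false, false): drop or dead-letter
    }
}
```

This is only one policy among several; a retry counter carried in a message header would allow more than one retry, at the cost of republishing the message.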

4. Consumer side current limiting mechanism

4.1 problem analysis: after the message is sent, the message volume is too large. Can the consumer handle it?

  • During the peak period of business, the performance of the sender and the receiver may be inconsistent, and a large number of messages are pushed to the receiver at the same time, resulting in the collapse of the service at the receiver
  • RabbitMQ consumer end flow restriction mechanism should be used to limit the message push speed and ensure the stability of the receiving end service

4.2 RabbitMQ QoS (quality of service)

  • To solve the problem above, RabbitMQ provides the QoS (quality of service) function
  • QoS ensures that no new messages are pushed to a consumer while a set number of messages remain unacknowledged
  • QoS only takes effect when automatic ACK is not used (autoAck = false)

4.3 Qos principle

  • The Qos principle is that when a certain number of messages on the consumer end are not confirmed by ACK, RabbitMQ will not push new messages to the consumer end
  • RabbitMQ uses Qos mechanism to limit the flow at the consumer end
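
The principle above can be illustrated with a toy model of one queue and one consumer. This is a sketch for intuition only, not RabbitMQ code: the class PrefetchWindow and its methods are hypothetical, and the real broker tracks the window per consumer or per channel.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a queue with one consumer and a prefetch limit; not RabbitMQ code. */
class PrefetchWindow {
    private final int prefetchCount;
    private final Deque<String> ready = new ArrayDeque<>(); // messages waiting in the queue
    private int unacked = 0;                                // delivered but not yet acknowledged

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void enqueue(String message) {
        ready.addLast(message);
    }

    /** Pushes as many messages as the window allows and returns how many were pushed. */
    int deliver() {
        int pushed = 0;
        while (!ready.isEmpty() && unacked < prefetchCount) {
            ready.removeFirst();
            unacked++;
            pushed++;
        }
        return pushed;
    }

    /** The consumer acknowledges one message, which frees one slot in the window. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    int unackedCount() {
        return unacked;
    }

    int readyCount() {
        return ready.size();
    }
}
```

With a prefetch count of 5 and 12 queued messages, the first deliver() pushes 5 and a second call pushes nothing until the consumer acknowledges some of them, which is the back-pressure effect that basicQos is used for.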

4.4 parameter setting of consumer end current limiting mechanism

  • prefetchCount: the maximum number of unacknowledged messages pushed to a consumer

  • global:

    • true: the limit is shared by all consumers on the channel
    • false: the limit applies to each consumer on the channel separately
  • prefetchSize: the maximum total size, in bytes, of unacknowledged messages; 0 means no limit

  • RabbitMQ does not implement prefetchSize, so leave it at 0

 channel.basicQos(5);
 @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start listening message");
        channel.exchangeDeclare(
                "exchange.order.restaurant",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);

        channel.queueDeclare(
                "queue.restaurant",
                true,
                false,
                false,
                null);

        channel.queueBind(
                "queue.restaurant",
                "exchange.order.restaurant",
                "key.restaurant");

        channel.basicQos(5);
        //QoS requires manual ACK, so autoAck is false and deliverCallback must call basicAck
        channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }

5. Message expiration mechanism: not used alone, but in combination with the dead letter mechanism

5.1 problem analysis: what if the queue is full?

  • By default, messages enter the queue and will exist forever until consumed

  • A large number of messages will put a lot of pressure on RabbitMQ

  • You need to use the RabbitMQ message expiration time to prevent a large backlog of messages

5.2 RabbitMQ expiration time (TTL)

  • The expiration time in RabbitMQ is called TTL (Time to Live), that is, the lifetime of a message

  • The expiration time of RabbitMQ is divided into message TTL and queue TTL

  • Message TTL sets the expiration time of a single message

  • Queue TTL sets the expiration time of all messages in the queue

5.3 how to find a suitable TTL

  • TTL setting mainly considers technical architecture and business

  • TTL should be significantly longer than the average restart time of the service

  • It is recommended that TTL be longer than the peak time of business

Note: using TTL alone is not recommended, because expired messages are deleted outright and may be lost by mistake. It is combined with the dead letter mechanism later.
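
When a message carries its own TTL and also sits in a queue with a queue-level message TTL, RabbitMQ's TTL documentation says the lower of the two values applies. The sketch below expresses that rule as a small function. TtlRules and effectiveTtlMs are hypothetical names, and null is used here to mean "not set"; the per-message value is a string because the expiration property is a string of milliseconds.

```java
/** Hypothetical helper illustrating how the two TTL settings combine. */
class TtlRules {
    /**
     * @param queueTtlMs   value of the queue's x-message-ttl argument, or null if not set
     * @param expirationMs value of the message's expiration property, or null if not set
     * @return the TTL in milliseconds that applies, or null if the message never expires
     */
    static Long effectiveTtlMs(Long queueTtlMs, String expirationMs) {
        Long perMessage = (expirationMs == null) ? null : Long.valueOf(expirationMs);
        if (queueTtlMs == null) {
            return perMessage;
        }
        if (perMessage == null) {
            return queueTtlMs;
        }
        // Both are set: the shorter lifetime wins
        return Math.min(queueTtlMs, perMessage);
    }
}
```

For example, a message published with expiration "60000" into a queue declared with x-message-ttl 15000 would be expected to expire after 15000 ms.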

1. Set the expiration time of a single message

        //Enable confirmation mode (single synchronous confirmation)
        channel.confirmSelect();
        //Set the expiration time of this message: 15000 ms, passed as a string
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("15000").build();
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
        log.info("message sent");
        if (channel.waitForConfirms()) {
            log.info("RabbitMQ confirm OK");
        } else {
            log.info("RabbitMQ confirm Failed");
        }

2. Set the message expiration time for a queue: if the queue already exists without this argument, you need to delete the old queue and restart the service, because redeclaring an existing queue with different arguments fails with a PRECONDITION_FAILED error

@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
    log.info("start listening message");
    channel.exchangeDeclare(
            "exchange.order.restaurant",
            BuiltinExchangeType.DIRECT,
            true,
            false,
            null);
    //Queue arguments: messages in this queue expire after 15000 ms
    Map<String, Object> args=new HashMap<>(16);
    args.put("x-message-ttl",15000);
    channel.queueDeclare(
            "queue.restaurant",
            true,
            false,
            false,
            args);

    channel.queueBind(
            "queue.restaurant",
            "exchange.order.restaurant",
            "key.restaurant");

    channel.basicQos(5);
    //QoS requires manual ACK, so autoAck is false and deliverCallback must call basicAck
    channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
    });
    while (true) {
        Thread.sleep(100000);
    }
}
        //Queue arguments
        Map<String, Object> args=new HashMap<>(16);
        //Set the expiration time of messages in the queue (ms)
        args.put("x-message-ttl",15000);
        //Set the expiration time of the queue itself: the queue is deleted after being unused for this long (not recommended)
        args.put("x-expires",15000);

6. Dead letter queue: collect expired messages

6.1 problem analysis: how to transfer expired messages

  • Once a message's expiration time has passed, the message is discarded directly
  • Messages that are discarded directly leave no trace, so the system cannot raise an alert about the abnormal situation
  • You need to use a RabbitMQ dead letter queue to collect expired messages for analysis

6.2 what is a dead letter queue

  • Dead letter queue: the queue is configured with the DLX (dead letter exchange) attribute

  • When a message becomes a dead letter, it can be republished to another exchange, the DLX, which is itself an ordinary exchange

  • After the dead letter is routed by the dead letter exchange, it usually ends up in a fixed queue

Description: the producer sends a message and the exchange routes it to a queue. If the message stays in the queue longer than the configured TTL, it becomes a dead letter and is republished to a DL exchange. The DL exchange is an ordinary exchange that we simply designate for dead letters. It routes the messages to a DL queue, which is monitored manually or by a dedicated exception handling program. This process is called dead letter transfer.

6.3 how to become a dead letter?

  • The message is rejected (reject/nack) with requeue = false

    //Manual rejection without requeue
    channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
    
  • Message expiration (TTL expiration)

    //Set the expiration time of messages in the queue
    args.put("x-message-ttl",15000);
    
  • The queue has reached its maximum length

    //Sets the maximum length of the queue
    args.put("x-max-length",5); 
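
For the maximum length case, the default overflow behaviour of a classic queue is to drop, or dead-letter if a DLX is configured, the oldest messages from the head of the queue when new ones arrive. The toy model below sketches that behaviour in plain Java. BoundedQueueModel is a hypothetical class for illustration and does not talk to a broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a length-limited queue with a dead letter target; not RabbitMQ code. */
class BoundedQueueModel {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedQueueModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Adds a message at the tail; the oldest messages are dead-lettered if the limit is exceeded. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            deadLetters.add(queue.removeFirst());
        }
    }

    List<String> queued() {
        return new ArrayList<>(queue);
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }
}
```

With a maximum length of 5, publishing seven messages m1 to m7 leaves m3 to m7 in the queue and moves m1 and m2 to the dead letter list, so it is the oldest messages that become dead letters, not the newest.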
    

6.4 setting method of dead letter queue

  • Set the exchange and queue for forwarding and receiving dead letters

    • Exchange: exchange.dlx
    • Queue: queue.dlx
    • RoutingKey: #
  • Add parameters to the queue where dead letter needs to be set:

    • x-dead-letter-exchange=exchange.dlx
 @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start listening message");
        //Declare dead letter exchange
        channel.exchangeDeclare("exchange.dlx",
                BuiltinExchangeType.TOPIC,
                true,
                false,
                null);
        //Declare dead letter queue
        channel.queueDeclare("queue.dlx",
                true,
                false,
                false,
                null);
        //Bind exchange and queue
        channel.queueBind("queue.dlx","exchange.dlx","#");
        channel.exchangeDeclare(
                "exchange.order.restaurant",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);
        //Queue arguments
        Map<String, Object> args=new HashMap<>(16);
        //Set the expiration time of messages in the queue
        args.put("x-message-ttl",15000);
        //Set the expiration time of the queue itself
//        args.put("x-expires",15000);
        //Set the dead letter exchange for this queue
        args.put("x-dead-letter-exchange","exchange.dlx");
        channel.queueDeclare(
                "queue.restaurant",
                true,
                false,
                false,
                args);

        channel.queueBind(
                "queue.restaurant",
                "exchange.order.restaurant",
                "key.restaurant");
        //Set the flow control mechanism at the consumer end (requires manual ACK)
        channel.basicQos(5);
        channel.basicConsume("queue.restaurant", false, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }

Declare the dead letter exchange and queue, then bind the queue to the exchange

 //Declare dead letter exchange
channel.exchangeDeclare("exchange.dlx",
                        BuiltinExchangeType.TOPIC,
                        true,
                        false,
                        null);
        //Declare dead letter queue
        channel.queueDeclare("queue.dlx",
                true,
                false,
                false,
                null);
        //Bind exchange and queue
        channel.queueBind("queue.dlx","exchange.dlx","#");

Keywords: RabbitMQ

Added by spellbinder on Wed, 02 Feb 2022 15:54:05 +0200