RabbitMQ consumer manual ACK message confirmation mechanism

   ACK is short for acknowledge (to confirm that something has been received). In RabbitMQ it refers to the consumer confirming to the broker that it has received a message. The point is to establish whether the consumer has actually received the message sent by MQ.

  Spring AMQP's listener container provides three confirmation modes:

  automatic confirmation, acknowledge = "none": as soon as the message is delivered to the consumer, RabbitMQ treats it as received. It does not matter whether the business processing succeeds after the consumer receives the message.

   manual confirmation, acknowledge = "manual": after receiving the message, the consumer does not immediately tell RabbitMQ that the message has been received. Instead, the code explicitly confirms the message to MQ once the business processing has succeeded. When business processing fails, you can apply a retry mechanism, or have MQ deliver the message to a consumer again.

   confirmation according to the exception, acknowledge = "auto": the listener container sends the response (confirmation, rejection, requeue, etc.) for you, depending on whether the listener method returns normally and on the type of exception it throws. This method is more troublesome.

   With automatic confirmation, once a message is delivered to the consumer it is confirmed to MQ automatically and removed from the RabbitMQ message cache. In real business processing, however, a message may be received and then fail during processing. In automatic confirmation mode, such a message is effectively discarded. If manual confirmation is set, you need to call channel.basicAck() yourself after the business processing is completed. If the business processing fails, call channel.basicNack() to reject the message and ask MQ to deliver it again.

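  If you do not configure an acknowledge mode, messages are confirmed automatically by default: Spring AMQP's default mode is "auto", in which the container confirms the message for you once the listener method returns without an exception.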
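To make the difference between confirming on delivery and confirming manually more concrete, here is a minimal sketch in plain Java. It does not use the RabbitMQ client at all: the class AckModeSketch, the method deliverOnce and the in-memory queue are hypothetical names that only model the behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model, not the RabbitMQ client API.
class AckModeSketch {

    /**
     * Delivers every queued message once and returns the messages that the
     * modelled broker still holds afterwards.
     *
     * @param manualAck false models acknowledge = "none", true models "manual"
     * @param business  returns true when business processing succeeds
     */
    static Deque<String> deliverOnce(Deque<String> queue, boolean manualAck,
                                     Predicate<String> business) {
        Deque<String> remaining = new ArrayDeque<>();
        while (!queue.isEmpty()) {
            String message = queue.poll();
            boolean succeeded = business.test(message);
            if (manualAck && !succeeded) {
                // Models basicNack(deliveryTag, false, true): back to the queue.
                remaining.add(message);
            }
            // Otherwise the message is gone: either it was confirmed on delivery
            // ("none"), or the consumer called basicAck after success ("manual").
        }
        return remaining;
    }

    public static void main(String[] args) {
        Predicate<String> business = m -> !m.startsWith("bad");
        // Confirm on delivery: the failed message is lost.
        System.out.println(deliverOnce(
                new ArrayDeque<>(List.of("order-1", "bad-order")), false, business));
        // Manual confirmation: the failed message is kept for redelivery.
        System.out.println(deliverOnce(
                new ArrayDeque<>(List.of("order-1", "bad-order")), true, business));
    }
}
```

In this model the first call leaves nothing behind, while the second call still holds bad-order, which is the message whose business processing failed.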

  1. Consumer configuration:

server:
  port: 81

spring:
  rabbitmq:
    host: www.996.com
    port: 5672
    username: rabbitadmin
    password: rabbitadmin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #Manual

  2. Consumer interface method:

/**
 * Test ACK
 * @param message
 */
public void receiveMessage(String message);

   3. Implementation of consumer interface method:

/**
 * Test ACK
 * @param message
 */
@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE})
public void receiveMessage(String message) {
    System.out.println("Received MQ Message:"+message);
    //Processing business
    System.out.println("Processing business");
}

  4. The producer sends queue messages

/**
 * Sends a message to the direct exchange using
 * convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData).
 *
 * @param message the message body to send
 */
@Override
public void sendMessage(String message) {
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE,RabbitMqConfig.DIRECT_ROUTINGKEY,message,correlationData);
}

  5. Queue

   6. The consumer receives the queue message successfully, but because no ACK is sent in manual mode, the message is not deleted from the queue. Instead, its status changes to unacknowledged (Unacked).

   7. Change the message receiving method and perform manual ACK confirmation

/**
 * Test ACK
 * @param message
 */
@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE})
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

    try {
        System.out.println("Received MQ Message:"+message);
        //Processing business
        System.out.println("Processing business");
        // Manual ACK: basicAck(long deliveryTag, boolean multiple)
        // deliveryTag: identifier of this delivery on the channel; it increases by 1
        //              for every message delivered on the channel (1, 2, 3, ...)
        // multiple:    true confirms every unconfirmed message up to and including
        //              deliveryTag; false confirms only this message
        System.out.println("deliveryTag: " + deliveryTag);
        channel.basicAck(deliveryTag,true);
    }catch (Exception e){
        // Print the failure instead of silently discarding it
        e.printStackTrace();
    }

}
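
The meaning of deliveryTag and multiple is easy to misread, so here is a small model of it in plain Java. This is only a sketch under assumptions: DeliveryTagSketch, deliver and unackedCount are hypothetical names, and the TreeMap stands in for the unconfirmed deliveries that the broker tracks per channel. Only the basicAck(long deliveryTag, boolean multiple) signature mirrors the real client method.

```java
import java.util.TreeMap;

// Hypothetical model of one channel's unconfirmed deliveries.
class DeliveryTagSketch {
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    /** Models a delivery: each message gets the next tag on this channel. */
    long deliver(String body) {
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Same parameters as Channel.basicAck, applied to the in-memory map. */
    void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Confirm everything up to and including deliveryTag.
            unacked.headMap(deliveryTag, true).clear();
        } else {
            // Confirm only this one delivery.
            unacked.remove(deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        DeliveryTagSketch channel = new DeliveryTagSketch();
        for (int i = 1; i <= 5; i++) {
            channel.deliver("message-" + i);   // tags 1..5
        }
        channel.basicAck(3, false);            // confirms tag 3 only
        System.out.println(channel.unackedCount()); // 4
        channel.basicAck(4, true);             // confirms tags 1, 2 and 4
        System.out.println(channel.unackedCount()); // 1
    }
}
```

So multiple = true is a way to confirm a run of earlier deliveries with one call, while multiple = false confirms exactly one message.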

   8. The consumer receives the queue message and confirms the ACK manually

   9. If the business processing throws an exception, the message can be rejected. Here a failure is simulated in the business code:

/**
 * Test ACK
 * @param message
 */
@Override
@RabbitListener(queues = {RabbitMqConfig.DIRECT_QUEUE})
public void receiveMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

    try {
        System.out.println("Received MQ Message:"+message);
        //Processing business; 1/0 throws ArithmeticException to simulate a failure
        System.out.println("Processing business"+1/0);
        // Manual ACK: basicAck(long deliveryTag, boolean multiple)
        // deliveryTag: identifier of this delivery on the channel; it increases by 1
        //              for every message delivered on the channel (1, 2, 3, ...)
        // multiple:    true confirms every unconfirmed message up to and including
        //              deliveryTag; false confirms only this message
        // Not reached in this test because the line above throws
        System.out.println("deliveryTag: " + deliveryTag);
        channel.basicAck(deliveryTag,true);
    }catch (Exception e){
        e.printStackTrace();
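        /**
         * basicNack(long deliveryTag, boolean multiple, boolean requeue)
         * multiple: false rejects only this message
         * requeue: true puts the message back in the queue for redelivery;
         *          false discards it (or dead-letters it if the queue has a dead-letter exchange)
         */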
        try {
            channel.basicNack(deliveryTag,false,true);
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }

}

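   10. Test. After the exception, basicNack with requeue = true puts the message back in the queue, and the listener receives it again and continues consuming. Note that a manual nack with requeue is not in itself limited to a fixed number of attempts. The limit of 3 attempts is the default of Spring Boot's listener retry (spring.rabbitmq.listener.simple.retry.max-attempts), which only applies when retry is enabled and the listener lets the exception propagate instead of catching it.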
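
If you want an explicit cap on redeliveries while staying in manual mode, one option is to count attempts yourself and stop requeueing after a limit. The following is a sketch in plain Java under stated assumptions, not the RabbitMQ client or a Spring feature: BoundedRetrySketch, consume and MAX_ATTEMPTS are hypothetical names, attempts are keyed by message body for simplicity (a real consumer would more likely use a message id), and the comments indicate which basicAck / basicNack call each branch stands for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of "requeue, but only a limited number of times".
class BoundedRetrySketch {
    static final int MAX_ATTEMPTS = 3; // assumed limit, chosen for illustration

    /** Consumes the queue and returns the messages that were given up on. */
    static List<String> consume(Deque<String> queue, Consumer<String> business) {
        Map<String, Integer> attempts = new HashMap<>();
        List<String> rejected = new ArrayList<>();
        while (!queue.isEmpty()) {
            String message = queue.poll();
            try {
                business.accept(message);
                // Success: this is where basicAck(deliveryTag, false) would go.
            } catch (RuntimeException e) {
                int n = attempts.merge(message, 1, Integer::sum);
                if (n < MAX_ATTEMPTS) {
                    // Models basicNack(deliveryTag, false, true): try again later.
                    queue.add(message);
                } else {
                    // Models basicNack(deliveryTag, false, false): stop requeueing.
                    rejected.add(message);
                }
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("order-1", "bad-order"));
        List<String> rejected = consume(queue, m -> {
            if (m.startsWith("bad")) {
                throw new IllegalStateException("business failure for " + m);
            }
        });
        System.out.println(rejected); // [bad-order]
    }
}
```

In a real consumer the final branch would typically hand the message to a dead-letter queue or log it for manual handling rather than drop it silently.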

Keywords: RabbitMQ

Added by SWI03 on Sun, 16 Jan 2022 02:57:05 +0200