RabbitMQ: message sending confirmation and message receiving confirmation (ACK)

Summary

By default, once a message has been correctly received by a consumer, it is removed from the queue

If no consumer is subscribed to a queue, its messages are kept in the queue. As soon as a consumer subscribes, they are delivered to it, and each message is removed from the queue once the consumer has received it correctly

I Message delivery confirmation

1.1. What counts as success or failure when sending a message, and how is it confirmed?

When a message cannot be routed to any queue, the broker reports the routing failure. When a message is routed successfully, it is confirmed once every queue it should be delivered to has accepted it. For a durable queue this means the message has been written to disk; for a mirrored queue it means that all mirrors have received it

1.2. ConfirmCallback

By implementing the ConfirmCallback interface, you get a callback after a message has been sent to the broker. It tells you whether the message reached the broker, or more precisely, only whether it reached the exchange; it says nothing about whether the message was routed to a queue

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);            //Specify ConfirmCallback
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("Message unique ID:"+correlationData);
        System.out.println("Confirmation result:"+ack);
        System.out.println("Failure reason:"+cause);
    }
}

You also need to enable publisher confirms in the configuration file

spring:
  rabbitmq:
    publisher-confirm-type: correlated    # Spring Boot 2.2+; older versions use publisher-confirms: true
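
One common use of the confirm callback is to remember each message under its correlation ID until the broker confirms it. The following is a minimal, broker-free sketch in plain Java. `PendingConfirms`, `track`, and `onConfirm` are hypothetical names, and `onConfirm` only mirrors the shape of `confirm(correlationData, ack, cause)`; it does not call Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spring AMQP): remembers published
// messages until the broker confirms them.
class PendingConfirms {
    // correlation ID -> message body still waiting for a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // correlation IDs the broker did not confirm; candidates for a resend
    private final List<String> failed = new ArrayList<>();

    // Call right before publishing the message.
    void track(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    // Call from the confirm callback with the same ID, the ack flag and the cause.
    synchronized void onConfirm(String correlationId, boolean ack, String cause) {
        String body = pending.remove(correlationId);
        if (body == null) {
            return; // unknown ID, or this ID was already handled
        }
        if (!ack) {
            failed.add(correlationId);
            System.out.println("Not confirmed: " + correlationId + ", cause: " + cause);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    synchronized List<String> failedIds() {
        return new ArrayList<>(failed);
    }
}
```

In a real application the ID passed to `track` would be the one placed in the CorrelationData given to the send call, so that the callback can look it up again.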

1.3. ReturnCallback

By implementing the ReturnCallback interface, you get a callback when a message is returned because delivery failed, for example when it cannot be routed to any queue

@Component
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnCallback(this);             //Specify ReturnCallback
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("Message: "+message);
        System.out.println("Reply code: "+replyCode);
        System.out.println("Reply text: "+replyText);
        System.out.println("Exchange: "+exchange);
        System.out.println("Routing key: "+routingKey);
    }
}

You also need to enable publisher returns in the configuration file

spring:
  rabbitmq:
    publisher-returns: true 
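
The two callbacks answer different questions: the confirm reports whether the broker (the exchange) took the message, the return reports that no queue could be found for it. The toy model below sketches the three outcomes a publisher can observe, assuming a direct exchange and a message published with mandatory = true. `RoutingModel` and its methods are hypothetical and only simulate the behaviour in memory; they are not a RabbitMQ client.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of direct-exchange routing; not a RabbitMQ client.
class RoutingModel {
    enum Outcome { NACKED_NO_EXCHANGE, RETURNED_THEN_ACKED, ACKED }

    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, Set<String>>> bindings = new HashMap<>();

    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, e -> new HashMap<>());
    }

    void bind(String exchange, String routingKey, String queue) {
        declareExchange(exchange);
        bindings.get(exchange)
                .computeIfAbsent(routingKey, k -> new HashSet<>())
                .add(queue);
    }

    // What a publisher using mandatory = true would observe for this publish.
    Outcome publish(String exchange, String routingKey) {
        Map<String, Set<String>> keys = bindings.get(exchange);
        if (keys == null) {
            // Exchange does not exist: the confirm arrives with ack = false
            return Outcome.NACKED_NO_EXCHANGE;
        }
        Set<String> queues = keys.get(routingKey);
        if (queues == null || queues.isEmpty()) {
            // Exchange exists but no queue matches: the message is returned,
            // and the confirm still arrives with ack = true
            return Outcome.RETURNED_THEN_ACKED;
        }
        // Routed to at least one queue: confirm with ack = true, no return
        return Outcome.ACKED;
    }
}
```

The middle case is the reason a ConfirmCallback alone is not enough: an unroutable message is still confirmed, so only the return tells the producer that it never reached a queue.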

II Message consumption confirmation

2.1. How does a consumer tell RabbitMQ that a message was consumed successfully?

1. Whether a message was received correctly is confirmed through an ACK. Every message must be acknowledged, either manually or automatically
With automatic acknowledgement, a message is acknowledged as soon as it is sent to the consumer, so messages can be lost. If the consumer logic throws an exception, that is, the consumer fails to process the message, the message is effectively lost

2. If the message has been processed but later code throws an exception, and the consumer's business logic is managed by Spring, that logic is rolled back, which in practice also loses the message

3. With manual acknowledgement, the consumer calls ack, nack, or reject itself, so it can take some action after a business failure. A message that is nacked or rejected with requeue, or left unacknowledged when the channel closes, returns to the queue and can be delivered to another consumer

4. If a service forgets to ACK, RabbitMQ stops sending it new messages once its number of unacknowledged messages reaches the prefetch limit, because RabbitMQ assumes that the processing capacity of the service is limited

5. The ACK mechanism can also be used for rate limiting, for example by sleeping for a few seconds after a message is received before acknowledging it

6. Message acknowledgement modes include:
AcknowledgeMode.NONE: automatic acknowledgement (the broker treats a message as acknowledged as soon as it is delivered)
AcknowledgeMode.AUTO: the container acknowledges or rejects depending on whether the listener throws an exception
AcknowledgeMode.MANUAL: manual acknowledgement
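
Points 4 and 5 both rest on the prefetch limit: the broker keeps only a bounded number of unacknowledged messages in flight per consumer. The sketch below simulates that rule in plain Java. `PrefetchChannel` and its methods are hypothetical names for illustration only, and the model ignores everything else a real channel does.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model of one consumer channel with a prefetch limit; not the RabbitMQ client API.
class PrefetchChannel {
    private final int prefetch;                           // max unacked deliveries in flight
    private final Deque<String> queue = new ArrayDeque<>();
    private final Set<Long> unacked = new HashSet<>();
    private long nextTag = 1;

    PrefetchChannel(int prefetch) {
        this.prefetch = prefetch;
    }

    void enqueue(String body) {
        queue.addLast(body);
    }

    // Delivers the next message and returns its delivery tag,
    // or returns -1 if the queue is empty or the prefetch limit is reached.
    long deliverNext() {
        if (queue.isEmpty() || unacked.size() >= prefetch) {
            return -1;
        }
        queue.removeFirst();
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Acknowledging a delivery frees one slot under the prefetch limit.
    void ack(long tag) {
        unacked.remove(tag);
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With a prefetch of 2 and three queued messages, the third is held back until one of the first two is acknowledged, which is exactly what a consumer that forgets to ack runs into.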

2.2. Acknowledging messages (handled in the listener method)

By default, consumers acknowledge messages automatically. To acknowledge manually, change the acknowledge mode to manual

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

Or enable manual ack in the RabbitListenerContainerFactory

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);             //Enable manual ack
    return factory;
}

Acknowledge the message

@RabbitHandler
public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    System.out.println(message);
    try {
        channel.basicAck(tag,false);            // acknowledge this message only (multiple = false)
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Note that the basicAck method takes two parameters

deliveryTag (unique ID): after a consumer registers with RabbitMQ, a Channel is established, and RabbitMQ pushes messages to the consumer with the basic.deliver method. This method carries a delivery tag, which uniquely identifies the delivery on that Channel. It is a monotonically increasing positive integer, and its scope is limited to the Channel

multiple: to reduce network traffic, manual acknowledgements can be batched. When this parameter is true, all messages with a delivery tag less than or equal to the given value are acknowledged at once
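
The effect of the multiple flag can be sketched with a sorted set of outstanding delivery tags. This is a plain Java model under the assumptions stated above (tags start at 1 and increase by one per delivery on a channel). `UnackedTags` and its methods are hypothetical names, not the RabbitMQ client API.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Toy model of the unacknowledged deliveries on a single channel.
class UnackedTags {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Simulates one delivery and returns its delivery tag.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Mirrors the two parameters of basicAck.
    // Returns how many deliveries were acknowledged by this call.
    int ack(long tag, boolean multiple) {
        if (multiple) {
            // Every outstanding tag less than or equal to the given tag
            NavigableSet<Long> upToTag = unacked.headSet(tag, true);
            int count = upToTag.size();
            upToTag.clear(); // clearing the view removes them from the backing set
            return count;
        }
        return unacked.remove(tag) ? 1 : 0;
    }

    // Copy of the tags that are still unacknowledged.
    NavigableSet<Long> outstanding() {
        return new TreeSet<>(unacked);
    }
}
```

After five deliveries, `ack(3, true)` acknowledges tags 1 to 3 in one call and leaves 4 and 5 outstanding, while `ack(5, false)` touches only tag 5.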

2.3. Manually nacking and rejecting messages

Test: send a message whose headers contain an error attribute

When the consumer gets the message and sees that the headers contain error, it nacks the message (negatively acknowledges it and returns it to the queue)

@RabbitHandler
public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) {
    System.out.println(message);
    long tag = (Long) map.get(AmqpHeaders.DELIVERY_TAG);
    try {
        if (map.get("error") != null) {
            System.out.println("Error message");
            channel.basicNack(tag, false, true);    // nack: multiple = false, requeue = true
            return;
        }
        channel.basicAck(tag, false);               // acknowledge this message only
    } catch (IOException e) {
        // A failed nack no longer falls through to basicAck
        e.printStackTrace();
    }
}

At this point the console prints the same lines over and over, which shows that after the nack the message was requeued and consumed again

hello
 Error message
hello
 Error message
hello
 Error message
hello
 Error message

You can also reject the message. With requeue set to false, the message is discarded (or dead-lettered, if a dead-letter exchange is configured) and is not returned to the queue

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);        // reject, requeue = false
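
The repeated console output above shows that a nack with requeue = true can redeliver a failing message forever. One way to bound this is to count failures per message and switch from nack-and-requeue to reject-without-requeue after a limit. The sketch below is a plain Java simulation of that idea; `RequeueLimiter`, its in-memory counter, and the assumption that every message carries a stable ID are all illustrative, not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides between requeueing and discarding a failed message.
class RequeueLimiter {
    enum Decision { NACK_REQUEUE, REJECT_DISCARD }

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();

    RequeueLimiter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Call once per failed delivery of the message with this ID.
    Decision onFailure(String messageId) {
        int failures = attempts.merge(messageId, 1, Integer::sum);
        if (failures >= maxAttempts) {
            attempts.remove(messageId);       // forget the message once it is given up
            return Decision.REJECT_DISCARD;   // corresponds to basicReject(tag, false)
        }
        return Decision.NACK_REQUEUE;         // corresponds to basicNack(tag, false, true)
    }

    // Simulates a queue holding one always-failing message and
    // returns how many times it is delivered before being discarded.
    static int deliveriesUntilDiscard(int maxAttempts) {
        RequeueLimiter limiter = new RequeueLimiter(maxAttempts);
        Deque<String> queue = new ArrayDeque<>();
        queue.add("msg-1");
        int deliveries = 0;
        while (!queue.isEmpty()) {
            String id = queue.removeFirst();
            deliveries++;
            if (limiter.onFailure(id) == Decision.NACK_REQUEUE) {
                queue.addLast(id);            // the message re-enters the queue
            }
        }
        return deliveries;
    }
}
```

Because the counter lives in the consumer's memory, it only works as written when a single consumer instance sees all redeliveries; with several consumers the count would have to be stored somewhere shared.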

2.4. Acknowledging messages (handled globally in the listener container)

2.4.1 Automatic acknowledgement

One problem with this mode: if an exception is thrown while processing a message, the processing fails, but RabbitMQ has already deleted the message because of automatic acknowledgement, so the message is lost

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");                 // Listening queue
    container.setAcknowledgeMode(AcknowledgeMode.NONE);     // NONE stands for automatic confirmation
    container.setMessageListener((MessageListener) message -> {         //Message listening and processing
        System.out.println("====Message received=====");
        System.out.println(new String(message.getBody()));
        // Simulates an error thrown by the consumer's own logic
        throw new NullPointerException("consumer fail");
    });
    return container;
}

2.4.2 Manual acknowledgement

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // Listening queue
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        // Manual confirmation
    container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {      //Message processing
        System.out.println("====Message received=====");
        System.out.println(new String(message.getBody()));
        if(message.getMessageProperties().getHeaders().get("error") == null){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("The message has been confirmed");
        }else {
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("Message reject");
        }

    });
    return container;
}

Besides NONE and MANUAL, the acknowledge mode can also be AUTO. In this mode the container decides whether to acknowledge or reject the message (and whether to requeue it) based on how the listener method finishes

If the message is consumed successfully (that is, no exception is thrown during consumption), it is acknowledged automatically

When an AmqpRejectAndDontRequeueException is thrown, the message is rejected with requeue = false (it does not re-enter the queue)

When an ImmediateAcknowledgeAmqpException is thrown, the message is acknowledged

For any other exception, the message is rejected with requeue = true. If only one consumer is listening on the queue, this risks an infinite redelivery loop, and even with multiple consumers it wastes a lot of resources, so it must be avoided during development. The requeue behaviour can be changed with setDefaultRequeueRejected (the default is true)

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("consumer_queue");              // Listening queue
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);     // Confirm the message according to the situation
    container.setMessageListener((MessageListener) (message) -> {
        System.out.println("====Message received=====");
        System.out.println(new String(message.getBody()));
        // Throwing a NullPointerException rejects the message and requeues it
        //throw new NullPointerException("message consumption failed");
        // Throwing AmqpRejectAndDontRequeueException rejects the message with requeue = false
        //throw new AmqpRejectAndDontRequeueException("message consumption failed");
        // Throwing ImmediateAcknowledgeAmqpException acknowledges the message
        throw new ImmediateAcknowledgeAmqpException("Message consumption failed");
    });
    return container;
}
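
The AUTO rules described above can be summarised as a small decision function. The sketch below is a plain Java model, not Spring AMQP code: `AutoAckModel` is hypothetical, and its two nested exception classes are stand-ins for AmqpRejectAndDontRequeueException and ImmediateAcknowledgeAmqpException so that the example runs without any dependency. It also simplifies by looking only at the exception thrown directly by the listener.

```java
// Toy model of how AcknowledgeMode.AUTO maps a listener result to a broker action.
class AutoAckModel {
    enum Outcome { ACK, REJECT_NO_REQUEUE, REJECT_REQUEUE }

    // Stand-in for AmqpRejectAndDontRequeueException
    static class RejectAndDontRequeue extends RuntimeException {
        RejectAndDontRequeue(String message) { super(message); }
    }

    // Stand-in for ImmediateAcknowledgeAmqpException
    static class ImmediateAcknowledge extends RuntimeException {
        ImmediateAcknowledge(String message) { super(message); }
    }

    // Runs the listener and returns the action the container would take.
    static Outcome run(Runnable listener, boolean defaultRequeueRejected) {
        try {
            listener.run();
            return Outcome.ACK;                    // no exception: acknowledged
        } catch (ImmediateAcknowledge e) {
            return Outcome.ACK;                    // acknowledged despite the exception
        } catch (RejectAndDontRequeue e) {
            return Outcome.REJECT_NO_REQUEUE;      // rejected, requeue = false
        } catch (RuntimeException e) {
            // Any other exception: rejected, requeue follows defaultRequeueRejected
            return defaultRequeueRejected ? Outcome.REJECT_REQUEUE : Outcome.REJECT_NO_REQUEUE;
        }
    }
}
```

For example, a listener that throws a NullPointerException yields REJECT_REQUEUE with the default setting, and REJECT_NO_REQUEUE once defaultRequeueRejected is set to false.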

Message reliability

Persistence

The exchange must be durable

The queue must be durable

The message must be persistent

Message confirmation

Enable message returns (the ReturnCallback from section 1.3, so that the producer knows which messages could not be routed to a queue)

Message confirmation between producer and Server (broker)

Message confirmation between consumer and Server (broker)

The above is a summary of personal experience and of material found online. If anything resembles other articles, please understand. Technical discussion is welcome, and updates will follow

Keywords: MQ Software development

Added by bob2588 on Wed, 02 Feb 2022 03:17:43 +0200