summary
By default, if a Message is correctly received by the consumer, it will be removed from the Queue
If a Queue is not subscribed by any consumers, the messages in the Queue will be cached. When a consumer subscribes, they will be sent immediately. When the Message is correctly received by the consumer, it will be removed from the Queue
I Message delivery confirmation
1.1. What is a failure or success in sending a message? How to confirm?
When the message cannot be routed to the queue, the confirmation message routing fails. When the message is successfully routed, confirm the message after all the queues to be sent are successfully sent. For the persistent queue, it means writing to the disk, and for the image queue, it means that all images are successfully received
1.2.ConfirmCallback
By implementing the ConfirmCallback interface, a callback is triggered after the message is sent to the Broker to confirm whether the message reaches the Broker server, that is, only whether it reaches the Exchange server correctly
@Component public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); //Specify ConfirmCallback } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("Message unique ID:"+correlationData); System.out.println("Confirmation result:"+ack); System.out.println("Failure reason:"+cause); }
You also need to add a configuration in the configuration file
spring: rabbitmq: publisher-returns: true
1.3.ReturnCallback
By implementing the ReturnCallback interface, the startup message fails to return, such as triggering a callback when the route cannot reach the queue
@Component public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnCallback(this); //Specify ReturnCallback } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("Message body message : "+message); System.out.println("Message body message : "+replyCode); System.out.println("Description:"+replyText); System.out.println("Switch used by message exchange : "+exchange); System.out.println("Routing keys used by messages routing : "+routingKey); } }
You also need to add a configuration in the configuration file
spring: rabbitmq: publisher-returns: true
II Message consumption confirmation
2.1. How do message consumers inform Rabbit that message consumption is successful?
1. Confirm whether the Message is received correctly through ACK. Each Message must be acknowledged. You can go to ACK manually or automatically
Automatic confirmation will be confirmed immediately after the message is sent to the consumer, but there is a possibility of losing the message. If the consumer logic throws an exception, that is, the consumer fails to process the message successfully, it is equivalent to losing the message
2. If the message has been processed, but the subsequent code throws an exception and uses Spring for management, the consumer business logic will roll back, which also causes the loss of practical messages
3. If you manually confirm, when the consumer calls ACK, nack and reject to confirm, you can perform some operations after the business fails. If the message is not ACK, it will be sent to the next consumer
4. If a service forgets the ACK, RabbitMQ will not send data to it, because RabbitMQ believes that the processing capacity of the service is limited
5. The ACK mechanism can also limit the current, such as sleeping for a few seconds when a message is received
6. Message confirmation modes include:
AcknowledgeMode.NONE: automatic confirmation
AcknowledgeMode.AUTO: confirm according to the situation
AcknowledgeMode.MANUAL: manual confirmation
2.2. Acknowledgement message (local method processing message)
By default, message consumers automatically ack messages. If you want to ack manually, you need to change the confirmation mode to manual
spring: rabbitmq: listener: simple: acknowledge-mode: manual
Or start the manual ack in the RabbitListenerContainerFactory
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //Enable manual ack return factory; }
confirmation message
@RabbitHandler public void processMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) { System.out.println(message); try { channel.basicAck(tag,false); // confirmation message } catch (IOException e) { e.printStackTrace(); } }
Note that the basicAck method needs to pass two parameters
deliveryTag (unique ID): after a consumer registers with RabbitMQ, a Channel will be established, and RabbitMQ will use basic The delivery method pushes the message to the consumer. This method carries a delivery tag, which represents the unique ID of the message delivered by RabbitMQ to the Channel. It is a monotonically increasing positive integer. The scope of the delivery tag is limited to the Channel
multiple: in order to reduce network traffic, manual confirmation can be batch processed. When this parameter is true, delivery can be confirmed at one time_ All messages with tag less than or equal to the incoming value
2.3. Manually deny and reject messages
Test: Send a message containing the error attribute in the header
When the consumer gets the message and checks that the header contains error, it will nack the message (manually confirm the message and return the message to the queue)
@RabbitHandler public void processMessage2(String message, Channel channel,@Headers Map<String,Object> map) { System.out.println(message); if (map.get("error")!= null){ System.out.println("Error message"); try { channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //Deny the news return; } catch (IOException e) { e.printStackTrace(); } } try { channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //confirmation message } catch (IOException e) { e.printStackTrace(); } }
At this time, the console prints repeatedly, indicating that the message has been re queued and then re consumed after being nack
hello Error message hello Error message hello Error message hello Error message
You can also reject the message. The message will be discarded and will not be returned to the queue
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //Reject message
2.4. Confirmation message (global processing message)
2.4.1 automatic confirmation
One problem involved is that if an exception is thrown when processing a message, the processing fails, but the message is deleted by Rabbit due to automatic confirmation, resulting in message loss
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // Listening queue container.setAcknowledgeMode(AcknowledgeMode.NONE); // NONE stands for automatic confirmation container.setMessageListener((MessageListener) message -> { //Message listening and processing System.out.println("====Message received====="); System.out.println(new String(message.getBody())); //It is equivalent to throwing some mistakes in their own consumption logic throw new NullPointerException("consumer fail"); }); return container; }
2.4.2 manual confirmation message
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // Listening queue container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Manual confirmation container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { //Message processing System.out.println("====Message received====="); System.out.println(new String(message.getBody())); if(message.getMessageProperties().getHeaders().get("error") == null){ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("The message has been confirmed"); }else { //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); System.out.println("Message reject"); } }); return container; }
In addition to NONE and MANUAL, the acknowledge mode also has AUTO. It will decide whether to confirm or reject (whether to re-enter the queue) according to the execution of the method
If the message is successfully consumed (success means that no exception is thrown in the process of consumption), it is automatically confirmed
When the AmqpRejectAndDontRequeueException exception is thrown, the message will be rejected and request = false (do not re-enter the queue)
When the ImmediateAcknowledgeAmqpException exception is thrown, the consumer will be confirmed
For other exceptions, the message will be rejected and request = true (if only one consumer listens to the queue at this time, there is a risk of life and death cycle, and multiple consumers will also cause a great waste of resources, which must be avoided in the development process). You can set it through setDefaultRequeueRejected (the default is true)
@Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("consumer_queue"); // Listening queue container.setAcknowledgeMode(AcknowledgeMode.AUTO); // Confirm the message according to the situation container.setMessageListener((MessageListener) (message) -> { System.out.println("====Message received====="); System.out.println(new String(message.getBody())); //Throw a NullPointerException exception and re-enter the queue //throw new NullPointerException("message consumption failed"); //When the exception thrown is AmqpRejectAndDontRequeueException, the message will be rejected and request = false //throw new AmqpRejectAndDontRequeueException("message consumption failed"); //When the ImmediateAcknowledgeAmqpException exception is thrown, the consumer will be confirmed throw new ImmediateAcknowledgeAmqpException("Message consumption failed"); }); return container; }
Message reliability
Persistence
exchange should be persistent
The queue needs to be persistent
message to be persistent
Message confirmation
Start consumption return (@ ReturnList annotation, the producer can know which messages are not sent)
Message confirmation between producer and Server (broker)
Message confirmation between consumer and Server (broker)
The above summary is all personal and online experience summary. If there are similarities, please understand. Welcome to discuss technology, pay attention and continue to update later