catalogue
Manual response implementation
3. Two consumers with different sleep time
Messages automatically rejoin the queue
Message response
concept
It may take some time for a consumer to complete a task. If one of the consumers processes a long task and only completes part of it, suddenly it hangs up, which will lead to message loss. Once RabbitMQ delivers a message to the consumer, it immediately marks the message as deleted. In this case, a consumer suddenly hangs up and we will lose the message being processed. And the subsequent message sent to the consumer because it cannot be received.
In order to ensure that the message is not lost in the sending process, rabbitmq introduces a message response mechanism. The message response is: after receiving and processing the message, the consumer tells rabbitmq that it has been processed, and rabbitmq can delete the message.
Automatic response
The message is considered to have been successfully transmitted immediately after it is sent
Disadvantages: if the connection or channel is closed on the consumer's side before the message is received, the message will be lost. On the other hand, in this mode, the consumer can deliver overloaded messages without limiting the number of messages delivered. Of course, this may make the consumer receive too many messages that are too late to process, This leads to the backlog of these messages, eventually running out of memory, and finally these consumer threads are killed by the operating system
Therefore, in actual development, we should choose manual response
Method of message response
Channel.basicAck(): used for positive confirmation
RabbitMQ has known the message and processed it successfully. It can be discarded
Channel.basicNack(): used for negative confirmation
Channel.basicReject(): used for negative confirmation (recommended)
Multiple explanation
The advantage of manual response is that it can respond in batches and reduce network congestion
true and false of multiple mean different things
true stands for batch response to unresponsive messages on the channel
For example, there are messages 5, 6, 7, and 8 transmitting tags on the channel. The current tag is 8
At this time, the unacknowledged messages in 5-8 will be confirmed to receive the message response
false compared with the above
Only messages 5, 6 and 7 with tag=8 will be answered, and the three messages will not be confirmed
Manual response implementation
The default message adopts automatic response, so we need to change the automatic response to manual response if we want to achieve no loss in the process of message consumption
1. Prepare tools
/** * Tool class for connecting factory to create channel */ public class RabbitMQUtil { public static Channel getChannel() throws Exception { //Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //Set the IP address of the factory and connect to the queue of RabbitMQ factory.setHost("192.168.31.65"); //Set user name factory.setUsername("admin"); //password factory.setPassword("123"); //Create connection Connection connection = factory.newConnection(); //Acquisition channel return connection.createChannel(); } }
/** * Sleep: simulate the time required to perform business logic operations */ public class SleepUtil { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
2. Producers
/** * Messages will not be lost during manual response and will be re consumed after being put back in the queue */ public class Producer { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtil.getChannel(); //Open release confirmation channel.confirmSelect(); //Declaration queue channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); //Enter information from the console Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer sends message" + message); } } }
3. Two consumers with different sleep time
/** * Messages will not be lost during manual response and will be re consumed after being put back in the queue */ public class Worker01 { //The name of the queue public static final String TASK_QUEUE_NAME = "ack_queue"; //Accept message public static void main(String[] args) throws Exception { //Acquisition channel Channel channel = RabbitMQUtil.getChannel(); System.out.println("Consumer 1: waiting for acceptance___Short time"); DeliverCallback deliverCallback = (consumerTag, message) -> { //Sleeping for 1 second, it takes a certain time to simulate the execution of code SleepUtil.sleep(1); System.out.println("Received message" + new String(message.getBody())); //Conduct manual response /* * 1.tag of the message, indicating the unique identification of the message * 2.Should batch response not be allowed to prevent message loss * */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //Execute the following when the message is cancelled CancelCallback cancelCallback = consumerTag -> { System.out.println(consumerTag + "Message consumption interrupted"); }; //Consumers consume information channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, cancelCallback); //Use manual response (false) } }
/** * Messages will not be lost during manual response and will be re consumed after being put back in the queue */ public class Worker02 { //The name of the queue public static final String TASK_QUEUE_NAME = "ack_queue"; //Accept message public static void main(String[] args) throws Exception { //Acquisition channel Channel channel = RabbitMQUtil.getChannel(); System.out.println("Consumer 2: waiting for acceptance___Longer time"); DeliverCallback deliverCallback = (consumerTag, message) -> { //Sleeping for 10 seconds, it takes some time to simulate the execution of code SleepUtil.sleep(10); System.out.println("Received message" + new String(message.getBody())); //Conduct manual response /* * 1.tag of the message, indicating the unique identification of the message * 2.Is batch response allowed to prevent message loss * */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //Execute the following when the message is cancelled CancelCallback cancelCallback = consumerTag -> { System.out.println(consumerTag + "Message consumption interrupted"); }; //Consumers consume information channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, cancelCallback); //Use manual response (false) } }
4. Effect display
Under normal circumstances, the message sender sends two messages, and consumers 1 and 2 receive and process the messages respectively
Messages automatically rejoin the queue
If the consumer loses the connection for some reason (its channel has been closed, the connection has been closed or the TCP connection has been lost), resulting in the message not sending ACK confirmation, RabbitMQ will understand that the message has not been fully processed and will queue it again. If other consumers can handle it at this time, it will soon redistribute it to another consumer. In this way, even if a consumer dies occasionally, it can ensure that no message is lost.
Effect demonstration:
Stop consumer 2 (sleep longer) in the above code on the way of consuming the message
The sender sends a message and stops consumer 2 after sending the message. It is reasonable for consumer 2 to process the message. However, due to its long processing time, consumer 2 is stopped before it is processed, that is, consumer 2 has not executed the ack code. At this time, it will see that the message has been received by consumer 1 , indicating that the message has been re queued, Then it is assigned to the consumer 1 who can process the message