RabbitMQ message response mechanism
1.1 concept
It may take some time for a consumer to complete a task. What happens if one of the consumers handles a long task and suddenly hangs him up halfway. Once RabbitMQ sends a message to the consumer, it immediately marks the message as deleted. In this case, a consumer suddenly hangs up and we will lose the message being processed. And subsequent messages sent to the consumer will not be received.
In order to ensure that the message is not lost in the process of sending, rabbitmq introduces the message response mechanism. The message response is that after the consumer receives and processes the message, he tells rabbitmq that it has been processed, and rabbitmq can delete the message.
1.2 automatic response
The message is considered to have been successfully transmitted immediately after it is sent. This mode needs to make a trade-off between high throughput and data transmission security, because in this mode, if the connection or channel is closed on the consumer side before the message is received, the message will be lost. On the other hand, of course, this mode can deliver overloaded messages on the consumer side, There is no limit on the number of messages delivered. Of course, this may cause consumers to receive too many messages that are too late to process, resulting in the backlog of these messages, eventually running out of memory, and finally these consumer threads are killed by the operating system, Therefore, this model is only applicable when consumers can process these messages efficiently and at a certain rate
1.3 automatic response method
- Channel.basicAck (for positive confirmation)
RabbitMQ has known the message and processed it successfully. It can be discarded - Channel.basicNack (for negative confirmation)
- Channel.basicReject (for negative confirmation)
And channel Basicnack has one less parameter. It does not process the message. It is rejected directly and can be discarded
1.4 interpretation of multiple
***The advantage of manual response is that it can respond in batches and reduce network congestion***
true and false of multiple mean different things
- true stands for batch response to unresponsive messages on the channel
For example, if there are messages 5, 6, 7, and 8 transmitting tags on the channel, and the current tag is 8, then at this time
The unacknowledged messages in 5-8 will be confirmed to receive the message response - false compared with the above
Only messages 5, 6 and 7 with tag=8 will be answered, and the three messages will not be confirmed
1.5 messages automatically rejoin the team
If the consumer loses the connection for some reason (its channel has been closed, the connection has been closed or the TCP connection has been lost), resulting in the message not sending ACK confirmation, RabbitMQ will understand that the message has not been fully processed and will queue it again. If other consumers can handle it at this time, it will soon redistribute it to another consumer. In this way, even if a consumer dies occasionally, it can ensure that no message is lost
1.6 message manual response code
The default message adopts automatic response, so we need to change the automatic response to manual response in order to realize that the message is not lost in the process of consumption. The consumer adds the following code on the basis of the above code.
Tool class
public class RabbitMQUtils { public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.3.17"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
producer
public class Producer { public final static String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //Enter information from the console Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("Producer sends message:"+message); } } }
Consumer A
public class ConsumerA { public final static String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("consumer C1 Waiting to receive messages, short processing time...."); //Adopt manual response mechanism boolean aotoAck = false; channel.basicConsume(QUEUE_NAME,aotoAck, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try { TimeUnit.SECONDS.sleep(1); System.out.println("Received message:"+new String(delivery.getBody())); //Manual answer code /** * Parameter 1: tag indicating the message * Parameter 2: batch response */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("Message receiving failed...."); } }); } }
Consumer B
public class ConsumerB { public final static String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("consumer C2 Waiting to receive messages, long processing time...."); //Adopt manual response mechanism boolean aotoAck = false; channel.basicConsume(QUEUE_NAME,aotoAck, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try { TimeUnit.SECONDS.sleep(30); System.out.println("Received message:"+new String(delivery.getBody())); //Manual answer code /** * Parameter 1: tag indicating the message * Parameter 2: batch response */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { System.out.println("Message receiving failed...."); } }); } }
1.7 demonstration of manual response effect
Under normal circumstances, the message sender sends two messages C1 and C2, receives and processes the messages respectively
When the sender sends the message dd, the C2 consumer is stopped after sending the message, which is supposed to process the message. However, due to its long processing time, C2 is stopped before it is processed, that is, C2 has not executed the ack code. At this time, it will be seen that the message is received by C1, indicating that the message dd is re queued and then allocated to C1 that can process the message