RabbitMQ message reply

Message response

concept

It may take some time for a consumer to complete a task. What happens if one of the consumers processes a long task and only completes part of it, and suddenly it hangs up. Once RabbitMQ delivers a message to the consumer, it immediately marks the message for deletion. In this case, a consumer suddenly hangs up, and we will lose the message being processed. And the subsequent message sent to the consumer because it cannot be received.
In order to ensure that the message is not lost during sending, rabbitmq introduces a message response mechanism. The message response is: after receiving and processing the message, the consumer tells rabbitmq that it has been processed, and rabbitmq can delete the message.

Automatic response

The message is considered to have been successfully transmitted immediately after it is sent. This mode requires a trade-off between high throughput and data transmission security, because in this mode, if the connection or channel is closed on the consumer's side before the message is received, the message will be lost. On the other hand, of course, this mode can deliver overloaded messages on the consumer's side, There is no limit on the number of messages delivered. Of course, this may cause consumers to receive too many messages that are too late to process, resulting in the backlog of these messages, eventually running out of memory, and finally these consumer threads are killed by the operating system, Therefore, this model is only applicable when consumers can process these messages efficiently and at a certain rate.

Method of message response

  1. Channel. Basicack (for positive confirmation)
    RabbitMQ knows the message and processes it successfully. It can be discarded
  2. Channel. Basicnack (for negative confirmation)
  3. .Channel. Basicreject (for negative confirmation)
    And channel One less parameter than basicnack
    If the message is rejected without processing, it can be discarded

Multiple explanation

The advantage of manual response is that it can respond in batches and reduce network congestion

true and false of multiple mean different things

  • true means to batch respond to messages that are not answered on the channel
    For example, if there are messages 5, 6, 7, and 8 on the channel that transmit the tag, and the current tag is 8, then the unacknowledged messages of 5-8 will be confirmed to receive the message response
  • false compared with the above
    Only messages 5, 6 and 7 with tag=8 will be answered, and the three messages will not be confirmed

Message auto rejoin

If the consumer loses the connection for some reason (its channel has been closed, the connection has been closed or the TCP connection has been lost), resulting in the message not sending ACK confirmation, RabbitMQ will understand that the message has not been fully processed and will queue it again. If other consumers can handle it at this time, it will soon redistribute it to another consumer. In this way, even if a consumer dies occasionally, it can ensure that no message is lost

Message manual response code

The default message adopts automatic response, so if we want to realize that the message is not lost in the process of consumption, we need to change the automatic response to manual response. The consumer adds the code drawn in red below on the basis of the above code.

Message producer

public class Task02 {
 private static final String TASK_QUEUE_NAME = "ack_queue";
 public static void main(String[] argv) throws Exception {
 try (Channel channel = RabbitMqUtils.getChannel()) {
 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
 Scanner sc = new Scanner(System.in);
 System.out.println("Please enter information");
 while (sc.hasNext()) {
 String message = sc.nextLine();
 channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
 System.out.println("Producer sends message" + message);
 }
 }
 }
}

Consumer 01

public class Work03 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C1 Short waiting time for receiving messages");
 //How to process messages when consuming messages
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(1);
 System.out.println("Message received:"+message);
 /**
 * 1.Message tag
 * 2.Whether to answer unanswered messages in batch
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //Manual response is adopted
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"Consumer cancels consumer interface callback logic");
 });
 }
}

Consumer 02

public class Work04 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C2 Long waiting time for receiving messages");
 //How to process messages when consuming messages
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(30);
 System.out.println("Message received:"+message);
 /**
 * 1.Message tag
 * 2.Whether to answer unanswered messages in batch
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //Manual response is adopted
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"Consumer cancels consumer interface callback logic");
 });
 }
}

Sleep tools

public class SleepUtils {
 public static void sleep(int second){
 try {
 Thread.sleep(1000*second);
 } catch (InterruptedException _ignored) {
 Thread.currentThread().interrupt();
 }
 }
}

Manual response effect demonstration

Under normal circumstances, the message sender sends two messages C1 and C2, receives and processes the messages respectively

After the sender sends the message dd, the C2 consumer is stopped. Normally, the C2 processes the message. However, because it takes a long time to process it, C2 is stopped before it is finished, that is, C2 has not executed the ack code. At this time, it will be seen that the message is received by C1, indicating that the message dd is re queued, It is then allocated to C1 that can process messages

Keywords: Java RabbitMQ Spring Boot Network Protocol

Added by PHPHorizons on Sun, 16 Jan 2022 17:57:20 +0200