[RabbitMQ] message response -- ack mechanism

catalogue

Message response

concept

Automatic response

Method of message response

Multiple explanation

Manual response implementation

1. Prepare tools

2. Producers

3. Two consumers with different sleep time

4. Effect display

Messages automatically rejoin the queue

Effect demonstration:

Message response

concept

A consumer may need some time to complete a task. Once RabbitMQ delivers a message to a consumer, it immediately marks the message for deletion. If a consumer crashes partway through a long task, the message it was processing is therefore lost, along with any messages that had already been dispatched to that consumer but not yet handled.

To make sure that messages are not lost while they are being consumed, RabbitMQ provides a message response (acknowledgement, or ack) mechanism: after receiving and processing a message, the consumer tells RabbitMQ that it has been processed, and RabbitMQ is then free to delete it.

Automatic response

With automatic response, a message is considered successfully delivered as soon as it is sent

Disadvantages: if the consumer's connection or channel is closed before the message is received, the message is lost. In addition, this mode places no limit on the number of messages delivered, so the consumer can be overloaded: it may receive more messages than it can process in time, the messages pile up, memory runs out, and the consumer process may finally be killed by the operating system

Therefore, in real projects, manual response is usually the better choice
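
The difference between the two modes when a consumer crashes can be sketched with a small plain-Java model. This is not the RabbitMQ client: the class AutoAckLossModel and its method are hypothetical, and the model assumes a single consumer that is sent every queued message at once.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a broker with one consumer. Not the RabbitMQ client API. */
final class AutoAckLossModel {
    /**
     * Pushes every queued message to one consumer, lets it finish
     * processedBeforeCrash of them, then crashes it. Returns how many
     * messages are still on the broker afterwards.
     */
    static int remainingOnBroker(int queued, int processedBeforeCrash, boolean autoAck) {
        Deque<Integer> ready = new ArrayDeque<>();
        for (int i = 1; i <= queued; i++) ready.addLast(i);
        Deque<Integer> inFlight = new ArrayDeque<>(); // delivered, waiting for an ack

        while (!ready.isEmpty()) {
            int msg = ready.removeFirst();
            // Automatic mode: the broker forgets the message on delivery.
            // Manual mode: the broker keeps it until it is acknowledged.
            if (!autoAck) inFlight.addLast(msg);
        }
        // The consumer finishes some messages and acknowledges them (manual mode only).
        for (int i = 0; i < processedBeforeCrash && !inFlight.isEmpty(); i++) {
            inFlight.removeFirst();
        }
        // Crash: whatever is still unacknowledged goes back to the queue.
        ready.addAll(inFlight);
        return ready.size();
    }

    public static void main(String[] args) {
        System.out.println(remainingOnBroker(5, 2, true));  // 0: three unprocessed messages are gone
        System.out.println(remainingOnBroker(5, 2, false)); // 3: the unprocessed messages survive
    }
}
```

In the automatic case the three messages that were never processed have vanished from the broker; in the manual case they are back in the queue.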

Method of message response

Channel.basicAck(): used for positive confirmation

It tells RabbitMQ that the message has been received and processed successfully, so the message can be discarded

Channel.basicNack(): used for negative confirmation

Channel.basicReject(): used for negative confirmation (recommended). Compared with Channel.basicNack() it has no multiple parameter, so it rejects exactly one message
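
What the broker does after each kind of confirmation can be sketched in plain Java. The class SettleModel below is a hypothetical in-memory model, not the RabbitMQ client; its ack and reject methods loosely mirror basicAck(deliveryTag, multiple) and basicReject(deliveryTag, requeue), with the multiple parameter left out to keep the sketch short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of how a broker settles one delivery. Not the RabbitMQ client API. */
final class SettleModel {
    private final Deque<String> ready = new ArrayDeque<>();    // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>(); // delivered, not yet settled
    private long nextTag = 1;

    void publish(String body) { ready.addLast(body); }

    /** Delivers the head of the queue and returns its delivery tag. */
    long deliver() {
        String body = ready.removeFirst();
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Positive confirmation: the message is removed for good. */
    void ack(long tag) { unacked.remove(tag); }

    /** Negative confirmation of one delivery: put it back (requeue) or drop it. */
    void reject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) ready.addFirst(body); // back to the head in this model
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        SettleModel broker = new SettleModel();
        broker.publish("a");
        broker.publish("b");
        long first = broker.deliver();
        broker.reject(first, true);    // "a" returns to the queue
        long second = broker.deliver();
        broker.reject(second, false);  // "a" is dropped this time
        System.out.println(broker.readyCount()); // 1
    }
}
```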

Multiple explanation

One advantage of manual response is that acknowledgements can be sent in batches, which reduces network traffic

The multiple parameter controls this, and true and false mean different things

true: acknowledge every unacknowledged message on the channel up to and including the given delivery tag

For example, suppose the channel has unacknowledged messages with delivery tags 5, 6, 7 and 8, and the current tag is 8

In this case all of the unacknowledged messages 5 to 8 are acknowledged

false: acknowledge only the message with the given delivery tag

Only the message with tag=8 is acknowledged; messages 5, 6 and 7 remain unacknowledged
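
The example with tags 5 to 8 can be replayed in a short plain-Java sketch. MultipleAckModel is a hypothetical model of the unacknowledged delivery tags on one channel, not the RabbitMQ client; only the parameter list of its ack method mirrors basicAck(deliveryTag, multiple).

```java
import java.util.TreeSet;

/** Hypothetical model of the unacknowledged delivery tags on one channel. */
final class MultipleAckModel {
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long tag) { unacked.add(tag); }

    /** Same parameter list as basicAck(deliveryTag, multiple). */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            unacked.headSet(tag, true).clear(); // every tag up to and including this one
        } else {
            unacked.remove(tag);                // only this tag
        }
    }

    /** Returns a copy of the tags that are still unacknowledged. */
    TreeSet<Long> unacked() { return new TreeSet<>(unacked); }

    public static void main(String[] args) {
        MultipleAckModel batch = new MultipleAckModel();
        MultipleAckModel single = new MultipleAckModel();
        for (long tag = 5; tag <= 8; tag++) {
            batch.deliver(tag);
            single.deliver(tag);
        }
        batch.ack(8, true);
        single.ack(8, false);
        System.out.println("multiple=true  leaves " + batch.unacked());  // []
        System.out.println("multiple=false leaves " + single.unacked()); // [5, 6, 7]
    }
}
```

Batch acknowledgement saves round trips, but it also confirms messages whose processing may not have finished yet, which is why the workers in this article pass false.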

Manual response implementation

To make sure that no message is lost while it is being consumed, we switch from automatic response to manual response by passing false as the autoAck argument of basicConsume and calling basicAck ourselves once processing is finished

1. Prepare tools

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Utility class that uses a connection factory to create a channel
 */
public class RabbitMQUtil {
    public static Channel getChannel() throws Exception {
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set the host (IP address) of the RabbitMQ server to connect to
        factory.setHost("192.168.31.65");
        //Set the user name
        factory.setUsername("admin");
        //Set the password
        factory.setPassword("123");
        //Create the connection
        Connection connection = factory.newConnection();
        //Create and return a channel
        return connection.createChannel();
    }
}
/**
 * Sleep: simulates the time needed to run business logic
 */
public class SleepUtil {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000L * second);
        } catch (InterruptedException _ignored) {
            //Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }
    }
}

2. Producers

import com.rabbitmq.client.Channel;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * Producer: reads messages from the console and publishes them to the queue
 */
public class Producer {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        //Enable publisher confirms on this channel (this demo does not wait for them)
        channel.confirmSelect();
        //Declare the queue: not durable, not exclusive, not auto-deleted
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        //Read messages from the console, one whitespace-separated token at a time
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            //Publish to the default exchange, using the queue name as routing key
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Producer sends message: " + message);
        }
    }
}

3. Two consumers with different sleep time

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * With manual response, messages are not lost: an unacknowledged message is put back in the queue and consumed again
 */
public class Worker01 {
    //The name of the queue (declared by Producer, so start Producer first)
    public static final String TASK_QUEUE_NAME = "ack_queue";

    //Receive messages
    public static void main(String[] args) throws Exception {
        //Get a channel
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("Consumer 1: waiting for messages (short processing time)");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //Sleep for 1 second to simulate the time the business code takes
            SleepUtil.sleep(1);
            System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
            //Manual response
            /*
             * 1. deliveryTag: identifies this delivery on the channel
             * 2. multiple: false acknowledges only this message (no batch response)
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        //Called when the consumer is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + ": message consumption interrupted");
        };
        //Start consuming; the second argument (autoAck = false) selects manual response
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

/**
 * With manual response, messages are not lost: an unacknowledged message is put back in the queue and consumed again
 */
public class Worker02 {
    //The name of the queue (declared by Producer, so start Producer first)
    public static final String TASK_QUEUE_NAME = "ack_queue";

    //Receive messages
    public static void main(String[] args) throws Exception {
        //Get a channel
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("Consumer 2: waiting for messages (long processing time)");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //Sleep for 10 seconds to simulate the time the business code takes
            SleepUtil.sleep(10);
            System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
            //Manual response
            /*
             * 1. deliveryTag: identifies this delivery on the channel
             * 2. multiple: false acknowledges only this message (no batch response)
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        //Called when the consumer is cancelled
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + ": message consumption interrupted");
        };
        //Start consuming; the second argument (autoAck = false) selects manual response
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

4. Effect display

Under normal circumstances, when the producer sends two messages, consumer 1 and consumer 2 each receive and process one of them

Messages automatically rejoin the queue

If a consumer loses its connection for some reason (its channel is closed, the connection is closed, or the TCP connection is lost) before it has sent an ack, RabbitMQ understands that the message was not fully processed and puts it back in the queue. If another consumer is available at that time, RabbitMQ quickly redelivers the message to it. In this way, no message is lost even if a consumer occasionally dies.
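
The requeue behaviour described above can be sketched as a small plain-Java model. RequeueModel and the consumer names are hypothetical, and the class only imitates the bookkeeping a broker does; it is not the RabbitMQ client and says nothing about how RabbitMQ implements this internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory broker model. Not the RabbitMQ client API. */
final class RequeueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    // consumer name -> messages delivered to it but not yet acknowledged
    private final Map<String, List<String>> unacked = new HashMap<>();

    void publish(String body) { ready.addLast(body); }

    /** Hands the next ready message to the consumer, or returns null if none is ready. */
    String deliverTo(String consumer) {
        String body = ready.pollFirst();
        if (body != null) {
            unacked.computeIfAbsent(consumer, k -> new ArrayList<>()).add(body);
        }
        return body;
    }

    void ack(String consumer, String body) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) pending.remove(body);
    }

    /** The consumer's channel or connection is gone: its unacked messages are requeued. */
    void disconnect(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending == null) return;
        // Re-insert in reverse so the original order is kept at the head of the queue.
        for (int i = pending.size() - 1; i >= 0; i--) ready.addFirst(pending.get(i));
    }

    public static void main(String[] args) {
        RequeueModel broker = new RequeueModel();
        broker.publish("aa");
        broker.publish("bb");
        broker.deliverTo("consumer1");      // receives "aa"
        broker.ack("consumer1", "aa");
        broker.deliverTo("consumer2");      // receives "bb" but never acknowledges it
        broker.disconnect("consumer2");     // consumer 2 is stopped in the middle of its task
        System.out.println(broker.deliverTo("consumer1")); // bb
    }
}
```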

Effect demonstration:

Stop consumer 2 (the one with the longer sleep) from the code above while it is consuming a message

The producer sends a message, and consumer 2 is stopped right after the message is sent. The message should have been processed by consumer 2, but because its processing time is long, consumer 2 is stopped before it finishes, that is, before it has executed the ack code. Consumer 1 then receives the message, which shows that the message was put back in the queue and assigned to consumer 1, which is able to process it

 

Keywords: Java RabbitMQ Distribution

Added by Blockis on Mon, 31 Jan 2022 12:00:41 +0200