RabbitMQ Application Demo: supports hot plug of multiple consumer instances

Demand background

There are several reasons why multiple consumer instances need to be deployed in the project:

  1. Based on the reliability improvement requirements of the system. Especially for the container deployment scheme based on micro service architecture, multi instance deployment of micro service is the basic requirement to ensure the reliability of the system.
  2. The demand of system load sharing can improve the overall response efficiency of the system through multi instance deployment.

The coping scenarios include: 1) starting multiple consumer instances at the same time, which can process MQ dispatched messages in parallel. 2) When one of the consumer instances hangs up and restarts, it can continue to process the messages in the message queue. 3) Dynamic deployment adds a consumer instance, which can be immediately involved in the process of receiving and processing messages.

Scheme analysis

To meet the application scenarios described above, there are two points to deal with: 1) modify the automatic confirmation message to active confirmation. 2) The number of "message prefetches" is set to 1

1. Modify the automatic confirmation message to active confirmation
Message confirmation is that the consumer informs the MQ server that the received message has been processed. After receiving the message, confirm that if the consumer hangs up during processing, these messages will be lost and will not be allocated to other consumers for processing.

It is modified that after the consumer has processed all the business logic, it can actively send a confirmation message to the MQ service, which can ensure that the message is not lost.

2. Set the number of "message prefetching" to 1
The MQ server distributes the received message "fairly" to the consumers listening to the same message queue, and does not know the resource consumption required for message processing. This may cause one consumer instance to deal with heavy tasks all the time and the other to deal with light tasks all the time. Tired to death, idle to death.

"Message prefetch quantity is set to 1" means that consumers will not assign the next task until they have processed one task, and everyone has the opportunity to take the lead. We should shoulder the heavy burden.

Demo sample code

  1. Consumption message method
    /**
     * Consumption news
     */
    public void consumeMessage() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.*.*.*");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("******");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Only one message processing is prefetched at a time
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);

            // No automatic confirmation, active confirmation after consumption
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new MQMessageHandler(channel), tag -> {
            });
        } catch (Exception e) {
            LOGGER.error("Consume message exception.", e);
        }
    }
  1. Message processing class
package com.elon.consumer.service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * MQ Message consumption processing class
 *
 * @author elon
 * @since 2022-03-06
 */
public class MQMessageHandler implements DeliverCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQMessageHandler.class);

    private Channel channel;

    public MQMessageHandler(Channel channel) {
        this.channel = channel;
    }

    @Override
    public void handle(String s, Delivery delivery) throws IOException {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        LOGGER.info("Receive message from queue:{}", message);
        try {
            Thread.sleep(3 * 1000);
            long tag = delivery.getEnvelope().getDeliveryTag();

            // Take the initiative to confirm after handling the task
            channel.basicAck(tag, false);
        } catch (InterruptedException e) {
            LOGGER.error("Handle error.", e);
        }
    }
}

Keywords: Java RabbitMQ Spring Boot message queue

Added by cmaclennan on Sun, 06 Mar 2022 12:55:33 +0200