Consuming messages in batches with RabbitMQ pull mode

A RabbitMQ consumer can work in one of two modes: push or pull.
The recommended way to implement push mode is to extend the DefaultConsumer base class, or to use Spring AMQP's SimpleMessageListenerContainer.
Push mode is the most commonly used, but it does not fit every case, for example:
The consumer is only allowed to consume messages when certain conditions are met
Messages need to be pulled in batches for processing
Implementing pull mode
RabbitMQ's Channel provides the basicGet method for pulling messages.

/**
 * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
 * @see com.rabbitmq.client.AMQP.Basic.Get
 * @see com.rabbitmq.client.AMQP.Basic.GetOk
 * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @return a {@link GetResponse} containing the retrieved message data
 * @throws java.io.IOException if an error is encountered
 */
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

basicGet returns a GetResponse object, or null if the queue is empty.

public class GetResponse {
    private final Envelope envelope;
    private final BasicProperties props;
    private final byte[] body;
    private final int messageCount;

    // ...

RabbitMQ client version 4.0.3
When using basicGet to pull messages, please note:

basicGet
DefaultConsumer

Example code:

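// Imports needed by this method:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

private void consume(Channel channel) throws IOException, InterruptedException {
    while (true) {
        // Only pull while the application-specific condition holds.
        if (!isConditionSatisfied()) {
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        // autoAck = false: the message stays unacknowledged until basicAck below.
        GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
        if (response == null) {
            // basicGet returns null when the queue is empty.
            TimeUnit.MILLISECONDS.sleep(1);
            continue;
        }
        // Decode explicitly; this assumes the producer encoded the body as UTF-8.
        String data = new String(response.getBody(), StandardCharsets.UTF_8);
        logger.info("Get message <= {}", data);
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    }
}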
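
The loop above sleeps a fixed 1 ms after every miss, so it keeps polling the broker at the same rate even when the queue stays empty for a long time. One possible refinement is a capped exponential backoff. The following is only a sketch: BackoffDelay is a hypothetical helper, not part of the RabbitMQ client, and the bounds used are arbitrary.

```java
/**
 * Hypothetical helper (not part of the RabbitMQ client): computes how long to
 * sleep after an empty poll, doubling the delay up to a fixed cap.
 */
final class BackoffDelay {
    private final long minMillis;
    private final long maxMillis;
    private long current;

    BackoffDelay(long minMillis, long maxMillis) {
        if (minMillis <= 0 || maxMillis < minMillis) {
            throw new IllegalArgumentException("need 0 < minMillis <= maxMillis");
        }
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        this.current = minMillis;
    }

    /** Returns the delay for this miss, then doubles it (capped) for the next one. */
    long nextDelayMillis() {
        long delay = current;
        // Compare against max / 2 so that doubling can never overflow.
        current = (current > maxMillis / 2) ? maxMillis : current * 2;
        return delay;
    }

    /** Call after a message was received so the next miss starts small again. */
    void reset() {
        current = minMillis;
    }
}
```

In the consume method, each TimeUnit.MILLISECONDS.sleep(1) would then become TimeUnit.MILLISECONDS.sleep(backoff.nextDelayMillis()), with backoff.reset() called after basicGet returns a message.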

Pulling messages in batches
A client can pull messages in batches by calling basicGet repeatedly to collect several messages and, after processing them, acknowledging the whole batch with a single ACK. Note:

basicGet
basicAck

Example code:

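String bridgeQueueName = extractorProperties.getBridgeQueueName();
int batchSize = extractorProperties.getBatchSize();
List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
// Delivery tag of the last message pulled in this batch.
long tag = 0;
while (responseList.size() < batchSize) {
    // autoAck = false: messages stay unacknowledged until the basicAck below.
    GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
    if (getResponse == null) {
        // Queue is empty: process whatever has been collected so far.
        break;
    }
    responseList.add(getResponse);
    tag = getResponse.getEnvelope().getDeliveryTag();
}
if (responseList.isEmpty()) {
    TimeUnit.MILLISECONDS.sleep(1);
} else {
    logger.info("Get <{}> responses this batch", responseList.size());
    // handle messages
    // multiple = true acknowledges every unacknowledged delivery on this
    // channel up to and including tag, i.e. the whole batch at once.
    channel.basicAck(tag, true);
}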
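
To make the single ACK at the end of the batch easier to reason about, here is a small in-memory model of the two calls involved. It is a sketch under stated assumptions: FakeChannel, Delivery, and BatchPuller are hypothetical stand-ins written for this example, not RabbitMQ client classes, and the model covers only one queue on one channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

/** One pulled message: delivery tag plus body. Hypothetical stand-in for GetResponse. */
final class Delivery {
    final long tag;
    final String body;

    Delivery(long tag, String body) {
        this.tag = tag;
        this.body = body;
    }
}

/** In-memory model of a single queue on one channel; NOT the RabbitMQ client. */
final class FakeChannel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1; // delivery tags increase per channel

    void publish(String body) {
        ready.addLast(body);
    }

    /** Models basicGet(queue, false): returns null when the queue is empty. */
    Delivery basicGet() {
        String body = ready.pollFirst();
        if (body == null) {
            return null;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return new Delivery(tag, body);
    }

    /** Models basicAck(tag, multiple). */
    void basicAck(long tag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding delivery with a tag <= the given tag.
            unacked.headMap(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}

final class BatchPuller {
    /** Pulls up to batchSize messages, stopping early when the queue is empty. */
    static List<Delivery> pullBatch(FakeChannel channel, int batchSize) {
        List<Delivery> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize) {
            Delivery delivery = channel.basicGet();
            if (delivery == null) {
                break;
            }
            batch.add(delivery);
        }
        return batch;
    }
}
```

With five messages published and a batch size of 3, the first pullBatch call leaves three deliveries unacknowledged, and acknowledging the last tag with multiple = true clears all three in one call. The second batch then contains only the two remaining messages.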

About QueueingConsumer
QueueingConsumer buffers messages on the client side in a BlockingQueue, and its nextDelivery method can also be used to implement pull mode (it is essentially BlockingQueue.take). However, QueueingConsumer is now marked as deprecated.
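
The buffering idea itself is easy to reproduce without QueueingConsumer. The sketch below is an assumption-laden illustration rather than the library's implementation: LocalDeliveryBuffer is a hypothetical class, and in a real application onDelivery would be called from a push-mode callback such as DefaultConsumer.handleDelivery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical client-side buffer: the push side enqueues message bodies,
 * and application code pulls them whenever it is ready.
 */
final class LocalDeliveryBuffer {
    private final BlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();

    /** Called by the push side for every delivered message. */
    void onDelivery(byte[] body) {
        buffer.add(body);
    }

    /** Blocks until a message is available (BlockingQueue.take). */
    byte[] nextDelivery() throws InterruptedException {
        return buffer.take();
    }

    /** Waits up to timeoutMillis and returns null if nothing arrived in time. */
    byte[] nextDelivery(long timeoutMillis) throws InterruptedException {
        return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Number of messages currently buffered locally. */
    int size() {
        return buffer.size();
    }
}
```

Note that this buffer is unbounded, so with a real broker the number of messages held locally would need to be limited by other means, for example the channel's prefetch setting.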

Keywords: Java RabbitMQ Spring

Added by dinosoup on Mon, 09 Dec 2019 07:25:57 +0200