RabbitMQ tutorial: implementing publisher confirms (PublishConfirm)

Publisher confirms are a RabbitMQ extension for implementing reliable publishing. When publisher confirms are enabled on a channel, the messages the client publishes are confirmed asynchronously by the broker, meaning they have been taken care of on the server side. In this tutorial, we will use publisher confirms to make sure that published messages have safely reached the broker. We will cover several strategies for using publisher confirms and explain their advantages and disadvantages.

RabbitMQ installation
How to install: https://blog.csdn.net/Beijing_L/article/details/119042261

Enabling publisher confirms on a Channel

Publisher confirms are a RabbitMQ extension to the AMQP 0-9-1 protocol, so they are not enabled by default. They are enabled at the channel level with the confirmSelect method. This has to be done only once per channel, not for every message published. For example:

Channel ch = connection.createChannel();
ch.confirmSelect();

Method signature

/**
* Enables publisher acknowledgements on this channel.
* @see com.rabbitmq.client.AMQP.Confirm.Select
* @throws java.io.IOException if an error is encountered
*/
Confirm.SelectOk confirmSelect() throws IOException;

Strategy 1: publishing messages individually

Let's start with the simplest way to use publisher confirms: publish a message and wait synchronously for its confirmation. Refer to the code below.

Note that the code uses some JDK 1.7 features:

  • The int literal 5_000 equals 5000. Since JDK 1.7, underscores can be placed between the digits of a numeric literal to improve readability.
  • The try-with-resources statement. When the try block exits, the close method of the resource is called automatically.
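
As a standalone illustration of these two language features, here is a minimal sketch that does not need a RabbitMQ connection. The class Jdk7FeaturesDemo and its TrackedResource type are hypothetical names made up for this example; they are not part of the tutorial code or of the RabbitMQ client.

```java
// Hypothetical demo class, not part of the tutorial code.
class Jdk7FeaturesDemo {

    // A stand-in resource that records whether close() was called.
    static class TrackedResource implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Opens a resource with try-with-resources and returns it after the block has exited.
    static TrackedResource useAndRelease() {
        TrackedResource handle = null;
        try (TrackedResource resource = new TrackedResource()) {
            handle = resource; // work with the resource here
        }
        // close() has already been called automatically at this point
        return handle;
    }

    public static void main(String[] args) {
        int timeoutMillis = 5_000; // the same value as 5000
        System.out.println(timeoutMillis == 5000);
        System.out.println(useAndRelease().closed);
    }
}
```

The tutorial's publishing code, which uses both features, follows.
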
try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);

            ch.confirmSelect(); // Enable publisher confirms
            long start = System.nanoTime();
            // Send the messages in a loop
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                // Wait for the confirmation; the literal 5_000 (5000 ms) uses the JDK 1.7 underscore syntax
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }

After creating the Channel, confirmSelect enables publisher confirms and waitForConfirmsOrDie waits for the confirmation. The method returns as soon as the message has been confirmed by the RabbitMQ broker. If the message is not confirmed within the timeout, or if it is nack-ed (negatively acknowledged, meaning the broker could not take care of it for some reason), the method throws an exception. Handling the exception usually consists of logging an error message or retrying to send the message.

/**
* Wait until all messages published since the last call have been either ack'd or nack'd by the broker; or until timeout elapses.
* If the timeout expires a TimeoutException is thrown.
* If any of the messages were nack'd, waitForConfirmsOrDie will throw an IOException.
* When called on a non-Confirm channel, it will throw an IllegalStateException.
* @throws java.lang.IllegalStateException
*/
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

This technique is very straightforward, but it has a major drawback: it significantly slows down publishing, because waitForConfirmsOrDie blocks until the message is confirmed. It is a synchronous helper: while it waits for a confirmation, no subsequent message can be published. The throughput of this approach will not exceed a few hundred published messages per second. Nevertheless, that is good enough for some applications.

Strategy 2: publishing messages in batches

To improve on strategy 1, we can publish messages in batches: publish a batch of messages and wait for the whole batch to be confirmed.

try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();
            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);
            ch.confirmSelect();

            int batchSize = 100;
            int outstandingMessageCount = 0;

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                ch.basicPublish("", queue, null, body.getBytes());
                outstandingMessageCount++;

                if (outstandingMessageCount == batchSize) {
                    ch.waitForConfirmsOrDie(5_000);
                    outstandingMessageCount = 0;
                }
            }

            if (outstandingMessageCount > 0) {
                ch.waitForConfirmsOrDie(5_000);
            }
            long end = System.nanoTime();
            System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }

Waiting for a batch of messages to be confirmed greatly improves throughput compared with waiting for the confirmation of each individual message (up to 20-30 times with a remote RabbitMQ node). The drawback is that when a confirmation fails, we do not know exactly which message went wrong, so we may have to keep the whole batch in memory to log something meaningful or to republish it. This solution is also still synchronous, so it blocks the publishing of subsequent messages while it waits.
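
The idea of keeping the current batch in memory can be sketched roughly as follows. This is only an illustration under stated assumptions: BatchTarget is a hypothetical interface invented for this example, standing in for a confirm-enabled Channel (publish would wrap basicPublish, and awaitConfirms would wrap a blocking wait for confirms and report failure instead of throwing). It is not a RabbitMQ client type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of the tutorial code.
class BatchRetrySketch {

    // Assumed abstraction over a confirm-enabled channel.
    interface BatchTarget {
        void publish(String body);

        // Returns false if the batch published since the last call was not fully confirmed.
        boolean awaitConfirms();
    }

    // Publishes all bodies in batches and returns the bodies of every batch
    // whose confirmation failed, so the caller can log or republish them.
    static List<String> publishInBatches(BatchTarget target, List<String> bodies, int batchSize) {
        List<String> unconfirmed = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        for (String body : bodies) {
            target.publish(body);
            batch.add(body);
            if (batch.size() == batchSize) {
                flush(target, batch, unconfirmed);
            }
        }
        if (!batch.isEmpty()) {
            flush(target, batch, unconfirmed); // last, possibly smaller, batch
        }
        return unconfirmed;
    }

    private static void flush(BatchTarget target, List<String> batch, List<String> unconfirmed) {
        if (!target.awaitConfirms()) {
            // Which message failed is unknown, so the whole batch is kept.
            unconfirmed.addAll(batch);
        }
        batch.clear();
    }
}
```

The sketch makes the trade-off visible: a single failed wait marks every message of that batch as unconfirmed, including those the broker may in fact have accepted.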

Strategy 3: handling publisher confirms asynchronously

The broker confirms published messages asynchronously; the client only needs to register callbacks on the channel to be notified of these confirmations. There are two callbacks: the first is invoked for confirmed messages, and the second for nack-ed messages, which can be considered lost by the broker. The basic code is as follows:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
    // code when message is confirmed
}, (sequenceNumber, multiple) -> {
    // code when message is nack-ed
});

Let's first look at the signature of addConfirmListener. It takes two parameters, both of the callback interface type ConfirmCallback:

    /**
     * Add a lambda-based {@link ConfirmListener}.
     * @see ConfirmListener
     * @see ConfirmCallback
     * @param ackCallback callback on ack
     * @param nackCallback call on nack (negative ack)
     * @return the listener that wraps the callbacks
     */
    ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);

Now let's look at the definition of the callback interface. The ConfirmCallback callback has two parameters: the first is the sequence number, deliveryTag, and the second is a boolean:

public interface ConfirmCallback {

    void handle(long deliveryTag, boolean multiple) throws IOException;

}
  • deliveryTag parameter: the sequenceNumber in the listener above. It is the number that identifies the confirmed or nack-ed message. The sequence number of a message can be obtained with Channel#getNextPublishSeqNo() before publishing it.
  • multiple parameter: a boolean. If false, only one message is confirmed or nack-ed. If true, all messages with a lower or equal sequence number are confirmed or nack-ed.
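
To make the effect of the multiple flag concrete, here is a small sketch that runs without a broker. MultipleFlagDemo and its confirm method are hypothetical names used only for this illustration; the sequence numbers are filled in by hand rather than taken from a channel.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class, not part of the tutorial code.
class MultipleFlagDemo {

    // Removes the entries covered by one confirmation (ack or nack).
    static void confirm(ConcurrentNavigableMap<Long, String> outstanding, long sequenceNumber, boolean multiple) {
        if (multiple) {
            // All entries with a key lower than or equal to sequenceNumber
            outstanding.headMap(sequenceNumber, true).clear();
        } else {
            // Only the entry for this sequence number
            outstanding.remove(sequenceNumber);
        }
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
        for (long seq = 1; seq <= 5; seq++) {
            outstanding.put(seq, "message " + seq);
        }
        confirm(outstanding, 2, false);
        System.out.println(outstanding.keySet()); // [1, 3, 4, 5]
        confirm(outstanding, 4, true);
        System.out.println(outstanding.keySet()); // [5]
    }
}
```

A sorted map is convenient here because headMap returns a live view, so clearing the view removes the covered entries from the underlying map in one call.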

A simple way to use the sequence number is to put it into a map, where the key is the sequence number and the value is the message. The sequence number and the message are stored in the map when the message is published. When a confirmation arrives, the corresponding entries are removed from the map by sequence number. Once the map is empty, no confirmation is outstanding any more.

Using a map is simple and clear. It directly associates the sequence number with the message, and it makes it easy to clear the entries up to a given sequence number (to handle multiple confirms or nacks). The map must support concurrent access, because the confirm callbacks are invoked in a thread owned by the client library, which should be considered different from the publishing thread. There are other ways to track outstanding confirms than a sophisticated map implementation, such as a simple concurrent hash map plus a variable that tracks the lower bound of the publishing sequence, but they are usually more involved and do not belong in a tutorial.

In summary, handling publisher confirms asynchronously usually requires the following steps:

  • Provide a way to correlate the publishing sequence number with a message, for example a map or another data structure.
  • Register a confirm listener on the channel to be notified when publisher acks or nacks arrive, and perform the appropriate action, such as logging or republishing a nack-ed message. The sequence-number-to-message correlation mechanism may also need some cleanup in this step.
  • Track the publishing sequence number before publishing a message.

The complete reference code is as follows:

try (Connection connection = createConnection()) {
            Channel ch = connection.createChannel();

            String queue = UUID.randomUUID().toString();
            ch.queueDeclare(queue, false, false, true, null);
            ch.confirmSelect(); // Enable publisher confirms
            // Map that correlates sequence numbers with messages
            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            // Callback that removes confirmed entries from the map
            ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
                if (multiple) {
                    // multiple=true: remove all entries up to and including this sequence number
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                            sequenceNumber, true
                    );
                    confirmed.clear();
                } else {
                    // multiple=false: remove only the entry for this sequence number
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            // addConfirmListener takes two callbacks: the first, defined above, handles acks; the second handles nacks, that is, messages the broker could not take care of
            ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
                String body = outstandingConfirms.get(sequenceNumber);
                System.err.format(
                        "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
                        body, sequenceNumber, multiple
                );
                cleanOutstandingConfirms.handle(sequenceNumber, multiple);
            });

            long start = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String body = String.valueOf(i);
                // Get the next sequence number and store it with the message in the map before publishing
                outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
                ch.basicPublish("", queue, null, body.getBytes());
            }

            if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
                throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
            }

            long end = System.nanoTime();
            System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
        }
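
The listing above calls a waitUntil helper that is not shown. One possible version is sketched below, assuming a simple polling loop. The class name WaitUntilSketch, the 100 ms polling interval, and the choice to restore the interrupt flag instead of propagating InterruptedException are assumptions of this sketch, not necessarily what the original author used.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the tutorial code.
class WaitUntilSketch {

    // Polls the condition every 100 ms until it holds or the timeout elapses.
    // Returns the final state of the condition.
    static boolean waitUntil(Duration timeout, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean() && deadline - System.nanoTime() > 0) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean();
    }
}
```

With this helper, the check in the listing returns true as soon as the confirm callbacks have emptied the map and false if entries are still outstanding after 60 seconds.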

Summary

In some applications it is essential to make sure that published messages have reached the RabbitMQ broker. Publisher confirms are a RabbitMQ feature that helps meet this requirement. They are asynchronous in nature, but they can also be handled synchronously. There is no single definitive way to implement publisher confirms; the choice usually comes down to the constraints of the application and of the overall system. Typical techniques are:

  • Publishing messages individually and waiting for each confirmation synchronously: simple, but very limited throughput.
  • Publishing messages in batches and waiting synchronously for the confirmation of each batch: simple, reasonable throughput, but hard to reason about when something goes wrong.
  • Handling confirms asynchronously: the best performance and use of resources, and good control in case of errors, but it can be involved to implement correctly.

Keywords: Java RabbitMQ message queue

Added by FredFredrickson2 on Thu, 23 Dec 2021 13:45:15 +0200