Reliability guarantee of rabbitmq message publishing

 

In the design of RabbitMQ, producers and consumers are deliberately "decoupled", that is, the release of news and the consumption of news are decoupled.
In RabbitMQ, there are different delivery mechanisms (producers), but each mechanism has a certain impact on performance. Generally speaking, the reliability of fast speed is low, and the performance of good reliability is poor. What's the specific situation
The usage needs to be determined according to your application, so there is no best way, only the most appropriate way.

 

Impact of various reliability mechanisms on Performance

Next, we introduce these delivery mechanisms

1. Unsecured mode

Publish your message through basic publish and use the correct switch and routing information. Your message will be received and sent to the appropriate server
In the queue. However, if there are network problems, or messages cannot be routed, or RabbitMQ itself has problems, this method has risks. Therefore, message sending without guarantee is generally not recommended.

2. Failure confirmation mode

 

Failure confirmation diagram

When sending a message, set the mandatory flag to tell RabbitMQ that if the message is not routable, it should be returned to the sender and notify the sender of failure. You can think of it this way. Turn on mandatory
Yes, turn on fault detection mode

Note: it will only let RabbitMQ notify you of failure, not success. If the message is routed correctly to the queue, the publisher is not notified. The problem is that it is impossible to ensure that the publication of the message must be successful, because the message of notification failure may be lost

Code demonstration:

Producer Code:

1.When publishing a message, the mandatory by true
2.channal Add callback processing for message routing failure

public class MandatoryProducer {
    //Queue name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a direct queue
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //Add callback processing for message publishing failure
        channel.addReturnListener(
                (replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body);
                    System.out.println("Returned message:" + message);
                    System.out.println("Returned replycode:" + replyCode);
                    System.out.println("Returned replyText:" + replyText);
                    System.out.println("Returned exchange:" + exchange);
                    System.out.println("Returned routeKey:" + routingKey);
                });
        for (int i = 1; i < 4; i++) {
            if (i != 1) {
                //When publishing this message, scribble a routing key so that it cannot match the corresponding queue to see whether it will enter the callback processing method above
                channel.basicPublish(DIRECT_NAME, "Random fight", true, null, ("hello,world!" + i).getBytes());
            } else {
                //The following are normal publications that can be matched to the queue
                channel.basicPublish(DIRECT_NAME, "lb", true, null, ("hello,world!" + i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}

Consumer: only create a queue that binds the routing key 'lb' and consume the queue message

public class MandatoryConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a queue
        String queueName = "lb-queue";
        channel.queueDeclare(queueName, false, false, false, null);
        //The queue binds the switch "lb_direct" and specifies the routing key as "LB"
        channel.queueBind(queueName, "lb_direct", "lb");
        //Consume messages posted to the queue
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("Consumer received message***:" + new String(body));
            }
        });
    }
}

Start the consumer to wait for the consumption message, and then start the producer production message

The consumer will receive hellworld2 and hellworld3 messages

 

Because hellworld cannot route to a specific queue, the producer enters the callback processing method for routing failure

 

3. Sender confirmation mode

Based on the performance of transactions, RabbitMQ team has come up with a better solution for us, that is, the sender confirmation mode is adopted, which is lighter than transactions, and the performance impact is almost negligible.

Principle: the producer sets the channel to confirm mode. Once the channel enters confirm mode, all messages published on the channel will be assigned a unique ID (starting from 1)
An id confirms the message between the producer and RabbitMQ.
For non routable messages, when the switch finds that the message cannot be routed to any queue, it will confirm it, indicating that the message has been received. If the sender sets the mandatory mode, it will be called first
addReturnListener listener.

For a routable message, after the message is delivered to all matching queues, the broker will send an acknowledgement to the producer (including the unique ID of the message), which makes the producer know that the message has been correct
The destination queue is reached. If the message and queue are persistent, the confirmation message will be sent after the message is written to the disk. The delivery tag field in the confirmation message returned by the broker to the producer contains
The serial number of the confirmation message.

 

The biggest advantage of the confirm mode is that it can be asynchronous. Once a message is published, the producer application can continue to send the next message while waiting for the channel to return the confirmation. When the message is most
After the confirmation is finally received, the producer application can process the confirmation message through the callback method. If RabbitMQ loses the message due to its internal error, it will send a nack message to the production
The caller application can also process the nack message in the callback method to determine the next step.

Three implementation methods of Confirm:

Method 1: channel Waitforconfirms() normal sender confirmation mode; When the message reaches the exchange, it will return true.

Code demonstration:

producer:

/**
 * producer
 */
public class ConfirmProducer {

    //Switch name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a direct queue
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //Enable sender confirmation mode
        channel.confirmSelect();
        channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        //Judge whether the message just published in the channel is successful
        if (channel.waitForConfirms()) {
            System.out.println("msg send success");
        } else {
            System.out.println("msg send fail");
        }
        channel.close();
        connection.close();
    }
}

Execution result: we can find that the message has been published successfully.

 

Note: in the figure above, the routing key given by us is "random", which cannot match the queue. However, in this mode, the information of successful publishing is still returned. Therefore, it should be emphasized here that successful message publishing only means that the message has been successfully sent to the switch, does not mean that the message has been successfully routed to the corresponding queue, nor does it mean that the message has been successfully consumed.

Next, combine the above failure confirmation mode to see the results

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a direct queue
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //Add callback processing for message routing failure
        channel.addReturnListener(
                (replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body);
                    System.out.println("Returned message:" + message);
                    System.out.println("Returned replycode:" + replyCode);
                    System.out.println("Returned replyText:" + replyText);
                    System.out.println("Returned exchange:" + exchange);
                    System.out.println("Returned routeKey:" + routingKey);
                });
        //Enable sender confirmation mode
        channel.confirmSelect();
        channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        if (channel.waitForConfirms()) {
            System.out.println("msg send success");
        } else {
            System.out.println("msg send fail");
        }
        channel.close();
        connection.close();
    }

 

It can be found that the message is published successfully, but the routing fails

Mode 2: channel Waitforconfirmsordie() batch confirmation mode; The following code will be executed only after all messages are sent using the synchronization method. As long as a message does not arrive at the switch, an IOException exception will be thrown.

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a direct queue
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //Enable sender confirmation mode
        channel.confirmSelect();
        //Send 4 messages
        for (int i = 0; i < 4; i++) {
            channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        }
        //Change the above confirmation method to batch confirmation
        channel.waitForConfirmsOrDie();
        channel.close();
        connection.close();
    }

Method 3: channel Addconfirmlistener() asynchronously listens to the sender's confirmation mode

public class ConfirmAsyncProducer {

    //Switch name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a direct queue
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        //Enable sender confirmation mode
        channel.confirmSelect();
        //Send confirmation listening
        channel.addConfirmListener(new ConfirmListener() {
            //deliveryTag increments by 1 every time a message is successfully sent
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                 System.out.println("Send successful callback,deliveryTag=" + deliveryTag);
                //doSomeThingAfterSuccess.....
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //doSomeThingAfterFail.....
            }
        });
        //Send four messages
        for (int i = 0; i < 4; i++) {
            channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        }
        channel.close();
        connection.close();
    }
}

Start the consumer, 4 messages have been sent successfully, and the deliveryTag increases automatically each time

 

4. Standby switch

It is specified when declaring the switch for the first time to provide a pre-existing switch. If the primary switch cannot route messages, the messages will be routed to the new standby switch

Use the standby switch. As usual, declare the Queue and the standby switch, and bind the Queue to the standby switch. Then, when declaring the primary switch, set the standby switch to the primary switch through the switch parameter, alternate exchange.

 

producer:

public class BackupProducer {

    //Normal switch
    public static final String DIRECT_NAME = "lb_direct";
    //Standby switch
    public static final String DIRECT_BACKUP_NAME = "lb_backup_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare standby switch
        channel.exchangeDeclare(DIRECT_BACKUP_NAME, BuiltinExchangeType.FANOUT);
        //Declare master switch
        Map<String, Object> hashMap = new HashMap<>();
        //Set the name of the standby switch through key value pairs
        hashMap.put("alternate-exchange", DIRECT_BACKUP_NAME);
        //Declare the main switch. The last parameter is the hashMap above
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT, false, false, hashMap);
        channel.basicPublish(DIRECT_NAME, "Unknown routing key", false, null, "this is backup exchange test".getBytes());
        channel.close();
        connection.close();
    }
}

consumer:

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //Declare a queue
        String queueName = "lb-backup-queue";
        channel.queueDeclare(queueName, false, false, false, null);
        //It doesn't matter if the backup switch is bound with the routing key, because the backup switch is a fanout type switch
        channel.queueBind(queueName, BackupProducer.DIRECT_BACKUP_NAME, DirectProducer.ROUTE_KEY);
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println(consumerTag + ":" + "receiver[" + envelope.getRoutingKey() + "] :  " + new String(body));
            }
        });
    }

Start the consumer and then the producer to send messages

 

The results show that when the main switch sends a message and cannot send it to the specified queue, it will be sent to the standby switch, and the queue bound to the standby switch will receive the message

5. Services

After the transaction is started, the following steps are performed
    1.Producer send Tx.Select
    2.Server reply Tx.Select-Ok
    3.Producer sends message
    4.Producer send Tx.Commit
    5.Server reply Tx.Commit-Ok
    6.If an exception occurs before submission, the producer sends Tx.Rollback
    7.Server reply Tx.Rollback-Ok

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("maomaoyu.xyz");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(TRANSACTION_EXCHANGE, BuiltinExchangeType.DIRECT);
        try {
            //Open transaction
            channel.txSelect();
            channel.basicPublish(TRANSACTION_EXCHANGE, "lb1", true, null, "hello world".getBytes());
            //Transaction commit
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
            //Abnormal rollback occurred
            channel.txRollback();
        }
    }

 

Sending and receiving of data packets between client and server

Transactions are very performance consuming and synchronous. If multiple messages need to be sent, it is recommended to submit them at one time. If they are submitted separately each time, steps 4 and 5 need to be executed each time, which will consume more performance.

Keywords: PHP RabbitMQ Distribution

Added by tracivia on Wed, 19 Jan 2022 12:13:48 +0200