RabbitMQ deliberately decouples producers from consumers: publishing a message and consuming it are independent of each other.
On the producer side, RabbitMQ offers several delivery mechanisms, and each one affects performance differently. Generally speaking, the fast mechanisms are less reliable and the reliable ones are slower. Which one to use depends on your application, so there is no best way, only the most appropriate one.
Impact of various reliability mechanisms on Performance
Next, we introduce these delivery mechanisms
1. No-guarantee mode
Publish your message with basicPublish, using the correct exchange and routing information, and the message will be received and delivered to the appropriate queue. However, if there is a network problem, the message cannot be routed, or RabbitMQ itself has a problem, the message can be lost without the producer noticing. Sending messages without any guarantee is therefore generally not recommended.
2. Failure confirmation mode
Failure confirmation diagram
When sending a message, set the mandatory flag to tell RabbitMQ that an unroutable message should be returned to the sender, notifying it of the failure. In other words, turning on mandatory turns on failure detection.
Note: this only makes RabbitMQ notify you of failure, not of success. If the message is routed correctly to a queue, the publisher is not notified. The remaining problem is that it still cannot guarantee that a message was published successfully, because the failure notification itself may be lost.
Code demonstration:
Producer Code:
1. When publishing a message, set mandatory to true. 2. Add a callback to the channel to handle messages that fail to route.
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MandatoryProducer {
    // Exchange name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // ConnectionUtils is a project helper (not shown here) that returns an open connection
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare a direct exchange
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        // Add a callback that handles messages the broker returns as unroutable
        channel.addReturnListener(
                (replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body);
                    System.out.println("Returned message:" + message);
                    System.out.println("Returned replycode:" + replyCode);
                    System.out.println("Returned replyText:" + replyText);
                    System.out.println("Returned exchange:" + exchange);
                    System.out.println("Returned routeKey:" + routingKey);
                });
        for (int i = 1; i < 4; i++) {
            if (i != 1) {
                // Messages 2 and 3: use a routing key that matches no queue,
                // to see whether the return callback above is invoked
                channel.basicPublish(DIRECT_NAME, "Random fight", true, null, ("hello,world!" + i).getBytes());
            } else {
                // Message 1: a normal publish whose routing key matches the queue
                channel.basicPublish(DIRECT_NAME, "lb", true, null, ("hello,world!" + i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}
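Consumer: creates a single queue bound with the routing key "lb" and consumes messages from it.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MandatoryConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here as well, so that the bind below also works
        // when the consumer is started before the producer
        channel.exchangeDeclare("lb_direct", BuiltinExchangeType.DIRECT);
        // Declare a queue
        String queueName = "lb-queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // Bind the queue to the exchange "lb_direct" with the routing key "lb"
        channel.queueBind(queueName, "lb_direct", "lb");
        // Consume the messages delivered to the queue
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                System.out.println("Consumer received message***:" + new String(body));
            }
        });
    }
}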
Start the consumer so that it waits for messages, then start the producer to publish them.
The consumer receives only "hello,world!1", the message published with the routing key "lb".
Because "hello,world!2" and "hello,world!3" cannot be routed to any queue, the producer's callback for routing failures is invoked for each of them.
3. Sender confirmation mode
Because transactions cost a lot of performance, the RabbitMQ team came up with a better solution: sender confirmation mode (publisher confirms), which is lighter than transactions and has a far smaller impact on performance.
Principle: the producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1), and this ID is used to confirm the message between the producer and RabbitMQ.
For an unroutable message, once the exchange finds that the message cannot be routed to any queue, the broker confirms it, indicating that the message has been received. If the sender set the mandatory flag, the listener registered with addReturnListener is called first.
For a routable message, after the message has been delivered to all matching queues, the broker sends an acknowledgement (containing the unique ID of the message) to the producer, so the producer knows that the message has correctly reached the destination queues. If the message and the queue are persistent, the acknowledgement is sent after the message has been written to disk. The delivery-tag field of the acknowledgement returned by the broker contains the sequence number of the confirmed message.
The biggest advantage of confirm mode is that it can be asynchronous. Once a message is published, the producer application can continue sending the next message while waiting for the channel to return the confirmation. When the confirmation finally arrives, the producer application can handle it in a callback method. If RabbitMQ loses the message because of an internal error, it sends a nack to the producer application, which can also handle the nack in a callback method and decide what to do next.
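Three ways to implement confirm mode:
Method 1: channel.waitForConfirms(), the ordinary sender confirmation mode. It blocks until the messages published so far have been confirmed and returns true when the broker acknowledged all of them, that is, when they reached the exchange.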
Code demonstration:
producer:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * producer
 */
public class ConfirmProducer {
    // Exchange name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare a direct exchange
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        // Enable sender confirmation mode
        channel.confirmSelect();
        channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        // Check whether the message just published on the channel was confirmed
        if (channel.waitForConfirms()) {
            System.out.println("msg send success");
        } else {
            System.out.println("msg send fail");
        }
        channel.close();
        connection.close();
    }
}
Execution result: the message is reported as published successfully.
Note: in the code above, the routing key we used ("Random fight") does not match any queue, yet this mode still reports a successful publish. It must therefore be emphasized that a successful publish only means that the message reached the exchange. It does not mean that the message was routed to a queue, nor that it was consumed.
Next, combine this with the failure confirmation mode above and look at the result:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    Connection connection = ConnectionUtils.getConnection();
    Channel channel = connection.createChannel();
    // Declare a direct exchange
    channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
    // Add a callback that handles messages the broker returns as unroutable
    channel.addReturnListener(
            (replyCode, replyText, exchange, routingKey, properties, body) -> {
                String message = new String(body);
                System.out.println("Returned message:" + message);
                System.out.println("Returned replycode:" + replyCode);
                System.out.println("Returned replyText:" + replyText);
                System.out.println("Returned exchange:" + exchange);
                System.out.println("Returned routeKey:" + routingKey);
            });
    // Enable sender confirmation mode
    channel.confirmSelect();
    channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
    if (channel.waitForConfirms()) {
        System.out.println("msg send success");
    } else {
        System.out.println("msg send fail");
    }
    channel.close();
    connection.close();
}
It can be found that the message is published successfully, but the routing fails
Method 2: channel.waitForConfirmsOrDie(), batch confirmation mode. This is a synchronous call: the code after it runs only once all published messages have been confirmed. If any message is nacked, that is, did not arrive at the exchange, an IOException is thrown.
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    Connection connection = ConnectionUtils.getConnection();
    Channel channel = connection.createChannel();
    // Declare a direct exchange
    channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
    // Enable sender confirmation mode
    channel.confirmSelect();
    // Send 4 messages
    for (int i = 0; i < 4; i++) {
        channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
    }
    // Confirm the whole batch at once instead of message by message
    channel.waitForConfirmsOrDie();
    channel.close();
    connection.close();
}
Method 3: channel.addConfirmListener(), asynchronous sender confirmation with a listener.
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConfirmAsyncProducer {
    // Exchange name
    public static final String DIRECT_NAME = "lb_direct";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare a direct exchange
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT);
        // Enable sender confirmation mode
        channel.confirmSelect();
        // Listen for confirmations from the broker
        channel.addConfirmListener(new ConfirmListener() {
            // deliveryTag increases by 1 for every message published on the channel
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Send successful callback,deliveryTag=" + deliveryTag);
                // doSomeThingAfterSuccess.....
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                // doSomeThingAfterFail.....
            }
        });
        // Send four messages
        for (int i = 0; i < 4; i++) {
            channel.basicPublish(DIRECT_NAME, "Random fight", true, null, "hello,world!".getBytes());
        }
        channel.close();
        connection.close();
    }
}
Start the consumer and run the producer: the 4 messages are confirmed as sent, and the deliveryTag increases with each message.
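The listener above ignores the multiple flag. When multiple is true, a single ack or nack covers every message up to and including deliveryTag, so a producer that wants to know exactly which messages are still unconfirmed has to keep its own record. The following is a minimal sketch of such a record, not part of the original example. The class OutstandingConfirms and its method names are hypothetical, and it assumes the producer stores each message under the sequence number that channel.getNextPublishSeqNo() returns right before the publish.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: remembers published messages until the broker confirms them. */
final class OutstandingConfirms {
    // Sequence number -> message body, kept sorted so a whole range can be cleared at once.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Call right before basicPublish, passing the value of channel.getNextPublishSeqNo(). */
    void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Mirrors handleAck/handleNack: removes the confirmed entries and returns how many. */
    int confirmed(long deliveryTag, boolean multiple) {
        if (multiple) {
            // multiple == true covers every sequence number up to and including deliveryTag.
            ConcurrentNavigableMap<Long, String> upTo = pending.headMap(deliveryTag, true);
            int count = upTo.size();
            upTo.clear();
            return count;
        }
        return pending.remove(deliveryTag) == null ? 0 : 1;
    }

    /** Number of messages that have been published but not yet confirmed. */
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        for (long seqNo = 1; seqNo <= 4; seqNo++) {
            tracker.published(seqNo, "hello,world!" + seqNo);
        }
        tracker.confirmed(3, true);   // one ack covering messages 1 to 3
        tracker.confirmed(4, false);  // a single ack for message 4
        System.out.println("outstanding=" + tracker.outstanding()); // prints outstanding=0
    }
}
```

In a real producer, handleAck and handleNack would both call confirmed(deliveryTag, multiple), and handleNack would additionally decide whether to republish the removed messages.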
4. Alternate exchange
An alternate exchange is specified when the primary exchange is first declared, and it names an exchange that already exists. If the primary exchange cannot route a message, the message is routed to the alternate exchange instead.
To use one, declare the queue and the alternate exchange as usual and bind the queue to the alternate exchange. Then, when declaring the primary exchange, attach the alternate exchange to it through the exchange argument alternate-exchange.
producer:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class BackupProducer {
    // Primary exchange
    public static final String DIRECT_NAME = "lb_direct";
    // Alternate exchange
    public static final String DIRECT_BACKUP_NAME = "lb_backup_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the alternate exchange (fanout type)
        channel.exchangeDeclare(DIRECT_BACKUP_NAME, BuiltinExchangeType.FANOUT);
        // Arguments for the primary exchange
        Map<String, Object> hashMap = new HashMap<>();
        // Set the name of the alternate exchange as a key-value pair
        hashMap.put("alternate-exchange", DIRECT_BACKUP_NAME);
        // Declare the primary exchange. The last parameter is the hashMap above.
        // If "lb_direct" still exists from the earlier examples without this argument,
        // delete it first: redeclaring an exchange with different arguments fails
        channel.exchangeDeclare(DIRECT_NAME, BuiltinExchangeType.DIRECT, false, false, hashMap);
        channel.basicPublish(DIRECT_NAME, "Unknown routing key", false, null,
                "this is backup exchange test".getBytes());
        channel.close();
        connection.close();
    }
}
consumer:
public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtils.getConnection();
    Channel channel = connection.createChannel();
    // Declare a queue
    String queueName = "lb-backup-queue";
    channel.queueDeclare(queueName, false, false, false, null);
    // The routing key used for this binding does not matter,
    // because the alternate exchange is a fanout exchange
    channel.queueBind(queueName, BackupProducer.DIRECT_BACKUP_NAME, DirectProducer.ROUTE_KEY);
    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) {
            System.out.println(consumerTag + ":" + "receiver[" + envelope.getRoutingKey() + "] : " + new String(body));
        }
    });
}
Start the consumer and then the producer to send the message.
The result shows that when the primary exchange cannot route a message to any queue, the message is passed to the alternate exchange, and the queue bound to the alternate exchange receives it.
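The fallback behaviour can be illustrated without a broker. The following is a toy in-memory model, not RabbitMQ code: the class AlternateExchangeModel and its methods are hypothetical, and it only assumes a direct primary exchange that matches routing keys exactly and a fanout alternate exchange that ignores them, as in the example above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not broker code) of a direct exchange with a fanout alternate exchange. */
final class AlternateExchangeModel {
    // Bindings of the primary direct exchange: routing key -> queue names.
    private final Map<String, List<String>> directBindings = new HashMap<>();
    // Queues bound to the fanout alternate exchange; fanout ignores the routing key.
    private final List<String> alternateQueues = new ArrayList<>();

    void bindDirect(String routingKey, String queue) {
        directBindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    void bindAlternate(String queue) {
        alternateQueues.add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    List<String> route(String routingKey) {
        List<String> matched = directBindings.get(routingKey);
        if (matched != null && !matched.isEmpty()) {
            return new ArrayList<>(matched);
        }
        // No binding matched on the primary exchange: fall back to the alternate exchange.
        return new ArrayList<>(alternateQueues);
    }

    public static void main(String[] args) {
        AlternateExchangeModel model = new AlternateExchangeModel();
        model.bindDirect("lb", "lb-queue");
        model.bindAlternate("lb-backup-queue");
        System.out.println(model.route("lb"));                  // prints [lb-queue]
        System.out.println(model.route("Unknown routing key")); // prints [lb-backup-queue]
    }
}
```

An empty result in this model corresponds to a message that neither exchange can route, which is the case where a publish with the mandatory flag set would be returned to the producer.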
5. Transactions
After a transaction is started, the following steps are performed:
1. The producer sends Tx.Select
2. The server replies Tx.Select-Ok
3. The producer sends the message
4. The producer sends Tx.Commit
5. The server replies Tx.Commit-Ok
6. If an exception occurs before the commit, the producer sends Tx.Rollback
7. The server replies Tx.Rollback-Ok
// TRANSACTION_EXCHANGE is a String constant defined elsewhere in the class (not shown)
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("maomaoyu.xyz");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(TRANSACTION_EXCHANGE, BuiltinExchangeType.DIRECT);
    try {
        // Start the transaction
        channel.txSelect();
        channel.basicPublish(TRANSACTION_EXCHANGE, "lb1", true, null, "hello world".getBytes());
        // Commit the transaction
        channel.txCommit();
    } catch (Exception e) {
        e.printStackTrace();
        // Roll back when an exception occurs
        channel.txRollback();
    }
}
Sending and receiving of data packets between client and server
Transactions are synchronous and cost a lot of performance. If several messages need to be sent, it is recommended to commit them together in one transaction. If each message is committed separately, steps 4 and 5 are executed for every message, which costs more performance.
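The difference can be put into numbers by counting the synchronous request/reply pairs from the step list above. The following is a rough sketch of that arithmetic only, with a hypothetical class TxRoundTrips. It assumes that Tx.Select is sent once per channel, that publishing a message does not wait for a reply, and that at least one message is sent.

```java
/** Hypothetical cost model: counts blocking round trips, not real broker timings. */
final class TxRoundTrips {
    /** Every message gets its own commit: one Tx.Select plus one Tx.Commit per message. */
    static long commitPerMessage(long messages) {
        return 1 + messages;
    }

    /** All messages share one commit: one Tx.Select plus a single Tx.Commit. */
    static long commitOnce(long messages) {
        return 2;
    }

    public static void main(String[] args) {
        long messages = 1000;
        System.out.println("commit per message: " + commitPerMessage(messages)); // 1001
        System.out.println("commit once: " + commitOnce(messages));              // 2
    }
}
```

The model ignores the work the broker does inside each commit, so it only shows how the number of blocking waits grows with the number of commits.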