In the previous articles, we covered publishing a simple message through RabbitMQ, then work queues with multiple consumers, and finally work-queue dispatch and the message acknowledgement mechanism (ACK).
In all the modes covered so far, a message is deleted from the queue once it has been consumed, so ideally it is never consumed a second time. Now imagine another scenario, taken from a novel-reading product I worked on before: after a user logs in successfully, the gold coins in their temporary account and the books on their shelf need to be synchronized to their official account.
If this synchronization runs as part of the login itself, a failure in the account or shelf synchronization degrades the whole login experience. To keep the process invisible to users and spare them any extra steps, we synchronize asynchronously through a message queue.
Publish and subscribe mode
This is a flow chart of user data synchronization with RabbitMQ publish/subscribe. You may have noticed that there is now an additional exchange in the middle.
Note that in publish/subscribe mode, a queue must be bound to the exchange. If no queue is bound, a published message is simply discarded: it is not routed to any queue and is never consumed.
Exchange types
There are four exchange types: direct, topic, headers and fanout. This article focuses on fanout, because that is the type this scenario needs.
Fanout, as its name suggests, is a broadcast mode: the exchange pushes each message to every queue bound to it.
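To make the broadcast rule concrete, here is a minimal in-memory sketch of a fanout exchange. It is a toy model, not the RabbitMQ client API: the class `FanoutExchangeModel` and its methods `bind`, `publish` and `poll` are hypothetical names introduced only for illustration. It shows the two behaviors described above: a message published while no queue is bound reaches nobody, and once queues are bound, each of them gets its own copy.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ API). */
class FanoutExchangeModel {

    // Queues bound to this exchange, keyed by queue name
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange, creating the queue if it does not exist yet. */
    void bind(String queueName) {
        boundQueues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    /** Copies the message into every bound queue and returns how many queues received it. */
    int publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    /** Removes and returns the next message of a queue, or null if it is empty or unknown. */
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        FanoutExchangeModel exchange = new FanoutExchangeModel();

        // No queue is bound yet, so this message is lost
        System.out.println("Queues reached: " + exchange.publish("early login message"));

        exchange.bind("test_topic_publish_account");
        exchange.bind("test_topic_publish_book_case");

        // Both bound queues receive their own copy
        System.out.println("Queues reached: " + exchange.publish("login succeeded"));
        System.out.println("Account queue: " + exchange.poll("test_topic_publish_account"));
        System.out.println("Bookshelf queue: " + exchange.poll("test_topic_publish_book_case"));
    }
}
```

In this model the early message never shows up in either queue, because binding a queue later does not bring back messages that were published before the binding existed.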
Code
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send {

    /** Exchange name */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection
        Connection connection = MQConnectUtil.getConnection();
        // Create channel
        Channel channel = connection.createChannel();
        // Declare a fanout exchange: every message is broadcast to all bound queues
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Message content
        String msg = "I am a successful login message";
        // Send the message; a fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("Message sent successfully:" + msg);
        channel.close();
        connection.close();
    }
}
Consumer - sync account
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {

    /** Exchange name */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    private final static String QUEUE_NAME = "test_topic_publish_account";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection
        Connection connection = MQConnectUtil.getConnection();
        // Create channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Prefetch 1: the broker delivers at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Lombok: lets the InterruptedException from Thread.sleep propagate without declaring it
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Sync account[1]: " + msg);
                Thread.sleep(1000);
                // Manual acknowledgement of this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Start consuming with manual acknowledgement (autoAck = false)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
Consumer - sync bookshelf
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {

    /** Exchange name */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    private final static String QUEUE_NAME = "test_topic_publish_book_case";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection
        Connection connection = MQConnectUtil.getConnection();
        // Create channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Prefetch 1: the broker delivers at most one unacknowledged message to this consumer at a time
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Lombok: lets the InterruptedException from Thread.sleep propagate without declaring it
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("Synchronous bookshelf[2]: " + msg);
                Thread.sleep(1000);
                // Manual acknowledgement of this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Start consuming with manual acknowledgement (autoAck = false)
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
Summary
To synchronize user data, we give each synchronization task its own queue. The tasks then do not affect each other and coupling is reduced, which improves the availability of the whole system.
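The isolation between the two synchronization tasks can be illustrated with a small in-memory sketch. It is a simplified model under stated assumptions, not RabbitMQ itself: the class `IndependentQueuesSketch` and its `drain` method are hypothetical, and "acknowledging" a message is modeled as removing it from a plain Java queue only after the handler succeeds. Each queue holds its own copy of the login message, so a failing account handler leaves its message waiting while the bookshelf handler finishes normally.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of two independently consumed queues (hypothetical, not the RabbitMQ API). */
class IndependentQueuesSketch {

    /**
     * Processes messages in order. A message is removed ("acknowledged") only after
     * the handler succeeds; on failure it stays in the queue and processing stops.
     * Returns the number of messages processed successfully.
     */
    static int drain(Queue<String> queue, Consumer<String> handler) {
        int processed = 0;
        while (!queue.isEmpty()) {
            String message = queue.peek();
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // No acknowledgement: the message remains available for a later retry
                break;
            }
            queue.remove();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        // Each queue received its own copy of the same login message
        Queue<String> accountQueue = new ArrayDeque<>();
        Queue<String> bookshelfQueue = new ArrayDeque<>();
        accountQueue.add("login succeeded");
        bookshelfQueue.add("login succeeded");

        // The account sync fails, the bookshelf sync succeeds
        int accountDone = drain(accountQueue, msg -> {
            throw new IllegalStateException("account service unavailable");
        });
        int bookshelfDone = drain(bookshelfQueue, msg -> System.out.println("Bookshelf synced: " + msg));

        System.out.println("Account processed: " + accountDone + ", still queued: " + accountQueue.size());
        System.out.println("Bookshelf processed: " + bookshelfDone + ", still queued: " + bookshelfQueue.size());
    }
}
```

In a real broker, how an unacknowledged message is redelivered depends on the channel and on the acknowledgement calls used, so treat the retry behavior here as an approximation.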