RabbitMQ publish/subscribe mode: synchronizing user data

In the previous articles, we covered how to publish a simple message through RabbitMQ, then work queues with multiple consumers, and finally message dispatch across a work queue and the message acknowledgment mechanism (ACK).

In all the modes covered so far, a message is removed from the queue once it has been consumed, and ideally it is never consumed again. Now imagine another scenario, taken from a novel-reading product I worked on: after a user logs in successfully, the gold coins of the temporary account and the books on its bookshelf need to be synchronized to the official account.

If we run this synchronization inside the login flow, a failure in account or bookshelf synchronization affects the whole login experience. To keep the process invisible to users and avoid asking them for extra steps, we perform the synchronization asynchronously through a message queue.
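The decoupling described above can be sketched without a broker. This is only an in-memory illustration, not the code of the actual product: the class `AsyncLoginSketch`, its methods, and the "put the event back for a retry" behavior are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-memory stand-in for the message queue; all names here are hypothetical.
class AsyncLoginSketch {

    // Pending "login succeeded" events waiting to be synchronized
    private final Queue<String> loginEvents = new ArrayDeque<>();

    // Login only records the event; it never runs the synchronization itself
    boolean login(String userId) {
        // (credential check omitted)
        loginEvents.add(userId);
        return true;
    }

    // Called later by a background worker; a failure here stays inside the worker
    boolean syncNext(boolean simulateFailure) {
        String userId = loginEvents.poll();
        if (userId == null) {
            return false; // nothing to synchronize
        }
        if (simulateFailure) {
            loginEvents.add(userId); // assumption: keep the event so it can be retried
            return false;
        }
        return true;
    }

    int pending() {
        return loginEvents.size();
    }

    public static void main(String[] args) {
        AsyncLoginSketch app = new AsyncLoginSketch();
        System.out.println("login ok: " + app.login("user-42"));
        System.out.println("first sync ok: " + app.syncNext(true));
        System.out.println("still pending: " + app.pending());
        System.out.println("retry ok: " + app.syncNext(false));
    }
}
```

The point of the sketch is that `login` returns before any synchronization is attempted, so a failed synchronization cannot turn a successful login into a failed one.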

Publish and subscribe mode

This is a flow chart of user data synchronization with RabbitMQ publish/subscribe. You may have noticed that there is an additional component in the middle: an exchange.

Note that in publish/subscribe mode a queue must be bound to the exchange. If no queue is bound when a message is published, the message is not routed to any queue, so it is discarded and never consumed.
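The binding requirement can be illustrated with a small in-memory model. This is a sketch only and does not use the RabbitMQ client: `BindingSketch`, `bind` and `publish` are hypothetical names, and the model assumes the default behavior in which an unroutable message is simply dropped.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of an exchange; hypothetical names, not the RabbitMQ client API.
class BindingSketch {

    // Queues bound to this exchange, keyed by queue name
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange and return it so the caller can inspect it
    Queue<String> bind(String queueName) {
        return boundQueues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    // Copy the message into every bound queue; returns how many queues received it
    int publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        BindingSketch exchange = new BindingSketch();

        // No queue is bound yet: the message reaches nobody and is lost
        System.out.println("queues reached: " + exchange.publish("login-1"));

        // After binding, later messages are stored, but the earlier one is not replayed
        Queue<String> accountQueue = exchange.bind("account");
        System.out.println("queues reached: " + exchange.publish("login-2"));
        System.out.println("account queue: " + accountQueue);
    }
}
```

In practice this means the consumers, which declare and bind the queues, should be running before the producer publishes.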

Exchange type

There are four types of exchanges: direct, topic, headers and fanout. This article focuses on fanout, because it is the exchange type we need here.

fanout, as its name suggests, is a broadcast mode: it delivers a copy of each message to every queue bound to the exchange, regardless of the routing key.
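To make the difference between fanout and a key-based type concrete, here is a simplified routing model. It is a sketch with hypothetical names (`RoutingSketch`, `route`, the `sync.*` keys) rather than broker code, and it assumes one binding per queue and covers only the direct and fanout types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified routing model; hypothetical names, not the RabbitMQ client API.
class RoutingSketch {

    enum ExchangeType { DIRECT, FANOUT }

    // Queue name -> binding key used when the queue was bound
    private final Map<String, String> bindings = new LinkedHashMap<>();
    private final ExchangeType type;

    RoutingSketch(ExchangeType type) {
        this.type = type;
    }

    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
    }

    // Return the names of the queues that would receive a message with this routing key
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // fanout ignores the routing key; direct requires an exact match
            if (type == ExchangeType.FANOUT || binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        for (ExchangeType type : ExchangeType.values()) {
            RoutingSketch exchange = new RoutingSketch(type);
            exchange.bind("account", "sync.account");
            exchange.bind("book_case", "sync.book_case");
            System.out.println(type + " -> " + exchange.route("sync.account"));
        }
    }
}
```

With the same bindings, the direct model selects only the queue whose key matches, while the fanout model selects both queues, which is the behavior needed when one login event must trigger both synchronizations.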

Code

Producer

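import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send {

    /**
     * Exchange name
     */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {

        // Get a connection (MQConnectUtil is a connection helper that is not shown in this article)
        Connection connection = MQConnectUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare a fanout exchange: each message is copied to every bound queue
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Message content
        String msg = "I am a successful login message";

        // Publish to the exchange; fanout ignores the routing key, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));

        System.out.println("Message sent successfully: " + msg);

        channel.close();
        connection.close();
    }
}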

Consumer - sync account

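import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {

    /**
     * Exchange name
     */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    private final static String QUEUE_NAME = "test_topic_publish_account";

    public static void main(String[] args) throws IOException, TimeoutException {

        // Get a connection
        Connection connection = MQConnectUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare the exchange here too (idempotent), so binding works even if the producer has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange; fanout ignores the binding key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Prefetch 1: RabbitMQ sends this consumer at most one unacknowledged message at a time
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows // Lombok: lets the InterruptedException from Thread.sleep propagate undeclared
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("Sync account[1]: " + msg);

                // Simulate one second of work
                Thread.sleep(1000);

                // Manually acknowledge this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Start consuming with autoAck = false, so the manual ack above is required
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}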

Consumer - sync bookshelf

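import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {

    /**
     * Exchange name
     */
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    private final static String QUEUE_NAME = "test_topic_publish_book_case";

    public static void main(String[] args) throws IOException, TimeoutException {

        // Get a connection
        Connection connection = MQConnectUtil.getConnection();

        // Create a channel
        Channel channel = connection.createChannel();

        // Declare the exchange here too (idempotent), so binding works even if the producer has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange; fanout ignores the binding key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Prefetch 1: RabbitMQ sends this consumer at most one unacknowledged message at a time
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows // Lombok: lets the InterruptedException from Thread.sleep propagate undeclared
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("Sync bookshelf[2]: " + msg);

                // Simulate one second of work
                Thread.sleep(1000);

                // Manually acknowledge this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Start consuming with autoAck = false, so the manual ack above is required
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}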

Summary

To synchronize user data, we can give each synchronization task its own queue, so that account synchronization and bookshelf synchronization do not affect each other. This reduces coupling and improves the availability of the whole system.

Keywords: Programming RabbitMQ

Added by noiseusse on Tue, 19 May 2020 06:07:30 +0300