RabbitMQ introduction and Application

1, Brief introduction

  • Open source implementation of AMQP, written in Erlang, with client libraries for many languages

  • Distributed, highly available, persistent, reliable and secure

  • Support multiple protocols: AMQP, STOMP, MQTT, HTTP

  • Message-oriented middleware used to decouple business logic across multiple systems

2, Basic concepts

1. Exchange: receives messages and forwards them to the queues bound to it. There are four types:

  • direct: routes a message to the queues whose binding key exactly matches the routing key

  • topic: routes by matching the routing key against a binding pattern

  • fanout: broadcasts every message to all bound queues

  • headers: routes by matching key-value pairs in the message headers

Exchange properties:

  • Persistence (durable): if enabled, the exchange still exists after the RabbitMQ service restarts

  • Auto delete: if enabled, the exchange deletes itself once the last queue bound to it is unbound, for example because that queue was deleted

2. Queue: an internal RabbitMQ object that stores messages. Its properties are similar to those of an exchange: a queue can also be made persistent or auto-deleted.

A consumer takes messages from a queue and processes them. Several consumers can subscribe to the same queue; in that case the messages in the queue are distributed evenly among them, so each message is handled by one consumer rather than every consumer receiving all messages.
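
The even distribution described above can be pictured as round-robin dispatch. The following is a minimal sketch under that assumption, written in plain Java without the RabbitMQ client; the class RoundRobinQueue and its methods are hypothetical names used only for illustration, and a real broker also takes consumer readiness and prefetch settings into account.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of one queue sharing its messages among several consumers. */
final class RoundRobinQueue {
    private final List<String> consumers = new ArrayList<>();
    private int next = 0; // index of the consumer that receives the next message

    void subscribe(String consumerName) {
        consumers.add(consumerName);
    }

    /** Picks the consumer for the next message, cycling through the subscribers in order. */
    String nextConsumer() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumer subscribed");
        }
        String chosen = consumers.get(next);
        next = (next + 1) % consumers.size();
        return chosen;
    }

    /** Dispatches every message to exactly one consumer and groups the result by consumer. */
    Map<String, List<String>> dispatchAll(List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (String message : messages) {
            received.get(nextConsumer()).add(message);
        }
        return received;
    }
}
```

With two subscribers A and B and four messages m1 to m4, A ends up with m1 and m3 and B with m2 and m4: no message is processed twice.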

3. Binding: links an exchange to a queue according to a routing rule

4. Routing key: the key that the exchange compares against its bindings to decide which queues receive a message
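
To make the relationship between exchange type, binding and routing key concrete, here is a rough sketch of the routing decision for direct, fanout and topic exchanges. It is plain Java, not the broker's actual code; RoutingSketch and routes are hypothetical names, and headers exchanges are left out. For topic exchanges it follows the AMQP convention that words are separated by dots, `*` stands for exactly one word and `#` for zero or more words.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified routing check for direct, fanout and topic exchanges. */
final class RoutingSketch {
    enum ExchangeType { DIRECT, FANOUT, TOPIC }

    /** Returns true if a message with routingKey should reach a queue bound with bindingKey. */
    static boolean routes(ExchangeType type, String bindingKey, String routingKey) {
        switch (type) {
            case DIRECT:
                return bindingKey.equals(routingKey);
            case FANOUT:
                return true; // the routing key is ignored
            case TOPIC:
                return topicMatch(Arrays.asList(bindingKey.split("\\.")),
                                  Arrays.asList(routingKey.split("\\.")));
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    // '*' matches exactly one word, '#' matches zero or more words.
    private static boolean topicMatch(List<String> pattern, List<String> key) {
        if (pattern.isEmpty()) {
            return key.isEmpty();
        }
        String head = pattern.get(0);
        List<String> restPattern = pattern.subList(1, pattern.size());
        if (head.equals("#")) {
            // Let '#' absorb 0, 1, 2, ... words and try to match the remainder.
            for (int i = 0; i <= key.size(); i++) {
                if (topicMatch(restPattern, key.subList(i, key.size()))) {
                    return true;
                }
            }
            return false;
        }
        if (key.isEmpty()) {
            return false;
        }
        if (head.equals("*") || head.equals(key.get(0))) {
            return topicMatch(restPattern, key.subList(1, key.size()));
        }
        return false;
    }
}
```

Under this model a topic binding without wildcards, such as order_create, behaves like a direct binding, while a pattern such as order.* (a made-up key for illustration) would match order.create but not order.create.eu.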

3, Message reliability

  • Message acknowledgment: with acknowledgments enabled, the broker deletes a message only after it receives the consumer's acknowledgment (ack). If the connection drops before the ack arrives, the message is redelivered to another consumer. If the consumer forgets to send the ack, unacknowledged messages pile up; when the consumer restarts, they are delivered again and the business logic runs a second time.

  • Message durability: marking messages as persistent prevents most message loss, for example when the RabbitMQ service restarts, whereas non-persistent messages give the queue higher throughput. For a persistent message to survive, the exchange and queue it passes through must be durable as well.

  • Prefetch count: the maximum number of unacknowledged messages delivered to a consumer at a time. A value of 1 means the consumer receives the next message only after acknowledging the current one. The broker itself applies no limit unless one is set, for example with channel.basicQos(1) in the Java client.
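
The interplay of acknowledgment, redelivery and prefetch count can be sketched with a small in-memory model. This is an illustration in plain Java, not how RabbitMQ is implemented: AckQueueSketch and its methods are hypothetical names, there is only one consumer, and persistence to disk is not modelled. As with basicQos, a prefetch count of 0 is treated as "no limit".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of manual acknowledgment with a prefetch limit for one consumer. */
final class AckQueueSketch {
    private final Deque<String> ready = new ArrayDeque<>();          // waiting for delivery
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acknowledged
    private final int prefetchCount;                                 // 0 means no limit
    private long nextDeliveryTag = 1;

    AckQueueSketch(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers as many messages as the prefetch limit allows and returns their delivery tags. */
    List<Long> deliver() {
        List<Long> tags = new ArrayList<>();
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked.size() < prefetchCount)) {
            long tag = nextDeliveryTag++;
            unacked.put(tag, ready.removeFirst());
            tags.add(tag);
        }
        return tags;
    }

    /** A message is removed for good only when the consumer acknowledges it. */
    void ack(long deliveryTag) {
        if (unacked.remove(deliveryTag) == null) {
            throw new IllegalArgumentException("unknown delivery tag: " + deliveryTag);
        }
    }

    /** Connection lost: every unacknowledged message goes back to the queue for redelivery. */
    void consumerDisconnected() {
        List<String> pending = new ArrayList<>(unacked.values());
        unacked.clear();
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i)); // keep the original order
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }
}
```

With a prefetch count of 2 and three published messages, the first call to deliver hands out two messages and holds back the third until one of them is acknowledged. Anything still unacknowledged when the consumer disconnects returns to the queue, which is why a forgotten ack leads to repeated processing.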

In short, if reliability is required, enable persistence and manual acknowledgment. If throughput matters more, non-persistent messages, automatic acknowledgment (no-ack) and auto-delete can be used instead.

4, Simple application

Consider the following business scenario: after a user places an order successfully, the system must add points to the user's account and send the user a notification that the order succeeded. This is a common scenario in e-commerce.

In a microservice architecture, placing orders may belong to the order service, adding points to the points service, and sending notifications to the notification service. The services are decoupled and do not affect one another. The message-oriented middleware RabbitMQ is a good choice for implementing this scenario.

The reasons are as follows:

  • High performance: it is implemented in Erlang, a language designed for high concurrency and high availability

  • Message persistence: persistent messages are stored on disk, so they are not lost when the server goes down and restarts

  • Message acknowledgment (ack): RabbitMQ deletes a message only after the consumer has processed it and sent the acknowledgment, which makes delivery reliable

  • Support highly available clusters

  • Flexible routing

Implementation idea:

After the user places an order successfully, the order service sends a message to the exchange EXCHANGE.ORDER_CREATE, which is bound to two queues, QUEUE.ORDER_INCREASESCORE and QUEUE.ORDER_NOTIFY. Consumers subscribe to these two queues, one to add points and the other to send the user notification.

If a logging system later needs to record order logs as well, we only need to define another queue and bind it to EXCHANGE.ORDER_CREATE.
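
Why adding a consumer requires no change to the order service can be seen in a small in-memory model of the exchange. This is only a sketch in plain Java: OrderExchangeSketch is a hypothetical class, bindings are compared by simple equality, and the queue name QUEUE.ORDER_LOG used in the usage note is an assumption, since the article does not name the log queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified in-memory model of one exchange copying a message into every matching bound queue. */
final class OrderExchangeSketch {
    // queue name -> binding key
    private final Map<String, String> bindings = new LinkedHashMap<>();
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Declares the queue if needed and binds it to the exchange with the given key. */
    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Every queue whose binding key equals the routing key receives its own copy of the message. */
    void publish(String routingKey, String message) {
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                queues.get(binding.getKey()).add(message);
            }
        }
    }

    /** Returns the messages currently stored in the queue (empty if the queue is unknown). */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }
}
```

After binding QUEUE.ORDER_INCREASESCORE and QUEUE.ORDER_NOTIFY with the key order_create, each publish places one copy in both queues. Binding a third queue such as QUEUE.ORDER_LOG with the same key makes it receive all later messages, while the publishing side stays exactly as it was.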

Sending the RabbitMQ message when an order is placed

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: Dancing robot
 * @date: 2017/10/13 10:46
 * @description: Simulates a user placing an order, then sends a RabbitMQ message
 */
public class OrderCreator {
    // Exchange name
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // Message content
    private static String msg = "create order success";

    /**
     * Simulates order creation, then sends the message to RabbitMQ
     */
    public void createOrder() {
        System.out.println("Order successfully, start sending rabbitmq news");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // Make the exchange durable so that it survives a broker restart
            boolean durable = true;
            // Exchange type: topic
            String type = "topic";
            // Declare the exchange; it is created if it does not exist yet
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messageId = UUID.randomUUID().toString();
            // deliveryMode 2 marks the message as persistent (1 means non-persistent)
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messageId).build();
            // Publish the message
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Close the connection even if declaring or publishing failed
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

The points system subscribes to the message

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: Dancing robot
 * @date: 2017/10/13 16:02
 * @description: RabbitMQ consumer that simulates adding points to the user after an order is placed successfully
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // Exchange name
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // Name of the queue for adding points
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";


    public void consume() {
        // Initialize rabbitmq connection information
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // Declare the queue: durable, non-exclusive, not auto-deleted
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // Bind the queue to the exchange with the routing key
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // Start consuming with this object as the callback. autoAck is false, so each message is acknowledged manually after the business logic completes
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("<Order message received by credit system:" + msg + ",Add points to users......");
        // Acknowledge the message manually
        channel.basicAck(envelope.getDeliveryTag(), false);

        /*
         * channel.basicReject(envelope.getDeliveryTag(), false); rejects the message without requeueing, so it is discarded
         * channel.basicReject(envelope.getDeliveryTag(), true); rejects the message and puts it back on the queue
         * A system usually sets a retry limit: while the limit has not been reached the message is requeued, and once it is exceeded the message is discarded
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}
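
The comment in handleDelivery mentions a retry limit without showing one. A possible shape for that decision is sketched below, assuming failures are counted per messageId (the producer sets a UUID as the message ID). RetryPolicySketch and its methods are hypothetical and not part of the RabbitMQ client. The counter lives in memory, so it is lost when the consumer restarts and is not shared between consumer instances.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that decides what to do with a message whose processing failed. */
final class RetryPolicySketch {
    enum Action { REQUEUE, DISCARD }

    private final int maxRetries;
    private final Map<String, Integer> failures = new HashMap<>(); // messageId -> failed attempts

    RetryPolicySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Records one failed attempt; returns REQUEUE until maxRetries retries have been used up. */
    Action onFailure(String messageId) {
        int attempts = failures.merge(messageId, 1, Integer::sum);
        if (attempts > maxRetries) {
            failures.remove(messageId); // give up and forget the message
            return Action.DISCARD;
        }
        return Action.REQUEUE;
    }

    /** Clears the counter once the message has been processed and acknowledged. */
    void onSuccess(String messageId) {
        failures.remove(messageId);
    }
}
```

In a consumer, REQUEUE would correspond to channel.basicReject(envelope.getDeliveryTag(), true) and DISCARD to channel.basicReject(envelope.getDeliveryTag(), false), with onSuccess called next to basicAck.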

The notification system subscribes to the message

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: Dancing robot
 * @date: 2017/10/13 16:20
 * @description: RabbitMQ consumer that simulates sending a notification to the user after an order is placed successfully
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // Exchange name
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // Name of the queue for order-success notifications
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";


    public void consume() {
        // Initialize rabbitmq connection information
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // Declare the queue: durable, non-exclusive, not auto-deleted
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // Bind the queue to the exchange with the routing key
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // Start consuming with this object as the callback. autoAck is false, so each message is acknowledged manually after the business logic completes
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("<Notification system receives the order message:" + msg + ",Start sending notifications to users......");
        // Acknowledge the message manually
        channel.basicAck(envelope.getDeliveryTag(), false);

        /*
         * channel.basicReject(envelope.getDeliveryTag(), false); rejects the message without requeueing, so it is discarded
         * channel.basicReject(envelope.getDeliveryTag(), true); rejects the message and puts it back on the queue
         * A system usually sets a retry limit: while the limit has not been reached the message is requeued, and once it is exceeded the message is discarded
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

Test

package com.robot.rabbitmq;

/**
 * @author: Dancing robot
 * @date: 2017/10/13 16:27
 * @description: Starts both consumers, then simulates placing three orders
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}

Output:

Order successfully, start sending rabbitmq news
<Order message received by credit system: create order success,Add points to users......
<Notification system receives the order message: create order success,Start sending notifications to users......
Order successfully, start sending rabbitmq news
<Order message received by credit system: create order success,Add points to users......
<Notification system receives the order message: create order success,Start sending notifications to users......
Order successfully, start sending rabbitmq news
<Order message received by credit system: create order success,Add points to users......
<Notification system receives the order message: create order success,Start sending notifications to users......

Keywords: Java RabbitMQ Distribution

Added by niranjnn01 on Wed, 29 Dec 2021 17:59:32 +0200