[learning notes] learning notes of RabbitMQ tutorial of teachers with poor programming

This article is the main "bad programming person" of station B RabbitMQ tutorial According to my study notes, up uses CentOS, while I usually use Ubuntu more, so this article is based on Ubuntu. In addition, it seems that RabbitMQ needs root permission, so all the following RabbitMQ related commands should preferably have sudo, otherwise an error may be reported. In addition, if the queue and switch are created repeatedly, an error will also be reported, which can be deleted on the web page.

1, Message queuing MQ

1. What is MQ

MQ(Message Queue): translated into message queue. Through the typical producer and consumer model, producers constantly produce messages to the message queue, and consumers constantly get messages from the queue. Because the production and consumption of messages are asynchronous, and only care about the sending and receiving of messages, without the intrusion of business logic, it is easy to realize the decoupling between systems. The alias is message middleware, which uses efficient and reliable message passing mechanism for platform independent data exchange, and integrates distributed systems based on data communication.

2. What are MQ's

There are many mainstream message middleware in the market today, such as the old ActiveMQ and RabbitMQ, the hot Kafka, and RocketMQ independently developed by Alibaba.

3. Characteristics of different MQ

(1) ActiveMQ

ActiveMQ is the most popular and powerful open source message bus produced by Apache. It is a message oriented middleware that fully supports the JMS specification. Rich API s and a variety of cluster architecture modes make ActiveMQ an old message middleware in the industry and popular in small and medium-sized enterprises!

(2) Kafka

Kafka is an open-source distributed publish subscribe messaging system of LinkedIn, which is currently a top-level project of Apache. Kafka is mainly characterized by processing message consumption based on Pull mode and pursuing high throughput. Its initial purpose is to collect and transmit logs. Version 0.8 supports replication, does not support transactions, and has no strict requirements on message duplication, loss and error. It is suitable for the data collection business of Internet services that generate a large amount of data.

(3) RocketMQ

RocketMQ is Alibaba's open source message middleware. It is developed in pure Java. It has the characteristics of high throughput, high availability and is suitable for large-scale distributed system applications. RocketMQ originated from Kafka, but it is not a Copy of Kafka. It optimizes the reliable transmission and transaction of messages. At present, it is widely used in trading, recharge, stream computing, message push, log streaming, binglog distribution and other scenarios in Alibaba group.

(4) RabbitMQ

RabbitMQ is an open source message queue system developed in Erlang language and implemented based on AMQP protocol. The main features of AMQP are message oriented, queue oriented, routing (including point-to-point and publish / subscribe), reliability and security. AMQP protocol is more used in scenarios that require high data consistency, stability and reliability in enterprise systems, and the requirements for performance and throughput are second.

RabbitMQ is more reliable than Kafka. Kafka is more suitable for IO high throughput processing. It is generally used in big data log processing or scenarios with slightly lower requirements for real-time (a small amount of delay) and reliability (a small amount of data loss), such as ELK log collection.

2, Getting to know RabbitMQ

Based on AMQP protocol and erlang language development, it is the most widely deployed open source message middleware and one of the most popular open source message middleware. RabbitMQ's official website is https://www.rabbitmq.com/ .

1. AMQP protocol

AMQP (advanced message queuing protocol) was proposed in 2003. It was first used to solve the problem of message transmission and interaction between different financial platforms. As the name suggests, AMQP is a protocol, more precisely a binary wire level protocol. This is the essential difference between AMQP and JMS. AMQP does not limit from the API layer, but directly defines the data format of network exchange. This makes the provider nature of AMQP cross platform. The following is the AMQP protocol model:

As shown in the figure, the producer (Publisher) sends messages to the switch (Exchange) in the Virtual host. There is a binding relationship between the switch and the Message Queue, and the Consumer consumes messages through the Message Queue.

2. RabbitMQ installation

Install RabiitMQ for Ubuntu. Refer to the article Ubuntu16.04 18.04 installing rabbitmq configuration and using detailed tutorial.

  • Sudo apt get install erlang NOx: because RabbitMQ is developed in erlang language, the language environment should be installed first

  • Sudo apt get install RabbitMQ server: install RabbitMQ

  • sudo rabbitmq-plugins enable rabbitmq_management: start plug-in management in RabbitMQ

  • Systemctl start RabbitMQ server: starts RabbitMQ

  • Since RabbitMQ occupies port 15672 by default, visit localhost:15672 in the browser and the following interface is displayed

    The login user name and password are guest. The login page is as follows:

After installation, the location of RabbitMQ configuration file is / etc / RabbitMQ / RabbitMQ env conf. You need to turn off the firewall when using:

systemctl disable firewalld
systemctl stop firewalld

3, RabiitMQ configuration

1. RabbitMQ management command line

(1) Service startup related

  • Systemctl start RabbitMQ server: starts RabbitMQ
  • Systemctl restart RabbitMQ server: restart RabbitMQ
  • Systemctl stop RabbitMQ server: stop RabbitMQ
  • Systemctl status RabbitMQ server: view the status of RabbitMQ

(2) Manage command line

  • rabbitmqctl help: view more commands to operate RabbitMQ without using the web management interface

(3) Plug in management command line

  • Rabbitmq plugins enable: start the plug-in
  • Rabbitmq plugins list: lists all plugins
  • Rabbitmq plugins disable: close the plug-in

2. Web management interface

(1) Overview overview

  • connections: both producers and consumers need to establish a connection with RabbitMQ before they can complete the production and consumption of messages. You can view the connection here
  • channels: channel. After the connection is established, a channel will be formed. The delivery of messages depends on the channel
  • Exchanges: a switch used to route messages
  • Queues: queue, that is, message queue. Messages are stored in the queue and wait for consumption. After consumption, they are removed from the queue

(2) Admin user and virtual host management

i) Add user

The Tags option above actually specifies the user's role. The following options are available:

  • Super administrator: you can log in to the management console, view all information, and operate on users and policies

  • Monitoring: you can log in to the management console and view the related information of rabbitmq node (number of processes, memory usage, disk usage, etc.)

  • policy maker: you can log in to the management console and manage policies at the same time. However, the relevant information of the node cannot be viewed (the part identified by the red box in the above figure)

  • General manager: you can only log in to the management console and cannot see node information or manage policies

  • Others: unable to log in to the management console, usually ordinary producers and consumers

ii) create virtual host

iii) bind virtual hosts and users

After creating the virtual host, we also need to add access rights to users:

Click add virtual host:

Enter the virtual machine setting interface

4, RabbitMQ's first program

1. Review of AMQP agreement

Producers send messages through channels. Each producer corresponds to a virtual host. Only after binding the virtual host with users can they have access rights. Whether a message is placed in the switch depends on the message model used. When a message is not placed in the switch, it will be directly placed in the message queue. Consumers and producers are decoupled. They only care about whether there are corresponding messages in the message queue. When consumers consume messages, they also need to connect to the virtual host.

2. Message model supported by AMQP

This paper does not involve the sixth model, and there is a new model, neither.

3. Introduce dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

4. The first model (direct connection)

In the model above, there are the following concepts:

  • P: The producer is the program that sends the message
  • C: Consumer: the receiver of the news will always wait for the news to arrive
  • Queue: message queue, shown in red. Similar to a mailbox, messages can be cached; The producer delivers the message to it, and the consumer takes the message out of it

In this model, there is only one producer and one consumer. The producer sends the message to the message queue. The producer listens to the message queue and takes out the message from the message queue for consumption.

(1) Create virtual hosts and users on Web pages

In the Admin option

  • Create a virtual host named / ems, which starts with /
  • Create a user named ems
  • Click the user name of ems to bind with / ems virtual host

(2) Development producer

package HelloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import utils.MQConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    // Production message
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // Create a connection factory object that connects to MQ
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set up connection to RabbitMQ host
        connectionFactory.setHost("192.168.114.129");
        // Set port number
        connectionFactory.setPort(5672);
        // Set which virtual host to connect to
        connectionFactory.setVirtualHost("/ems");
        // Set the user name and password to access the virtual host
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");

        // Get connection
        Connection connection = connectionFactory.newConnection();
        // Get channels in connection
        Channel channel = connection.createChannel();
        // Message queue corresponding to channel binding
        // Parameters: queue name, whether the queue is persistent (whether the queue still exists after restarting RabbitMQ, and the message will still be lost), whether to monopolize the queue, whether to automatically delete the queue after consumption, and additional parameters
        channel.queueDeclare("hello", false, false, false, null);

        // Release news
        // Parameters: switch name, queue name, additional settings for delivery message, and specific content of message
        // MessageProperties. PERSISTENT_ TEXT_ Plan: after restarting RabbitMQ, the message will still exist
        channel.basicPublish("","hello",null,"hello RabbitMQ".getBytes());
        // Close channels and connections
        channel.close();
        connection.close();
    }
}

Note that you need to close the firewall of the system where RabbitMQ is located, otherwise an error will be reported.

(3) Develop consumers

package HelloWorld;

import com.rabbitmq.client.*;
import utils.MQConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    // Note that it needs to be the main function, because consumers should always listen
    public static void main(String[] args) throws IOException, TimeoutException {
        // MQ connection objects created by the factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Set up connection to RabbitMQ host
        connectionFactory.setHost("192.168.114.129");
        // Set port number
        connectionFactory.setPort(5672);
        // Set which virtual host to connect to
        connectionFactory.setVirtualHost("/ems");
        // Set the user name and password to access the virtual host
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");

        // Get connection
        Connection connection = connectionFactory.newConnection();
        // Get channels in connection
        Channel channel = connection.createChannel();
        // Message queue corresponding to channel binding
        // Parameters: queue name, whether the queue is persistent, whether the queue is exclusive, whether the queue is automatically deleted after consumption, and additional parameters
        // Ensure that the parameters of producer and consumer queues are consistent
        channel.queueDeclare("hello", false, false, false, null);

        // Parameters: queue name, automatic confirmation mechanism when starting message, callback interface when charging message
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override
            // body: the message taken from the queue
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Message: " + new String(body));
            }
        });
    }
}

(4) Encapsulation tool class

Encapsulate duplicate code from producers and consumers into tool classes

package Utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConnection {

    // Create a connection factory object that connects to MQ
    public static ConnectionFactory connectionFactory;
    // The content in the static code block is executed only once when the class is loaded
    static {
        connectionFactory = new ConnectionFactory();
    }

    // Provides methods for connecting objects
    public static Connection getConnection() {
        try {

            // Set up connection to RabbitMQ host
            connectionFactory.setHost("192.168.114.129");
            // Set port number
            connectionFactory.setPort(5672);
            // Set which virtual host to connect to
            connectionFactory.setVirtualHost("/ems");
            // Set the user name and password to access the virtual host
            connectionFactory.setUsername("ems");
            connectionFactory.setPassword("ems");
            // Get connection
            return connectionFactory.newConnection();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    // Method of closing channels and connections
    public static void closeChannelAndConnection(Channel channel, Connection connection) throws IOException, TimeoutException {
        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

5. The second model (work queue)

Work queues, also known as Task queues, is a task model. When message processing is time-consuming, the speed of message production may be much faster than that of message consumption. In the long run, more and more messages will accumulate and cannot be processed in time. At this point, you can use the work model: let multiple consumers bind to a queue and consume the messages in the queue together. Once the messages in the queue are consumed, they will disappear, so the task will not be repeated.

Role:

  • P: Producer: publisher of the task
  • C1: consumer-1, get the task and complete it. It is assumed that the completion speed is slow
  • C2: consumer-2: get the task and complete it. It is assumed that the completion speed is fast

(1) Development producer

package workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.MQConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare queue through channel
        channel.queueDeclare("work", true, false, false, null);
        for (int i = 0; i < 10; i++) {
            // Production message
            channel.basicPublish("", "work", null, (i + " hello work queque").getBytes());
        }
        // close resource
        MQConnection.closeChannelAndConnection(channel, connection);
    }
}

(2) Developing consumers 1

package workqueue;

import com.rabbitmq.client.*;
import utils.MQConnection;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare queue through channel
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:"+new String(body));
            }
        });
    }
}

(3) Developing consumers 2

package workqueue;

import com.rabbitmq.client.*;
import utils.MQConnection;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare queue through channel
        channel.queueDeclare("work", true, false, false, null);
        channel.basicConsume("work", true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2:"+new String(body));
            }
        });
    }
}

(4) Test results

Run Consumer1 and Consumer2 first, and then run Provider. By default, RabbitMQ will send each message to the next user in order. No matter whether the processing speed of the two consumers is the same, the number of messages they can consume is evenly distributed. This method of distributing messages is called loop.

(5) Automatic message confirmation mechanism

When multiple consumers process messages at different speeds, you can turn off automatic confirmation and set the number of messages that can be consumed each time to realize that those who can do more work.

package workqueue;

import com.rabbitmq.client.*;
import utils.MQConnection;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Only one message can be consumed at a time
        channel.basicQos(1);
        // Declare queue through channel
        channel.queueDeclare("work", true, false, false, null);
        // Turn off automatic confirmation and manual confirmation is required
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer 2:" + new String(body));
                // Manual confirmation
                // Parameters: confirm which specific message in the queue and whether to open multiple messages for confirmation at the same time
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

6. The third model (fanout)

fanout: fan out, also known as broadcast

In broadcast mode, the message sending process is as follows:

  • There can be multiple consumers
  • Each consumer has its own queue
  • Each queue is bound to an Exchange (switch)
  • The message sent by the producer can only be sent to the switch. The switch decides which queue to send, but the producer cannot decide
  • The switch sends messages to all queues that have been bound
  • Consumers in the queue can get the message. Realize that a message is consumed by multiple consumers

(1) Development producer

package Fanout;

import Utils.MQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name, switch type (fanout is broadcast type)
        channel.exchangeDeclare("logs","fanout");
        // send message
        // routingkey in fanout is meaningless
        channel.basicPublish("logs", "", null, ("fanout type message").getBytes());
        // close resource
        MQConnection.closeChannelAndConnection(channel, connection);
    }
}

(2) Consumer development 1 / 2 / 3

Since the codes of the three consumers are almost identical, they will not be written repeatedly. The only difference is the class name and the output information on line 25.

package Fanout;

import Utils.MQConnection;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name, switch type (fanout is broadcast type)
        channel.exchangeDeclare("logs","fanout");
        // Temporary queue
        String queueName=channel.queueDeclare().getQueue();
        // Binding switches and queues
        channel.queueBind(queueName,"logs","");
        // Consumption news
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:"+new String(body));
            }
        });

    }
}

(5) Test results

7. The fourth model (Routing subscription model Direct)

In Fanout mode, a message will be consumed by all subscribed queues. However, in some scenarios, we want different messages to be consumed by different queues. In this case, Exchange of Direct type will be used.

Under the Direct model:

  • The binding between the queue and the switch cannot be arbitrary, but a routing key should be specified

  • When sending a message to Exchange, the sender of the message must also specify the RoutingKey of the message.

  • Exchange will no longer deliver messages to each bound queue, but will judge according to the Routing key of the message. Messages will be received only if the Routing key of the queue is completely consistent with the Routing key of the message

  • P: The producer sends messages to Exchange. When sending messages, a routing key will be specified.

  • 10: Exchange (exchange) receives the message from the producer, and then submits the message to the queue that exactly matches the routing key

  • C1: consumer whose queue specifies the message whose routing key is error

  • C2: consumer, whose queue specifies the messages whose routing key needs to be info, error and warning

(1) Development producer

package Direct;

import Utils.MQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name (from its own), switch type (direct is routing mode)
        channel.exchangeDeclare("logs_direct", "direct");
        String routingKey = "info";
        // send message
        // routingkey in fanout is meaningless
        channel.basicPublish("logs_direct", routingKey, null,
                ("This is direct Model publishing based on routingkey: [" + routingKey + "] Messages sent").getBytes());
        // close resource
        MQConnection.closeChannelAndConnection(channel, connection);
    }
}

(2) Developing consumers 1

package Direct;

import Utils.MQConnection;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name, switch type
        channel.exchangeDeclare("logs_direct", "direct");
        // Temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Binding switches and queues based on routingKey
        channel.queueBind(queueName, "logs_direct", "error");
        // Consumption news
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:" + new String(body));
            }
        });

    }
}

(3) Developing consumers 2

package Direct;

import Utils.MQConnection;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name, switch type
        channel.exchangeDeclare("logs_direct", "direct");
        // Temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Binding switches and queues based on routingKey
        channel.queueBind(queueName, "logs_direct", "error");
        channel.queueBind(queueName, "logs_direct", "info");
        channel.queueBind(queueName, "logs_direct", "warning");
        // Consumption news
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2:" + new String(body));
            }
        });

    }
}

(4) Test results

When the producer sends a message with Route key info

When the producer sends a message with Route key info

8. The fifth model (Routing subscription model Topic)

Compared with Direct, Topic Exchange can route messages to different queues according to Routingkey. However, Topic type Exchange allows the queue to use wildcards when Binding routing keys! This kind of model Routingkey is generally composed of one or more words with "." between them Split, for example: item insert

Uniform character:

  • *: match exactly 1 word
  • #: match one or more words

For example:

  • audit.#: Match audit irs. Corporate or audit IRS et al
  • audit.*: Can only match audit irs
  • *.audit.#: Audit must be in the middle. Audit is preceded by one or more words and followed by one word

() development producers

package Topic;

import Utils.MQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name (from its own), switch type (topic is dynamic routing)
        channel.exchangeDeclare("topics", "topic");
        String routingKey = "user.save";
        // send message
        // routingkey in fanout is meaningless
        channel.basicPublish("topics", routingKey, null,
                ("This is topic Dynamic routing model, routingkey: [" + routingKey + "]").getBytes());
        // close resource
        MQConnection.closeChannelAndConnection(channel, connection);
    }
}

() develop consumers 1 / 2

Consumers 1 and 2 differ in class name, line 20, and line 25 names

package Topic;

import Utils.MQConnection;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get connection object
        Connection connection = MQConnection.getConnection();
        // Get channel object
        Channel channel = connection.createChannel();
        // Declare the channel to the specified switch
        // Parameters: switch name, switch type
        channel.exchangeDeclare("topics", "topic");
        // Temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Binding switches and queues based on wildcard routingKey
        // Consumer2 is set to user#
        channel.queueBind(queueName, "topics", "user.*");
        // Consumption news
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1:" + new String(body));
            }
        });
    }
}

(3) Test results

5, Using RabbitMQ in SpringBoot

1. Build environment

(1) Introduce dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2) Profile settings

spring:
  application:
    name: rabbitmq_springboot

  rabbitmq:
    host: 192.168.114.129
    port: 5672
    username: ems
    password: ems
    virtual-host: /ems

2. The first hello world model uses

(1) Development producer

package ecnu.cn;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    // Inject RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // hello world
    @Test
    public void test(){
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}

(2) Develop consumers

package ecnu.cn.Hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// Specifies the hello queue to listen to
// The default is persistent, non exclusive, and the queue is not automatically deleted
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true", exclusive = "true", autoDelete = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("message: " + message);
    }
}

3. Use the second work model

(1) Development producer

// Inject RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

// work
@Test
public void testWork() {
    for (int i = 0; i < 10; i++) {
    	rabbitTemplate.convertAndSend("work", "work Model" + i);
    }
}

(2) Develop consumers

package ecnu.cn.Work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkConsumer {

    // The first consumer
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("message1: " + message);
    }

    // 2nd consumer
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("message2: " + message);
    }
}

4. Fanout broadcast model

(1) Development producer

// Inject RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

// fanout broadcast
@Test
public void testFanout(){
    // Note that there are multiple exchange parameters
    rabbitTemplate.convertAndSend("logs", "", "Fanout Messages sent by the model");
}

(2) Develop consumers

package ecnu.cn.Fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue, // Create temporary queue
            exchange = @Exchange(name = "logs", type = "fanout") // Bound switch
    ))
    public void reveive1(String message) {
        System.out.println("message1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue, // Create temporary queue
            exchange = @Exchange(name = "logs", type = "fanout") // Bound switch
    ))
    public void reveive2(String message) {
        System.out.println("message2: " + message);
    }
}

5. Route routing model

(1) Development producer

// Inject RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

// route routing mode
@Test
public void testRoute(){
    rabbitTemplate.convertAndSend("directs","info","send out info of key Routing information for");
}

(2) Develop consumers

package ecnu.cn.Route;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RouteConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Create temporary queue
            // name and value have the same effect
            exchange = @Exchange(value = "directs", type = "direct"), // Custom switch information
            key = {"info","error"}
    ))
    public void receive1(String message) {
        System.out.println("message1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Create temporary queue
            // name and value have the same effect
            exchange = @Exchange(value = "directs", type = "direct"), // Custom switch information
            key = "error"
    ))
    public void receive2(String message) {
        System.out.println("message2: " + message);
    }
}

6. Topic subscription model (dynamic routing model)

(1) Development producer

// Inject RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

// topic dynamic routing subscription mode
@Test
public void testTopic(){
    rabbitTemplate.convertAndSend("topics","user.save","user.save Routing information");
}

(2) Develop consumers

package ecnu.cn.Topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "topics", type = "topic"),
            key = {"user.save", "user.*"}
    ))
    public void receive1(String message) {
        System.out.println("message1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(value = "topics", type = "topic"),
            key = {"order.#", "produce.#", "user.*"}
    ))
    public void receive2(String message) {
        System.out.println("message2: " + message);
    }
}

6, Application scenarios of MQ

1. Asynchronous processing

(1) Scenario description

After registering, users need to send registration email and registration SMS. There are two traditional methods. 1 Serial mode 2 Parallel mode

(2) Serial mode

After writing the registration information into the database, send the registration email, and then send the registration SMS. After all the above three tasks are completed, it will be returned to the client. One problem is that e-mail and SMS are not necessary. It is just a notice, which makes the client wait for things that are unnecessary

(3) Parallel mode

After writing the registration information into the database, send messages while sending e-mail. After the above three tasks are completed, they will be returned to the client. The parallel method can improve the processing time.

(4) Message queue

It is assumed that the three service nodes use 50ms, 150ms in serial mode and 100ms in parallel. It doesn't affect the normal time for the client to register and return to the database. Although I said that it's not necessary to improve the processing time of the email after the client has completed its registration, it doesn't affect the normal use of the email Message queue: after the message queue is introduced, the business logic that is not necessary for sending e-mail and short message is processed asynchronously

It can be seen that after the introduction of message queue, the user's response time is equal to the time to write to the database + the time to write to the message queue (negligible). After the introduction of message queue post-processing, the response time is three times that of serial and two times that of parallel

2. Application decoupling

(1) Scene

Double 11 is a shopping frenzy Festival. After users place an order, the order system needs to notify the inventory system. The traditional practice is that the order system calls the interface of the inventory system

(2) Shortcomings

When the inventory system fails, the order will fail. High coupling between order system and inventory system Import message queue

(3) Order system

After the user places an order, the order system completes the persistence processing, writes the message to the message queue, and returns the success of the user's order.

(4) Inventory system

Subscribe to the order message, obtain the order message and perform library operation. Even if the inventory system fails, the message queue can ensure the reliable delivery of messages without causing message loss

3. Flow peak shaving

(1) Scene

Second kill activities usually cause the application to hang up due to excessive traffic. In order to solve this problem, the message queue is usually added at the front end of the application.

(2) Function

  • You can control the number of activities and discard orders that exceed this certain threshold (why haven't I succeeded in killing every second ^ ^)

  • It can alleviate the crushing application caused by high traffic in a short time (the application obtains orders according to its maximum processing capacity)

(3) Attention

  • After receiving the user's request, the server will first write to the message queue. If the length of the added message queue exceeds the maximum value, the user's request will be discarded directly or jump to the error page

  • The second kill service performs subsequent processing according to the request information in the message queue

7, RabbitMQ cluster

1. Ordinary cluster (replica cluster)

All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster -- from the official website

By default: all data / states required for RabbitMQ proxy operations are replicated across all nodes. An exception to this is message queues, which by default reside on one node, although they can be seen and accessed from all nodes

(1) Architecture diagram

Core problem solving: when the master node in the cluster goes down at a certain time, the information in the Queue can be backed up

(2) Cluster construction

  • Cluster planning

    node1: 10.15.0.3 mq1 master node
    Replica node deqno1.0
    node3: 10.15.0.5 mq3 repl2 replica node

  • Clone the hostname and ip mapping of three machines

    vim /etc/hosts join:
    10.15.0.3 mq1
    10.15.0.4 mq2
    10.15.0.5 mq3
    node1: vim /etc/hostname join: mq1
    node2: vim /etc/hostname join: mq2
    node3: vim /etc/hostname join: mq3

  • Install rabbitmq on three machines, synchronize the cookie files, and execute on node1:

    scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
    scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/

  • Check whether cookie s are consistent:

    node1: cat /var/lib/rabbitmq/.erlang.cookie
    node2: cat /var/lib/rabbitmq/.erlang.cookie
    node3: cat /var/lib/rabbitmq/.erlang.cookie

  • Start rabbitmq in the background, execute the following commands on all nodes, and successfully access the management interface:

    rabbitmq-server -detached

  • Execute the join cluster command on node2 and node3:

    1. Close rabbitmqctl stop_app
    2. Join rabbitmqctl join_ cluster rabbit@mq1
    3. Start service rabbitmqctl start_app

  • To view the cluster status, any node executes the following steps:

    rabbitmqctl cluster_status

  • If the following display appears, the cluster is set up successfully:

    Cluster status of node rabbit@mq3 ...
    [{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
    {running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
    {cluster_name,<<"rabbit@mq1">>},
    {partitions,[]},
    {alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]

  • Log in to the management interface and display the following status:

  • Test the cluster on node1 and create a queue
  • To view node2 and node3 nodes:
  • Close node1 node and execute the following command to view node2 and node3:

    rabbitmqctl stop_app

2. Mirror cluster

This guide covers mirroring (queue contents replication) of classic queues -- from the official website

By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. -- From the official website

The mirror queue mechanism is to set the master-slave relationship between the three nodes, and the messages will be automatically synchronized between the three nodes. If one of the nodes is unavailable, it will not lead to message loss or service unavailability, so as to improve the overall high availability of MQ cluster.

(1) Cluster architecture diagram

(2) Configure cluster architecture

  • Strategy description
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
-p Vhost:  Optional parameter for specified vhost Lower queue Make settings
Name:     policy Name of
Pattern: queue Matching pattern of(regular expression )
Definition: Image definition, including three parts ha-mode, ha-params, ha-sync-mode
         		ha-mode:Indicates the mode of the mirror queue. Valid values are all/exactly/nodes
                      all: Indicates that mirroring is performed on all nodes in the cluster
                      exactly: Indicates that mirroring is performed on a specified number of nodes. The number of nodes is determined by ha-params appoint
                      nodes: Indicates that the image is mirrored on the specified node, and the node name is passed ha-params appoint
          	 ha-params: ha-mode Parameters required by the mode
              ha-sync-mode: The synchronization method of messages in the queue. The valid values are automatic and manual
              priority: Optional parameters, policy Priority of
  • View current policy
rabbitmqctl list_policies
  • Add policy
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}' 
explain:The policy regular expression is“^" Indicates that all matches all queue names  ^hello:matching hello Start queue
  • Delete policy
rabbitmqctl clear_policy ha-all
  • Test cluster

Keywords: RabbitMQ Middleware message queue MQ

Added by raymedia on Wed, 02 Feb 2022 10:51:22 +0200