RabbitMQ Message Queuing Middleware
Install directly on Linux
Start service
systemctl start rabbitmq-server
View service status
systemctl status rabbitmq-server
Out of Service
systemctl stop rabbitmq-server
Startup service
systemctl enable rabbitmq-server
1, Introduction to message queue
1. Synchronous call and asynchronous call
- Synchronous call
- When service A calls service B, it needs to wait for the return value after the execution of service B before service A can continue to execute
- Synchronous calls can be done through REST and RPC
- REST: Ribbon,Feign
- RPC: Dubbo
- Asynchronous call
- Service A calls service B, and A does not need to wait for the execution result of service B
- Asynchronous call through message queue
2. Message queue concept
- The full name of MQ is Message Queue. Message Queue (MQ) is an application to application communication method. Applications communicate by reading and writing messages in and out of the queue (data for the application) without requiring a dedicated connection to link them.
- Message passing refers to the communication between programs by sending data in messages, rather than through direct calls to each other. Direct calls are usually used for technologies such as remote procedure calls. Queuing means that applications communicate through queues. The use of queues removes the requirement that the receiving and sending applications execute simultaneously.
3. Common Message Queuing Middleware
- RabbitMQ,ActiveMQ,RocketMQ,Kafka
- RabbitMQ is stable and reliable with consistent data, supports multiple protocols, has message confirmation, and is based on erlang language
- Kafka has high throughput, high performance, fast persistence, no message acknowledgement, no message omission, and there may be duplicate messages. It is dependent on zookeeper, and the cost is high
- ActiveMQ is not flexible and lightweight enough to support many queues
- RocketMQ has good performance, high throughput, high availability, supports large-scale distribution, and supports a single protocol
2, RabbitMQ
1. RabbitMQ introduction
- RabbitMQ is a reusable enterprise message system based on AMQP. He follows the Mozilla Public License open source agreement.
- AMQP, or Advanced Message Queuing Protocol, is an application layer standard Advanced Message Queuing Protocol that provides unified messaging services. It is an open standard of application layer protocol and is designed for message oriented middleware. The client and message middleware based on this protocol can deliver messages, which is not limited by different products and different development languages of the client / middleware. The implementations in Erlang include RabbitMQ and so on.
- Main features:
- Ensure reliability: some mechanisms are used to ensure reliability, such as persistence, transmission confirmation and release confirmation
- Flexible routing function
- Scalability: it supports message clustering, and multiple RabbitMQ servers can form a cluster
- High availability: the queue is still available when a node in the RabbitMQ cluster has a problem
- Support multiple protocols
- Support multilingual client
- Provide good management interface
- Provide tracking mechanism: if an exception occurs in a message, you can analyze the cause of the exception through the tracking mechanism
- Provide plug-in mechanism: it can be extended in many aspects through plug-ins
2. Install rabbitmq through docker
2.1 Erlang installation
Reference address: https://www.erlang-solutions.com/downloads/
wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpmrpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v
Install socat
yum install -y socat
2.2 installing dokcer
(1)yum Update package to latest yum update (2)Install the required packages, yum-util provide yum-config-manager Function, the other two are devicemapper Drive dependent yum install -y yum-utils device-mapper-persistent-data lvm2 (3)set up yum Source: alicloud yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo (4)install docker yum install docker-ce -y (5)View after installation docker edition docker -v (6) Install accelerated mirror sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker
docker related commands
# Start docker: systemctl start docker # Stop docker: systemctl stop docker # Restart docker: systemctl restart docker # View docker status: systemctl status docker # Startup: systemctl enable docker systemctl unenable docker # View docker profile docker info # View docker help documentation docker --help
2.3 installation rabbitmq
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
Start RabbitMQ installed in docker
-
docker ps -a
View all docker containers
-
docker start CONTAINER ID
CONTAINER ID is the CONTAINER ID of the container displayed in docker ps -a
-
docker ps
View started containers
3, RabbitMQ user management
1. Logical structure
- user
- Virtual host
- queue
2. User management
2.1 user level
- The super administrator administrator can log in to the console, view all information, and operate users and policies
- The monitor can log in to the console and view the relevant information of the node, such as the number of processes, memory and disk usage
- The policy maker can log in to the console to make policies, but cannot view node information
- Ordinary administrator management can only log in to the console
- Others, unable to log in to the console, generally refers to providers and consumers
2.2 add user (command mode)
-
docker ps #Enter container docker exec -it d2dd40da7056 /bin/bash
-
Add / configure user test and set password to test
rabbitmqctl add_user test test
If rabbitmq is not installed through docker, you need to execute. / rabbitmqctl add in the sbin directory of rabbitmq_ user test test
-
Set user permissions
#Set admin to the administrator level rabbitmqctl set_user_tags test administrator
2.3 adding users (web Mode)
- Browser access: http://47.113.192.192/:15672/ (log in with guest. Guest has the highest permission and can only log in locally; first create a user using the command line)
2.4 adding virtual hosts
2.5 user binding virtual host
4, RabbitMQ working mode
1. Message queue mode
Reference documents: http://www.rabbitmq.com/getstarted.html
1.1 simple mode
The simple mode is that the producer sends the message to the queue, and the consumer takes the message from the queue. A message corresponds to a consumer
A queue has only one consumer
1.2 working mode
Work mode is that a message can be received by multiple consumers, but only one consumer can get it in the end
1.3 subscription mode
A message can be obtained by multiple consumers at the same time
The producer sends the message to the switch, and the consumer registers its corresponding queue to the switch
After sending the message, all consumers who have registered the queue can receive the message
1.4 routing mode
The producer sent the message to the switch with type direct mode
When the consumer's queue binds itself to the route, it will bind itself with a key
Only when the producer sends a message in the corresponding key format will the corresponding queue receive the message
5, Rabptimq switch and queue management
1. Create queue
2. Create a switch
3. Switch binding queue
Enter switch ex1
Bind two queues queue1 and Queue2 on the switch ex1 with the mode of fanout
6, Using MQ in normal Maven applications
RabbitMQ message queuing mode
1. Simple mode
1.1 message producer
-
Create Maven project
-
Add dependencies required for RabbitMQ connections
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.10.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
-
Create log configuration file log4j.properties
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG log4j.logger.org.mybatis = DEBUG log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
-
Create MQ connection help class
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1. Create a connection factory ConnectionFactory factory = new ConnectionFactory(); //2. Set MQ connection information (ip,port,virtualhost,username,password) in the factory object factory.setHost("47.113.192.192"); factory.setPort(5672); factory.setVirtualHost("host1"); factory.setUsername("admin"); factory.setPassword("admin"); //3. Obtain the connection with MQ through the factory object Connection connection = factory.newConnection(); return connection; } }
-
Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { String msg = "Hello RabbitMQ"; Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC Channel channel = connection.createChannel(); //Equivalent to JDBC statement //Define queue (create a new MQ queue in Java code) //Parameter 1: defined queue name //Parameter 2: whether to select persistence for data in the queue //Parameter 3: exclusive (whether the current queue is private to the current connection) //Parameter 4: automatic deletion (when the number of connections in this queue is 0, this queue will be destroyed (whether there is data in the queue or not)) //Parameter 5: set the parameters of the current queue //channel.queueDeclare("queue7",false,false,false,null); //Parameter 1: switch name (simple mode here, no switch) //Parameter 2: destination queue name //Parameter 3: set the properties of the current message (for example, set the expiration time) //Parameter 4: content of message channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("send out" + msg); channel.close(); connection.close(); } }
1.2 message consumers
-
Message consumer receives message
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("The data received is:" + msg); } }; channel.basicConsume("queue1",true,consumer); } }
2. Working mode
2.1 message producer
-
Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("Please enter a message:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC Channel channel = connection.createChannel(); //Equivalent to JDBC statement //Define queue (create a new MQ queue in Java code) //Parameter 1: defined queue name //Parameter 2: whether to select persistence for data in the queue //Parameter 3: exclusive (whether the current queue is private to the current connection) //Parameter 4: automatic deletion (when the number of connections in this queue is 0, this queue will be destroyed (whether there is data in the queue or not)) //Parameter 5: set the parameters of the current queue //channel.queueDeclare("queue7",false,false,false,null); //Parameter 1: switch name (working mode here, no switch) //Parameter 2: destination queue name //Parameter 3: set the properties of the current message (for example, set the expiration time) //Parameter 4: content of message channel.basicPublish("","queue2",null,msg.getBytes()); System.out.println("send out" + msg); channel.close(); connection.close(); } } }
2.2 message consumers
-
Message consumer receives message
- consumer1
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer1 The data received is:" + msg); } }; channel.basicConsume("queue2",true,consumer); } }
- consumer2
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer2 The data received is:" + msg); } }; channel.basicConsume("queue2",true,consumer); } }
3. Subscription mode
3.1 message producer
- Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("Please enter a message:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC Channel channel = connection.createChannel(); //Equivalent to JDBC statement //Parameter 1: switch name //Parameter 2: destination queue name //Parameter 3: set the properties of the current message (for example, set the expiration time) //Parameter 4: content of message channel.basicPublish("ex1","",null,msg.getBytes()); System.out.println("send out" + msg); channel.close(); connection.close(); } } }
3.2 message consumers
-
Message consumer receives message
- consumer1
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer1 The data received is:" + msg); } }; channel.basicConsume("queue3",true,consumer); } }
- consumer2
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer2 The data received is:" + msg); } }; channel.basicConsume("queue4",true,consumer); } }
4. Routing mode
4.1 message producer
- Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class SendMsg { public static void main(String[] args) throws IOException, TimeoutException { System.out.println("Please enter a message:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC Channel channel = connection.createChannel(); //Equivalent to JDBC statement //Parameter 1: switch name //Parameter 2: the name of the target queue, but since there is a switch here, this parameter is Key //Parameter 3: set the properties of the current message (for example, set the expiration time) //Parameter 4: content of message if(msg.startsWith("a")){ channel.basicPublish("ex2","a" ,null,msg.getBytes()); }else if(msg.startsWith("b")){ channel.basicPublish("ex2","b" ,null,msg.getBytes()); } System.out.println("send out" + msg); channel.close(); connection.close(); } } }
4.2 message consumers
-
Message consumer receives message
- consumer1
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer1 The data received is:" + msg); } }; channel.basicConsume("queue5",true,consumer); } }
- consumer2
import com.eicoma.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body is the data obtained from the queue String msg = new String(body); System.out.println("consumer2 The data received is:" + msg); } }; channel.basicConsume("queue6",true,consumer); } }
7, Using MQ in SpringBoot
SpringBoot can complete automatic configuration and dependency injection, and directly obtain the connection object of RabbitMQ through Spring
1. Message producer
-
Correlation dependency
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
-
Configure applicaiton.yml
server: port: 9001 spring: application: name: producer rabbitmq: host: 47.113.192.192 port: 5672 virtual-host: host1 username: admin password: admin
-
Producer sends message
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void senMsg(String msg){ //1. Send message to queue amqpTemplate.convertAndSend("queue1",msg); //2. Send message to switch (subscription switch) amqpTemplate.convertAndSend("ex1","",msg); //3. Send message to switch (routing switch) amqpTemplate.convertAndSend("ex2","a",msg); amqpTemplate.convertAndSend("ex2","b",msg); } }
2. Message consumers
-
Add dependency
-
Configure yml
server: port: 9002 spring: application: name: producer rabbitmq: host: 47.113.192.192 port: 5672 virtual-host: host1 username: admin password: admin
-
Consumer receives message
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service @RabbitListener(queues = {"queue1"}) public class ReceiveMsgService { @RabbitHandler public void receiveMsg(String msg){ System.out.println("receive msg:" + msg); } }