RabbitMQ
What is MQ
MQ(message queue), in its literal sense, is essentially a queue. FIFO is first in first out, but the content stored in the queue is message. It is also a cross process communication mechanism for upstream and downstream message delivery. In the Internet architecture, MQ is a very common upstream and downstream "logical decoupling + physical decoupling" message communication service. After using MQ, the upstream of message sending only needs to rely on MQ without relying on other services.
Advantages of RabbitMQ
Due to the high concurrency of erlang language, the performance is good; With a throughput of 10000, MQ features are relatively complete, robust, stable, easy to use, cross platform, support for multiple languages, high community activity and fast update frequency (but the commercial version needs to be charged)
The concept of Rabbit
RabbitMQ is a message oriented middleware: it accepts and forwards messages. You can regard it as an express site. When you want to send a package, you put your package in the express station, and the courier will eventually send your express to the recipient. According to this logic, RabbitMQ is an express station, and an courier will deliver the express for you. The main difference between RabbitMQ and express station is that it does not process express mail, but receives, stores and forwards message data.
Four core concepts
producer
- Program for generating data transmission message
consumer
- Most of the time, consumers are a program waiting to receive messages. Producers and consumers are often not on the same machine. The same machine can even make producers and consumers
Switch
- On the one hand, receive the producer's message, on the other hand, push the message to the queue. You can push the message to a specific queue or multiple queues, or discard the message
queue
- RabbitMQ is a data structure for storing messages. Messages sent by producers can only be stored in queues. Queues are only limited by host memory and disk. In essence, they are a large message buffer. Many producers can send messages to a queue, and many consumers can receive data from a queue.
RabbitMQ installation
-
Official website address
- https://www.rabbitmq.com/download.html
-
File upload
- Upload to / usr/local/software directory (if there is no software, you need to create it yourself)
-
Installation files (installed in the following order)
- rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- yum install socat -y
- rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
-
Common commands (executed in the following order)
- Add startup startup RabbitMQ service chkconfig RabbitMQ server on
- Start service / SBIN / service rabbitmq server start
- View service status / SBIN / service rabbitmq server status
- Stop service (select execute) / SBIN / service rabbitmq server stop
- Open the web management plug-in (access the management interface of RabbitMQ on the page) RabbitMQ plugins enable RabbitMQ_ management
-
Access the management interface of RabbitMQ http: / / {the IP address of the machine where RabbitMQ is installed}: 15672 /. Using the default account password guest will cause permission problems (close the firewall or open the port)
If you do not close the firewall, you need to open 15672 and 5672 ports, one is to connect to the console and the other is to connect to the service
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
If it is a ECs, you need to open two ports on the management side- Add new user access here
- Create account rabbitmqctl add_user admin 123
- Set user role rabbitmqctl set_user_tags admin administrator
- Set user permissions
set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- User user_admin has the configuration, write and read permissions of all resources in the virtual host / vhost1
Current user and role rabbitmqctl list_users - Log in again with admin.
- Add new user access here
-
Reset command
- The command to close the application is rabbitmqctl stop_app
- The command to clear is rabbitmqctl reset
- The restart command is rabbitmqctl start_app
Getting started with RabbitMQ
1. Import Maven dependency
<dependencies> <!--rabbitmq Dependent client--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--A dependency of operation file stream--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
2. Create the producer of the message
public class Producer { // Queue name public static final String QUERE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //Create connection factory ConnectionFactory factory = new ConnectionFactory(); // Factory IP, connecting to RabbitMQ queue (the IP address of the machine where RabbitMQ is installed) factory.setHost("xxx.xxx.xxx.xxx"); //user name factory.setUsername("username"); //password factory.setPassword("password"); // Create connection Connection connection = factory.newConnection(); // Acquisition channel Channel channel = connection.createChannel(); /** * Declare a queue * 1. Queue name * 2. Whether the messages in the queue need to be persistent (disk). By default, the messages are stored in memory * 3. Whether the queue is only for one consumer to consume and whether to share messages. true can be consumed by multiple consumers and false can only be consumed by one consumer * 4. Whether to delete automatically. Whether the queue will be deleted automatically after the last consumer disconnects. If true, it will be deleted automatically, and if false, it will be the opposite * 5. Other parameters */ channel.queueDeclare(QUERE_NAME,false,false,false,null); String message = "hello world "; /** * Send a message * 1. To which switch * 2. What is the key value of the route? This time it is the queue name * 3. Other parameter information * 4. The body of the message sent */ channel.basicPublish("",QUERE_NAME,null,message.getBytes()); } }
3. Consumers who create messages
/** * Consumer, accept message */ public class Consumer { // Queue name public static final String QUERE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //Create connection factory ConnectionFactory factory = new ConnectionFactory(); // Factory IP, connecting RabbitMQ queue factory.setHost("xxx.xxx.xxx.xxx"); //user name factory.setUsername("username"); //password factory.setPassword("password"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // How to conduct interface callback for consumption of pushed messages DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println(new String(message.getBody())); }; // Callback when canceling a message CancelCallback cancelCallback = consumerTag -> { System.out.println("Message consumption interrupted"); }; /** * Consumer News * 1. Which queue to consume * 2. Whether to respond automatically after successful consumption. true means automatic and false means manual * 3. How to conduct interface callback for consumption of pushed messages * 4. Consumers cancel the callback of consumption */ channel.basicConsume(QUERE_NAME,true,deliverCallback,cancelCallback); } }
4. Start consumers and producers to view the results
- Producer Consumer output hello world
- The RabbitMQ administration page has a new queue generated
Work queue for RabbitMQ
Take turns receiving messages
- Follow the above procedure to write that two consumers consume the same queue or start two working threads. Here, I use idea to start two working threads. Check the logo below to allow multiple threads to be started
/** * Test a worker thread (equivalent to a consumer) */ public class Worker01 { // Queue name public static final String QUERE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { xxxxxx Connect in the above way RabbitMQ,Generate channel xxxxxxx // How to conduct interface callback for consumption of pushed messages DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Received message:" + new String(message.getBody())); }; // Callback when canceling a message CancelCallback cancelCallback = consumerTag -> { System.out.println("Message consumption interrupted"); }; System.out.println("c1 Waiting to receive messages....."); //messages receiving channel.basicConsume(QUERE_NAME, true, deliverCallback, cancelCallback); } }
- Write producer and start
/** * Producers send a lot of messages */ public class Task01 { public static final String QUERE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { xxxxxx The above connection mode RabbitMQ,Generate channel xxxxxxx // Declaration of queue channel.queueDeclare(QUERE_NAME,false,false,false,null); // Receive information from the console Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUERE_NAME,null,message.getBytes()); System.out.println("Send message complete:"+message); } } }
- The producer sends multiple messages to check the consumption of two consumers. Through program execution, it is found that the producer sends a total of six messages, and consumer 1 and consumer 2 receive three messages respectively, and receive messages once in order
RabbitMQ tutorial of Shangsi valley station B: Silicon Valley RabbitMQ