Basic concepts and principles of RabbitMQ
-
AMQP, namely Advanced Message Queuing Protocol, is an open standard of application layer protocol, which is designed for message oriented middleware.
-
RabbitMQ is an open source AMQP implementation, and the server side is written in Erlang language.
-
Channel - Channel is the most important interface for us to deal with RabbitMQ. Most of our business operations are completed in the channel interface, including defining Queue, defining Exchange, binding Queue to Exchange, publishing messages, etc.
-
Exchange (the producer sends the message to exchange (exchange), and exchange routes the message to one or more queues (or discards it). There are four commonly used exchange types of RabbitMQ: fanout, direct, topic and headers)
-
Queue (internal object of RabbitMQ for storing messages)
-
Basic working principle
-
Client (producer) send message -- > Exchange (the switch saves messages to the corresponding queue through different types)
–>Queue
-
The client (consumer) consumes the messages in the queue through subscription.
-
Installation instructions
Installing RabbitMQ
Under Window:
-
Download and install otp_win64_21.0.1.exe
– Erlang program running environment (MQ running environment)
-
Download and install rabbitmq-server-3.7.7.exe
– RabbitMQ server program
-
Start RabbitMQ service
-
Activate the RabbitMQ administrative console (it needs to be activated because there is an operation panel)
-
Installation location of RabbitMQ = rabbitmq_server-3.7.7 folder
= "" find SBIN = = > "" enter cmd in the absolute path bar = "" enter the command after entering
– rabbitmq-plugins.bat enable rabbitmq_management
-
-
Browser access: http://localhost:15672
(the Window system automatically runs MQ by default)
If the browser does not open it directly, double-click the service in the sbin directory. If not, double-click the server to start the service manually
- There are two ports:
- 15672: operate MQ through browser
- 5672: message passing when writing code
– administrator account: guest
– password: guest
- There are two ports:
Install otp_win64_21.0.1.exe
(if there is a pop-up window Error opening file for writing, click ignore)
Double click to open = = > Next = = > Next = = > Install = = > Close
Install rabbitmq-server-3.7.7.exe
Double click to open = = "Next = =" (select installation location) Install = = "Next = =" Finish
Under Linux:
-
Download ESL erlang_21.0-1centos7_amd64.rpm
– Erlang running environment rpm package
-
Download rabbitmq-server-3.7.7-1.el7.noarch.rpm
– rabbitmq server program
-
Upload files to temp directory through XFTP
cd / Move to root mkdir upload establish upload catalogue cd upload/ Move to upload Under the directory - File transfer (two methods) 1. rz Select file 2. Direct drag!!! (only in) Xshell (LI)
-
rpm -ivh --nodeps esl-erlang_21.0-1~centos~7_amd64.rpm
– install this rpm package
-
rpm -ivh --nodeps rabbitmq-server-3.7.7-1.el7.noarch.rpm
– install this rpm package
-
rabbitmq-plugins enable rabbitmq_management
– enable console
-
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
– authorization
-
rabbitmq-server
– enable services
Access in CentOS: localhost:15672 (local access)
MQ Linux common commands
Startup and shutdown
rabbitmq-server
– front desk enable service
rabbitmq-server -detached
– enable the service in the background (compared with starting the service, the background starts with the - detached parameter,
((Ctrl + C cannot be used) the latter is safer to prevent jamming)
ps -aux | grep rabbitmq
– check whether rabbitMQ is started successfully
rabbitmqctl stop
– stop service (safe exit)
Pause and resume
*-It is needed for clustering
*-Let your service run, just give you pause, convenient for clustering
*-Note: pause and resume will not affect the process of RabbitMQ
rabbitmqctl stop_app
– pause
rabbitmqctl start_app
– restore
user management
< Username: user defined name, Password: user defined Password >
rabbitmqctl add_user UserName Password
– create a user (you can't log in directly just after the creation, and you need to authorize a role)
rabbitmqctl delete_user UserName
– delete user
rabbitmqctl change_password UserName Password
– change password
rabbitmqctl set_user_tags UserName Tag
– authorization role "four kinds of tags: administrator, monitoring, policymaker and management"
rabbitmqctl set_permissions -p / UserName '.*' '.*' '.*'
– ’ .* ‘’ .* ‘’ .* ’ The three meanings are: 1 Configure permissions 2 Read permission 3 Write permission (don't doubt it is write. *)
RabbitMQ user four Tag s
Super administrator
- You can log in to the management console (when the management plugin is enabled) to view all the information
And can operate on user policies.
Monitoring
- Log in to the management console (when the management plugin is enabled) and view it at the same time
rabbitmq node related information (number of processes, memory usage, disk usage, etc.)
Policy maker
- You can log in to the management console (when the management plugin is enabled). At the same time, you can
policy. However, you cannot view the relevant information of the node (the part identified by the red box in the above figure).
General manager
- You can only log in to the management console (when the management plugin is enabled) and cannot see it
Node information, and policies cannot be managed.
Virtual host
– * - note: after a role is assigned to a role, the virtual host name defaults to:/
("/" represents a database, which is equivalent to a database in MySQL. Our future data will be placed under this virtual host)
rabbitmq's default virtual machine name is "/", and the virtual machine is equivalent to an independent mq server
Virtual host name can be changed
How to add multiple virtual hosts? (create virtual host / add virtual host for required users)
- The following figure: ☟
MQ communication
AMQP
AMQP,Namely Advanced Message Queuing Protocol,An application layer standard providing unified messaging services, advanced message queuing protocol is an open standard of application layer protocol, which is designed for message oriented middleware. The client and message middleware based on this protocol can deliver messages and are not affected by the client/Middleware is limited by different products and different development languages. Erlang The implementations in are Rabbitmq Wait.
- AMQP is a protocol standard. We must abide by this underlying standard when transmitting data
Basic concepts
RabbitMQ six working modes
MQ required profiles
<dependencies> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.0</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <!-- Objects, converting to String type --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
1. Simple mode
-
Hello World - also known as simple mode
-
-
The message generator puts the message on the queue
-
Message consumers listen to the message queue. If there are messages in the queue, they consume them. After the messages are taken away, they are automatically deleted from the queue
(hidden danger: the message may not be handled correctly by the consumer and has disappeared from the queue, resulting in the loss of the message)
(application scenario: chat (there is an excessive server in the middle; p-end, c-end)
-
-
Producers | consumers | point-to-point (one production and one consumption)
producer
package com.mq.test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Hello World Mode & & simple mode */ /*producer*/ public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { //Create factory ConnectionFactory cf = new ConnectionFactory(); //Start connection cf.setHost("192.168.118.130"); //IP address of the server cf.setPort(5672); //Default port number of rabbitMQ cf.setUsername("admin"); //user name cf.setPassword("123456"); //password cf.setVirtualHost("/test"); //Address of virtual host //Create a physical connection with MQ -- here you can get a TCP based physical connection with MQ Connection conn = cf.newConnection(); //Creating a communication channel is equivalent to a virtual connection in TCP, because the overhead of physical connection is very large /*To establish a connection (establish a channel), you only need to do a TCP handshake. On the contrary, you need to do TCP handshake every time*/ //Multiple virtual connections are created on the physical connection to reduce the overhead of the physical connection Channel channel = conn.createChannel(); /*Create a queue through code, declare and create a queue, load if there is one, and create if there is none*/ //The first parameter represents the queue name //The second parameter indicates whether to persist. false no, if mq stops, data will be lost | true yes //The third parameter indicates whether to privatize. false indicates that all consumers can access it. true indicates that only consumers who own it for the first time can use it, while other consumers will not allow access //The fourth parameter indicates whether to delete automatically. false indicates that the queue will not be deleted automatically after the connection is stopped. true: the queue will be deleted automatically after the connection is stopped //Other parameters: null channel.queueDeclare("helloWorld",false,false,false,null); //Create a message to send String message = "helloWorld";//Simulate messages delivered to mq /*Send data (send message to mq)*/ //The first parameter, exchange switch, will not be used for the time being and will only be used later when publishing and subscribing //The second parameter routingKey queue name //Third parameter props extra attribute //The fourth parameter, body, passes the bytes of the database -- that is, information channel.basicPublish("","helloWorld",null,message.getBytes()); /*The connection is not normally closed*/ //Close virtual connection //channel.close(); //Close physical connection //conn.close(); System.out.println("Data sent successfully!!!!!"); } }
consumer
package com.mq.test; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Hello World Mode & & simple mode */ /*consumer*/ public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //Start connection factory.setHost("192.168.118.130"); //Server ip factory.setPort(5672); //Default port number of rabbitMQ factory.setUsername("admin"); //user name factory.setPassword("123456"); //password factory.setVirtualHost("/test"); //Address of virtual host //Create a physical connection with MQ - here you can get a TCP based physical connection with MQ Connection conn = factory.newConnection(); //Create channel Channel channel = conn.createChannel(); //Bind message queue channel.queueDeclare("helloWorld",false,false,false,null); //Create a message consumer (message reception) //The first parameter is the queue name //The second parameter is whether to automatically confirm the receipt of the message. false means to manually program to confirm the receipt of the message. This is the recommended practice of mq //The third parameter is to manually program the implementation class of Consumer to confirm that the class of message is the Recv object written below channel.basicConsume("helloWorld",false,new Recv(channel));//Here, the program will be monitored continuously. As long as there is production, I will consume it immediately. Do not close the channel /*Cannot close channel*/ //channel.close(); //conn.close(); } } /** * Consumer Implementation class of */ class Recv extends DefaultConsumer { private Channel channel; //Override constructor //The Channel object needs to be passed in from the outer layer, which is used in the handleDelivery method public Recv(Channel channel) { super(channel); this.channel = channel; //Receive external incoming Channel objects } //This method here is the way we process messages //And must be rewritten @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str = new String(body);//Convert the message of byte array into String System.out.println("Received message:"+str); //Sign in message, also known as confirmation message //The first parameter is envelope Getdeliverytag(), get the TagId of this message, which is the unique number of the message //The second parameter, false, indicates that only the current message is signed in. If true, it indicates that all messages not signed in by the consumer are signed in this.channel.basicAck(envelope.getDeliveryTag(), false); } }
Encapsulation tool class
package com.mq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtils { private static ConnectionFactory factory; static { factory = new ConnectionFactory(); factory.setHost("192.168.118.130"); //IP address of the server factory.setPort(5672); //Default port number of rabbitMQ factory.setUsername("admin"); //user name factory.setPassword("123456"); //password factory.setVirtualHost("/test"); //Address of virtual host } /** * Get physical connection * @return */ public static Connection getConnection() { Connection connection = null; try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; } } ------------------------------------------------------------------------------------ package com.mq.content; /** * The constant class in which the queue name is stored * Improve code reuse */ public interface RabbitContent { /** * hello queue */ String QUEUE_HELLO="hello"; //public static final String QUEUE_HELLO = "hello"; }
Changes after writing tools
package com.mq.test; import com.mq.content.RabbitContent; import com.mq.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Hello World Mode & & simple mode */ /*producer*/ public class ProducerTest { public static void main(String[] args) throws IOException, TimeoutException { //Create factory /*Omitted here*/ //Start connection /*Omitted here*/ //Create a physical connection with MQ -- here you can get a TCP based physical connection with MQ Connection conn = RabbitMQUtils.getConnection();/*Change position*/ /*Create a queue through code, declare and create a queue, load if there is one, and create if there is none*/ /*Change location: queue name modification*/ channel.queueDeclare(RabbitContent.QUEUE_HELLO,false,false,false,null); /*Send data (send message to mq)*/ /*Change location: queue name*/ channel.basicPublish("",RabbitContent.QUEUE_HELLO,null,message.getBytes()); ------------------------------------------------------------------------------------ /** * Hello World Mode & & simple mode */ /*consumer*/ public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { //Create factory /*Omitted here*/ //Start connection /*Omitted here*/ //A physical connection based on MQ is also created here Connection conn = RabbitMQUtils.getConnection();/*Change position*/ //Bind message queue /*Queue name modification*/ channel.queueDeclare(RabbitContent.QUEUE_HELLO,false,false,false,null); //Create a message consumer (message reception) /*Queue name modification*/ channel.basicConsume(RabbitContent.QUEUE_HELLO,false,new Recv(channel));//Here, the program will be monitored continuously. As long as there is production, I will consume it immediately. Do not close the channel
Consumer - anonymous inner class writing 1
package com.mq.test; import com.mq.content.RabbitContent; import com.mq.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Hello World Mode & & simple mode */ /*consumer*/ public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { //Create a physical connection with MQ - here you can get a TCP based physical connection with MQ Connection conn = RabbitMQUtils.getConnection(); //Create channel final Channel channel = conn.createChannel(); //Bind message queue channel.queueDeclare(RabbitContent.QUEUE_HELLO, false, false, false, null); //Create a message consumer (message reception) //channel.basicConsume(RabbitContent.QUEUE_HELLO,false,new Recv(channel));// Here, the program will be monitored continuously. As long as there is production, I will consume it immediately. Do not close the channel channel.basicConsume(RabbitContent.QUEUE_HELLO, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String str = new String(body);//Convert the message of byte array into String System.out.println("Received message:"+str); //Sign in message, also known as confirmation message //The first parameter is envelope Getdeliverytag(), get the TagId of this message, which is the unique number of the message //The second parameter, false, indicates that only the current message is signed in. If true, it indicates that all messages not signed in by the consumer are signed in channel.basicAck(envelope.getDeliveryTag(), false); } }); /*Cannot close channel*/ //channel.close(); //conn.close(); } }
2. Work queue
-
Work queues - work queue mode (competition for resources)
-
Every consumer gets different messages
-
A queue is distributed to different consumers
-
-
The message generator puts the message into the queue. There can be multiple consumers, consumer 1 and consumer 2, who listen to the same queue at the same time. Is the message consumed? C1 and C2 compete for the content of the current message queue. Who gets the message first is responsible for consuming the message
(hidden danger: in the case of high concurrency, a message will be generated by default and used by multiple consumers
//Solution: you can set a switch (syncronize, which is different from the performance of synchronous lock) to ensure that a message can only be used by one consumer)
-
Application scenario: red envelope; Resource scheduling in large projects (the task allocation system does not need to know which task execution system is idle, so it directly throws the task into the message queue, and the idle system automatically competes; 12306 sends messages to each passenger: if the consumer with poor performance fails to receive messages for half a day, it will not affect other consumers to continue to receive messages) (it can maximize the performance of the consumer server)
-
-
3. Publish and subscribe
- Publish < producer > / subscribe < consumer > - publish and subscribe (share resources)
- Every consumer gets the same message - especially suitable for data providers and applications
-
- X represents the internal components of the switch rabbitMQ. The erlang message generator completes the code, and the execution efficiency of the code is not high. The message generator puts the message into the switch, and the publish subscribe switch sends the message to all the message queues bound to it. The consumers of the corresponding message queue get the message for consumption
- One switch, multiple message queues, and one queue is bound to one consumer (user)
- When you pay attention to a producer, you will receive new news immediately, and each message is the same
- Related scenes: mass mailing, group chat, broadcasting (advertising) meteorological information pushed by the Meteorological Bureau
-
4. Routing mode
- Routing - routing mode (filtering is added compared with publish subscribe mode)
- The messages received by each consumer can be different < but need accurate matching > for example: search Beijing; It is not allowed to enter North only. It must be an exact match. It must be "="
-
-
The message producer sends the message to the switch and judges according to the route. The route is the string (info). The currently generated message carries the route character (method of the object). According to the route key, the switch can only match the message queue corresponding to the route key, and the corresponding consumer can consume the message;
-
Define routing strings according to business functions
-
Get the corresponding function string from the code logic of the system and throw the message task into the corresponding queue
Business scenario: error notification; EXCEPTION; Function of error notification; Traditional error notification; Customer notification; Using key routing, errors in the program can be encapsulated into messages and sent to the message queue. Developers can customize consumers and receive errors in real time;
-
-
5. Theme mode
- Topic - topic mode (a kind of routing mode)
- Compared with the routing mode, the topic mode can be fuzzy matched < for example: search Beijing; Just enter North >
-
- Asterisks and pound signs represent wildcards
- Asterisks represent multiple words and pound signs represent one word
- Add fuzzy matching to routing function
- The message generator generates the message and gives it to the switch
- The switch is fuzzy matched to the corresponding queue according to the rules of the key, and the listening consumer of the queue receives the consumption message
6. Remote mode
- RPC (not commonly used and not explained)
- RPC remote call mode
RabbitMQ other content
http://www.cnblogs.com/zhangweizhong/category/855479.html
The key provided by can only match the message queue corresponding to the routing key, and the corresponding consumer can consume messages;
2. Define routing strings according to business functions 3. Obtain the corresponding function string from the code logic of the system,Throw the message task into the corresponding queue Business scenario: error notice;EXCEPTION;Function of error notification;Traditional error notification;Customer notification;utilize key route,Errors in the program can be encapsulated into messages and passed into the message queue,Developers can customize consumers,Real time receiving error;
[external chain picture transferring... (img-YSEg5S18-1645611005681)]
5. Theme mode
- Topic - topic mode (a kind of routing mode)
- Compared with the routing mode, the topic mode can be fuzzy matched < for example: search Beijing; Just enter North >
-
- Asterisks and pound signs represent wildcards
- Asterisks represent multiple words and pound signs represent one word
- Add fuzzy matching to routing function
- The message generator generates the message and gives it to the switch
- The switch is fuzzy matched to the corresponding queue according to the rules of the key, and the listening consumer of the queue receives the consumption message
[external chain picture transferring... (img-W8CC7HCA-1645611005682)]
6. Remote mode
- RPC (not commonly used and not explained)
- RPC remote call mode
[external chain picture transferring... (img-fmc0PNZd-1645611005682)]
RabbitMQ other content
http://www.cnblogs.com/zhangweizhong/category/855479.html
Click quick jump ↩︎