Message Queue Introduction
A message is the data being transmitted.
A message queue is a form of communication in which the sender returns immediately after sending a message, without waiting for the receiver.
A messaging system (the broker) is responsible for delivering messages reliably.
Message publishers only need to publish messages to the MQ; they do not have to care who consumes them.
Message consumers only need to get messages from the MQ; they do not have to care who published them (asynchronous processing, FIFO).
Message queues are used for decoupling services, achieving eventual consistency, broadcasting, peak shaving, flow control, and so on.
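The decoupling described above can be sketched without any broker at all. The following is only an in-process illustration built on java.util.concurrent.BlockingQueue; the class name InMemoryQueueDemo and the message strings are made up for this example, and nothing here is RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process stand-in for a message queue. It only illustrates the idea and is not RabbitMQ.
class InMemoryQueueDemo {

    // The publisher hands each message to the queue and returns immediately.
    static void publish(BlockingQueue<String> queue, String... messages) {
        for (String message : messages) {
            queue.add(message); // assumes an unbounded queue, so this does not block
        }
    }

    // The consumer takes messages in the order they were published (FIFO).
    static List<String> consume(BlockingQueue<String> queue, int count) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks until a message is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early if interrupted
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // The consumer runs on its own thread and knows nothing about the publisher.
        Thread consumer = new Thread(() -> System.out.println("received " + consume(queue, 3)));
        consumer.start();
        publish(queue, "order-1", "order-2", "order-3");
        consumer.join();
    }
}
```

The publisher and the consumer share only the queue, which is the same relationship a real broker provides across processes and machines.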
RabbitMQ is an open-source message broker that implements AMQP and is written in Erlang
AMQP: Advanced Message Queuing Protocol
RabbitMQ features:
1: Reliability
2: Flexible routing (messages are routed through exchanges)
3: Clustering (multiple RabbitMQ servers can form a cluster)
4: High availability (queues can be mirrored across the cluster)
5: Support for multiple protocols
6: Client libraries for many languages, including Java
7: Management interface (web GUI)
8: Tracing mechanism
RabbitMQ Installation
Download Erlang: from the Erlang website
Download RabbitMQ-server: from the GitHub website
Installation dependency command: yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto
Install Erlang:
1: Unpack the Erlang source package in the installation directory
Command: tar -zxvf otp_src_24.2.tar.gz
2: Create the Erlang installation directory
mkdir /usr/local/erlang
3: Enter the unpacked Erlang directory
cd otp_src_24.2
4: Configure the Erlang build
./configure --prefix=/usr/local/erlang --without-javac
5: Compile and install
make && make install
6: Configure environment variables (append these lines to /etc/profile)
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
7: Reload the environment variables
Command: source /etc/profile
Check Erlang: erl -version
Install RabbitMQ
Change to the directory that contains rabbitmq-server-3.9.12-1.el7.noarch.rpm
Command: rpm -ivh --nodeps rabbitmq-server-3.9.12-1.el7.noarch.rpm
Start the RabbitMQ service: rabbitmq-server start
Note: startup may fail here because of insufficient permissions on the /var/lib/rabbitmq/.erlang.cookie file.
Solution: fix the ownership and permissions of this file
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
Stop the RabbitMQ service: rabbitmqctl stop
Enable a plug-in: rabbitmq-plugins enable {plug-in name}
For example, enable the management plug-in: rabbitmq-plugins enable rabbitmq_management
Disable a plug-in: rabbitmq-plugins disable {plug-in name}
Once the management plug-in is enabled, you can open the management interface in a browser at the address of your Linux machine: http://192.168.73.132:15672/
RabbitMQ User Management
The default RabbitMQ user is guest with password guest (by default this account can only connect from localhost)
Add user: rabbitmqctl add_user username password
Delete user: rabbitmqctl delete_user username
Change password: rabbitmqctl change_password username password
Set user role: rabbitmqctl set_user_tags username role
Roles (level): management (1), monitoring (2), policymaker (2), administrator (4)
Permissions
1. Authorization command: rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}
-p vhostpath: the namespace (vhost) the permissions apply to; for example, -p / means the root namespace
user: the name of the user to authorize
conf: a regular expression matching the resources this user may configure
write: a regular expression matching the resources this user may write to
read: a regular expression matching the resources this user may read from
For example: rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
gives the user root configure, write, and read permissions on all resources.
2. View user permissions: rabbitmqctl list_permissions [-p vhostpath]
For example, view all user permissions under the root namespace: rabbitmqctl list_permissions
View all user permissions under a specified namespace: rabbitmqctl list_permissions -p /abc
3. View the permissions of a specified user: rabbitmqctl list_user_permissions {username}
For example, view the permissions of the user root: rabbitmqctl list_user_permissions root
4. Clear user permissions: rabbitmqctl clear_permissions {username}
For example, clear the permissions of the user root: rabbitmqctl clear_permissions root
vhost management
A vhost is a namespace in RabbitMQ. It restricts where messages are stored, and permissions are granted per namespace. It is similar to folders in Windows, where different files are stored in different folders.
1. Add a vhost: rabbitmqctl add_vhost {name}
For example: rabbitmqctl add_vhost test
2. Delete a vhost: rabbitmqctl delete_vhost {name}
For example: rabbitmqctl delete_vhost test
RabbitMQ message sending and receiving
Message Sending and Receiving Mechanisms
1. Message: the message itself, made up of a header and a body. The body is opaque, while the header is a set of optional attributes, including routing-key (the routing key), priority (priority relative to other messages), delivery-mode (indicates whether the message should be persisted), and so on.
2. Publisher: the producer of messages, a client application that publishes messages to an exchange.
3. Exchange: receives messages from producers and routes them to queues in the server.
4. Binding: the association between a queue and an exchange. A binding is a routing rule, based on a routing key, that connects an exchange to a queue, so an exchange can be thought of as a routing table made of bindings.
5. Queue: holds messages until they are sent to consumers. It is both the container and the destination of messages. A message can be placed in one or more queues and stays there until a consumer connects to the queue and takes it.
6. Connection: a network connection, such as a TCP connection.
7. Channel: an independent bidirectional data stream inside a multiplexed connection. A channel is a virtual connection built on top of a real TCP connection. All AMQP commands are sent over a channel, whether publishing a message, subscribing to a queue, or receiving messages. Because establishing and tearing down TCP connections is expensive for the operating system, channels were introduced to reuse a single TCP connection.
8. Consumer: the consumer of messages, a client application that gets messages from a queue.
9. Virtual Host: a group of exchanges, queues, and related objects. A virtual host is an independent server domain that shares the same authentication and encryption environment. Each vhost is essentially a mini RabbitMQ server with its own queues, exchanges, bindings, and permissions. The vhost is a basic AMQP concept and must be specified when connecting. The default vhost in RabbitMQ is /.
10. Broker: the message queue server entity.
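To make the "exchange as a routing table of bindings" idea concrete, here is a minimal in-memory sketch of a direct-style exchange. It is a toy model, not the broker's implementation; the class DirectExchangeSketch and its methods are hypothetical names chosen for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model: an exchange is a table that maps binding keys to queues.
class DirectExchangeSketch {
    private final Map<String, Queue<String>> queues = new HashMap<>();
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Create the queue if it does not exist yet.
    void declareQueue(String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Add one row to the routing table: bindingKey -> queueName.
    void bind(String queueName, String bindingKey) {
        if (!queues.containsKey(queueName)) {
            throw new IllegalArgumentException("queue not declared: " + queueName);
        }
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copy the message into every queue whose binding key equals the routing key.
    // Returns the number of queues the message was routed to.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    // Take the oldest message from a queue, or null if the queue is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.declareQueue("myQueue");
        exchange.bind("myQueue", "directRoutingKey");
        System.out.println(exchange.publish("directRoutingKey", "hello")); // routed to 1 queue
        System.out.println(exchange.publish("unknownKey", "lost"));        // routed to 0 queues
        System.out.println(exchange.poll("myQueue"));
    }
}
```

In this model a message whose routing key has no matching binding is simply not stored anywhere, which mirrors why the queue and binding must exist before publishing.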
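Exchange types: direct, fanout, topic, headers
direct exchange:
Exact one-to-one matching: a message goes to the queues whose binding key equals its routing key
fanout exchange:
Broadcast: a message goes to every queue bound to the exchange, and the routing key is ignored
topic exchange:
Pattern matching: the routing key is matched against binding keys made of dot-separated words, where * matches exactly one word and # matches zero or more words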
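The topic wildcard rules (* matches exactly one dot-separated word, # matches zero or more words) can be sketched as a small recursive matcher. This is an illustrative implementation written for this article, not the algorithm the broker uses internally; TopicMatcher is a hypothetical name.

```java
// Illustrative matcher for topic binding keys; not RabbitMQ's internal code.
class TopicMatcher {

    // Returns true if routingKey matches the pattern in bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // Split a key into its dot-separated words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still needs a word but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("aa.*", "aa.123"));     // true
        System.out.println(matches("aa.*", "aa.123.123")); // false
        System.out.println(matches("aa.#", "aa.123.123")); // true
    }
}
```

The three calls in main correspond to the routing keys used in the topic examples of this article: aa.123 is accepted by both aa.* and aa.#, while aa.123.123 is accepted only by aa.#.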
Message Test
Add Dependency
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>
Test Send Message to Queue
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class App {
    public static void main(String[] args) {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132"); // broker IP
        factory.setPort(5672);             // broker port
        factory.setUsername("root");       // account
        factory.setPassword("123");        // password

        // try-with-resources closes the channel first, then the connection
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare a queue
            // Parameter 1: queue name (your choice)
            // Parameter 2: durable, whether the queue survives a broker restart
            // Parameter 3: exclusive, if true only the declaring connection may use the queue
            // Parameter 4: autoDelete, if true the queue is deleted once its last consumer unsubscribes
            // Parameter 5: additional queue arguments, null for none
            channel.queueDeclare("myQueue", true, false, false, null);

            // Define the message
            String message = "RabbitMQ Message Test";

            // Send the message
            // Parameter 1: exchange name; "" selects the default exchange
            // Parameter 2: routing key; with the default exchange this is the queue name
            // Parameter 3: message properties
            // Parameter 4: message body as a byte array
            channel.basicPublish("", "myQueue", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Receive messages
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Receive messages
            // Parameter 1: name of the queue to listen on
            // Parameter 2: autoAck; true means a message counts as acknowledged, and is removed from the queue, as soon as it is delivered
            // Parameter 3: consumer tag, distinguishes consumers when several listen on the same queue; usually ""
            // Parameter 4: callback that handles each delivered message
            // Note: basicConsume starts a listener thread that keeps receiving new messages as they
            // enter the queue, so the connection and channel must not be closed here
            channel.basicConsume("myQueue", true, "", new DefaultConsumer(channel) {
                // Override this method to receive and process each message
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("The message is " + message);
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Exchange Test
direct exchange
Send message
public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue
            channel.queueDeclare("myQueue", true, false, false, null);

            // Declare an exchange
            // Parameter 1: exchange name
            // Parameter 2: exchange type (direct, fanout, topic, headers)
            // Parameter 3: whether the exchange is durable
            channel.exchangeDeclare("myExchange", "direct", true);

            // Bind the queue to the exchange
            // Parameter 1: queue name
            // Parameter 2: exchange name
            // Parameter 3: binding key, matched against the routing key of each message
            // Note: both the queue and the exchange must already be declared when binding
            channel.queueBind("myQueue", "myExchange", "directRoutingKey");

            String message = "direct message test";
            // Publish to the exchange, which routes the message to the bound queue
            channel.basicPublish("myExchange", "directRoutingKey", null,
                    message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Receive messages
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.73.132");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123");

try {
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("myQueue", true, false, false, null);
    channel.exchangeDeclare("myExchange", "direct", true);
    channel.queueBind("myQueue", "myExchange", "directRoutingKey");
    // Receive messages; the connection and channel stay open so the consumer keeps listening
    channel.basicConsume("myQueue", true, "", new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("message: " + message);
        }
    });
} catch (IOException | TimeoutException e) {
    e.printStackTrace();
}
fanout exchange
Send message
public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            String message = "fanout message test";
            // A fanout exchange ignores the routing key, so "" is passed
            channel.basicPublish("fanoutExchange", "", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Receive messages
public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // A fanout exchange broadcasts its messages, so no routing key is needed for the binding.
            // Many consumers may receive data from this exchange, so each one creates a queue
            // with a random name.
            // queueDeclare() without parameters creates a queue that has a random name,
            // is non-durable, is exclusive, and is deleted automatically.
            // getQueue() returns the random queue name.
            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            // Bind the random queue to the exchange; a fanout exchange needs no routing key
            channel.queueBind(queueName, "fanoutExchange", "");
            // Listen on the queue and receive its messages
            channel.basicConsume(queueName, true, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("message: " + message);
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
topic exchange
Send message
public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("topicExchange", "topic", true);
            String message = "message test";
            // aa.123 is received by queues bound with aa.* or aa.#
            // aa.123.123 is received only by queues bound with aa.#
            channel.basicPublish("topicExchange", "aa.123", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Receive messages
public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("topicQueue", true, false, false, null);
            channel.exchangeDeclare("topicExchange", "topic", true);
            channel.queueBind("topicQueue", "topicExchange", "aa.*");
            channel.basicConsume("topicQueue", true, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("message: " + message);
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
topic and fanout use scenarios:
Transactional message
The transaction mechanism is similar to that of a database.
It ensures that all the messages published in one transaction either reach the queue together or fail together.
RabbitMQ offers two ways to make sure published messages reach the broker:
1: the transaction mechanism provided by AMQP
2: sender confirmation mode (publisher confirms)
The transaction methods are:
channel.txSelect();   // start transaction mode on the channel
channel.txCommit();   // commit the transaction
channel.txRollback(); // roll back the transaction
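The all-or-nothing behaviour can be pictured with a tiny in-memory model: messages published inside a transaction are held back until commit and discarded on rollback. This is only a sketch of the semantics under that assumption; TxChannelSketch and its method names are hypothetical and do not come from the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of transactional publishing; not the RabbitMQ client or broker.
class TxChannelSketch {
    private final List<String> queue = new ArrayList<>();   // committed messages, visible to consumers
    private final List<String> pending = new ArrayList<>(); // messages of the open transaction

    // Publishing inside a transaction only buffers the message.
    void txPublish(String body) {
        pending.add(body);
    }

    // Commit moves every buffered message to the queue in one step.
    void txCommit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Rollback throws the buffered messages away; the queue is untouched.
    void txRollback() {
        pending.clear();
    }

    // Read-only view of what consumers could see.
    List<String> queued() {
        return Collections.unmodifiableList(queue);
    }

    public static void main(String[] args) {
        TxChannelSketch channel = new TxChannelSketch();
        channel.txPublish("m1");
        channel.txPublish("m2");
        channel.txRollback();                 // neither m1 nor m2 reaches the queue
        channel.txPublish("m3");
        channel.txCommit();                   // m3 reaches the queue
        System.out.println(channel.queued()); // [m3]
    }
}
```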
Sender Transaction
public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            String message = "message test";
            try {
                // Start the transaction
                channel.txSelect();
                channel.basicPublish("fanoutExchange", "", null, message.getBytes(StandardCharsets.UTF_8));
                // Commit the transaction
                channel.txCommit();
            } catch (IOException e) {
                e.printStackTrace();
                // Roll back: discard the uncommitted messages of the current transaction
                channel.txRollback();
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
Sender Confirmation Mode
With an ordinary transaction, a failed send raises an error and the transaction is rolled back.
In sender confirmation mode, messages that fail to send can be re-sent.
Usage:
package com.java;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            // Start sender confirmation mode
            channel.confirmSelect();
            String message = "message test";
            channel.basicPublish("fanoutExchange", "", null, message.getBytes(StandardCharsets.UTF_8));
            // Wait for the broker to confirm the published messages
            channel.waitForConfirmsOrDie();
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
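Sender Confirmation Mode - General Confirmation
Start sender confirmation mode
channel.confirmSelect();
boolean flag = channel.waitForConfirms();
The role of waitForConfirms: it blocks until every message published since the last call has been acknowledged (ack) or rejected (nack) by the broker, and returns true only if all of them were acknowledged. When it returns false, the messages need to be re-sent.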
Sender Confirmation Mode - Bulk Confirmation
Start sender confirmation mode
channel.confirmSelect();
Batch confirmation: channel.waitForConfirmsOrDie();
Effect: it blocks until all messages published since the last call have been confirmed, and throws an IOException if any of them was rejected.
Bulk confirmation is faster than confirming each message individually.
However, once a confirmation fails, it is not possible to determine which message was not delivered.
The whole batch needs to be re-sent.
Sender Confirmation Mode - Asynchronous Confirmation
Start sender confirmation mode
channel.confirmSelect();
Asynchronous Confirmation:
public class App {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        // Note: this demo closes the channel right after publishing; a long-running publisher
        // would keep it open so that the callbacks below have time to fire
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            // Start sender confirmation mode
            channel.confirmSelect();
            String message = "message test";
            // Register the asynchronous confirm listener
            channel.addConfirmListener(new ConfirmListener() {
                // Called when the broker acknowledges messages
                // Parameter 1: delivery tag, the sequence number of the message on this channel, starting at 1
                // Parameter 2: multiple; true means every message up to and including this tag is confirmed
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                }

                // Called when the broker reports messages as not acknowledged (nack)
                // Messages reported here need to be re-sent
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                }
            });
            channel.basicPublish("fanoutExchange", "", null, message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
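The two callbacks above are left empty, so it may help to see one way the bookkeeping behind them could look. The sketch below keeps unconfirmed messages in a sorted map keyed by delivery tag and is deliberately free of any RabbitMQ types; OutstandingConfirms and its methods are hypothetical names. In a real publisher the tag would be read with channel.getNextPublishSeqNo() just before each basicPublish, and ack/nack would be called from handleAck/handleNack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Bookkeeping sketch for asynchronous confirms; contains no RabbitMQ calls.
class OutstandingConfirms {
    // delivery tag -> message body, sorted by tag
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Remember a message that has been published but not yet confirmed.
    void published(long deliveryTag, String body) {
        outstanding.put(deliveryTag, body);
    }

    // Forget confirmed messages. multiple == true covers every tag up to and including deliveryTag.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    // Return the bodies that have to be re-sent and stop tracking them.
    List<String> nack(long deliveryTag, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> failed = outstanding.headMap(deliveryTag, true);
            toResend.addAll(failed.values());
            failed.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    // Number of messages still waiting for a confirm.
    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        tracker.published(1, "m1");
        tracker.published(2, "m2");
        tracker.published(3, "m3");
        tracker.ack(2, true);                        // confirms m1 and m2 together
        System.out.println(tracker.nack(3, false));  // [m3] must be re-sent
        System.out.println(tracker.pending());       // 0
    }
}
```

Unlike bulk confirmation, this approach identifies exactly which messages failed, so only those need to be re-sent.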
Consumer Confirmation Mode - Manual Confirmation
public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");

        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange", "fanout", true);
            channel.queueBind(queueName, "fanoutExchange", "");
            // Passing false as parameter 2 of basicConsume turns off automatic acknowledgment:
            // a delivered message stays unacknowledged until it is acknowledged manually.
            // Manual acknowledgment is done by calling basicAck(), which can also acknowledge in batches.
            channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("message: " + message);
                    long num = envelope.getDeliveryTag(); // delivery tag (number) of the message
                    Channel c = super.getChannel();       // channel of this consumer; this.getChannel() also works
                    // Acknowledge only after the message has been fully processed,
                    // so that the broker can remove it from the queue
                    // Parameter 1: delivery tag of the message
                    // Parameter 2: true acknowledges every message up to and including this tag,
                    //              false acknowledges only this message
                    c.basicAck(num, true);
                }
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
SpringBoot Integrated RabbitMQ
Select Dependencies: Spring for RabbitMQ
Configure RabbitMQ in the application properties file
spring.rabbitmq.host=192.168.73.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
Create exchange and binding
Exchange creation through a configuration class
@Configuration
public class RabbitMQConfig {
    // Configure an exchange of type direct
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    // Configure a queue
    @Bean
    public Queue directQueue() {
        return new Queue("directQueue");
    }

    // Bind the queue to the exchange
    // Parameter 1, Queue directQueue: the queue to bind; the parameter name is the same as the
    // name of the @Bean method that creates the queue, so that bean is injected automatically
    // Parameter 2, DirectExchange directExchange: the exchange to bind to; the parameter name is the
    // same as the name of the @Bean method that creates the exchange, so that bean is injected automatically
    @Bean
    public Binding binding(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting");
    }
}
Send message
@Controller
public class app {
    // Inject the AMQP template; this object is used to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void rabbitMQMessage() {
        // Send a message with convertAndSend
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: the message to send
        amqpTemplate.convertAndSend("directExchange", "directRouting", "Message test");
    }
}
Asynchronous listening for receiving messages
@Component
public class app {
    // Inject the AMQP template; this object is used to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    // The @RabbitListener annotation marks this method as a listener method.
    // It keeps receiving messages from the queue automatically; the method is never called
    // by hand, Spring runs the listener for you.
    // The parameter message is the message received on each delivery.
    @RabbitListener(queues = {"directQueue"})
    public void rabbitMQMessage(String message) {
        System.out.println(message);
    }
}
fanout type receives messages
@Component
public class app {
    // Inject the AMQP template; this object is used to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    // The bindings attribute of @RabbitListener declares the queue, the exchange, and their binding
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(), // @Queue without a name creates a queue with a random name
                    exchange = @Exchange(name = "fanoutExchange", type = "fanout")) // declare the exchange
    })
    public void rabbitMQMessage(String message) {
        System.out.println(message);
    }
}
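fanout Message Sending
@Configuration
public class RabbitMQConfig {
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
}

// Named SendController here so that it does not clash with the RabbitMQConfig configuration class
@Controller
public class SendController {
    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void sendMessage() {
        // Use the three-argument form: exchange, routing key, message.
        // The two-argument form would treat "fanoutExchange" as a routing key on the default exchange.
        amqpTemplate.convertAndSend("fanoutExchange", "", "message test");
    }
}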
topic receives messages
@Component
public class app {
    // Inject the AMQP template; this object is used to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    // The bindings attribute of @RabbitListener declares the queue, the exchange, and their binding
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("topic"), // declare a queue named "topic"
                    key = {"aa.#"},          // binding key
                    exchange = @Exchange(name = "topicExchange", type = "topic")) // declare the exchange
    })
    public void rabbitMQMessage(String message) {
        System.out.println(message);
    }
}
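Configure the exchange
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}
Send message
// Named SendController here so that it does not clash with the RabbitMQConfig configuration class
@Controller
public class SendController {
    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void sendMessage() {
        // Use the three-argument form: exchange, routing key, message.
        // "aa.123" is an example routing key that matches the binding key aa.# used by the listener.
        amqpTemplate.convertAndSend("topicExchange", "aa.123", "message test");
    }
}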
RabbitMQ Cluster
Install two Linux machines and set their hostnames to A and B, respectively
Edit the /etc/hosts file (vi /etc/hosts) on each machine as follows
First machine
127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.222.129 A
192.168.222.130 B
Second machine
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.222.129 A
192.168.222.130 B
Install RabbitMQ on both machines (follow the RabbitMQ Installation section of this article on each machine)
Configure Cookie Files
The Erlang cookie is the shared secret that allows different nodes to communicate with each other. To make sure the nodes in a cluster can communicate, they must all share the same Erlang cookie, which is stored in /var/lib/rabbitmq/.erlang.cookie
Command: cat /var/lib/rabbitmq/.erlang.cookie prints the cookie value, for example CGCMOJQVMLUNUCQNSLFD
You must make sure the cookie files on both Linux machines have identical content; you can edit them with vim
You can also use the scp command to copy the file across machines, for example
Command: scp /var/lib/rabbitmq/.erlang.cookie 192.168.222.130:/var/lib/rabbitmq
Note: because this file is read-only, synchronizing it with vim or scp will fail unless its permissions are changed first.
For example: chmod 777 /var/lib/rabbitmq/.erlang.cookie
Change the permissions back to read-only when the cookie file synchronization is complete
For example: chmod 400 /var/lib/rabbitmq/.erlang.cookie
Start the RabbitMQ service on both machines
Execute the following commands on machine A:
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@B
rabbitmqctl start_app
View the number of nodes: rabbitmqctl cluster_status
SpringBoot Cluster Connection
Configure the application properties file
spring.rabbitmq.addresses=192.168.73.132:5672,192.168.73.133:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123
Add Mirror Rule