RabbitMQ [Message Queue]

Message Queue Introduction

A message is the data being transmitted
Message Queue is a form of communication that can be returned immediately after sending a message
There is a messaging system to ensure reliable delivery of messages
Message publishers can only publish messages to MQ without having to care about others
Message consumers get messages from MQ without having to care who published them (asynchronous processing, FIFO)

Message Queuing is used for decoupling business, ensuring the final consistency of messages, broadcasting, peak offset flow control, etc.

RabbitMQ is an open source AMQP developed in Erlang
AMQP: Advanced Message Queuing Protocol
RabbitMQ features:
1: Reliability
2: Flexible routing (forward signal)
3: Message Clusters (supports RabbitMQ to form clusters)
4: High Availability (mirroring is supported on clusters)
5: Support multiple protocols
6: Support for java language
7: Management interface (gui interface)
8: Tracking mechanism

RabbitMQ Installation

Install and download Erlang language: Erlang website
Install and download RabbitMQ-server: github website

Installation dependency command: yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto

Install Erlang:
1: Unzip the erlang source package in the installation directory
Command: tar-zxvf otp_ Src_ 24.2. Tar. Gz
2: Create erlang installation directory
mkdir /usr/local/erlang
4: Enter erlang unzip directory
cd otp_src_24.2
5: Configure installation information for erlang
./configure --prefix=/usr/local/erlang --without-javac
6: Compile and install
make && make install
7: Configure environment variables

ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

8: Overload environment variables
Command: source/etc/profile
Check erlang:erl-version

Install RabbitMQ
Enter to store rabbitmq-server-3.9.12-1. El7. Noarch. Directory of RPM
Command: rpm-ivh --nodeps rabbitmq-server-3.9.12-1. El7. Noarch. RPM
RabbitMQ Startup Service: rabbitmq-server strat
Note: Errors may occur here due to/var/lib/rabbitmq/. Erlang. Insufficient cookie file permissions.
Solution authorizes this file
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
RabbitMQ Shutdown Service: rabbitmqctl stop

Add plug-in: rabbitmq-plugins enable d plug-in name
Add: rabbitmq-plugins enable rabbitmq_management
Delete plug-in: rabbitmq-plugins disable plug-in name
Once the plug-in is added, you can access the address in the browser on your linux machine: http://192.168.73.132:15672/

RabbitMQ User Management
RabbitMQ default user guest password guest
Add user: rabbitmqctl add_user username password
Delete user: rabbitmqctl delete_user username
Modify user: rabbitmqctl change_password username password
Set user role: rabbitmqctl set_user_tags username role
Roles (level): management (1), monitoring (2), policymaker (2), administrator (4)

Jurisdiction

1,	Authorization command: rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}
-p vhostpath : Namespace used to specify a resource, for example –p / Represents the root path namespace
user: Used to specify which user to authorize to fill in the user name
     conf:A regular expression match Which configuration resources can be configured by this user.
       write:A regular expression match Which configuration resources can be read by this user.
       read:A regular expression match Which configuration resources can be accessed by this user.
   For example:
     rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
   For setting up root Users have read and write configuration permissions for all resources
2,view user permission rabbitmqctl  list_permissions [vhostpath]
for example
  View all user permissions under root diameter
  rabbitmqctl  list_permissions 
  View all user permissions under the specified namespace
  rabbitmqctl  list_permissions /abc 
3,View permissions under specified user rabbitmqctl  list_user_permissions {username}
for example
  See root Permissions under User
  rabbitmqctl  list_user_permissions root
4,Clear User Rights rabbitmqctl  clear_permissions {username}
For example:
	Eliminate root User's Permissions
rabbitmqctl  clear_permissions root

vhost management

vhost yes RabbitMQ A namespace in which you can restrict where messages are stored. It's similar to using this namespace to control permissions Windows In the same folder, different files are stored in different folders.
1,Add to vhost: rabbitmqctl add vhost {name}
for example
  rabbitmqctl add vhost test
2,delete vhost: rabbitmqctl delete vhost {name}
   for example
    rabbitmqctl delete vhost test

RabbitMQ message sending and receiving

Message Sending and Receiving Mechanisms

1,Message
 Message, which is not specific, consists of a header and a message body. The body of the message is opaque, while the header is
 Consists of a series of optional attributes, including routing-key(Routing keys), priority(Relative to other messages
 Priority), delivery-mode(Indicates that the message may require persistent storage, etc. 
2,Publisher
 The producer of the message is also a client application that publishes the message to the exchanger. 
3,Exchange
 A switch that receives messages from producers and routes them to queues in the server. 
4,Binding
 Binding, used for associations between message queues and switches. A binding is to swap based on routing keys
 Routing rules that connect a switch to a message queue, so you can think of the switch as a binding
 Routing table. 
5,Queue
 Message queue, used to save messages until they are sent to consumers. It is the container of the message and also the message
 End. A message can be put into one or more queues. Messages have been in the queue waiting for consumers to connect
 Go to this queue and remove it. 
6,Connection
 Network connections, such as one TCP Connect. 
7,Channel
 A separate two-way data flow channel in a multiplexed connection. Channels are built in reality
TCP Connect virtual connections from the mainland. AMQP Commands are sent over a channel, whether it's publishing a message,
Subscribe to a queue or receive messages, all over a channel. Because it's built for the operating system
 Vertical and Destruction TCP They are expensive, so the concept of a channel is introduced to multiply one TCP 
Connect. 
8,Consumer
 The consumer of a message, representing a client application that gets a message from a message queue. 
9,Virtual Host
 A virtual host representing a batch of switches, message queues, and related objects. Virtual hosts share the same
 A stand-alone server domain for authentication and encryption environments. each vhost Essentially one mini Edition 
RabbitMQ Server, with its own queues, switches, bindings, and permissions. vhost yes AMQP 
The basis of the concept must be specified at the time of connection. RabbitMQ Default vhost yes / .  
10,Broker
 Represents a message queue server entity.

Exchange switch type: direct, fanout, topic, headers

direct:
Precise one-to-one matching

Fanout Switch

topic switch

Message Test

Add Dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>

Test Send Message to Queue

public class App {
    public static void main( String[] args ) {
        //Create Connection Factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");//Set ip
        factory.setPort(5672);//Set Port
        factory.setUsername("root");//Account number
        factory.setPassword("123");//Password
        Connection connection = null;//Define Connections
        Channel channel = null;//Define Channels
        try {
            connection = factory.newConnection();//Get Connections
            channel = connection.createChannel();//Get Channel

            //Declare a Queue
            //Parameter 1 is custom for queue name
            //Parameter 2 is whether or not a persisted queue
            //Whether parameter 3 is excluded, and if so, only one consumer is allowed to listen in on this queue
            //Whether parameter 4 automatically deletes the queue, true means delete when there is no data in the queue
            //Parameter 5 sets the value null for the queue property
            channel.queueDeclare("myQueue",true,false,false,null);
            //Define message
            String message = "RabbitMQ Message Test";

            //send message
            /*
            * Parameter 1 switch name not in use
            * Parameter 2 queue name or routingKey
            * Attribute information for parameter 3 message
            * Parameter 4 is the byte array of the specific message
            * */
            channel.basicPublish("","myQueue",null,message.getBytes("utf-8"));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

receive messages

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //receive messages
            /*
            * Queue name listening for receiving message for parameter 1
            * Parameter 2 is whether the message automatically acknowledges true, indicating that the message will automatically pop up from the queue after automatically confirming receipt of the message
            * Tags with parameter 3 as message recipients are used to distinguish different consumers when multiple consumers are listening on a queue at the same time, usually an empty string
            * Parameter 4 specifies how the message is processed by the callback method received by the message
            * Note: When the basicConsume method is used, a thread listening queue is started and messages are automatically received if new information enters the queue
            * Connections and channels cannot therefore be closed
            * */
            channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){
                //Method for receiving and processing override method messages
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("The message is"+message);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

Switch Test

direct switch

Message sent to

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //Declare Queue
            channel.queueDeclare("myQueue",true,false,false,null);

            /*
            * Declare a switch
            * Parameter 1-bit switch name
            * Parameter 2 is the type of switch (direct, fanout, topic, headers)
            * Parameter 3 is whether or not it is a persistent switch
            * */
            channel.exchangeDeclare("myExchange","direct",true);

            /*
            * Bind switch
            * Parameter 1 Queue Name
            * Parameter 2 switch name
            * RougKey or bindingKey for parameter 3 message
            * Note: You must ensure that switches and queue declarations are successful when Binding queues and switches
            * */
            channel.queueBind("myQueue","myExchange","directRoutingKey");

            String message = "direct message test";
            //Send message to queue
            channel.basicPublish("myExchange","directRoutingKey",null,message.getBytes(StandardCharsets.UTF_8));

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

receive messages

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare("myQueue",true,false,false,null);
            channel.exchangeDeclare("myExchange","direct",true);
            channel.queueBind("myQueue","myExchange","directRoutingKey");

            //Get Messages
            channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("news"+message);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

fanout switch

send message

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("fanoutExchange","fanout",true);

            String message = "direct message test";
            channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

receive messages

public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            /*
            * Since messages from Fanout-type switches are broadcast-like, he does not need to bind a RoutingKey
            * It is also possible that many consumers will receive data from this switch, so we create a random queue name when we create a queue
            *
            * The queueDeclare method without parameters creates a queue with a random name
            * The data for this queue is nonpersistent
            * Is Exclusive
            * Automatically deleted
            * getQueue This method is used to get this random queue name
            * */
            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange","fanout",true);
            //Bind this random queue to the switch, since it is a fanout type switch, there is no need to specify routingKey
            channel.queueBind(queueName,"fanoutExchange","");

            /*
            * Listen on a queue and get data from it
            * */
            channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("news"+message);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

topic switch

message sending

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("topicExchange","topic",true);

            String message = "message test";
            //aa.123 has aa. * Or aa. # Can receive aa.123.123 Only aa. # Can receive
            channel.basicPublish("topicExchange","aa.123",null,message.getBytes(StandardCharsets.UTF_8));

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

messages receiving

public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.queueDeclare("topicQueue",true,false,false,null);
            channel.exchangeDeclare("topicExchange","topic",true);
            channel.queueBind("topicQueue","topicExchange","aa.*");

            channel.basicConsume("topicQueue",true,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("news"+message);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

topic and fanout use scenarios:

Transactional message

Transaction mechanism is similar to that of a database
Ensure that all message commands either write to the queue at the same time or fail at the same time
RabbitMQ transaction:
1: through the transaction mechanism provided by AMQP
2: Sender Confirmation Mode

channel.txSelect();//Declare Start Transaction Mode
channel.txCommit();//Submit Transaction
channel.txRollback();//Rollback transaction

Sender Transaction

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("fanoutExchange","fanout",true);

            String message = "message test";
            //Start Transaction
            channel.txSelect();

            channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));

            //Submit Transaction
            channel.txCommit();
        } catch (IOException e) {
            e.printStackTrace();
            try {//rollback
                channel.txRollback();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        } catch (TimeoutException e) {
            e.printStackTrace();
            try {
                channel.txRollback();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }finally {
            if(channel != null){
                try {
                    channel.txRollback();//Add a transaction rollback, discard any committed messages in the current transaction, and free memory
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Sender Confirmation Mode

Ordinary transaction Transaction rollback error occurs when message sending fails
Confirmer mode replays messages when they fail to send

Grammar:

package com.java;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("fanoutExchange","fanout",true);

            //Start sender confirmation mode
            channel.confirmSelect();
            String message = "message test";

            channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
            //Confirmation mode
            channel.waitForConfirmsOrDie();
            
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Sender Confirmer Mode - General Confirmation

Start sender confirmation mode
channel.confirmSelect();
boolean flag = channel.waitForConfirms();
The role of waitForConfirms:

Sender Confirmer Mode - Bulk Confirmation
Start sender confirmation mode
channel.confirmSelect();
Batch confirmation: channel.waitForConfirmsOrDie();
Effect:
Bulk message acknowledgment is faster than normal message acknowledgment
However, once a message is replaced, it is not possible to determine which message did not complete sending.
All of this message needs to be reissued

Sender Confirmation Mode - Asynchronous Confirmation
Start sender confirmation mode
channel.confirmSelect();
Asynchronous Confirmation:

public class App {
    public static void main( String[] args ) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("fanoutExchange","fanout",true);

            //Start sender confirmation mode
            channel.confirmSelect();
            String message = "message test";

            
            //Declare asynchronous listeners
            channel.addConfirmListener(new ConfirmListener() {
                //Callback method after message acknowledgement
                /*
                * Parameter 1 is the number of the acknowledged message starting from 1
                * Parameter 2 currently processes multiple messages and confirms that the number will change
                * */
                @Override
                public void handleAck(long l, boolean b) throws IOException {

                }
                //Callback method with message unacknowledged
                //Message replacement is required if this method is executed
                @Override
                public void handleNack(long l, boolean b) throws IOException {

                }
            });

            channel.basicPublish("fanoutExchange","",null,message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if(channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Consumer Confirmation Mode - Manual Confirmation

public class App1 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.73.132");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            String queueName = channel.queueDeclare().getQueue();
            channel.exchangeDeclare("fanoutExchange","fanout",true);
            channel.queueBind(queueName,"fanoutExchange","");

            //Parameter 2 of the basicConsume method changes the automatic acknowledgment message to false to indicate that a message was received but not processed requiring manual acknowledgment
            //Manual Confirmation: Call Method
            /*
            * Method basicAck(); Batch confirmation for confirmation
            * */
            channel.basicConsume(queueName,false,"",new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("news"+message);
                    long num = envelope.getDeliveryTag();//Get the number of the message
                    Channel c = super.getChannel();//Getting channels in the current internal class can also be done using this

                    //Manual confirmation after message confirmation indicates that the message has been processed and needs to be popped out of the queue This method is executed when the current message handler is fully completed
                    //Parameter 1 is the number of the message
                    //Parameter 2 true requires confirmation of all messages that are less than the current number, and false means that only the current number is processed
                    c.basicAck(num,true);
                    
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

SpringBoot Integrated RabbitMQ

Select Dependencies: Spring for RabbitMQ
applicaition Profile Configuration RabbitMQ

spring.rabbitmq.host=192.168.73.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123

Create switch and bind

Profile-based switch creation

@Configuration
public class RabbitMQConfig {

    //Configure a switch of type Direct
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    //Configure a queue
    @Bean
    public Queue directQueue(){
        return new Queue("directQueue");
    }

    //binding
    /*
    * Parameter 1 Queue directQueue requires a bound queue parameter name that is exactly the same as a queue method name under @Bean and is automatically injected into this parameter
    * Parameter 2 DirectExchange directExchange requires a bound switch parameter name that is exactly the same as a queue method name under @Bean and is automatically injected into this parameter
    * */
    @Bean
    public Binding binding(Queue directQueue,DirectExchange directExchange){
        return BindingBuilder.bind(directQueue).to(directExchange).with("directRouting");
    }

}

send message

@Controller
public class app {

    //Injecting amqp templates uses this object to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void rabbitMQMessage(){
        /*
        * Send message method convertAndSend
        * Parameter 1 switch name
        * Parameter 2 RoutingKey
        * Message sent by parameter 3
        * */
        amqpTemplate.convertAndSend("directExchange","directRouting","Message test");
    }
}

Asynchronous listening for receiving messages

@Component
public class app {

    //Injecting amqp templates uses this object to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    /*
    * Annotation RabbitListener is used to mark the current method as a listening method
    * The purpose is to automatically receive messages on a persistent basis. This method does not require a manual call to spring to automatically run this listen
    *
    * The parameter message is the message that is listened to every time
    * */
    @RabbitListener(queues = {"directQueue"})
    public void rabbitMQMessage(String message){
        System.out.println(message);
    }
}

fanout type receives messages

@Component
public class app {

    //Injecting amqp templates uses this object to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    @RabbitListener(bindings = {//Annotate RabbitListener parameter bindings bound queue switches
            @QueueBinding(
                    value = @Queue(),//@Queue Creates a Queue Random Queue
                    exchange = @Exchange(name = "fanoutExchange",type = "fanout"))//Create a switch
    })
    public void rabbitMQMessage(String message){
        System.out.println(message);
    }
}

fanout Message Sending

@Configuration
public class RabbitMQConfig {

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

}
@Controller
public class RabbitMQConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void sendMessage(){
        amqpTemplate.convertAndSend("fanoutExchange","message test");
    }

}

topic receives messages

@Component
public class app {

    //Injecting amqp templates uses this object to send and receive messages
    @Resource
    private AmqpTemplate amqpTemplate;

    @RabbitListener(bindings = {//Annotate RabbitListener parameter bindings bound queue switches
            @QueueBinding(
                    value = @Queue("topic"),//@Queue Creates a Queue Random Queue
                    key = {"aa.#"},
                    exchange = @Exchange(name = "topicExchange",type = "topic"))//Create a switch
    })
    public void rabbitMQMessage(String message){
        System.out.println(message);
    }
}

Configure Switches

@Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

send message

@Controller
public class RabbitMQConfig {

    @Resource
    private AmqpTemplate amqpTemplate;

    @RequestMapping(value = "/send")
    @ResponseBody
    public void sendMessage(){
        amqpTemplate.convertAndSend("topicExchange","message test");
    }

}

RabbitMQ Cluster

Install 2 Linux operating systems and modify hostname s A and B, respectively
Execute vi/etc/hosts file as follows

First Machine
127.0.0.1 A  localhost localhost.localdomain localhost4 localhost4.localdomain4
::1       A  localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.222.129 A
192.168.222.130 B
 Second machine
127.0.0.1 B  localhost localhost.localdomain localhost4 localhost4.localdomain4
::1       B  localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.222.129 A
192.168.222.130 B

Install RabbitMQ on both machines (perform the AbbitMQ installation of this article catalog, both machines need to be installed)

Configure Cookie Files
Erlang Cookies are the keys to ensure that different nodes can communicate with each other. To ensure that different nodes in a cluster can communicate with each other, they must share the same Erlang Cookies, stored in/var/lib/rabbitmq/. Erlang. Cookies

Command: cat /var/lib/rabbitmq/.erlang.cookie
 Gets a list of keys CGCMOJQVMLUNUCQNSLFD

You must ensure that the Cookie files on both Linux s are identical in content, and you can choose to edit them using vim
You can also use the scp command to complete cross-machine copies of files, for example

Command: scp /var/lib/rabbitmq/.erlang.cookie 192.168.222.130:/var/lib/rabbitmq

Note: Since the permissions for this file are read-only, synchronization of Cookie files using vim or scp will fail, so the permissions for this file must be modified.
For example, Chmod 777/var/lib/rabbitmq/. Erlang. Cookies
Modify permissions back to read-only when Cookie file synchronization is complete
For example, Chmod 400/var/lib/rabbitmq/. Erlang. Cookies

Start AbbitMQ service for two machines
Execute the following commands in machine A:

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@B
rabbitmqctl start_app

View the number of nodes: rabbitmqctl cluster_status

SpringBoot Link Cluster

Configure the application configuration file

spring.rabbitmq.addresses=192.168.73.132:5672,192.168.73.133:5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123

Add Mirror Rule

Keywords: Java RabbitMQ Spring Boot Distribution message queue

Added by jp2php on Mon, 10 Jan 2022 19:11:45 +0200