RabbitMQ learning notes

Basic concepts and principles of RabbitMQ

  1. AMQP (Advanced Message Queuing Protocol) is an open application-layer protocol standard designed for message-oriented middleware.

  2. RabbitMQ is an open-source AMQP implementation; the server is written in Erlang.

  3. Channel - the channel is the most important interface for working with RabbitMQ. Most operations go through it, including declaring queues, declaring exchanges, binding queues to exchanges, and publishing messages.

  4. Exchange - the producer sends a message to an exchange, and the exchange routes it to one or more queues (or discards it). RabbitMQ has four commonly used exchange types: fanout, direct, topic and headers.

  5. Queue - the internal RabbitMQ object that stores messages.

  6. Basic working principle

    • The client (producer) sends a message --> Exchange (the exchange routes the message to the matching queue according to its type) --> Queue

    • The client (consumer) subscribes to the queue and consumes the messages in it.

Installation instructions

Installing RabbitMQ

On Windows:

  1. Download and install otp_win64_21.0.1.exe

    – Erlang program running environment (MQ running environment)

  2. Download and install rabbitmq-server-3.7.7.exe

    – RabbitMQ server program

  3. Start RabbitMQ service

  4. Enable the RabbitMQ management console (the web UI is a plugin, so it has to be enabled before use)

    • Open the RabbitMQ installation folder (rabbitmq_server-3.7.7) ==> open the sbin folder ==> type cmd in the Explorer address bar and press Enter ==> run the following command in the window that opens:

    – rabbitmq-plugins.bat enable rabbitmq_management

  5. Browser access: http://localhost:15672

    (on Windows, RabbitMQ starts automatically by default)

    If the page does not open, double-click rabbitmq-service.bat in the sbin directory. If that does not help, double-click rabbitmq-server.bat to start the server manually

    • There are two ports:
      • 15672: management console in the browser
      • 5672: message traffic from client code

    – administrator account: guest

    – password: guest

Install otp_win64_21.0.1.exe

(if an "Error opening file for writing" pop-up appears, click Ignore)

Double-click to open ==> Next ==> Next ==> Install ==> Close

Install rabbitmq-server-3.7.7.exe

Double-click to open ==> Next ==> (select installation location) Install ==> Next ==> Finish

On Linux:

  1. Download esl-erlang_21.0-1~centos~7_amd64.rpm

    – Erlang runtime rpm package

  2. Download rabbitmq-server-3.7.7-1.el7.noarch.rpm

    – RabbitMQ server program

  3. Upload the files to an upload directory on the server (for example with Xftp)

	cd /	go to the root directory
	mkdir upload	create the upload directory
	cd upload/	go into the upload directory
	- File transfer (two methods)
		1. rz	select the file to upload
		2. Drag and drop the file into the terminal window (only works in Xshell)
  4. rpm -ivh --nodeps esl-erlang_21.0-1~centos~7_amd64.rpm

    – install the Erlang rpm package

  5. rpm -ivh --nodeps rabbitmq-server-3.7.7-1.el7.noarch.rpm

    – install the RabbitMQ rpm package

  6. rabbitmq-plugins enable rabbitmq_management

    – enable the management console

  7. chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

    – make the rabbitmq user the owner of the Erlang cookie file

  8. rabbitmq-server

    – start the service

    Access from the CentOS machine itself: localhost:15672 (local access)

Common RabbitMQ commands on Linux

Startup and shutdown

rabbitmq-server

– start the service in the foreground

rabbitmq-server -detached

– start the service in the background. Unlike the foreground start, the -detached flag releases the terminal, so the server cannot be stopped by accident with Ctrl + C; this is the safer option

ps -aux | grep rabbitmq

– check whether RabbitMQ started successfully

rabbitmqctl stop

– stop the service (clean shutdown)

Pause and resume

*-Needed when setting up a cluster

*-The node keeps running and only the RabbitMQ application is paused, which is convenient for clustering

*-Note: pause and resume do not affect the RabbitMQ process itself

rabbitmqctl stop_app

– pause

rabbitmqctl start_app

– resume

User management

< UserName: user-defined name, Password: user-defined password >

rabbitmqctl add_user UserName Password

– create a user (a newly created user cannot log in right away; a role has to be assigned first)

rabbitmqctl delete_user UserName

– delete a user

rabbitmqctl change_password UserName Password

– change the password

rabbitmqctl set_user_tags UserName Tag

– assign a role; there are four tags: administrator, monitoring, policymaker and management

rabbitmqctl set_permissions -p / UserName '.*' '.*' '.*'

– the three '.*' patterns are regular expressions that grant, in this order: 1. configure permission, 2. write permission, 3. read permission ('.*' matches every resource name, so simply writing '.*' three times grants everything on the virtual host)
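The idea that each of the three patterns is a regular expression tested against a resource name (a queue or exchange name) can be sketched in plain Java. This is only an illustration: PermissionSketch is a hypothetical class, not part of RabbitMQ, it uses Java's regex engine rather than the broker's, and it assumes whole-name matching, which is why the restrictive patterns are anchored with ^ and $.

```java
import java.util.regex.Pattern;

// Illustrative model only; PermissionSketch is a hypothetical class, not a RabbitMQ API.
class PermissionSketch {
    private final Pattern configure;
    private final Pattern write;
    private final Pattern read;

    // Same argument order as rabbitmqctl set_permissions: configure, write, read.
    PermissionSketch(String configure, String write, String read) {
        this.configure = Pattern.compile(configure);
        this.write = Pattern.compile(write);
        this.read = Pattern.compile(read);
    }

    // Assumption of this sketch: the pattern has to match the whole resource name.
    boolean canConfigure(String resource) { return configure.matcher(resource).matches(); }
    boolean canWrite(String resource)     { return write.matcher(resource).matches(); }
    boolean canRead(String resource)      { return read.matcher(resource).matches(); }

    public static void main(String[] args) {
        PermissionSketch everything = new PermissionSketch(".*", ".*", ".*");
        // "^$" matches only the empty name, so it grants nothing.
        PermissionSketch ordersOnly = new PermissionSketch("^$", "^orders\\..*$", "^orders\\..*$");

        System.out.println(everything.canConfigure("helloWorld")); // true
        System.out.println(ordersOnly.canConfigure("helloWorld")); // false
        System.out.println(ordersOnly.canWrite("orders.created")); // true
        System.out.println(ordersOnly.canRead("helloWorld"));      // false
    }
}
```

The second object shows why the order of the three arguments matters: swapping two patterns silently changes which operations a user may perform.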

The four RabbitMQ user tags

administrator (super administrator)

  • Can log in to the management console (when the management plugin is enabled), view all information,
    and manage users and policies.

monitoring

  • Can log in to the management console (when the management plugin is enabled) and also view
    node-related information (number of processes, memory usage, disk usage, etc.)

policymaker

  • Can log in to the management console (when the management plugin is enabled) and manage
    policies, but cannot view node-related information (the part marked by the red box in the figure above).

management (ordinary management user)

  • Can only log in to the management console (when the management plugin is enabled); cannot see
    node information and cannot manage policies.

Virtual host

– Note: after a role is assigned to a user, the virtual host defaults to: /

(the virtual host "/" plays a role similar to a database in MySQL; our data will be stored under this virtual host)

RabbitMQ's default virtual host is named "/", and each virtual host behaves like an independent MQ server

A different virtual host name can be used

How to add more virtual hosts? (create the virtual host, then grant the required users access to it)

  • The following figure: ☟

MQ communication

AMQP

AMQP (Advanced Message Queuing Protocol) is an open application-layer standard that provides unified messaging services and is designed for message-oriented middleware. Clients and message middleware built on this protocol can exchange messages regardless of which client or middleware product is used and regardless of the development language. RabbitMQ, written in Erlang, is one implementation.
  • AMQP is a protocol standard: data must be transmitted in accordance with this underlying standard

Basic concepts

RabbitMQ six working modes

Dependencies required for MQ (Maven)

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <!-- Gson: converts objects to JSON strings -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

1. Simple mode

  • Hello World - also known as simple mode

      1. The message producer puts the message on the queue

      2. The message consumer listens to the queue. When there is a message in the queue, the consumer takes it, and once taken it is automatically deleted from the queue

        (hidden danger: with automatic acknowledgement, a message may already be gone from the queue before the consumer has processed it correctly, so the message is lost)

        (application scenario: chat, with a relay server in the middle between the producer side and the consumer side)

Producer | consumer | point-to-point (one producer, one consumer)
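The hidden danger mentioned for simple mode can be made concrete with a toy model. The sketch below is not RabbitMQ client code: AckSketch and its methods are hypothetical names, and the "queue" is a plain in-memory structure. It only contrasts what happens to a message when a consumer fails under automatic acknowledgement versus manual acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Toy in-memory model of one queue; AckSketch is hypothetical and is not RabbitMQ client code.
class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();        // messages waiting for delivery
    private final TreeMap<Long, String> unacked = new TreeMap<>(); // delivered, not yet acknowledged
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message and returns its delivery tag, or -1 if the queue is empty.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message); // kept until the consumer acknowledges it
        }
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    // Models a consumer that failed: unacknowledged messages return to the front of the queue.
    void consumerFailed() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();

        queue.publish("m1");
        queue.deliver(true);     // auto-ack: removed at delivery time
        queue.consumerFailed();  // the consumer fails before processing
        System.out.println(queue.readyCount()); // 0, the message is lost

        queue.publish("m2");
        queue.deliver(false);    // manual ack: still tracked by the queue
        queue.consumerFailed();
        System.out.println(queue.readyCount()); // 1, the message is available again
    }
}
```

This is the reason the consumer listings in these notes pass false as the second argument of basicConsume and call basicAck themselves.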

producer

package com.mq.test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Hello World mode, also known as simple mode
 */
/*producer*/
public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //Create the connection factory
        ConnectionFactory cf = new ConnectionFactory();

        //Configure the connection
        cf.setHost("192.168.118.130");    //IP address of the server
        cf.setPort(5672);                 //Default port number of RabbitMQ
        cf.setUsername("admin");          //user name
        cf.setPassword("123456");         //password
        cf.setVirtualHost("/test");       //Name of the virtual host

        //Open a physical, TCP-based connection to MQ
        Connection conn = cf.newConnection();

        //Create a channel: a lightweight virtual connection multiplexed over the TCP connection.
        //Opening a TCP connection is expensive (a handshake every time), so many channels
        //share one physical connection and the handshake is done only once.
        Channel channel = conn.createChannel();

        /*Declare the queue: it is created if it does not exist yet, otherwise the existing one is used*/
        //1st parameter (queue): the queue name
        //2nd parameter (durable): false = the queue is lost when MQ restarts | true = the queue survives a restart
        //3rd parameter (exclusive): false = any connection can use the queue | true = only the connection that declared it can use it
        //4th parameter (autoDelete): false = the queue is kept | true = the queue is deleted once its last consumer unsubscribes
        //5th parameter (arguments): other settings, null here
        channel.queueDeclare("helloWorld",false,false,false,null);

        //Create the message to send
        String message = "helloWorld";//Simulates a message delivered to MQ

        /*Send the message to MQ*/
        //1st parameter (exchange): "" selects the default exchange; named exchanges are used later for publish/subscribe
        //2nd parameter (routingKey): with the default exchange, this is the queue name
        //3rd parameter (props): extra message properties
        //4th parameter (body): the message content as bytes
        channel.basicPublish("","helloWorld",null,message.getBytes());

        /*The channel and the connection are left open in this demo*/
        //Close the virtual connection
        //channel.close();
        //Close the physical connection
        //conn.close();
        System.out.println("Data sent successfully!!!!!");
    }
}

consumer

package com.mq.test;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Hello World mode, also known as simple mode
 */
/*consumer*/
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        //Configure the connection
        factory.setHost("192.168.118.130");     //Server ip
        factory.setPort(5672);                  //Default port number of RabbitMQ
        factory.setUsername("admin");           //user name
        factory.setPassword("123456");          //password
        factory.setVirtualHost("/test");        //Name of the virtual host

        //Open a physical, TCP-based connection to MQ
        Connection conn = factory.newConnection();

        //Create the channel
        Channel channel = conn.createChannel();

        //Declare the queue (same parameters as on the producer side)
        channel.queueDeclare("helloWorld",false,false,false,null);

        //Register a message consumer (message reception)
        //1st parameter (queue): the queue name
        //2nd parameter (autoAck): false = the code acknowledges each message manually, which is the recommended practice
        //3rd parameter (callback): the Consumer implementation that handles and acknowledges messages, here the Recv class below
        //basicConsume keeps listening: every message that is produced is consumed right away, so do not close the channel
        channel.basicConsume("helloWorld",false,new Recv(channel));

        /*Do not close the channel or the connection, otherwise consumption stops*/
        //channel.close();
        //conn.close();
    }
}

/**
 * Consumer implementation class
 */
class Recv extends DefaultConsumer {
    private Channel channel;

    //Constructor
    //The Channel is passed in from outside because handleDelivery needs it to acknowledge messages
    public Recv(Channel channel) {
        super(channel);
        this.channel = channel; //Keep the Channel that was passed in
    }

    //This method is where messages are processed
    //It must be overridden
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String str = new String(body);//Convert the byte array of the message into a String
        System.out.println("Received message:"+str);
        //Acknowledge (ack) the message
        //1st parameter: envelope.getDeliveryTag() returns the delivery tag, which identifies this delivery on the channel
        //2nd parameter: false = acknowledge only this message | true = acknowledge every unacknowledged message up to and including this one
        this.channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

Encapsulation tool class

package com.mq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtils {
    private static ConnectionFactory factory;
    static {
        factory = new ConnectionFactory();
        factory.setHost("192.168.118.130");  //IP address of the server
        factory.setPort(5672);              //Default port number of rabbitMQ
        factory.setUsername("admin");       //user name
        factory.setPassword("123456");         //password
        factory.setVirtualHost("/test");    //Address of virtual host
    }

    /**
     * Get a physical connection
     * @return a new connection, or null if the connection could not be opened
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

------------------------------------------------------------------------------------

package com.mq.content;

/**
 * Constants holding the queue names,
 * kept in one place to improve code reuse
 */
public interface RabbitContent {
    /**
     * hello queue
     */
    String QUEUE_HELLO="hello";
    //public static final  String QUEUE_HELLO = "hello";
}    

Changes after introducing the utility classes

package com.mq.test;

import com.mq.content.RabbitContent;
import com.mq.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Hello World mode, also known as simple mode
 */
/*producer*/
public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //Create the connection factory
            /*Omitted: now done in RabbitMQUtils*/
        //Configure the connection
            /*Omitted: now done in RabbitMQUtils*/
        //Open a physical, TCP-based connection to MQ
        Connection conn = RabbitMQUtils.getConnection();/*Changed*/

        //Create the channel (unchanged)
        Channel channel = conn.createChannel();

        /*Declare the queue: created if it does not exist yet, otherwise the existing one is used*/
            /*Changed: the queue name comes from the constant*/
        channel.queueDeclare(RabbitContent.QUEUE_HELLO,false,false,false,null);

        //Create the message to send (unchanged)
        String message = "helloWorld";

        /*Send the message to MQ*/
            /*Changed: the queue name comes from the constant*/
        channel.basicPublish("",RabbitContent.QUEUE_HELLO,null,message.getBytes());
    }
}

------------------------------------------------------------------------------------

/**
 * Hello World mode, also known as simple mode
 */
/*consumer*/
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //Create the connection factory
            /*Omitted: now done in RabbitMQUtils*/
        //Configure the connection
            /*Omitted: now done in RabbitMQUtils*/
        //Open a physical, TCP-based connection to MQ
        Connection conn = RabbitMQUtils.getConnection();/*Changed*/

        //Create the channel (unchanged)
        Channel channel = conn.createChannel();

        //Declare the queue
            /*Changed: the queue name comes from the constant*/
        channel.queueDeclare(RabbitContent.QUEUE_HELLO,false,false,false,null);

        //Register a message consumer (message reception)
            /*Changed: the queue name comes from the constant*/
        //basicConsume keeps listening: every message that is produced is consumed right away, so do not close the channel
        channel.basicConsume(RabbitContent.QUEUE_HELLO,false,new Recv(channel));
    }
}

Consumer - anonymous inner class version

package com.mq.test;

import com.mq.content.RabbitContent;
import com.mq.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Hello World mode, also known as simple mode
 */
/*consumer*/
public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //Open a physical, TCP-based connection to MQ
        Connection conn = RabbitMQUtils.getConnection();

        //Create the channel
        final Channel channel = conn.createChannel();

        //Declare the queue
        channel.queueDeclare(RabbitContent.QUEUE_HELLO, false, false, false, null);

        //Register a message consumer (message reception)
        //Previous version with a named class:
        //channel.basicConsume(RabbitContent.QUEUE_HELLO,false,new Recv(channel));
        //basicConsume keeps listening: every message that is produced is consumed right away, so do not close the channel
        channel.basicConsume(RabbitContent.QUEUE_HELLO, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str = new String(body);//Convert the byte array of the message into a String
                System.out.println("Received message:"+str);
                //Acknowledge (ack) the message
                //1st parameter: envelope.getDeliveryTag() returns the delivery tag, which identifies this delivery on the channel
                //2nd parameter: false = acknowledge only this message | true = acknowledge every unacknowledged message up to and including this one
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        /*Do not close the channel or the connection, otherwise consumption stops*/
        //channel.close();
        //conn.close();
    }
}

2. Work queue

  • Work queues - work queue mode (consumers compete for messages)

  • Each consumer gets different messages

  • The messages of one queue are distributed among several consumers

      1. The message producer puts messages into the queue. There can be several consumers, for example consumer 1 and consumer 2, listening to the same queue at the same time. C1 and C2 compete for the messages currently in the queue: whichever gets a message first is responsible for consuming it

        (hidden danger: under high concurrency a message may end up being processed by more than one consumer, for example when it is redelivered after a consumer fails before acknowledging it

        //Solution: guard the processing with a switch (a synchronization mechanism, although its performance differs from a plain synchronized lock) so that a message is only used by one consumer)

      2. Application scenarios: grabbing red envelopes; resource scheduling in large projects (the task dispatcher does not need to know which worker system is idle: it simply throws the task into the message queue and the idle systems compete for it; 12306 sending notifications to each passenger: if a slow consumer takes a long time to receive messages, the other consumers keep receiving theirs). This mode makes the most of the consumer servers' capacity
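The competition between consumers can be illustrated without a broker. The following sketch replaces the RabbitMQ queue with a java.util.concurrent queue and the consumers with threads; WorkQueueSketch and run are hypothetical names used only here. It shows the property that matters in this mode: each task is handed to exactly one of the competing consumers.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for the work queue mode; not RabbitMQ client code.
class WorkQueueSketch {

    // Fills a queue with taskCount tasks, lets consumerCount threads compete for them,
    // and returns how many times each task was handled.
    static Map<String, AtomicInteger> run(int taskCount, int consumerCount) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < taskCount; i++) {
            queue.add("task-" + i);
        }

        Map<String, AtomicInteger> handled = new ConcurrentHashMap<>();
        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++) {
            consumers[c] = new Thread(() -> {
                String task;
                // poll() removes the head atomically, so a task goes to one thread only
                while ((task = queue.poll()) != null) {
                    handled.computeIfAbsent(task, k -> new AtomicInteger()).incrementAndGet();
                }
            });
            consumers[c].start();
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for consumers", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        Map<String, AtomicInteger> handled = run(100, 2);
        boolean exactlyOnce = handled.size() == 100
                && handled.values().stream().allMatch(n -> n.get() == 1);
        System.out.println("every task handled exactly once: " + exactlyOnce);
    }
}
```

How the 100 tasks are split between the two threads varies from run to run, which mirrors the note above that a slow consumer does not hold back the others.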

3. Publish and subscribe

  • Publish <producer> / subscribe <consumer> - publish and subscribe (consumers share the messages)
  • Every consumer gets the same message - especially suitable when one data provider feeds several applications
      1. X denotes the exchange, an internal RabbitMQ component implemented in Erlang. The message producer is application code, which is comparatively slow to execute. The producer puts the message into the exchange, and the publish/subscribe (fanout) exchange sends it to every queue bound to it. The consumer of each queue then gets the message and consumes it
      2. One exchange, several message queues, and each queue is bound to one consumer (user)
      3. Once you follow (subscribe to) a producer, you receive its new messages immediately, and every subscriber receives the same message
      4. Typical scenarios: mass mailing, group chat, broadcasts (advertisements), weather information pushed by the Meteorological Bureau
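The fan-out behaviour described in the list can be modelled in a few lines. The sketch below is an in-memory illustration only; FanoutSketch, bind, publish and messagesIn are hypothetical names and do not belong to the RabbitMQ client. It shows that every bound queue receives its own copy of each published message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange; not RabbitMQ client code.
class FanoutSketch {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // A fanout exchange ignores routing keys: every bound queue gets a copy.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("subscriberA");
        exchange.bind("subscriberB");

        exchange.publish("weather: sunny");

        System.out.println(exchange.messagesIn("subscriberA")); // [weather: sunny]
        System.out.println(exchange.messagesIn("subscriberB")); // [weather: sunny]
    }
}
```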

4. Routing mode

  • Routing - routing mode (adds filtering on top of publish/subscribe mode)
  • Each consumer can receive different messages, but the match must be exact. For example, when searching for Beijing, entering only part of the word is not accepted; the key must be equal ("=")
      1. The message producer sends the message to the exchange, which decides by routing key. The routing key is a string (for example info) that the published message carries (it is passed as a parameter of the publish method). The exchange delivers the message only to the queues bound with exactly that routing key, and the corresponding consumers consume it;

      2. Define the routing strings according to business functions

      3. The code obtains the routing string for the function concerned and throws the message into the corresponding queue
        Business scenarios: error notification (EXCEPTION), traditional error notification, customer notification. With key-based routing, errors in the program can be wrapped into messages and sent to the message queue; developers can write their own consumers and receive the errors in real time;
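Exact-match routing can be sketched as a lookup from binding key to queues. The model below is an in-memory illustration under stated assumptions: DirectSketch and its methods are hypothetical names, and the queue and key names (errorQueue, logQueue, error, info) are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of a direct exchange; not RabbitMQ client code.
class DirectSketch {
    private final Map<String, Set<String>> queuesByKey = new HashMap<>(); // binding key -> queue names
    private final Map<String, List<String>> queues = new HashMap<>();     // queue name -> messages

    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        queuesByKey.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // Returns the number of queues that received the message; 0 means no binding matched.
    int publish(String routingKey, String message) {
        Set<String> targets = queuesByKey.getOrDefault(routingKey, Collections.emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("errorQueue", "error");
        exchange.bind("logQueue", "info");
        exchange.bind("logQueue", "error");

        System.out.println(exchange.publish("error", "disk full"));      // 2
        System.out.println(exchange.publish("info", "service started")); // 1
        System.out.println(exchange.publish("err", "ignored"));          // 0, partial keys do not match
        System.out.println(exchange.messagesIn("logQueue"));             // [disk full, service started]
    }
}
```

Binding one queue with several keys, as logQueue does here, is how a single consumer can receive more than one category of message.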

5. Topic mode

  • Topic - topic mode (a variant of routing mode)
  • Compared with routing mode, topic mode supports fuzzy (wildcard) matching <for example: when searching for Beijing, entering only part of the word is enough>
    1. The asterisk (*) and the pound sign (#) are wildcards
    2. The asterisk matches exactly one word and the pound sign matches zero or more words (words are separated by dots)
    3. Adds fuzzy matching to the routing function
    4. The message producer creates the message and hands it to the exchange
    5. The exchange matches the routing key against the binding patterns and delivers the message to the matching queues; the consumers listening on those queues receive and consume it
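The wildcard rules can be checked with a small matcher written in plain Java. This is a sketch of the matching rule only, not the broker's implementation: TopicMatchSketch is a hypothetical class and the keys (china.beijing and so on) are invented for the example. Words are separated by dots, * stands for exactly one word and # for zero or more words.

```java
// Stand-alone model of topic pattern matching; not RabbitMQ client code.
class TopicMatchSketch {

    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.isEmpty() ? new String[0] : bindingPattern.split("\\.", -1);
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, key, 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // both must be used up at the same time
        }
        if (pattern[i].equals("#")) {
            // '#' either absorbs nothing, or absorbs one word and is tried again
            return match(pattern, i + 1, key, j)
                    || (j < key.length && match(pattern, i, key, j + 1));
        }
        if (j == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("china.*", "china.beijing"));         // true
        System.out.println(matches("china.*", "china.beijing.weather")); // false
        System.out.println(matches("china.#", "china.beijing.weather")); // true
        System.out.println(matches("china.#", "china"));                 // true
        System.out.println(matches("*.beijing", "beijing"));             // false
    }
}
```

A binding pattern without any wildcard behaves like an exact match, which is why topic mode can be seen as a generalisation of routing mode.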

6. RPC mode

  • RPC (rarely used, not covered here)
  • RPC - remote procedure call mode

More RabbitMQ material

http://www.cnblogs.com/zhangweizhong/category/855479.html


Keywords: RabbitMQ

Added by rhodrykorb on Wed, 23 Feb 2022 12:24:04 +0200