RabbitMQ message queue

RabbitMQ Message Queuing Middleware

Install directly on Linux

Start service

systemctl start rabbitmq-server
View service status

systemctl status rabbitmq-server
Out of Service

systemctl stop rabbitmq-server
Startup service

systemctl enable rabbitmq-server

1, Introduction to message queue

1. Synchronous call and asynchronous call

  • Synchronous call
    • When service A calls service B, it needs to wait for the return value after the execution of service B before service A can continue to execute
    • Synchronous calls can be done through REST and RPC
      • REST: Ribbon,Feign
      • RPC: Dubbo
  • Asynchronous call
    • Service A calls service B, and A does not need to wait for the execution result of service B
    • Asynchronous call through message queue

2. Message queue concept

  • The full name of MQ is Message Queue. Message Queue (MQ) is an application to application communication method. Applications communicate by reading and writing messages in and out of the queue (data for the application) without requiring a dedicated connection to link them.
  • Message passing refers to the communication between programs by sending data in messages, rather than through direct calls to each other. Direct calls are usually used for technologies such as remote procedure calls. Queuing means that applications communicate through queues. The use of queues removes the requirement that the receiving and sending applications execute simultaneously.

3. Common Message Queuing Middleware

  • RabbitMQ,ActiveMQ,RocketMQ,Kafka
    • RabbitMQ is stable and reliable with consistent data, supports multiple protocols, has message confirmation, and is based on erlang language
    • Kafka has high throughput, high performance, fast persistence, no message acknowledgement, no message omission, and there may be duplicate messages. It is dependent on zookeeper, and the cost is high
    • ActiveMQ is not flexible and lightweight enough to support many queues
    • RocketMQ has good performance, high throughput, high availability, supports large-scale distribution, and supports a single protocol

2, RabbitMQ

1. RabbitMQ introduction

  • RabbitMQ is a reusable enterprise message system based on AMQP. He follows the Mozilla Public License open source agreement.
  • AMQP, or Advanced Message Queuing Protocol, is an application layer standard Advanced Message Queuing Protocol that provides unified messaging services. It is an open standard of application layer protocol and is designed for message oriented middleware. The client and message middleware based on this protocol can deliver messages, which is not limited by different products and different development languages of the client / middleware. The implementations in Erlang include RabbitMQ and so on.
  • Main features:
    • Ensure reliability: some mechanisms are used to ensure reliability, such as persistence, transmission confirmation and release confirmation
    • Flexible routing function
    • Scalability: it supports message clustering, and multiple RabbitMQ servers can form a cluster
    • High availability: the queue is still available when a node in the RabbitMQ cluster has a problem
    • Support multiple protocols
    • Support multilingual client
    • Provide good management interface
    • Provide tracking mechanism: if an exception occurs in a message, you can analyze the cause of the exception through the tracking mechanism
    • Provide plug-in mechanism: it can be extended in many aspects through plug-ins

2. Install rabbitmq through docker

2.1 Erlang installation

Reference address: https://www.erlang-solutions.com/downloads/

wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpmrpm -Uvh erlang-solutions-2.0-1.noarch.rpm
yum install -y erlang
erl -v

Install socat

yum install -y socat

2.2 installing dokcer

(1)yum Update package to latest
yum update
(2)Install the required packages, yum-util provide yum-config-manager Function, the other two are devicemapper Drive dependent
yum install -y yum-utils device-mapper-persistent-data lvm2
(3)set up yum Source: alicloud
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)install docker
yum install docker-ce -y
(5)View after installation docker edition
docker -v
 (6) Install accelerated mirror
 sudo mkdir -p /etc/docker
 sudo tee /etc/docker/daemon.json <<-'EOF'
 {
"registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
 }
 EOF
 sudo systemctl daemon-reload
 sudo systemctl restart docker

docker related commands

# Start docker:
systemctl start docker
# Stop docker:
systemctl stop docker
# Restart docker:
systemctl restart docker
# View docker status:
systemctl status docker
# Startup:  
systemctl enable docker
systemctl unenable docker
# View docker profile
docker info
# View docker help documentation
docker --help

2.3 installation rabbitmq

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

Start RabbitMQ installed in docker

  • docker ps -a

    View all docker containers

  • docker start CONTAINER ID

    CONTAINER ID is the CONTAINER ID of the container displayed in docker ps -a

  • docker ps

    View started containers

3, RabbitMQ user management

1. Logical structure

  • user
  • Virtual host
  • queue

2. User management

2.1 user level

  • The super administrator administrator can log in to the console, view all information, and operate users and policies
  • The monitor can log in to the console and view the relevant information of the node, such as the number of processes, memory and disk usage
  • The policy maker can log in to the console to make policies, but cannot view node information
  • Ordinary administrator management can only log in to the console
  • Others, unable to log in to the console, generally refers to providers and consumers

2.2 add user (command mode)

  • docker ps
    #Enter container
    docker exec -it d2dd40da7056 /bin/bash
    
  • Add / configure user test and set password to test

    rabbitmqctl add_user test test
    

    If rabbitmq is not installed through docker, you need to execute. / rabbitmqctl add in the sbin directory of rabbitmq_ user test test

  • Set user permissions

    #Set admin to the administrator level
    rabbitmqctl set_user_tags test administrator
    

2.3 adding users (web Mode)

  • Browser access: http://47.113.192.192/:15672/ (log in with guest. Guest has the highest permission and can only log in locally; first create a user using the command line)

2.4 adding virtual hosts

2.5 user binding virtual host

4, RabbitMQ working mode

1. Message queue mode

Reference documents: http://www.rabbitmq.com/getstarted.html

1.1 simple mode

The simple mode is that the producer sends the message to the queue, and the consumer takes the message from the queue. A message corresponds to a consumer

A queue has only one consumer

1.2 working mode

Work mode is that a message can be received by multiple consumers, but only one consumer can get it in the end

1.3 subscription mode

A message can be obtained by multiple consumers at the same time

The producer sends the message to the switch, and the consumer registers its corresponding queue to the switch

After sending the message, all consumers who have registered the queue can receive the message

1.4 routing mode

The producer sent the message to the switch with type direct mode

When the consumer's queue binds itself to the route, it will bind itself with a key

Only when the producer sends a message in the corresponding key format will the corresponding queue receive the message

5, Rabptimq switch and queue management

1. Create queue

2. Create a switch

3. Switch binding queue

Enter switch ex1

Bind two queues queue1 and Queue2 on the switch ex1 with the mode of fanout

6, Using MQ in normal Maven applications

RabbitMQ message queuing mode

1. Simple mode

1.1 message producer

  • Create Maven project

  • Add dependencies required for RabbitMQ connections

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
    
  • Create log configuration file log4j.properties

    log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG 
    log4j.logger.org.mybatis = DEBUG
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
    
  • Create MQ connection help class

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtil {
    
        public static Connection getConnection() throws IOException, TimeoutException {
            //1. Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            //2. Set MQ connection information (ip,port,virtualhost,username,password) in the factory object
            factory.setHost("47.113.192.192");
            factory.setPort(5672);
            factory.setVirtualHost("host1");
            factory.setUsername("admin");
            factory.setPassword("admin");
            //3. Obtain the connection with MQ through the factory object
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    
  • Message producer sends message

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class SendMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            String msg = "Hello RabbitMQ";
    
            Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC
            Channel channel = connection.createChannel();   //Equivalent to JDBC statement
    
            //Define queue (create a new MQ queue in Java code)
            //Parameter 1: defined queue name
            //Parameter 2: whether to select persistence for data in the queue
            //Parameter 3: exclusive (whether the current queue is private to the current connection)
            //Parameter 4: automatic deletion (when the number of connections in this queue is 0, this queue will be destroyed (whether there is data in the queue or not))
            //Parameter 5: set the parameters of the current queue
            //channel.queueDeclare("queue7",false,false,false,null);
    
            //Parameter 1: switch name (simple mode here, no switch)
            //Parameter 2: destination queue name
            //Parameter 3: set the properties of the current message (for example, set the expiration time)
            //Parameter 4: content of message
            channel.basicPublish("","queue1",null,msg.getBytes());
            System.out.println("send out" + msg);
    
            channel.close();
            connection.close();
        }
    }
    
    

1.2 message consumers

  • Message consumer receives message

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue1",true,consumer);
        }
    }
    
    

2. Working mode

2.1 message producer

  • Message producer sends message

    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    public class SendMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("Please enter a message:");
            Scanner scanner = new Scanner(System.in);
            String msg = null;
            while(!"quit".equals(msg = scanner.nextLine())){
                Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC
                Channel channel = connection.createChannel();   //Equivalent to JDBC statement
    
                //Define queue (create a new MQ queue in Java code)
                //Parameter 1: defined queue name
                //Parameter 2: whether to select persistence for data in the queue
                //Parameter 3: exclusive (whether the current queue is private to the current connection)
                //Parameter 4: automatic deletion (when the number of connections in this queue is 0, this queue will be destroyed (whether there is data in the queue or not))
                //Parameter 5: set the parameters of the current queue
                //channel.queueDeclare("queue7",false,false,false,null);
    
                //Parameter 1: switch name (working mode here, no switch)
                //Parameter 2: destination queue name
                //Parameter 3: set the properties of the current message (for example, set the expiration time)
                //Parameter 4: content of message
                channel.basicPublish("","queue2",null,msg.getBytes());
                System.out.println("send out" + msg);
    
                channel.close();
                connection.close();
            }
        }
    }
    
    

2.2 message consumers

  • Message consumer receives message

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer1 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue2",true,consumer);
        }
    }
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer2 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue2",true,consumer);
        }
    }
    

3. Subscription mode

3.1 message producer

  • Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class SendMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("Please enter a message:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC
            Channel channel = connection.createChannel();   //Equivalent to JDBC statement
            //Parameter 1: switch name
            //Parameter 2: destination queue name
            //Parameter 3: set the properties of the current message (for example, set the expiration time)
            //Parameter 4: content of message
            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("send out" + msg);

            channel.close();
            connection.close();
        }
    }
}

3.2 message consumers

  • Message consumer receives message

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer1 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue3",true,consumer);
        }
    }
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer2 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue4",true,consumer);
        }
    }
    

4. Routing mode

4.1 message producer

  • Message producer sends message
import com.eicoma.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class SendMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("Please enter a message:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection(); //It is equivalent to database connection in JDBC
            Channel channel = connection.createChannel();   //Equivalent to JDBC statement

            //Parameter 1: switch name
            //Parameter 2: the name of the target queue, but since there is a switch here, this parameter is Key
            //Parameter 3: set the properties of the current message (for example, set the expiration time)
            //Parameter 4: content of message
            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a" ,null,msg.getBytes());
            }else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b" ,null,msg.getBytes());
            }
            System.out.println("send out" + msg);

            channel.close();
            connection.close();
        }
    }
}

4.2 message consumers

  • Message consumer receives message

    • consumer1
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer1 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue5",true,consumer);
        }
    }
    
    
    
    • consumer2
    import com.eicoma.mq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveMsg {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
    
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body is the data obtained from the queue
                    String msg = new String(body);
                    System.out.println("consumer2 The data received is:" + msg);
                }
            };
    
            channel.basicConsume("queue6",true,consumer);
        }
    }
    
    

7, Using MQ in SpringBoot

SpringBoot can complete automatic configuration and dependency injection, and directly obtain the connection object of RabbitMQ through Spring

1. Message producer

  • Correlation dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
    
  • Configure applicaiton.yml

    server:
      port: 9001
    
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.113.192.192
        port: 5672
        virtual-host: host1
        username: admin
        password: admin
    
  • Producer sends message

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    
    @Service
    public class TestService {
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        public void senMsg(String msg){
                //1. Send message to queue
                amqpTemplate.convertAndSend("queue1",msg);
            
                //2. Send message to switch (subscription switch)
                amqpTemplate.convertAndSend("ex1","",msg);
            
                //3. Send message to switch (routing switch)
                amqpTemplate.convertAndSend("ex2","a",msg);
                amqpTemplate.convertAndSend("ex2","b",msg);
        }
    }
    

2. Message consumers

  • Add dependency

  • Configure yml

    server:
      port: 9002
    
    spring:
      application:
        name: producer
      rabbitmq:
        host: 47.113.192.192
        port: 5672
        virtual-host: host1
        username: admin
        password: admin
    
  • Consumer receives message

    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @RabbitListener(queues = {"queue1"})
    public class ReceiveMsgService {
    
        @RabbitHandler
        public void receiveMsg(String msg){
            System.out.println("receive msg:" + msg);
        }
    }
    

Keywords: Java RabbitMQ

Added by MartinGr on Thu, 04 Nov 2021 07:01:04 +0200