RabbitMQ practical tutorial (video learning notes)

These notes were taken while following a video course on RabbitMQ; I ran the experiments myself and changed a few things along the way. The CentOS and RabbitMQ versions used here differ from those in the video. If you find the video useful, please support the uploader with a like, a coin and a favorite. The video address is: https://www.bilibili.com/video/BV1dE411K7MG?from=search&seid=15593601763323732951&spm_id_from=333.337.0.0

1.MQ introduction

1.1 what is MQ

MQ (Message Queue) is built on the typical producer/consumer model: producers keep publishing messages to the queue, and consumers keep fetching messages from it. Because production and consumption are asynchronous, and each side only cares about sending or receiving messages without intruding on the other's business logic, it is easy to decouple systems. MQ is also known as message middleware: it uses an efficient and reliable message passing mechanism for platform-independent data exchange, and integrates distributed systems on the basis of data communication.
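The producer/consumer model described above can be sketched inside a single JVM, with a BlockingQueue standing in for the broker. This is only an illustration under that assumption: the class InMemoryQueueSketch and its method runOnce are made up for this example, and a real MQ adds networking, persistence and acknowledgements on top of the same idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: an in-memory queue standing in for a message broker.
public class InMemoryQueueSketch {

    // Produces `count` messages on one thread and consumes them on another.
    public static List<String> runOnce(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        // Producer: only knows about the queue, not about the consumer.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("message " + i);
            }
        });

        // Consumer: blocks in take() until a message is available.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the threads", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3));
    }
}
```

Neither thread calls the other directly; the queue is the only thing they share, which is the decoupling the paragraph refers to.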

1.2 what MQ products are there

There are many message-oriented middleware products on the market today, such as the long-established ActiveMQ, RabbitMQ, the currently popular Kafka, and RocketMQ, which was developed in-house by Alibaba.

1.3 characteristics of different MQ

# 1.ActiveMQ
	ActiveMQ is an Apache product and one of the most popular and powerful open source message buses. It is message-oriented middleware that fully supports the JMS specification, with a rich API and several cluster architecture modes. As a long-established message middleware, ActiveMQ is very popular in small and medium-sized enterprises.
# 2.Kafka
	Kafka is a distributed publish-subscribe messaging system open-sourced by LinkedIn, and is now a top-level Apache project. Its main feature is that message consumption is pull-based. It pursues high throughput, and its original purpose was log collection and transmission. Replication is supported from version 0.8 onwards. Early versions did not support transactions and had no strict guarantees against message loss, duplication or errors. It suits the data collection workloads of Internet services that generate large amounts of data.
# 3.RocketMQ
	RocketMQ is Alibaba's open source message middleware. It is written purely in Java and offers high throughput and high availability, which makes it suitable for large-scale distributed systems. RocketMQ's design originated from Kafka, but it is not a copy of Kafka: it improves reliable message delivery and transactions. Within Alibaba Group it is widely used for transactions, recharge, stream computing, message push, log stream processing, binlog distribution and other scenarios.
# 4.RabbitMQ
	RabbitMQ is an open source message middleware system developed in Erlang and based on the AMQP protocol. AMQP's main characteristics are message orientation, queueing, routing (including point-to-point and publish/subscribe), reliability and security. AMQP is mostly used in enterprise scenarios with high requirements for data consistency, stability and reliability, where performance and throughput come second.

RabbitMQ puts more emphasis on reliability than Kafka, while Kafka is better suited to high-throughput IO. Kafka is generally used for big data log processing, or in scenarios with slightly lower requirements for real-time behaviour (low latency) and reliability (a small amount of data loss is tolerable), such as ELK log collection.

2. Introduction to RabbitMQ

2.1 RabbitMQ introduction

RabbitMQ is developed in Erlang and is based on the AMQP protocol. It is one of the most widely deployed and most popular open source message middleware products.

# AMQP protocol (to be discussed separately later)
	AMQP (Advanced Message Queuing Protocol) was proposed in 2003 to solve the problem of passing messages between different platforms in the financial field. As the name suggests, AMQP is a protocol, more precisely a binary wire-level protocol. This is its essential difference from JMS: AMQP does not define anything at the API layer, but directly defines the data format exchanged over the network. This makes AMQP providers cross-platform by nature.
                             Server
                 +----------------------------+
                 |        Virtual Host        |
                 |  +----------------------+  |
+-------------+  |  |    +-----------+     |  |
|  Publisher  | -------->| Exchange  |     |  |
| Application |  |  |    +-----+-----+     |  |
+-------------+  |  |          |           |  |
                 |  |    +-----+-----+     |  |    +-------------+
                 |  |    |  Message  |     |  |    |  Consumer   |
                 |  |    |   Queue   | ----------->| Application |
                 |  |    +-----------+     |  |    +-------------+
                 |  +----------------------+  |
                 +----------------------------+

2.2 RabbitMQ installation

The installation environment is as follows:

  • System: centos8 64 bit
  • erlang: 24.1.7
  • rabbitmq: 3.9.11

2.2.1 erlang Download

Because rabbitMQ is developed based on Erlang, download the package of Erlang first: https://github.com/rabbitmq/erlang-rpm

Comparison between Erlang version and rabbitmq version: https://www.rabbitmq.com/which-erlang.html

The RabbitMQ version used here is 3.9.11, the latest at the time of writing. The minimum Erlang version it supports is 23.2, so Erlang 24.1.7 is used.

2.2.2 RabbitMQ Download

Rabbitmq download address: https://www.rabbitmq.com/install-rpm.html#downloads

2.2.3 installation and startup

# 1. Upload the RabbitMQ packages to the Linux server
	Use the scp command to upload the two packages to the server:
	scp ./erlang-24.1.7-1.el8.x86_64.rpm root@10.3.4.5:/root/
	scp ./rabbitmq-server-3.9.11-1.el8.noarch.rpm root@10.3.4.5:/root/
# 2. Install erlang and rabbitmq successively
	Install with the rpm command. If a dependency is missing, rpm reports it:
	rpm -ivh erlang-24.1.7-1.el8.x86_64.rpm
	rpm -ivh rabbitmq-server-3.9.11-1.el8.noarch.rpm
# 3. Modify the configuration of rabbitmq
	When RabbitMQ is installed from the rpm package, no configuration file is created in the configuration directory, so you need to create one yourself: /etc/rabbitmq/rabbitmq.conf. For the available settings, see: https://www.rabbitmq.com/configure.html#config-file-formats.
	Only one setting needs to change here: loopback_users = none, which allows the guest user to connect remotely. By default, guest can only connect from localhost. We are using a cloud server and need to access it by IP, so this setting has to be changed.
# 4. Open the management console plug-in
	This is a RabbitMQ plug-in, rabbitmq_management, which lets us manage RabbitMQ through a web interface. Run: rabbitmq-plugins enable rabbitmq_management. This command also enables two additional plug-ins: rabbitmq_management_agent and rabbitmq_web_dispatch.
# 5. Start / stop / restart rabbitmq service
	RabbitMQ is registered as a system service during installation, so the usual service commands apply:
	systemctl start/stop/restart rabbitmq-server.service
# 6. View service status
	systemctl status rabbitmq-server.service
    The output looks like this:
    ● rabbitmq-server.service - RabbitMQ broker
       Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
       Active: active (running) since Sun 2021-12-12 23:56:51 CST; 1 day 9h ago
      Process: 22710 ExecStop=/usr/sbin/rabbitmqctl shutdown (code=exited, status=0/SUCCESS)
     Main PID: 22758 (beam.smp)
        Tasks: 23 (limit: 23722)
       Memory: 94.8M
       CGroup: /system.slice/rabbitmq-server.service
               ├─22758 /usr/lib64/erlang/erts-12.1.5/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 12800>
               ├─22773 erl_child_setup 32768
               ├─22827 inet_gethost 4
               └─22828 inet_gethost 4
# 7. Access the management interface
	The default HTTP port of the management interface is 15672:
	http://10.3.4.5:15672/
# 8. Login
	Username/password: guest/guest

3.RabbitMQ configuration

3.1 RabbitMQ management command line

# 1. Service management
	systemctl start/stop/restart rabbitmq-server.service
# 2. Management command line
	rabbitmqctl can be used to manage RabbitMQ when the web management interface is not in use.
	rabbitmqctl help // View all commands
# 3. Plug-in management
	rabbitmq-plugins enable/list/disable

3.2 introduction to Web Management

  • Connections: both consumers and producers must establish a connection with RabbitMQ before they can produce or consume messages.
  • Channels: after a connection is established, messages are published and received through channels.
  • Exchanges: exchanges receive messages from producers and route them to queues.
  • Queues: queues store messages until they are consumed, after which the messages are removed.

3.2.1 user and virtual host management

3.2.1.1 users

# Tags description:
	Admin (super administrator): can log in to the console, view all information, and manage users and policies.
	Monitoring (monitor): can log in to the console and view RabbitMQ node information (number of processes, memory, disk usage, etc.).
	Policymaker (policy maker): can log in to the console and manage policies.
	Management (ordinary administrator): can log in to the console and view information.
	Others: cannot log in to the console; these are ordinary producers or consumers.

3.2.1.2 virtual host

# Virtual host:
	To let users work without interfering with each other, RabbitMQ adds the concept of a virtual host. It is in effect an independent access path: different users use different paths, each with its own queues and exchanges, and they do not affect each other. It is comparable to a database in MySQL.

3.2.1.3 user and virtual host binding

4. Java client of rabbitmq

4.1 AMQP protocol review

4.2 message types supported by rabbitmq

4.3 introducing dependencies

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.14.0</version>
</dependency>

4.4 client code of various models

4.4.1 Direct connection mode

# Related concepts:
	P: producer, the program that sends messages
	C: consumer, the program that receives messages
	Queue: the queue, where messages are stored

Producer:

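import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Direct mode - producer
 * <p>
 * Direct point-to-point: the producer sends the message to the queue, and the consumer takes it directly from the queue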
 *
 * @author secret
 * @date 2021/12/14 11:34 AM
 */
public class TestProducer {
    // Define queue name
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.43.52.186");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/adu");
        connectionFactory.setUsername("adu");
        connectionFactory.setPassword("adu");
        // 2. Create connection
        Connection connection = connectionFactory.newConnection();
        // 3. Create channel
        Channel channel = connection.createChannel();
        // 4. Declare queue: queue name, durable, exclusive, autoDelete, additional arguments
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 5. Send message: exchange name ("" is the default exchange), routing key (here the queue name), message properties, message body
        channel.basicPublish("", QUEUE_NAME, null, "hello world".getBytes(StandardCharsets.UTF_8));
        // 6. Close resources
        channel.close();
        connection.close();
    }
}

Consumer:

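import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Direct mode - consumer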
 *
 * @author secret
 * @date 2021/12/14 11:34 AM
 */
public class TestConsumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 create a connection factory and set basic parameters
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.43.52.186");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/adu");
        connectionFactory.setUsername("adu");
        connectionFactory.setPassword("adu");
        // 2 create connection
        Connection connection = connectionFactory.newConnection();
        // 3 Create channel
        Channel channel = connection.createChannel();
        // 4 declare the queue (parameters must match the producer's declaration)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 5 receive messages: queue name, autoAck, callback
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            // Callback invoked for every delivered message
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer: " + new String(body, StandardCharsets.UTF_8));
            }
        });
        // 6 the channel and connection are deliberately left open: the callback runs
        //   asynchronously, and closing here could end the consumer before any message arrives
    }
}

4.4.2 Work Queues

Work Queues, also called Task Queues, is the task model. When message processing is time-consuming, messages may be produced much faster than they are consumed, and over time they pile up. The Work Queue model addresses this: multiple consumers are bound to the same queue and consume its messages together. Once a message in the queue has been consumed it is deleted, so a message is not consumed twice.

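import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Utility class that wraps the RabbitMQ connection logic
public class RabbitMQUtil {

    public static Connection getConnection() {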
        try {
            // Create connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Set parameters
            connectionFactory.setHost("101.43.52.186");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/adu");
            connectionFactory.setUsername("adu");
            connectionFactory.setPassword("adu");
            // Create connection
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static Channel getChannel(Connection connection) {
        try {
            return connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Channel channel, Connection connection) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Producer:

/**
 * Work Queue Model
 * <p>
 * The work queue model deals with the situation that consumers consume slowly. Multiple consumers consume messages in the same queue.
 *
 * @author secret
 * @date 2021/12/14 2:02 PM
 */
public class WorkQueueProducer {

    public static final String QUEUE_NAME = "work";

    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declaration queue
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // Send a message
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("hello work queue " + i).getBytes(StandardCharsets.UTF_8));
        }
        // close resource
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class WorkQueueConsumer1 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare queue
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // Consume messages
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
Same code as consumer1, with the output changed to: System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));

result:

# consumer1 output...
consumer1: hello work queue 0
consumer1: hello work queue 2
consumer1: hello work queue 4
consumer1: hello work queue 6
consumer1: hello work queue 8
# consumer2 output...
consumer2: hello work queue 1
consumer2: hello work queue 3
consumer2: hello work queue 5
consumer2: hello work queue 7
consumer2: hello work queue 9

Summary:

By default, RabbitMQ dispatches messages to the consumers in turn, so on average each consumer receives the same number of messages. This way of distributing messages is called round-robin.
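The even distribution described above (commonly called round-robin dispatch) can be modelled in a few lines of plain Java. This is a simplified sketch, not RabbitMQ code: the class RoundRobinSketch and the method dispatch are hypothetical names, and the model assumes all consumers are connected before the first message is published.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch; not part of the RabbitMQ client.
public class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount).
    public static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hello work queue " + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("consumer1: " + received.get(0));
        System.out.println("consumer2: " + received.get(1));
    }
}
```

With two consumers the first one ends up with the even-numbered messages and the second with the odd-numbered ones, regardless of how long each message takes to process.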

4.4.2.1 message acknowledgement mechanism

In the code above autoAck is true: as soon as RabbitMQ has delivered a message to a consumer, the message is marked for deletion. If the consumer goes down while performing a task, the messages it had received but not yet processed are lost. We would like such tasks to be handed over to other consumers when a consumer dies. To achieve this, turn off automatic acknowledgement and acknowledge manually, that is, tell RabbitMQ to delete a message only after the task has been processed.
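The idea of deleting a message only after the consumer has confirmed it can be sketched with a small in-memory model. Everything here is an assumption made for illustration: ManualAckSketch, deliver, ack and consumerDied are hypothetical names, and the model keeps a single list of unacknowledged messages rather than tracking them per channel as a real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of manual acknowledgement; not the RabbitMQ API.
public class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();  // waiting to be delivered
    private final List<String> unacked = new ArrayList<>();  // delivered, not yet confirmed

    public void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message (or null if none is ready) and remembers it as unacknowledged.
    public String deliver() {
        String message = ready.pollFirst();
        if (message != null) {
            unacked.add(message);
        }
        return message;
    }

    // The consumer finished processing: only now is the message really gone.
    public void ack(String message) {
        unacked.remove(message);
    }

    // The consumer died: everything it had not acknowledged goes back to the front of the queue.
    public void consumerDied() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            ready.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    public int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("task 1");
        queue.deliver();        // a consumer receives the task ...
        queue.consumerDied();   // ... and dies before acknowledging it
        System.out.println("redelivered: " + queue.deliver());
    }
}
```

With automatic acknowledgement the model would drop the message inside deliver(), and the task in the usage example would be lost instead of redelivered.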

Producer:

// ditto

Consumer:

// consumer1
public class WorkQueueConsumer3 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get pipeline
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declaration queue
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // Prefetch count 1: the broker gives this consumer at most one unacknowledged message at a time
        channel.basicQos(1);
        // Consume messages; autoAck = false turns off automatic acknowledgement
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000); // Simulate time-consuming message processing
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
                // Manual acknowledgement. Parameter 1: delivery tag of the message.
                // Parameter 2: multiple; false acknowledges only this message, true would also
                // acknowledge all earlier unacknowledged messages on this channel.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}
// consumer2
public class WorkQueueConsumer4 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get pipeline
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declaration queue
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // Limit 1 item per consumption
        channel.basicQos(1);
        // For consumption messages, change autoAck to false, indicating that automatic confirmation is not required
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
                // Manual confirmation
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

result:

# consumer1 output...
consumer1: hello work queue 1
consumer1: hello work queue 3
consumer1: hello work queue 5
consumer1: hello work queue 8
consumer1: hello work queue 10
consumer1: hello work queue 13
consumer1: hello work queue 15
consumer1: hello work queue 18
# consumer2 output...
consumer1: hello work queue 0
consumer1: hello work queue 2
consumer1: hello work queue 4
consumer1: hello work queue 6
consumer1: hello work queue 7
consumer1: hello work queue 9
consumer1: hello work queue 11
consumer1: hello work queue 12
consumer1: hello work queue 14
consumer1: hello work queue 16
consumer1: hello work queue 17
consumer1: hello work queue 19

Summary:

To distribute work according to each consumer's capacity without losing messages, two things are needed: 1) limit the channel to one unacknowledged message at a time with basicQos(1); 2) turn off automatic acknowledgement and acknowledge messages manually.

4.4.3 Publish/Subscribe mode

# Message flow:
	There can be multiple consumers. Each consumer has its own queue, and each queue must be bound to the exchange.
	The producer can only send messages to the exchange. The exchange decides which queues receive a message; the producer cannot decide this.
	The exchange sends the message to all bound queues, and the consumer of each queue receives it, so one message is consumed by multiple consumers.
# Corresponding exchange type: fanout
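The message flow above can be pictured with a tiny in-memory model of a fanout exchange. This is a sketch for illustration only: FanoutSketch, bind, publish and messagesIn are hypothetical names and have nothing to do with the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not the RabbitMQ API.
public class FanoutSketch {
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue to the exchange; a fanout exchange ignores routing keys.
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message into every bound queue.
    public void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("queue-of-consumer1");
        logs.bind("queue-of-consumer2");
        logs.publish("hello fanout");
        System.out.println(logs.messagesIn("queue-of-consumer1"));
        System.out.println(logs.messagesIn("queue-of-consumer2"));
    }
}
```

A queue that is bound after a message was published does not receive that message in this model, which is also why consumers are normally started before the producer in fanout examples.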

Producer:

public class PublishSubscriptProducer {

    public static final String QUEUE_NAME = "fanout";

    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare the exchange: exchange name, exchange type
        channel.exchangeDeclare("logs", "fanout");
        // Send the message to the exchange (a fanout exchange ignores the routing key)
        channel.basicPublish("logs", "", null, "hello fanout".getBytes(StandardCharsets.UTF_8));
        // close resource
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class PublishSubscribeConsumer1 {

    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare exchange
        channel.exchangeDeclare("logs", "fanout");
        // Create a temporary queue with a server-generated name
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange: queue name, exchange name, routing key
        channel.queueBind(queue, "logs", "");
        // Receive messages
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
Same code as consumer1, with the output changed to: System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));

result:

# consumer1 output...
consumer1: hello fanout, type [
consumer1: hello fanout

# consumer2 output...
consumer2: hello fanout, type [
consumer2: hello fanout

4.4.4 Routing - routing mode

In publish subscribe mode, a message is consumed by all subscribed queues. However, in some scenarios, we want different message types to be consumed by different queues. In this case, we need to use the direct Exchange of Routing mode.

# In Direct mode:
	The binding between a queue and the exchange can no longer be arbitrary: a Routing Key must be specified for it. When sending a message to the Exchange, the sender must also specify the message's Routing Key.
	The Exchange no longer delivers the message to every bound queue. It compares Routing Keys and delivers the message only to queues whose binding Routing Key exactly matches the message's Routing Key.

# Explanation:
	P: Producer, sends messages to the Exchange and specifies a Routing Key for each.
	X: Exchange, receives the producer's message and delivers it to the queues whose Routing Key matches exactly.
	C1: Consumer, whose queue is bound with the Routing Key error.
	C2: Consumer, whose queue is bound with the Routing Keys info, error and warning.
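The exact-match rule of a direct exchange can be sketched as a lookup table from routing key to bound queues. The class DirectRoutingSketch, its methods and the queue names c1-queue and c2-queue are hypothetical and only mirror the C1/C2 description above; this is not how the broker is implemented.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing; not the RabbitMQ API.
public class DirectRoutingSketch {
    // routing key -> names of the queues bound with exactly that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    public void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new LinkedHashSet<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    public Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("c1-queue", "error");
        exchange.bind("c2-queue", "info");
        exchange.bind("c2-queue", "error");
        exchange.bind("c2-queue", "warning");
        System.out.println("error   -> " + exchange.route("error"));
        System.out.println("warning -> " + exchange.route("warning"));
        System.out.println("debug   -> " + exchange.route("debug"));
    }
}
```

A message whose routing key has no binding (debug in the usage example) is routed to no queue at all in this model.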

Producer:

public class RoutingProducer {

    public static void main(String[] args) throws IOException {
        // Get connection
        Connection connection = RabbitMQUtil.getConnection();
        // Get channel
        Channel channel = RabbitMQUtil.getChannel(connection);
        // Declare exchange
        channel.exchangeDeclare("logs_direct", "direct");
        // Send message
        String routingKey = "warning";
        channel.basicPublish("logs_direct", routingKey, null, String.format("routing for type [%s]", routingKey).getBytes(StandardCharsets.UTF_8));
        // close resource
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class RoutingConsumer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_direct", "direct");
        String queue = channel.queueDeclare().getQueue();
        // consumer1 binds the routing key to info
        channel.queueBind(queue, "logs_direct", "info");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
public class RoutingConsumer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_direct", "direct");
        String queue = channel.queueDeclare().getQueue();
        // consumer2 binds the routing keys info, error and warning
        channel.queueBind(queue, "logs_direct", "info");
        channel.queueBind(queue, "logs_direct", "error");
        channel.queueBind(queue, "logs_direct", "warning");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

result:

# Results of sending messages with the routing keys info, error and warning in sequence:

## consumer1
consumer1: routing for type [info]

## consumer2
consumer2: routing for type [info]
consumer2: routing for type [error]
consumer2: routing for type [warning]

4.4.5 Topics (dynamic routing mode)

Like the routing mode, Topics mode routes messages to different queues according to the Routing Key. The difference is that a topic exchange lets a queue use wildcards in the Routing Key of its binding. A Routing Key generally consists of one or more words separated by '.'. The type of the Exchange is topic.

# Wildcard:
	* (star) substitutes for exactly one word.
	# (hash) substitutes for zero or more words.
## For example:
	audit.# : matches audit, audit.irs, audit.irs.corporate, etc.
	audit.* : matches only keys with exactly one word after audit, such as audit.irs.
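The wildcard rules above can be checked with a small matcher written in plain Java. This is a sketch of the matching rule only, under the assumption that keys are non-empty words separated by '.': the class TopicMatchSketch and its method matches are hypothetical, and the broker's own implementation is different.

```java
// Hypothetical matcher for topic binding keys; illustrates the rules, not the broker's code.
public class TopicMatchSketch {

    // Returns true if routingKey is matched by bindingKey ('*' = one word, '#' = zero or more words).
    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern used up: it is a match only if all words are used up too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero, one, or several of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // A literal word or '*' needs one word, but none is left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"user.save", "user", "user.save.now"};
        for (String key : routingKeys) {
            System.out.println(key + ": user.* -> " + matches("user.*", key)
                    + ", user.# -> " + matches("user.#", key));
        }
    }
}
```

The usage example tries the two binding keys user.* and user.# against three routing keys, which makes the difference between "exactly one word" and "zero or more words" visible.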

Producer:

public class TopicsProducer {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_topics", "topic");
        String routingKey = "user.save.now";
        channel.basicPublish("logs_topics", routingKey, null, String.format("hello topics, routing key : [%s]", routingKey).getBytes(StandardCharsets.UTF_8));
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class TopicsConsumer1 {

    public static final String EXCHANGE_NAME = "logs_topics";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the binding key user.*
        channel.queueBind(queue, EXCHANGE_NAME, "user.*");
        // autoAck = true: the callback below never calls basicAck
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
public class TopicsConsumer2 {

    public static final String EXCHANGE_NAME = "logs_topics";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queue = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with the binding key user.#
        channel.queueBind(queue, EXCHANGE_NAME, "user.#");
        // autoAck = true: the callback below never calls basicAck
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

result:

# Results of sending messages with the routing keys user.save, user and user.save.now:

## consumer1:
consumer1: hello topics, routing key : [user.save]

## consumer2:
consumer2: hello topics, routing key : [user.save]
consumer2: hello topics, routing key : [user]
consumer2: hello topics, routing key : [user.save.now]

5. Spring Boot integration with RabbitMQ

5.1 environment setup

5.1.1 introduce dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.1.2 configuration information

spring:
  application:
    name: adu-rabbitmq
  rabbitmq:
    host: 10.2.3.4
    port: 5672
    username: adu
    password: adu
    virtual-host: /adu

5.2 client code of various models

5.2.1 Hello World direct connection model

/**
 * Direct mode - consumer
 *
 * @author secret
 * @date 2021/12/14 6:30 PM
 */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloWorldConsumer {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("consumer: " + message);
    }

}

5.2.2 Work Queue - work queue mode

/**
 * Work Queue Model - Consumer
 * <p>
 * With round-robin dispatch, each consumer receives the same number of messages
 *
 * @author secret
 * @date 2021/12/14 6:36 PM
 */
@Component
public class WorkQueueConsumer {

    @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false"))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false"))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

}
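
The round-robin behaviour mentioned in the Javadoc can be pictured with a small simulation that does not need a broker. This is only a sketch under simplifying assumptions: RoundRobinSketch and dispatch are hypothetical names, and real dispatch also depends on settings such as prefetch and acknowledgement mode.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {

    // Hands the messages to the consumers in turn and returns,
    // for each consumer, the list of messages it would receive.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            messages.add("hello work queue" + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("consumer1 got " + received.get(0).size() + " messages");
        System.out.println("consumer2 got " + received.get(1).size() + " messages");
    }
}
```

With 20 messages and two consumers, each simulated consumer ends up with every second message.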

5.2.3 Publish/Subscribe - publish subscribe mode

/**
 * Publish/Subscribe pattern
 * <p>
 * Use the exchange to deliver messages to all bound queues
 *
 * @author secret
 * @date 2021/12/14 6:44 PM
 */
@Component
public class PublishSubscribeConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // With no arguments, a temporary queue is declared
            exchange = @Exchange(value = "logs", type = "fanout") // Exchange
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // With no arguments, a temporary queue is declared
            exchange = @Exchange(value = "logs", type = "fanout") // Exchange
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }
}

5.2.4 Routing - routing mode (direct)

/**
 * Routing Model - Consumer
 * <p>
 * Exchange Type: direct
 *
 * @author secret
 * @date 2021/12/14 7:01 PM
 */
@Component
public class RoutingConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Temporary queue
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = "info" // routing key
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Temporary queue
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = {"info", "error", "warning"} // routing key
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Temporary queue
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = {"warning"}
    ))
    public void receive3(String message) {
        System.out.println("consumer3: " + message);
    }
}
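
To see which of the three listeners receives a message for a given routing key, the bindings above can be replayed in plain Java. This is an illustrative sketch only: DirectRoutingSketch and receivers are hypothetical names, and the map simply copies the key values from RoutingConsumer. It relies on the fact that a direct exchange delivers to every queue whose binding key equals the routing key exactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DirectRoutingSketch {

    // Binding keys per consumer, copied from RoutingConsumer
    static final Map<String, List<String>> BINDINGS = new LinkedHashMap<>();

    static {
        BINDINGS.put("consumer1", Arrays.asList("info"));
        BINDINGS.put("consumer2", Arrays.asList("info", "error", "warning"));
        BINDINGS.put("consumer3", Arrays.asList("warning"));
    }

    // Returns the consumers whose queue has a binding key equal to the routing key
    static List<String> receivers(String routingKey) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> binding : BINDINGS.entrySet()) {
            if (binding.getValue().contains(routingKey)) {
                result.add(binding.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"info", "error", "warning"}) {
            System.out.println(key + " -> " + receivers(key));
        }
    }
}
```

A routing key that no queue is bound with, such as a hypothetical "debug", yields an empty list: the message is not delivered to any of these consumers.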

5.2.5 Topics - topic (dynamic routing) mode

/**
 * Topics Model - Consumer
 *
 * Exchange type: topic
 *
 * @author secret
 * @date 2021/12/14 7:16 PM
 */
@Component
public class TopicsConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Temporary queue
            exchange = @Exchange(value = "logs_topics", type = "topic"),
            key = {"user.*"}
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // Temporary queue
            exchange = @Exchange(value = "logs_topics", type = "topic"),
            key = {"user.#"}
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

}

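5.2.6 Client test code

/**
 * Producer-side tests. For the Hello World and Work Queue models the producer does not specify an exchange, only a routing key and a message object: the message goes to the default exchange, so the routing key must match the queue name declared on the consumer side.
 */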
@SpringBootTest(classes = RabbitMQApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
@EnableAutoConfiguration
public class RabbitMQApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Hello World (direct connection) model
    @Test
    public void testHelloWorld() {
        rabbitTemplate.convertAndSend("hello", "hello spring boot starter amqp...");
    }
    // Work Queue mode
    @Test
    public void testWorkQueue() {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("work", "hello work queue" + i);
        }
    }
    // Publish/Subscribe mode
    @Test
    public void testPublishSubscribe() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("logs", "", "hello publish subscribe " + i);
        }
    }
    // Routing mode
    @Test
    public void testRouting() {
        String routingKey = "warning";
        rabbitTemplate.convertAndSend("logs_direct", routingKey, "hello routing, type=" + routingKey);
    }
    // Topics mode
    @Test
    public void testTopics() {
        String routingKey = "user.save";
        rabbitTemplate.convertAndSend("logs_topics", routingKey, "hello topics, type=" + routingKey);
    }
}

6. Application scenarios of MQ

6.1 asynchronous processing

# Scenario description:
	After a user registers, the system needs to send a registration e-mail and a registration SMS. There are two traditional approaches: 1) serial mode and 2) parallel mode.
## Serial mode
	After the registration information is written to the database, the e-mail is sent, and then the SMS is sent. The response is returned to the client only after all three tasks have finished. The problem is that the e-mail and the SMS are only notifications and are not essential to registration, yet the client is made to wait for them.
## Parallel mode
	After the registration information is written to the database, the e-mail and the SMS are sent at the same time. The response is returned to the client after all three tasks have finished. Compared with serial mode, this shortens the processing time.

# Message queue:
	Besides the two traditional approaches above, we can also use a message queue. Because the e-mail and the SMS are not essential, they can be handled asynchronously through the queue. Compared with parallel mode, the client no longer waits for the e-mail or the SMS to be sent: the response time is only the time to write the registration information to the database plus the time to write to the message queue.
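
The comparison of the three approaches is simple arithmetic, sketched below in Java. All numbers are made-up illustrative timings in milliseconds, not measurements, and RegistrationLatencySketch with its methods is a hypothetical name.

```java
class RegistrationLatencySketch {

    // Serial: the client waits for the database write, then the e-mail, then the SMS
    static long serial(long db, long email, long sms) {
        return db + email + sms;
    }

    // Parallel: e-mail and SMS run at the same time after the database write
    static long parallel(long db, long email, long sms) {
        return db + Math.max(email, sms);
    }

    // Message queue: the client only waits for the database write and the enqueue
    static long queued(long db, long enqueue) {
        return db + enqueue;
    }

    public static void main(String[] args) {
        long db = 50, email = 50, sms = 50, enqueue = 5; // assumed timings in ms
        System.out.println("serial:   " + serial(db, email, sms) + " ms");
        System.out.println("parallel: " + parallel(db, email, sms) + " ms");
        System.out.println("queued:   " + queued(db, enqueue) + " ms");
    }
}
```

Whatever the actual timings are, the queued variant stays independent of how long the e-mail and SMS take, which is the point of the asynchronous design.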

6.2 application decoupling

# Scenario:
	Double Eleven is a shopping festival. After a user places an order, the order system needs to notify the inventory system. The traditional approach is for the order system to call an interface provided by the inventory system.
# Disadvantages:
	When the inventory system fails, the order system cannot be used either, because the two systems are coupled. The solution is to introduce a message queue.

# Introducing a message queue:
	Order system: after the user places an order, the order system persists it, writes a message to the message queue, and returns a success response to the user.
	Inventory system: subscribes to order messages, receives them, and performs the inventory operations. Even if the inventory system fails, the message queue keeps the messages and delivers them reliably, so none are lost.
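
The decoupling can be pictured with an in-memory queue standing in for the broker. This is a sketch under that assumption: DecouplingSketch, placeOrder and consumePending are hypothetical names, and a real setup would publish to RabbitMQ rather than to a java.util.concurrent queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DecouplingSketch {

    // Stand-in for the message queue between the two systems
    private final BlockingQueue<String> orderQueue = new LinkedBlockingQueue<>();

    // Order system: persisting the order is omitted here; the order is
    // published and success is returned without calling the inventory system
    boolean placeOrder(String orderId) {
        return orderQueue.offer(orderId);
    }

    // Inventory system: whenever it is running, it drains the pending orders
    List<String> consumePending() {
        List<String> processed = new ArrayList<>();
        orderQueue.drainTo(processed);
        return processed;
    }

    public static void main(String[] args) {
        DecouplingSketch shop = new DecouplingSketch();
        // The inventory system is "down": orders still succeed
        System.out.println(shop.placeOrder("order-1"));
        System.out.println(shop.placeOrder("order-2"));
        // The inventory system comes back and catches up
        System.out.println(shop.consumePending());
    }
}
```

The order side never calls the inventory side directly, so an outage of the inventory system only delays processing instead of blocking orders.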

6.3 traffic peak shaving

# Scenario:
	Flash sale (seckill) events often bring the application down because of excessive traffic. To solve this problem, a message queue is usually placed in front of the application.
# Benefits:
	1. The number of participants can be controlled: orders beyond a certain threshold can be discarded directly.
	2. It protects the application from being overwhelmed by high traffic in a short time (the application fetches orders according to its own processing capacity).
# Process:
	1. After receiving a user request, the server first writes it to the message queue. If the maximum length of the queue is exceeded, the request is discarded or the user is redirected to an error page.
	2. The flash sale service then processes the requests in the message queue.
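
The two steps can be sketched with a bounded in-memory queue. This is an illustration, not RabbitMQ code: PeakShavingSketch, accept and nextRequest are hypothetical names, and the ArrayBlockingQueue only stands in for a length-limited broker queue (in RabbitMQ a comparable limit can be set with the x-max-length queue argument).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PeakShavingSketch {

    private final BlockingQueue<String> requests;

    PeakShavingSketch(int maxLength) {
        this.requests = new ArrayBlockingQueue<>(maxLength);
    }

    // Step 1: returns true if the request was queued, false if the queue is
    // full and the request should be discarded or sent to an error page
    boolean accept(String request) {
        return requests.offer(request);
    }

    // Step 2: the service takes the next request at its own pace;
    // returns null when nothing is waiting
    String nextRequest() {
        return requests.poll();
    }

    public static void main(String[] args) {
        PeakShavingSketch gate = new PeakShavingSketch(2);
        System.out.println(gate.accept("user-a")); // queued
        System.out.println(gate.accept("user-b")); // queued
        System.out.println(gate.accept("user-c")); // rejected, queue is full
        System.out.println(gate.nextRequest());    // service handles the oldest request
    }
}
```

Once the service has taken a request off the queue, a slot frees up and a new request can be accepted again.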

7. RabbitMQ cluster

7.1 cluster architecture (replica cluster)

By default, all data and state required for the operation of a RabbitMQ broker are replicated across all nodes. The exception is message queues, which by default reside on one node, although they are visible and reachable from all nodes. (From the official website)

In other words, this cluster architecture only synchronizes (replicates) basic information such as exchanges between nodes; it does not synchronize queue contents. Queues are where the data is stored, so if they cannot be replicated between nodes, the cluster loses part of its value.

7.1.1 architecture diagram

# Core problem solved:
	When the master node goes down at some point, the information in its queues can be backed up.

7.1.2 Cluster construction

PS: installed with Docker
# 0. Cluster planning
	node1:  172.18.0.2 5672 15672 		master Master node
	node2:	172.18.0.3 5673 15673		repl1  Replica node
	node3:	172.18.0.4 5674 15674	 	repl2  Replica node
# 1. Create three containers, map their host names to IPs, and set the hostname of each container
	Create the network:
		docker network create rabbit-net
	Create a container (node1 shown here):
		docker run -d --privileged=true --hostname node1 --name node1 -p 15672:15672 -p 5672:5672 --network rabbit-net centos:8.2.2004 /sbin/init
	Enter a container:
		docker exec -it node1 /bin/bash		(node2 and node3 likewise)
	Copy the erlang and rabbitmq rpm packages into the container:
		docker cp ./erlang.xxx.rpm node1:/
		docker cp ./rabbitmq.xxx.rpm node1:/
	Install:
		rpm -ivh erlang.xxx.rpm
		rpm -ivh rabbitmq.xxx.rpm --nodeps --force
	Configure:
		Add the IP and host name of every node to /etc/hosts.
	Problems encountered:
		Reference: https://blog.csdn.net/weixin_42181917/article/details/105579288
		1. System has not been booted with systemd as init system (PID 1). Can't operate. Failed to create bus connection: Host is down
		Solution: create the container with /sbin/init
		2. Could not set property: Failed to set static hostname: Device or resource busy
		Solution: exit the container, re-enter it, and set the hostname again
			hostnamectl set-hostname node1		(node2 and node3 likewise)
			exit
			hostnamectl set-hostname node1
# 2. Install rabbitmq in the three containers and synchronize the cookies
	Use docker cp to upload the erlang and rabbitmq rpm packages from the host to each container, then install and start rabbitmq.
	Use docker cp to copy /var/lib/rabbitmq/.erlang.cookie from node1 to the host, and then from the host to node2 and node3.
	Make sure the owner and permissions of the .erlang.cookie file match those on node1; the chown and chgrp commands can change the owner and group.
# 3. Check whether the cookie files are consistent
	Enter each of the three containers and run cat /var/lib/rabbitmq/.erlang.cookie to check that the contents are identical.
# 4. Start rabbitmq in the background
	rabbitmq-server -detached
# 5. Run the join-cluster commands on node2 and node3
	1. Stop the app		rabbitmqctl stop_app
	2. Join the cluster	rabbitmqctl join_cluster rabbit@node1
	3. Start the app	rabbitmqctl start_app
	

Successfully built:

7.2 mirror cluster

The mirrored queue mechanism sets up a master-slave relationship among the three nodes, and messages are synchronized automatically among them. If one node becomes unavailable, no messages are lost and the service remains available, which improves the overall high availability of the MQ cluster. (From the official website)

7.2.1 architecture diagram

7.2.2 Cluster construction

The mirror cluster is additional configuration on top of the replica cluster: you must build the replica cluster first. The additional configuration consists of creating a policy.

# 0. Policy description
	rabbitmqctl set_policy [--vhost <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
	--vhost: Optional. Applies the policy to the queues under the specified virtual host.
	--priority: Optional. The priority of the policy; the larger the number, the higher the priority.
	--apply-to: Optional. The kind of object the policy applies to: queues|exchanges|all.
	name: The policy name, a unique identifier.
	pattern: A regular expression that matches queue names.
	definition: The mirror definition, made up of three parts: ha-mode, ha-params, ha-sync-mode
		ha-mode: Mirrored queue mode. Possible values: all/exactly/nodes
			all: Mirror on all nodes of the cluster
			exactly: Mirror on a fixed number of nodes; the number is given by ha-params
			nodes: Mirror on specific nodes; the node names are given by ha-params
		ha-params: Parameters required by the chosen ha-mode
		ha-sync-mode: Queue synchronization mode. Possible values: automatic/manual
			automatic: Synchronize automatically
			manual: Synchronization is triggered by the user
# 1. View current policy
	rabbitmqctl list_policies
# 2. Add policy
	rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
	Note: the pattern "^" matches all queues, and "^hello" matches queues whose names begin with hello.
# 3. Delete policy
	rabbitmqctl clear_policy ha-all
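
To check in advance which queue names a policy pattern such as "^hello" would select, the pattern can be tried out with Java's regex classes. This is only an approximation under an assumption: RabbitMQ evaluates the pattern with its own (Erlang) regular expression engine, which for simple anchored patterns like these should behave the same way. PolicyPatternSketch and matchingQueues are hypothetical names, and the queue names are examples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class PolicyPatternSketch {

    // Returns the queue names in which the policy pattern finds a match
    static List<String> matchingQueues(String policyPattern, List<String> queueNames) {
        Pattern pattern = Pattern.compile(policyPattern);
        List<String> matched = new ArrayList<>();
        for (String name : queueNames) {
            if (pattern.matcher(name).find()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("hello", "hello_work", "work", "say_hello");
        System.out.println("^hello -> " + matchingQueues("^hello", queues));
        System.out.println("^      -> " + matchingQueues("^", queues));
    }
}
```

The pattern is anchored only at the start, so a name that merely contains hello later on, like the example say_hello, is not selected by "^hello".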

Successfully built:

Finally: my knowledge is limited, so if you spot any problems, please point them out. Thank you ~

Keywords: Java RabbitMQ Back-end Distribution architecture

Added by Crysma on Thu, 16 Dec 2021 23:50:23 +0200