Get started with RabbitMq! One piece is enough

This is a study note about RabbitMq tutorial author: Poor programmers. I hope you can support the poor programmers. I think RabbitMq speaks very well, but notes are charged. (Teacher's source code is included, it's cheap.) I shared it here. It's yy. Please support genuine purchase link: www.baizhiedu.xin

RabbitMQ Actual Warfare Tutorial

1.MQ Introduction

1.1 What is MQ

MQ(Message Quene): Translated into a message queue, through a typical producer and consumer model, producers continuously produce messages to the message queue, and consumers continuously get messages from the queue. Because the production and consumption of messages are asynchronous, and only care about the sending and receiving of messages, there is no intrusion of business logic, and decoupling between systems is easy. Alias Messaging Middleware integrates distributed systems by using efficient and reliable messaging mechanisms for platform-independent data communication and based on data communication.

What are 1.2 MQ s

There are many mainstream message middleware on the market today, such as the old ActiveMQ, RabbitMQ, the hot Kafka, Alibaba develops RocketMQ by itself.

1.3 Different MQ Features

# 1.ActiveMQ
		ActiveMQ yes Apache Out of the box, the most popular and powerful open source messaging bus. It's a full support JMS A canonical message middleware. Abundant API,Multiple cluster architecture patterns allow ActiveMQ Message middleware that has become an established brand in the industry,Popular in SMEs!

# 2.Kafka
		Kafka yes LinkedIn Open Source Distributed Publishing-Subscribe to the messaging system, currently owned by Apache Top level projects. Kafka The main features are based on Pull Mode to handle message consumption,
		In the pursuit of high throughput, the goal was initially for log collection and transmission. 0.8 Version supports replication, does not support transactions, and has no strict requirements for message duplication, loss, and error.
		Data collection business for Internet services that generate large amounts of data.

# 3.RocketMQ
		RocketMQ Is Ali open source messaging middleware, it is pure Java Development, with high throughput, high availability, suitable for large-scale distributed system applications. RocketMQ Thinking
		Originate from Kafka,But it's not Kafka Of Copy,It optimizes the reliable transmission and Transactionality of messages, and is currently widely used in Alibaba Group in transaction, recharge, flow calculation, elimination
		Information push, log streaming, binglog Scenes such as distribution.

# 4.RabbitMQ
		RabbitMQ Is used Erlang Open Source Message Queuing System for Language Development, Based on AMQP Protocol implementation. AMQP The main features are message-oriented, queue-oriented, routing (including point-to-point and point-to-point).
		Release/Subscription), Reliability, Security. AMQP Protocols are more used in scenarios where data consistency, stability, and reliability are required within an enterprise system, while performance and throughput are also required
		Secondly.

RabbitMQ is more reliable than Kafka, which is better suited for high-throughput IO processing. It is generally used in large data log processing or scenarios where real-time (small latency) and reliability (small loss of data) are less required, such as ELK log collection.

2. Introduction to RabbitMQ

The author here suggests installing with docker. I was stuck in the startup phase of server because of a long version conflict between rabbitMq and erlang. If you have to go to the official website to install, it is recommended that you check the Erlang version first, then go to Github to download and pull it to the server. The web address for the Erlang version is described below.

2.1 RabbitMQ

Based on AMQP protocol, erlang language is developed, is the most widely deployed open source message middleware, and is one of the most popular open source message middleware.

Official website: https://www.rabbitmq.com/

Official Tutorial: https://www.rabbitmq.com/#getstarted

 # AMQP Protocol
 		AMQP(advanced message queuing protocol)`It was first proposed in 2003 to solve the problem of messaging interaction between different platforms of financial collar. Seeing the name of a thing one thinks of its function, AMQP Is an agreement, or more precisely, a binary wire-level protocol(Link protocol). This is its sum JMS Essential differences, AMQP Do not follow API Layer defines, but directly defines, the data format of the network exchange. This makes it possible to AMQP Of provider Naturality is cross-platform. The following are AMQP Protocol Model:

2.2 Installation of RabbitMQ

2.2. 1 Download

Official download address: https://www.rabbitmq.com/download.html![image-20190925220115235

Latest version: 3.7. 18

Note: The installation package here is the centos7 installation package

2.2. 3 Installation steps

# 1. Upload the rabbitmq installation package to the linux system
	erlang-22.0.7-1.el7.x86_64.rpm
	rabbitmq-server-3.7.18-1.el7.noarch.rpm

# 2. Install Erlang dependency packages
	rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

# 3. Install RabbitMQ installation package (requires networking)
	yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
		Be careful:Profile template after default installation is completed in:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example In directory,Need	
				Copy Configuration File to/etc/rabbitmq/In directory,And change the name to rabbitmq.config
# 4. Copy Profile
        # Rabbit MQ did not initially provide a configuration file directly under Rabbit Mq under etc, but instead provided us with a template
	cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

# 5. View profile location
	ls /etc/rabbitmq/rabbitmq.config

# 6. Modify the configuration file (see figure below:)
	vim /etc/rabbitmq/rabbitmq.config

There's a discrepancy between the author and the files in the video. If your rabbitMq is the latest version, there won't be this example file, but there's also a configuration file, and it's modified the same way as the following image to remove the guest account. Check the blog for more details here.

Remove the red part of the configuration file from the figure above by%%, and finally, change the comma to the following:

# 7. Start plug-in management in rabbitmq by executing the following command
	rabbitmq-plugins enable rabbitmq_management
	
	The following description appears:
		Enabling plugins on node rabbit@localhost:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@localhost...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch

    set 3 plugins.
    Offline change; changes will take effect at broker restart.

# 8. Start the RabbitMQ service
	systemctl start rabbitmq-server
	systemctl restart rabbitmq-server
	systemctl stop rabbitmq-server
	

# 9. View the status of the service (see figure below:)
	systemctl status rabbitmq-server
  ● rabbitmq-server.service - RabbitMQ broker
     Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
     Active: active (running) since 3 2019-09-25 22:26:35 CST; 7s ago
   Main PID: 2904 (beam.smp)
     Status: "Initialized"
     CGroup: /system.slice/rabbitmq-server.service
             ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
             MBlmbcs...
             ├─3220 erl_child_setup 32768
             ├─3243 inet_gethost 4
             └─3244 inet_gethost 4
      .........

Half the success here, and you can get started with rabbitMq

# 10. Turn off firewall services
	systemctl disable firewalld
    Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
    Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
	systemctl stop firewalld   

# 11. Access the web management interface
	http://10.15.0.8:15672/

Here, if you use Ali Cloud server friends, you must remember to open the port in the security group, and the open is 15672! Otherwise, you will not be able to access the web management interface

# 12. Login management interface
	username:  guest
	password:  guest

3. RabiitMQ configuration

3.1 RabbitMQ Management Command Line

# 1. Service startup related
	systemctl start|restart|stop|status rabbitmq-server

# 2. The administration command line is used to command RabbitMQ operations without using the web administration interface
	rabbitmqctl  help  You can view more commands

# 3. Plugin Management Command Line
	rabbitmq-plugins enable|list|disable

3.2 Introduction to web management interface

3.2.1 overview overview

  • connections: Both producers and consumers need to be connected to RabbitMQ to complete production and consumption of messages, where you can view the connection

  • channels: channels, which form a channel when a connection is established. Message delivery depends on the channel.

  • Exchanges: A switch used to route messages

  • Queues: Queues, that is, message queues, where messages are stored, waiting for consumption, and then being removed after consumption.

3.2.2 Admin User and Virtual Host Management

1. Add users

The Tags option above actually specifies the user's role. There are several options:

  • Super Administrator
    You can log on to the administration console, view all information, and operate on users, policies.

  • Monitor
    You can log in to the administration console and view information about the rabbitmq node (number of processes, memory usage, disk usage, etc.)

  • Policcymaker
    You can log on to the Administration Console and manage the policy at the same time. However, you cannot view the information about the node (the part identified by the red box above).

  • General Manager
    You can only log on to the management console, you cannot see node information, and you cannot manage policies.

  • Other
    Unable to log on to the management console, usually the average producer and consumer.

2. Create a virtual host
# Virtual Host
	To allow individual users to work without interruption, RabbitMQ A virtual host was added ( Virtual Hosts)Concept. In fact, it is an independent access path, different users use different paths, each has its own queue, switch, and will not affect each other.

3. Bind virtual hosts and users

Create a virtual host and add access to the user:

Click on the added virtual host:

Enter the virtual machine setup interface:

4. First program of RabbitMQ

Review of 4.0 AMQP Protocol

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-mIDskLzU-1628485076031). ( https://cdn.nlark.com/yuque/0/2021/png/21630572/1628484465042-4fb7b33e-a848-4265-8118-f1eccce24282.png )]

4.1 RabbitMQ Supported Message Model

These models, commonly used in the first five, can be found on rabbitMq website.

4.2 Introducing Dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

4.3 First Model (Direct Connection)

In the model illustrated above, there are the following concepts:

  • P: The producer, the program that sends the message

  • C: Consumer: The recipient of the message will always wait for the message to arrive.

  • Queue: Message queue, the red part of the diagram. Like a mailbox, you can cache messages; The producer delivers the message to it and the consumer pulls it out.

1. Developer
  //Create Connection Factory
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  //Create Channel
  Channel channel = connection.createChannel();
  //Parameter 1: Whether to persist parameter 2: Whether to exclusive queue parameter 3: Whether to automatically delete parameter 4: Other properties
  channel.queueDeclare("hello",true,false,false,null);
  channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());
  channel.close();
  connection.close();

2. Develop consumers
  //Create Connection Factory
  ConnectionFactory connectionFactory = new ConnectionFactory();
  connectionFactory.setHost("10.15.0.9");
  connectionFactory.setPort(5672);
  connectionFactory.setUsername("ems");
  connectionFactory.setPassword("123");
  connectionFactory.setVirtualHost("/ems");
  Connection connection = connectionFactory.newConnection();
  Channel channel = connection.createChannel();
  channel.queueDeclare("hello", true, false, false, null);
  channel.basicConsume("hello",true,new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      System.out.println(new String(body));
    }
  });

3. Description of parameters
  channel.queueDeclare("hello",true,false,false,null);
	'Parameter 1':Queue used to declare channel correspondence
  'Parameter 2':Used to specify whether to persist queues
  'Parameter 3':Used to specify whether the queue is exclusive
  'Parameter 4':Used to specify whether queues are automatically deleted
  'Parameter 5':Additional configuration of queues

4.4 Second model (work quene)

Work queues, Also known as (Task queues), task model. When message processing is time consuming, messages may be produced much faster than they are consumed. In the long run, more and more messages will accumulate and cannot be processed in time. work model can be used: multiple consumers are bound to a queue and messages in the queue are consumed together once consumed. The task will not be repeated.

Roles:

  • P: Producer: Publisher of Task

  • C1: Consumer-1, pick up tasks and complete them, assuming they are done slowly

  • C2: Consumer-2: Get tasks and complete them, assuming fast completion

1. Developer
channel.queueDeclare("hello", true, false, false, null);
for (int i = 0; i < 10; i++) {
  channel.basicPublish("", "hello", null, (i+"====>:I'm a message").getBytes());
}

2. Develop Consumers-1
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1: "+new String(body));
  }
});

3. Develop Consumers-2
channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    try {
      Thread.sleep(1000);   //Processing a message is one second slower than processing a message
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Consumer 2: "+new String(body));  
  }
});

4. Test results

summary:By default, RabbitMQ Each message will be sent sequentially to the next consumer. On average, every consumer receives the same number of messages. This way of distributing messages is called loops.

5. Automatic Message Confirmation Mechanism

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

channel.basicQos(1);//Accept only one unacknowledged message at a time
//Parameter 2: Turn off automatic confirmation messages
channel.basicConsume("hello",false,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1: "+new String(body));
    channel.basicAck(envelope.getDeliveryTag(),false);//Manual confirmation message
  }
});

  • Setting channels can consume only one message at a time
  • Turn off automatic confirmation of messages and turn on manual confirmation messages
    ng)

4.5 The third model (fanout)

fanout Fan out is also known as broadcasting

In broadcast mode, the message sending process is as follows:

  • There can be multiple consumers

  • Each consumer has its own queue

  • Each queue is bound to Exchange

  • Messages sent by the producer can only be sent to the switch, which decides which queue to send. The producer cannot decide which queue to send.

  • Switch sends messages to all bound queues

  • Queued consumers get the message. Implement a message consumed by multiple consumers

1. Developer
//Claim Switch
channel.exchangeDeclare("logs","fanout");//Broadcast a message Multiple consumers consume at the same time
//Publish a message
channel.basicPublish("logs","",null,"hello".getBytes());

2. Develop Consumers-1
//Bind switch
channel.exchangeDeclare("logs","fanout");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind temporary queue to exchange
channel.queueBind(queue,"logs","");
//Processing messages
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1: "+new String(body));
  }
});

3. Develop Consumers-2
//Bind switch
channel.exchangeDeclare("logs","fanout");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind temporary queue to exchange
channel.queueBind(queue,"logs","");
//Processing messages
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2: "+new String(body));
  }
});

4. Develop Consumers-3
//Bind switch
channel.exchangeDeclare("logs","fanout");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind temporary queue to exchange
channel.queueBind(queue,"logs","");
//Processing messages
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 3: "+new String(body));
  }
});

5. Test results

4.6 Fourth model (Routing)

4.6.1 Routing Subscription Model-Direct

stay Fanout In mode, a message is consumed by all subscribed queues. However, in some scenarios, we want different messages to be consumed by different queues. This is the time to use Direct Type Exchange. 

Under the Direct model:

  • The binding of a queue to a switch cannot be arbitrary, but specifies a Routing key.

  • When sending a message to Exchange, the sender of the message must also specify the RoutingKey of the message.

  • Exchange no longer delivers messages to each bound queue, but instead judges from the message's Routing key, which receives messages only if the queue's Routing key is identical to the message's Routing key

Technological process:

Illustration:

  • P: Producer, sends a message to Exchange, and when sending a message, specifies a routing key.

  • X:Exchange, receives the producer's message, and delivers it to a queue that matches the routing key exactly

  • C1: Consumer whose queue specifies a message requiring routing key as error

  • C2: Consumer whose queue specifies messages requiring routing key for info, error, warning

1. Developer
//Declare switch parameter 1: switch name parameter 2: switch type Instruction-based outing key forwarding
channel.exchangeDeclare("logs_direct","direct");
String key = "";
//Publish a message
channel.basicPublish("logs_direct",key,null,("Designated route key"+key+"Message").getBytes());

2. Develop Consumers-1
 //Claim Switch
channel.exchangeDeclare("logs_direct","direct");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind queues and switches
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","warn");

//Consumer News
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1: "+new String(body));
  }
});

3. Develop Consumers-2
//Claim Switch
channel.exchangeDeclare("logs_direct","direct");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind queues and switches
channel.queueBind(queue,"logs_direct","error");
//Consumer News
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2: "+new String(body));
  }
});

4. When the test producer sends a message that Route key is error

5. When the test producer sends a message that Route key is info

4.6.2 Routing Subscription Model-Topic

Topic`Type`Exchange`and`Direct`By contrast, you can`RoutingKey`Route messages to different queues. Just`Topic`type`Exchange`Queues can be bound`Routing key` When using wildcards! This model`Routingkey` Generally, they consist of one or more words, with "."Split, for example: `item.insert

# wild card
		* (star) can substitute for exactly one word.    Match no more than exactly one word
		# (hash) can substitute for zero or more words. Match one or more words
# For example:
		audit.#    Match audit.irs.corporate or audit.irs, etc.
    audit.*   Only matches audit.irs

1. Developer
//Life switch and switch type topic use dynamic routing (wildcard mode)
channel.exchangeDeclare("topics","topic");
String routekey = "user.save";//Dynamic routing key
//Publish a message
channel.basicPublish("topics",routekey,null,("This is a dynamic subscription model in routing,route key: ["+routekey+"]").getBytes());

2. Develop Consumers-1
Routing Key Use*wildcard

 //Claim Switch
channel.exchangeDeclare("topics","topic");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind queues to switches and set get dynamic routes in switches
channel.queueBind(queue,"topics","user.*");

//Consumer News
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 1: "+new String(body));
  }
});

3. Develop Consumers-2
Routing Key Use#wildcard

//Claim Switch
channel.exchangeDeclare("topics","topic");
//Create temporary queue
String queue = channel.queueDeclare().getQueue();
//Bind queues to switches and set get dynamic routes in switches
channel.queueBind(queue,"topics","user.#");

//Consumer News
channel.basicConsume(queue,true,new DefaultConsumer(channel){
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("Consumer 2: "+new String(body));
  }
});

4. Test results

5. Use RabbitMQ in SpringBoot

5.0 Build Initial Environment

1. Introducing dependencies
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Configuration Profile
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 10.15.0.9
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

RabbitTemplate is used to simplify operation usage by injecting directly into the project

5.1 The first hello world model uses

Developer Producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testHello(){
  rabbitTemplate.convertAndSend("hello","hello world");
}

Develop consumers
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive1(String message){
        System.out.println("message = " + message);
    }
}

5.2 Use of the second work model

Developer Producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testWork(){
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("work","hello work!");
  }
}

Develop consumers
@Component
public class WorkCustomer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("work message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("work message2 = " + message);
    }
}
Explain:Default at Spring AMQP In Implementation Work This is fair scheduling,Additional configuration is required if more energy is needed

5.3 Fanout Broadcast Model

Developer Producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testFanout() throws InterruptedException {
  rabbitTemplate.convertAndSend("logs","","This is a log broadcast");
}

Develop consumers
@Component
public class FanoutCustomer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name="logs",type = "fanout")
    ))
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, //Create temporary queue
            exchange = @Exchange(name="logs",type = "fanout")  //Binding switch type
    ))
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

5.4 Route Routing Model

Developer Producer
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testDirect(){
  rabbitTemplate.convertAndSend("directs","error","error Log information");
}

Develop consumers
@Component
public class DirectCustomer {

    @RabbitListener(bindings ={
            @QueueBinding(
                    value = @Queue(),
                    key={"info","error"},
                    exchange = @Exchange(type = "direct",name="directs")
            )})
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings ={
            @QueueBinding(
                    value = @Queue(),
                    key={"error"},
                    exchange = @Exchange(type = "direct",name="directs")
            )})
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

5.5 Topic Subscription Model (Dynamic Routing Model)

Developer Producer
@Autowired
private RabbitTemplate rabbitTemplate;

//topic
@Test
public void testTopic(){
  rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll Message");
}

Develop consumers
@Component
public class TopCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.*"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.#"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

6. MQ application scenarios

6.1 Asynchronous Processing

Scene description: After user registration, they need to send registration email and registration SMS,There are two traditional practices 1.Serial Mode 2.Parallel approach

  • Serial mode: After writing the registration information to the database, send the registration email, then send the registration SMS, and return to the client after all three tasks are completed. The problem is that mail, text messages are not required, they are just notifications, and this allows clients to wait for something they don't have to wait for.

  • Parallel mode: Write registration information into the database, send mail while sending text messages, and return to the client after the above three tasks are completed. Parallel mode can improve processing time.

  • Message Queuing: Assume three business nodes use 50ms, 150 ms in serial mode, and 100ms in parallel. Although concurrency has improved processing time, as mentioned earlier, e-mail and text messages have no impact on my normal use of the site. Clients do not have to wait for their sending to complete before they show that registration is successful. They should return after writing to the database. Message Queuing: When message queuing is introduced, messages are sent and messages are not processed asynchronously by business logic as required.

It can be seen that when message queuing is introduced, the response time of the user is equal to the time written to the database + the time written to the message queue (negligible). After message queuing post-processing is introduced, the response time is three times that of the serial and twice that of the parallel.

6.2 Applying decoupling

Scene: Double 11 is a shopping spree,After user orders,Order system needs to notify inventory system,The traditional practice is that the order system calls the interface of the inventory system.

This approach has one drawback:

When the inventory system fails, the order fails. Order system and inventory system are highly coupled. Introduce message queue

  • Order system: After the user places the order, the order system completes the persistence process, writes the message to the message queue, and returns the success of the user's order.
  • Inventory system: Subscribe to the order message, get the order message, and do library operation. Message queuing guarantees reliable delivery of messages even if the inventory system fails without causing message loss.

6.3 Flow Peak Clipping

Scenario: Secondary killing usually causes applications to hang due to excessive traffic. To solve this problem, message queues are usually joined at the front of applications.

Effect:

1. You can control the number of people active, and orders that exceed this threshold are discarded directly (why can't I kill once in a second?^)

2. Can alleviate short periods of high traffic crashing applications (applications get orders at their maximum processing power)

1. When a user's request is received by the server, the server writes the message queue first. If the length of the queue exceeds the maximum, the user's request is discarded directly or the server jumps to the error page.

2.The secondkill service performs subsequent processing based on the requested information in the message queue.

7. RabbitMQ Cluster

7.1 Cluster Architecture

7.1. 1 Ordinary Cluster (Replica Cluster)

All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster -- from the official website

By default:RabbitMQ All data required for agent operation/The state will be replicated across all nodes. An exception to this is Message Queuing, which by default is on one node, although they can be seen and accessed from all nodes

Architecture diagram

Core problem solving: Quene information can be backed up when the master node is down at some point in the cluster

Cluster building
# 0. Cluster Planning
	node1: 10.15.0.3  mq1  master Primary Node
	node2: 10.15.0.4  mq2  repl1  Replica Node
	node3: 10.15.0.5  mq3  repl2  Replica Node

# 1. Clone three machine hostnames and ip mappings
	vim /etc/hosts join:
		 10.15.0.3 mq1
    	10.15.0.4 mq2
    	10.15.0.5 mq3
	node1: vim /etc/hostname join:  mq1
	node2: vim /etc/hostname join:  mq2
	node3: vim /etc/hostname join:  mq3

# 2.Three machines install rabbitmq and synchronize cookie files to execute on node1:
	scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
	scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/

# 3. Check if the cookie s are consistent:
	node1: cat /var/lib/rabbitmq/.erlang.cookie 
	node2: cat /var/lib/rabbitmq/.erlang.cookie 
	node3: cat /var/lib/rabbitmq/.erlang.cookie 

# 4. Background boot rabbitmq all nodes execute the following command to launch the successful access management interface:
	rabbitmq-server -detached 

# 5. Execute the Join Cluster command on Noe2 and Noe3:
	1.Close       rabbitmqctl stop_app
	2.Join Cluster    rabbitmqctl join_cluster rabbit@mq1
	3.Start Services    rabbitmqctl start_app

# 6. View the state of the cluster, and any node executes:
	rabbitmqctl cluster_status

# 7. If the following shows, the cluster is successfully built:
	Cluster status of node rabbit@mq3 ...
	[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
	{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
	{cluster_name,<<"rabbit@mq1">>},
	{partitions,[]},
	{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]

# 8. Login management interface, showing the following status:

# 9. Test cluster on node1, create queue

# 10. View node2 and node3 nodes:

# 11. Close the node1 node and execute the following command to view node2 and node3:
	rabbitmqctl stop_app

7.1. 2 Mirror Cluster

This guide covers mirroring (queue contents replication) of classic queues -- from the official website

By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. -- From the official website

Mirroring queue mechanism is to set up a master-slave relationship between three nodes. Messages are automatically synchronized among the three nodes. If one of the nodes is unavailable, it will not cause message loss or service unavailability, and promote MQ Overall high availability of clusters.
Cluster architecture
2. Configure Cluster Architecture
# 0. Policy Description
	rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>
	-p Vhost:  Optional parameters, specified for vhost Lower queue Set up
	Name:     policy Name of
	Pattern: queue Matching pattern of(regular expression)
	Definition: Mirror Definition, which consists of three parts ha-mode, ha-params, ha-sync-mode
           		ha-mode:Indicates the mode of the mirror queue, valid values are all/exactly/nodes
                        all: Represents mirroring on all nodes in a cluster
                        exactly: Represents mirroring on a specified number of nodes by ha-params Appoint
                        nodes: Represents mirroring on a specified node with the node name passed through ha-params Appoint
            	 ha-params: ha-mode Parameters required by the pattern
                ha-sync-mode: Synchronize messages in the queue with a valid value of automatic and manual
                priority: Optional parameters, policy Priority
                
                 
# 1. View the current policy
	rabbitmqctl list_policies

# 2. Add Policy
	rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}' 
	Explain:The policy regular expression is "^" Represents all matching queue names  ^hello:matching hello Beginning Queue

# 3. Deletion Policy
	rabbitmqctl clear_policy ha-all

# 4. Test Cluster

Keywords: Java RabbitMQ Back-end

Added by drath on Mon, 27 Dec 2021 15:11:58 +0200