Message queue
Definition of Message queue
The most common way for services to communicate is to call each other directly. A message sent from one end reaches the other end immediately; this is called immediate messaging (synchronous communication).
Alternatively, a message sent from one end first enters a container for temporary storage, and the container forwards it to the other end once certain conditions are met; this is called delayed messaging (asynchronous communication).
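The difference between the two styles can be sketched in a few lines of plain Java. This is only an illustration, not how a real broker works: Inbox, Channels and their methods are hypothetical names, and an in-memory java.util.Queue stands in for the container.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical receiver that records every message it is given. */
final class Inbox {
    final List<String> received = new ArrayList<>();

    void onMessage(String message) {
        received.add(message);
    }
}

/** Contrasts a direct call with delivery through an intermediate container. */
final class Channels {
    // Synchronous: the receiver already has the message when this method returns.
    static void sendSync(Inbox inbox, String message) {
        inbox.onMessage(message);
    }

    // Asynchronous: the message is only stored; the sender returns immediately.
    static void sendAsync(Queue<String> buffer, String message) {
        buffer.add(message);
    }

    // Runs later, once the receiver is ready: drains the buffer into the inbox.
    static void deliverPending(Queue<String> buffer, Inbox inbox) {
        String message;
        while ((message = buffer.poll()) != null) {
            inbox.onMessage(message);
        }
    }
}
```

After sendAsync the message sits in the buffer and the receiver has seen nothing; it arrives only when deliverPending runs.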
A problem to think about
Suppose that after we place an order on Taobao, Taobao's backend needs to do the following:
1. Notification system: tell the merchant that a new order has arrived and should be shipped promptly
2. Recommendation system: update the user's profile and refresh the recommendations of products the user may be interested in
3. Membership system: update the user's points and membership level
createOrder(...) {
    // Complete the order service
    doCreateOrder(...);

    // Call other service interfaces
    sendMsg(...);
    updateUserInterestedGoods(...);
    updateMemberCreditInfo(...);
}
Existing problems
Tight coupling: if creating an order later needs to trigger a new action, the code has to change again, with yet another line appended to the end of the order-creation function
Lack of buffering: if the membership system happens to be overloaded or down when an order is created, the membership update fails. We need a place to temporarily hold messages that cannot be consumed yet
Optimization scheme
We need a message middleware to realize the functions of decoupling and buffering
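As a rough sketch of what decoupling and buffering buy us, the order function can publish one event and leave the rest to subscribers. OrderTopic and OrderService below are hypothetical, single-threaded stand-ins for real middleware, written in plain Java purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** In-memory stand-in for message middleware (hypothetical, single-threaded). */
final class OrderTopic {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Decoupling: a new downstream action is added here, not in createOrder.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Buffering: publishing only stores the event and never calls a subscriber.
    void publish(String orderId) {
        pending.add(orderId);
    }

    // Delivers every buffered event to all subscribers; returns the number of events delivered.
    int dispatch() {
        int delivered = 0;
        String orderId;
        while ((orderId = pending.poll()) != null) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(orderId);
            }
            delivered++;
        }
        return delivered;
    }
}

final class OrderService {
    private final OrderTopic topic;

    OrderService(OrderTopic topic) {
        this.topic = topic;
    }

    // createOrder no longer knows which downstream systems exist.
    void createOrder(String orderId) {
        // ... complete the order itself here ...
        topic.publish(orderId);
    }
}
```

Adding a fourth downstream system then means one more subscribe call, and createOrder stays untouched. If a subscriber is busy, the event simply waits in the topic until dispatch runs.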
Case analysis
Xiaohong wants Xiaoming to read more and often finds good books for him. At first it worked like this: Xiaohong asked Xiaoming when he was free, delivered the book to him in person, and did not leave until she had watched him finish it. Over time, both of them found this tiresome.
Later they changed the arrangement. Xiaohong told Xiaoming, "Read every book I put on the shelf." From then on, Xiaohong put each good book she found on the shelf, and Xiaoming took books down to read whenever he saw some there.
The bookshelf is the message queue, Xiaohong is the producer, and Xiaoming is the consumer.
Benefits
1. When Xiaohong wants to give Xiaoming a book, she no longer has to ask when he is free and hand it over in person. She just puts the book on the shelf, so both of them have more free time
2. Xiaohong trusts Xiaoming's self-discipline and reading ability, so she does not have to watch him read. Putting a book on the shelf is the only action she performs, which saves time
3. If another reading partner, Xiaoqiang, joins tomorrow, Xiaohong still only puts books on the shelf, and both Xiaoming and Xiaoqiang take books from it
4. The books simply wait on the shelf. If Xiaoming reads quickly he finishes early, and if he reads slowly he finishes later; either is fine. He is under less pressure than when Xiaohong handed him each book and watched him finish it
Message queue characteristics
1. Decoupling: members do not affect each other and stay independent; the only thing connecting them is a simple container
2. Speed: Xiaohong only performs one action, putting a book on the shelf, which saves her a lot of time
3. Broadcast: Xiaohong does the work once and several partners have books to read, which saves her time and makes it cheap for new partners to join
4. Peak shaving and flow control: Xiaohong supplies books at an uneven rate. If she puts five books on the shelf over today and tomorrow and the next one only three months later, Xiaoming can work through those five books one after another during the three months, so the pressure on him stays low
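The broadcast and flow-control characteristics can be sketched with a toy bookshelf in plain Java. SharedBookshelf is a hypothetical class; keeping one queue of unread books per reader is an assumption of this sketch (a reader who joins late only sees later books), not something the story prescribes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical bookshelf that keeps one queue of unread books per reader. */
final class SharedBookshelf {
    private final Map<String, Deque<String>> unread = new LinkedHashMap<>();

    // A new reader joins without the producer changing anything on her side.
    void join(String reader) {
        unread.putIfAbsent(reader, new ArrayDeque<>());
    }

    // Broadcast: one put makes the book available to every current reader.
    void put(String title) {
        for (Deque<String> queue : unread.values()) {
            queue.addLast(title);
        }
    }

    // Flow control: the reader takes at most `limit` books per call,
    // no matter how many have piled up.
    List<String> read(String reader, int limit) {
        List<String> taken = new ArrayList<>();
        Deque<String> queue = unread.get(reader);
        while (queue != null && taken.size() < limit && !queue.isEmpty()) {
            taken.add(queue.pollFirst());
        }
        return taken;
    }

    // Number of books still waiting for this reader.
    int backlog(String reader) {
        Deque<String> queue = unread.get(reader);
        return queue == null ? 0 : queue.size();
    }
}
```

If Xiaohong puts five books on the shelf in a burst, a reader calling read with a limit of two takes two and leaves three waiting, while another reader is free to take all five at once.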
Email case analysis
Suppose a large number of users are registering for your software. Under high concurrency, registration requests start to run into problems
For example, the mail interface cannot keep up, or the heavy computation involved in analyzing user information saturates the CPU. User records are written to the database quickly, but the request then stalls while sending the email or analyzing the information
Response times rise sharply and requests may even time out, which is a poor trade-off. In this situation these operations are generally put into a message queue (the producer-consumer model): the queue lets them be processed at a slower pace while registration itself completes quickly,
and the user's use of other functions is not affected
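A minimal sketch of this idea, assuming a single background worker is enough: the registration call saves the record and queues the slow mail step. RegistrationService and its fields are hypothetical, and the task queue inside a java.util.concurrent single-thread executor stands in for a real message queue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical registration service that defers mail sending to a worker. */
final class RegistrationService {
    final List<String> savedUsers = new CopyOnWriteArrayList<>();
    final List<String> sentMails = new CopyOnWriteArrayList<>();

    // One worker thread with an internal FIFO task queue plays the consumer.
    private final ExecutorService mailWorker = Executors.newSingleThreadExecutor();

    // Producer side: returns as soon as the record is saved; the mail is only queued.
    String register(String email) {
        savedUsers.add(email);                            // fast step, done inline
        mailWorker.execute(() -> sendWelcomeMail(email)); // slow step, done later
        return "registered";
    }

    // Placeholder for the slow call to the mail interface.
    private void sendWelcomeMail(String email) {
        sentMails.add(email);
    }

    // Stops accepting work and waits for queued mails; true if all finished in time.
    boolean shutdownAndWait(long seconds) {
        mailWorker.shutdown();
        try {
            return mailWorker.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Unlike a real broker, this queue lives inside the same process, so queued mails are lost if the process dies. That is one reason to move the queue into separate middleware such as RabbitMQ.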
Related concepts
AMQP
AMQP (Advanced Message Queuing Protocol) is an open, general-purpose application-layer standard protocol that provides unified messaging services
When both the sender and the receiver follow this protocol, they can communicate asynchronously. The protocol specifies the message format and how messages are handled
RabbitMQ is a message queuing service that implements AMQP. It is written in Erlang
Server (Broker): a process that accepts client connections and implements the message queuing and routing functions of the AMQP protocol
Virtual Host: a logical grouping, similar to a permission-control group. One Virtual Host can contain multiple Exchanges and Queues
Exchange: receives the messages sent by producers and routes them to Queues in the server according to the Routing Key
ExchangeType: the exchange type determines how messages are routed. RabbitMQ has four exchange types: fanout, direct, topic and headers
Message Queue: stores messages that have not yet been consumed by consumers
Message: consists of a Header and a Body. The Header is a set of attributes added by the producer, such as whether the message is persistent, its priority, and which Message Queue should receive it. The Body is the actual content to be sent
BindingKey: the binding key, which binds a specific Exchange to a specific Queue
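How an exchange uses the binding key and routing key can be illustrated with a toy model of the fanout, direct and topic types. This is a simplified sketch in plain Java, not RabbitMQ's implementation: ToyExchange and ToyExchangeType are hypothetical names, and the topic rule assumes the usual convention that words are separated by dots, with * standing for exactly one word and # for zero or more words.

```java
import java.util.ArrayList;
import java.util.List;

enum ToyExchangeType { FANOUT, DIRECT, TOPIC }

/** Simplified, hypothetical model of an exchange: bindings in, queue names out. */
final class ToyExchange {
    private static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final ToyExchangeType type;
    private final List<Binding> bindings = new ArrayList<>();

    ToyExchange(ToyExchangeType type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.add(new Binding(queue, bindingKey));
    }

    // Returns the names of the queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            boolean matches;
            switch (type) {
                case FANOUT:
                    matches = true; // the routing key is ignored
                    break;
                case DIRECT:
                    matches = b.bindingKey.equals(routingKey); // exact match only
                    break;
                default: // TOPIC: word-by-word pattern match
                    matches = topicMatches(b.bindingKey.split("\\."), 0,
                                           routingKey.split("\\."), 0);
            }
            if (matches) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    // Recursive matcher: '*' consumes exactly one word, '#' consumes zero or more.
    private static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return topicMatches(pattern, p + 1, key, k + 1);
        }
        return false;
    }
}
```

For example, with a topic exchange and the binding keys order.* and order.#, the routing key order.created reaches both queues, while order.created.eu only reaches the queue bound with order.#.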
Installing and deploying RabbitMQ with Docker
docker pull rabbitmq:management
Note: pull the image with the management tag rather than the latest tag. Only the management image includes the management interface
docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
--hostname: the host name (RabbitMQ stores its data under the so-called "node name", which defaults to the host name, so this setting matters)
-e: sets environment variables:
RABBITMQ_DEFAULT_VHOST: default virtual host name
RABBITMQ_DEFAULT_USER: default user name
RABBITMQ_DEFAULT_PASS: password of the default user
After the container has started, you can view its logs with docker logs followed by the container name
docker logs my-rabbitmq
Open the management console (port 15672, as mapped in the docker run command)
Spring Boot connection configuration
Account configuration
Remember that the account must be granted permissions on the virtual host it will use
Spring Boot project setup
rabbitMQ
- consumer
- provider
Required dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Producer yml file configuration
server:
  port: 8080
spring:
  application:
    name: provider
  rabbitmq:
    host: 192.168.211.128
    password: 123456
    port: 5672
    username: springboot
    virtual-host: my_vhost
Consumer yml file configuration
server:
  port: 8081
spring:
  application:
    name: consumer
  rabbitmq:
    host: 192.168.211.128
    password: 123456
    port: 5672
    username: springboot
    virtual-host: my_vhost
Producer / provider
① Create a new RabbitConfig class
package com.example.provider;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {

    // Declares the queue named "firstQueue"
    @Bean
    public Queue firstQueue() {
        return new Queue("firstQueue");
    }
}
② Create a new Sender class
package com.example.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Sends a fixed text message to firstQueue
    public void sendFirst() {
        rabbitTemplate.convertAndSend("firstQueue", "Hello World");
    }

    // Sends a User object to firstQueue
    public void sendFirst(User user) {
        rabbitTemplate.convertAndSend("firstQueue", user);
    }

    // Sends a JSON string to firstQueue
    public void sendFirst(String json) {
        rabbitTemplate.convertAndSend("firstQueue", json);
    }
}
③ Test class ProviderApplicationTests
package com.example.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProviderApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    @SneakyThrows
    void contextLoads() {
        // sender.sendFirst();
        // sender.sendFirst(new User("aa","bb"));

        // Serialize the User to a JSON string and send it
        User user = new User("aa", "bb");
        ObjectMapper mapper = new ObjectMapper();
        sender.sendFirst(mapper.writeValueAsString(user));
    }
}
After the test has run, the firstQueue queue appears in the RabbitMQ management console
Consumer
Receiver
package com.example.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {

    // Handles String messages taken from firstQueue
    @RabbitHandler
    @SneakyThrows
    public void process(String json) {
        log.warn("Received:" + json);
        // Convert the JSON string back into a User object
        ObjectMapper mapper = new ObjectMapper();
        log.warn("Received:" + mapper.readValue(json, User.class));
    }
}
Sending custom data
(1) Producer / provider
User
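package com.example.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

// Message payload. Sender (com.example.provider) and Receiver (com.example.consumer)
// both use User without an import, so each module needs a copy in its own package.
@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String userpwd;
}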