RabbitMQ basic usage

Message queue

Definition of Message queue

The most common way for services to communicate is to call each other directly. A message reaches the other end as soon as it is sent; this is immediate (synchronous) communication.
Alternatively, a message can first be placed in a container for temporary storage and forwarded to the other end when certain conditions are met; this is deferred (asynchronous) communication.

A problem to think about
Suppose that after we place an order on Taobao, the Taobao back end needs to do the following:
1. Notification system: tell the merchant that there is a new order and ask them to ship it promptly
2. Recommendation system: update the user's profile and recommend again the products the user may be interested in
3. Membership system: update the user's points and membership level

createOrder(...) {
   // Complete order service
   doCreateOrder(...); 
   // Call other service interfaces
   sendMsg(...);
   updateUserInterestedGoods(...); 
   updateMemberCreditInfo(...); 
}

Existing problems
Tight coupling: if another action must be triggered when an order is created, you have to change the code and add one more call at the end of the order-creation function
Lack of buffering: if the membership system happens to be very busy or down when the order is created, the member update fails. We need somewhere to temporarily store messages that cannot be consumed yet

Optimization scheme
We need message middleware to provide decoupling and buffering
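
As a rough illustration of that idea, the sketch below uses an in-memory java.util.concurrent.BlockingQueue as a stand-in for the message middleware; a real system would use a broker such as RabbitMQ. The names OrderEvents, createOrder and drain are hypothetical and only serve the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: the order service only publishes a message;
// it no longer calls the downstream services directly.
class OrderEvents {
    // In-memory stand-in for the message middleware.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: complete the order, then publish one message.
    void createOrder(String orderId) {
        // ... the order-creation logic itself would go here ...
        queue.add("order.created:" + orderId);
    }

    // Consumer side: a downstream system collects whatever has
    // accumulated, whenever it is ready to do so.
    List<String> drain() {
        List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        OrderEvents events = new OrderEvents();
        events.createOrder("1001");
        events.createOrder("1002");
        // The consumer was not running while both orders were created,
        // but the messages waited in the queue instead of being lost.
        System.out.println(events.drain());
    }
}
```

Adding a new downstream action now means adding a new consumer of the queue; createOrder itself does not change.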

Case analysis
Xiaohong wants Xiaoming to read more and often finds good books for him. At first she did it like this: she asked Xiaoming when he was free, delivered the book to him in person, and stayed to watch him finish it before leaving. Over time, both of them found this tiresome.
Later they changed the approach. Xiaohong told Xiaoming: "Read every book I put on the shelf." From then on, she put each good book she found on the shelf, and Xiaoming took books down to read whenever he saw some there.

The bookshelf is the message queue, Xiaohong is the producer, and Xiaoming is the consumer


Benefits
1. When Xiaohong wants to give Xiaoming a book, she no longer has to ask when he is free and hand it over in person. She just puts the book on the shelf, so both of them have more free time
2. Xiaohong trusts Xiaoming's self-discipline and reading ability, so she does not have to watch him read. She only performs one action, putting the book on the shelf, which saves time
3. If another reading partner, Xiaoqiang, joins tomorrow, Xiaohong still only puts books on the shelf, and both Xiaoming and Xiaoqiang can take books from it
4. The books simply wait on the shelf. If Xiaoming reads quickly he finishes early, and if he reads slowly he finishes later; either way is fine. He is under less pressure than when Xiaohong handed him each book and watched him finish it

Message queue characteristics
1. Decoupling: each participant is unaffected by the others and can act more independently; they are connected only through a simple container
2. Speed-up: Xiaohong only has to put a book on the shelf, which saves her a lot of time
3. Broadcast: Xiaohong does the work once and several partners have books to read, which saves her a great deal of time and makes it very cheap for new partners to join
4. Peak shifting and flow control: the rate at which Xiaohong supplies books is uneven. If she puts five books on the shelf over today and tomorrow and then adds nothing for three months, Xiaoming only has to work through those five books one after another during the three months, which puts him under less pressure
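
The peak-shifting point can be made concrete with a small simulation. This is only a sketch under simplifying assumptions: time advances in discrete ticks, the consumer handles a fixed number of messages per tick, and the names PeakShifting and backlogPerTick are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical simulation: a bursty producer and a steady consumer
// connected by a queue. Returns the queue backlog after each tick.
class PeakShifting {
    static int[] backlogPerTick(int[] arrivalsPerTick, int consumerCapacityPerTick) {
        Deque<Integer> queue = new ArrayDeque<>();
        int[] backlog = new int[arrivalsPerTick.length];
        int nextId = 0;
        for (int tick = 0; tick < arrivalsPerTick.length; tick++) {
            // Producer: enqueue however many messages arrive in this tick.
            for (int i = 0; i < arrivalsPerTick[tick]; i++) {
                queue.addLast(nextId++);
            }
            // Consumer: take at most its fixed capacity, never more.
            for (int i = 0; i < consumerCapacityPerTick && !queue.isEmpty(); i++) {
                queue.pollFirst();
            }
            backlog[tick] = queue.size();
        }
        return backlog;
    }

    public static void main(String[] args) {
        // A burst of five messages, a long quiet period, then one more.
        int[] arrivals = {5, 0, 0, 0, 1};
        System.out.println(Arrays.toString(backlogPerTick(arrivals, 2)));
        // prints [3, 1, 0, 0, 0]
    }
}
```

The consumer never handles more than two messages per tick, even though five arrive at once; the queue absorbs the difference and empties during the quiet period.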

Email case analysis
A large number of users are registering for your software, and under high concurrency the registration requests start to run into problems.
For example, the mail interface cannot keep up, or the heavy computation involved in analyzing the information saturates the CPU. The user records are inserted into the database quickly, but the request then stalls while sending the email or analyzing the information.
As a result, response times rise sharply and requests may even time out, which is a poor trade-off. In such cases these operations are usually put into a message queue (the producer-consumer model): the queued work is processed at its own pace while the registration request completes quickly.
This does not affect the user's use of other functions
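
A minimal sketch of this producer-consumer arrangement, again with an in-memory queue standing in for the broker. The class RegistrationSketch and its methods are hypothetical; the database insert and the mail call are only marked by comments.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: registration returns at once, while a separate
// worker thread handles the slow mail step from a queue.
class RegistrationSketch {
    private final BlockingQueue<String> mailQueue = new LinkedBlockingQueue<>();
    private final List<String> sentMail = new CopyOnWriteArrayList<>();

    // Request path: store the user, enqueue the slow work, return.
    String register(String email) {
        // ... insert the user record into the database here ...
        mailQueue.add(email);
        return "registered";
    }

    // Worker path: handles exactly `count` queued addresses,
    // blocking while the queue is empty.
    void sendQueuedMail(int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            String email = mailQueue.take();
            // ... the slow call to the mail interface would go here ...
            sentMail.add(email);
        }
    }

    // Runs two registrations against one worker and returns the
    // addresses the worker processed, in order.
    static List<String> demo() {
        RegistrationSketch service = new RegistrationSketch();
        Thread worker = new Thread(() -> {
            try {
                service.sendQueuedMail(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        service.register("a@example.com");
        service.register("b@example.com");
        try {
            worker.join(); // wait for the worker only so the demo can print
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(service.sentMail);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The register method never waits for the mail step, so its response time stays short however slow the worker is.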

Message queue related

AMQP
AMQP (Advanced Message Queuing Protocol) is an open, general-purpose application-layer protocol standard for providing unified messaging services.
When both the sender and the receiver follow this protocol, they can communicate asynchronously. The protocol specifies the message format and how messages are exchanged

RabbitMQ

RabbitMQ is a message queuing service that implements AMQP (Advanced Message Queuing Protocol). It is written in Erlang

Server (Broker): a process that accepts client connections and implements the message queuing and routing functions of the AMQP protocol
Virtual Host: a virtual host is a logical grouping, similar to a permission-control group. One virtual host can contain multiple exchanges and queues
Exchange: an exchange receives the messages sent by producers and routes them to queues in the server according to the routing key
ExchangeType: the exchange type determines how messages are routed. RabbitMQ provides four built-in exchange types: direct, fanout, topic and headers; the first three are the most commonly used
Message Queue: a message queue stores messages that have not yet been consumed
Message: a message consists of a header and a body. The header is a set of attributes added by the producer, such as whether the message is persistent, its priority, and which message queue should receive it. The body is the actual content to be sent
BindingKey: the binding key used when binding a specific exchange to a specific queue
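
To make the routing behavior of the direct, fanout and topic exchange types more tangible, here is an in-memory model of how a routing key is compared with a binding key. It is a sketch of the matching rules only, not the RabbitMQ client API, and the class ExchangeRouting with its methods and the queue names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of exchange routing rules.
class ExchangeRouting {

    // direct: the routing key must equal the binding key exactly.
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // fanout: every bound queue receives the message; keys are ignored.
    static boolean fanoutMatches(String bindingKey, String routingKey) {
        return true;
    }

    // topic: keys are dot-separated words; '*' matches exactly one word
    // and '#' matches zero or more words.
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: words must be too
        }
        if (pattern[i].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = j; next <= words.length; next++) {
                if (match(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still needs a word, none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    // Returns the queues whose topic binding key matches the routing key.
    static List<String> routeTopic(Map<String, String> bindingKeyByQueue, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindingKeyByQueue.entrySet()) {
            if (topicMatches(binding.getValue(), routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("notifyQueue", "order.created");
        bindings.put("auditQueue", "order.#");
        bindings.put("memberQueue", "*.paid");
        System.out.println(routeTopic(bindings, "order.created"));
        System.out.println(routeTopic(bindings, "order.paid"));
    }
}
```

In this model a topic binding without wildcards behaves like a direct binding, which is why notifyQueue only receives order.created.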

Docker installation and deployment RabbitMQ

docker pull rabbitmq:management


Note: when pulling the image, use the management tag rather than the latest tag. Only the management image has the management interface enabled by default

docker run -d \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v /home/rabbitmq:/var/lib/rabbitmq \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management

--hostname: the container hostname (an important point about RabbitMQ is that it stores data under its "node name", which defaults to the hostname)
-e: specifies environment variables:
   RABBITMQ_DEFAULT_VHOST: name of the default virtual host
   RABBITMQ_DEFAULT_USER: default user name
   RABBITMQ_DEFAULT_PASS: password of the default user
After the container has started, you can view its logs with docker logs

docker logs my-rabbitmq

Open the management console in a browser (replace ip with the server's address) and log in with the default user set above

http://ip:15672

Spring Boot connection configuration


Account configuration

Remember that the account must be authorized for the virtual host (my_vhost) before the applications can connect with it

Spring Boot project structure
rabbitMQ
- consumer
- provider 

Required dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Producer yml file configuration

server:
    port: 8080
spring:
    application:
        name: provider
    rabbitmq:
        host: 192.168.211.128
        password: 123456
        port: 5672
        username: springboot
        virtual-host: my_vhost

Consumer yml file configuration

server:
    port: 8081
spring:
    application:
        name: consumer
    rabbitmq:
        host: 192.168.211.128
        password: 123456
        port: 5672
        username: springboot
        virtual-host: my_vhost

Producer / provider

① Create a new RabbitConfig class

package com.example.provider;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
    // Declares a queue named "firstQueue" (durable by default with this constructor)
    @Bean
    public Queue firstQueue() {
        return new Queue("firstQueue");
    }
}

② Create a new Sender class

package com.example.provider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
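public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    // With a single routing-key argument the message goes to the template's
    // default exchange, so "firstQueue" is matched against the queue name.
    public void sendFirst() {
        rabbitTemplate.convertAndSend("firstQueue", "Hello World");
    }

    // Sends a User object; the configured message converter serializes it.
    public void sendFirst(User user) {
        rabbitTemplate.convertAndSend("firstQueue", user);
    }

    // Sends a JSON string.
    public void sendFirst(String json) {
        rabbitTemplate.convertAndSend("firstQueue", json);
    }
}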

③ Test class ProviderApplicationTests

package com.example.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ProviderApplicationTests {
    @Autowired
    private Sender sender;

    @Test
    @SneakyThrows
    void contextLoads() {
        // Alternatives: send a plain string, or send the User object directly
//        sender.sendFirst();
//        sender.sendFirst(new User("aa","bb"));
        // Serialize the user to JSON and send the JSON string
        User user = new User("aa", "bb");
        ObjectMapper mapper = new ObjectMapper();
        sender.sendFirst(mapper.writeValueAsString(user));
    }

}

After the test has run, the queue firstQueue appears in the RabbitMQ management interface

Consumer

Receiver

package com.example.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
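@RabbitListener(queues = "firstQueue")
public class Receiver {

    // Handles String payloads from firstQueue; the string is expected
    // to be the JSON produced by the provider.
    @RabbitHandler
    @SneakyThrows
    public void process(String json) {
        log.warn("Received:" + json);
        // Deserialize the JSON back into a User object
        ObjectMapper mapper = new ObjectMapper();
        log.warn("Received:" + mapper.readValue(json, User.class));
    }
}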

Custom data sending

(1) Producer / provider

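User (the provider and the consumer each need this class; the copy shown here is from the consumer module)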

package com.example.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String userpwd;
}
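
The User class above implements Serializable, which matters when the object itself, rather than a JSON string, is sent: Spring AMQP's default SimpleMessageConverter falls back to Java serialization for Serializable payloads. The sketch below shows only that underlying round trip with plain JDK classes. DemoUser is a hypothetical, Lombok-free stand-in for User, and the helper names are assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of a Java serialization round trip.
class SerializationSketch {

    // Simplified stand-in for the User class, without Lombok.
    static class DemoUser implements Serializable {
        private static final long serialVersionUID = 1L;
        final String username;
        final String userpwd;

        DemoUser(String username, String userpwd) {
            this.username = username;
            this.userpwd = userpwd;
        }
    }

    // Producer side: turn the object into the bytes of a message body.
    static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer side: rebuild the object from the message body.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new DemoUser("aa", "bb"));
        DemoUser copy = (DemoUser) fromBytes(body);
        System.out.println(copy.username + " / " + copy.userpwd);
    }
}
```

Java serialization records the fully qualified class name, so with this approach the class would have to live under the same package on both sides. Sending a JSON string, as the test class does, avoids that constraint.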

Keywords: RabbitMQ

Added by pcwizzzz on Fri, 25 Feb 2022 14:35:06 +0200