RabbitMq theory and application examples

What is rabbit MQ

RabbitMQ is an open source message broker and queue server based on AMQP protocol.

  • Erlang language is used for development as the underlying language: Erlang has the same latency as the native Socket, so the performance is very high
  • Open source, excellent performance and stability guarantee
  • Provide reliability message delivery mode (confirm) and return mode (return)
  • Perfect integration with spring AMQP, rich API
  • The cluster mode is rich, including expression configuration, HA mode and mirror queue model
  • Ensure high reliability and availability on the premise of no data loss

AMOP proper nouns:

  • Server: also known as Broker
  • Connection: connection, the network connection between the application and the broker
  • Channel: network channel, the task of a network session
  • Message: message
  • Virtual host: virtual address, used for logical isolation and message routing at the top layer. A virtual host can have several exchages and queues, but the same virtual host cannot have exchages and queues with the same name
  • Exchange: the switch receives messages and forwards them to the binding queue according to the routing key
  • Binding: the virtual connection between Exchage and Queue. The binding can contain routing key
  • Routing key: a routing rule
  • Queue: a container that holds specific messages

RabbitMQ message flow process:

Application scenario / role

  • Asynchronous buffer
    Some services can be asynchronous. As long as the final consistency is achieved, MQ can be used without strong consistency

  • Service decoupling

    • Strong dependency: service invocation and connection using dubbo or spring cloud are strong dependencies. [for example, registration and discovery need to rely on other services]

    • Weak dependency: MQ Middleware

      • Does not mean that weak dependencies can fail
      • If it can not fail, it is necessary to ensure the reliability of data delivery at the upstream message publishing end

      Scenario example: after the user places an order, the order needs to update the inventory

      Problems under strong dependency:
      1) If the inventory system cannot be accessed, the order inventory reduction fails, resulting in the failure of order generation
      2) Order module and inventory module are strongly coupled
      3) If a thread is enabled for offline operation, it only makes asynchronous access, and the access only improves the speed. Whether the normal call is successful cannot be guaranteed

      Solve the above problems through weak dependency:
      1) The order production successfully writes the message to the message queue (ensure the reliable delivery of the message)
      2) The inventory system obtains the order information through the subscription message, and the inventory system performs inventory operations according to the order information
      3) If the inventory system is abnormal and the inventory consumption message fails, the message will return to the queue and wait for the next transmission

  • Peak cutting and valley filling

    • When our downstream services can't handle it, we can cache these messages in one place and process them step by step
    • It is the process of peak shaving and valley filling to slow down the backlog of business for a short period of time


  1. Reliable delivery at the production end;
    • If the news is about money, it must not be lost
    • To achieve 100% delivery at the production end, you need to ensure atomicity with business data
  2. Idempotent of consumer;
    • If the production side wants to achieve reliable delivery, there may be repeated delivery
    • The consumer has consumed two or more times, and the data may be inconsistent
    • Therefore, the consumer must achieve the same result obtained by consuming the same request many times
  3. MQ itself needs to be considered
    • HA: high availability
    • Low delay
    • Reliability: ensure that the data is complete
    • Stacking capacity: This is the guarantee that MQ can carry your business level
    • Scalability: whether it can naturally support horizontal expansion without perceptual capacity expansion

Principle analysis of RabbitMq cluster architecture

1) Active standby mode
The master slave structure can be understood as hot backup. The master is responsible for reading and writing. After the Master goes down, it switches to the slave

2) Mirror mode
More popular models in the industry;
The classic RabbitMQ cluster is the mirror mode, which ensures 100% data loss;
High availability, low data synchronization latency, odd number of nodes.

The drawback of the image queue cluster is that it cannot scale horizontally well, because each node is a complete node that replicates with each other, and too many image nodes will increase the burden of MQ. A data write will be copied to multiple nodes, and the throughput will be reduced

Installation and use of rabbit MQ

RabbitMq-3.8.19 installation details I have already introduced an article before, and then omit it.

Modify user login and connection heartbeat

  • Set loopback_users.guest = false, remove the previous comments
  • Change {heartbeat, 60} to {heartbeat, 10}

Check whether the MQ port is enabled: yum -y install lsof

  • lsof -i:5672

Launch plug-in:

  • rabbitmq-plugins enable rabbitmq_management

Check whether the management background is started

  • lsof -i:15672

Common commands

# Start service
systemctl start rabbitmq-server
# perhaps
rabbitmq-server -detached
# Open the web management interface plug-in
rabbitmq-plugins enable rabbitmq_management

# Close app
rabbitmqctl stop_app

# Start application
rabbitmqctl start_app

# Node status
rabbitmqctl status

# Add user password
rabbitmqctl add_user username password

# Modify user password
rabbitmqctl change_password username password

# List all users
rabbitmqctl list_users

# delete user
rabbitmqctl delete_user username

# List user permissions
rabbitmqctl list_user_permissions username

# Clear user permissions
rabbitmqctl clear_permissions -p vhostpath username

# Set user permissions
# Three * correspond to: configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"

# List all virtual hosts
rabbitmqctl list_vhosts

# Create virtual host
rabbitmqctl add_vhost vhostpath

# Lists the permissions of the virtual host
rabbitmqctl list_permissions -p vhostpath

# Delete virtual host
rabbitmqctl delete_vhost vhostpath

# View all queues
rabbitmqctl list_queues

# Clear messages in queue
rabbitmqctl -p vhostpath purge_queue queueName

# Clear all data
rabbitmqctl reset # This action is best performed after the MQ service is stopped

Spring boot integration rabbitmq

send out

Build a SpringBoot project and preparations. I wrote it in another blog. I won't repeat it here. SpringBoot global exception handling, integration Swagger and parameter verification , all the preparations we need now are in this article.

Add application yaml

  port: 8088

    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000

Message entity

package cn.com.springboot.vo;

import lombok.Data;

import java.io.Serializable;

public class OrderInfo implements Serializable {

    private String id;

    private String orderName;

    private String messageId;

Send class

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

public class OrderSender {

    private RabbitTemplate rabbitTemplate;

    private static final String ORDER_EXCHANGE = "order_exchange";

    private static final String ORDER_ROUTING_KEY = "order_r_key";

    public void sendOrder(OrderInfo orderInfo){
        //correlationData: Message unique id
        CorrelationData correlationData = new CorrelationData();

        //String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);

rabbitmq preparation

Create exchange

Description of Exchange related properties

  • Name: the name of the exchange

  • Type description

    • direct: exchange will set the routingkey when binding with the queue. Only when the routingkey is exactly the same, exchange will forward the message to the corresponding queue, which is equivalent to point-to-point

    • fanout: directly route messages to all bound queues without matching the routingkey of the message, because the routingkey is not bound, and all messages are forwarded the fastest (broadcast mode)

    • Topic: this type of exchange is similar to direct, but the direct type requires the same routingkey, and topic can use wildcards: '*', '#'

      Where '*' means matching one word and '#' means matching no or more words

    • Header: the routing rule is determined according to the header

    • Summary: generally, direct and topic are used for specific routing information. If broadcast is used, fanout is used, and the header type is less used

  • Durability: Durable means persistent to disk

  • Auto Delete: if set to yes, exchange will automatically delete the last queue bound to exchange after it is deleted

  • Internal: If yes, it indicates that the exchange is used internally by rabbitmq and is not provided to external system applications. It is generally used when writing customized extensions in erlang

  • Arguments this is the customized content used when extending the AMQP protocol

Create Queue

Exchange and Queue are associated through Binding and routed by routingkey

Test send

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@Api("send message")
public class RabbitMqController {
    private OrderSender orderSender;

    public ResultVo sender(@RequestBody OrderInfo orderInfo){
        return ResultVo.success();

Test with swagger

The message was successfully sent to the queue

be careful:

  • An exchange can bind multiple queues. As long as the routingkey is the same, a message will be sent to multiple queues
  • exchange is bound to a queue. No matter how many routingkeys are bound, messages that meet the routingkey rule will be sent to the queue. When receiving messages from any routingkey, the consumer connected to the queue will consume them, which is equivalent to a queue corresponding to multiple message rules

messages receiving

Create another project: consumer and producer. The construction steps are basically the same as those above.

  port: 8080

    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
        concurrency: 5              # Initialization concurrency
        max-concurrency: 10         # Maximum concurrent number
        auto-startup: true          # Automatically turn on monitoring
        prefetch: 1                 # Each connection can process a maximum of several messages at the same time, and the current limit setting
        acknowledge-mode: manual    # The sign in mode is manual sign in

Add consumer class

package cn.com.springboot.web;

import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

public class OrderReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order_queue", durable = "true"),
            exchange = @Exchange(value = "order_exchange", type = "topic"),//durable defaults to true
            key = "order_r_key"//My routingKey is order_r_key
    public void receiveOrderInfo(@Payload OrderInfo orderInfo,
                                 @Headers Map<String, Object> headers,
                                 Channel channel) throws IOException {
        log.info("Start consumption");
        log.info("orderName: {}, messageId: {}", orderInfo.getOrderName(), orderInfo.getMessageId());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);

        channel.basicAck(deliveryTag, false);

Start test

Share here for the time being, welcome to correct!

Keywords: RabbitMQ queue

Added by james2010 on Fri, 14 Jan 2022 16:02:06 +0200