Analysis of RabbitMQ learning notes and AMQP protocol

RabbitMQ

Related concepts of MQ

What is MQ

MQ(message queue), in its literal sense, is essentially a queue. FIFO is first in first out, but the content stored in the queue is message. It is also a cross process communication mechanism for upstream and downstream message delivery. In the Internet architecture, MQ is a very common upstream and downstream "logical decoupling + physical decoupling" message communication service. After using MQ, the upstream of message sending only needs to rely on MQ without relying on other services.

Features of MQ

  • Flow peak elimination (using the nature of the queue)
  • Application decoupling (nature of using middleware)
  • Asynchronous processing (using the nature of the message)

RabbitMQ: RabbitMQ is a message middleware: it accepts and forwards messages. You can regard it as an express site. When you want to send a package, you put your package in the express station, and the courier will eventually send your express to the recipient. According to this logic, RabbitMQ has an express station and a courier to deliver the express for you. The main difference between RabbitMQ and express station is that it does not process express mail, but receives, stores and forwards message data.

Four core concepts:

  • Producer: the program that generates data and sends messages is the producer
  • Switch is a very important part of RabbitMQ, - on the one hand, it receives messages from producers, on the other hand, it pushes messages to queues. The switch must know exactly how to handle the messages it receives, whether to push these messages to a specific queue or multiple queues, or discard the messages, which depends on the switch type
  • Consumer: consumption and reception have similar meanings. Most of the time, consumers are a program waiting to receive messages. Please pay attention to producers and consumers
    Most of the time, the and message oriented middleware are not on the same machine. The same application can be both a producer and a consumer.
  • Queue: queue is a data structure used internally by RabbitMQ. Although messages flow through RabbitMQ and applications, they can only be stored in the queue. The queue is only constrained by the memory and disk limitations of the host. It is essentially a large message buffer. Many producers can send messages to a queue, and many consumers can try to receive data from a queue. This is how we use queues.

Internal structure:

Message queuing protocol

What protocols do message middleware use: Openwire, AMQP, MQTT, Kafka, and OpenMessage protocols.

Difference between message oriented middleware protocol and http protocol

  • Message oriented middleware protocol is relatively simple and efficient
  • http is generally a short connection. The message middleware protocol is a long-term behavior of obtaining messages. If there is a problem, it is necessary to persist the data to ensure high availability.

Message persistence

Save the data to disk instead of memory and disappear when the server is restarted and disconnected, so that the data can be saved permanently.

Message distribution policy

Publish and subscribe, polling distribution, fair distribution, retransmission, message pull

RabbitMQ core

  • Hello World: simple event. Single producer, single consumer, single queue (simple mode)
  • Work Queue: single queue distribution task. Single producer, multi consumer, single queue (working mode)
  • Publish Subscribe: multi queue distribution task. Single producer, single switch, multi queue, multi consumer (publish / subscribe mode)
  • Routing: routing mode.
  • Topics: topic mode.

docker installing RabbitMQ

# Download Image
docker pull rabbitmq

Download from the official website: rabbitmq official website

rabbitmq three ports

4369/tcp, 5671-5672/tcp, 15691-15692/tcp, 25672-15672/tcp
  • 5672: client end communication port

  • 15672: management interface ui port

  • 25672: internal communication port between server s

Add user

# Create user
rabbitmqctl add_user username password
# Set user roles
rabbitmqctl set_user_tags username administrator
# Set user permissions
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
# List users
rabbitmqctl list_users

AMQP protocol

AMQP (Advanced Message Queuing Protocol) is a network protocol for asynchronous message transmission between processes.

This middleware server is a data server. It can receive messages, send them (data) to different consumers according to the specified route according to any standard, and store them (data) in memory or disk when the consumers cannot receive messages quickly.

AMQP mainly does three things

  • The ability to create any type of switch and message queue
  • The ability to connect switches and message queues to create an ideal message processing system
  • Ability to control completely by agreement

AMQP protocol:

AMQP permits peers to create multiple independent threads of control. Each channel acts as a virtual connection that share a single socket.

AMQP allows the program to create multiple independent control threads. Each signal shares a socket port for virtual connection. The minimum number of channels is 1. Try to ensure the flow balance of each channel. It should not allow a very busy channel and starve to death (literally) a very poor channel.

A message may exist in many message queues, which can be counted by reference or copied.

Several modes of RabbitMQ

Create a message queue

/**
* queue Message queue name
* durable When true, if the server restarts, the message queue will remain unchanged, and data that is not persisted may be lost
* exclusive Is it for a consumer
* autoDelete Whether to delete the message queue automatically after all client s use it
* arguments Other parameters
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

Create a switch

/**
* exchange Switch name
* type Switch mode
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

Queue and switch binding

/**
* queue Message queue name
* exchange Switch name
* routingKey Routing key
*/
Queue.BindOk queueBind(String queue, String exchange Switch name, String routingKey) throws IOException;

Release news

/**
* exchange Switch name
* routingKey Routing key
* props Other properties
* body send content
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

receive messages

/**
* queue Message queue name
* autoAck Whether to respond automatically after successful consumption
* deliverCallback Callback of unsuccessful consumption by consumers
* cancelCallback Callback after consumption
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

simple mode

When the switch is not specified, there will generally be a default switch. The default switch mode is the direct routing mode.

Features: producer message queue consumer

Publish / subscribe to fanout mode

Features: broadcast mechanism, no routing key mode

Routing direct mode

Features: routing key matching mode

topic mode

Features: fuzzy routing key matching pattern

work mode

Features: distribution mechanism

When there are multiple consumers, which consumer will consume our news, and how can we balance the amount of consumer consumption information?

There are two main modes:

  • Distribution in polling mode: one consumer - bar, distributed equally;
  • Fair distribution: fair distribution shall be carried out according to the consumption capacity of consumers, with more fast processing and less slow processing, and distribution according to work;

Parameter header mode

Features: parameter matching mode

Usage scenarios of MQ

  • Decoupling, peak shaving, asynchronous

    • Synchronous and asynchronous problems (serial): the serial execution time is long
    • Parallel mode, asynchronous thread pool: you need to maintain your own thread pool. You need to implement persistence and high availability. The most important thing is to couple them in the application
    • Asynchronous message queuing:
  • High cohesion and low coupling

  • Peak clipping of flow

  • Reliable messaging and reliable production of distributed transactions

  • Data synchronization of index, cache and static processing

  • Flow monitoring

  • Log monitoring (ELK)

  • Order placing, order distribution and ticket grabbing

SpringBoot integrates RabbitMQ

Take fanout subscription / publish mode as an example

Add dependent package

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Producer class

// server.serverOrderService
package com.liuhao.springproducer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Service
public class serviceOrderService {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void makeOrder(String userId, String productId, Integer num) {
        String s = UUID.randomUUID().toString();
        String exchangeName = "fanout_order";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, s);
    }
}

Configuration class

package com.liuhao.springproducer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Create switch
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order", true, false);
    }

    // Create message queue
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Queue wxQueue() {
        return new Queue("wx.fanout.queue", true);
    }
	
    // Binding switches and message queues
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding wxBinding() {
        return BindingBuilder.bind(wxQueue()).to(fanoutExchange());
    }
}

consumer

package com.liuhao.demo1.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailService {

    @RabbitHandler
    public void receive(String msg) {
        System.out.println(msg);
    }
}

TTL expiration time

The expiration time TTL indicates that the expected time can be set for the message, and within this time, it can be received and obtained by the consumer; The message will be deleted automatically

RabbitMQ can set TTL for message queues. At present, there are two methods to set.

  • The first method is to set the queue attribute, and all messages in the queue have the same expiration time.
  • The second method is to set the message separately, and the TTL of each message can be different.

If the above two methods are used at the same time, the expiration time of the message shall be subject to the value with the smaller TTL between them. Once the lifetime of the message in the queue exceeds the set TTL value, it is called dead message. It is delivered to the dead letter queue, and the consumer will no longer receive the message.

Setting up queues

@Bean
public Queue wxQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 5000);  // The message lifetime of this queue is 5000 seconds
    return new Queue("wx.fanout.queue", true, false, false, args);
}

The default setting of message queue is that the parameter key value of TTL is x-message-ttl, which can be recognized automatically.

Setting messages

public void makeOrder(String userId, String productId, Integer num) {
    String s = UUID.randomUUID().toString();
    String exchangeName = "fanout_order";
    String routingKey = "";
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("5000");  // Set expiration time
            message.getMessageProperties().setContentEncoding("utf-8");
            return message;
        }
    };

    rabbitTemplate.convertAndSend(exchangeName, routingKey, s, messagePostProcessor);
}

convertAndSend is an overloaded method here

public void convertAndSend(String exchange, String routingKey, 
                           Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
    this.convertAndSend(exchange, routingKey, message, messagePostProcessor, (CorrelationData)null);
}

Dead letter queue

DLX, fully known as dead letter exchange, can be called dead letter switch or dead letter mailbox. When a message becomes dead message in a queue, it can be re sent to another switch, which is DLX. The queue bound to DLX is called dead message queue

The message may become a dead letter due to the following reasons:

  • Message rejected
  • Message expiration
  • The queue has reached its maximum length

DLX is also a normal switch. It is no different from a general switch. It can be specified on any queue. In fact, it is to set the genus of a queue
Sex. When there is a dead letter in the queue, Rabbitmq will automatically republish the message to the set DLX and then route it to another queue
Column, i.e. dead letter queue.

To use dead letter queue, you only need to set the queue parameter x-dead-letter-exchange to specify the switch when defining the queue.

@Bean
public Queue wxQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 5000);  // The message lifetime of this queue is 5000 seconds
    args.put("x-dead-letter-exchange", "DLX");
    args.put("x-dead-letter-routing-key", "DLX_routing_key");
    return new Queue("wx.fanout.queue", true, false, false, args);
}

fanout does not need to be configured.

Memory disk monitoring

Memory warning

When the memory usage exceeds the configured threshold or the remaining disk space exceeds the configured threshold, RabbitMQ will temporarily block the connection of the client and stop
Receive the message from the client to avoid the crash of the server, and the mentality detection mechanism of the client and server will also fail.

Setting method: Command

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

fraction/value is the memory threshold. The default is 0.4/2GB, which means that when the memory of RabbitMQ exceeds 40%, a warning will be generated
And block the connection of all producers.

The threshold value modified by this command will be invalid after the broker is restarted. The threshold value set by modifying the configuration file will not disappear with the restart, but it will take effect only after the broker is restarted as the configuration file is modified.

Setting method: configuration file

# Relative value
vm_memory_high_watermark.relative = 0.6
# absolute value
vm_memory_high_watermark.absolute = 50MB

Memory page feed

Before a Broker node and memory blocking producer, Bai will try to page change the messages in the queue to disk to free up memory space. Both persistent and non persistent messages will be written to disk. The persistent message itself has a copy in the disk, so the persistent messages will be cleared from memory during the transfer process.

By default, when the memory reaches the threshold of 50%, page feed will be processed.

In other words, when the memory threshold is 0.4 by default, when the memory exceeds 0.4 * 0.5 = 0.2, page feed will be performed.

# Generally less than 1
vm_memory_high_watermark_paging_ratio = 0.7  

Disk alert

When the remaining disk space is lower than the determined threshold, RabbitMQ will also block the producer, which can avoid the server crash caused by running out of disk space due to the continuous page change of non persistent messages.

By default: when the disk alert is 50MB, the alert will be performed. Indicates that the current disk space of 50MB will block the producer and stop the process of page feed of memory messages to the disk.

This threshold can be reduced, but it can not completely eliminate the possibility of crash caused by disk exhaustion. For example, in the gap between two disk space checks, the first
The first check is 60MB, the second check may be 1MB, and a warning will appear.

Distributed transaction

Concept of distributed transaction: distributed transaction refers to that the operation of transaction is located on different nodes, and the AICD characteristics of transaction need to be guaranteed. For example, in the order placement scenario, if the inventory and order are not on the same node, distributed transactions are involved.

Distributed transaction mode

Two phase commit (2PC), by introducing a coordinator to coordinate the behavior of participants and decide whether these participants really want to execute transactions.

  • Disadvantages: synchronization blocking, single point problem, inconsistent data, no fault tolerance mechanism.

Compensation transaction (TCC) is actually a compensation mechanism adopted by TCC. Its core idea is to register a corresponding confirmation and compensation (revocation) operation for each operation. It is divided into three stages:

  • The Try stage is mainly to detect the business system and reserve resources
  • The Confirm phase is mainly used to Confirm and submit the business system. When the next ry phase is successfully executed and the Confirm phase is started, the default Confirm phase will not make mistakes. Lang, as long as Try is successful, the Confirm will be successful.
  • The Cancel phase is mainly used to Cancel 1 reserved resource release when the business is executed in the state of business execution error and needs to be rolled back.
  • Disadvantages: steps 2 and 3 may fail

Local message table (asynchronous). The local message table and the business data table are in the same database. In this way, local transactions can be used to ensure that the operations on these two tables meet the transaction characteristics, and message queues are used to ensure the final consistency.

  • Disadvantages: the message table is coupled to the business system, and there are many chores.

MQ transaction message, asynchronous scenario, strong universality and high expansibility.

  • In the first stage, the Prepared message will get the address of the message. The second stage executes local transactions, and the third stage accesses the message through the address obtained in the first stage and modifies the status.
  • That is to say, in the business method, you need to submit two requests to the message queue, one sending message and one confirming message. If the confirmation message fails to be sent, RabbitMQ will regularly scan the transaction messages in the message cluster. When the Prepared message is found, it will confirm to the message sender. Therefore, the manufacturer needs to implement a check interface. RabbitMQ will decide whether to roll back or continue to send the confirmation message according to the policy set by the sender. This ensures that the message sending and the local transaction succeed or fail at the same time.

Production reliability

  • In order to ensure that the data must be sent to MQ
  • In the same transaction, add a redundant table (the fields are generally message content and message status) to record the order data and the status of whether each data is successfully sent
  • Then, after the publish/confirm mechanism provided by RabbitMQ is used to start the confirmation mechanism, if the message is sent to MQ normally, the receipt message will be obtained, and then the sending state will be changed to the sent state
  • If

Use the message confirmation mechanism to give the message receipt to the producer to ensure the reliability of the producer.

Open confirmation mechanism: Publisher confirm type: correlated

If the message confirmation mechanism receives a receipt message that has not been transmitted to MQ, it uses a timer to deliver it to MQ every certain time, so as to ensure that the message can be delivered to the message queue and then modify the data.

// The method modified by @ PostConstruct will run when the server loads the servlet and will only be executed by the server once,
// Execute after constructor and before init()
@PostConstruct
public void regCallback() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String orderId = correlationData.getId();
            if (!ack) {
                System.out.println("Message transmission failed");
                return ;
            }
            try {
                // Update database (sql, orderId)
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}

Consumption reliability

If the message queue has reached the message queue, consumers can directly obtain data from the message queue, but when there is no desired data, there will be a life and death cycle.

Solution

  • Control the number of dead cycles + dead letter queue (retry)
  • try+catch + manual ack (ACK nowledge mode: manual)
  • try+catch + manual ack + dead letter queue processing

Manual ack

/* Manual ack tells mq that the message is consumed normally */
channel.basicAck(tag, false);
/**
* The second false indicates that there is no retransmission when nack occurs
*/
channel.basicNAck(tag, false, false);
@RabbitListener(queues = {"", ""})
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
        throws IOException {
    System.out.println("consumerDoAck: " + data);

    if (data.contains("success")) {
        // In the ack mechanism of RabbitMQ, the second parameter returns true, indicating that this message needs to be delivered to other consumers for re consumption
        channel.basicAck(deliveryTag, false);
    } else {
        // The third parameter, true, indicates that the message will re-enter the queue
        channel.basicNack(deliveryTag, false, true);
    }
}

Keywords: Java RabbitMQ Distribution Middleware

Added by seanlyons on Fri, 04 Mar 2022 02:47:24 +0200