RabbitMQ Basics

Preface

Study notes on the basics of RabbitMQ.

1, What is RabbitMQ?

RabbitMQ is message-oriented middleware: it accepts and forwards messages. You can think of it as a parcel station. When you want to send a parcel, you drop it off at the station, and a courier eventually delivers it to the recipient. In this analogy, RabbitMQ is both the parcel station and the courier. The main difference is that RabbitMQ does not handle parcels: it receives, stores and forwards message data.

2, What problems can it solve?

  • Traffic peak shaving
Suppose the order system can process at most 10,000 orders. That capacity is more than enough in normal periods, when the result comes back within a second of placing an order. In a peak period, however, if 20,000 orders are placed, the order system cannot handle them and can only throttle: users are not allowed to place orders once the number exceeds 10,000. With a message queue as a buffer, this limit can be lifted and the orders placed within one second are spread over a longer period. Some users may then not receive confirmation of a successful order until more than ten seconds after placing it, but that is still a better experience than not being able to place an order at all.
  • Application decoupling
Take an e-commerce application with an order system, an inventory system, a logistics system and a payment system. If the order system calls the inventory, logistics and payment systems directly (tight coupling) after the user creates an order, a failure in any of these subsystems makes the order operation fail. Once the design is changed to use a message queue, problems caused by calls between systems are greatly reduced. For example, suppose the logistics system fails and needs a few minutes to repair. During those minutes, the messages the logistics system has yet to process are held in the message queue, and the user's order still completes normally. When the logistics system recovers, it continues processing the order information. Users placing orders never notice the logistics failure, which improves the availability of the whole system.
  • Asynchronous processing
Some calls between services are asynchronous. For example, A calls B, B takes a long time to execute, and A needs to know when B has finished. There used to be two ways to handle this: A calls B's query API at intervals to poll for the result, or A provides a callback API that B calls to notify A when it finishes. Neither approach is very elegant. A message bus solves this easily: after calling B, A only needs to listen for B's "processing completed" message. When B finishes, it sends a message to MQ, and MQ forwards it to A. A no longer needs to poll B's query API or provide a callback API, B does not need to do any of this either, and A still learns promptly that the asynchronous processing succeeded.
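The peak-shaving case above can be sketched in plain Java without a broker. This is a minimal simulation, not RabbitMQ code: the class name PeakShavingSketch, the notion of a "tick" (one processing interval) and the in-memory ArrayDeque standing in for the message queue are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Broker-free sketch of peak shaving: a queue absorbs a burst the order system cannot process at once. */
class PeakShavingSketch {

    /** Without a buffer, orders beyond the per-tick capacity are rejected. Returns the rejected count. */
    static int rejectedWithoutBuffer(int burst, int capacityPerTick) {
        return Math.max(0, burst - capacityPerTick);
    }

    /** With a buffer, every order is queued. Returns the ticks needed to drain it (capacityPerTick must be > 0). */
    static int ticksToDrain(int burst, int capacityPerTick) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int orderId = 0; orderId < burst; orderId++) {
            buffer.add(orderId);   // producer side: enqueue and return immediately
        }
        int ticks = 0;
        while (!buffer.isEmpty()) {
            // consumer side: take at most capacityPerTick orders per tick
            for (int n = 0; n < capacityPerTick && !buffer.isEmpty(); n++) {
                buffer.poll();
            }
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println("Rejected without buffer: " + rejectedWithoutBuffer(20000, 10000));
        System.out.println("Ticks to drain with buffer: " + ticksToDrain(20000, 10000));
    }
}
```

With the figures from the text (a burst of 20,000 orders against a capacity of 10,000), the unbuffered system turns away 10,000 orders, while the buffered one accepts them all and drains the backlog in two ticks.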

3, Four core concepts

Producer, consumer, exchange, queue

4, How it works

Terminology

  1. Broker: an application that receives and distributes messages. RabbitMQ Server is the message broker
  2. Virtual host: designed for multi-tenancy and security. The basic AMQP components are grouped into a virtual group, similar to a network namespace. When several users share the services of one RabbitMQ server, the server can be divided into multiple vhosts, and each user creates exchanges and queues in their own vhost
  3. Connection: the TCP connection between a publisher / consumer and the broker
  4. Channel: if a new connection were established for every access to RabbitMQ, the overhead of establishing TCP connections would be huge and efficiency low when message volume is high. A channel is a logical connection established inside a connection. If the application is multithreaded, each thread usually creates its own channel for communication. AMQP methods carry a channel id that lets the client and the broker identify the channel, so channels are completely isolated from each other. As a lightweight connection, a channel greatly reduces the operating system's overhead of establishing TCP connections
  5. Exchange: the first stop for a message arriving at the broker. According to its distribution rules, the exchange matches the routing key against its lookup table and distributes the message to queues. Common types are direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  6. Queue: where the message finally ends up, waiting for a consumer to pick it up
  7. Binding: the virtual link between an exchange and a queue. A binding can carry a routing key. Binding information is stored in the exchange's lookup table and is the basis for distributing messages
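How an exchange uses its bindings to pick target queues can be illustrated with a small broker-free model. This is only a sketch under stated assumptions: the class ExchangeRoutingSketch, its Binding type, the queue names q1 to q4 and the sample binding keys are hypothetical and are not part of the RabbitMQ client API. The matching rules it models are the usual ones: fanout ignores the routing key, direct requires an exact match, and topic treats `*` as exactly one dot-separated word and `#` as zero or more words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Broker-free sketch of how an exchange picks queues from its bindings. */
class ExchangeRoutingSketch {

    /** One binding: a queue name plus the key it was bound with. */
    static final class Binding {
        final String queue;
        final String bindingKey;
        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Hypothetical bindings used by the examples. */
    static List<Binding> sampleBindings() {
        return Arrays.asList(
                new Binding("q1", "order.created"),
                new Binding("q2", "order.*"),
                new Binding("q3", "#"),
                new Binding("q4", "order.#"));
    }

    /** type is "direct", "fanout" or "topic"; returns the queues that receive the message. */
    static List<String> route(String type, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            boolean match;
            switch (type) {
                case "fanout":   // every bound queue, routing key ignored
                    match = true;
                    break;
                case "direct":   // exact match on the key
                    match = b.bindingKey.equals(routingKey);
                    break;
                case "topic":    // word-by-word pattern match
                    match = topicMatches(b.bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
                    break;
                default:
                    throw new IllegalArgumentException("unknown exchange type: " + type);
            }
            if (match && !targets.contains(b.queue)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    /** '*' stands for exactly one word, '#' for zero or more words. */
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb any number of the remaining words, including none
            for (int skip = k; skip <= key.length; skip++) {
                if (topicMatches(pattern, p + 1, key, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return topicMatches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        List<Binding> bindings = sampleBindings();
        System.out.println(route("direct", bindings, "order.created"));
        System.out.println(route("fanout", bindings, "anything"));
        System.out.println(route("topic", bindings, "order.created.eu"));
    }
}
```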

5, Work Queues

Round-robin distribution

When several consumers consume messages from the same queue, messages are distributed round-robin by default

Unfair distribution

Unfair distribution must be configured on the consumer side, by calling basicQos with a prefetch count (for example 1). The idea is that the more capable do more work: workers that process faster take more of the messages
The consumer must also set autoAck to false for basicQos to take effect
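The difference between the two distribution modes can be seen in a small broker-free simulation. It is a sketch under assumptions, not a model of RabbitMQ internals: the class DispatchSketch, the per-message processing costs and the rule "the worker that becomes free first gets the next message" (standing in for a prefetch count of 1 with manual acknowledgements) are all made up for illustration.

```java
import java.util.Arrays;

/** Broker-free comparison of round-robin dispatch and prefetch-limited dispatch. */
class DispatchSketch {

    /** Round-robin: message i goes to worker i % workers, however busy that worker is. */
    static int[] roundRobin(int messages, int workers) {
        int[] counts = new int[workers];
        for (int i = 0; i < messages; i++) {
            counts[i % workers]++;
        }
        return counts;
    }

    /**
     * Prefetch of 1: a worker receives its next message only after finishing the previous one.
     * costPerMessage[w] is the hypothetical processing time of worker w.
     */
    static int[] prefetchOne(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        int[] counts = new int[n];
        long[] freeAt = new long[n];   // time at which each worker finishes its current message
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int w = 1; w < n; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;          // earliest free worker; ties go to the lowest index
                }
            }
            freeAt[next] += costPerMessage[next];
            counts[next]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundRobin(12, 2)));
        System.out.println(Arrays.toString(prefetchOne(12, new int[] {1, 5})));
    }
}
```

With 12 messages and a worker that is five times slower than the other, round-robin hands each worker 6 messages, while the prefetch-limited model gives the fast worker 10 and the slow worker 2.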

6, Message acknowledgement

A consumer may need some time to complete a task. What happens if a consumer is processing a long task and dies when it has only completed part of it? Without an acknowledgement mechanism, once RabbitMQ delivers a message to a consumer, it immediately marks the message for deletion. In that case, if the consumer suddenly dies, we lose the message it was processing, along with any later messages that were already sent to that consumer and that it can no longer receive.
To ensure that messages are not lost in delivery, RabbitMQ introduces a message acknowledgement mechanism: after receiving and processing a message, the consumer tells RabbitMQ that it has been processed, and RabbitMQ can then delete the message.

There are two main acknowledgement schemes:

  1. Automatic acknowledgement (unsafe: nothing is done if processing fails). Throughput is high, but if the connection or channel closes on the consumer side, the message is lost, and a consumer that is sent more messages than it can process may run out of memory

  2. Manual acknowledgement (solves the problem of lost messages)

    Channel.basicAck (positive acknowledgement)
    Channel.basicNack (negative acknowledgement)
    Channel.basicReject (negative acknowledgement): the message is not processed and is rejected; it can simply be discarded

    Multiple: true or false. A channel may have several messages waiting to be acknowledged. If true, all unacknowledged messages on the channel up to and including the current delivery tag are acknowledged in one go; if false, only the message with the current delivery tag is acknowledged
    false is safer, but not as efficient as true

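Set manual acknowledgement: pass autoAck = false to basicConsume, then call basicAck with the message's delivery tag once processing is done

Set batch acknowledgement: pass true as the multiple argument of basicAck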
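The bookkeeping behind manual acknowledgement can be sketched without a broker. The class below is a hypothetical in-memory model, not the RabbitMQ client: AckSketch and its methods publish, deliver, ack and consumerDied are names invented here. It shows the two points made above: the multiple flag decides whether one delivery tag or all tags up to it are acknowledged, and messages that were delivered but never acknowledged go back to the queue instead of being lost when the consumer dies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

/** Broker-free sketch of manual acknowledgement bookkeeping on one channel. */
class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();          // messages waiting in the queue
    private final TreeMap<Long, String> unacked = new TreeMap<>();   // delivered, not yet acknowledged
    private long nextDeliveryTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Delivers the next ready message and returns its delivery tag, or -1 if the queue is empty. */
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        unacked.put(nextDeliveryTag, message);
        return nextDeliveryTag++;
    }

    /** multiple=false acks one tag; multiple=true acks every outstanding tag up to and including it. */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headMap(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    /** Consumer died: everything it had not acknowledged returns to the front of the queue, in order. */
    void consumerDied() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        for (int i = 1; i <= 5; i++) {
            channel.publish("m" + i);
        }
        for (int i = 0; i < 4; i++) {
            channel.deliver();           // tags 1..4 are now unacknowledged
        }
        channel.ack(1, false);           // acknowledges tag 1 only
        channel.ack(3, true);            // acknowledges tags 2 and 3 together
        channel.consumerDied();          // tag 4 goes back to the queue
        System.out.println("ready=" + channel.readyCount() + " unacked=" + channel.unackedCount());
    }
}
```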

7, RabbitMQ persistence

Queue persistence

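D: marks the queue as durable (the flag shown next to the queue in the management UI)

Code settings: pass true as the durable argument when declaring the queue, as in channel.queueDeclare(queueName, true, false, false, null)

If a non-durable queue with the same name already exists, declaring it again as durable reports an error; the existing queue has to be deleted first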

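Message persistence

The producer marks messages as persistent when publishing, for example by passing MessageProperties.PERSISTENT_TEXT_PLAIN as the props argument of basicPublish (there is no absolute guarantee that messages will not be lost)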

8, Publisher confirms

Concept

Publisher confirms work between the producer and MQ.
The producer puts the channel into confirm mode. Once the channel is in confirm mode, every message published on it is assigned a unique ID (starting from 1). Once a message has been delivered to all matching queues, the broker sends the producer an acknowledgement containing the message's unique ID, so the producer knows the message has arrived at its destination queues. If the message and the queue are persistent, the acknowledgement is sent after the message has been written to disk. The delivery-tag field of the acknowledgement carries the sequence number of the confirmed message. The broker can also set the multiple field of basic.ack to indicate that all messages up to and including this sequence number have been handled.

Strategies

Producer-side setting: call channel.confirmSelect() on the channel to enable publisher confirms

Single confirmation

Synchronous: safe but slow

import com.rabbitmq.client.Channel;
import java.util.UUID;

public static void publishMessageSingle() throws Exception { // took 136643 ms
    Channel channel = RabbitMqUtils.getChannel();
    // Declare the queue
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    // Enable publisher confirms
    channel.confirmSelect();
    // Start time
    long begin = System.currentTimeMillis();

    // Publish the messages one at a time
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        // Wait for the broker to confirm this message before publishing the next
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("Message sent successfully");
        }
    }
    // End time
    long end = System.currentTimeMillis();
    System.out.println("Published " + MESSAGE_COUNT + " individually confirmed messages in " + (end - begin) + " ms");
}

Batch confirmation

Fast, but when a confirmation fails there is no way to tell which message in the batch failed to send

import com.rabbitmq.client.Channel;
import java.util.UUID;

public static void publishMessageBatch() throws Exception { // took 1711 ms
    Channel channel = RabbitMqUtils.getChannel();
    // Declare the queue
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    // Enable publisher confirms
    channel.confirmSelect();
    // Start time
    long begin = System.currentTimeMillis();

    // Number of messages per confirmation batch
    int batchSize = 100;
    // Publish in batches and confirm once per batch
    for (int i = 1; i <= MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());

        // Every 100 messages, wait once for the whole batch to be confirmed
        if (i % batchSize == 0) {
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("Message sent successfully");
            }
        }
    }
    // Confirm the final partial batch when MESSAGE_COUNT is not a multiple of batchSize
    if (MESSAGE_COUNT % batchSize != 0) {
        channel.waitForConfirms();
    }

    // End time
    long end = System.currentTimeMillis();
    System.out.println("Published " + MESSAGE_COUNT + " batch-confirmed messages in " + (end - begin) + " ms");
}

Asynchronous confirmation

Best performance, and failures can be handled precisely because each unconfirmed message can be identified

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Asynchronous publisher confirms
public static void publishMessageAsync() throws Exception { // took 274 ms
    Channel channel = RabbitMqUtils.getChannel();
    // Declare the queue
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    // Enable publisher confirms
    channel.confirmSelect();
    // Start time
    long begin = System.currentTimeMillis();

    /**
     * A thread-safe, ordered map suited to high concurrency:
     *  1. Easily associates sequence numbers with messages
     *  2. Entries can be removed in bulk, given a sequence number
     *  3. Supports concurrent access
     */
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

    // Callback for successfully confirmed messages
    ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
        //=====2. Remove the confirmed messages; whatever remains is unconfirmed=====
        if (multiple) {
            // Inclusive view: every entry up to and including deliveryTag
            ConcurrentNavigableMap<Long, String> confirmed =
                    outstandingConfirms.headMap(deliveryTag, true);
            // Clearing the view removes those entries from outstandingConfirms
            confirmed.clear();
        } else {
            outstandingConfirms.remove(deliveryTag);
        }

        System.out.println("Confirmed message: " + deliveryTag);
    };
    /**
     * Callback for messages that were not confirmed
     *  1. deliveryTag: identifies the message
     *  2. multiple: whether this is a batch notification
     */
    ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
        //=====3. Print the unconfirmed message=====
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("Unconfirmed message: " + message + ", tag: " + deliveryTag);
    };
    /**
     * Register a listener that reports which messages succeeded and which failed
     *  1. Callback for successfully confirmed messages
     *  2. Callback for messages that failed
     */
    channel.addConfirmListener(ackCallback, nackCallback); // asynchronous notification

    for (int i = 1; i <= MESSAGE_COUNT; i++) {
        String message = i + "";
        //=====1. Record the message under the sequence number it is about to receive=====
        // This must happen before basicPublish, which advances the sequence number
        outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
        channel.basicPublish("", queueName, null, message.getBytes());
    }

    // End time (confirms may still be arriving at this point)
    long end = System.currentTimeMillis();
    System.out.println("Published " + MESSAGE_COUNT + " asynchronously confirmed messages in " + (end - begin) + " ms");
}
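The map operations that the ack callback relies on can be tried in isolation with nothing but the JDK. The sketch below is illustrative only: the class HeadMapSketch and its helper methods are hypothetical, and the map is filled by hand rather than by a channel. It shows two properties of ConcurrentSkipListMap.headMap that matter for confirm tracking: the one-argument form excludes the bound while headMap(key, true) includes it, and the returned map is a live view, so clearing it removes the entries from the underlying map.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Broker-free look at the map operations used to track outstanding confirms. */
class HeadMapSketch {

    /** Builds a map holding sequence numbers 1..count, as a publisher would before any confirm arrives. */
    static ConcurrentSkipListMap<Long, String> outstanding(int count) {
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();
        for (long seqNo = 1; seqNo <= count; seqNo++) {
            map.put(seqNo, "message " + seqNo);
        }
        return map;
    }

    /** Removes every entry up to and including deliveryTag; returns how many entries remain. */
    static int confirmUpTo(ConcurrentSkipListMap<Long, String> map, long deliveryTag) {
        ConcurrentNavigableMap<Long, String> confirmed = map.headMap(deliveryTag, true);
        confirmed.clear();   // headMap is a live view, so this removes the entries from map
        return map.size();
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> map = outstanding(5);
        System.out.println(map.headMap(3L).keySet());         // exclusive bound: keys below 3
        System.out.println(map.headMap(3L, true).keySet());   // inclusive bound: keys up to 3
        System.out.println(confirmUpTo(map, 3));               // entries left after confirming 1..3
    }
}
```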

To be continued

Keywords: RabbitMQ message queue

Added by Steppio on Sun, 26 Dec 2021 01:10:10 +0200