RabbitMQ (introduction, concept, installation and springboot integration)

RabbitMQ (introduction, concept, installation and springboot integration)

1, MQ introduction

In computer science, message queue (English: Message queue) is a way of communication between processes or between different threads of the same process. The storage column of software is used to process a series of inputs, usually from users. Message queue provides asynchronous communication protocol. The records in each storage column contain detailed data, including the time of occurrence, the type of input equipment, and specific input parameters, that is, the sender and receiver of the message do not need to interact with the message queue at the same time. The message is kept in the queue until it is retrieved by the recipient.

1.1. realization
  • Message queues are often stored in a linked list structure. Processes with permissions can write or read messages to the message queue.
  • At present, there are many open source implementations of message queues, including JBoss messaging Joram, Apache ActiveMQ, Sun0pen Message Queue, IBM MQ, Apache Qpid, and HTTPSQS.
  • Currently, RabbitMQ, RocketNQ, ActiveMQ, Kafka, ZeroNQ, MetaMq and other message queues are widely used, while some databases such as Redis, Mysql and phxsql can also realize the function of message queue.
1.2. characteristic

MQ is a typical representative of the consumer producer model. One end keeps writing messages to the message queue, while the other end can read or subscribe to the messages in the queue. MQ is similar to JMS, but the difference is that JMS is a standard and API definition of SUN JAVA message middleware service, while MQ follows the specific implementation and product of AMQP protocol.
be careful:

  1. AMQP, or advanced message queuing protocol, is an application layer standard and advanced message queuing protocol that provides unified messaging services. It is an open standard of application layer protocol and is designed for message oriented middleware. The client and message middleware based on this protocol can deliver messages, which is not limited by different products and development languages of the client / middleware.
  2. JMS, Java message service application program interface, is an API for message oriented middleware in Java platform,
    It is used to send messages between two applications or in distributed systems for asynchronous communication. Java message service is a platform independent API, and most MOM providers support JMS. Most of the common message queues implement the JMS API, such as
    ActiveMQ, redis, RabbitMQ, etc.
1.3. Advantages and disadvantages

advantage

Application coupling, asynchronous processing, traffic clipping

  • decoupling

    Traditional mode:

Disadvantages of traditional mode:
The coupling between systems is too strong. As shown in the above figure, system A directly calls the codes of system B and system C in the code. If system D is connected in the future, system A needs to modify the code, which is too troublesome!
Middleware mode:

Advantages of middleware mode:
To write A message to the message queue, the message system needs to subscribe from the message queue itself, so system A does not need to make any modification.
. asynchronous
Traditional mode:

Disadvantages of traditional mode:
- some unnecessary business logic runs synchronously, which is too time-consuming.

Middleware mode:

Advantages of middleware mode:
To write A message to the message queue, the message system needs to subscribe from the message queue itself, so system A does not need to make any modification.

  • Peak clipping

    Traditional mode:

Disadvantages of traditional mode:
When there is a large amount of concurrency, all requests are directly connected to the database, resulting in abnormal database connection

Middleware mode:

Advantages of middleware mode:
System A slowly pulls messages from the message queue according to the concurrency that the database can handle. In production, this short peak backlog is allowed.
shortcoming
Reduced system availability and increased system complexity

1.4. Usage scenario

Message queue is an important component in distributed system. Its general use scenario can be simply described as: when it is not necessary to obtain results immediately, but the concurrency needs to be controlled, it is almost the time when message queue needs to be used
In the project, some time-consuming operations without immediate return are extracted for asynchronous processing, which greatly saves the request response time of the server and improves the throughput of the system.

1.5. Why use RabbitMQ

AMQP, namely Advanced Message Queuing Protocol, is an open standard of application layer protocol, which is designed for message oriented middleware. Message middleware is mainly used for decoupling between components. The sender of the message does not need to know the existence of the message consumer, and vice versa.

The main features of AMQP are message oriented, queue oriented, routing (including point-to-point and publish / subscribe), reliability and security.

RabbitMQ is an open source AMQP implementation. The server side is written in Erlangi language and supports a variety of clients, such as Python
Ruby, . NET,Java, JMS, C,PHP,ActionScript, XMPP, STONP, etc. AJAX is supported. It is used to store and forward messages in distributed systems, and performs well in ease of use, scalability, high availability and so on.

The summary is as follows:

  • Based on AMQP protocol
  • High concurrency (a concept of capacity, the maximum number of tasks that the server can accept).
  • High performance (a concept of speed, the number of tasks that the server can handle in a unit time)
  • High availability (a persistent concept, which is the proportion of time the server can work normally per unit time).
  • Strong community support, and many companies are using it
  • Support Plug-Ins
  • Support multiple languages

2, Concept

  • About RabbitMQ: RabbitMQ is an open source implementation of AMQP (Advanced message queue protocol) developed by erlang.

  • Message: a message, which is unnamed, consists of a message header and a message body. The message body is opaque, while the message header consists of a series of optional attributes, including routing key, priority (priority relative to other messages), delivery mode (indicating that the message may need persistent storage), etc.

  • Publisher's message to the client is also a message to the publisher's application.

  • Exchange: a switch that receives messages sent by producers and routes them to queues in the server. There are four types of exchange: direct (default), fanout, topic, and headers (headers and direct switches are exactly the same, but their performance is much worse, and they are hardly used at present). Different types of exchange have different strategies for forwarding messages.

  • Queue: message queue, which is used to save messages until they are sent to consumers. It is the container of messages and the destination of messages. A message can be put into one or more queues. The message is always in the queue, waiting for the consumer to connect to the queue and take it away.

  • Binding: binding, used for the association between message queues and exchanges. A binding is a routing rule that connects the switch and message Queue based on the routing key, so the switch can be understood as a routing table composed of binding. The binding of Exchange and Queue can be many to many.

  • Connection: a network connection, such as a TCP connection.

  • Channel: channel, an independent bidirectional data flow channel in a multiplex connection. No matter whether the message is sent through the virtual channel or the real channel, the AMQP connection is completed. Because it is very expensive to establish and destroy TCP for the operating system, the concept of channel is introduced to reuse a TCP connection.

  • Consumer: the consumer of the message, which represents a client application that gets the message from the message queue.

  • Virtual Host: Virtual Host, which represents a batch of switches, message queues and related objects. The Virtual Host is sharing the same authentication and addition

    A separate server domain for a secure environment. Each vhost is essentially a mini RabbitMQ server with its own queue, switch, binding and permission mechanism. vhost is the basis of AMQP concept and must be specified when connecting. The default vhost of RabbitMQ is /.

  • Broker: represents the message queuing server entity

3, Installation

  1. Get image
#Specifies the version that contains the web control page
docker pull rabbitmq:management
  1. Run mirror
#Method 1: the default user is guest, and the password is also guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#Method 2: set user name and password
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

#Mode 3
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

	# 4369, 25672 (Erlang Discovery & cluster port) 
	# 5672, 5671 (AMQP port) 15672 (web management background port) 
	# 61613, 61614 (STOMP protocol port) 
	# 1883, 8883 (MQTT protocol port) 
	# https://www.rabbitmq.com/networking.html
  1. Accessing the ui interface
http://localhost:15672/

4, SpringBoot integrates RabbitMQ

4.1. Introduce dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2. application.yml configuration
spring:
  rabbitmq:
    host: 192.168.10.123
    port: 5672
    virtual-host: /
    #Enable sender confirmation
    #publisher-confirms: true
    publisher-confirm-type: correlated
    # Enable the confirmation of the sender's message arrival queue
    publisher-returns: true
    template:
      #As long as you arrive at the queue, send asynchronously and call our return - confirm first
      mandatory: true
    listener:
      simple:
        #Manual ack message (manually confirm whether the message is consumed)
        acknowledge-mode: manual
4.3. RabbitConfig configuration class
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class MyRabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    /**
     * Set the transmission message format to json
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Customize RabbitTemplate
     * 1,The service will call back when it receives the message
     *      1,spring.rabbitmq.publisher-confirms: true
     *      2,Set confirmation callback
     * 2,When the message arrives in the queue correctly, it will be recalled
     *      1,spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2,Set the confirmation callback ReturnCallback
     *
     * 3,Consumer side confirmation (ensure that each message is consumed correctly, and then the broker can delete the message), that is, manual confirmation
     *      If the consumer goes down, the message will not be lost (channel.basicAck())
     *
     */
    // @After the postconstruct / / myrabbitconfig object is created, execute this method
    public void initRabbitTemplate() {

        /**
         * 1,ack=true as long as the message arrives at the Broker
         * correlationData: Unique associated data of the current message (this is the unique ID of the message)
         * ack: Whether the message was received successfully
         * cause: Reasons for failure
         */
       /* rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            }
        });*/
        //Set confirmation callback
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * This failure callback is triggered as long as the message is not delivered to the specified queue
         * message: Message details of delivery failure
         * replyCode: Reply status code
         * replyText: Text content of reply
         * exchange: Which switch was this message sent to
         * routingKey: Which way Mail button was used for this message at that time
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

4.4. Rabbitmqconfig (create switches, queues, and bindings in container)
package com.lyh.mall.order.config;

import com.lyh.mall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.HashMap;


/**
 * MQ Switch, normal queue, dead letter queue and binding
 * Queue, Exchange and Binding in the container will be created automatically (when RabbitMQ does not exist)
 **/

@Configuration
public class MyRabbitMQConfig {

    /**
     * test
     */
    /*@RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("Received the expired order message: ready to close the order "+ orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }*/
  
   /**
     * Use JSON serialization mechanism for message conversion
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /* Queue, Exchange and Binding in the container will be created automatically (when RabbitMQ does not exist) */

    /**
     * Dead letter queue
     *
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
        /*
            Queue(String name,  Queue name
            boolean durable,  Persistent
            boolean exclusive,  Exclusive
            boolean autoDelete, Delete automatically
            Map<String, Object> arguments) attribute
         */
        HashMap<String, Object> arguments = new HashMap<>();
        //Dead letter routing
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        //Dead letter routing key
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // Message expiration time 1 minute
        //Create queue
        Queue queue = new Queue("order.delay.queue", true, false, false, arguments);

        return queue;
    }

    /**
     * Ordinary queue
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }

    /**
     * TopicExchange
     * Create switch
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        /*
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         * */
        return new TopicExchange("order-event-exchange", true, false);

    }

    /**
     * Bind dead letter queue
     * @return
     */
    @Bean
    public Binding orderCreateBinding() {
        /*
         * String destination, Destination (queue name or switch name)
         * DestinationType destinationType, Destination type (Queue, exhrange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    /**
     * Bind normal queue
     * @return
     */
    @Bean
    public Binding orderReleaseBinding() {

        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    /**
     * Order release is directly bound to inventory release
     * @return
     */
    @Bean
    public Binding orderReleaseOtherBinding() {

        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }


    /**
     * Commodity spike queue
     * @return
     */
    @Bean
    public Queue orderSecKillOrrderQueue() {
        Queue queue = new Queue("order.seckill.order.queue", true, false, false);
        return queue;
    }

    @Bean
    public Binding orderSecKillOrrderQueueBinding() {
        //String destination, DestinationType destinationType, String exchange, String routingKey,
        // 			Map<String, Object> arguments
        Binding binding = new Binding(
                "order.seckill.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.seckill.order",
                null);

        return binding;
    }

}
4.5. Test code
  • AmqpAdmin: management component
  • RabbitTemplate: message sending and processing component
  • @RabbitListener can listen to messages with three parameters (regardless of quantity or order): object content, message, channel and channel
import com.lyh.mall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;

@SpringBootTest
class MallOrderApplicationTests {

    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * send message
     */
    @Test
    void sendMessige(){
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("Hello!");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
        System.out.println("Sending message succeeded!!");
    }

    /**
     * Create switch
     */
    @Test
    void createExchange() {
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        System.out.println("Created successfully");
    }

    /**
     * Create queue
     */
    @Test
    void createQueue() {
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        System.out.println("Created successfully");
    }

    /**
     * Create binding
     */
    @Test
    void createBinding() {
        //String destination,
        // DestinationType destinationType [destination type],
        // String exchange,
        // String routingKey,
        //@Nullable map < string, Object > arguments
        Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        System.out.println("Created successfully");
    }

}

//The service layer adds listening annotations to obtain message data

/**
     * Listen for messages
     * queues Declare all queues that need to listen
     * org.springframework.amqp.core.Message
     * <p>
     * You can write the type of parameter
     * 1,Message essage: Native message details. Head + body
     * 2,Type of message sent: OrderReturnReasonEntity content;
     * 3,Channel channel:Current data transmission channel
     * <p>
     * Queue:Many people can listen. As long as a message is received, the queue deletes the message, and only one can receive the message
     * 1),Order service starts multiple: only one client can receive the same message
     * 2),When only one message is completely processed and the method runs, we can receive the next message
     */
     @RabbitListener(queues = {"hello-java-queue"})
    //Only this method of this class can accept Hello Java queue messages
    //@RabbitHandler / / annotate @ rabbitlistener on the class (queues = {"Hello Java queue"})
    public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {

        //Get the message body
//        byte[] body = message.getBody();
        //Get the message header
//        MessageProperties properties = message.getMessageProperties();

        System.out.println("Message received:" + content);

        //After the message is processed, manually confirm that the deliveryTag is automatically incremented in order in the Channel
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag->" + deliveryTag);

        try {
            if (deliveryTag % 2 == 0) {
                //Confirm to delete the message in the sign in queue false non batch mode
                channel.basicAck(deliveryTag, false);
            } else {
                //Reject return the third parameter - > true: rejoin the queue false: discard
                channel.basicNack(deliveryTag, false, true);
            }
        } catch (IOException e) {
            //Network interruption
        }
    }

    //    @RabbitHandler
    //public void receiveMessage2(OrderEntity content) {

       // System.out.println("received message:" + content);
    //}

Keywords: Java RabbitMQ Spring Boot

Added by webdes03 on Sun, 27 Feb 2022 04:59:11 +0200