Dark horse RabbitMQ advanced learning notes

RabbitMQ advanced content introduction

RabbitMQ advanced features

  • Message reliability delivery
  • Consumer ACK
  • Consumer end current limiting
  • TTL
  • Dead letter queue
  • Delay queue
  • Logging and monitoring
  • Message reliability analysis and tracking
  • Administration

RabbitMQ application problems

  • The information is reliable, please guarantee
  • Message idempotency processing

RabbitMQ cluster construction

  • RabbitMQ high availability cluster

1. RabbitMQ advanced features

1.1 reliable delivery of messages

1. Definition

When using RabbitMQ, the message sender wants to eliminate any message loss or delivery failure scenarios. RabbitMQ provides us with two ways to control the delivery reliability mode of messages.

  • confirm mode
  • Return return mode

The path of rabbitMQ message delivery is:
pruducer —>rabbitMQ broker ---->exchange------>queue ----> consumer

  • If the message goes from producer to exchange, a confirmCallback will be returned.
  • If the message fails to be delivered from exchange – > queue, a returnCallback will be returned.
    We will use these two callback s to control the reliable delivery of messages.

2. Producer code implementation

1) New spring project rabbitmq producer spring

2) Configure POM xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq-producer-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3) Configure rabbitmq related information

rabbitmq.properties

rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

4) spring configuration file, defining queues and switches

spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--Define and manage switches and queues-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--definition rabbitMqTemplate Object operation can send messages easily in code-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--Message reliability delivery production end-->
    <rabbit:queue id="test_queue_confirm" name="queue_confirm_test"></rabbit:queue>
    <rabbit:direct-exchange name="exchange_confirm_test">
        <rabbit:bindings>
            <rabbit:binding queue="queue_confirm_test" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>


</beans>

5) Write a test class to test

ProducerTest.java


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /*
    Confirmation mode:
        Steps:
            1,Confirmation mode on: enabled in ConnectionFactory, publisher confirms = "true"
            2,Define the ConfirmCallback callback function in rabbitstamp

    */

    @Test
    public void testConfirm(){
        //2. Define callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /***
             *
             * @param correlationData For relevant configuration information, see parameter configuration information in convertAncSend
             * @param ack exchange Switch, whether the message has been successfully received. true means success and false means failure
             * @param cause Failure reason
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm Method was executed...");
                if(ack){
                    //Received successfully
                    System.out.println("Received successfully:"+cause);
                }else{
                    //Receive failed
                    System.out.println("Receive failure message:"+cause);
                    //Do some processing to send the message again
                }
            }
        });

        //3. Send message
        rabbitTemplate.convertAndSend("exchange_confirm_test111","confirm","message confirm...");
    }


    /**
     * Fallback mode: when the message is sent to exchange and the exchange fails to route to the Queue, returnCallback will be executed
     *  Steps:
     *      1,Enable fallback mode: also enabled in ConnectionFactory, publisher returns = "true"
     *      2,Set ReturnCallback
     *      3,Set the mode in which Exchange processes messages:
     *          ①If the message is not routed to the Queue, the message is discarded (default)
     *          ②If the message is not routed to the Queue, it is returned to the message sender ReturnCallback
     *
     */
    @Test
    public void testReturn(){
        //Set the mode for the switch to process failure messages
        //If this item is not set, the message will be discarded by default and the callback will not be triggered
        rabbitTemplate.setMandatory(true);

        //Set ReturnCallback
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   Message object
             * @param replyCode Error code
             * @param replyText error message
             * @param exchange  Switch
             * @param routingKey    Routing key
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Yes return");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });

        //3. Send message
        rabbitTemplate.convertAndSend("exchange_confirm_test","confirm111","message confirm...");
    }
}

6) Test results

  • Confirm the mode test result: prompt that there is no such switch
  • Return mode test results:
    Note: when the mode of the switch processing failure message is not set, it is the default mode. The message will be discarded and the callback function will not be called.

Summary:

  • Set publisher confirms = "true" of ConnectionFactory to enable confirmation mode.
  • Use rabbittemplate Setconfirmcallback sets the callback function. When the message is sent to exchange, the confirm method is called back. Judge the ack in the method. If it is true, the sending succeeds. If it is false, the sending fails and needs to be handled.
  • Set publisher returns = "true" of ConnectionFactory to enable fallback mode.
  • Use rabbittemplate Setreturncallback sets the return function. When the message fails to be routed from exchange to queue, if rabbittemplate Setmandatory (true) parameter, the message will be returned to the producer. And execute the callback function returnedMessage.
  • The transaction mechanism is also provided in RabbitMQ, but the performance is poor, so it will not be explained here.
    Use the following methods of channel to complete transaction control
    -txSelect() is used to set the current channel to transaction mode
    -txCommit(), used to commit transactions
    -txRollback(), used to roll back transactions

1.2,Consumer Ack

1. Definition

ack refers to knowledge, which is confirmed. Indicates the confirmation method after the consumer receives the message.
There are three confirmation methods:

  • Automatic confirmation: acknowledge = "none"
  • Manual confirmation: acknowledge = "manual"
  • Confirm according to the abnormal conditions: acknowledge = "auto", (this method is troublesome and will not be explained)

Automatic acknowledgement means that once a message is received by the Consumer, it will be automatically acknowledged and the corresponding message will be removed from the message cache of RabbitMQ.
However, in the actual business processing, it is likely that the message will be lost if there is an exception in the business processing after the message is received.
If manual validation is established, channel. is required after successful business processing. Basicack(), manually sign in. If there is an exception, call channel Basicnack() method to automatically resend the message.

2. Consumer code implementation

1) New rabbitmq consumer spring project

2) Write POM XML import dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq-consumer-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3) MQ connection configuration rabbitmq properties

rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=heima
rabbitmq.password=heima
rabbitmq.virtual-host=/itcast

4) spring configuration file, set listening container

spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>
	<!--scanning com.itheima.listener All classes under the package-->
    <context:component-scan base-package="com.itheima.listener"></context:component-scan>
    

    <!--Define listener container-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>
    </rabbit:listener-container>

</beans>

5) Write the listening class acklistener java

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


/*
* Consumer ACK Mechanism:
*   1,Set manual sign in. acknowledge=“manual”
*   2,Let the listener class implement the ChannelAwareMessageListener interface
*   3,If the message is processed successfully, call channel's basicAck() to sign in
*   4,If the message processing fails, call channel's basicNack() to refuse to sign in, and the broker sends it to the consumer again
* */
@Component
public class AckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
    	Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            //1. Receive conversion message
            System.out.println(new String(message.getBody()));

            //2. Processing business logic
            System.out.println("Processing business logic...");
            int i = 3/0;//An error occurred
            //3. Manual sign in
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){

            //4. Refuse to sign
            /*
            * The third parameter: request: return to the queue. If it is set to true, the message will return to the queue, and the broker will resend the message to the consumer
            *
            * */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true); Only single confirmation is allowed
        }
    }
}

6) Write a test class to test consumertest java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        while(true){

        }
    }
}

7) Test results

  • Currently, there is a message in the queue
  • After running, the consumer thinks that the business is wrong and abnormal, resulting in that the message is not signed in, and the message returns to the queue again. The broker will resend the message to the consumer, and so on.

Consumer Ack summary

  • Set the acknowledge attribute in the rabbit: listener container tag, and set the ack mode. none: automatic confirmation, manual: manual confirmation
  • If there is no exception on the consumer side, call channel basicAck(deliveryTag,false); Method to confirm the sign in message
  • If there is an exception, call basicNack or basicReject in catch, reject the message, and let MQ retransmit the message.

Message reliability summary

  1. Persistence
      - exchange should be persistent
      - the queue needs to be persistent
      - message should be persistent
  2. The manufacturer should confirm
  3. The consumer needs to confirm the Ack
  4. Broker high availability

1.3 current limiting at consumer end

1. Definition

  • Why current limit?
    Reduce the pressure on the system to process requests.

2. Code implementation:

1) Create a new qoslistener java

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


/*
* Consumer Current limiting mechanism
*   1,Make sure the Ack mechanism is manual confirmation
*   2,listener-container Configuration properties
*       preFetch = 1,It means that the consumer will pull one message from mq for consumption every time, and will not continue to pull the next message until the consumption is manually confirmed.
* */
@Component
public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        
        //1. Get message
        System.out.println(new String(message.getBody()));

        //2. Processing business logic
        
        //3. Sign for
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}

2) Configure the listening container spring rabbitmq consumer xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <context:component-scan base-package="com.itheima.listener"></context:component-scan>

    <!--Define listener container-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
        <rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>
    </rabbit:listener-container>

</beans>

3) Test class consumertest java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        while(true){

        }
    }
}

4) Test of sending 10 messages to the production end

 @Test
    public void testSend(){
        for (int i = 0; i < 10; i++) {
            //3. Send message
            rabbitTemplate.convertAndSend("exchange_confirm_test","confirm","message confirm...");
        }
    }

5) Test results

① When the consumer does not manually sign in messages and preFetch = 1, the consumer will only receive one message.


② When the consumer does not set the number of prefetches and does not sign in manually, the consumer obtains all 10 messages at one time


③ When prefetch=1 and the consumer is set to sign in manually, the message side will pull messages one by one

Summary of current limit at consumer end

  • Configure the prefetch attribute in < rabbit: listener container >, and set how many messages the consumer pulls at a time
  • The confirmation mode of the consumer must be manual confirmation. acknowledge="manual"

1.4,TTL

1. Definition

  • The full name of TTL is Time To Live.
  • When the message reaches the survival time and has not been consumed, it will be automatically cleared.
  • RabbitMQ can set the expiration time for messages or for the entire Queue.

Summary:
① Set the parameter of queue expiration time: x-message-ttl, unit: ms, which will uniformly expire the messages of the whole queue
② Set the expiration time of the message and use the parameter: expiration. Unit: ms (milliseconds). When the message is in the column header (consumption), it will separately judge whether the message has expired.
③ If both are set, the time period shall prevail.

2. Code implementation:

1) New project rabbitmq TTL producer spring

2)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq-producer-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3) Configure spring rabbitmq producer xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--Define and manage switches and queues-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--definition rabbitMqTemplate Object operation can send messages easily in code-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--Message reliability delivery production end-->
    <rabbit:queue id="test_queue_confirm" name="queue_confirm_test"></rabbit:queue>
    <rabbit:direct-exchange name="exchange_confirm_test">
        <rabbit:bindings>
            <rabbit:binding queue="queue_confirm_test" key="confirm"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--TTL-->
    <rabbit:queue id="test_queue_ttl" name="test_queue_ttl">
        <!--set up queue Parameters of-->
        <rabbit:queue-arguments>
            <!--x-message-ttl Refers to the expiration time of the queue-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test-exchange-ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>


</beans>

4) Test:

ProducerTest.java

/*
    * TTL:Expiration time
    * 1,Unified expiration time of queue
    *
    * 2,Message individual expiration time
    *
    * be careful:
    *   ①If the expiration time of the message is set, the expiration time of the queue is also set, whichever is shorter
    *   ②When the queue expires, all messages in the queue will be removed
    *   ③After a message expires, only when the message is at the top of the queue will it be judged whether it has expired (removed)
    * */
    @Test
    public void testTTl(){
        /*
        1,Unified expiration time of queue
        for (int i = 0; i < 10; i++) {
            //3,send message
            rabbitTemplate.convertAndSend("test-exchange-ttl","ttl.hehe","message ttl...");
        }*/



        //Post processing object of message
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1. Set the message of message
                message.getMessageProperties().setExpiration("5000");
                //2. Return this message
                return message ;
            }
        };
        /*2,Message individual expiration time*/
        rabbitTemplate.convertAndSend("test-exchange-ttl","ttl.hehe","message ttl...",messagePostProcessor);

    }

5) Results:

① Queue unified Expiration:


After 10 seconds, the unified expires and the message is removed

② Messages expire individually

After 5 seconds, the message is at the top of the queue (waiting for consumption) and is removed

1.5 dead letter queue

1. Definition

Dead letter queue, English abbreviation: DLX. Dead Letter Exchange (Dead Letter Exchange). When the message becomes Dead Message, it can be re sent to another switch, which is DLX.

2. There are three situations when a message becomes a dead letter:

1. The queue message length reaches the limit;
2. The consumer rejects the consumption message, basicNack/basicReject, and does not put the message into the original target queue, request = false;
3. There is a message expiration setting in the original queue, and the message arrival timeout has not been consumed;

3. Queue bound dead letter switch:

Set parameters for the queue: x-dead-letter-exchange and x-dead-letter-routing-key

4. Code implementation

1)spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--Define and manage switches and queues-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--definition rabbitMqTemplate Object operation can send messages easily in code-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--
        Dead letter queue:
            1,Declare a normal queue( test_queue_dlx)And switches( test_exchange_dlx)
            2,Declare dead letter queue( queue_dle)Dead letter switch( exchange_dlx)
            3,Normal queue binding dead letter switch
                Set two parameters:
                    x-dead-letter-exchange:Name of dead letter switch
                    x-dead-letter-routing-key: To the dead letter exchange routingkey
    -->

    <!--
        1,Declare a normal queue( test_queue_dlx)And switches( test_exchange_dlx)
    -->
    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3,Normal queue binding dead letter switch-->
        <rabbit:queue-arguments>
            <!--3.1,x-dead-letter-exchange:Name of dead letter switch-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx"></entry>
            <!--3.1,x-dead-letter-routing-key: To the dead letter exchange routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry>
            <!--4.1 Set the expiration time of the queue ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
            <!--4.2 Set the length limit of the queue max-length-->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--
        2,Declare dead letter queue( queue_dle)Dead letter switch( exchange_dlx)
   -->
    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

2)Producer.java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /*
    * Send test dead letter message
    *   1,Expiration time
    *   2,Length limit
    *   3,Message rejection
    *
    * */

    @Test
    public void testDlx(){
       //1. Test expiration time, dead letter message
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha", "test expiration time, dead letter message");

        //2. After testing the length limit, the message is dead letter
       /* for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","Test expiration time, dead letter message ");
        }*/

        //3. Test message rejection, dead letter message
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","Test expiration time, dead letter message");

    }
}

3) Consumer: dlxlistener java

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;



@Component
public class DlxListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            //1. Receive conversion message
            System.out.println(new String(message.getBody()));

            //2. Processing business logic
            System.out.println("Processing business logic...");
            int i = 3/0;//An error occurred
            //3. Manual sign in
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){

            //4. Refuse to sign
            /*
            * The third parameter: request: return to the queue. If it is set to true, the message will return to the queue, and the broker will resend the message to the consumer
            *
            * */
            System.out.println("Exception occurred, reject!");
            //Refuse to receive, do not return to the queue, request = false
            channel.basicNack(deliveryTag,true,false);
            //channel.basicReject(deliveryTag,true); Only single confirmation is allowed
        }
    }
}

4) Consumer: spring rabbitmq consumer xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <context:component-scan base-package="com.itheima.listener"></context:component-scan>

    <!--Define listener container-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
        <!--<rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>-->
        
        <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>
    </rabbit:listener-container>

</beans>

5) Results

a) Message expiration generates dead letter queue


b) Length limit to generate dead letter queue

Due to the length limit, the 10 messages sent later become dead letters


After the expiration time arrives, 10 messages in the normal queue will also become dead letters:

c) Message rejection generates dead letter

Summary:
1. Dead letter switch and dead letter queue are no different from ordinary ones
2. When a message becomes a dead letter, if the queue is bound to the dead letter switch, the message will be rerouted to the dead letter queue by the dead letter switch
3. There are three situations when a message becomes a dead letter
(1) The queue message length has reached the limit
(2) The consumer rejects the consumption message and does not return to the queue
(3) There is a message expiration setting in the original queue, and the message arrival timeout has not been consumed

1.6 delay queue

1. Definition

Delay queue, that is, the message will not be consumed immediately after entering the queue, and will be consumed only after reaching the specified time.
Requirements:
  1. If the order is not paid within 30 minutes, cancel the order and roll back the inventory.
  2. Send SMS greetings after 7 days of new user registration.

Implementation method:
  1. Timer
  2. Delay queue


Unfortunately, the delay queue function is not provided in rabbitMQ.
However, TTL + dead letter queue combination can be used to achieve the effect of delay queue.

2. Code implementation

1) Production side: spring rabbitmq producer xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"
                               publisher-confirms="true"
                               publisher-returns="true"
    />
    <!--Define and manage switches and queues-->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--definition rabbitMqTemplate Object operation can send messages easily in code-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

 
    <!--
        Delay queue:
            1,Define normal switch( order_exchange)And queue( order_queue)
            2,Define dead letter switch( order_exchange_dlx)And queue( order_queue_dlx)
            3,Bind and set the expiration time of the normal queue to 30 minutes
    -->
    
    <!--1,Define normal switch( order_exchange)And queue( order_queue)-->
    <rabbit:queue name="order_queue" id="order_queue">
        <!--3,Bind and set the expiration time of the normal queue to 30 minutes-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry>
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--2,Define dead letter switch( order_exchange_dlx)And queue( order_queue_dlx)-->
    <rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

2) Production side: producer test java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDelay() throws InterruptedException {
        //1. Send order message. In the future, in the order system, after the order is placed successfully, a message will be sent
        rabbitTemplate.convertAndSend("order_exchange","order.msg","Order information: id=1,time=333333");

        //2. Print countdown
        for (int i = 0; i < 10; i++) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }
}

3) Consumer: orderlistener java

package com.itheima.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


@Component
public class OrderListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try{
            //1. Receive conversion message
            System.out.println(new String(message.getBody()));

            //2. Processing business logic
            System.out.println("Processing business logic...");
            System.out.println("According to the order id Query its status...");
            System.out.println("Judge whether the status is payment success?");
            System.out.println("Cancel the order and roll back the inventory!");
            //3. Manual sign in
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){

            //4. Refuse to sign
            /*
            * The third parameter: request: return to the queue. If it is set to true, the message will return to the queue, and the broker will resend the message to the consumer
            *
            * */
            System.out.println("Exception occurred, reject!");
            //Refuse to receive, do not return to the queue, request = false
            channel.basicNack(deliveryTag,true,false);
            //channel.basicReject(deliveryTag,true); Only single confirmation is allowed
        }
    }
}

4) Consumer: spring rabbitmq consumer xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--Load profile-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- definition rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>

    <context:component-scan base-package="com.itheima.listener"></context:component-scan>

    <!--Define listener container-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
        <!--<rabbit:listener ref="ackListener" queue-names="queue_confirm_test"></rabbit:listener>-->
        <!--<rabbit:listener ref="qosListener" queue-names="queue_confirm_test"></rabbit:listener>-->
        <!--Define listeners to listen to normal queues-->
        <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>-->
        <!--Note that when implementing the delay queue function, the consumer listens to the dead letter queue-->
        <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
    </rabbit:listener-container>

</beans>

5) Consumer side: consumertest java

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test1(){
        while(true){

        }
    }
}

6) Results


Summary
1. Delay queue refers to that after a message enters the queue, it can be delayed for a certain time before consumption
2. RabbitMQ does not provide the function of delay queue, but the combination of TTL + dead letter queue can be used to achieve the effect of delay queue.

2. RabbitMQ application problems

2.1. Message reliability guarantee - Message compensation

2.2 message idempotency guarantee

1. Definition

Idempotency refers to one and more requests for a resource, which should have the same result for the resource itself. In other words, the impact of any number of executions on the resource itself is the same as that of the first execution.
In MQ, it refers to consuming multiple identical messages and getting the same result as consuming the message once.

2. Message idempotency Guarantee -- optimistic locking mechanism

Added by dgiberson on Mon, 07 Mar 2022 17:10:29 +0200