How RabbitMQ ensures message reliability, and Spring AMQP features

Mainstream Message Oriented Middleware

ActiveMQ

Overview

Developed by Apache in Java; supports the JMS 1.1 specification and the J2EE 1.4 standard

Supports a wide range of connection protocols: OpenWire/STOMP/REST/XMPP/AMQP

Supports clients in multiple languages and supports plug-ins

Easy to manage; broker clusters are easy to configure

Advantages

Based on Java, so it runs cross-platform

Can connect to multiple databases through JDBC

Has complete management interface, monitoring, and security mechanisms

Automatic reconnection and error retry

Disadvantages

The community is less active than RabbitMQ's

Development focus has shifted to the 6.0 product, Apollo, so bugs in 5.x receive less maintenance

Not suitable for scenarios with thousands of queues

RabbitMQ

Overview

Among the most popular message-oriented middleware

High reliability: supports publisher confirms, delivery acknowledgements, and similar features

High availability: supports mirrored queues

Supports plug-ins

Advantages

Based on Erlang, supports high concurrency

Supports multiple platforms and many clients; documentation is complete

High reliability

Widely deployed at Internet companies, with an active community

Disadvantages

Erlang is a relatively niche language, which makes secondary development harder

Under the broker architecture, the central node adds latency, which affects performance

Uses the AMQP protocol, which has a learning cost

RocketMQ

Overview

Developed by the Alibaba team; has withstood the Double 11 shopping festival

Can guarantee strict message ordering

Can accumulate on the order of 100 million messages

Rich message pull modes

Advantages

Based on Java, which makes secondary development easier

A single machine supports more than 10,000 persistent queues

Data is kept both in memory and on disk, which gives performance plus high availability

Development is active and releases are frequent

Disadvantages

Few client types; the more mature ones are Java and C++

No web management interface; provides a CLI (command-line interface)

Community attention and maturity lag behind RabbitMQ

Kafka

Overview

A distributed commit-log system developed by LinkedIn

Distinctive partitioning, well suited to big data systems

High performance and good scalability

Replicated and fault tolerant

Advantages

Natively distributed system

Zero-copy technology reduces I/O steps and improves system throughput

Fast persistence: messages can be persisted with O(1) system overhead

Supports sending and fetching data in batches

Disadvantages

Performance degrades noticeably once a single machine holds more than 64 queues/partitions

Uses short polling, so timeliness depends on the polling interval

Retry after a consumption failure is not supported

Weaker reliability

RabbitMQ related terms

Erlang

Erlang, developed by Ericsson, is a programming language created for telephone-switch software

Characteristics

General-purpose, concurrency-oriented programming language, well suited to distributed systems

Runs interpreted on a virtual machine, so it can be deployed across platforms

Context switching between processes is far more efficient than in C

Latency is on a par with native sockets

RabbitMQ is implemented in Erlang under the hood, which is where its high performance comes from

AMQP protocol

AMQP (Advanced Message Queuing Protocol) is a network protocol for asynchronous message transmission between processes.


Broker: an application that receives and distributes messages; RabbitMQ is a message broker

Virtual host: a virtual broker that isolates multiple units from one another

Connection: the TCP connection between a publisher or consumer and the broker

Channel: a logical connection inside a connection; typically each thread creates its own channel

Routing key: tells the exchange how to route the message, comparable to the address on a parcel

Exchange: routes messages, comparable to a parcel distribution center

Queue: where a message finally lands and waits for a consumer to take it

Binding: the virtual link between an exchange and a queue, used as the basis for distributing messages

Exchange

In an implementation of the AMQP protocol such as RabbitMQ, the core component is the exchange, which carries out RabbitMQ's core function: routing and forwarding

Role

The exchange is a core component of both the AMQP protocol and RabbitMQ

Based on the bindings and the routing key, the exchange works out a route for each message and forwards it to the corresponding queue

There are four exchange types: Direct, Topic, Fanout, and Headers. Headers is rarely used; the other three are the main ones

1. Direct Exchange

Direct exchange (direct routing): if the routing key in the message equals the binding key, the exchange sends the message to the corresponding queue

2. Fanout Exchange

Fanout exchange (broadcast routing): every message sent to a fanout exchange is distributed to all bound queues
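
The difference between the two routing rules above can be sketched without a broker. The following is a toy in-memory model, not RabbitMQ code: the class name `ExchangeRoutingSketch`, the method names, and the second queue `queue.audit` are all hypothetical, and bindings are simplified to one binding key per queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of direct and fanout routing (illustration only, not RabbitMQ code). */
public class ExchangeRoutingSketch {

    /** Direct: deliver only to queues whose binding key equals the routing key. */
    public static List<String> routeDirect(Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    /** Fanout: deliver to every bound queue; the routing key is ignored. */
    public static List<String> routeFanout(Map<String, String> bindings, String routingKey) {
        return new ArrayList<>(bindings.keySet());
    }

    public static void main(String[] args) {
        // queue name -> binding key (hypothetical bindings for the example)
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("queue.test", "key.test");
        bindings.put("queue.audit", "key.audit");

        System.out.println(routeDirect(bindings, "key.test"));  // [queue.test]
        System.out.println(routeFanout(bindings, "key.test"));  // [queue.test, queue.audit]
    }
}
```

With a direct exchange, a routing key that matches no binding yields an empty target list, which is the situation the message return mechanism later in this article deals with.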

3. Topic Exchange

Topic exchange: distributes messages to target queues according to the routing key and wildcard rules

Exact match: behaves like Direct

Binding key #: matches any number of words (including none)

Binding key *: matches exactly one word
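
The wildcard rules above can be made concrete with a small matcher. This is a minimal sketch that only mirrors the rules as stated (words separated by dots, `*` for exactly one word, `#` for zero or more words); it is not RabbitMQ's actual implementation, and the class name `TopicMatchSketch` and the `order.*` keys are made up for the example.

```java
/** Sketch of topic-exchange wildcard matching (illustration only). */
public class TopicMatchSketch {

    /** Returns true if routingKey is matched by the bindingKey pattern. */
    public static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length;          // pattern used up: all words must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it)
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false;                  // pattern still expects a word, none left
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("key.test", "key.test"));          // true  (exact match)
        System.out.println(matches("order.*", "order.created"));      // true  (* = one word)
        System.out.println(matches("order.*", "order.created.eu"));   // false (* is not two words)
        System.out.println(matches("order.#", "order.created.eu"));   // true  (# = any number)
        System.out.println(matches("order.#", "order"));              // true  (# = zero words)
    }
}
```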

RabbitMQ installation

Windows installation

Install Erlang/OTP: https://www.erlang.org/downloads

Install RabbitMQ: https://www.rabbitmq.com/

Linux installation

Docker-based installation

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Web management tool

RabbitMQ's web management tool is also called the management console

The management console is the most commonly used tool for managing and configuring RabbitMQ

It is also very useful when developing and debugging business code

Enable the management plug-in

Enable the plug-in: in the sbin directory of the RabbitMQ installation, run rabbitmq-plugins enable rabbitmq_management

Open in a browser: http://127.0.0.1:15672

Default user name: guest. Default password: guest

Functions

Overview: view node and cluster status

Connections: view connections

Channels: view channels

Exchanges: view and manage exchanges

Queues: view and manage queues

Admin: advanced management functions

Using the command-line tools

Status

View status: rabbitmqctl status

View bindings: rabbitmqctl list_bindings

View channels: rabbitmqctl list_channels

View connections: rabbitmqctl list_connections

View consumers: rabbitmqctl list_consumers

View exchanges: rabbitmqctl list_exchanges

View queues: rabbitmqctl list_queues

Delete a queue: rabbitmqctl delete_queue

Purge a queue: rabbitmqctl purge_queue

Users

Add a user: rabbitmqctl add_user

Change a user's password: rabbitmqctl change_password

List users: rabbitmqctl list_users

Set user roles: rabbitmqctl set_user_tags

Starting and stopping the application

Start the application: rabbitmqctl start_app

Stop the application: rabbitmqctl stop_app (the Erlang virtual machine keeps running)

Stop the node: rabbitmqctl stop (also shuts down the Erlang virtual machine)

Clustering

Join a cluster: rabbitmqctl join_cluster

Leave a cluster: rabbitmqctl reset

Mirrored queues

Synchronize a mirrored queue: rabbitmqctl sync_queue

Cancel mirrored-queue synchronization: rabbitmqctl cancel_sync_queue

Basic use of RabbitMQ

Add dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Configuring RabbitMQ

Declare the exchange

Declare the queue

Declare the binding between the exchange and the queue
 public void initMq(Channel channel) throws IOException, TimeoutException, InterruptedException {
        log.info("-------MQ Switch,queue,Binding initialization started---------");

        log.info("-------Declare switch start---------");
        /**
         * Declare the exchange
         * Parameters: exchange name, exchange type, durable, autoDelete, arguments
         */
        channel.exchangeDeclare("exchange.test", BuiltinExchangeType.DIRECT, true, false, null);

        log.info("-------Claim queue start---------");
        /**
         * Declare a queue
         *
         * Parameters:
         * queue – the name of the queue
         * durable – true to declare a durable queue (the queue survives a server restart)
         * exclusive – true to declare an exclusive queue (restricted to this connection)
         * autoDelete – true to declare an auto-delete queue (the server deletes it when it is no longer in use)
         * arguments – other properties (construction arguments) for the queue
         */
        channel.queueDeclare("queue.test", true, false, false, null);

        log.info("-------Declare the binding relationship between switch and queue---------");
        /**
         * Bind queue to switch
         *
         * Parameters:
         * queue – The name of the queue
         * exchange – Name of exchange
         * routingKey – Routing key for binding
         */
        channel.queueBind("queue.test", "exchange.test", "key.test");

    }

Consumer listens for messages

Message listening runs on a separate, asynchronously started thread, so create a thread pool to keep the number of threads bounded

Configure the thread pool

Processing flow of ThreadPoolTaskExecutor

When the pool size is less than corePoolSize, a new thread is created to handle the request

When the pool size equals corePoolSize, the request is placed in workQueue, and idle threads in the pool take tasks from workQueue and process them

When workQueue is full, a new thread is added to the pool to handle the request. If the pool has already reached maximumPoolSize, the request is rejected through the RejectedExecutionHandler

When the pool holds more threads than corePoolSize, the extra threads wait up to keepAliveTime and are destroyed if no request arrives in that time
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        // Number of core threads
        threadPool.setCorePoolSize(10);
        // Maximum number of threads
        threadPool.setMaxPoolSize(100);
        // Capacity of the buffer queue used by the thread pool
        threadPool.setQueueCapacity(10);
        // On shutdown, wait for submitted tasks to finish
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // Maximum time to wait on shutdown (default 0 = stop immediately); after 60 seconds shutdown proceeds anyway
        threadPool.setAwaitTerminationSeconds(60);
        // Thread name prefix
        threadPool.setThreadNamePrefix("MQ-Async-");
        // Initialize the executor
        threadPool.initialize();
        return threadPool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // null keeps Spring's default handling of uncaught exceptions in @Async methods
        return null;
    }
}
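
The processing flow listed above can be observed directly on a plain `java.util.concurrent.ThreadPoolExecutor`, which is what Spring's `ThreadPoolTaskExecutor` is built on. The sketch below is only an illustration: the class name `PoolFlowSketch` is hypothetical, and the sizes (core 1, max 2, queue capacity 1) are deliberately tiny instead of the 10/100/10 used in the configuration, so that four tasks are enough to hit every branch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows core thread -> work queue -> extra thread -> rejection, in that order. */
public class PoolFlowSketch {

    /** Submits four blocking tasks and records what the pool did with each one. */
    public static String run() {
        // corePoolSize = 1, maximumPoolSize = 2, workQueue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();           // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                trace.append("task").append(i)
                     .append(": threads=").append(pool.getPoolSize())
                     .append(" queued=").append(pool.getQueue().size()).append('\n');
            } catch (RejectedExecutionException e) {
                // default RejectedExecutionHandler (AbortPolicy) throws
                trace.append("task").append(i).append(": rejected\n");
            }
        }
        release.countDown();               // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
        // task1: threads=1 queued=0   (below corePoolSize: new thread)
        // task2: threads=1 queued=1   (core threads busy: queued)
        // task3: threads=2 queued=1   (queue full: extra thread)
        // task4: rejected             (queue full and maximumPoolSize reached)
    }
}
```

Note that extra threads are created only once the queue is full, so with a large queue capacity the pool may never grow beyond corePoolSize.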

Message listening method

private void startMessageMonitoring(Channel channel) throws IOException, InterruptedException {
        log.info("-------Consumer monitoring starts---------");
        /**
         * Start a non-nolocal, non-exclusive consumer with a server-generated consumerTag.
         * Gives access only to the basic.deliver and basic.cancel AMQP methods (sufficient for most cases).
         * See the overloads that take a Consumer argument for access to all application callbacks.
         *
         * Parameters:
         * queue – the name of the queue
         * autoAck – true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
         * deliverCallback – callback when a message is delivered
         * cancelCallback – callback when the consumer is cancelled
         */
        channel.basicConsume("queue.test", true, deliverCallback, cancelCallback);

        // Keep this thread alive so the caller's try-with-resources does not close the channel.
        // Sleeping in the loop avoids the CPU spin of an empty while (true) body.
        while (true) {
            Thread.sleep(1000);
        }
    }

    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("-------Consumer receives message,Start processing---------");
        String messageBody = new String(message.getBody());
        log.info("Received message:{}", messageBody);
    };

    CancelCallback cancelCallback = (consumerTag) -> {
        log.info("-------Callback when consumer cancels---------");
    };

Listening call configuration

    /**
     * Add the @Configuration annotation to the class that contains this method.
     *
     * When the project starts, a thread is taken from the thread pool to
     * initialize MQ and listen on the queue asynchronously.
     *
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException
     */
    @Async
    @Autowired
    public void messageMonitoring() throws IOException, TimeoutException, InterruptedException {
        log.info("-------Create connection start---------");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            this.initMq(channel);

            this.startMessageMonitoring(channel);

        }
    }

Producer sends message

 public static void sendMsg(long number) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            log.info("Producer sends message:{}", number);
            channel.basicPublish("exchange.test", "key.test", null, String.valueOf(number).getBytes());
        }
    }
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            sendMsg(i);
        }
    }

Perform test

Start project

  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------Create connection start---------
  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------MQ Switch,queue,Binding initialization started---------
  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------Declare switch start---------
  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------Claim queue start---------
  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------Declare the binding relationship between switch and queue---------
  INFO 14360 --- [  restartedMain] c.e.springboot.config.RabbitMQConfig     : -------Consumer monitoring starts---------

Send messages

 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:1
 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:2
 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:3
 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:4

Message listening

  INFO 14360 --- [pool-1-thread-4] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 14360 --- [pool-1-thread-4] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 14360 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 14360 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Received message:1
  INFO 14360 --- [pool-1-thread-6] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 14360 --- [pool-1-thread-6] c.e.springboot.config.RabbitMQConfig     : Received message:2
  INFO 14360 --- [pool-1-thread-7] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 14360 --- [pool-1-thread-7] c.e.springboot.config.RabbitMQConfig     : Received message:3
  INFO 14360 --- [pool-1-thread-8] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 14360 --- [pool-1-thread-8] c.e.springboot.config.RabbitMQConfig     : Received message:4

How to ensure the reliability of messages

Sender

Use RabbitMQ's publisher confirms to verify that a message has been successfully delivered to RabbitMQ and handled

Use RabbitMQ's message return mechanism: if no target queue is found, the broker notifies the sender

Sender acknowledgement mechanism

After a message is sent, the broker replies to the sender once it has received the message

The producer uses that reply to determine whether the message reached the broker

Three confirmation mechanisms

Single synchronous confirmation

Batch synchronous confirmation

Asynchronous confirmation

Single synchronous confirmation mechanism

Configure the channel to enable confirm mode: channel.confirmSelect()

After each message is sent, call channel.waitForConfirms() to wait for the confirmation
public static void sendMsg(long number) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            log.info("Producer sends message:{}", number);
            channel.basicPublish("exchange.test", "key.test", null, String.valueOf(number).getBytes());
            if (channel.waitForConfirms()){
                log.info("MQ confirm success");
            }else {
                log.info("MQ confirm failed");
            }
        }
    }
22:56:05.072 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
22:56:05.151 [main] INFO com.example.springboot.config.RabbitMQConfig - MQ confirm success
22:56:05.714 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:1
22:56:05.776 [main] INFO com.example.springboot.config.RabbitMQConfig - MQ confirm success

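Batch synchronous confirmation mechanism

Configure the channel to enable confirm mode: channel.confirmSelect()

After sending several messages, call channel.waitForConfirms() to wait for confirmation of the whole batch. If it returns false, at least one message in the batch was nacked, but the call does not say which one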
public static void sendMsg(long number) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            log.info("Producer sends message:{}", number);
            for (int i = 0; i < 5; i++) {
                channel.basicPublish("exchange.test", "key.test", null, String.valueOf(number).getBytes());
            }
            if (channel.waitForConfirms()){
                log.info("MQ confirm success");
            }else {
                log.info("MQ confirm failed");
            }
        }
    }

Producer sends message

22:59:36.793 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
22:59:36.893 [main] INFO com.example.springboot.config.RabbitMQConfig - MQ confirm success

Consumer receives the messages

  INFO 2692 --- [ool-1-thread-12] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-12] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0

Asynchronous confirmation mechanism

Configure the channel to enable confirm mode: channel.confirmSelect()

Register a listener on the channel with addConfirmListener. After messages are sent, the listener is called back to report whether they were delivered successfully

A single asynchronous confirmation may cover one message or several; that is decided by the broker
    public static void sendMsg(long number) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
             
            channel.confirmSelect();
            ConfirmListener confirmListener=new ConfirmListener() {
                /**
                 * Called when the broker acks published messages.
                 *
                 * @param deliveryTag sequence number of the confirmed message on this channel
                 * @param multiple true if the confirmation covers every message up to and including deliveryTag
                 * @throws IOException
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleAck,deliveryTag:{},multiple:{}",deliveryTag,multiple);
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleNack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
                }
            };
            channel.addConfirmListener(confirmListener);
            log.info("Producer sends message:{}", number);
            for (int i = 0; i < 5; i++) {
                channel.basicPublish("exchange.test", "key.test", null, String.valueOf(number).getBytes());
            }

            Thread.sleep(5000);
        }
    }
  INFO 2692 --- [ool-1-thread-12] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-12] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 2692 --- [ool-1-thread-13] c.e.springboot.config.RabbitMQConfig     : Received message:0
23:07:14.580 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
23:07:14.657 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:1,multiple:false
23:07:14.666 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:5,multiple:true
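
The log above ends with `handleAck,deliveryTag:5,multiple:true`, meaning one callback confirmed every message up to tag 5. To know which messages are still unconfirmed, the publisher has to keep its own bookkeeping. The following is a minimal sketch of that bookkeeping using only the JDK; the class `OutstandingConfirmsSketch` and its method names are hypothetical. In real code, `published` would be called with `channel.getNextPublishSeqNo()` just before `basicPublish`, and the two handlers would be invoked from the `ConfirmListener`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Tracks published-but-unconfirmed messages by sequence number (illustration only). */
public class OutstandingConfirmsSketch {

    // sequence number -> message body, sorted so "everything up to N" is a cheap range
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Record a message right before it is published. */
    public void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Mirrors ConfirmListener.handleAck: forget one entry, or all entries up to deliveryTag. */
    public void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            pending.headMap(deliveryTag, true).clear();
        } else {
            pending.remove(deliveryTag);
        }
    }

    /** Mirrors ConfirmListener.handleNack: returns the bodies that should be sent again. */
    public List<String> handleNack(long deliveryTag, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = pending.headMap(deliveryTag, true);
            toResend.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        OutstandingConfirmsSketch tracker = new OutstandingConfirmsSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "0");
        }
        tracker.handleAck(1, false);                 // deliveryTag:1, multiple:false
        System.out.println(tracker.pendingCount());  // 4
        tracker.handleAck(5, true);                  // deliveryTag:5, multiple:true
        System.out.println(tracker.pendingCount());  // 0
    }
}
```

A sorted concurrent map is used because the callbacks run on the client's connection thread while publishing happens on another thread.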

Message return mechanism

How to enable it

basicPublish takes a key parameter: mandatory

If mandatory is false, RabbitMQ silently discards messages that cannot be routed

If mandatory is true, RabbitMQ returns messages that cannot be routed to the sender, which receives them through a return listener
public static void sendMsg(long number) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();
            ConfirmListener confirmListener=new ConfirmListener() {
                /**
                 * Called when the broker acks published messages.
                 *
                 * @param deliveryTag sequence number of the confirmed message on this channel
                 * @param multiple true if the confirmation covers every message up to and including deliveryTag
                 * @throws IOException
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleAck,deliveryTag:{},multiple:{}",deliveryTag,multiple);
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleNack,deliveryTag:{},multiple:{}",deliveryTag,multiple);
                }
            };
            channel.addConfirmListener(confirmListener);

            //Add message return mechanism
//            channel.addReturnListener(new ReturnListener() {
//                @Override
//                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                    log.info("handleReturn,replyCode:{},replyText:{},exchange:{},routingKey:{},properties:{},replyText:{}",replyCode,replyText,exchange,routingKey,properties,new String(body));
//                }
//            });
            
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    log.info("handleReturn,replyCode:{},replyText:{},exchange:{},routingKey:{},properties:{},replyText:{}",returnMessage.getReplyCode(),returnMessage.getReplyText(),returnMessage.getExchange(),returnMessage.getRoutingKey(),returnMessage.getProperties(),new String(returnMessage.getBody()));
                }
            });


            log.info("Producer sends message:{}", number);
            for (int i = 0; i < 5; i++) {
                // Mandatory: enable message return mechanism
                channel.basicPublish("exchange.test", "key.test222222", true,null, String.valueOf(number).getBytes());
            }

            Thread.sleep(5000);
        }
    }
23:31:29.263 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
23:31:29.339 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleReturn,replyCode:312,replyText:NO_ROUTE,exchange:exchange.test,routingKey:key.test222222,properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),replyText:0
23:31:29.339 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:1,multiple:false
23:31:29.346 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleReturn,replyCode:312,replyText:NO_ROUTE,exchange:exchange.test,routingKey:key.test222222,properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),replyText:0
23:31:29.346 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:2,multiple:false
23:31:29.346 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleReturn,replyCode:312,replyText:NO_ROUTE,exchange:exchange.test,routingKey:key.test222222,properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),replyText:0
23:31:29.346 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:3,multiple:false
23:31:29.347 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleReturn,replyCode:312,replyText:NO_ROUTE,exchange:exchange.test,routingKey:key.test222222,properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),replyText:0
23:31:29.347 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:4,multiple:false
23:31:29.347 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleReturn,replyCode:312,replyText:NO_ROUTE,exchange:exchange.test,routingKey:key.test222222,properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null),replyText:0
23:31:29.347 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:5,multiple:false

Consumer

Use RabbitMQ's consumer acknowledgements to confirm that a message was processed without errors

Use RabbitMQ's consumer-side flow limiting to cap the rate at which messages are pushed, which keeps the receiving service stable

Consumer confirmation mechanism

Consumer ACK types

Automatic ACK: the message is acknowledged automatically as soon as the consumer receives it

Manual ACK: the message is not acknowledged automatically on receipt; the business code must acknowledge it explicitly

Manual ACK types

Single manual ACK: multiple=false

Multiple manual ACK: multiple=true

Single ACK is recommended
private void startMessageMonitoring(Channel channel) throws IOException {
        log.info("-------Consumer monitoring starts---------");
        /**
         * Use the server generated consumerTag to start non local and non exclusive consumers.
         * Only basic Deliver and basic Cancel access to the AMQP method (this is sufficient for most cases). View the method with the Consumer parameter to access all application callbacks.
         *
         * Parameters:
         * queue – The name of the queue
         * autoAck – True if the server should consider that the message is confirmed once sent; false if the server should expect explicit confirmation
         * DeliverCallback – Callback when delivering message
         * cancelCallback – Callback when consumer cancels
         */
        //Autoack: true -- > false, manual sign in
        channel.basicConsume("queue.test", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                log.info("-------Consumer receives message,Start processing---------");
                String messageBody = new String(message.getBody());
                log.info("Received message:{}", messageBody);
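                // Acknowledge this message manually (multiple=false settles only this delivery)
                log.info("Manual sign in DeliveryTa:{}",message.getEnvelope().getDeliveryTag());
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

                // Alternative: acknowledge in batches; every 5th delivery settles everything up to its tag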
                //if (message.getEnvelope().getDeliveryTag() % 5==0){
                //    channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
                //}
            }
        }, cancelCallback);

        while (true) {

        }
    }

    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("-------Consumer receives message,Start processing---------");
        String messageBody = new String(message.getBody());
        log.info("Received message:{}", messageBody);
    };

    CancelCallback cancelCallback = (consumerTag) -> {
        log.info("-------Callback when consumer cancels---------");
    };

23:58:58.213 [main] INFO com.example.springboot.config.RabbitMQConfig - Producer sends message:0
23:58:58.285 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:1,multiple:false
23:58:58.292 [AMQP Connection ip:5672] INFO com.example.springboot.config.RabbitMQConfig - handleAck,deliveryTag:5,multiple:true
  INFO 3272 --- [pool-1-thread-4] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 3272 --- [pool-1-thread-4] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 3272 --- [pool-1-thread-4] c.e.springboot.config.RabbitMQConfig     : Manual sign in DeliveryTa:1
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Manual sign in DeliveryTa:2
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Manual sign in DeliveryTa:3
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Manual sign in DeliveryTa:4
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 3272 --- [pool-1-thread-5] c.e.springboot.config.RabbitMQConfig     : Manual sign in DeliveryTa:5
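
The effect of the multiple flag can be pictured with a small stand-alone model. This is only a sketch of the bookkeeping, not RabbitMQ client code: the class UnackedDeliveries and its methods are hypothetical names, and it assumes delivery tags are handed out as 1, 2, 3, ... per channel, as in the log above.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Toy model of the unacknowledged deliveries tracked for one channel (hypothetical class). */
final class UnackedDeliveries {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1; // delivery tags start at 1 and grow per channel

    /** Records a delivery and returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors basicAck(deliveryTag, multiple); returns how many deliveries were settled. */
    int ack(long deliveryTag, boolean multiple) {
        if (!multiple) {
            return unacked.remove(deliveryTag) ? 1 : 0;
        }
        // multiple=true settles every outstanding tag up to and including deliveryTag
        NavigableSet<Long> settled = unacked.headSet(deliveryTag, true);
        int count = settled.size();
        settled.clear(); // the view is backed by the set, so this removes them from it
        return count;
    }

    int outstanding() {
        return unacked.size();
    }
}
```

With five deliveries outstanding, ack(2, false) settles only tag 2, while a following ack(5, true) settles tags 1, 3, 4 and 5 in one call. That is why a single batched acknowledgement is cheap but also settles messages that may not have been processed yet.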

Requeue

If requeue is enabled, a message that is NACKed goes back into the queue (RabbitMQ restores its original position where possible) and waits to be delivered again

Requeueing is generally not recommended: a message that failed the first time will almost always fail again, so it is redelivered in an endless loop
        // autoAck changed from true to false: acknowledge manually
        channel.basicConsume("queue.test", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                log.info("-------Consumer receives message,Start processing---------");
                String messageBody = new String(message.getBody());
                log.info("Received message:{}", messageBody);
                // Manual acknowledgement. multiple=false settles a single delivery, true settles several
                //log.info("Manual sign in DeliveryTa:{}", message.getEnvelope().getDeliveryTag());
                //channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

                // Alternative: acknowledge in batches; every 5th delivery settles everything up to its tag
                //if (message.getEnvelope().getDeliveryTag() % 5==0){
                //    channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
                //}

                // Reject manually and put the message back in the queue (requeue=true)
                log.info("Manual rejection,Return to queue,rejection DeliveryTa:{}",message.getEnvelope().getDeliveryTag());
                channel.basicNack(message.getEnvelope().getDeliveryTag(), false,true);
            }
        }, cancelCallback);

  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Manual rejection,Return to queue,rejection DeliveryTa:1365
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Manual rejection,Return to queue,rejection DeliveryTa:1366
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Manual rejection,Return to queue,rejection DeliveryTa:1367
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Manual rejection,Return to queue,rejection DeliveryTa:1368
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : -------Consumer receives message,Start processing---------
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Received message:0
  INFO 11212 --- [ool-1-thread-18] c.e.springboot.config.RabbitMQConfig     : Manual rejection,Return to queue,rejection DeliveryTa:1369
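
The log above shows the same message being rejected and redelivered without end. One common way to bound this is to count failed attempts and stop requeueing after a limit. The sketch below is an assumption-laden illustration, not part of the original code: RequeuePolicy is a hypothetical helper, the message id is assumed to be an application-level identifier set by the producer (the messages in this tutorial carry no message-id), and the counter is kept in memory for a single consumer thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Decides whether a failed message is requeued or given up on (hypothetical helper). */
final class RequeuePolicy {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();

    RequeuePolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Call once per failed processing attempt.
     * The result is meant to be passed as the requeue argument of basicNack.
     */
    boolean shouldRequeue(String messageId) {
        int seen = attempts.merge(messageId, 1, Integer::sum);
        if (seen >= maxAttempts) {
            attempts.remove(messageId); // stop tracking: the message is being given up on
            return false;
        }
        return true;
    }
}
```

In the consumer callback the call would look like channel.basicNack(tag, false, policy.shouldRequeue(id)). Once the limit is reached the message is rejected with requeue=false, which lets a dead letter exchange, if one is configured, collect it.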

Consumer-side flow control

RabbitMQ QoS

QoS is RabbitMQ's quality-of-service feature

It guarantees that while a certain number of messages remain unacknowledged, no new messages are delivered for consumption

QoS only works when automatic acknowledgement is turned off

QoS principle

When the number of messages delivered to a consumer but not yet acknowledged reaches the configured limit, RabbitMQ stops pushing new messages to that consumer

This is how RabbitMQ implements flow control on the consumer side

Parameters of consumer-side flow control

prefetchCount: the maximum number of unacknowledged messages that may be pushed to a consumer

global: in the AMQP specification, true applies the limit to the whole connection and false to the current channel; RabbitMQ reinterprets it so that false limits each consumer separately and true shares the limit among all consumers on the channel

prefetchSize: limit on the size in bytes of unacknowledged content; normally 0 (no limit)

RabbitMQ does not implement prefetchSize, and it does not implement global with the specification's semantics
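
The prefetchCount rule can be modelled in a few lines. The sketch below is a toy model of the broker-side accounting for one consumer, under the assumption that a prefetchCount of 0 means no limit; PrefetchWindow and its methods are hypothetical names, not RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of prefetch accounting for a single consumer (hypothetical class). */
final class PrefetchWindow {
    private final int prefetchCount; // 0 means "no limit"
    private final Deque<String> ready = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchWindow(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /** A producer adds a message to the queue. */
    void publish(String body) {
        ready.addLast(body);
    }

    /** Pushes as many ready messages as the window allows; returns how many were pushed. */
    int push() {
        int pushed = 0;
        while (!ready.isEmpty() && (prefetchCount == 0 || unacked < prefetchCount)) {
            ready.removeFirst();
            unacked++;
            pushed++;
        }
        return pushed;
    }

    /** The consumer acknowledged one message, which frees one slot in the window. */
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("nothing to acknowledge");
        }
        unacked--;
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked;
    }
}
```

With a window of 2 and five queued messages, the first push delivers two messages and further pushes deliver nothing until the consumer acknowledges one. The remaining messages stay in the queue, where another consumer could pick them up.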

Without flow control

The producer sends several messages at once

The listener thread acknowledges one message per second

When the consumer starts, it receives all of the sent messages at once
private void startMessageMonitoring(Channel channel) throws IOException {
        log.info("-------Consumer monitoring starts---------");
        // autoAck changed from true to false: acknowledge manually
        channel.basicConsume("queue.test", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                log.info("-------Consumer receives message,Start processing---------");
                String messageBody = new String(message.getBody());
                log.info("Received message:{}", messageBody);

                // Simulate slow processing: wait 1 second before acknowledging
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // Acknowledge the message manually
                log.info("Manual sign in DeliveryTa:{}",message.getEnvelope().getDeliveryTag());
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        }, cancelCallback);

        while (true) {

        }
    }

    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("-------Consumer receives message,Start processing---------");
        String messageBody = new String(message.getBody());
        log.info("Received message:{}", messageBody);
    };

    CancelCallback cancelCallback = (consumerTag) -> {
        log.info("-------Callback when consumer cancels---------");
    };


With flow control

Set the prefetchCount value

    private void startMessageMonitoring(Channel channel) throws IOException {
        log.info("-------Consumer monitoring starts---------");
        // At most 2 unacknowledged messages are pushed to this consumer at a time
        channel.basicQos(2);
        // autoAck changed from true to false: acknowledge manually
        channel.basicConsume("queue.test", false, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                log.info("-------Consumer receives message,Start processing---------");
                String messageBody = new String(message.getBody());
                log.info("Received message:{}", messageBody);

                // Simulate slow processing: wait 1 second before acknowledging
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // Acknowledge the message manually
                log.info("Manual sign in DeliveryTa:{}",message.getEnvelope().getDeliveryTag());
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        }, cancelCallback);

        while (true) {

        }
    }

    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("-------Consumer receives message,Start processing---------");
        String messageBody = new String(message.getBody());
        log.info("Received message:{}", messageBody);
    };

    CancelCallback cancelCallback = (consumerTag) -> {
        log.info("-------Callback when consumer cancels---------");
    };


RabbitMQ

A large backlog of messages puts heavy pressure on RabbitMQ. Use RabbitMQ's message expiration mechanism to keep messages from piling up

Expired messages are discarded silently, so the system gets no alert that something is wrong. Use a RabbitMQ dead letter queue to collect expired messages for analysis

Message expiration mechanism

RabbitMQ expiration time (TTL)

RabbitMQ calls the expiration time TTL (time to live)

There are two kinds of TTL in RabbitMQ: message TTL and queue TTL

Message TTL sets the expiration time of a single message

Queue TTL sets the expiration time of all messages in the queue

Message expiration time

    public static void sendMsg(long number) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();
            ConfirmListener confirmListener = new ConfirmListener() {
                /**
                 * @param deliveryTag sequence number of the published message being confirmed
                 * @param multiple true if this confirms every message up to and including deliveryTag
                 * @throws IOException
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleAck,deliveryTag:{},multiple:{}", deliveryTag, multiple);
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    log.info("handleNack,deliveryTag:{},multiple:{}", deliveryTag, multiple);
                }
            };
            channel.addConfirmListener(confirmListener);

            //Add message return mechanism
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return returnMessage) {
                    log.info("handleReturn,replyCode:{},replyText:{},exchange:{},routingKey:{},properties:{},replyText:{}", returnMessage.getReplyCode(), returnMessage.getReplyText(), returnMessage.getExchange(), returnMessage.getRoutingKey(), returnMessage.getProperties(), new String(returnMessage.getBody()));
                }
            });

            for (int i = 0; i < 30; i++) {
                log.info("Producer sends message:{}", i);
                // Per-message TTL: the message expires after 10 seconds (value in milliseconds, passed as a string)
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();
                // mandatory=true (third argument) enables the message return mechanism
                channel.basicPublish("exchange.test", "key.test", true, basicProperties, String.valueOf(i).getBytes());
            }
        }
    }

Producer sends 10 messages

Each message is deleted automatically once its 10 seconds are up

Queue message expiration time

Set the message expiration time when declaring the queue

    public void initMq(Channel channel) throws IOException, TimeoutException, InterruptedException {
        log.info("-------MQ exchange, queue and binding initialization started---------");

        log.info("-------Declare the exchange---------");

        channel.exchangeDeclare("exchange.test", BuiltinExchangeType.DIRECT, true, false, null);

        log.info("-------Declare the queue---------");

        // Queue-level TTL: every message in this queue expires after 10 seconds (milliseconds)
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-message-ttl",10000);
        channel.queueDeclare("queue.test", true, false, false, map);

        log.info("-------Declare the binding between exchange and queue---------");

        channel.queueBind("queue.test", "exchange.test", "key.test");

    }


Every message is deleted 10 seconds after it enters the queue
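
When a message carries its own expiration and the queue also declares x-message-ttl, RabbitMQ applies the lower of the two values. The helper below only illustrates that rule; EffectiveTtl is a hypothetical name, and both values are assumed to be already parsed into milliseconds.

```java
import java.util.OptionalLong;

/** Illustrates which TTL applies when both kinds are set (hypothetical helper). */
final class EffectiveTtl {
    private EffectiveTtl() {
    }

    /**
     * @param perMessageMillis value of the message's expiration property, if set
     * @param perQueueMillis   value of the queue's x-message-ttl argument, if set
     * @return the TTL that applies, or empty if the message never expires
     */
    static OptionalLong of(OptionalLong perMessageMillis, OptionalLong perQueueMillis) {
        if (perMessageMillis.isPresent() && perQueueMillis.isPresent()) {
            // Both are set: the shorter lifetime wins
            return OptionalLong.of(Math.min(perMessageMillis.getAsLong(), perQueueMillis.getAsLong()));
        }
        return perMessageMillis.isPresent() ? perMessageMillis : perQueueMillis;
    }
}
```

For example, a message published with an expiration of 5000 into a queue declared with x-message-ttl=10000 lives for at most 5 seconds.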

Dead letter queue

Dead letter queue: set the DLX (dead-letter-exchange) attribute on a queue

When a message becomes a dead letter (dead message), it can be republished to another exchange; this exchange is just an ordinary exchange

After the dead letter exchange routes a dead letter, it usually ends up in one fixed queue

How a message becomes a dead letter

The message is rejected (reject/nack) with requeue=false

The message expires (TTL)

The queue has reached its maximum length
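
The three conditions can be tried out on a toy in-memory queue. The sketch below is a simplified model, not broker code: ToyQueueWithDlx is a hypothetical class, the dead letter list stands in for the queue bound to the dead letter exchange, and overflow is assumed to follow RabbitMQ's default behaviour of dropping the oldest message from the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy queue that records why each message was dead-lettered (hypothetical class). */
final class ToyQueueWithDlx {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>(); // stands in for the dead letter queue

    ToyQueueWithDlx(int maxLength) {
        this.maxLength = maxLength;
    }

    void publish(String body) {
        queue.addLast(body);
        // Maximum length exceeded: the oldest message is dead-lettered
        while (queue.size() > maxLength) {
            deadLetters.add("maxlen:" + queue.removeFirst());
        }
    }

    /** The consumer rejects the head message; only requeue=false dead-letters it. */
    void rejectHead(boolean requeue) {
        String body = queue.removeFirst();
        if (requeue) {
            queue.addFirst(body);
        } else {
            deadLetters.add("rejected:" + body);
        }
    }

    /** Models the TTL elapsing for everything currently queued. */
    void expireAll() {
        while (!queue.isEmpty()) {
            deadLetters.add("expired:" + queue.removeFirst());
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    int size() {
        return queue.size();
    }
}
```

Publishing a, b and c into a queue of maximum length 2 dead-letters a; rejecting b without requeue and then letting c expire leaves the dead letter list as maxlen:a, rejected:b, expired:c.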

Set up the dead letter queue

Declare the exchange and queue that forward and receive dead letters
	Exchange: exchange.dlx
	Queue: queue.dlx
	RoutingKey: #

Add a parameter to the queue whose dead letters should be collected
	x-dead-letter-exchange=exchange.dlx
    public void initMq(Channel channel) throws IOException, TimeoutException, InterruptedException {
        log.info("-------MQ exchange, queue and binding initialization started---------");

        log.info("-------Declare the exchange---------");

        channel.exchangeDeclare("exchange.test", BuiltinExchangeType.DIRECT, true, false, null);

        log.info("-------Declare the queue---------");

        /*
         * If the queue already exists, delete it before declaring it with changed arguments, otherwise the declaration fails
         */
        HashMap<String, Object> map = new HashMap<>();
        // TTL for every message in the queue, in milliseconds
        map.put("x-message-ttl", 10000);
        // Exchange that receives this queue's dead letters (expired, rejected or overflowing messages)
        map.put("x-dead-letter-exchange", "exchange.dlx");
        // Maximum number of messages the queue may hold
        map.put("x-max-length", 10);

        channel.queueDeclare("queue.test", true, false, false, map);

        log.info("-------Declare the binding between exchange and queue---------");

        channel.queueBind("queue.test", "exchange.test", "key.test");


        /*
         * Declare the dead letter exchange
         */
        channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null);
        /*
         * Declare the dead letter queue
         */
        channel.queueDeclare("queue.dlx", true, false, false, null);
        /*
         * Bind the dead letter queue to the dead letter exchange; "#" matches every routing key
         */
        channel.queueBind("queue.dlx", "exchange.dlx", "#");

    }


Spring AMQP features

Asynchronous message listener container

Native client: you implement the thread pool and the callback methods yourself, and register the callbacks

Spring Boot: a configurable thread pool is provided and callbacks are registered automatically; you only implement the callback method

RabbitTemplate is provided out of the box to make sending and receiving messages easier

Compared with basicPublish it is more powerful; for example, it converts messages automatically

RabbitAdmin is provided out of the box to make declaring queues and exchanges easier

Queues, exchanges and bindings can be registered declaratively

No explicit registration code is needed

Spring Boot configuration supports RabbitMQ out of the box

It takes full advantage of Spring Boot's convention-over-configuration approach

Connection and Channel can be created implicitly

1.RabbitAdmin

The RabbitAdmin class is used to manage RabbitMQ

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

RabbitAdmin functions

declareExchange: create an exchange

deleteExchange: delete an exchange

declareQueue: create a queue

deleteQueue: delete a queue

purgeQueue: empty a queue

declareBinding: create a binding

removeBinding: delete a binding

getQueueProperties: query a queue's properties

Configure RabbitMQ service

@Configuration
public class RabbitConfig {

    @Autowired
    public void initRabbit() {
        /*
         * Create connection
         */
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("ip");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        /*
         * Declare the exchange
         */
        Exchange exchange = new DirectExchange("exchange.test");
        rabbitAdmin.declareExchange(exchange);
        /*
         * Declare the queue
         */
        Queue queue = new Queue("queue.test");
        rabbitAdmin.declareQueue(queue);
        /*
         * Declare binding relationship
         */
        Binding binding = new Binding(
                "queue.test",
                Binding.DestinationType.QUEUE,
                "exchange.test",
                "key.test",
                null);
        rabbitAdmin.declareBinding(binding);
    }
}

Simplified RabbitMQ configuration

@Configuration
public class RabbitConfig {


    @Bean
    public Exchange exchange1() {
        return new DirectExchange("exchange.test");
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue.test");
    }

    @Bean
    public Binding binding1() {
        return new Binding(
                "queue.test",
                Binding.DestinationType.QUEUE,
                "exchange.test",
                "key.test",
                null);
    }


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        // Declarative configuration is lazy: the exchange, queue and binding are only declared when a connection is first opened, so open one here to trigger the declarations
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // Declare the exchange, queue and binding beans automatically on startup
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

2.RabbitTemplate sends a message

Send messages with RabbitTemplate

RabbitTemplate provides rich functionality that makes messaging convenient

RabbitTemplate's configuration can be passed in explicitly or declared implicitly

Declare the RabbitTemplate bean

@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        // Declarative configuration is lazy: the exchange, queue and binding are only declared when a connection is first opened, so open one here to trigger the declarations
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

Send a message using the send method

@Slf4j
@Component
public class SendMsg {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(String msg) {
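        log.info("send Method to send a message:{}",msg);
        MessageProperties messageProperties = new MessageProperties();
        // Per-message TTL of 1 second (milliseconds, passed as a string)
        messageProperties.setExpiration("1000");
        Message message = new Message(msg.getBytes(), messageProperties);
        // Arguments: exchange, routing key, message. The routing key must match the binding key "key.test"
        rabbitTemplate.send("exchange.test", "key.test", message);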
    }
}

Send a message using the convertAndSend method

@Slf4j
@Component
public class SendMsg {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void convertAndSend(String msg) {
        log.info("convertAndSend Method to send a message:{}",msg);
        // The routing key must match the binding key "key.test"
        rabbitTemplate.convertAndSend("exchange.test", "key.test", msg);
    }
}

Perform test

    @Test
    public void test() {
        sendMsg.send("hello world");
        sendMsg.convertAndSend("hello world");
    }
  INFO 13840 --- [           main] com.example.springboot.config.SendMsg    : send Method to send a message:hello world
  INFO 13840 --- [           main] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:hello world

3. Publisher confirms and message returns

Set the publisher confirm and message return callbacks on RabbitTemplate

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //Enable message return mechanism
        rabbitTemplate.setMandatory(true);
        //Message confirmation
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("correlationData:{}, ack:{}, cause{}", correlationData, ack, cause);
            }
        });
        // Message return
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey{}", message, replyCode, replyText, exchange, routingKey);
            }
        });
        return rabbitTemplate;
    }

After sending messages again, neither the confirm callback nor the return callback is executed

The source code shows that publisher confirms must be enabled on the connection factory: RabbitTemplate reads publisherConfirms and publisherReturns from it

public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
		this.publisherConfirms = connectionFactory.isPublisherConfirms();
		this.confirmsOrReturnsCapable =
				this.publisherConfirms || connectionFactory.isPublisherReturns();
	}

Modify ConnectionFactory

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("IP");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        // Declarations are lazy: the exchange, queue and binding are declared when a connection is first opened. Opening one here is optional and simply triggers them early
        connectionFactory.createConnection();
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

Create a controller and call it to send the messages (when sending from a @SpringBootTest test, the callbacks did not fire)

@RestController
public class TestController {
    @Autowired
    private SendMsg sendMsg;

    @GetMapping("/test")
    public String test(){
        sendMsg.send("hello world");
        sendMsg.convertAndSend("hello world");
        return "send success";
    }
}
  INFO 14576 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:hello world
  INFO 14576 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:hello world
  INFO 14576 --- [nectionFactory2] c.e.springboot.config.RabbitConfig       : correlationData:null, ack:true, causenull
  INFO 14576 --- [nectionFactory1] c.e.springboot.config.RabbitConfig       : correlationData:null, ack:true, causenull

With CachingConnectionFactory.ConfirmType.SIMPLE, the callback cannot tell which message was confirmed

Modify ConnectionFactory

 connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

Pass CorrelationData when sending the message

    public void convertAndSend(String msg) {
        log.info("convertAndSend Method to send a message:{}",msg);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("123456");
        rabbitTemplate.convertAndSend("exchange.test", "key.test", msg,correlationData);
    }
  INFO 14576 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:hello world
  INFO 14576 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:hello world
  INFO 14576 --- [nectionFactory1] c.e.springboot.config.RabbitConfig       : correlationData:null, ack:true, causenull
  INFO 14576 --- [nectionFactory1] c.e.springboot.config.RabbitConfig       : correlationData:CorrelationData [id=123456], ack:true, causenull
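
Once each confirm carries a correlation id, the application can match it to the message it sent. The sketch below shows one possible way to do that bookkeeping in plain Java. PendingConfirms is a hypothetical helper, not part of Spring AMQP, and it assumes the id passed to track is the same id set on the CorrelationData.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks published messages until the broker confirms them (hypothetical helper). */
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call just before publishing with a CorrelationData that carries the same id. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Call from the confirm callback.
     * Returns the payload to resend after a nack, or null when nothing needs resending.
     */
    String onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return null; // sent without CorrelationData, so it cannot be matched
        }
        if (ack) {
            pending.remove(correlationId);
            return null;
        }
        return pending.get(correlationId);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The first confirm in the log above has correlationData:null because send() passed no CorrelationData; such a confirm cannot be matched to a message, which the null check models.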

4. Message listener container

SimpleMessageListenerContainer is a simple message listener container

Features

Listen to several queues at once, start automatically, and declare RabbitMQ resources automatically

Set the number of consumers (minimum, maximum, batch consumption)

Set the acknowledgement mode, whether rejected messages are requeued, and exception handling

Set exclusivity and other consumer properties

Set the listener, the message converter, and so on

Supports dynamic settings: the listener configuration can be changed at runtime

Using the listener container

 @Bean
    public SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Queue to listen to
        messageListenerContainer.setQueueNames("queue.test");
        // Initial number of concurrent consumers
        messageListenerContainer.setConcurrentConsumers(2);
        // Maximum number of concurrent consumers
        messageListenerContainer.setMaxConcurrentConsumers(5);
        
        // Acknowledgement mode
        //messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Manual acknowledgement
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        /*-------------------Listener option 1-----------------------------*/
/*      
        // Message listening
        messageListenerContainer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        log.info("Message listening, message:{}", message.toString());
                    }
                });*/

        /*-------------------Listener option 2-----------------------------*/
       
        messageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                log.info("Message listening,message:{}", message.toString());
                log.info("news Ack,DeliveryTag:{}", message.getMessageProperties().getDeliveryTag());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });

        // Consumer-side flow control: at most 2 unacknowledged messages per consumer
        messageListenerContainer.setPrefetchCount(2);
        
        return messageListenerContainer;
    }

Send a message and start listening

  INFO 2544 --- [enerContainer-2] c.e.springboot.config.RabbitConfig       : Message listening,message:(Body:'hello world' MessageProperties [headers={spring_listener_return_correlation=85d4df83-af4b-426c-aed9-094d9272c9e9, spring_returned_message_correlation=123456}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.test, receivedRoutingKey=key.test, deliveryTag=5, consumerTag=amq.ctag-vZcoVv0Jx3Vgm07hQ8iDFA, consumerQueue=queue.test])
  INFO 2544 --- [enerContainer-2] c.e.springboot.config.RabbitConfig       : news Ack,DeliveryTag:5

5. Custom message listener

MessageListenerAdapter can be used to implement a custom message listener

Simple mode

Define a handleMessage method. It receives the messages and serves as the business callback. By default the method name must be handleMessage

@Slf4j
@Component
public class MsgService {

    public void handleMessage(byte[] body){
        String msg = new String(body);
        log.info("Message listening,implement handleMessage(),msg:{}",msg);
    }
}
    @Autowired
    private MsgService msgService;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Queue to listen to
        messageListenerContainer.setQueueNames("queue.test");
        // Initial number of concurrent consumers
        messageListenerContainer.setConcurrentConsumers(2);
        // Maximum number of concurrent consumers
        messageListenerContainer.setMaxConcurrentConsumers(5);
        
        // Note: in MANUAL mode the listener must acknowledge itself; handleMessage(byte[]) has no channel to do so, so AcknowledgeMode.AUTO is the usual choice with an adapter
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // Consumer-side flow control: at most 2 unacknowledged messages per consumer
        messageListenerContainer.setPrefetchCount(2);

        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setDelegate(msgService);
        messageListenerContainer.setMessageListener(messageListenerAdapter);

        return messageListenerContainer;
    }

Perform test

  INFO 14272 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:hello world
  INFO 14272 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:hello world
  INFO 14272 --- [enerContainer-2] c.example.springboot.config.MsgService   : Message listening,implement handleMessage(),msg:hello world

Why must the method be named handleMessage?

In the MessageListenerAdapter class, onMessage() resolves the name of the method to invoke by calling getListenerMethodName()

	@Override
	public void onMessage(Message message, Channel channel) throws Exception { // NOSONAR
		// Check whether the delegate is a MessageListener impl itself.
		// In that case, the adapter will simply act as a pass-through.
		Object delegateListener = getDelegate();
		if (delegateListener != this) {
			if (delegateListener instanceof ChannelAwareMessageListener) {
				((ChannelAwareMessageListener) delegateListener).onMessage(message, channel);
				return;
			}
			else if (delegateListener instanceof MessageListener) {
				((MessageListener) delegateListener).onMessage(message);
				return;
			}
		}

		// Regular case: find a handler method reflectively.
		Object convertedMessage = extractMessage(message);
		String methodName = getListenerMethodName(message, convertedMessage);
		if (methodName == null) {
			throw new AmqpIllegalStateException("No default listener method specified: "
					+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
					+ "override the 'getListenerMethodName' method.");
		}

		// Invoke the handler method with appropriate arguments.
		Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);
		Object result = invokeListenerMethod(methodName, listenerArguments, message);
		if (result != null) {
			handleResult(new InvocationResult(result, null, null, null, null), message, channel);
		}
		else {
			logger.trace("No result object given - no result to handle");
		}
	}

When no queue or consumer-tag mapping matches, getListenerMethodName() falls back to defaultListenerMethod

	protected String getListenerMethodName(Message originalMessage, Object extractedMessage) {
		if (this.queueOrTagToMethodName.size() > 0) {
			MessageProperties props = originalMessage.getMessageProperties();
			String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
			if (methodName == null) {
				methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
			}
			if (methodName != null) {
				return methodName;
			}
		}
		return getDefaultListenerMethod();
	}

	protected String getDefaultListenerMethod() {
		return this.defaultListenerMethod;
	}

The default method name is the constant handleMessage

	public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";


	private Object delegate;

	private String defaultListenerMethod = ORIGINAL_DEFAULT_LISTENER_METHOD;

Advanced mode

Map queue names to handler method names, so the method no longer has to be called handleMessage

The message listener and business callback class

@Slf4j
@Component
public class MsgService {

    public void test1(byte[] body){
        String msg = new String(body);
        log.info("Message listening,implement handleMessage(),msg:{}",msg);
    }
}

Map the queue name to the method name in the container configuration

    @Autowired
    private MsgService msgService;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Listening queue
        messageListenerContainer.setQueueNames("queue.test");
        // Concurrent consumption threads
        messageListenerContainer.setConcurrentConsumers(2);
        // Maximum concurrent consumption thread
        messageListenerContainer.setMaxConcurrentConsumers(5);
        
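        // Note: in MANUAL mode a delegate method that only receives the body has no Channel to ack with,
        // so deliveries stay unacknowledged; AcknowledgeMode.AUTO lets the container ack after the method returns
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);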
        // Consumer end current limiting
        messageListenerContainer.setPrefetchCount(2);

        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setDelegate(msgService);

        // Map each queue name to the method that should handle its messages
        HashMap<String, String> map = new HashMap<>();
        map.put("queue.test","test1");
//        map.put("queue.test2","test2");
        messageListenerAdapter.setQueueOrTagToMethodName(map);
        messageListenerContainer.setMessageListener(messageListenerAdapter);

        return messageListenerContainer;
    }

Perform test

  INFO 7100 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:hello world
  INFO 7100 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:hello world
  INFO 7100 --- [enerContainer-2] c.example.springboot.config.MsgService   : Message listening,implement handleMessage(),msg:hello world

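The key is the check this.queueOrTagToMethodName.size() > 0: when the map is not empty, the method name is looked up by consumer queue first, then by consumer tag, and only then falls back to the default

	protected String getListenerMethodName(Message originalMessage, Object extractedMessage) {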
		if (this.queueOrTagToMethodName.size() > 0) {
			MessageProperties props = originalMessage.getMessageProperties();
			String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
			if (methodName == null) {
				methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
			}
			if (methodName != null) {
				return methodName;
			}
		}
		return getDefaultListenerMethod();
	}
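
The lookup order above can be tried out without a broker. The following is a minimal sketch in plain Java, not Spring's implementation: the class ListenerDispatchSketch, its nested Delegate, and the consumer tag amq.ctag-1 are hypothetical names introduced for illustration. It resolves a method name by queue, then by consumer tag, then falls back to handleMessage, and invokes the result reflectively.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ListenerDispatchSketch {

    /** Hypothetical delegate, standing in for the MsgService bean. */
    public static class Delegate {
        public String handleMessage(byte[] body) {
            return "handleMessage:" + new String(body, StandardCharsets.UTF_8);
        }

        public String test1(byte[] body) {
            return "test1:" + new String(body, StandardCharsets.UTF_8);
        }
    }

    private static final String DEFAULT_METHOD = "handleMessage";

    private final Map<String, String> queueOrTagToMethodName = new HashMap<>();
    private final Object delegate;

    public ListenerDispatchSketch(Object delegate) {
        this.delegate = delegate;
    }

    public void map(String queueOrTag, String methodName) {
        queueOrTagToMethodName.put(queueOrTag, methodName);
    }

    /** Same order as the excerpt: queue name, then consumer tag, then the default name. */
    public String resolveMethodName(String consumerQueue, String consumerTag) {
        String methodName = queueOrTagToMethodName.get(consumerQueue);
        if (methodName == null) {
            methodName = queueOrTagToMethodName.get(consumerTag);
        }
        return methodName != null ? methodName : DEFAULT_METHOD;
    }

    /** Looks up the resolved method on the delegate by name and invokes it reflectively. */
    public Object dispatch(String consumerQueue, String consumerTag, byte[] body) {
        String methodName = resolveMethodName(consumerQueue, consumerTag);
        try {
            Method method = delegate.getClass().getMethod(methodName, byte[].class);
            return method.invoke(delegate, (Object) body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke listener method " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ListenerDispatchSketch adapter = new ListenerDispatchSketch(new Delegate());
        adapter.map("queue.test", "test1");
        byte[] body = "hello world".getBytes(StandardCharsets.UTF_8);
        System.out.println(adapter.dispatch("queue.test", "amq.ctag-1", body));  // mapped queue
        System.out.println(adapter.dispatch("queue.other", "amq.ctag-1", body)); // falls back to default
    }
}
```

A queue with no entry in the map still reaches handleMessage, which is why the simple mode works without any mapping at all.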

6. MessageConverter message conversion

A MessageConverter automatically converts message payloads when messages are sent and received

Jackson2JsonMessageConverter is the most commonly used MessageConverter

It converts messages in JSON format and, combined with a ClassMapper, can convert them directly into POJO objects

Using Jackson2JsonMessageConverter

Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageListenerAdapter.setMessageConverter(messageConverter);

Without type information the converter deserializes the JSON into a LinkedHashMap, so the callback method receives a Map

    public void test1(Map<String,Object> map){
        log.info("Message listening,implement handleMessage(),msg:{}",map);
    }

Using ClassMapper

    @Autowired
    private MsgService msgService;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Listening queue
        messageListenerContainer.setQueueNames("queue.test");
        // Concurrent consumption threads
        messageListenerContainer.setConcurrentConsumers(2);
        // Maximum concurrent consumption thread
        messageListenerContainer.setMaxConcurrentConsumers(5);

        // Consumer end current limiting
        messageListenerContainer.setPrefetchCount(2);

        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setDelegate(msgService);

        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        messageConverter.setClassMapper(new ClassMapper() {
            @Override
            public void fromClass(Class<?> clazz, MessageProperties properties) {
            }

            @Override
            public Class<?> toClass(MessageProperties properties) {
                return UserDTO.class;
            }
        });
        messageListenerAdapter.setMessageConverter(messageConverter);
        HashMap<String, String> map = new HashMap<>();
        map.put("queue.test", "test1");
//        map.put("queue.test2","test2");
        messageListenerAdapter.setQueueOrTagToMethodName(map);
        messageListenerContainer.setMessageListener(messageListenerAdapter);

        return messageListenerContainer;
    }
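
The anonymous ClassMapper above always returns UserDTO.class, which is enough when a queue carries a single payload type. When several types share a queue, a mapper usually has to pick the class from something in the message. The sketch below shows that idea in plain Java under stated assumptions: TypeIdMapperSketch, its nested UserDTO, the register method, and the use of a header named __TypeId__ are illustrative choices for this example, and a plain Map stands in for MessageProperties. It is not Spring AMQP's own mapper.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class TypeIdMapperSketch {

    /** Hypothetical payload type, standing in for the article's UserDTO. */
    public static class UserDTO {
        public long id;
        public String name;
    }

    // Header name chosen for this sketch (an assumption, not read from the article).
    static final String TYPE_ID_HEADER = "__TypeId__";

    private final Map<String, Class<?>> idToClass = new HashMap<>();

    /** Registers a short, language-neutral id for a payload class. */
    public void register(String typeId, Class<?> type) {
        idToClass.put(typeId, type);
    }

    /** Producer side: write the registered id of the payload class into the headers. */
    public void fromClass(Class<?> type, Map<String, Object> headers) {
        for (Map.Entry<String, Class<?>> entry : idToClass.entrySet()) {
            if (entry.getValue() == type) {
                headers.put(TYPE_ID_HEADER, entry.getKey());
                return;
            }
        }
        throw new IllegalArgumentException("No type id registered for " + type.getName());
    }

    /** Consumer side: choose the target class from the header, or fall back to a generic map. */
    public Class<?> toClass(Map<String, Object> headers) {
        Object typeId = headers.get(TYPE_ID_HEADER);
        Class<?> type = typeId == null ? null : idToClass.get(typeId.toString());
        return type != null ? type : LinkedHashMap.class;
    }

    public static void main(String[] args) {
        TypeIdMapperSketch mapper = new TypeIdMapperSketch();
        mapper.register("user", UserDTO.class);

        Map<String, Object> headers = new HashMap<>();
        mapper.fromClass(UserDTO.class, headers);
        System.out.println(mapper.toClass(headers).getSimpleName());
        System.out.println(mapper.toClass(new HashMap<>()).getSimpleName());
    }
}
```

Falling back to LinkedHashMap for unknown or missing ids mirrors the behaviour described earlier, where a callback without type information receives a Map.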

When sending, serialize the POJO to JSON and send the JSON as a byte array. Otherwise there is a pitfall: a message sent as a plain String, for example, is typically tagged with content type text/plain, which the JSON converter on the consumer side may refuse to convert.

@Autowired
private ObjectMapper objectMapper;

public void convertAndSend(UserDTO userDTO) throws JsonProcessingException {
    log.info("convertAndSend Method to send a message:{}", userDTO);
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId("123456");
    // Serialize to JSON, then send the UTF-8 bytes rather than the String itself
    String json = objectMapper.writeValueAsString(userDTO);
    rabbitTemplate.convertAndSend("exchange.test", "key.test", json.getBytes(StandardCharsets.UTF_8), correlationData);
}

Perform test

  INFO 10636 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:UserDTO(id=1, name=MQ)
  INFO 10636 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:UserDTO(id=1, name=MQ)
  INFO 10636 --- [enerContainer-2] c.example.springboot.config.MsgService   : Message listening,implement handleMessage(),msg:UserDTO(id=1, name=MQ)
  INFO 10636 --- [enerContainer-1] c.example.springboot.config.MsgService   : Message listening,implement handleMessage(),msg:UserDTO(id=1, name=MQ)

7. @RabbitListener

@RabbitListener is the most complete approach to message listening in Spring Boot. Listeners are declared with annotations, so the business code is not tied to container setup, and the listener container can also be configured in the Spring Boot configuration file.

Using the @RabbitListener annotation

@RabbitListener is a composite annotation. The following annotations can be nested inside it

@Exchange: automatically declares an exchange

@Queue: automatically declares a queue

@QueueBinding: automatically declares a binding

Declare a RabbitListenerContainerFactory bean

    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

Applied to a class

@Slf4j
@Component
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.test")
public class MsgService {
    @RabbitHandler(isDefault = true)
    public void test1(@Payload Message message) {
        log.info("Message listening,implement handleMessage(),msg:{}", new String(message.getBody()));
    }
}

Applied to a method

    @RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.test")
    public void test1(@Payload Message message) {
        log.info("Message listening,implement handleMessage(),msg:{}", new String(message.getBody()));
    }

Automatically declare exchanges, queues, bindings, and queue arguments

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("ip");
        connectionFactory.setPort(5672);
        connectionFactory.setPassword("guest");
        connectionFactory.setUsername("guest");
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        // Optional: open a connection eagerly so the declared exchanges and queues are created at startup;
        // otherwise they are only declared lazily, the first time a connection is used
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //Enable message return mechanism
        rabbitTemplate.setMandatory(true);
        // Message return
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey{}", message, replyCode, replyText, exchange, routingKey);
            }
        });
        //Message confirmation
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("correlationData:{}, ack:{}, cause{}", correlationData, ack, cause);
            }
        });
        return rabbitTemplate;
    }



    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

Declare the exchanges, queues, and bindings on the listener method

@Slf4j
@Component
public class MsgService {
    @RabbitListener(containerFactory = "rabbitListenerContainerFactory", admin = "rabbitAdmin",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "queue.test",
                                    arguments = {
                                            @Argument(name = "x-message-ttl", value = "1000", type = "java.lang.Integer"),
                                            @Argument(name = "x-dead-letter-exchange", value = "exchange.dlx"),
                                    }),
                            exchange = @Exchange(name = "exchange.test", type = ExchangeTypes.FANOUT),
                            key = "key.test"
                    ),
                    @QueueBinding(
                            value = @Queue(name = "exchange.dlx"),
                            exchange = @Exchange(name = "exchange.dlx", type = ExchangeTypes.DIRECT),
                            key = "key.dlx"
                    ),
            }
    )
    public void test1(@Payload Message message) {
        log.info("Message listening,implement handleMessage(),msg:{}", new String(message.getBody()));
    }
}

Delete the existing exchanges and queues, then restart the project

  INFO 11692 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:UserDTO(id=1, name=MQ)
  INFO 11692 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:UserDTO(id=1, name=MQ)
  INFO 11692 --- [nectionFactory1] c.e.springboot.config.RabbitConfig       : correlationData:null, ack:true, causenull
  INFO 11692 --- [ntContainer#0-1] c.example.springboot.config.MsgService: Message listening, execute handleMessage(),msg:{"id":1,"name":"MQ"}
  INFO 11692 --- [ntContainer#0-1] c.example.springboot.config.MsgService: Message listening, execute handleMessage(),msg:{"id":1,"name":"MQ"}
  INFO 11692 --- [nectionFactory1] c.e.springboot.config.RabbitConfig       : correlationData:CorrelationData [id=123456], ack:true, causenull

Comment out or delete the RabbitConfig configuration class and configure the MQ connection in application.properties instead. Following Spring Boot's convention-over-configuration principle, the beans that RabbitConfig used to define are then configured automatically

spring.rabbitmq.host=ip
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Enable publisher confirms
spring.rabbitmq.publisher-confirm-type=correlated
# Enable returns for messages that cannot be routed
spring.rabbitmq.publisher-returns=true
# Manual acknowledgement
spring.rabbitmq.listener.simple.acknowledge-mode=manual

Add the confirm and return callbacks

@Configuration
@Slf4j
public class MqConfig {

    /**
     * Enable the callback processing of confirm and return mechanisms
     *
     * @return RabbitTemplate
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //Enable message return mechanism
        rabbitTemplate.setMandatory(true);
        // Message return
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey{}", message, replyCode, replyText, exchange, routingKey);
            }
        });
        //Message confirmation
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("correlationData:{}, ack:{}, cause{}", correlationData, ack, cause);
            }
        });
        return rabbitTemplate;
    }
}
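
The confirm callback above only logs the result. To act on a negative confirm, the sender needs to know which message the confirm belongs to, which is what the CorrelationData id is for. The following is a minimal plain-Java sketch of that bookkeeping, assuming the id is kept as a String: PendingConfirmsSketch and its methods are hypothetical names, and onConfirm only mirrors the shape of confirm(correlationData, ack, cause) rather than implementing the Spring interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingConfirmsSketch {

    // Payloads that were sent but not yet confirmed by the broker, keyed by correlation id.
    private final Map<String, byte[]> pending = new ConcurrentHashMap<>();

    /** Call just before sending: remember the payload under its correlation id. */
    public void track(String correlationId, byte[] payload) {
        pending.put(correlationId, payload);
    }

    /**
     * Call from the confirm callback.
     * Returns the payload to resend when the broker nacked it, or null when nothing needs resending.
     */
    public byte[] onConfirm(String correlationId, boolean ack, String cause) {
        if (correlationId == null) {
            return null; // sent without correlation data, so there is nothing to match
        }
        if (ack) {
            pending.remove(correlationId); // broker took responsibility for the message
            return null;
        }
        return pending.get(correlationId); // keep it pending until a later ack arrives
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirmsSketch confirms = new PendingConfirmsSketch();
        confirms.track("123456", "hello world".getBytes(StandardCharsets.UTF_8));

        byte[] retry = confirms.onConfirm("123456", false, "simulated nack");
        System.out.println("resend needed: " + (retry != null));

        confirms.onConfirm("123456", true, null);
        System.out.println("pending after ack: " + confirms.pendingCount());
    }
}
```

The null check matters in practice: in the log output earlier, the message sent without CorrelationData produced a confirm with correlationData:null.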

Note on admin = "rabbitAdmin": the admin bean that Spring Boot auto-configures is not named rabbitAdmin (it is registered as amqpAdmin), so this attribute has to be removed or changed

containerFactory = "rabbitListenerContainerFactory" is optional. Spring Boot creates a bean with this name, so it can be referenced explicitly or left out

@Slf4j
@Component
public class MsgService {


    @RabbitListener(
            //containerFactory = "rabbitListenerContainerFactory", admin = "rabbitAdmin",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "queue.test",
                                    arguments = {
                                            @Argument(name = "x-message-ttl", value = "1000", type = "java.lang.Integer"),
                                            @Argument(name = "x-dead-letter-exchange", value = "exchange.dlx"),
                                    }),
                            exchange = @Exchange(name = "exchange.test", type = ExchangeTypes.FANOUT),
                            key = "key.test"
                    ),
                    @QueueBinding(
                            value = @Queue(name = "exchange.dlx"),
                            exchange = @Exchange(name = "exchange.dlx", type = ExchangeTypes.DIRECT),
                            key = "key.dlx"
                    ),
            }
    )
    public void test1(@Payload Message message, Channel channel) throws IOException {
        log.info("Message listening,implement handleMessage(),msg:{}", new String(message.getBody()));
        log.info("Prepare for signing in,Sign in message Id:{}",message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
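
The second argument of basicAck is the multiple flag. With false, as in the listener above, only the given delivery tag is acknowledged; with true, every outstanding delivery on the channel up to and including that tag is acknowledged. The sketch below models this with an in-memory set of delivery tags. AckSemanticsSketch and its deliver method are hypothetical stand-ins for illustration and do not use the RabbitMQ client.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class AckSemanticsSketch {

    // Delivery tags handed to the consumer and not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1; // tags are assumed to count up from 1 per channel

    /** Simulates the broker delivering one message; returns its delivery tag. */
    public long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors the shape of Channel.basicAck(deliveryTag, multiple). */
    public void basicAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge everything up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge exactly this delivery.
            unacked.remove(deliveryTag);
        }
    }

    public SortedSet<Long> unacked() {
        return Collections.unmodifiableSortedSet(unacked);
    }

    public static void main(String[] args) {
        AckSemanticsSketch channel = new AckSemanticsSketch();
        channel.deliver();
        channel.deliver();
        channel.deliver();

        channel.basicAck(2, false);
        System.out.println(channel.unacked()); // [1, 3]

        channel.basicAck(3, true);
        System.out.println(channel.unacked()); // []
    }
}
```

Acknowledging one tag at a time is the simpler choice when several consumer threads share the work, since each thread only answers for the delivery it handled.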

Delete the existing exchanges and queues, then restart the project

  INFO 14028 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : send Method to send a message:UserDTO(id=1, name=MQ)
  INFO 14028 --- [nio-8888-exec-1] com.example.springboot.config.SendMsg    : convertAndSend Method to send a message:UserDTO(id=1, name=MQ)
  INFO 14028 --- [ntContainer#0-1] c.example.springboot.config.MsgService: Message listening, execute handleMessage(),msg:{"id":1,"name":"MQ"}
  INFO 14028 --- [ntContainer#0-1] c.example.springboot.config.MsgService: ready to execute sign in, sign in message Id:1
  INFO 14028 --- [ntContainer#0-1] c.example.springboot.config.MsgService: Message listening, execute handleMessage(),msg:{"id":1,"name":"MQ"}
  INFO 14028 --- [ntContainer#0-1] c.example.springboot.config.MsgService: ready to execute sign in, sign in message Id:2

Keywords: Java RabbitMQ Spring Spring Boot message queue

Added by lordtrini on Thu, 13 Jan 2022 03:50:46 +0200