RabbitMQ introduction series 4 -- message confirmation mechanism

1 problem introduction

When using RabbitMQ, we may run into the following problem: after sending a message, the producer does not know whether it actually reached the server. By default, the producer receives no response at all. So what should we do if we want to know where a message went? This is what RabbitMQ's message confirmation mechanism is for.

2 Two mechanisms of message confirmation

2.1 transaction mechanism

Note that with the transaction mechanism, the sender blocks after sending a message and cannot send the next one until RabbitMQ responds. The transaction mechanism therefore performs poorly and lowers RabbitMQ's throughput.

2.2 confirm mechanism

Compared with the transaction mechanism, the confirm mechanism is more lightweight: the producer keeps sending, and RabbitMQ confirms each message asynchronously.

Note that the transaction mechanism and the confirm mechanism are mutually exclusive; a channel cannot use both. This blog focuses on the confirm mechanism.
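To make the asynchronous idea concrete, here is a minimal sketch of the bookkeeping a producer might keep while confirms are outstanding. It is a plain in-memory model, not RabbitMQ or Spring code: the class PendingConfirms, its methods and the correlation ids are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical publisher-side bookkeeping for confirms (in memory, no broker involved). */
class PendingConfirms {
    // Correlation id -> message body still waiting for the broker's confirm.
    private final Map<String, String> pending = new LinkedHashMap<>();
    // Bodies the broker refused (nack), kept so the caller can send them again.
    private final List<String> toResend = new ArrayList<>();

    /** Called right after sending; returns at once instead of waiting for the broker. */
    void sent(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    /** Called later, when the confirm for that id arrives. */
    void confirmed(String correlationId, boolean ack) {
        String body = pending.remove(correlationId);
        if (body != null && !ack) {
            toResend.add(body);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> toResend() {
        return new ArrayList<>(toResend);
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.sent("id-1", "Hello");
        confirms.sent("id-2", "World");
        confirms.sent("id-3", "!");        // three messages sent before any confirm arrives
        confirms.confirmed("id-1", true);
        confirms.confirmed("id-2", false); // the broker nacked this one
        System.out.println("pending=" + confirms.pendingCount() + ", resend=" + confirms.toResend());
    }
}
```

The point of the sketch is that sent never waits: the producer only records what is in flight and reacts when a confirm arrives, which is why this approach costs less than blocking on every message.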

3 code

Next, we use the confirm mechanism to confirm messages. Note that on the consumer side we use manual ack to confirm that a message was received correctly (automatic ack is the default). If we choose manual ack but never send the ack to RabbitMQ, the consequences can be serious: the messages stay unacknowledged on the broker and are delivered again once the consumer disconnects.
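One such consequence can be sketched with a small in-memory model. It assumes the consumer has a prefetch limit (a cap on how many unacknowledged messages it may hold); the class PrefetchModel and its methods are hypothetical and only imitate the broker's behaviour.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of one queue feeding one manual-ack consumer with a prefetch limit. */
class PrefetchModel {
    private final Deque<String> ready = new ArrayDeque<>();          // messages waiting in the queue
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered but not yet acked
    private final int prefetch;
    private long nextTag = 1;

    PrefetchModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void enqueue(String body) {
        ready.addLast(body);
    }

    /** Delivers until the queue is empty or the consumer holds `prefetch` unacked messages. */
    int deliver() {
        int delivered = 0;
        while (!ready.isEmpty() && unacked.size() < prefetch) {
            unacked.put(nextTag++, ready.pollFirst());
            delivered++;
        }
        return delivered;
    }

    /** Manual ack: frees one slot. Returns false for an unknown delivery tag. */
    boolean ack(long deliveryTag) {
        return unacked.remove(deliveryTag) != null;
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        PrefetchModel model = new PrefetchModel(2);
        for (int i = 1; i <= 5; i++) {
            model.enqueue("msg-" + i);
        }
        System.out.println("first round delivered: " + model.deliver());  // fills both slots
        System.out.println("without ack delivered: " + model.deliver());  // nothing: consumer is full
        model.ack(1);
        System.out.println("after one ack delivered: " + model.deliver());
        System.out.println("still waiting in queue: " + model.readyCount());
    }
}
```

In this model a consumer that never calls ack receives exactly prefetch messages and then nothing more, while the rest wait in the queue.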

Let's look at the code first and explain afterwards.

3.1 application.properties

server.port: 8080
spring.application.name: provider
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
spring.rabbitmq.virtual-host: /

# Turn on the confirm mechanism
# (Spring Boot 2.2 and later use spring.rabbitmq.publisher-confirm-type instead)
spring.rabbitmq.publisher-confirms: true
# Turn on the return mechanism for unroutable messages
spring.rabbitmq.publisher-returns: true
# Consumers acknowledge messages manually
spring.rabbitmq.listener.simple.acknowledge-mode: manual
# Minimum number of concurrent consumers
spring.rabbitmq.listener.simple.concurrency: 1
# Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency: 1
# Enable retry for the listener
spring.rabbitmq.listener.simple.retry.enabled: true

3.2 configuration

package com.example.provider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Direct exchange configuration
 * @author 30309
 *
 */
@Configuration
public class DirectRabbitConfig {

    // Queue named DirectQueue
    @Bean
    public Queue DirectQueue() {
        return new Queue("DirectQueue", true);  // true makes the queue durable
    }

    // Direct exchange named DirectExchange
    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange("DirectExchange");
    }

    // Bind the queue to the exchange with the routing key DirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Return unroutable messages to the producer instead of dropping them; requires publisher-returns: true in the configuration file
        rabbitTemplate.setMandatory(true);

        // Message return; requires publisher-returns: true in the configuration file
        // The ReturnCallback is invoked when a message reaches the RabbitMQ exchange but no queue bound to the exchange matches it.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Message sending failed: no matching queue bound to the exchange");
        });

        // Message confirmation; requires publisher-confirms: true in the configuration file
        // The ConfirmCallback receives the ack or nack for each message sent to the RabbitMQ exchange.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message sent successfully: message reached the RabbitMQ exchange");
            } else {
                System.out.println("Message sending failed: message did not reach the RabbitMQ exchange, cause: " + cause);
            }
        });
 
        return rabbitTemplate;
    }
}
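The two callbacks in the configuration answer different questions, which is easy to mix up. The following sketch models which of them fires for a mandatory message. It is a hypothetical in-memory model (PublishOutcomeModel and the event strings are made-up names), and the case of a missing exchange reflects the ack=false branch of the ConfirmCallback above rather than anything measured here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of which publisher callbacks fire for a mandatory message. */
class PublishOutcomeModel {
    // Exchange name -> routing keys that have a queue bound (direct-exchange matching).
    private final Map<String, Set<String>> bindings;

    PublishOutcomeModel(Map<String, Set<String>> bindings) {
        this.bindings = bindings;
    }

    /** Returns the callback events, in order, that the producer would observe. */
    List<String> publish(String exchange, String routingKey) {
        List<String> events = new ArrayList<>();
        Set<String> boundKeys = bindings.get(exchange);
        if (boundKeys == null) {
            // The exchange does not exist, so the message never reaches it.
            events.add("confirm(ack=false)");
        } else if (!boundKeys.contains(routingKey)) {
            // The exchange got the message but no queue matched: returned first, then confirmed.
            events.add("return");
            events.add("confirm(ack=true)");
        } else {
            // Routed to a queue: only the confirm fires.
            events.add("confirm(ack=true)");
        }
        return events;
    }

    public static void main(String[] args) {
        PublishOutcomeModel model =
                new PublishOutcomeModel(Map.of("DirectExchange", Set.of("DirectRouting")));
        System.out.println(model.publish("DirectExchange", "DirectRouting"));
        System.out.println(model.publish("DirectExchange", "NoSuchRouting"));
        System.out.println(model.publish("NoSuchExchange", "DirectRouting"));
    }
}
```

Note that in the second case the confirm is still an ack: the exchange accepted the message, it just had nowhere to route it. That is why the ReturnCallback is needed in addition to the ConfirmCallback.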


3.3 producers

package com.example.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * Producer
 * @author 30309
 *
 */
@RestController
public class SendMessageController{
 
    @Autowired
    RabbitTemplate rabbitTemplate; 
 
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
    	
        // Send a message to the exchange DirectExchange with the routing key DirectRouting
        rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", "Hello World");
    	
        return "ok";
    }
 
}

3.4 consumers

package com.example.consumer.receiver;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * Consumer 1
 * @author 30309
 *
 */
@Component
public class DirectReceiver1 {
 
    @RabbitListener(queues = "DirectQueue") // Listen on the queue named DirectQueue
    @RabbitHandler
    public void process(String str, Channel channel, Message message) {
        System.out.println("DirectReceiver1 Consumer receives message: " + str);

        try {
            // Acknowledge this message only (multiple = false)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // Negatively acknowledge the message and requeue it so that it is delivered again
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            // Reject the message; it is discarded and not returned to the queue
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Note that basicAck takes two parameters. The first is the message's unique ID (its delivery tag). When the second is true, all messages whose unique ID is less than or equal to the given value are acknowledged at once (that is, manual acknowledgements are batched to reduce network traffic).

Unique ID: after a consumer registers with RabbitMQ, a Channel is established. Each message RabbitMQ pushes to the consumer carries an ID that uniquely identifies that delivery on the Channel; it is a monotonically increasing positive integer. Note that unique IDs are scoped to a channel: the same ID on a different Channel refers to a different delivery.
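The two rules above (batch acknowledgement and per-channel scope) can be sketched with a small model. DeliveryTagModel is a hypothetical class that only imitates the behaviour described here; its basicAck is not the RabbitMQ client method, it merely takes the same two parameters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of one consumer channel and its delivery tags. */
class DeliveryTagModel {
    // Delivery tag -> message body delivered on this channel and not yet acknowledged.
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextDeliveryTag = 1; // every channel counts from 1 on its own

    /** Delivers a message on this channel and returns its delivery tag. */
    long deliver(String body) {
        long tag = nextDeliveryTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Same parameters as basicAck(deliveryTag, multiple); returns the bodies acknowledged. */
    List<String> basicAck(long deliveryTag, boolean multiple) {
        List<String> acked = new ArrayList<>();
        if (multiple) {
            // Everything with a tag less than or equal to deliveryTag is acknowledged at once.
            NavigableMap<Long, String> upToTag = unacked.headMap(deliveryTag, true);
            acked.addAll(upToTag.values());
            upToTag.clear();
        } else {
            String body = unacked.remove(deliveryTag);
            if (body != null) {
                acked.add(body);
            }
        }
        return acked;
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        DeliveryTagModel channelA = new DeliveryTagModel();
        DeliveryTagModel channelB = new DeliveryTagModel();
        channelA.deliver("a1");
        channelA.deliver("a2");
        channelA.deliver("a3");
        long tagOnB = channelB.deliver("b1"); // channel B starts at 1 again
        System.out.println("batch ack on A: " + channelA.basicAck(2, true));
        System.out.println("left on A: " + channelA.unackedCount()
                + ", tag on B: " + tagOnB + ", left on B: " + channelB.unackedCount());
    }
}
```

Acknowledging tag 2 with multiple set to true on channel A leaves the message with tag 1 on channel B untouched, because each channel keeps its own numbering.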

3.5 test results

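Producer: the console prints the success message from the ConfirmCallback.

Consumer: the console prints the message received by DirectReceiver1, Hello World.

This shows that our test was successful.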

4 Summary

  1. A message is removed from the queue once a consumer has correctly received it. If no consumer subscribes to a queue, its messages stay in the queue.
  2. RabbitMQ learns that a message was correctly received through the ACK, so every message needs to be acknowledged. Acknowledgement is either manual or automatic; automatic ack mode is used by default.
  3. With automatic ack, the message is confirmed as soon as it is sent to the consumer, so it can be lost (for example, when the consumer throws an exception). With manual ack, the consumer confirms by calling ack, nack or reject, and can take action after the business logic fails.
  4. A message that is not acked is redelivered, possibly to another consumer, once the consumer holding it disconnects. If a service forgets to ack, its unacknowledged messages pile up, and once they reach the prefetch limit RabbitMQ stops sending it new messages.
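The fates of a delivered message listed above can be sketched as a tiny in-memory queue. QueueModel and its methods are hypothetical; the sketch only imitates what ack, nack or reject with and without requeue, and a consumer disconnect do to a message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical single-queue model of what ack, nack/reject and a disconnect do to a message. */
class QueueModel {
    private final Deque<String> ready = new ArrayDeque<>();        // waiting in the queue
    private final TreeMap<Long, String> unacked = new TreeMap<>(); // delivered, not yet settled
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Hands the next ready message to the consumer; returns its delivery tag, or -1 if none. */
    long deliverNext() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        unacked.put(nextTag, body);
        return nextTag++;
    }

    /** ack: the message is done and removed for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** nack or reject: requeue=true puts the message back, requeue=false discards it. */
    void reject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    /** The consumer disconnected: everything still unacked goes back to the queue. */
    void channelClosed() {
        for (String body : unacked.descendingMap().values()) {
            ready.addFirst(body); // highest tag first, so the lowest tag ends up at the head
        }
        unacked.clear();
    }

    List<String> readyMessages() {
        return new ArrayList<>(ready);
    }

    public static void main(String[] args) {
        QueueModel queue = new QueueModel();
        queue.publish("A");
        queue.publish("B");
        queue.publish("C");
        queue.ack(queue.deliverNext());           // A is acknowledged and gone
        queue.reject(queue.deliverNext(), true);  // B is requeued
        queue.reject(queue.deliverNext(), false); // B is delivered again, then discarded
        queue.deliverNext();                      // C is delivered but never acked
        queue.channelClosed();                    // so C returns to the queue
        System.out.println(queue.readyMessages());
    }
}
```

Only the acked and the discarded messages leave the queue for good; a message that is merely forgotten comes back after the disconnect.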



Added by gin on Mon, 28 Oct 2019 05:20:21 +0200