RabbitMQ prevents message loss

Message loss scenario

There are three main scenarios for MQ message loss:
1. The rabbitmq server does not receive the message sent by the message producer; Cause message loss
2.rabbitmq does not persist the message after receiving it, resulting in message loss
3. After receiving the message, the consumer did not have time to deal with it, and the consumer went down, resulting in the loss of the message

1. The message sent by the producer was not sent to the rabbit switch

Solution: Message asynchronous confirmation mechanism (confirm mechanism)
In the integrated springboot project; Enable the confirm mechanism through the configuration file

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123123
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=15000

#Enable confirm confirmation mechanism
spring.rabbitmq.publisher-confirms=true
#Turn on the return confirmation mechanism
spring.rabbitmq.publisher-returns=true
#When set to true, the consumer will be monitored by return if the message is not routed to the appropriate queue and will not be deleted automatically
spring.rabbitmq.template.mandatory=true

After the confirm mechanism is enabled, the callback code will be called every time the producer sends a message; Developers need to write the logic of the callback function to deal with the failed message

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * confirm The mechanism only guarantees that messages arrive at the exchange, but does not guarantee that messages can be routed to the correct queue
     * @param correlationData Information of the message sent (switch, route, message body, etc.)
     * @param ack true Success, false, failure
     * @param cause Error message
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    	// Failure, the general solution is to store the sending failure message in the scheduled task queue; Try to resend the message; Fail again,
    	// It will no longer be sent and will be processed manually
        if (!ack) {
            log.error("rabbitmq confirm fail,cause:{}", cause);
            // ... failure handling logic
        }
    }
}

2. The switch was not sent to the queue

Solution: Return mode to ensure that messages are sent from the switch to the queue.
1. Enable return mode

#Turn on the return mechanism
spring.rabbitmq.publisher-returns=true

2. Develop callback function code

@Component
public class Sender implements RabbitTemplate.ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
 
    //By implementing the ReturnCallback interface, it will be triggered if the message fails to be sent from the switch to the corresponding queue (for example, it will be triggered if the queue cannot be found according to the routingKey specified when sending the message)
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("Message body message: " + message);
		System.out.println("news replyCode: " + replyCode);
		System.out.println("describe: " + replyText);
		System.out.println("Switch used by message exchange: " + exchange);
		System.out.println("The routing key used by the message routing: " + routingKey);
	}
}

3. The switch, queue and message are not persistent

Switches, queues and messages are not persistent. When rabbitmq's service is restarted, these information will be lost.

Switch persistence
Set the persistence property when declaring the switch

	/**
	 * Description of construction parameters:
	 * Parameter 1: switch name
	 * Parameter 2: durable: true indicates persistence, and false indicates non persistence
	 * Parameter 3: autoDelete: true auto delete, false not auto delete
	 */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchangeName", true, false);
    }

Queue persistence
Set the persistence property when declaring the queue

    public Queue queue() {
    	/**
    	 * @param queueName Queue name
    	 * @param durable Queue persistence, true persistence, false non persistence
    	 * @param exclusive Whether exclusive, true not exclusive, false exclusive; General false is configured here
    	 * @param autoDelete Whether to delete automatically. If there is no producer, the queue will be deleted automatically
    	 * @param args Queue parameters
    	 */
        return new Queue("queueName", true, false, false, args);
    }

Message persistence

Message persistence is persistent by default. No configuration required

4. The consumer received the message and did not execute the business logic, resulting in the loss of the message

Solution: manual confirmation message mechanism
Profile configuration

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: root
    password: 123123
    virtual-host: /test
    publisher-confirms: true   # Enable send confirmation
    publisher-returns: true  # Enable send failure fallback
    
    #Enable ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #Take manual response
        #concurrency: 1 # Specify the minimum number of consumers
        #max-concurrency: 1 #Specify the maximum number of consumers
        retry:
          enabled: true # Do you support retry

Consumer code

@Component
public class Consumer {
	@RabbitHandler
	public void consumeMsg(String msg, Channel channel, Message message) throws IOException {
		//Get the message and delay consumption
		try {
			// ... consumption message business logic

			/**
			 * deliveryTag	Random label information of the message
			 * multiple	Batch; true indicates that the value less than deliveryTag will be ack ed at one time
			 */
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			
		} catch (InterruptedException e) {
			e.printStackTrace();
			/**
			 * deliveryTag	Random label information of the message
			 * multiple	Batch; true indicates that the value less than deliveryTag will be ack ed at one time
			 * requeue	Is the rejected message re queued
			 */
			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
		}
	}
}

When there is an unexpected business; The message will return to the queue; It will be distributed to other normal consumer s for consumption

Keywords: Java RabbitMQ Distribution

Added by shorty114 on Thu, 28 Oct 2021 11:28:11 +0300