Message loss scenario
There are three main scenarios for MQ message loss:
1. The rabbitmq server does not receive the message sent by the message producer; Cause message loss
2.rabbitmq does not persist the message after receiving it, resulting in message loss
3. After receiving the message, the consumer did not have time to deal with it, and the consumer went down, resulting in the loss of the message
1. The message sent by the producer was not sent to the rabbit switch
Solution: Message asynchronous confirmation mechanism (confirm mechanism)
In the integrated springboot project; Enable the confirm mechanism through the configuration file
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=123123 spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=15000 #Enable confirm confirmation mechanism spring.rabbitmq.publisher-confirms=true #Turn on the return confirmation mechanism spring.rabbitmq.publisher-returns=true #When set to true, the consumer will be monitored by return if the message is not routed to the appropriate queue and will not be deleted automatically spring.rabbitmq.template.mandatory=true
After the confirm mechanism is enabled, the callback code will be called every time the producer sends a message; Developers need to write the logic of the callback function to deal with the failed message
@Component @Slf4j public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); } /** * confirm The mechanism only guarantees that messages arrive at the exchange, but does not guarantee that messages can be routed to the correct queue * @param correlationData Information of the message sent (switch, route, message body, etc.) * @param ack true Success, false, failure * @param cause Error message */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // Failure, the general solution is to store the sending failure message in the scheduled task queue; Try to resend the message; Fail again, // It will no longer be sent and will be processed manually if (!ack) { log.error("rabbitmq confirm fail,cause:{}", cause); // ... failure handling logic } } }
2. The switch was not sent to the queue
Solution: Return mode to ensure that messages are sent from the switch to the queue.
1. Enable return mode
#Turn on the return mechanism spring.rabbitmq.publisher-returns=true
2. Develop callback function code
@Component public class Sender implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); } //By implementing the ReturnCallback interface, it will be triggered if the message fails to be sent from the switch to the corresponding queue (for example, it will be triggered if the queue cannot be found according to the routingKey specified when sending the message) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("Message body message: " + message); System.out.println("news replyCode: " + replyCode); System.out.println("describe: " + replyText); System.out.println("Switch used by message exchange: " + exchange); System.out.println("The routing key used by the message routing: " + routingKey); } }
3. The switch, queue and message are not persistent
Switches, queues and messages are not persistent. When rabbitmq's service is restarted, these information will be lost.
Switch persistence
Set the persistence property when declaring the switch
/** * Description of construction parameters: * Parameter 1: switch name * Parameter 2: durable: true indicates persistence, and false indicates non persistence * Parameter 3: autoDelete: true auto delete, false not auto delete */ @Bean public TopicExchange exchange() { return new TopicExchange("exchangeName", true, false); }
Queue persistence
Set the persistence property when declaring the queue
public Queue queue() { /** * @param queueName Queue name * @param durable Queue persistence, true persistence, false non persistence * @param exclusive Whether exclusive, true not exclusive, false exclusive; General false is configured here * @param autoDelete Whether to delete automatically. If there is no producer, the queue will be deleted automatically * @param args Queue parameters */ return new Queue("queueName", true, false, false, args); }
Message persistence
Message persistence is persistent by default. No configuration required
4. The consumer received the message and did not execute the business logic, resulting in the loss of the message
Solution: manual confirmation message mechanism
Profile configuration
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring: rabbitmq: host: localhost port: 5672 username: root password: 123123 virtual-host: /test publisher-confirms: true # Enable send confirmation publisher-returns: true # Enable send failure fallback #Enable ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #Take manual response #concurrency: 1 # Specify the minimum number of consumers #max-concurrency: 1 #Specify the maximum number of consumers retry: enabled: true # Do you support retry
Consumer code
@Component public class Consumer { @RabbitHandler public void consumeMsg(String msg, Channel channel, Message message) throws IOException { //Get the message and delay consumption try { // ... consumption message business logic /** * deliveryTag Random label information of the message * multiple Batch; true indicates that the value less than deliveryTag will be ack ed at one time */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (InterruptedException e) { e.printStackTrace(); /** * deliveryTag Random label information of the message * multiple Batch; true indicates that the value less than deliveryTag will be ack ed at one time * requeue Is the rejected message re queued */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
When there is an unexpected business; The message will return to the queue; It will be distributed to other normal consumer s for consumption