catalogue
What does the advanced part include?
Release confirmation of springboot version
Callback interface optimization
What does the advanced part include?
In the production environment, RabbitMQ is restarted due to some unknown reasons. During RabbitMQ restart, the delivery of producer messages fails, resulting in message loss, which needs to be processed and recovered manually. So we began to think about how to deliver RabbitMQ messages reliably? Especially in such extreme cases, when the RabbitMQ cluster is unavailable, how to deal with undeliverable messages?
Release confirmation of springboot version
Confirmation mechanism scheme
Code architecture diagram
Create a springboot project and quickly add RabbitMQ and web dependencies (omitted)
Modify profile
spring: application: name: rabbitmq-springboot rabbitmq: host: 192.168.31.65 port: 5672 username: admin password: 123 virtual-host: / publisher-confirm-type: correlated #Callback method after successful publishing
Spring. Com needs to be added to the configuration file rabbitmq. publisher-confirm-type=correlated
Attribute value:
NONE: disable publishing confirmation mode, which is the default value
CORRELATED: the callback method will be triggered after the message is successfully published to the exchange
SIMPLE: it is not used much, which is equivalent to answering every time, with obvious disadvantages
Add configuration class
/** * Publish confirmation (Advanced) configuration class */ @Configuration public class ConfirmConfig { //Switch public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //queue public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //routingKey public static final String CONFIRM_ROUTING_KEY = "key1"; //Declaration switch @Bean public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true). withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build(); } //Declaration queue @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //binding @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmqueue, @Qualifier("confirmExchange") DirectExchange confirmexchange) { return BindingBuilder.bind(confirmqueue).to(confirmexchange).with(CONFIRM_ROUTING_KEY); } }
Message producer
/** * Start sending message test confirmation (Advanced release confirmation) */ @RestController @RequestMapping("/confirm") @Slf4j public class ProduceController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "key1", message + "key1", correlationData); log.info("The content of the message sent is:{}", message); //The queue does not exist. It is deliberately written incorrectly to simulate that the queue cannot receive messages CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "key2", message + "key2", correlationData2); log.info("The content of the message sent is:{}", message); } }
Callback interface
@Component @Slf4j public class MyCallback implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; //Inject the implementation class into the internal interface @PostConstruct //injection public void init() { rabbitTemplate.setConfirmCallback(this); } /** * Switch confirmation callback method * * @param correlationData Save the id and related information of the callback message * @param ack Indicates that the success of the message received by the switch is true and the failure is false * @param cause Success is null and failure is the reason for failure */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("Switch has received id by:{}News of", id); } else { log.info("The switch hasn't received it yet id by:{}news,For reasons:{}", id, cause); } } }
Message consumer
/** * Publish confirmation advanced: receive message */ @Component @Slf4j public class ConfirmConsume { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirm(Message message) { log.info("Received queue messages:{}", new String(message.getBody())); } }
result
It can be seen that two messages were sent. The RoutingKey of the first message is "key1" and the RoutingKey of the second message is "key2". Both messages were successfully received by the switch and the confirmation callback of the switch was also received. However, the consumer only received one message because the RoutingKey of the second message is inconsistent with the BindingKey of the queue, No other queue can receive this message, and all the second messages are directly discarded.
Fallback message
When only the producer confirmation mechanism is enabled, the switch will directly send a confirmation message to the message producer after receiving the message. If it is found that the message cannot be routed, the message will be directly discarded. At this time, the producer does not know that the message is discarded. So how can I help me deal with messages that cannot be routed? At least let me know. I can handle it myself. By setting the mandatory parameter, you can return the message to the producer when the destination cannot be reached during message delivery.
Configuration yml file
rabbitmq: host: 192.168.31.65 port: 5672 username: admin password: 123 virtual-host: / publisher-confirm-type: correlated #Callback method after successful publishing publisher-returns: true #Fallback after failed to open message routing
Callback interface optimization
@Component @Slf4j public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; //Inject the implementation class into the internal interface @PostConstruct //injection public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * Switch confirmation callback method * * @param correlationData Save the id and related information of the callback message * @param ack Indicates that the success of the message received by the switch is true and the failure is false * @param cause Success is null and failure is the reason for failure */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("Switch has received id by:{}News of", id); } else { log.info("The switch hasn't received it yet id by:{}news,For reasons:{}", id, cause); } } /** * The fallback method returns the message to the producer when an error occurs in the message delivery process, resulting in inaccessibility * Only failed methods can call this method * * @param message news * @param replyCode Error code * @param replyText Error message * @param exchange Switch */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("news:{}Switched:{}Back, the reason for back is:{},routingKey yes:{}", new String(message.getBody()), exchange, replyText, routingKey); } }
result
Backup switch
In RabbitMQ, there is a mechanism to back up the switch.
What is a backup switch? The backup switch can be understood as the "spare tire" of the switch in RabbitMQ. When we declare a corresponding backup switch for a switch, we create a spare tire for it. When the switch receives a non routable message, it will forward the message to the backup switch for forwarding and processing, Usually, the backup switch is Fanout, so that all messages can be delivered to the queue bound to it. Then we bind a queue under the backup switch, so that all messages that cannot be routed by the original switch will enter this queue. Of course, we can also establish an alarm queue to monitor and alarm with independent consumers.
Code architecture diagram
Modify configuration class
/** * Publish confirmation (Advanced) configuration class */ @Configuration public class ConfirmConfig { //Switch public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //queue public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //routingKey public static final String CONFIRM_ROUTING_KEY = "key1"; //Backup switch public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; //Backup queue public static final String BACKUP_QUEUE_NAME = "backup.queue"; //Alarm switch public static final String WARNING_QUEUE_NAME = "warning.queue"; //Declaration switch @Bean public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true). withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build(); } //Declaration queue @Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } //binding @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmqueue, @Qualifier("confirmExchange") DirectExchange confirmexchange) { return BindingBuilder.bind(confirmqueue).to(confirmexchange).with(CONFIRM_ROUTING_KEY); } //Create backup switch @Bean public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //Create queue @Bean public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } //Alarm queue @Bean public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } //binding @Bean public Binding backupBinding(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("backupQueue") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean public Binding warningBinding(@Qualifier("backupExchange") FanoutExchange fanoutExchange, @Qualifier("warningQueue") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); } }
Alarm consumers
/** * Alarm: Consumer */ @Component @Slf4j public class warnConsume { //Receive alarm message @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveErrMsg(Message message) { String msg = new String(message.getBody()); log.error("call the police! Non routable message found:{}", msg); } }
Test precautions
When restarting the project, you need to change the original confirm Exchange is deleted because we have modified its binding properties, otherwise an error will be reported
result
Summary
When the mandatory parameter and the backup switch can be used together, if they are turned on at the same time, where will the message go? Who has the highest priority? According to the above results, the answer is that the backup switch has the highest priority.