[RabbitMQ] Advanced publisher confirms with Spring Boot

Contents

What does the advanced part include?

Publisher confirms with Spring Boot

Confirmation mechanism scheme

Code architecture diagram

Modify the configuration file

Add configuration class

Message producer

Callback interface

Message consumer

Result

Returned messages

YAML configuration

Improved callback interface

Result

Backup exchange

Code architecture diagram

Modify configuration class

Warning consumer

Testing notes

Result

Summary

What does the advanced part include?

In production, RabbitMQ may restart for unknown reasons. While it is restarting, messages sent by producers fail to be delivered and are lost, and they then have to be processed and recovered manually. This raises the question of how to deliver RabbitMQ messages reliably. In particular, in extreme cases where the RabbitMQ cluster is unavailable, how should undeliverable messages be handled?

Publisher confirms with Spring Boot

Confirmation mechanism scheme

Code architecture diagram

Create a Spring Boot project and add the RabbitMQ and web dependencies (omitted).

Modify the configuration file

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: 192.168.31.65
    port: 5672
    username: admin
    password: 123
    virtual-host: /
    publisher-confirm-type: correlated  # Enable correlated publisher confirms (the confirm callback fires for each published message)

Add spring.rabbitmq.publisher-confirm-type=correlated to the configuration file.

Possible values:

NONE: disables publisher confirms; this is the default.

CORRELATED: the callback method is triggered after the message is published to the exchange, and the CorrelationData passed when sending is handed back to it.

SIMPLE: rarely used. The publisher waits for the broker's confirmation after publishing (for example with waitForConfirms or waitForConfirmsOrDie on RabbitTemplate), which is roughly equivalent to confirming one message at a time and has obvious throughput drawbacks.

Add configuration class

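import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Publisher confirms (advanced): configuration class
 */
@Configuration
public class ConfirmConfig {

    // Exchange name
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // Queue name
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // Routing key
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // Declare the exchange. It has no alternate exchange yet; that argument is added in the backup exchange section.
    @Bean
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();
    }

    // Declare the queue
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Bind the queue to the exchange with the routing key
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}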

Message producer

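import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Sends test messages for the advanced publisher confirm example
 */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProduceController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // The routing key matches the binding, so this message reaches confirm.queue
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message + "key1", correlationData);
        log.info("Sent message: {}", message + "key1");

        // "key2" is deliberately not bound to any queue, to simulate a message that no queue can receive
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                "key2", message + "key2", correlationData2);
        log.info("Sent message: {}", message + "key2");
    }
}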

Callback interface

import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // ConfirmCallback is an inner interface of RabbitTemplate, so register this implementation on the template
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Exchange confirmation callback
     *
     * @param correlationData holds the id and related information of the message
     * @param ack             true if the exchange received the message, false otherwise
     * @param cause           null on success, the failure reason otherwise
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange received the message with id: {}", id);
        } else {
            log.info("The exchange did not receive the message with id: {}, cause: {}", id, cause);
        }
    }
}
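
The callback above only logs the outcome. To recover from a negative acknowledgement, the producer has to remember what it sent. The following is a minimal sketch, not part of the original project: PendingConfirmStore and its methods are hypothetical names, and it assumes each message gets a unique id (the controller above reuses "1" and "2" on every request, which would not be enough to tell messages apart).

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers unconfirmed payloads by correlation id. */
class PendingConfirmStore {

    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Stores the payload and returns a fresh id, suitable for new CorrelationData(id). */
    String register(String payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    /**
     * Intended to be called from confirm(correlationData, ack, cause).
     * Returns the payload to resend when the message was not acknowledged,
     * or an empty Optional when it was acknowledged or the id is unknown.
     */
    Optional<String> onConfirm(String id, boolean ack) {
        if (id == null) {
            return Optional.empty();
        }
        String payload = pending.remove(id);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    /** Number of messages still waiting for a confirmation. */
    int size() {
        return pending.size();
    }
}
```

In the controller, the id returned by register would be passed to new CorrelationData(id); in confirm, a non-empty result from onConfirm is a candidate for resending or for writing to a recovery log.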

Message consumer

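import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Publisher confirms (advanced): message consumer
 */
@Component
@Slf4j
public class ConfirmConsume {

    // Listen on confirm.queue and log the body of each message
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirm(Message message) {
        log.info("Received message from queue: {}", new String(message.getBody()));
    }
}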

Result

Two messages were sent: the first with routing key "key1" and the second with routing key "key2". Both were received by the exchange, and the confirm callback fired for both. However, the consumer received only the first message. The routing key of the second message does not match the queue's binding key and no other queue can accept it, so the second message was discarded.

Returned messages

With only publisher confirms enabled, the exchange sends a confirmation to the producer as soon as it receives a message. If the message then turns out to be unroutable, it is silently discarded, and the producer never learns that it was dropped. How can unroutable messages be handled? The producer should at least be notified so that it can deal with them itself. Setting the mandatory flag makes the broker return a message to the producer when it cannot be delivered to any destination.
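
The difference between "confirmed" and "routed" can be illustrated with a toy in-memory model of a direct exchange. This is only a sketch of the behaviour described above, not RabbitMQ client code; DirectExchangeModel and Outcome are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange; not RabbitMQ client code. */
class DirectExchangeModel {

    enum Outcome { ROUTED, DROPPED, RETURNED }

    // Binding key -> messages held by the queue bound with that key
    private final Map<String, List<String>> queuesByBindingKey = new HashMap<>();

    /** Binds a (model) queue to the exchange under the given key. */
    void bind(String bindingKey) {
        queuesByBindingKey.putIfAbsent(bindingKey, new ArrayList<>());
    }

    /**
     * In every case the exchange has accepted the message, which is all a
     * positive confirm tells the producer. The outcome describes what
     * happens to the message afterwards.
     */
    Outcome publish(String routingKey, String body, boolean mandatory) {
        List<String> queue = queuesByBindingKey.get(routingKey);
        if (queue != null) {
            queue.add(body);
            return Outcome.ROUTED;
        }
        // No binding matches: return the message if mandatory, otherwise discard it
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    /** Messages currently held by the queue bound with the given key. */
    List<String> messages(String bindingKey) {
        return queuesByBindingKey.getOrDefault(bindingKey, new ArrayList<>());
    }
}
```

With a single binding for "key1", publishing with "key2" yields DROPPED when mandatory is false and RETURNED when it is true, which mirrors the two behaviours discussed in this section.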

YAML configuration

spring:
  rabbitmq:
    host: 192.168.31.65
    port: 5672
    username: admin
    password: 123
    virtual-host: /
    publisher-confirm-type: correlated  # Enable correlated publisher confirms
    publisher-returns: true  # Return unroutable messages to the producer

Callback interface optimization

import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Note: ReturnCallback and setReturnCallback are deprecated as of Spring AMQP 2.3
// in favor of ReturnsCallback and setReturnsCallback.
@Component
@Slf4j
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Both callbacks are inner interfaces of RabbitTemplate, so register this implementation on the template
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Exchange confirmation callback
     *
     * @param correlationData holds the id and related information of the message
     * @param ack             true if the exchange received the message, false otherwise
     * @param cause           null on success, the failure reason otherwise
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange received the message with id: {}", id);
        } else {
            log.info("The exchange did not receive the message with id: {}, cause: {}", id, cause);
        }
    }

    /**
     * Return callback: hands the message back to the producer when it cannot be routed to a destination.
     * It is invoked only when delivery fails.
     *
     * @param message    the returned message
     * @param replyCode  error code
     * @param replyText  error description
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("Message: {} was returned by exchange: {}, reason: {}, routingKey: {}",
                new String(message.getBody()), exchange, replyText, routingKey);
    }
}

Result

Backup exchange

RabbitMQ provides a backup exchange mechanism (the RabbitMQ documentation calls it an alternate exchange; it is configured with the alternate-exchange argument).

What is a backup exchange? It can be thought of as the "spare tire" of an exchange. When we declare a backup exchange for an exchange, any message that the original exchange cannot route is forwarded to the backup exchange, which then routes and handles it. The backup exchange is usually of type fanout, so that every such message is delivered to all queues bound to it. We bind a queue to the backup exchange, and all messages that the original exchange could not route end up in that queue. We can also bind a warning queue and use a dedicated consumer on it for monitoring and alerting.

Code architecture diagram

Modify configuration class

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Publisher confirms (advanced): configuration class
 */
@Configuration
public class ConfirmConfig {

    // Exchange name
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // Queue name
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // Routing key
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // Backup exchange name
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    // Backup queue name
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    // Warning queue name
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // Declare the exchange and point its alternate-exchange argument at the backup exchange
    @Bean
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }

    // Declare the queue
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Bind the queue to the exchange with the routing key
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    // Declare the backup exchange (fanout)
    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    // Declare the backup queue
    @Bean
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    // Declare the warning queue
    @Bean
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    // Bind the backup queue to the backup exchange
    @Bean
    public Binding backupBinding(@Qualifier("backupExchange") FanoutExchange fanoutExchange,
                                 @Qualifier("backupQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    // Bind the warning queue to the backup exchange
    @Bean
    public Binding warningBinding(@Qualifier("backupExchange") FanoutExchange fanoutExchange,
                                  @Qualifier("warningQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

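Warning consumer

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Warning consumer
 */
@Component
@Slf4j
public class WarnConsume {

    // Receive messages from the warning queue and raise an alert
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveErrMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("Alert! Unroutable message found: {}", msg);
    }
}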

Testing notes

Before restarting the project, delete the existing confirm.exchange on the broker. We changed its arguments (it now declares an alternate-exchange), and redeclaring an existing exchange with different arguments causes an error.

Result
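
Why the old exchange has to be deleted can be shown with a toy model of how a broker treats redeclaration. This is a sketch based on the behaviour described above (RabbitMQ reports such a mismatch as a PRECONDITION_FAILED error); ExchangeRegistryModel is a hypothetical name and none of this is RabbitMQ client code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a broker's exchange registry; not RabbitMQ client code. */
class ExchangeRegistryModel {

    // Exchange name -> arguments it was declared with
    private final Map<String, Map<String, String>> argumentsByExchange = new HashMap<>();

    /** Declaring is idempotent only when the arguments match the existing exchange. */
    void declare(String name, Map<String, String> arguments) {
        Map<String, String> existing = argumentsByExchange.get(name);
        if (existing != null && !existing.equals(arguments)) {
            throw new IllegalStateException(
                    "PRECONDITION_FAILED: inequivalent arguments for exchange '" + name + "'");
        }
        argumentsByExchange.put(name, new HashMap<>(arguments));
    }

    /** Removes the exchange so it can be declared again with new arguments. */
    void delete(String name) {
        argumentsByExchange.remove(name);
    }
}
```

Declaring confirm.exchange first without arguments and then with alternate-exchange set fails in this model; after delete, the second declaration succeeds, which is the manual step this section asks for.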

Summary

The mandatory flag and the backup exchange can be used together. If both are enabled, where does an unroutable message go, and which one takes priority? According to the results above, the backup exchange takes priority.
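
The order of precedence can be written down as a small decision function. This is a sketch of my understanding of the behaviour observed above, not RabbitMQ code: RoutingDecisionModel and its Outcome values are hypothetical names, and the case where the backup exchange itself cannot route the message is an assumption that the test in this article does not exercise.

```java
/** Toy decision model for a published message; not RabbitMQ client code. */
class RoutingDecisionModel {

    enum Outcome {
        ROUTED_TO_BOUND_QUEUE,
        ROUTED_VIA_BACKUP_EXCHANGE,
        RETURNED_TO_PRODUCER,
        DROPPED
    }

    /**
     * @param bindingMatches a queue is bound to the original exchange with the routing key
     * @param backupCanRoute a backup exchange is configured and has at least one queue bound
     * @param mandatory      the message was published with the mandatory flag
     */
    static Outcome decide(boolean bindingMatches, boolean backupCanRoute, boolean mandatory) {
        if (bindingMatches) {
            return Outcome.ROUTED_TO_BOUND_QUEUE;
        }
        // The backup exchange is consulted before the mandatory flag
        if (backupCanRoute) {
            return Outcome.ROUTED_VIA_BACKUP_EXCHANGE;
        }
        // Assumption: only now does mandatory cause a return to the producer
        if (mandatory) {
            return Outcome.RETURNED_TO_PRODUCER;
        }
        return Outcome.DROPPED;
    }
}
```

For the "key2" message in this article, decide(false, true, true) gives ROUTED_VIA_BACKUP_EXCHANGE, matching the observation that the warning consumer fires and the return callback does not.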

Keywords: Java RabbitMQ Spring Boot

Added by nando on Wed, 02 Feb 2022 18:28:21 +0200