Message Middleware--RabbitMQ Summary

Basic knowledge

Why use message middleware

Decoupling

In applications made up of multiple systems, it is common for system A to affect data in systems B, C, and D. The usual practice is for A to call the other systems' interfaces directly, which makes the systems heavily dependent on one another. If a new system is added later, matching logic has to be added to A as well. This coupling is too tight to maintain easily.

After introducing MQ, system A no longer needs to call the other systems. It just sends a message; the other systems listen for it and process it on their own side. Adding or removing a downstream system requires no change to system A: the downstream system simply starts or stops listening for that type of message.
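
The decoupling idea can be sketched without a real broker. The following is a minimal in-memory illustration, not RabbitMQ code: MiniBus, subscribe, and publish are hypothetical names invented for this sketch. System A only publishes; B and C register their own listeners.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message broker (illustration only).
final class MiniBus {
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    // A downstream system registers interest in one message type.
    void subscribe(String type, Consumer<String> listener) {
        listeners.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // The producer publishes without knowing who listens; returns how many listeners were notified.
    int publish(String type, String payload) {
        List<Consumer<String>> targets = listeners.getOrDefault(type, Collections.emptyList());
        for (Consumer<String> target : targets) {
            target.accept(payload);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        List<String> log = new ArrayList<>();
        bus.subscribe("order.created", p -> log.add("B handled " + p));
        bus.subscribe("order.created", p -> log.add("C handled " + p));
        bus.publish("order.created", "order-1"); // system A's only responsibility
        System.out.println(log);
    }
}
```

Adding a system D would mean one more subscribe call on D's side; the publish call in A stays the same.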

Asynchronous processing

Many requests involve calls across multiple services. A client makes a request, interface A calls the interfaces of B, C, and D, and finally returns the result to the client. In this flow the execution time of A depends on the other services: it is A's own time plus the sum of theirs. If A does not care about the results of B, C, and D, MQ can be used instead. A sends a message and returns immediately, which shortens the response time of the interface.
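
As a rough illustration of this hand-off, here is a minimal sketch that uses a plain BlockingQueue in place of MQ. The class and method names (AsyncHandoffSketch, handleRequest, startWorker, awaitProcessed) and the 200 ms delay are assumptions made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: interface A hands work to a queue instead of calling B, C, D directly.
final class AsyncHandoffSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final CountDownLatch processed = new CountDownLatch(1);

    // Interface A: enqueue the message and return at once, without waiting for downstream work.
    String handleRequest(String orderId) {
        queue.offer(orderId);
        return "accepted:" + orderId;
    }

    // A downstream worker (standing in for B, C, or D) consumes at its own pace.
    void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                String orderId = queue.take(); // blocks until a message arrives
                Thread.sleep(200);             // simulated slow downstream processing
                System.out.println("processed " + orderId);
                processed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Used only by the demo to wait for the worker; the caller of A never waits.
    boolean awaitProcessed(long timeoutMillis) {
        try {
            return processed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AsyncHandoffSketch a = new AsyncHandoffSketch();
        a.startWorker();
        System.out.println(a.handleRequest("order-1")); // returns before the worker finishes
        a.awaitProcessed(2000);
    }
}
```

The trade-off is that A can no longer report the downstream result to the client, which is why this only fits the case where A does not care about it.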

Peak Clipping

When the system faces a large burst of requests, the database comes under heavy pressure. After introducing MQ, the requests are first buffered in the queue, and consumers pull only as many messages at a time as the database can actually process.
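
The buffering behavior can be sketched with a plain in-memory queue. This is only an illustration under assumed numbers (a burst of 10 requests, a database that handles 4 per cycle); PeakClippingSketch, accept, pullBatch, and backlog are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a queue absorbs a burst, the consumer drains it in fixed-size batches.
final class PeakClippingSketch {
    private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();

    // A burst of requests is buffered instead of hitting the database directly.
    void accept(int requestId) {
        buffer.offer(requestId);
    }

    // The consumer takes at most batchSize messages per cycle.
    List<Integer> pullBatch(int batchSize) {
        List<Integer> batch = new ArrayList<>(batchSize);
        buffer.drainTo(batch, batchSize);
        return batch;
    }

    int backlog() {
        return buffer.size();
    }

    public static void main(String[] args) {
        PeakClippingSketch mq = new PeakClippingSketch();
        for (int i = 1; i <= 10; i++) {
            mq.accept(i); // traffic spike: 10 requests at once
        }
        while (mq.backlog() > 0) {
            List<Integer> batch = mq.pullBatch(4); // assumed database capacity per cycle
            System.out.println("writing batch " + batch + ", backlog " + mq.backlog());
        }
    }
}
```

The spike still takes the same total work to process; the queue only spreads that work over time so the database never sees more than one batch at once.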

Producer/Consumer

Installation of RabbitMQ

Normal Installation

Go directly to the official website to download the installation package.
https://www.rabbitmq.com/

Docker installation

# Pull the image
docker pull rabbitmq

# Start the container
docker run -it --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 \
-d rabbitmq

# Open a shell inside the container
docker exec -it rabbitmq sh
# Enable the management plugin
rabbitmq-plugins enable rabbitmq_management

Visit http://localhost:15672/ to see the management interface.

Integrate RabbitMQ in Spring Boot

Add the dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Configure queues and exchanges

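import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

    @Bean
    public Queue msgQueue(){
        /**
         * name: queue name
         * durable: whether the queue survives a broker restart
         * exclusive: whether the queue can be used only by the connection that declared it
         * autoDelete: whether the queue is deleted automatically once its last consumer is gone
         */
        return new Queue("MSG_MQ", true, false, false);
    }

    @Bean
    public DirectExchange msgExchange(){
        // name, durable, autoDelete
        return new DirectExchange("MSG_ECHANGE", true, false);
    }

    @Bean
    public Binding mailBinding(){
        return BindingBuilder
                .bind(msgQueue())
                .to(msgExchange())
                .with("MSG_ROUTING");
    }
}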
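
Producer sends message

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("/v1/demo")
@RestController
public class DemoController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sed_queue")
    public void sendMsg(){
        // Arguments: exchange name, routing key, message body
        rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "How miserable you are:"+ System.currentTimeMillis());
    }
}
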
Consumer receives message

Use @RabbitListener to listen on the message queue; the method is invoked whenever a message arrives in the queue.

@Component
public class RabbitListner {
    @RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
    public void handleMsg(String msg){
        System.out.println("msg-"+ msg);
    }
}

queues and queuesToDeclare are different. With queues, the queue being listened on must already exist (or be declared elsewhere, for example by a Queue bean). With queuesToDeclare, the service checks at startup whether the queue exists on the MQ server and creates it if it does not.

Composition of RabbitMQ

  • Broker: The message queue service process. It consists of two parts: Exchange and Queue.
  • Exchange: Routes messages to queues according to certain rules.
  • Queue: The message queue, where messages are stored.
  • Producer: The message producer. Producer clients send messages to an exchange, which routes them to queues.
  • Consumer: The message consumer. It consumes the messages stored in a queue.
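
How these parts fit together can be sketched with a tiny in-memory model. This is an illustration only, not how RabbitMQ is implemented: MiniBroker, bind, publish, and consume are hypothetical names, and the routing shown is the exact-match style of a direct exchange. The exchange, routing key, and queue names are reused from the demo above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of a broker: exchanges route, queues store.
final class MiniBroker {
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> list of {bindingKey, queueName} pairs
    private final Map<String, List<String[]>> bindings = new HashMap<>();

    // Bind a queue to an exchange under a binding key (the queue is created if missing).
    void bind(String exchange, String bindingKey, String queue) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
        bindings.computeIfAbsent(exchange, k -> new ArrayList<>()).add(new String[] {bindingKey, queue});
    }

    // Producer side: publish to an exchange; returns the number of queues that received the message.
    int publish(String exchange, String routingKey, String message) {
        int delivered = 0;
        for (String[] binding : bindings.getOrDefault(exchange, Collections.emptyList())) {
            if (binding[0].equals(routingKey)) { // direct-style exact match
                queues.get(binding[1]).add(message);
                delivered++;
            }
        }
        return delivered;
    }

    // Consumer side: take the next message from a queue, or null if none is waiting.
    String consume(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.bind("MSG_ECHANGE", "MSG_ROUTING", "MSG_MQ");
        broker.publish("MSG_ECHANGE", "MSG_ROUTING", "hello");
        System.out.println(broker.consume("MSG_MQ"));
    }
}
```

The producer never names a queue and the consumer never names an exchange; the binding is the only place where the two are connected.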

Four exchange types

  • DirectExchange: A direct exchange requires a queue to be bound with a specified routing key. A message is delivered to the queue whose binding key exactly matches the message's routing key, similar to point-to-point sending. DirectExchange is used in the demo above.
  • FanoutExchange: After the queues are bound to this exchange, a message sent to the exchange is forwarded to all queues bound to it; the routing key is ignored. This pattern is similar to publish/subscribe.

    @Bean
    public Queue faQueue1(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("fa.queue1", true, false, false);
    }

    @Bean
    public Queue faQueue2(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("fa.queue2", true, false, false);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange", true, false);
    }

    @Bean
    public Binding bindingFanoutExchange(){
        return BindingBuilder
                .bind(faQueue1())
                .to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutExchange1(){
        return BindingBuilder
                .bind(faQueue2())
                .to(fanoutExchange());
    }

    @RabbitListener(queuesToDeclare = @Queue("fa.queue1"))
    public void faQueue1(String msg){
        System.out.println("faQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("fa.queue2"))
    public void faQueue2(String msg){
        System.out.println("faQueue2-"+msg);
    }

    @GetMapping("/sed_fanout")
    public void sendFanoutMsg(){
        // A fanout exchange ignores the routing key, so an empty string is enough
        rabbitTemplate.convertAndSend("fanout.exchange", "", "fanoutExchange: "+ System.currentTimeMillis());
    }

  • TopicExchange: A topic exchange can also be called a wildcard exchange. It matches the routing key against wildcard binding keys and routes the message to the matching queues. Routing keys are words separated by dots; the wildcard # matches zero or more words and * matches exactly one word.

    @Bean
    public Queue topicQueue1(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("topic.queue1", true, false, false);
    }

    @Bean
    public Queue topicQueue2(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("topic.queue2", true, false, false);
    }

    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange("topic.exchange1", true, false);
    }

    @Bean
    public Binding topicBinding1(){
        return BindingBuilder
                .bind(topicQueue1())
                .to(topicExchange1())
                .with("top.*");
    }

    @Bean
    public Binding topicBinding2(){
        return BindingBuilder
                .bind(topicQueue2())
                .to(topicExchange1())
                .with("top.#");
    }

    @RabbitListener(queuesToDeclare = @Queue("topic.queue1"))
    public void topicQueue1(String msg){
        System.out.println("topicQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("topic.queue2"))
    public void topicQueue2(String msg){
        System.out.println("topicQueue2-"+msg);
    }

    @GetMapping("/sed_topic")
    public void sendTopicMsg(String key){
        // The routing key comes from the request, so different wildcard matches can be tried
        rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange: "+ System.currentTimeMillis());
    }

  • HeadersExchange: This type of exchange is not used much. It differs from the three above in that it does not route by routingKey; instead it routes by matching key/value pairs in the message headers.

    @Bean
    public Queue headQueue(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("head.queue1", true, false, false);
    }

    @Bean
    public Queue headQueue1(){
        /**
         * name: Queue name
         * durable Is it persistent
         * exclusive Is it an exclusive queue that only the creator can use
         * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use
         */
        return new Queue("head.queue2", true, false, false);
    }

    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange("head.exchange", true, false);
    }


    @Bean
    public Binding headBinding(){
        Map<String, Object> headers = new HashMap<>();
        headers.put("abk", "asd");
        return BindingBuilder
                .bind(headQueue())
                .to(headersExchange())
                .whereAll(headers)
                .match();
    }

    @Bean
    public Binding headBinding1(){
        Map<String, Object> headers = new HashMap<>();
        headers.put("abk", "ack");
        return BindingBuilder
                .bind(headQueue1())
                .to(headersExchange())
                .whereAll(headers)
                .match();
    }

    @RabbitListener(queuesToDeclare = @Queue("head.queue1"))
    public void headQueue1(String msg){
        System.out.println("headQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("head.queue2"))
    public void headQueue2(String msg){
        System.out.println("headQueue2-"+msg);
    }
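
    @GetMapping("/sed_head_msg")
    public void sendHeaderMsg1(@RequestParam String msg,
                               @RequestBody Map<String, Object> map){

        MessageProperties messageProperties = new MessageProperties();
        // Message persistence
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // The body is plain text encoded as UTF-8
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding("UTF-8");
        // Copy the request-body entries into the message headers used for matching
        messageProperties.getHeaders().putAll(map);
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
        // A headers exchange ignores the routing key
        rabbitTemplate.convertAndSend("head.exchange", "", message);
    }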

Invoke the request with Postman.

The console output shows that the message matched head.queue2.

Similarly, set the header value to match the other queue.
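
The matching rules of the topic and headers exchanges can be sketched in plain Java. This is a simplified illustration of the rules described above, not the broker's actual implementation; ExchangeMatchSketch, topicMatches, and headersMatchAll are hypothetical names. The headers check mirrors the whereAll binding from the demo, where every bound header must be present with an equal value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of topic and headers matching (illustration only).
final class ExchangeMatchSketch {

    // Topic rule: words are separated by dots, '*' matches exactly one word, '#' matches zero or more.
    static boolean topicMatches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[i].equals("#")) {
            // Try letting '#' absorb 0, 1, 2, ... of the remaining words
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        return (pattern[i].equals("*") || pattern[i].equals(key[j])) && match(pattern, i + 1, key, j + 1);
    }

    // Headers rule (whereAll): every header in the binding must appear in the message with an equal value.
    static boolean headersMatchAll(Map<String, Object> bindingHeaders, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> required : bindingHeaders.entrySet()) {
            if (!required.getValue().equals(messageHeaders.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(topicMatches("top.*", "top.a"));   // one word after "top"
        System.out.println(topicMatches("top.*", "top.a.b")); // two words after "top"
        System.out.println(topicMatches("top.#", "top.a.b"));

        Map<String, Object> binding = new HashMap<>();
        binding.put("abk", "ack");
        Map<String, Object> headers = new HashMap<>();
        headers.put("abk", "ack");
        System.out.println(headersMatchAll(binding, headers));
    }
}
```

With the bindings from the topic demo, a routing key such as top.a reaches both queues, while top.a.b reaches only the queue bound with top.#.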

Message Reliability


Looking at the entire message delivery process, from producer to MQ to consumer, we can roughly identify which stages can make messages unreliable or lose them.

  • The MQ crashes while the producer is sending a message to it, and the message is lost.
  • The producer sends a message to the MQ, but the queue is not persistent; the MQ crashes before consumers have consumed the message, and the message is lost.
  • The consumer receives the message, but an error occurs or the program crashes during processing, and the message is lost.

For these three cases, RabbitMQ provides solutions: persistence, the confirm mechanism, and the ACK mechanism.

Message Persistence

Configure persistence for both the Exchange and the Queue by setting durable to true when creating them.

You can also rely on the default value, which is true. The same applies to exchanges.

Message acknowledgement mechanism

If the MQ crashes while the producer is sending a message to it, the message is lost. RabbitMQ provides the confirm and return mechanisms to detect this, exposed here as the confirm and returnedMessage callbacks.

Add the Spring Boot configuration

## Newer versions use publisher-confirm-type, which accepts three values:
# none (confirms disabled)
# correlated (triggers the confirm callback)
# simple (has the correlated behavior; rabbitTemplate can also call waitForConfirms or waitForConfirmsOrDie)
# Older versions use publisher-confirms, which defaults to false
spring.rabbitmq.publisher-confirm-type=simple
# Trigger the returnedMessage callback when a message does not match any queue
spring.rabbitmq.publisher-returns=true
# When publisher-returns and mandatory are used together, mandatory takes precedence
spring.rabbitmq.template.mandatory=true

Implement RabbitTemplate.ConfirmCallback and RabbitTemplate.ReturnCallback

@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    // Register both callbacks on the template so that they are invoked
    public RabbitCallback(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String error) {
        if(ack){
            System.out.println("Message sent successfully");
        } else {
            System.out.println("Message sending failed: " + error);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("replyCode:").append(replyCode).append(",")
        .append("replyText:").append(replyText).append(",")
        .append("exchange:").append(exchange).append(",")
        .append("routingKey:").append(routingKey);
        // Print the details of the message that could not be routed
        System.out.println(stringBuilder);
    }
}


Three cases are worth testing:

  • No queue matches the routing key: returnedMessage is triggered.
  • The exchange is found: confirm is triggered.
  • Neither the exchange nor the queue is found.
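
One common way to act on a failed confirm is to remember each message until the broker confirms it. The following is a minimal sketch of that bookkeeping only; PendingConfirms and its methods are hypothetical and not part of Spring AMQP. It assumes each message is sent with a unique correlation id (in Spring AMQP this could be the id carried by CorrelationData) and that onConfirm is called from a confirm callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Spring AMQP class): remembers messages until they are confirmed.
final class PendingConfirms {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call before sending: remember the payload under its correlation id.
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. Returns the payload to resend on a negative
    // confirm, or null when the message was confirmed (or was never tracked).
    String onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        return ack ? null : payload;
    }

    // Number of messages still waiting for a confirm.
    int outstanding() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "hello");
        confirms.track("id-2", "world");
        confirms.onConfirm("id-1", true);                   // confirmed, nothing to do
        String resend = confirms.onConfirm("id-2", false);  // not confirmed, caller may resend
        System.out.println("resend: " + resend + ", outstanding: " + confirms.outstanding());
    }
}
```

Resending can produce duplicates when a confirm is merely late, so a consumer that tolerates repeated messages is assumed.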

ACK Mechanism

The message acknowledgement mechanism above solves message loss on the way into the MQ, while ACK solves message loss during consumer processing.

The consumer receives the message. If processing fails, it manually rejects the message and puts it back in the queue to be consumed again; if processing succeeds, it manually acknowledges the message.

Configure Manual Mode

### Turn on manual mode
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## Minimum number of consumers
spring.rabbitmq.listener.simple.concurrency=1
## Maximum number of consumers
spring.rabbitmq.listener.simple.max-concurrency=1

Modify the consumer

    @RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
    public void handleMsg(String msg, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if("success".equals(msg)){
            // Processing succeeded: acknowledge this single message
            channel.basicAck(deliveryTag, false);
        } else if("reply".equals(msg)) {
            // Reject and put the message back in the queue.
            // basicReject handles one message at a time; basicNack also supports batches.
            //  channel.basicReject(deliveryTag, true);
            channel.basicNack(deliveryTag, false, true);
        } else {
            // Reject without requeueing
            channel.basicNack(deliveryTag, false, false);
        }
    }

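basicAck: acknowledge successful processing

  • deliveryTag: the delivery tag, a sequence number that identifies the delivery on the channel
  • multiple: whether to acknowledge in batch; when true, acknowledges all messages up to and including deliveryTag at once

basicReject: reject a single message

  • deliveryTag: the delivery tag of the message
  • requeue: whether to put the message back in the queue

basicNack: reject one or more messages

  • deliveryTag: the delivery tag of the message
  • multiple: whether to reject in batch; when true, rejects all messages up to and including deliveryTag at once
  • requeue: whether to put the message back in the queue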
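
The effect of the multiple flag can be sketched with a sorted map of unacknowledged deliveries. This is an in-memory illustration of the semantics only, not client library code; UnackedDeliveries and its methods are hypothetical names.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of how 'multiple' affects which deliveries get acknowledged.
final class UnackedDeliveries {
    private final TreeMap<Long, String> unacked = new TreeMap<>();

    // Record a delivery that has not been acknowledged yet.
    void delivered(long deliveryTag, String message) {
        unacked.put(deliveryTag, message);
    }

    // multiple = false: acknowledge only deliveryTag.
    // multiple = true: acknowledge every outstanding delivery up to and including deliveryTag.
    // Returns how many deliveries were acknowledged.
    int ack(long deliveryTag, boolean multiple) {
        if (!multiple) {
            return unacked.remove(deliveryTag) != null ? 1 : 0;
        }
        NavigableMap<Long, String> upToTag = unacked.headMap(deliveryTag, true);
        int count = upToTag.size();
        upToTag.clear(); // clearing the view removes the entries from the backing map
        return count;
    }

    int outstanding() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedDeliveries channel = new UnackedDeliveries();
        for (long tag = 1; tag <= 5; tag++) {
            channel.delivered(tag, "msg-" + tag);
        }
        System.out.println("acked " + channel.ack(3, true) + ", outstanding " + channel.outstanding());
        System.out.println("acked " + channel.ack(5, false) + ", outstanding " + channel.outstanding());
    }
}
```

In the demo, the batch ack on tag 3 covers tags 1 to 3, and the single ack on tag 5 leaves tag 4 outstanding.
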

Problems with ack

  • nack dead loop
    After a rejected message is put back in the queue, the program may still be unable to handle it. The result is an endless loop: the message is consumed, rejected, and requeued over and over until the problem is solved.
    My idea is to store the message information in a database, then handle it through scheduled tasks or report it through interface feedback.
  • double ack
    When automatic ACK is enabled and the code also acknowledges manually, the same message triggers ACK twice, and one of the two acks fails.
  • Performance cost
    Manual ack mode is about 10 times slower than automatic mode, and in many cases the default is fine.
  • Late acknowledgement
    In manual ack mode, not acknowledging in time can cause queue exceptions.
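
One way to break the nack dead loop is to cap the number of attempts and park the message once the cap is reached, in the spirit of the database idea above. The following is a minimal sketch under several assumptions: BoundedRetrySketch, Decision, and handle are hypothetical names, each message carries a unique id, the attempt counter lives in memory (so it only works for a single consumer instance), and the parked list stands in for the database table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: requeue a failing message a limited number of times, then park it.
final class BoundedRetrySketch {
    enum Decision { ACK, REQUEUE, PARK }

    private final int maxAttempts;
    private final Map<String, Integer> attempts = new HashMap<>();
    private final List<String> parked = new ArrayList<>(); // stand-in for a database table

    BoundedRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Decide what the listener should do with one delivery of the given message.
    Decision handle(String messageId, Predicate<String> processor) {
        if (processor.test(messageId)) {
            attempts.remove(messageId);
            return Decision.ACK;           // the listener would call basicAck
        }
        int failures = attempts.merge(messageId, 1, Integer::sum);
        if (failures < maxAttempts) {
            return Decision.REQUEUE;       // the listener would call basicNack with requeue = true
        }
        attempts.remove(messageId);
        parked.add(messageId);             // store for a scheduled task or manual follow-up
        return Decision.PARK;              // the listener would reject without requeueing
    }

    List<String> parked() {
        return parked;
    }

    public static void main(String[] args) {
        BoundedRetrySketch retry = new BoundedRetrySketch(3);
        for (int i = 0; i < 3; i++) {
            System.out.println(retry.handle("msg-1", id -> false)); // processing always fails
        }
        System.out.println("parked: " + retry.parked());
    }
}
```

With several consumer instances the counter would have to live somewhere shared, for example in the same database that stores the parked messages.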

Keywords: Java RabbitMQ Distribution

Added by webguy262 on Thu, 23 Dec 2021 04:33:49 +0200