Basic knowledge
Why use message middleware
decoupling
In applications with multiple systems, it is common that system A affects B, C, D data. The usual practice is to call the interfaces of other systems in A, so that the dependencies between systems are too large, and if new systems are added later, you need to add appropriate logic in A. This is too coupled for maintenance.
After joining MQ, system A does not need to add calls from other systems, just send messages, other systems listen for messages, and process them in their own systems. Adding or deleting code does not require changing system A, just canceling this type of message listening in your own.
asynchronous
Many times it involves the invocation between multiple services. Clients make requests, and A calls back the interface of B, C, D, and finally returns the execution result to the client. In such a process, the execution time of interface A receives the influence of other services, which is the sum of their execution time. If A does not care about the execution of B, C, D, then MQ can be used. A Sends a message and returns it directly, thereby increasing the response time of the interface.
Peak Clipping
When the system is facing a large number of requests, it will put a lot of pressure on the database. After the introduction of MQ, you can take a certain amount of data from MQ each time, according to the actual processing capacity of the database, and process and extract from it.
Producer/Consumer
Installation of RabbitMQ
Normal Installation
Go directly to the official website to download the installation package.
https://www.rabbitmq.com/
docker installation
// Pull out the mirror docker pull rabbitmq // Start Container docker run -it --name rabbitmq \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 \ -d rabbitmq // Enter container to open management interface docker exec -it rabbitmq sh //Open management interface rabbitmq-plugins enable rabbitmq_management
By visiting http://localhost:15672/ You can see the management interface
Integrate RabbitMQ in Springboot
Introducing dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Configure queues and switches
@Configuration public class RabbitmqConfig { @Bean public Queue msgQueue(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("MSG_MQ", true, false, false); } @Bean public DirectExchange msgExchange(){ return new DirectExchange("MSG_ECHANGE", true, false); } @Bean public Binding mailBinding(){ return BindingBuilder .bind(mailQueue()) .to(msgExchange()) .with("MSG_ROUTING"); } }
Producer sends message
@RequestMapping("/v1/demo") @RestController public class DemoController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sed_queue") public void sendMsg(){ rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "How miserable you are:"+ System.currentTimeMillis()); } }
Consumer Acceptance Message
Use @RabbitListener to listen on the message queue and consume when there is a message in the queue
@Component public class RabbitListner { @RabbitListener(queuesToDeclare = @Queue("MSG_MQ")) public void handleMsg(String msg){ System.out.println("msg-"+ msg); } }
queues and queuesToDeclare are different: when using queuesToDeclare, when the service starts, it goes back to MQ to detect the existence of a queue to listen on, and without it it it it it is created
Composition of RabbitMQ
- Broker: Message Queuing service process. This process consists of two parts: Exchange and Queue.
- Exchange: Message queue switch. Route messages to a queue according to certain rules.
- Queue: Message queue, the queue in which messages are stored.
- Producer: Message producer. Producer clients route messages to queues with switches.
- Consumer: Message consumer. Messages stored in the consumer queue.
Four Switches
- DirectExchange: A directly connected switch requires a queue to be bound and a routeKey value to be specified, similar to peer-to-peer sending. DirectExchange is used in the demo above
- FanoutExchange: After binding a required queue to this switch, a message sent to the switch is forwarded to all queues connected to the switch. This pattern is similar to a publish subscription.
@Bean public Queue faQueue1(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("fa.queue1", true, false, false); } @Bean public Queue faQueue2(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("fa.queue2", true, false, false); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("fanout.exchange", true, false); } @Bean public Binding bindingFanoutExchange(){ return BindingBuilder .bind(faQueue1()) .to(fanoutExchange()); } @Bean public Binding bindingFanoutExchange1(){ return BindingBuilder .bind(faQueue2()) .to(fanoutExchange()); }
@RabbitListener(queuesToDeclare = @Queue("fa.queue1")) public void faQueue1(String msg){ System.out.println("faQueue1-"+msg); } @RabbitListener(queuesToDeclare = @Queue("fa.queue2")) public void faQueue2(String msg){ System.out.println("faQueue2-"+msg); }
@GetMapping("/sed_fanout") public void sendFanoutMsg(){ rabbitTemplate.convertAndSend("fanout.exchange", null, "fanoutExchange: "+ System.currentTimeMillis()); }
- TopicExchange: A theme switch can also be called a wildcard switch. This switch matches by wildcards and routes to the corresponding queue. The wildcards #and * represent matching multiple and one, respectively.
@Bean public Queue topicQueue1(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("topic.queue1", true, false, false); } @Bean public Queue topicQueue2(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("topic.queue2", true, false, false); } @Bean public TopicExchange topicExchange1(){ return new TopicExchange("topic.exchange1", true, false); } @Bean public Binding topicBinding1(){ return BindingBuilder .bind(topicQueue1()) .to(topicExchange1()) .with("top.*"); } @Bean public Binding topicBinding2(){ return BindingBuilder .bind(topicQueue2()) .to(topicExchange1()) .with("top.#"); }
@RabbitListener(queuesToDeclare = @Queue("topic.queue1")) public void topicQueue1(String msg){ System.out.println("topicQueue1-"+msg); } @RabbitListener(queuesToDeclare = @Queue("topic.queue2")) public void topicQueue2(String msg){ System.out.println("topicQueue2-"+msg); }
@GetMapping("/sed_topic") public void sendFanoutMsg(String key){ rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange: "+ System.currentTimeMillis()); }
- HeadersExchange: This type of switch does not use as much as it does. It differs slightly from the three above in that instead of routingKey routing, it routes with key values in the matching request header. This switch is not used much.
@Bean public Queue headQueue(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("head.queue1", true, false, false); } @Bean public Queue headQueue1(){ /** * name: Queue name * durable Is it persistent * exclusive Is it an exclusive queue that only the creator can use * autoDelete Declare that this queue is temporary and the last consumer automatically deletes it after use */ return new Queue("head.queue2", true, false, false); } @Bean public HeadersExchange headersExchange(){ return new HeadersExchange("head.exchange", true, false); } @Bean public Binding headBinding(){ Map<String, Object> headers = new HashMap<>(); headers.put("abk", "asd"); return BindingBuilder .bind(headQueue()) .to(headersExchange()) .whereAll(headers) .match(); } @Bean public Binding headBinding1(){ Map<String, Object> headers = new HashMap<>(); headers.put("abk", "ack"); return BindingBuilder .bind(headQueue1()) .to(headersExchange()) .whereAll(headers) .match(); }
@RabbitListener(queuesToDeclare = @Queue("head.queue1")) public void headQueue1(String msg){ System.out.println("headQueue1-"+msg); } @RabbitListener(queuesToDeclare = @Queue("head.queue2")) public void headQueue2(String msg){ System.out.println("headQueue2-"+msg); }
@GetMapping("/sed_head_msg") public void sendHeaderMsg1(@RequestParam String msg, @RequestBody Map<String, Object> map){ MessageProperties messageProperties = new MessageProperties(); //Message Persistence messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setContentType("UTF-8"); //Add message messageProperties.getHeaders().putAll(map); Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("head.exchange", null, message); }
Invoke request with postman
Background Print Out
Description matched head.queue2
Similarly, set the head value
Message Reliability
The diagram shows the entire process of message delivery, and we can roughly analyze which parts of the process result in unreliable or lost messages.
- When the producer sends a message to the MQ, the MQ hangs up and the message is lost.
- Producers send messages to MQ but there is no persistent queue, consumers haven't consumed yet, MQ hangs up and messages are lost.
- Consumers consume the message, but when an error occurs or the program hangs up, the message is also lost.
For the above three cases, Rabbit provides us with solutions: persistence, confirm mechanism, ACK transaction mechanism.
Message Persistence
Configure Exchange and Queue persistence.
Set durable to true when creating Queue and Exchange
You can also use the default value, which is true
The same is true for switches
Message acknowledgement mechanism
During the period when the producer sent the message to MQ, MQ hung up, causing the message to be lost. Rabbit provides confirm and returnMessage methods to handle message loss.
springboot add configuration
## The new version uses publisher-confirm-type with three parameters # none (disabled) # correlated (trigger confirm callback) # Simple (with correlated functionality, rabbitTemplate can also call waitForConfirms or waitForConfirmsOrDie) # Default false for publisher-confirms in older versions spring.rabbitmq.publisher-confirm-type=simple # Message does not match queue to trigger returnMessage callback spring.rabbitmq.publisher-returns= true # Use mandatory first when publisher-returns and mandatory are used together spring.rabbitmq.template.mandatory= true
Implement RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
@Component public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String error) { if(ack){ System.out.println("Message sent successfully"); } else { System.out.println("Message sending failed"); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("replyCode:").append(replyCode).append(",") .append("replyText:").append(replyText).append(",") .append("exchange:").append(exchange).append(",") .append("routingKey:").append(routingKey).append(","); } }
No match to route trigger returnMessage
Find switch trigger confirm
No switches and queues found
ACK Transaction Mechanism
Message acknowledgment mechanism solves the problem in MQ process, while ACK solves the problem of message loss in consumer processing.
The consumer accepts the message, fails in the process of processing and manually refuses to sign, puts it back in the queue for re-consumption, and manually signs and receives after successful consumption.
Configure Manual Mode
### Turn on manual mode spring.rabbitmq.listener.simple.acknowledge-mode=manual ## Minimum number of consumers spring.rabbitmq.listener.simple.concurrency=1 ## Maximum number of consumers spring.rabbitmq.listener.simple.max-concurrency=1
Transforming consumers
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ")) public void handleMsg(String msg, Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); if("success".equals(msg)){ channel.basicAck(deliveryTag, false); } else if("reply".equals(msg)) { // Difference between basicReject and basicNack BasicReject does not support batch basicNack support // channel.basicReject(deliveryTag, true); channel.basicNack(deliveryTag, false, true); } else { channel.basicNack(deliveryTag, false, false); } }
basicAck: Successful confirmation message
- deliveryTag: index of message
- mutiple: Whether to batch confirm, when true, ack s all messages less than deliveryTag at a time
basicReject: Failed Reject
- deliveryTag: index of message
- requeue: Whether to re-queue
basicNack: Failed Rejection - deliveryTag: index of message
- mutiple: Batch reject, reject all messages less than deliveryTag at once
- requeue: Whether to re-queue
Problems with ack
- nack dead loop
After the reply message is put back in the queue, the program still can't handle it. There will be an endless loop, continuous consumption, put in the queue, and know the problem is solved.
My idea is to use a database to store message information. Then process notifications through timed tasks or via interface feedback - double ack
When the automatic ACK is turned on, it is handled manually in the code, causing a message to trigger ACK twice, and one ack will fail. - Performance Consumption
Manual ack mode is about 10 times slower than automatic mode, and many times the default is fine. - Manual ack, not replying in time can cause queue exceptions