Message queuing MQ
I took notes for the first time. I hope to leave a message to correct the deficiencies. I will continue to improve later*
1. Why MQ
1. Flow peak elimination
2. Application decoupling
When a business is composed of multiple system calls, the failure of any subsystem will lead to the abnormal operation of the whole business. After this operation is transformed into a message queue based method, the calls between systems are decoupled. Some information to be processed is cached in the message queue, so that it will not cause problems in one subsystem and exceptions in the whole system
3. Asynchronous processing
2.MQ classification
1.ActiveMq,
2.KafKa, commonly used for big data processing, with high throughput (in millions)
Disadvantages:
3.RocketMQ
RabbitMq
RabbitMq
working principle:
1.Hello World
2.Work queues
2.1 concept:
When a producer sends a large number of messages, multiple worker threads need to receive messages to process a large number of messages. (Note: a message can only be processed sequentially, not multiple times)
2.2 polling distribution message
Send messages in turn when there are few messages
2.3 message response
In order to ensure that the message sending process is not lost, rabbitmq introduces a message response mechanism: when the consumer receives and processes the message, he tells rabbitmq that it has been processed, and rabbitmq can delete the message.
2.3. 3 automatic response
After the message is successfully sent, it is considered that the message has been successfully transmitted, that is, the consumer receives the message and considers that the message processing is successful and can be deleted
2.3. 4 method of message response
2.3. 5 messages automatically rejoin the team
If the consumer loses connection and does not send ACK confirmation, the message will be re queued and re processed by other consumers. In this way, even if a consumer dies unexpectedly, it can be guaranteed that the message will not be lost.
2.3. 6 message manual response
The message is not lost in the manual response and is put back in the queue for re consumption
producer
public class Producer { //Queue name public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException { Channel channel = RabbitmqUtils.getChannel(); //Declaration queue AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); //Enter information from the console Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("Producer sends message" + message); } } }
consumer
public class Consumer1 { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException { Channel channel = RabbitmqUtils.getChannel(); System.out.println("C1 Short waiting time for message processing"); DeliverCallback deliverCallback = (consumerTag , message)->{ //Sleep for 1s try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Received small"+ new String(message.getBody())); //Manual answer /** * 1.Tag of message, tag * 2.Batch response */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //Callback when canceling a message CancelCallback cancelCallback = consumerTag->{ System.out.println("Message consumption interrupted"); }; channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback); } }
2.4 persistence
Queue persistence
The queue still exists after the system is restarted
//Create a queue channel.queueDeclare(QUEUQ_NAME, false,false,false,null);
If the second parameter is true, it is a persistent queue,
Note that when there is a non persistent queue near the queue, it needs to be deleted and recreated to make it a persistent queue.
Message persistence
When sending a message, declare the message as a persistent message
channel.basicPublish("",QUEUQ_NAME,null,message.getBytes());
The third parameter 'props' is persistent_ TEXT_ When plan, this is a persistent message
channel.basicPublish("",QUEUQ_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
2.5 unfair distribution
Different consumers have different ability to process messages. In order to improve the speed of message processing, unfair distribution is introduced
Note: unfair distribution is set in the consumer code
channel.basicQos(1);
2.6 pre value
You can specify the number of messages consumers get in advance,
channel.basicQos(x);
x represents the preset quantity
3. Release confirmation
If the message has not been saved to the disk during persistence, it will be lost if it is lost by the consumer. Therefore, the release confirmation mode is introduced, that is, the message must be saved to the disk after publication, and can be consumed only after the producer confirms that it has been saved successfully.
3.1 enable release confirmation mode
channel.confirmSelect();
3.2 single confirmation release
Send a confirmation, that is, after publishing a message, you can send the next message only after confirmation.
Disadvantages: the publishing speed is very slow
Channel channel = RabbitmqUtils.getChannel(); //Queue declaration channel.queueDeclare("q1",true,false,false,null); //Open release confirmation channel.confirmSelect(); //start time long begin = System.currentTimeMillis(); //Batch send messages for(int i =0;i<100;i++){ String msg = i+""; channel.basicPublish("","q1",null,msg.getBytes(StandardCharsets.UTF_8)); //Confirm after sending a single message boolean flag = channel.waitForConfirms(); if(flag){ System.out.println("Message sent successfully"); } } //End time long end = System.currentTimeMillis()-begin; System.out.println("Time consuming for individual acknowledgement messages"+end+"ms"); }
3.3 batch confirmation
//Batch send messages for(int i =0;i<100;i++){ String msg = i+""; channel.basicPublish("","q1",null,msg.getBytes(StandardCharsets.UTF_8)); //When 100 entries are reached, batch confirmation is performed in turn if(i%100 == 0){ //confirm channel.waitForConfirms(); } }
3.4 asynchronous confirmation Publishing
Send all the messages first. When the switch confirms, if the callback ackcallback is received but not received, it will also call back the ackcallback. In this way, the message that failed to be sent will be received asynchronously, and it can be sent again at this time.
public static void m3() throws IOException, InterruptedException { Channel channel = RabbitmqUtils.getChannel(); //Queue declaration channel.queueDeclare("q1",true,false,false,null); //Open release confirmation channel.confirmSelect(); //start time long begin = System.currentTimeMillis(); //The message preparation listener listens for which messages succeed and which messages fail //Function to confirm successful callback of message ConfirmCallback ackCallback = (deliveryTag,multiple)->{ System.out.println("Confirmed message" + deliveryTag); }; //Function of message acknowledgement failure callback /** * Parameter 1 Tags for messages * Parameter 2 Is batch confirmation not performed */ ConfirmCallback nackCallback = (deliveryTag,multiple)->{ System.out.println("Unacknowledged messages" + deliveryTag); }; channel.addConfirmListener(ackCallback,nackCallback); //Send message asynchronously for(int i =0;i<1000;i++){ String msg = i+""; channel.basicPublish("","q1",null,msg.getBytes(StandardCharsets.UTF_8)); } //End time long end = System.currentTimeMillis()-begin; System.out.println("Time consuming for batch confirmation messages"+end+"ms"); }
How to handle asynchronous unacknowledged messages
public static void m3() throws IOException, InterruptedException { Channel channel = RabbitmqUtils.getChannel(); //Queue declaration channel.queueDeclare("q1",true,false,false,null); //Open release confirmation channel.confirmSelect(); /* *Thread safe and orderly hash table, suitable for high concurrency * 1.Associate sequence numbers with messages * 2.Batch delete * 3.Support high concurrency * */ ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>(); //start time long begin = System.currentTimeMillis(); //The message preparation listener listens for which messages succeed and which messages fail //Function to confirm successful callback of message ConfirmCallback ackCallback = (deliveryTag,multiple)->{ if(multiple){ //2. Delete the confirmed message ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag); longStringConcurrentNavigableMap.clear(); }else { concurrentSkipListMap.remove(deliveryTag); } System.out.println("Confirmed message" + deliveryTag); }; //Function of message acknowledgement failure callback /** * Parameter 1 Tags for messages * Parameter 2 Is batch confirmation not performed */ ConfirmCallback nackCallback = (deliveryTag,multiple)->{ //3. Print as confirmation message String msg = concurrentSkipListMap.get(deliveryTag); System.out.println("Unacknowledged message: " + msg ); System.out.println("Unacknowledged messages" + deliveryTag); }; channel.addConfirmListener(ackCallback,nackCallback); //Send message asynchronously for(int i =0;i<1000;i++){ String msg = i+""; channel.basicPublish("","q1",null,msg.getBytes(StandardCharsets.UTF_8)); //1. Record messages sent concurrentSkipListMap.put(channel.getNextPublishSeqNo(),msg); } //End time long end = System.currentTimeMillis()-begin; System.out.println("Time consuming for batch confirmation messages"+end+"ms"); }
4. Switch
4.1 publish / subscribe mode
4.2 switch concept:
The core idea history of RammitMQ's messaging model: the producer's message will not be sent directly to the queue. Usually, the producer does not know which queue the message is delivered to
Therefore, the producer can only send messages to the switch, which receives messages from the producer and pushes them into the queue.
4.3 types of exchans
There are several types of switches in total
direct (routing), topic (topic), headers (headers), fan out (fan out, type of publish / subscribe)
Anonymous Exchanges
4.4 temporary queue
If there is no persistent queue, it will be deleted when disconnected.
Create a temporary queue with a random name
channel.queueDeclare().getQueue();
4.5 binding
The switch can bind multiple queues and judge the sent messages through the RoutingKey
4.6 Fanout, publish and subscribe
This type of switch broadcasts the received messages to all bound queues
Declare a switch. The first parameter (switch name), the second parameter (switch name)
channel.exchangeDeclare("los","fanout");
Bind queue
channel.queueBind(queue,exchanges)
4.7 direct
Adding routingkey during binding
queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
4.8 topic
Compared with the more powerful types of direct and fanout,
In tpoic, you can specify the routingKey according to your * and # to achieve the effects of single sending, partial group sending and group sending
Where: * stands for one word,,,,, # stands for multiple words
Note: the routingKey size cannot exceed 255 bytes
5. Dead letter queue
5.1 concept of dead letter queue
- Dead letter queue: DLX, dead letter exchange
- With DLX, when a message becomes a dead message in a queue, it can be re publish ed to another Exchange, which is DLX
5.2 source of dead letter
- Message TTL expired
- The queue has reached the maximum length (the queue is full and can no longer be added to the data mq)
- The message is rejected (basic.reject or basic.nack) and request = false
5.2 dead letter case code
5.2. 1. Message TTL expires
Producer code
public class producer { //General switch public static String NORMAL_EXAHANGE = "normalExchange"; //Dead letter switch public static String DEAD_EXAHANGE = "deadExchange"; //Normal queue public static String NORMAL_QUEUE = "normalQueue"; //Dead letter queue public static String DEAD_QUEUE = "deadQueue"; public static void main(String[] args) throws Exception{ //Acquisition channel Channel channel = RabbitmqUtils.getChannel(); //Dead letter message, set TTL time 10s AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i =0;i<=10;i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXAHANGE, "normal", properties, msg.getBytes(StandardCharsets.UTF_8)); } } }
Ordinary consumer code: / / send to the dead letter exchange if receiving fails
public static void main(String[] args) throws Exception{ //Acquire channel Channel channel = RabbitmqUtils.getChannel(); //Declare common switch channel.exchangeDeclare(NORMAL_EXAHANGE, BuiltinExchangeType.DIRECT); //Declare dead letter switch channel.exchangeDeclare(DEAD_EXAHANGE,BuiltinExchangeType.DIRECT); //Declare normal queue Map<String, Object> arguments = new HashMap<>(); //Set dead letter switch with normal queue arguments.put("x-dead-letter-exchange",DEAD_EXAHANGE); //Set dead letter routingKey arguments.put("x-dead-letter-routing-key","dead"); channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //Declare dead letter queue channel.queueDeclare(DEAD_QUEUE,false,false,false,null); //binding channel.queueBind(NORMAL_QUEUE,NORMAL_EXAHANGE,"normal"); channel.queueBind(DEAD_QUEUE,DEAD_EXAHANGE,"dead"); //Claim to receive message DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("Consumer1 Received messages"+new String(message.getBody())); }; //The cancellation message is a callback CancelCallback cancelCallback = consumerTag->{ System.out.println("Message consumption interrupted"); }; // channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); }
Dead letter consumer code: / / consume the messages in the dead letter queue directly through the name of the dead letter queue
public static void main(String[] args) throws Exception{ //Acquire channel Channel channel = RabbitmqUtils.getChannel(); //Claim to receive message DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("Consumer1 Received messages"+new String(message.getBody())); }; //Callback when canceling a message CancelCallback cancelCallback = consumerTag->{ System.out.println("Message consumption interrupted"); }; // channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback); }
5.2. 2 the queue has reached the maximum length
When modifying the code of queue declaration, the maximum length parameter is added
//Declare normal queue Map<String, Object> arguments = new HashMap<>(); //Set dead letter switch with normal queue arguments.put("x-dead-letter-exchange",DEAD_EXAHANGE); //Set dead letter routingKey arguments.put("x-dead-letter-routing-key","dead"); //Sets the maximum length of a normal pair of columns arguments.put("x-max-length",6);
5.2. 3 message rejected
Modify the message confirmation return function. / / manually answer the judgment message and reject it
//Refuse
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
//Claim to receive message DeliverCallback deliverCallback = (consumerTag,message)->{ String msg1 = new String(message.getBody()); if(msg1.equals("info5")){ System.out.println("Consumer1 Received messages"+msg1+"Rejected"); channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else { System.out.println("Consumer1 Received messages" + msg1); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } };
//Manual answer
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
6. Delay queue
7. Release confirmation advanced
Some unknown reasons in the production environment lead to rabbitmq downtime or restart, producer message delivery failure and message loss. Manual recovery and restart are required. In order to ensure the security and reliability of the message, release confirmation needs to be processed, and the confirmCallBack interface must be used to process the result of message confirmation. At this time, the confirmation callback interface must be opened, Because the publish confirmation mode is turned off by default, it needs to be turned on manually:
spring.rabbitmq.publisher-confirm-type=correlated
rabbitmq server down, producer failed to send message
7.1 example, switch confirmation
Create a publish confirmation configuration class to declare switches and queues
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; //Declare business Exchange @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } // Claim confirmation queue @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } // Declare and confirm queue binding relationship @Bean public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("key1"); } }
The id of the message specified in the message producer is through CorrelationData, which is the interface used in the message callback interface to receive message parameters, confirm success, and the failure reason. It sends an error message, such as the queue of the specified routingKey does not exist or rabbitmq goes down:
@GetMapping("/sendMsg/{msg}") public void sendMsg(@PathVariable(value = "msg") String msg){ //Callback interface CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,"key2" ,msg,correlationData); log.info("The content of the message sent is: {}",msg); }
At this time, if the callback interface is not opened, it will be found that the message has been lost and has not been processed. Configure the message callback interface:
@Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnsCallback { public MyCallBack(@Autowire RabbitTemplate rabbitTemplate) { rabbitTemplate.setConfirmCallback(this); } /** * Switch confirmation callback method * 1. Switch received, callback * 1.1 correlationData Save the ID and related information of the callback message * 1.2 ack Switch receives message true * 1.3 cause There is no reason for success * 2. The switch didn't receive it * 2.1 correlationData Save the ID and related information of the callback message * 2.2 ack The switch did not receive the message false * 2.3 cause Reasons for failure * @param correlationData Save the ID and related information of the callback message * @param ack Does the switch receive messages * @param cause Reasons for failure */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : null; if (ack){ log.info("Switch has received ID by: {}News of",id); }else { log.info("Switch not received ID by: {}News of,The reason is: {}",id,cause); } } }
Then, when the switch, routingkey and queue have problems, the failure information will be returned.
7.2 fallback message
7.2.1 Mandatory parameter
**When only the producer confirmation mechanism is enabled, the switch will directly send a confirmation message to the message producer after receiving the message. If it is found that the message is not routable, the message will be directly discarded. At this time, the producer does not know the event that the message is discarded** So how can I get messages that can't be routed to help me find a way to deal with them? At least let me know. I can handle it myself. By setting the mandatory parameter, the message can be returned to the producer when the destination is unreachable during message delivery.
Modify profile
spring: rabbitmq: host: 39.106.41.153 port: 5672 username: root password: root publisher-confirm-type: correlated publisher-returns: true
Callback method
package com.vleus.rabbitmq.springbootrabbitmq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author vleus * @date 2021 At 23:34 on July 28 */ @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //injection rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /* Switch confirmation callback method: 1,Send a message and the switch receives the callback: 1.1 correlationData Save the ID and related information of the callback message; 1.2 Switch receives message true 1.3 cause null 2,Sending message switch failed to receive message callback 2.1 correlationData Save the ID and related information of the callback message; 2.2 Switch receives message false 2.3 cause The reason why the switch failed to receive messages */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("The switch has received the message,id by{}", id); } else { log.info("The switch has received it id by{}News of,For reasons: {}", id, cause); } } //The implementation returns the message to the producer when the destination is unreachable during message delivery //Fallback is performed only when the message cannot reach the destination @Override public void returnedMessage(ReturnedMessage returned) { Message message = returned.getMessage(); //news String exchange = returned.getExchange(); //Switch String replyText = returned.getReplyText(); // String routingKey = returned.getRoutingKey(); //Routing key log.error("news{},Switched{}return,The reason for return is: {},route key by: {}", new String(message.getBody()),exchange,replyText,routingKey); } }
7.3 backup switch
With the mandatory parameter and fallback message, we gain the perception of undeliverable messages, and have the opportunity to find and process producer messages when they cannot be delivered. But sometimes, we don't know how to handle these messages that can't be routed. We can make a log at most, then trigger an alarm, and then handle them manually. It is not elegant to handle these unrouted messages through logs, especially when the producer's service has multiple machines, manual copying logs will be more troublesome and error prone. Moreover, setting the mandatory parameter will increase the complexity of producers and need to add logic to deal with these returned messages. What if you don't want to lose messages and increase the complexity of producers? In the previous article on setting the dead letter queue, we mentioned that the dead letter switch can be set for the queue to store those failed messages, but these non routable messages have no chance to enter the queue, so the dead letter queue cannot be used to save messages. In RabbitMQ, there is a backup switch mechanism, which can deal with this problem well. What is a backup switch? The backup switch can be understood as the "spare tire" of the switch in RabbitMQ. When we declare a corresponding backup switch for a switch, we create a spare tire for it. When the switch receives a non routable message, it will forward the message to the backup switch for forwarding and processing, Usually, the backup switch is Fanout, so that all messages can be delivered to the bound queue. Then we bind a queue under the backup switch, so that all messages that cannot be routed by the original switch will enter the queue. Of course, we can also establish an alarm queue to monitor and alarm with independent consumers.
Statement:
Declare the corresponding backup switch when declaring the switch
//Claim switch @Bean("confirmExchange") public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME). durable(true) .alternate(BACKUP_EXCHANGE). //Forward to backup switch build(); // return new DirectExchange(CONFIRM_EXCHANGE_NAME); }
Details:
Configuration class
@Configuration public class ConfirmConfig { //Define switch public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; //queue public static final String CONFIRM_QUEUE = "confirm.queue"; //routingKey public static final String ROUTING_KEY = "key1"; //Backup switch public static final String BACKUP_EXCHANGE = "backup_exchange"; //Backup queue public static final String BACKUP_QUEUE = "backup_queue"; //Alarm queue public static final String WARNING_QUEUE = "warning_queue"; //Claim switch @Bean("confirmExchange") public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME). durable(true) .alternate(BACKUP_EXCHANGE). //Forward to backup switch build(); // return new DirectExchange(CONFIRM_EXCHANGE_NAME); } //Declaration queue @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } //binding @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY); } //Backup switch @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE); } //Declare backup queue @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE).build(); } //Declare alarm queue @Bean("warningQueue") public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE).build(); } //The backup queue is bound to the backup switch @Bean public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue")Queue backupQueue, @Qualifier("backupExchange")FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } //The alarm queue is bound to the backup switch @Bean public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue")Queue warningQueue, @Qualifier("backupExchange")FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); } }
Consumer:
@Slf4j @Component public class WarningConsumer { //Receive alarm messages @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE) public void consumeWarningQueue(Message message) { String msg = new String(message.getBody()); log.error("The alarm found a non routable message: {}",msg); } }
Note: Backup switches take precedence over fallback messages
8. Priority queue
8.1 usage scenarios
If there is an order payment urging scenario in our system, Taobao will push the order placed by our customer on tmall to us in time. If the user fails to pay within the time set by the user, it will push a text message reminder to the user. But for Taobao, it must be divided into large customers and small customers, right, such as apple, A big business like Xiaomi can at least create a lot of profits for us in a year, so of course, their orders must be given priority. In the past, our back-end system used redis to store regular polling. We all know that redis can only use List as a simple message queue, but can not achieve a priority scenario, Therefore, after the order quantity is large, RabbitMQ is used for transformation and optimization. If it is found that the order of a large customer is given a relatively high priority, otherwise it is the default priority.
be careful:
The following things need to be done to enable the queue to achieve priority: the queue needs to be set as the priority queue, the message needs to be set as the priority of the message, and the consumer needs to wait until the message has been sent to the queue to consume, because this is the opportunity to sort the messages
8.2 example
producer
public class Producer { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { //Give the message a priority attribute AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); for (int i = 1; i <11; i++) { String message = "info"+i; if(i==5){ channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); }else{ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } System.out.println("Send message complete:" + message); } } } }
consumer
public class Consumer { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //Set the maximum priority of the queue. The maximum priority can be set to 255. The official website recommends 1-10. If the setting is too high, compare memory and CPU Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, params); System.out.println("Consumers start waiting for consumption......"); DeliverCallback deliverCallback=(consumerTag, delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("Message received:"+receivedMessage); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{ System.out.println("Called when the consumer cannot consume the message, such as when the queue is deleted"); }); } }
9. Inert queue
9.1 concept
A queue in which messages are saved on disk
9.2 usage scenarios
When consumers go offline or cannot consume, they use inert queues to avoid a large number of messages piling up in memory
9.3 declaration method
Just add a parameter when declaring the queue
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
10. Cluster
A single MQ can not deal with the situation that a large number of producers send messages, so a cluster is introduced. Multiple MQS form a cluster
10.1 image queue
RabbitMQ is the default cluster mode and does not guarantee the high availability of queues. Although queue information, switches and bindings can be copied to any node in the cluster, the queue content will not be copied. Although this mode solves the node pressure of a project group, the downtime of queue nodes directly leads to the unavailability of the queue and can only wait for restart, Therefore, in order to work normally even when the queue node goes down or fails, it is necessary to copy the queue content to each node in the cluster and create a mirror queue.
10.2 Federation plug-in
The Federation plug-in is used to copy queue messages between different RabbitMQ clusters. The cluster can be an intranet or a public network, which is transparent to applications, that is, applications will not perceive it and do not need to write relevant code.