1. TTL mechanism
1.1 implementation scheme
In e-commerce, once an order has been created, the user is usually given a fixed window in which to pay, and a countdown starts. If the user does not pay within that window, the order is cancelled automatically.
How can this be implemented?
Regular polling (database, etc.)
When the user places an order, the order data and its payment status are written to the database; when the user pays, the status in the database is updated.
A scheduled job polls the payment status in the database and cancels any order that has remained unpaid for more than 30 minutes.
Advantages: simple design and implementation.
Disadvantages: it requires a lot of IO operations on the database, which is inefficient.
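A single polling pass can be sketched as follows. This is a minimal in-memory model, not the author's implementation: the `OrderPollingSketch` class, its `Order` type and the `orders` map are hypothetical stand-ins for the orders table, and a real system would run the pass from a scheduled job against the database.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the orders table (assumption, not from the article).
final class OrderPollingSketch {
    enum Status { UNPAID, PAID, CANCELLED }

    static final class Order {
        final long id;
        final Instant createdAt;
        volatile Status status = Status.UNPAID;

        Order(long id, Instant createdAt) {
            this.id = id;
            this.createdAt = createdAt;
        }
    }

    // Payment window from the text: 30 minutes.
    static final Duration PAYMENT_WINDOW = Duration.ofMinutes(30);

    final Map<Long, Order> orders = new ConcurrentHashMap<>();

    // One polling pass: cancel every unpaid order older than the payment window.
    // Returns the ids cancelled in this pass.
    List<Long> cancelExpired(Instant now) {
        List<Long> cancelled = new ArrayList<>();
        for (Order order : orders.values()) {
            boolean overdue = Duration.between(order.createdAt, now).compareTo(PAYMENT_WINDOW) > 0;
            if (order.status == Status.UNPAID && overdue) {
                order.status = Status.CANCELLED;
                cancelled.add(order.id);
            }
        }
        return cancelled;
    }

    public static void main(String[] args) {
        OrderPollingSketch store = new OrderPollingSketch();
        Instant now = Instant.now();
        store.orders.put(1L, new Order(1L, now.minus(Duration.ofMinutes(45)))); // unpaid and overdue
        store.orders.put(2L, new Order(2L, now.minus(Duration.ofMinutes(5))));  // unpaid, still in window
        System.out.println("Cancelled order ids: " + store.cancelExpired(now));
    }
}
```

Every pass touches all stored orders, which is the I/O cost the disadvantage above refers to.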
Timer
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
        System.out.println("The user has not paid, and the transaction is cancelled:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
        // Stop the timer thread once the task has run
        timer.cancel();
    }
};
System.out.println("Waiting for user payment:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// Execute timerTask in 10 seconds
timer.schedule(timerTask, 10 * 1000);
Advantages: none worth mentioning.
Disadvantages: there is no persistence mechanism.
It is inflexible (only the start time and the repetition interval can be set, which does not suit other business needs).
It cannot use a thread pool: each Timer runs on a single thread.
It offers no real facilities for managing tasks.
Scheduler and other timers
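import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
// Thread factory
ThreadFactory factory = Executors.defaultThreadFactory();
// Use a thread pool
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("Start waiting for user payment for 10 seconds:" + format.format(new Date()));
service.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("If the user fails to pay, the transaction is cancelled:" + format.format(new Date()));
    }
// Wait 10 seconds
}, 10, TimeUnit.SECONDS);
// Accept no new tasks; the delayed task already scheduled still runs by default
service.shutdown();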
Advantages: tasks can run on multiple threads, which limits how much they interfere with each other, and an exception in one task does not affect the others.
Disadvantages: under high concurrency, scheduled tasks are not recommended because they waste server resources.
TTL of RabbitMQ
TTL can be set at two levels: per message and per queue.
Advantages: the timing function can be realized without consuming too much server performance.
Disadvantages: the capacity and backlog capacity of any middleware are limited, and introducing message-oriented middleware brings additional problems of its own.
Because the capacity of message middleware is limited, there must be a fallback for messages that are never consumed.
There are currently two ways to set the TTL of a message. If both are used at the same time, the smaller of the two values applies.
1. Set a queue property, so that all messages in the queue have the same expiration time.
2. Set the TTL of each message individually.
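The "smaller value applies" rule can be sketched in plain Java. This is an illustrative model only, not RabbitMQ code: the class and method names (`EffectiveTtlSketch`, `effectiveTtl`) are hypothetical, and `null` stands for a TTL that has not been set.

```java
import java.util.OptionalLong;

// Illustrative model of how the queue TTL and the per-message TTL combine (names are assumptions).
final class EffectiveTtlSketch {
    // queueTtlMs models the queue's x-message-ttl argument,
    // messageTtlMs models the message's own expiration property; null means "not set".
    // An empty result means the message never expires.
    static OptionalLong effectiveTtl(Long queueTtlMs, Long messageTtlMs) {
        if (queueTtlMs == null && messageTtlMs == null) {
            return OptionalLong.empty();
        }
        if (queueTtlMs == null) {
            return OptionalLong.of(messageTtlMs);
        }
        if (messageTtlMs == null) {
            return OptionalLong.of(queueTtlMs);
        }
        // Both are set: the smaller value wins.
        return OptionalLong.of(Math.min(queueTtlMs, messageTtlMs));
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtl(30_000L, 5_000L).getAsLong());
        System.out.println(effectiveTtl(10_000L, null).getAsLong());
        System.out.println(effectiveTtl(null, null).isPresent());
    }
}
```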
1.2 native API implementation
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

// factory, QUEUE_NAME and message are assumed to be defined elsewhere
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    // Create a queue (it is reached through the AMQP default exchange, which behaves like a direct exchange)
    // Set queue properties
    Map<String, Object> arguments = new HashMap<>();
    // Set the TTL of messages in the queue (milliseconds)
    arguments.put("x-message-ttl", 30000);
    // Set the idle lifetime of the queue (how long the queue survives when it has no consumers and is not used)
    arguments.put("x-expires", 10000);
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
    channel.basicPublish("", QUEUE_NAME,
            // Set the expiration time of the message itself (milliseconds, as a string)
            new AMQP.BasicProperties.Builder().expiration("30000").build(),
            message.getBytes());
} catch (TimeoutException | IOException e) {
    e.printStackTrace();
}
In addition, you can set a TTL for all queues with a policy from the command line:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
Default rule:
1. If x-message-ttl is not set, the message will not expire.
2. If x-message-ttl is set to 0, it means that the message will be discarded immediately unless it can be delivered directly to the consumer.
1.3 SpringBoot implementation
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
        // Messages in this queue wait for 10s before they expire
        props.put("x-message-ttl", 10000);
        // Set the idle lifetime of the queue (how long the queue survives when it has no consumers and is not used)
        props.put("x-expires", 1000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }
}
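import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class controller {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        // Set the expiration time of the message itself (milliseconds, as a string)
        properties.setExpiration("5000");
        Message message = new Message("Sent WAITINGMESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}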
2. Dead letter queue
DLX stands for dead letter exchange.
In a food delivery system, the user places an order by calling the order service, and the order service then calls the delivery system to notify a courier to deliver the order. The order system and the delivery system communicate asynchronously through MQ.
What should happen if the user cancels the order after a courier has accepted it? What should happen if no courier accepts the order for a long time?
In this scenario, we can define a dead letter exchange and bind a dead letter queue to it. When a message becomes a dead letter, it is sent to the dead letter queue, which makes it easy to check why the message failed and to decide the next step: cancel the order or assign another courier.
When a message in a queue becomes a dead letter, it is republished to a special exchange (the DLX). A queue bound to the DLX is called a "dead letter queue".
A message becomes a dead letter in the following situations:
1. The message is rejected (Basic.Reject / Basic.Nack) with the requeue parameter set to false.
2. The message has expired.
3. The queue has reached its maximum length.
Likewise, when handling exceptions, a message that consumers cannot process normally can be put into the dead letter queue, so that the failure can be analysed later and the system improved.
2.1 native API implementation
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.util.HashMap;
import java.util.Map;

// factory is assumed to be a configured ConnectionFactory defined elsewhere
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    // Define a dead letter exchange (it is an ordinary exchange)
    channel.exchangeDeclare("exchange.dlx", "direct", true);
    // Define the normal business exchange
    channel.exchangeDeclare("exchange.biz", "fanout", true);
    Map<String, Object> arguments = new HashMap<>();
    // Set the message TTL of the queue (milliseconds)
    arguments.put("x-message-ttl", 10000);
    // Set the dead letter exchange associated with the queue
    // (a message still not consumed when its TTL expires is sent there)
    arguments.put("x-dead-letter-exchange", "exchange.dlx");
    // Set the routing key used when dead-lettering.
    // If it is not specified, the message's original routing key is used
    arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
    // Define the business queue and its binding
    channel.queueDeclare("queue.biz", true, false, false, arguments);
    channel.queueBind("queue.biz", "exchange.biz", "");
    // Define the dead letter queue and bind it to the dead letter exchange
    channel.queueDeclare("queue.dlx", true, false, false, null);
    channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
    channel.basicPublish("exchange.biz", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes());
} catch (Exception e) {
    e.printStackTrace();
}
2.2 SpringBoot implementation
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Map<String, Object> props = new HashMap<>();
        // The lifetime of a message in the queue is 10s
        props.put("x-message-ttl", 10000);
        // Set the dead letter exchange associated with the queue
        // (a message still not consumed when its TTL expires is sent there)
        props.put("x-dead-letter-exchange", "ex.go.dlx");
        // Set the routing key used when dead-lettering.
        // If it is not specified, the message's original routing key is used
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    }
}
Generally speaking, the dead letter queue will be used in conjunction with the TTL of RabbitMQ. After the message times out, it will be automatically routed to the dead letter queue.
3. Delay queue
In some business scenarios, we do not want the messages we send to the queue to be consumed immediately. For example:
When you buy a train ticket on 12306 and select a seat, the system locks the seat while the order is unpaid. If payment has not been made when the time limit is reached, the system automatically releases the seat. How can this kind of function be implemented?
1. Use a scheduled task that scans once a minute and releases any seat held for more than 15 minutes. This wastes system resources.
2. Use the passive expiration of a distributed lock or distributed cache, so that the lock is released automatically after 15 minutes. This ties up system resources for a long time.
3. Use TTL together with a dead letter queue. However, TTL expiry is only checked from the head of the queue: as long as the first message has not timed out, the messages behind it are not examined. If the messages in the queue have different expiration times, later messages may already have expired and still not be processed.
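The head-of-queue behaviour described in option 3 can be modelled in a few lines of plain Java. This is a simplified sketch, not RabbitMQ's implementation: the `HeadOfQueueExpirySketch` class and its methods are hypothetical, and time is passed in explicitly as milliseconds so the result does not depend on the clock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a FIFO queue whose expiry check only looks at the head (names are assumptions).
final class HeadOfQueueExpirySketch {
    static final class Msg {
        final String body;
        final long expiresAtMs;

        Msg(String body, long expiresAtMs) {
            this.body = body;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();

    // Enqueue a message with its own TTL.
    void publish(String body, long nowMs, long ttlMs) {
        queue.addLast(new Msg(body, nowMs + ttlMs));
    }

    // Dead-letter expired messages, inspecting only the head of the queue.
    // An expired message behind a live head stays where it is.
    List<String> expireFromHead(long nowMs) {
        List<String> deadLettered = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMs <= nowMs) {
            deadLettered.add(queue.pollFirst().body);
        }
        return deadLettered;
    }

    public static void main(String[] args) {
        HeadOfQueueExpirySketch q = new HeadOfQueueExpirySketch();
        q.publish("A", 0, 10_000); // first message, 10s TTL
        q.publish("B", 0, 3_000);  // second message, 3s TTL
        System.out.println("at 5s:  " + q.expireFromHead(5_000));
        System.out.println("at 10s: " + q.expireFromHead(10_000));
    }
}
```

In this model message B has expired by the 5-second mark but is only dead-lettered at 10 seconds, once A in front of it has expired.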
Therefore, a delay queue can be used. After the seat is locked successfully, a delayed message is sent to the delay exchange. The delay exchange polls all of its messages, and each message is consumed after its specified time has passed. The consumer checks whether the seat is in the "paid" state.
RabbitMQ itself does not provide a delay queue. You can use the rabbitmq_delayed_message_exchange plug-in. The biggest difference from the TTL approach is that TTL stores messages in the dead letter queue, whereas the plug-in stores messages in the delay exchange.
1. The producer sends the message (msg) and routing key to the specified delay exchange
2. The delay exchange stores messages and continuously scans them. When a message's delay expires, the exchange finds the queue bound to it according to the routing key and sends the message to that queue
3. The queue delivers the message to the consumer listening on it
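The three steps above can be modelled in plain Java to show why per-message delays no longer block each other. This is a conceptual sketch only and makes no claim about how the plug-in is implemented internally: the `DelayExchangeSketch` class and its methods are hypothetical, and held messages are simply ordered by their due time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Conceptual model of a delay exchange: messages are held and released by due time (names are assumptions).
final class DelayExchangeSketch {
    static final class Delayed {
        final String body;
        final long dueAtMs;

        Delayed(String body, long dueAtMs) {
            this.body = body;
            this.dueAtMs = dueAtMs;
        }
    }

    // Held messages, ordered by due time rather than by arrival order.
    private final PriorityQueue<Delayed> held =
            new PriorityQueue<>(Comparator.comparingLong((Delayed d) -> d.dueAtMs));

    // Step 1: the producer publishes a message with its delay.
    void publish(String body, long nowMs, long delayMs) {
        held.add(new Delayed(body, nowMs + delayMs));
    }

    // Step 2: release every message whose delay has elapsed;
    // the returned bodies are what would be routed on to the bound queue (step 3).
    List<String> releaseDue(long nowMs) {
        List<String> released = new ArrayList<>();
        while (!held.isEmpty() && held.peek().dueAtMs <= nowMs) {
            released.add(held.poll().body);
        }
        return released;
    }

    public static void main(String[] args) {
        DelayExchangeSketch exchange = new DelayExchangeSketch();
        exchange.publish("A", 0, 10_000); // published first, 10s delay
        exchange.publish("B", 0, 3_000);  // published second, 3s delay
        System.out.println("at 5s:  " + exchange.releaseDue(5_000));
        System.out.println("at 10s: " + exchange.releaseDue(10_000));
    }
}
```

Here B is released at the 5-second mark even though A was published first, which is the behaviour a plain TTL queue cannot provide.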
3.1 use of delay queue
Download plug-ins
Download address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Install plug-ins
Copy the plug-in file into the plugins directory of the RabbitMQ server installation.
# Enable the plug-in
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Restart rabbitmq-server
systemctl restart rabbitmq-server
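import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public Exchange exchange() {
        Map<String, Object> props = new HashMap<>();
        // Routing behaviour the delay exchange applies once the delay has elapsed
        props.put("x-delayed-type", ExchangeTypes.FANOUT);
        // The exchange type provided by the plug-in is "x-delayed-message"
        Exchange exchange = new CustomExchange("ex.delayed", "x-delayed-message", true, false, props);
        return exchange;
    }
}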
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PublishController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/prepare/{seconds}")
    public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
        // With plain TTL, RabbitMQ only checks whether the message at the head of the queue has expired;
        // if it has, the message is moved to the dead letter queue.
        // If the first message has a long TTL (10s) and the second a short one (3s), the second is only
        // checked after the first has expired, even though it expired long before.
        // The rabbitmq_delayed_message_exchange plug-in solves this problem.
        MessageProperties properties = new MessageProperties();
        // Notify 10 seconds in advance; never send a negative delay
        properties.setHeader("x-delay", Math.max(0, seconds - 10) * 1000);
        Message message = new Message(("Sales department meeting in " + seconds + " seconds.").getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message);
        return "The reminder has been set. You will be notified in advance";
    }
}