Recently, readers have encountered a problem with Alipay:
If an order is generated without payment for 30 minutes, it will be cancelled automatically. How can I do this?
In fact, such delayed tasks are often required in development, such as
-
Generate order 30 minutes unpaid, cancel automatically
-
Send a text message to the user 60 seconds after the order is generated
For the above tasks, we give a professional name to describe it as delayed tasks.
So this raises the question, what is the difference between a delayed task and a timed task?
There are three differences:
-
Timed tasks have clear trigger times, delayed tasks do not
-
Timed tasks have execution cycles, and delayed tasks execute within a period of time after an event triggers, without execution cycles
-
Timed tasks typically perform batch operations with multiple tasks, while delayed tasks typically perform a single task
Next, let's do a scenario analysis to see if the order is timed out.
Scheme Analysis
1) Database Polling
thinking
This scheme is typically used in small projects, where a thread periodically scans the database, determines if there is a timed-out order by the order time, and update s or delete s the order.
Realization
Internship session, I used quartz to achieve, a brief introduction.
The maven project introduces a dependency as follows
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.2</version> </dependency>
Call Demo class MyJob:
public class MyJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("It's time to scan the database."); } public static void main(String[] args) throws Exception { // Create Task JobDetail jobDetail = JobBuilder.newJob(MyJob.class) .withIdentity("job1", "group1").build(); // Create trigger Execute every 3 seconds Trigger trigger = TriggerBuilder .newTrigger() .withIdentity("trigger1", "group3") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); // Put the task and its triggers in the scheduler scheduler.scheduleJob(jobDetail, trigger); // Scheduler Starts Scheduling Tasks scheduler.start(); } }
Run the code, you can see that every 3 seconds, the output is as follows:
It's time to scan the database.
Advantages: Easy to use, supports cluster operations
Disadvantages:
-
High server memory consumption
-
There is a delay, like if you scan every 3 minutes, then the worst delay is 3 minutes
-
Assuming you have tens of millions of orders, such a scan every few minutes can cause a lot of database loss
2) Delayed queue of JDK
thinking
Implemented using the DelayQueue that comes with JDK, this is an unbounded blocking queue from which elements can only be retrieved when the delay expires. Objects placed in DelayQueue must implement the Delayed interface.
The DelayedQueue implementation workflow is as follows:
-
Poll(): Gets and removes the timeout element of the queue, returns NULL if none
-
take(): Gets and removes the timeout element of the queue, and if not wait s the current thread until an element meets the timeout condition and returns the result.
Realization
Define a class OrderDelay to implement Delayed:
public class OrderDelay implements Delayed { private String orderId; private long timeout; OrderDelay(String orderId, long timeout) { this.orderId = orderId; this.timeout = timeout + System.nanoTime(); } public int compareTo(Delayed other) { if (other == this) return 0; OrderDelay t = (OrderDelay) other; long d = (getDelay(TimeUnit.NANOSECONDS) - t .getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } // How much time is left to return to your custom timeout public long getDelay(TimeUnit unit) { return unit.convert(timeout - System.nanoTime(),TimeUnit.NANOSECONDS); } void print() { System.out.println(orderId+"Numbered orders have to be deleted."); } }
Test class Demo, we set a delay time of 3 seconds:
public class DelayQueueDemo { public static void main(String[] args) { List<String> list = new ArrayList<String>(); list.add("00000001"); list.add("00000002"); list.add("00000003"); list.add("00000004"); list.add("00000005"); DelayQueue<OrderDelay> queue = newDelayQueue<OrderDelay>(); long start = System.currentTimeMillis(); for(int i = 0;i<5;i++){ //Delay removal by three seconds queue.put(new OrderDelay(list.get(i), TimeUnit.NANOSECONDS.convert(3,TimeUnit.SECONDS))); try { queue.take().print(); System.out.println("After " + (System.currentTimeMillis()-start) + " MilliSeconds"); } catch (InterruptedException e) {} } } }
The output is as follows:
00000001 Numbered orders have to be deleted. After 3003 MilliSeconds 00000002 Numbered orders have to be deleted. After 6006 MilliSeconds 00000003 Numbered orders have to be deleted. After 9006 MilliSeconds 00000004 Numbered orders have to be deleted. After 12008 MilliSeconds 00000005 Numbered orders have to be deleted. After 15009 MilliSeconds
You can see that the order was deleted with a delay of 3 seconds.
Advantages: High efficiency, low task trigger time delay.
Disadvantages:
-
After the server restarts, all data disappears for fear of downtime
-
Cluster expansion is cumbersome
-
OOM exceptions can easily occur due to memory constraints, such as too many unpaid orders placed
-
Code complexity is high
3) Time-wheel algorithm
thinking
Start with the last time wheel chart:
Time-wheel algorithms can be analogous to clocks, such as the arrows (pointers) in the figure above that rotate at a fixed frequency in one direction, with each beat called a tick.
This allows you to see that the timer wheel consists of three important attribute parameters:
-
Ticks PerWheel (tick s in a round)
-
tickDuration (duration of a tick)
-
timeUnit
For example, when ticksPerWheel=60, tickDuration=1, timeUnit=seconds, this is exactly like the constant second hand movement in reality.
If the current pointer is above 1 and I have a task that takes 4 seconds to execute, the thread callback or message for that execution will be placed on 5. So what if you need to do after 20 seconds, since the number of slots in this ring structure is only 8, if you want 20 seconds, the pointer needs to rotate 2 more turns. Position is above 5 after 2 laps (20% 8 + 1)
Realization
We do this using Netty's HashedWheelTimer.
Add the following dependencies to pom.xml:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.24.Final</version> </dependency>
Test code HashedWheelTimerTest:
public class HashedWheelTimerTest { static class MyTimerTask implements TimerTask{ boolean flag; public MyTimerTask(boolean flag){ this.flag = flag; } public void run(Timeout timeout) throws Exception { System.out.println("To delete the order from the database."); this.flag =false; } } public static void main(String[] argv) { MyTimerTask timerTask = new MyTimerTask(true); Timer timer = new HashedWheelTimer(); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); int i = 1; while(timerTask.flag){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i+"Seconds have passed"); i++; } } }
The output is as follows:
1 Seconds have passed 2 Seconds have passed 3 Seconds have passed 4 Seconds have passed 5 Seconds have passed To delete the order from the database. 6 Seconds have passed
Benefits: High efficiency, lower task trigger delay time than delayQueue, lower code complexity than delayQueue.
Disadvantages:
-
After the server restarts, all data disappears for fear of downtime
-
Cluster expansion is cumbersome
-
OOM exceptions can easily occur due to memory constraints, such as too many unpaid orders placed
4) redis cache
Idea One
Utilize the zset of redis. zset is an ordered set, each element (member) is associated with a score, and the values in the set are sorted by score.
-
Add element: ZADD key score member [[score member] [score member]...]
-
Query elements sequentially: ZRANGE key start stop [WITHSCORES]
-
Query element score:ZSCORE key member
-
Remove element: ZREM key member [member...]
The tests are as follows:
Add a single element redis> ZADD page_rank 10 google.com (integer) 1 Add multiple elements redis> ZADD page_rank 9 baidu.com 8 bing.com (integer) 2 redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9" 5) "google.com" 6) "10" Query Element's score value redis> ZSCORE page_rank bing.com "8" Remove a single element redis> ZREM page_rank google.com (integer) 1 redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9"
So how? We set the order timeout stamp and order number to score and member, respectively. The system scans the first element to determine if it has timed out, as shown in the following figure:
Realize One
public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); public static Jedis getJedis() { return jedisPool.getResource(); } //Producer, generate 5 orders to put in public void productionDelayMessage(){ for(int i=0;i<5;i++){ //Delay 3 seconds Calendar cal1 = Calendar.getInstance(); cal1.add(Calendar.SECOND, 3); int second3later = (int) (cal1.getTimeInMillis() / 1000); AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i); System.out.println(System.currentTimeMillis()+"ms:redis An order task was generated: Order ID by"+"OID0000001"+i); } } //Consumers, take orders public void consumerDelayMessage(){ Jedis jedis = AppTest.getJedis(); while(true){ Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1); if(items == null || items.isEmpty()){ System.out.println("There are currently no pending tasks"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } continue; } int score = (int) ((Tuple)items.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(System.currentTimeMillis() +"ms:redis Consumed a task: consumed orders OrderId by"+orderId); } } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); appTest.consumerDelayMessage(); } }
At this point corresponding output:
As you can see, almost all consumer orders are in 3 seconds.
However, there is a fatal hard injury in this version. In high concurrency conditions, multiple consumers will get the same order number. We tested the code ThreadTest:
public class ThreadTest { private static final int threadNum = 10; private static CountDownLatch cdl = newCountDownLatch(threadNum); static class DelayMessage implements Runnable{ public void run() { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } AppTest appTest =new AppTest(); appTest.consumerDelayMessage(); } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); for(int i=0;i<threadNum;i++){ new Thread(new DelayMessage()).start(); cdl.countDown(); } } }` The output is as follows: ![](https://upload-images.jianshu.io/upload_images/1179389-ca3e56dd26dfaf92.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) It is clear that multiple threads are consuming the same resource. **Solution** - With distributed locks, but with distributed locks, performance degrades, which is not detailed in this scenario. - Yes ZREM To make a judgment, only when the value is greater than 0 will the data be consumed, so the consumerDelayMessage()Method ```java if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(System.currentTimeMillis()+"ms:redis Consumed a task: consumed orders OrderId by"+orderId); }
Modify to:
if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); Long num = jedis.zrem("OrderId", orderId); if( num != null && num>0){ System.out.println(System.currentTimeMillis()+"ms:redis Consumed a task: consumed orders OrderId by"+orderId); } }
After this modification, rerun the ThreadTest class and find that the output is normal.
Idea Two
This scheme uses the Keyspace Notifications of redis, which means Chinese translation is the key space mechanism by which a callback can be provided after the key fails. In fact, redis sends a message to the client. Yes, redis version 2.8 or higher is required.
Realization Two
In redis.conf, add a configuration:
notify-keyspace-events Ex
The code to run is as follows:
public class RedisTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedis = new JedisPool(ADDR, PORT); private static RedisSub sub = new RedisSub(); public static void init() { new Thread(new Runnable() { public void run() { jedis.getResource().subscribe(sub, "__keyevent@0__:expired"); } }).start(); } public static void main(String[] args) throws InterruptedException { init(); for(int i =0;i<10;i++){ String orderId = "OID000000"+i; jedis.getResource().setex(orderId, 3, orderId); System.out.println(System.currentTimeMillis()+"ms:"+orderId+"Order Generation"); } } static class RedisSub extends JedisPubSub { public void onMessage(String channel, String message) { System.out.println(System.currentTimeMillis()+"ms:"+message+"Order Cancellation"); } } }
The output is as follows:
You can clearly see that the order was cancelled after 3 seconds.
However, there is a hard injury in the pub/sub mechanism of redis. The content of the official website is as follows
Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.
Simple translation: Redis's publish/subscription is currently in fire and forget mode, so reliable notification of events cannot be achieved. That is, if the publishing/subscribing client is disconnected and then reconnected, all events during the client disconnection are lost.
Therefore, scenario two is not recommended. Of course, if you don't require high reliability, you can use it.
Advantage:
-
Because Redis is used as the message channel, messages are stored in Redis. If the sender or task handler hangs up, there is also the possibility to reprocess the data after the restart.
-
Cluster expansion is quite convenient
-
High Time Accuracy
Disadvantages: Additional redis maintenance is required
5) Use Message Queuing
A delayed queue of rabbitMQ can be used. RabbitMQ has two features that enable delayed queues:
-
RabbitMQ can set x-message-tt for Queue and Mesage to control the Message's lifetime. If it times out, the Message becomes dead letter
-
The Queue of lRabbitMQ can configure two parameters, x-dead-letter-exchange and x-dead-letter-routing-key (optional), to control the occurrence of deadletter s in the queue and reroute according to these two parameters.
Combining these two features, you can simulate the ability to delay messages. Specifically, I'll write another article another day. To go on here, it's too long.
Benefits: Efficient, easy to scale horizontally using the distributed nature of rabbitmq, message support persistence increases reliability.
Disadvantages: Its ease of use depends on the operation of rabbitMq, which is more complex and costly because rabbitMq is referenced.
Original address: Alipay 2 Side: If an order is generated for 30 minutes without payment, it will be cancelled automatically. How can this be achieved?