Preface
- The previous article, [DelayQueue delay processing task record], described the JDK delay queue.
- In real distributed projects, the JDK delay queue is not used for delayed tasks: it stores tasks in JVM memory with no persistence, so tasks are lost when the service restarts.
- In a project, an MQ dead letter queue or the redisson delay queue can be used to process delayed tasks. This article walks through a demo of the redisson delay queue and the source code it executes.
Demo example
- Create a simple Spring Boot project from the scaffold, add the redisson Maven dependency, and configure the redisson connection properties.
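    <!-- redisson dependency -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.6</version>
    </dependency>

    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RedissonConfig {

        @Value("${spring.redis.host}")
        private String host;

        @Value("${spring.redis.port}")
        private String port;

        /**
         * Get the RedissonClient instance
         *
         * @return a client connected to a single redis server
         */
        @Bean
        public RedissonClient getRedisson() {
            Config config = new Config();
            String address = "redis://" + host + ":" + port;
            config.useSingleServer().setAddress(address);
            return Redisson.create(config);
        }
    }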
- Define RedissonQueueHandle, a class that inserts tasks into the redisson delay queue and fetches them. It hooks into the Spring bean lifecycle (InitializingBean) to start an independent thread that fetches delayed tasks.
- Three ways of fetching delayed tasks are shown. Only the first, blocking method receives a task right when its delay expires. The other two poll at fixed time intervals, so a task is picked up at the first poll after its delay has expired, not exactly at the delay parameter.
    import io.netty.util.HashedWheelTimer;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    /**
     * redisson delay queue processor
     *
     * @author zrh
     */
    @Slf4j
    @Component
    public class RedissonQueueHandle implements InitializingBean {

        private final RBlockingQueue<RedisDataEntity<?>> queue;
        private final RDelayedQueue<RedisDataEntity<?>> delayedQueue;

        public RedissonQueueHandle(RedissonClient client) {
            this.queue = client.getBlockingQueue("redisson:queue");
            this.delayedQueue = client.getDelayedQueue(queue);
        }

        @Override
        public void afterPropertiesSet() {
            // Start a thread that blocks while waiting for tasks
            thread();
            // Fetch tasks in a loop driven by the netty time wheel
            // watchDog(new HashedWheelTimer());
            // Fetch tasks periodically with a scheduled thread pool
            // schedule();
        }

        private void thread() {
            new Thread(() -> {
                while (true) {
                    try {
                        RedisDataEntity<?> entity = queue.take();
                        log.info("Data obtained this time:{},Time consuming:{}", entity, System.currentTimeMillis() - entity.getTime());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop the consumer thread
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        // Do not swallow failures silently
                        log.error("Take from redis delay queue exception", e);
                    }
                }
            }, "zrh").start();
        }

        private void watchDog(final HashedWheelTimer timer) {
            timer.newTimeout(timeout -> {
                RedisDataEntity<?> entity = queue.poll();
                if (null != entity) {
                    log.info("Data obtained this time:{},Time consuming:{}", entity, System.currentTimeMillis() - entity.getTime());
                }
                // Re-arm the timer for the next poll
                watchDog(timer);
            }, 3, TimeUnit.SECONDS);
        }

        private void schedule() {
            Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> {
                RedisDataEntity<?> entity = queue.poll();
                if (null != entity) {
                    log.info("Data obtained this time:{},Time consuming:{}", entity, System.currentTimeMillis() - entity.getTime());
                }
            }, 5, 5, TimeUnit.SECONDS);
        }

        /**
         * Add an entity to redis; it becomes available after its delay expires
         *
         * @param entity the task to delay
         */
        public void offer(RedisDataEntity<?> entity) {
            try {
                delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.error("Put redis Delay queue exception", e);
            }
        }
    }
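- To make the difference between blocking and polling concrete, here is a minimal sketch in plain Java (no redisson involved). The class and method names are hypothetical. It assumes polls happen at firstPollMs, firstPollMs + periodMs, and so on, and that each poll returns immediately, which only approximates scheduleWithFixedDelay and the re-armed time wheel used above.

```java
final class PollingLatency {

    /** Returns the time of the first poll that happens at or after readyAtMs. */
    static long pickupTimeMs(long readyAtMs, long firstPollMs, long periodMs) {
        if (periodMs <= 0) {
            throw new IllegalArgumentException("periodMs must be positive");
        }
        if (readyAtMs <= firstPollMs) {
            return firstPollMs;
        }
        long elapsed = readyAtMs - firstPollMs;
        long ticks = (elapsed + periodMs - 1) / periodMs; // ceiling division
        return firstPollMs + ticks * periodMs;
    }

    public static void main(String[] args) {
        // Task offered at t=0 with a 30 s delay; poller ticks every 5 s, starting at t=3 s.
        long readyAt = 30_000;
        long pickup = pickupTimeMs(readyAt, 3_000, 5_000);
        System.out.println("picked up at " + pickup + " ms, extra latency " + (pickup - readyAt) + " ms");
    }
}
```

- With these numbers the polls land at 3 s, 8 s, ..., 28 s, 33 s, so the task is seen 3 s late. In addition, each poll() in the demo takes at most one element per tick, so a backlog drains slowly.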
- An element of the redisson delay queue can be a string or an object such as RedisDataEntity. Because the element is serialized before it is sent to redis, the class must implement the Serializable interface.
    import lombok.Data;

    import java.io.Serializable;

    /**
     * @Author: ZRH
     * @Date: 2022/1/10 11:54
     */
    @Data
    public class RedisDataEntity<T> implements Serializable {

        /**
         * data
         */
        private final T data;

        /**
         * Expiration time in milliseconds
         */
        private final Long expire;

        /**
         * Add time
         */
        private final Long time;

        public RedisDataEntity(T data, Long expire, Long time) {
            this.data = data;
            this.expire = expire;
            this.time = time;
        }
    }
- Then expose an endpoint for inserting data:
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * @Author: ZRH
     * @Date: 2022/1/10 11:45
     */
    @Slf4j
    @RestController
    public class IndexController {

        private final RedissonQueueHandle redisHandle;

        public IndexController(RedissonQueueHandle redisHandle) {
            this.redisHandle = redisHandle;
        }

        @PostMapping("redissonQueue")
        public String redissonQueue(@RequestParam String data, @RequestParam Long expire) {
            RedisDataEntity<String> entity = new RedisDataEntity<>(data, expire, System.currentTimeMillis());
            log.info("Data added this time:{}", entity);
            redisHandle.offer(entity);
            return "ok";
        }
    }
- Call the endpoint with a 30-second delay, http://localhost:8802/redissonQueue?data=a&expire=30000 (the mapping is a POST). The printed results are as follows:
    2022-01-14 14:21:52.140 INFO 10808 --- [nio-8802-exec-1] c.r.web.controller.IndexController : Data added this time: RedisDataEntity(data=a, expire=30000, time=1642141312135)
    2022-01-14 14:21:52.887 INFO 10808 --- [nio-8802-exec-2] c.r.web.controller.IndexController : Data added this time: RedisDataEntity(data=a, expire=30000, time=1642141312887)
    2022-01-14 14:22:22.240 INFO 10808 --- [ zrh] c.r.web.redis.RedissonQueueHandle : Data obtained this time: RedisDataEntity(data=a, expire=30000, time=1642141312135),Time: 30105
    2022-01-14 14:22:22.914 INFO 10808 --- [ zrh] c.r.web.redis.RedissonQueueHandle : Data obtained this time: RedisDataEntity(data=a, expire=30000, time=1642141312887),Time: 30027
Source code analysis of initial execution process
- The redisson delay queue ultimately interacts with the redis service. You can use the monitor command to view which commands are executed in redis, which is very helpful to understand its execution process.
- The figure above shows the commands sent to redis when the project starts:
- "SUBSCRIBE": subscribes to the channel "redisson_delay_queue_channel:{redisson:queue}". The scheduled task on the client receives expiration-time notifications through this channel.
- "zrangebyscore": gets up to the first 100 elements of the sorted set "redisson_delay_queue_timeout:{redisson:queue}" whose score is between 0 and 1642148406748 (the current timestamp), that is, the elements that have already expired.
- "zrange": gets the first element of the sorted set "redisson_delay_queue_timeout:{redisson:queue}" together with its score, which is the expiration time of the next element.
- "BLPOP": pops and removes the first element of the list "redisson:queue". If the list is empty, the command blocks and waits, so the client blocks here.
- "rpush": if "zrangebyscore" returned elements, each one is pushed to the list "redisson:queue".
- "lrem": if "zrangebyscore" returned elements, the first occurrence of each element v is deleted from the list "redisson_delay_queue:{redisson:queue}".
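- The transfer step described by these commands can be modeled in memory as follows. This is only a sketch under stated assumptions: TransferModel and its fields are hypothetical stand-ins for the three redis keys, values are plain strings, and ties between equal scores are broken by insertion order, which is not how a real redis sorted set orders members.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TransferModel {
    // Stand-in for the zset "redisson_delay_queue_timeout:{...}": expiration time -> values
    final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // Stand-in for the list "redisson_delay_queue:{...}" (insertion order)
    final List<String> delayList = new ArrayList<>();
    // Stand-in for the list "redisson:queue" that BLPOP reads from
    final Deque<String> readyQueue = new ArrayDeque<>();

    void add(String value, long expireAtMs) {
        timeoutSet.computeIfAbsent(expireAtMs, k -> new ArrayList<>()).add(value);
        delayList.add(value);
    }

    /** Moves up to limit expired values to readyQueue; returns the next expiration time, or null. */
    Long transferExpired(long nowMs, int limit) {
        int moved = 0;
        Iterator<Map.Entry<Long, List<String>>> it =
                timeoutSet.headMap(nowMs, true).entrySet().iterator();
        while (it.hasNext() && moved < limit) {
            List<String> values = it.next().getValue();
            while (!values.isEmpty() && moved < limit) {
                String v = values.remove(0); // zrangebyscore ... limit, then zrem
                readyQueue.addLast(v);       // rpush to the target queue
                delayList.remove(v);         // lrem, first occurrence only
                moved++;
            }
            if (values.isEmpty()) {
                it.remove();
            }
        }
        // zrange 0 0 WITHSCORES: score of the new head, if any
        return timeoutSet.isEmpty() ? null : timeoutSet.firstKey();
    }

    public static void main(String[] args) {
        TransferModel m = new TransferModel();
        m.add("a", 1_000);
        m.add("b", 2_000);
        m.add("c", 5_000);
        Long next = m.transferExpired(2_500, 100);
        System.out.println("ready=" + m.readyQueue + ", next expiration=" + next);
    }
}
```

- The returned value plays the role of the timestamp that the client uses to decide when to run the transfer again.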
SUBSCRIBE instruction
- Go into the constructor of RedissonDelayedQueue, which contains the lua script that executes the commands above (to keep the article short, some code is omitted here and in the excerpts below):
    ......
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec,
                                   final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        // pub/sub channel used to publish and subscribe to delay queue notifications
        channelName = prefixName("redisson_delay_queue_channel", getRawName());
        // list structure that stores the elements in their original order
        queueName = prefixName("redisson_delay_queue", getRawName());
        // zset structure that stores the unexpired elements, sorted by expiration time
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());

        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }

            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };

        queueTransferService.schedule(queueName, task);
        this.queueTransferService = queueTransferService;
    }
- Next, follow the call QueueTransferService.schedule(queueName, task). Because this is the first time the name is put into the tasks collection, the start() method is executed:
    ......
    private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();

    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
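- The effect of putIfAbsent here is that only the first task registered under a name is started, and later registrations just bump a usage counter. A self-contained sketch of that pattern follows. TaskRegistry and its nested Task are hypothetical simplifications, and the initial usage count of 1 is an assumption, not something the excerpt above shows.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class TaskRegistry {

    /** Minimal stand-in for a transfer task: counts how often it was started and used. */
    static final class Task {
        final AtomicInteger starts = new AtomicInteger();
        final AtomicInteger usage = new AtomicInteger(1); // assumed initial value

        void start() {
            starts.incrementAndGet();
        }

        void incUsage() {
            usage.incrementAndGet();
        }
    }

    private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();

    /** Registers task under name and returns the task that is actually active for that name. */
    synchronized Task schedule(String name, Task task) {
        Task oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();       // first registration: start the task
            return task;
        }
        oldTask.incUsage();     // later registrations share the running task
        return oldTask;
    }

    public static void main(String[] args) {
        TaskRegistry registry = new TaskRegistry();
        Task first = new Task();
        Task second = new Task();
        registry.schedule("queue", first);
        registry.schedule("queue", second);
        System.out.println("first started " + first.starts.get() + " time(s), usage " + first.usage.get());
        System.out.println("second started " + second.starts.get() + " time(s)");
    }
}
```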
- Go into QueueTransferTask and follow the schedulerTopic.addListener(...) calls:
    ......
    private int messageListenerId;
    private int statusListenerId;

    public void start() {
        RTopic schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });

        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }
- The call then reaches the PublishSubscribeService.subscribe(...) method:
- Note that it calls the overloaded subscribe(...) method with the parameter PubSubType.SUBSCRIBE.
    ......
    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName,
                                                    RedisPubSubListener<?>... listeners) {
        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
    }

    private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
                                                     MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<>();
        AsyncSemaphore lock = getSemaphore(channelName);
        // Wrap the subscription in a Runnable and hand it to the semaphore
        lock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                return;
            }
            subscribe(codec, channelName, entry, promise, type, lock, listeners);
        });
        return promise;
    }
- The acquire(...) method of the AsyncSemaphore object puts the Runnable into its own listeners queue, and the queued tasks are then taken and executed one at a time as permits become available:
    public class AsyncSemaphore {

        private final AtomicInteger counter;
        private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();

        public AsyncSemaphore(int permits) {
            counter = new AtomicInteger(permits);
        }

        public void acquire(Runnable listener) {
            listeners.add(listener);
            tryRun();
        }

        private void tryRun() {
            if (counter.decrementAndGet() >= 0) {
                // A permit was available: run the next queued task, if any
                Runnable listener = listeners.poll();
                if (listener == null) {
                    counter.incrementAndGet();
                    return;
                }
                listener.run();
            } else {
                // No permit: undo the decrement, and retry if one was released meanwhile
                if (counter.incrementAndGet() > 0) {
                    tryRun();
                }
            }
        }
    }
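- To see how queued tasks wait for a permit, here is a self-contained sketch of the same idea. SimpleAsyncSemaphore is a hypothetical name, and the release() method is an assumption about how a permit is handed back (the excerpt above does not show it); it simply returns the permit and tries to run the next queued task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class SimpleAsyncSemaphore {
    private final AtomicInteger counter;
    private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();

    SimpleAsyncSemaphore(int permits) {
        counter = new AtomicInteger(permits);
    }

    /** Queues the task; it runs at once if a permit is free, otherwise on a later release(). */
    void acquire(Runnable listener) {
        listeners.add(listener);
        tryRun();
    }

    /** Assumed counterpart of acquire: gives a permit back and wakes the next queued task. */
    void release() {
        counter.incrementAndGet();
        tryRun();
    }

    private void tryRun() {
        if (counter.decrementAndGet() >= 0) {
            Runnable listener = listeners.poll();
            if (listener == null) {
                counter.incrementAndGet(); // nothing to run: return the permit
                return;
            }
            listener.run();
        } else if (counter.incrementAndGet() > 0) {
            tryRun(); // a permit appeared in the meantime
        }
    }

    public static void main(String[] args) {
        SimpleAsyncSemaphore lock = new SimpleAsyncSemaphore(1);
        List<String> log = new ArrayList<>();
        lock.acquire(() -> log.add("first"));  // runs immediately and keeps the permit
        lock.acquire(() -> log.add("second")); // queued: no permit available
        System.out.println(log);               // [first]
        lock.release();                        // hands the permit to the queued task
        System.out.println(log);               // [first, second]
    }
}
```

- Note that a task keeps its permit until someone calls release(), which is why the subscribe code above calls lock.release() on every exit path.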
- Next, follow the method subscribe(codec, channelName, entry, promise, type, lock, listeners):
    ......
    private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
                           RPromise<PubSubConnectionEntry> promise, PubSubType type,
                           AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }

        freePubSubLock.acquire(() -> {
            if (promise.isDone()) {
                lock.release();
                freePubSubLock.release();
                return;
            }

            MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry);
            // On the first call entry2PubSubConnection has no entry for msEntry, so the default value is used and freeEntry == null
            PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());

            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                freePubSubLock.release();
                connect(codec, channelName, msEntry, promise, type, lock, listeners);
                return;
            }
            ......
        });
    }
- Next, follow the method connect(codec, channelName, msEntry, promise, type, lock, listeners):
    ......
    private void connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry,
                         RPromise<PubSubConnectionEntry> promise, PubSubType type,
                         AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(msEntry, channelName);
        promise.onComplete((res, e) -> {...});

        connFuture.onComplete((conn, ex) -> {
            if (ex != null) {...}

            freePubSubLock.acquire(() -> {
                PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
                int remainFreeAmount = entry.tryAcquire();

                PubSubKey key = new PubSubKey(channelName, msEntry);
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
                if (oldEntry != null) {...}

                if (remainFreeAmount > 0) {
                    addFreeConnectionEntry(channelName, entry);
                }
                freePubSubLock.release();

                RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);

                ChannelFuture future;
                // The overload above passed PubSubType.SUBSCRIBE, so the else branch is taken
                if (PubSubType.PSUBSCRIBE == type) {
                    future = entry.psubscribe(codec, channelName);
                } else {
                    future = entry.subscribe(codec, channelName);
                }

                future.addListener((ChannelFutureListener) future1 -> {
                    if (!future1.isSuccess()) {...}

                    connectionManager.newTimeout(timeout -> subscribeFuture.cancel(false),
                            config.getTimeout(), TimeUnit.MILLISECONDS);
                });
            });
        });
    }
- The side branches of this method are not covered here. The key call is entry.subscribe(codec, channelName), which finally reaches the RedisPubSubConnection.async(...) method, where the SUBSCRIBE command is sent.
zrangebyscore and zrange instructions
- After the SUBSCRIBE command has been issued, the status listener added in QueueTransferTask.start() is triggered and executes pushTask().
- pushTask() calls pushTaskAsync(), which executes the lua script. When the script returns an expiration time, scheduleTask(...) is called to start a scheduled task.
    ......
    protected abstract RTopic getTopic();

    protected abstract RFuture<Long> pushTaskAsync();

    private void pushTask() {
        // This abstract method is implemented in the RedissonDelayedQueue constructor shown earlier;
        // it returns the expiration time of the next element
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                // On failure, try again in 5 seconds
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }

            if (res != null) {
                scheduleTask(res);
            }
        });
    }
BLPOP instruction
- After the RedissonDelayedQueue has been constructed, the demo calls take() on the blocking queue to fetch delayed tasks, which goes into the RedissonBlockingQueue.takeAsync() method:
    ......
    @Override
    public RFuture<V> takeAsync() {
        return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
    }

    /*
     * (non-Javadoc)
     * @see java.util.concurrent.BlockingQueue#take()
     */
    @Override
    public V take() throws InterruptedException {
        return commandExecutor.getInterrupted(takeAsync());
    }
    ......
- Note the command parameter RedisCommands.BLPOP_VALUE. This is clearly the BLPOP command we are looking for: the client fetches the value with a blocking BLPOP, which is why the demo starts a dedicated client thread that blocks while waiting for elements.
- Following the source further leads to the CommandAsyncService.writeAsync(...) method and then to the RedisExecutor.execute() method:
    ......
    public void execute() {
        if (mainPromise.isCancelled()) {...}

        if (!connectionManager.getShutdownLatch().acquire()) {...}

        codec = getCodec(codec);
        // Get connection
        RFuture<RedisConnection> connectionFuture = getConnection();

        RPromise<R> attemptPromise = new RedissonPromise<>();
        mainPromiseListener = (r, e) -> {...};

        if (attempt == 0) {...}

        scheduleRetryTimeout(connectionFuture, attemptPromise);

        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {...}

            if (!connectionFuture.isSuccess()) {...}

            // Send the command once the connection has been obtained
            sendCommand(attemptPromise, connection);

            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });

        attemptPromise.onComplete((r, e) -> {...});
    }
- Some side branches of this method are not covered here. There is a timeout retry mechanism in the middle that uses netty's time wheel, but it is not the focus, so it is skipped.
- The method first obtains a write connection and then calls sendCommand(attemptPromise, connection) to send the command:
- Command: "BLPOP", parameters: "redisson:queue" "0"
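- A BLPOP with timeout 0 waits until an element arrives, which is the same contract as BlockingQueue.take() in the JDK. The sketch below uses a plain in-memory LinkedBlockingQueue as a stand-in for the redisson queue (an assumption made so the example runs without redis) and shows one way for a consumer thread to stop cleanly on interruption. BlockingConsumer and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingConsumer {

    /** Starts a thread that takes from queue until interrupted, appending to received. */
    static Thread start(BlockingQueue<String> queue, List<String> received) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    received.add(queue.take()); // blocks while the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag, loop exits
                }
            }
        }, "consumer");
        t.start();
        return t;
    }

    /** Runs one produce/consume round trip and returns what the consumer received. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        Thread consumer = start(queue, received);
        try {
            queue.put("a");
            while (received.isEmpty()) {
                Thread.sleep(10); // wait until the consumer has picked the element up
            }
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```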
Source code analysis of the offer (add task) process
- After the project is started, add a delay task to redis and view the instructions executed in redis:
- Then follow the offer method that inserts the element. It goes into the RedissonDelayedQueue.offerAsync() method, as follows:
    ......
    @Override
    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }

    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }

        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;

        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime
              // to all scheduler workers
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }
- The long script string contains the commands that are executed in redis. The basic flow is fairly simple:
- "zadd": adds the element data to the zset "redisson_delay_queue_timeout:{redisson:queue}" (the data has been packed with struct.pack together with a random id; its exact structure is not important here). The score is the current timestamp plus the delay.
- "rpush": pushes the element data to the list "redisson_delay_queue:{redisson:queue}" (KEYS[3], that is, queueName), not to the target queue "redisson:queue".
- "zrange": gets the first element, in sorted order, of the zset "redisson_delay_queue_timeout:{redisson:queue}".
- "publish": if the element obtained above is the element inserted by this call, its expiration time is published to the channel "redisson_delay_queue_channel:{redisson:queue}". The subscribers can then reschedule their timer for this earlier expiration time, which keeps the gap between the element expiring and being transferred small.
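- The decision of when to publish can be modeled in memory as follows. This is a sketch under stated assumptions: OfferModel and its fields are hypothetical, values are distinct plain strings (the real script makes members unique by packing a random id), the clock is passed in explicitly, and ties between equal expiration times are broken by insertion order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class OfferModel {
    // Stand-in for the zset "redisson_delay_queue_timeout:{...}": expiration time -> values
    final TreeMap<Long, List<String>> timeoutSet = new TreeMap<>();
    // Stand-in for the list "redisson_delay_queue:{...}"
    final List<String> delayList = new ArrayList<>();
    // Expiration times that would have been published to the channel
    final List<Long> published = new ArrayList<>();

    void offer(String value, long nowMs, long delayMs) {
        if (delayMs < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long timeout = nowMs + delayMs;
        timeoutSet.computeIfAbsent(timeout, k -> new ArrayList<>()).add(value); // zadd
        delayList.add(value);                                                   // rpush
        String head = timeoutSet.firstEntry().getValue().get(0);                // zrange 0 0
        if (head.equals(value)) {
            published.add(timeout);                                             // publish
        }
    }

    public static void main(String[] args) {
        OfferModel m = new OfferModel();
        m.offer("a", 0, 30_000); // becomes the head: published
        m.offer("b", 0, 60_000); // expires later than the head: not published
        m.offer("c", 0, 10_000); // new earliest expiration: published
        System.out.println("published=" + m.published);
    }
}
```

- Only an element that becomes the new head triggers a notification, so subscribers are woken up only when the earliest expiration time actually changes.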
Timer source code analysis
- The timer task mainly executes pushTask() when the listener detects a new client subscription, and scheduleTask(...) when it receives a published element notification:
    ......
    private int messageListenerId;
    private int statusListenerId;

    public void start() {
        RTopic schedulerTopic = getTopic();
        // When a new client subscribes to schedulerTopic, pushTask() is triggered
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });

        // When redis delivers a new message, scheduleTask(...) is triggered;
        // startTime is the element expiration time sent by publish in the offer script
        messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }
- The pushTask() method operates on the redis delay queue, while scheduleTask(...) uses the netty time wheel to control when pushTask() is called, so pushTask() and scheduleTask() call each other.
    ......
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {...}

        if (oldTimeout != null) {...}

        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();

                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }

    protected abstract RTopic getTopic();

    protected abstract RFuture<Long> pushTaskAsync();

    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }

            if (res != null) {
                scheduleTask(res);
            }
        });
    }
- Summary:
- When a new client subscribes, pushTask() is called to move the expired data into the blocking queue.
- When a message is published, scheduleTask(...) is called. Based on the expiration time in the message, it decides whether to call pushTask() later through the time wheel or immediately.
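- The branch in scheduleTask(...) boils down to a small decision, sketched below as a pure function so it can be checked without a timer. ScheduleDecision and timerDelayMs are hypothetical names; the 10 ms threshold is taken from the excerpt above, and the handling of a previously scheduled timeout is left out because that part is elided in the excerpt.

```java
final class ScheduleDecision {
    // Delays at or below this threshold are not worth a timer: run the transfer at once
    static final long IMMEDIATE_THRESHOLD_MS = 10;

    /** Returns 0 to run the transfer now, otherwise the delay in ms to arm the timer with. */
    static long timerDelayMs(long startTimeMs, long nowMs) {
        long delay = startTimeMs - nowMs;
        return delay > IMMEDIATE_THRESHOLD_MS ? delay : 0;
    }

    public static void main(String[] args) {
        long now = 1_000;
        System.out.println(timerDelayMs(31_000, now)); // far in the future: timer delay
        System.out.println(timerDelayMs(1_005, now));  // almost due: run now
        System.out.println(timerDelayMs(900, now));    // already expired: run now
    }
}
```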
Finally
- The source code of the redisson delay queue is fairly abstract and complex. I find it harder to analyze than redisson's distributed lock.
- However, if you read carefully and follow the main methods, you can still understand the implementation. Study with an open mind and let's make progress together -_-