Preface
I have recently been catching up on Spring Cloud videos, originally to prepare for studying [ruoyi cloud plus]. Then today the group leader, Lion, gave me a nudge (covering my face) and suggested analyzing a new feature in [ruoyi Vue plus] 4.0: the Redisson distributed queue.
In fact, I had already looked at this feature in the dev branch before the official release, but the source code was all Lua scripts, so I quietly closed it.
However, I brushed up on Redis 6 some time ago, and that material covered Lua scripting. It was fairly basic, but enough to follow along. After spending some time looking up references I found I could gradually make sense of the scripts, so I hurried to write this post. I start with the relatively simple delay queue here; other queues will be analyzed later.
References
- Redisson official document - 7.15 Delayed Queue
- Microservice application (19): Redisson DelayedQueue implementation of delay queue
  A very detailed blog post. This article builds on it, combined with the framework, for its walkthrough and analysis.
- Talk about the DelayedQueue of Redisson
Code implementation
Demo from the official documentation (relatively simple):
Framework implementation:
- Add queue data
DelayedQueueController#add
QueueUtils#addDelayedQueueObject
Function call
Steps:
1. Log in on the front-end page, get the token, and set the global parameters of the interface document.
2. Call the delay queue's subscribe interface in the interface document.
3. Call the delay queue's add-queue-data interface in the interface document.
4. Check the parameters printed in the console.
Source code analysis
1. New delay queue
Referring to the QueueUtils#addDelayedQueueObject method, we can see that before adding data, we first call the getDelayedQueue method to obtain a delay queue.
RedissonDelayedQueue#RedissonDelayedQueue
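The constructor works with three auxiliary Redis keys derived from the queue name; their values appear in the monitor output later in this article. Below is a minimal sketch of that naming rule, inferred from those key names. It assumes the prefix is joined to the name with `:` and that the name is wrapped in `{}` unless it already contains a brace. The class `DelayedQueueKeys` and its methods are illustrative and are not Redisson's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper (not part of Redisson): derives the auxiliary key names of a delayed queue. */
final class DelayedQueueKeys {

    // Assumption: a name that already carries a {hash tag} is not wrapped a second time.
    static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    // Maps the Java field names used in this article to the derived Redis keys.
    static Map<String, String> keysFor(String rawName) {
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("timeoutSetName", prefixName("redisson_delay_queue_timeout", rawName));
        keys.put("queueName", prefixName("redisson_delay_queue", rawName));
        keys.put("channelName", prefixName("redisson_delay_queue_channel", rawName));
        return keys;
    }

    public static void main(String[] args) {
        keysFor("zlyx").forEach((field, key) -> System.out.println(field + " = " + key));
    }
}
```

The braces form a Redis Cluster hash tag: only the text inside `{}` is hashed, so all keys of one queue land in the same slot and a single Lua script can touch them together.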
2. Add data
3. Lua script analysis
These two methods do most of their work in Lua scripts. Reading the scripts cold is confusing, so it is easier to work backwards: first watch what Redis actually executes, then map that back to the script.
Procedure:
1. Open the Redis console with redis-cli.
If you run Redis locally on Windows, as I do, you can open redis-cli.exe directly.
2. Enter the monitor command to watch the commands Redis executes.
3. Call the add-queue-data interface directly; you will see the following output:
The screenshot is not very readable, so I have transcribed the output separately. Some time parameters may differ from run to run, but the overall flow is the same.
The implementation has two parts: the first adds data, and the second retrieves it.
Part I
Output result:

    1645518012.326477 [0 127.0.0.1:50852] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "zlyx" "redisson_delay_queue_timeout:{zlyx}" "redisson_delay_queue:{zlyx}" "redisson_delay_queue_channel:{zlyx}" "1645518017326" "2476180431764309577" "\"0001\""
    1645518012.326784 [0 lua] "zadd" "redisson_delay_queue_timeout:{zlyx}" "1645518017326" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    1645518012.326868 [0 lua] "rpush" "redisson_delay_queue:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    1645518012.326925 [0 lua] "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0"
    1645518012.326988 [0 lua] "publish" "redisson_delay_queue_channel:{zlyx}" "1645518017326"
Parameter analysis:
Comparison table of method parameters called by script:
Script parameter name | Java parameter name | Parameter value |
---|---|---|
KEYS[1] | getRawName() | "zlyx" |
KEYS[2] | timeoutSetName | "redisson_delay_queue_timeout:{zlyx}" |
KEYS[3] | queueName | "redisson_delay_queue:{zlyx}" |
KEYS[4] | channelName | "redisson_delay_queue_channel:{zlyx}" |
ARGV[1] | timeout | "1645518017326" |
ARGV[2] | randomId | "2476180431764309577" |
ARGV[3] | encode(e) | "\"0001\"" |
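The member that zadd and rpush store in the output above is not the raw payload but the result of struct.pack. The sketch below reproduces those bytes in Java to make the layout visible. It assumes little-endian byte order and a 4-byte length field, which is what this capture shows; the width of the length field may differ on other platforms. The class `PackedValue` is illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Illustrative re-creation of struct.pack('dLc0', id, len, payload) as seen in the capture. */
final class PackedValue {

    // Assumptions: little-endian order and a 4-byte length field, matching the captured bytes.
    static byte[] pack(long randomId, byte[] payload) {
        return ByteBuffer.allocate(8 + 4 + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putDouble((double) randomId) // 'd': Lua's tonumber() yields a double
                .putInt(payload.length)       // 'L': length of the payload
                .put(payload)                 // 'c0': the payload bytes themselves
                .array();
    }

    // Reverse direction: skip the id, read the length, then read that many payload bytes.
    static byte[] unpackPayload(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed).order(ByteOrder.LITTLE_ENDIAN);
        buf.getDouble();
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return payload;
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] payload = "\"0001\"".getBytes(StandardCharsets.UTF_8);
        byte[] packed = pack(2476180431764309577L, payload);
        System.out.println(hex(packed));
        System.out.println(new String(unpackPayload(packed), StandardCharsets.UTF_8));
    }
}
```

With the randomId and payload from the capture, the first 8 bytes come out as `e3 8a dd 7d 94 2e c1 43` (redis-cli prints `7d`, `2e` and `43` as `}`, `.` and `C`), followed by `06 00 00 00` for the 6-byte payload `"0001"`. Because the random id is part of the member, two identical payloads still produce distinct entries.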
Script analysis:

    -- struct.pack(format, v1, v2, ...) packs its arguments into one binary string.
    -- 'dLc0' means: a double (randomId), an unsigned long (the payload length), then the payload bytes.
    -- The result, value, is the element that gets stored.
    local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);

    -- Add value to the sorted set timeoutSetName, with timeout (the expiry timestamp in milliseconds) as its score.
    redis.call('zadd', KEYS[2], ARGV[1], value);
    -- Simplified command: "zadd" timeoutSetName timeout value
    -- Actual output: "zadd" "redisson_delay_queue_timeout:{zlyx}" "1645518017326" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

    -- Append value to the right end of the ordinary List queueName.
    redis.call('rpush', KEYS[3], value);
    -- Simplified command: "rpush" queueName value
    -- Actual output: "rpush" "redisson_delay_queue:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

    -- Read the first element of the sorted set timeoutSetName, which is ordered by score from small to large.
    -- zrange only reads; nothing is removed here.
    local v = redis.call('zrange', KEYS[2], 0, 0);
    -- Simplified command: "zrange" timeoutSetName "0" "0"
    -- Actual output: "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0"

    -- If the first element is the newly added one, publish its timeout to the channel channelName.
    if v[1] == value then
        redis.call('publish', KEYS[4], ARGV[1]);
    end;
    -- Simplified command: "publish" channelName timeout
    -- Actual output: "publish" "redisson_delay_queue_channel:{zlyx}" "1645518017326"
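The control flow of this script can be modeled with plain Java collections, which makes it easy to step through in a debugger. This is only an in-memory sketch under simplifying assumptions: a `TreeSet` stands in for the sorted set, an `ArrayList` for the List, a list of published timeouts for the channel, and string concatenation for struct.pack. None of the names below come from Redisson.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** In-memory model of the "add" script; plain collections stand in for the Redis keys. */
final class AddScriptModel {

    static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    // KEYS[2]: sorted set, ordered by score and then by member
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.member));
    // KEYS[3]: ordinary list that keeps insertion order
    final List<String> queue = new ArrayList<>();
    // KEYS[4]: timeouts published on the channel
    final List<Long> published = new ArrayList<>();

    void add(long timeout, String randomId, String payload) {
        String value = randomId + ":" + payload;        // stand-in for struct.pack
        timeoutSet.add(new Entry(timeout, value));       // zadd
        queue.add(value);                                // rpush
        if (timeoutSet.first().member.equals(value)) {   // zrange 0 0
            published.add(timeout);                      // publish
        }
    }

    public static void main(String[] args) {
        AddScriptModel model = new AddScriptModel();
        model.add(1645518017326L, "id1", "\"0001\""); // first element: published
        model.add(1645518020000L, "id2", "\"0002\""); // expires later: not published
        model.add(1645518015000L, "id3", "\"0003\""); // new earliest expiry: published
        System.out.println(model.published);
    }
}
```

In this model a timeout is published only when the new element becomes the head of the sorted set, that is, only when the earliest expiry time changes.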
Part II
Output result:

    1645518017.426758 [0 127.0.0.1:50849] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "zlyx" "redisson_delay_queue_timeout:{zlyx}" "redisson_delay_queue:{zlyx}" "1645518017426" "100"
    1645518017.426955 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{zlyx}" "0" "1645518017426" "limit" "0" "100"
    1645518017.426997 [0 lua] "rpush" "zlyx" "\"0001\""
    1645518017.427013 [0 lua] "lrem" "redisson_delay_queue:{zlyx}" "1" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    1645518017.427033 [0 lua] "zrem" "redisson_delay_queue_timeout:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    1645518017.427054 [0 lua] "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0" "WITHSCORES"
    1645518017.427970 [0 127.0.0.1:50853] "BLPOP" "zlyx" "0"
Parameter analysis:
Comparison table of method parameters called by script:
Script parameter name | Java parameter name | Parameter value |
---|---|---|
KEYS[1] | getRawName() | "zlyx" |
KEYS[2] | timeoutSetName | "redisson_delay_queue_timeout:{zlyx}" |
KEYS[3] | queueName | "redisson_delay_queue:{zlyx}" |
ARGV[1] | System.currentTimeMillis() | "1645518017426" |
ARGV[2] | 100 | 100 |
Script analysis:

    -- Read up to 100 elements from the Sorted Set whose score lies between 0 and the current time,
    -- in ascending score order. These are the expired elements.
    local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]);
    -- Simplified command: "zrangebyscore" timeoutSetName "0" currentTimeMillis "limit" "0" "100"
    -- Actual output: "zrangebyscore" "redisson_delay_queue_timeout:{zlyx}" "0" "1645518017426" "limit" "0" "100"

    if #expiredValues > 0 then
        -- Loop over the expired elements.
        for i, v in ipairs(expiredValues) do
            -- struct.unpack(format, packed string[, start position]) reverses struct.pack;
            -- it recovers randomId and the original payload value from v.
            local randomId, value = struct.unpack('dLc0', v);

            -- Push the payload onto the blocking queue getRawName().
            redis.call('rpush', KEYS[1], value);
            -- Simplified command: "rpush" getRawName() value
            -- Actual output: "rpush" "zlyx" "\"0001\""

            -- Remove the packed element v from the List queueName.
            redis.call('lrem', KEYS[3], 1, v);
            -- Simplified command: "lrem" queueName "1" v
            -- Actual output: "lrem" "redisson_delay_queue:{zlyx}" "1" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
        end;

        -- Remove the transferred elements from the Sorted Set.
        redis.call('zrem', KEYS[2], unpack(expiredValues));
        -- Simplified command: "zrem" timeoutSetName unpack(expiredValues)
        -- Actual output: "zrem" "redisson_delay_queue_timeout:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    end;

    -- Read the element with the smallest score that is still in the Sorted Set.
    local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES');
    -- Simplified command: "zrange" timeoutSetName "0" "0" "WITHSCORES"
    -- Actual output: "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0" "WITHSCORES"

    -- If such an element exists, return its score (the next expiry time); otherwise return nil.
    if v[1] ~= nil then
        return v[2];
    end
    return nil;
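The transfer step can be modeled in the same in-memory style. The sketch below assumes a `TreeSet` for the sorted set, an `ArrayList` for the List queueName, an `ArrayDeque` for the target blocking queue, and a `randomId:payload` string as a stand-in for the packed value. All class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

/** In-memory model of the "transfer" script; plain collections stand in for the Redis keys. */
final class TransferScriptModel {

    static final class Entry {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }
    }

    final Deque<String> target = new ArrayDeque<>();   // KEYS[1]: blocking queue read by BLPOP
    final TreeSet<Entry> timeoutSet = new TreeSet<>(   // KEYS[2]: sorted set
            Comparator.comparingLong((Entry e) -> e.score).thenComparing(e -> e.member));
    final List<String> queue = new ArrayList<>();      // KEYS[3]: ordinary list

    // Helper that fills the model the way the add script would.
    void offer(long timeout, String randomId, String payload) {
        String value = randomId + ":" + payload;
        timeoutSet.add(new Entry(timeout, value));
        queue.add(value);
    }

    /** Moves expired payloads to the target queue; returns the next expiry time, or null if none is left. */
    Long transfer(long now, int limit) {
        List<Entry> expired = new ArrayList<>();
        for (Entry e : timeoutSet) {                   // zrangebyscore 0 now limit 0 <limit>
            if (e.score > now || expired.size() == limit) {
                break;
            }
            expired.add(e);
        }
        for (Entry e : expired) {
            String payload = e.member.substring(e.member.indexOf(':') + 1); // struct.unpack
            target.addLast(payload);                   // rpush KEYS[1]
            queue.remove(e.member);                    // lrem KEYS[3] 1 v
            timeoutSet.remove(e);                      // zrem KEYS[2]
        }
        return timeoutSet.isEmpty() ? null : timeoutSet.first().score; // zrange 0 0 WITHSCORES
    }

    public static void main(String[] args) {
        TransferScriptModel model = new TransferScriptModel();
        model.offer(1000L, "a", "x");
        model.offer(2000L, "b", "y");
        model.offer(3000L, "c", "z");
        System.out.println(model.transfer(2000L, 100) + " " + model.target);
    }
}
```

The returned score tells the caller when the next element expires, so in this model the next transfer can be scheduled for that moment instead of polling.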