[RuoYi-Vue-Plus] Learning Notes 15 - Adding Queue Data to the Redisson Delayed Queue (Redisson Source Code + Lua Scripts)

Preface

Recently I have been catching up on Spring Cloud videos, originally to prepare for learning [ruoyi cloud plus]. Then today the project lead (Lion) nudged me in the group chat (covering my face) and suggested that we analyze a new feature of [ruoyi Vue plus] 4.0: the Redisson distributed queue.

In fact, I had already looked at this feature on the dev branch before the official release, but the source code was all Lua scripts, so I quietly closed it.

However, I recently brushed up on Redis 6, which included some material on Lua scripts. It was fairly basic, but enough to follow along. After spending some time looking things up, I found I could slowly work through the scripts, so I hurried to write this post. I am starting with the relatively simple delayed queue; the other queues will be analyzed in later posts.

Reference catalogue

Code implementation

Demo from the official documentation (relatively simple):

Framework implementation:

  • Add queue data

DelayedQueueController#add

QueueUtils#addDelayedQueueObject

Function call

Process:
1. Log in from the front-end page, get the token, and set it as a global parameter in the interface document.

2. Call the delay queue "subscribe to queue" interface in the interface document.

3. Call the delay queue "add queue data" interface in the interface document.

4. Check the parameters printed on the console.

Source code analysis

1. New delay queue

Referring to the QueueUtils#addDelayedQueueObject method, we can see that before adding data, we first call the getDelayedQueue method to obtain a delay queue.

RedissonDelayedQueue#RedissonDelayedQueue
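
Besides the target queue itself, the delayed queue works with three helper keys. As a rough sketch (not Redisson's actual code), the key names that show up later in the Redis output can be derived from the queue name as below. The class DelayedQueueKeys and its prefixName helper are hypothetical, written only to reproduce the names seen in that output. The braces form a Redis Cluster hash tag, so all keys of one queue land in the same slot.

```java
public class DelayedQueueKeys {

    // Hypothetical helper: builds "<prefix>:{<name>}" as seen in the MONITOR output.
    static String prefixName(String prefix, String name) {
        return prefix + ":{" + name + "}";
    }

    public static void main(String[] args) {
        String rawName = "zlyx"; // the queue name used in this article

        // Sorted set holding the packed elements, scored by expiry timestamp
        System.out.println(prefixName("redisson_delay_queue_timeout", rawName));
        // Plain list holding the same packed elements in insertion order
        System.out.println(prefixName("redisson_delay_queue", rawName));
        // Pub/sub channel that announces a new earliest timeout
        System.out.println(prefixName("redisson_delay_queue_channel", rawName));
    }
}
```

Running it prints the three key names that appear as KEYS[2], KEYS[3] and KEYS[4] in the first script call.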


2. Add data

3. Lua script analysis

The main work of both methods is done by Lua scripts. Reading the scripts cold is confusing at first, so it helps to work backwards: first watch which commands Redis actually executes, then map them back to the script.

Procedure:
1. Open the Redis console with redis-cli.
If you are running Redis locally on Windows like me, you can open redis-cli.exe directly.

2. Enter the monitor command to watch the commands Redis executes.

3. Call the add queue data interface directly, and you can see the following output:

The screenshot is not very easy to read, so I have written the output out separately. Some time parameters may differ, but the overall flow is the same.

The overall implementation is divided into two parts: the first part is to add data, and the second part is to obtain data.

Part I

Output result:

1645518012.326477 [0 127.0.0.1:50852] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "zlyx" "redisson_delay_queue_timeout:{zlyx}" "redisson_delay_queue:{zlyx}" "redisson_delay_queue_channel:{zlyx}" "1645518017326" "2476180431764309577" "\"0001\""

1645518012.326784 [0 lua] "zadd" "redisson_delay_queue_timeout:{zlyx}" "1645518017326" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

1645518012.326868 [0 lua] "rpush" "redisson_delay_queue:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

1645518012.326925 [0 lua] "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0"

1645518012.326988 [0 lua] "publish" "redisson_delay_queue_channel:{zlyx}" "1645518017326"

Parameter analysis:

Comparison table of method parameters called by script:

Script parameter | Java parameter | Parameter value
---------------- | -------------- | ---------------
KEYS[1] | getRawName() | "zlyx"
KEYS[2] | timeoutSetName | "redisson_delay_queue_timeout:{zlyx}"
KEYS[3] | queueName | "redisson_delay_queue:{zlyx}"
KEYS[4] | channelName | "redisson_delay_queue_channel:{zlyx}"
ARGV[1] | timeout | "1645518017326"
ARGV[2] | randomId | "2476180431764309577"
ARGV[3] | encode(e) | "\"0001\""
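
To see what struct.pack('dLc0', ...) produces, the packed value from the zadd and rpush lines of the output above can be decoded by hand. The sketch below is illustrative Java, not part of the framework. It assumes the layout visible in this capture: an 8-byte little-endian double (randomId), a 4-byte little-endian length, then the encoded element. The width of the length field is read off this particular output and may differ on other platforms.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class PackedValueDecoder {

    // Bytes copied from the zadd/rpush lines of the MONITOR output:
    // "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    static final byte[] PACKED = {
        (byte) 0xe3, (byte) 0x8a, (byte) 0xdd, '}', (byte) 0x94, '.', (byte) 0xc1, 'C',
        0x06, 0x00, 0x00, 0x00,
        '"', '0', '0', '0', '1', '"'
    };

    // First 8 bytes: randomId stored as a double (the 'd' in the format string)
    static double randomId(byte[] packed) {
        return ByteBuffer.wrap(packed).order(ByteOrder.LITTLE_ENDIAN).getDouble(0);
    }

    // Next 4 bytes in this capture: length of the encoded element (the 'L')
    static int payloadLength(byte[] packed) {
        return ByteBuffer.wrap(packed).order(ByteOrder.LITTLE_ENDIAN).getInt(8);
    }

    // Remaining bytes: the encoded element itself (the 'c0')
    static String payload(byte[] packed) {
        return new String(packed, 12, payloadLength(packed), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(randomId(PACKED));      // randomId, rounded to double precision
        System.out.println(payloadLength(PACKED)); // 6
        System.out.println(payload(PACKED));       // "0001"
    }
}
```

Because randomId goes through tonumber and is stored as a double, the decoded number only matches "2476180431764309577" up to double precision. That is enough for its job here, which is to make each packed value unique.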

Script analysis:

-- struct.pack packs data into a binary string: struct.pack(format, value1, value2, ...).
-- Here randomId, the length of the encoded element and the encoded element itself are packed into value.
local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);

-- Add value to the sorted set timeoutSetName, using timeout (the expiry timestamp in milliseconds) as the score
redis.call('zadd', KEYS[2], ARGV[1], value);

-- Simplified command: "zadd" timeoutSetName timeout value
-- Actual output: "zadd" "redisson_delay_queue_timeout:{zlyx}" "1645518017326" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

-- Append value to the right end of the list queueName. This is an ordinary List.
redis.call('rpush', KEYS[3], value);

-- Simplified command: "rpush" queueName value
-- Actual output: "rpush" "redisson_delay_queue:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

-- The sorted set timeoutSetName is ordered by score from small to large; read its first element (it is not removed)
local v = redis.call('zrange', KEYS[2], 0, 0);

-- Simplified command: "zrange" timeoutSetName "0" "0"
-- Actual output: "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0"

if v[1] == value then
    -- If the first element is the one just added, it now expires earliest, so publish its timeout to the channel channelName
    redis.call('publish', KEYS[4], ARGV[1]);
end;

-- Simplified command: "publish" channelName timeout
-- Actual output: "publish" "redisson_delay_queue_channel:{zlyx}" "1645518017326"
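
The steps of this first script can be mirrored with plain Java collections. This is only an in-memory model for following the logic, not Redisson code: the class DelayedAddModel, its field names, and the "randomId|payload" string that stands in for struct.pack are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class DelayedAddModel {

    /** One sorted-set member together with its score. */
    static final class Entry implements Comparable<Entry> {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }

        @Override
        public int compareTo(Entry other) {
            int byScore = Long.compare(score, other.score);
            return byScore != 0 ? byScore : member.compareTo(other.member);
        }
    }

    final TreeSet<Entry> timeoutSet = new TreeSet<>(); // stands in for KEYS[2]
    final List<String> delayList = new ArrayList<>();  // stands in for KEYS[3]
    final List<Long> published = new ArrayList<>();    // messages sent to KEYS[4]

    void add(long timeout, long randomId, String encoded) {
        String value = randomId + "|" + encoded;       // stand-in for struct.pack
        timeoutSet.add(new Entry(timeout, value));     // zadd
        delayList.add(value);                          // rpush
        if (timeoutSet.first().member.equals(value)) { // zrange 0 0
            published.add(timeout);                    // publish
        }
    }

    public static void main(String[] args) {
        DelayedAddModel model = new DelayedAddModel();
        model.add(1645518017326L, 2476180431764309577L, "\"0001\"");
        model.add(1645518020000L, 1L, "\"0002\""); // expires later, so nothing is published
        System.out.println(model.published);       // [1645518017326]
    }
}
```

The model makes the purpose of the final check visible: a publish only happens when the new element becomes the earliest one to expire.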

Part II

Output result:

1645518017.426758 [0 127.0.0.1:50849] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "zlyx" "redisson_delay_queue_timeout:{zlyx}" "redisson_delay_queue:{zlyx}" "1645518017426" "100"

1645518017.426955 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{zlyx}" "0" "1645518017426" "limit" "0" "100"

1645518017.426997 [0 lua] "rpush" "zlyx" "\"0001\""

1645518017.427013 [0 lua] "lrem" "redisson_delay_queue:{zlyx}" "1" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

1645518017.427033 [0 lua] "zrem" "redisson_delay_queue_timeout:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""

1645518017.427054 [0 lua] "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0" "WITHSCORES"

1645518017.427970 [0 127.0.0.1:50853] "BLPOP" "zlyx" "0"

Parameter analysis:

Comparison table of method parameters called by script:

Script parameter | Java parameter | Parameter value
---------------- | -------------- | ---------------
KEYS[1] | getRawName() | "zlyx"
KEYS[2] | timeoutSetName | "redisson_delay_queue_timeout:{zlyx}"
KEYS[3] | queueName | "redisson_delay_queue:{zlyx}"
ARGV[1] | System.currentTimeMillis() | "1645518017426"
ARGV[2] | 100 | "100"

Script analysis:

-- The sorted set is ordered by score from small to large; read up to 100 members whose score lies between 0 and the current time (the expired data)
local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]);

-- Simplified command: "zrangebyscore" timeoutSetName "0" System.currentTimeMillis() "limit" "0" "100"
-- Actual output: "zrangebyscore" "redisson_delay_queue_timeout:{zlyx}" "0" "1645518017426" "limit" "0" "100"

if #expiredValues > 0 then
    -- Loop over the expired members
    for i, v in ipairs(expiredValues) do
        -- struct.unpack unpacks a binary string: struct.unpack(format, packed string); here it recovers randomId and the original value from v
        local randomId, value = struct.unpack('dLc0', v);

        -- Push value onto the target blocking queue getRawName()
        redis.call('rpush', KEYS[1], value);

        -- Simplified command: "rpush" getRawName() value
        -- Actual output: "rpush" "zlyx" "\"0001\""

        -- Remove the packed element v from the list queueName
        redis.call('lrem', KEYS[3], 1, v);

        -- Simplified command: "lrem" queueName "1" v
        -- Actual output: "lrem" "redisson_delay_queue:{zlyx}" "1" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
    end;

    -- Remove the expired members from the sorted set
    redis.call('zrem', KEYS[2], unpack(expiredValues));

    -- Simplified command: "zrem" timeoutSetName unpack(expiredValues)
    -- Actual output: "zrem" "redisson_delay_queue_timeout:{zlyx}" "\xe3\x8a\xdd}\x94.\xc1C\x06\x00\x00\x00\"0001\""
end;

-- Read the member with the smallest score that remains in the sorted set, together with its score
local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES');

-- Simplified command: "zrange" timeoutSetName "0" "0" "WITHSCORES"
-- Actual output: "zrange" "redisson_delay_queue_timeout:{zlyx}" "0" "0" "WITHSCORES"

-- If an element remains, return its score (the next timeout); otherwise return nil
if v[1] ~= nil then
    return v[2];
end
return nil;
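
The second script can be modelled in plain Java in the same way. Again, this is only an illustrative in-memory sketch, not Redisson code: the class DelayedTransferModel, the schedule helper and the "randomId|payload" stand-in for the packed value are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

public class DelayedTransferModel {

    /** One sorted-set member together with its score. */
    static final class Entry implements Comparable<Entry> {
        final long score;
        final String member;

        Entry(long score, String member) {
            this.score = score;
            this.member = member;
        }

        @Override
        public int compareTo(Entry other) {
            int byScore = Long.compare(score, other.score);
            return byScore != 0 ? byScore : member.compareTo(other.member);
        }
    }

    final Deque<String> targetQueue = new ArrayDeque<>(); // stands in for KEYS[1]
    final TreeSet<Entry> timeoutSet = new TreeSet<>();    // stands in for KEYS[2]
    final List<String> delayList = new ArrayList<>();     // stands in for KEYS[3]

    // Hypothetical helper that puts an element into the model (the effect of Part I)
    void schedule(long timeout, long randomId, String encoded) {
        String packed = randomId + "|" + encoded;
        timeoutSet.add(new Entry(timeout, packed));
        delayList.add(packed);
    }

    // Moves expired elements to the target queue; returns the next timeout or null
    Long transfer(long now, int limit) {
        List<Entry> expired = new ArrayList<>();
        for (Entry e : timeoutSet) {                       // zrangebyscore 0 now limit 0 limit
            if (e.score > now || expired.size() >= limit) {
                break;
            }
            expired.add(e);
        }
        for (Entry e : expired) {
            String value = e.member.substring(e.member.indexOf('|') + 1); // stand-in for struct.unpack
            targetQueue.addLast(value);                    // rpush KEYS[1]
            delayList.remove(e.member);                    // lrem KEYS[3] 1 v
        }
        for (Entry e : expired) {
            timeoutSet.remove(e);                          // zrem KEYS[2]
        }
        return timeoutSet.isEmpty() ? null : timeoutSet.first().score; // zrange 0 0 WITHSCORES
    }

    public static void main(String[] args) {
        DelayedTransferModel model = new DelayedTransferModel();
        model.schedule(1645518017326L, 2476180431764309577L, "\"0001\"");
        System.out.println(model.transfer(1645518017426L, 100)); // null: nothing left to wait for
        System.out.println(model.targetQueue);                   // ["0001"]
    }
}
```

Once the value is in the target queue, a consumer blocked on it can pick it up, which corresponds to the BLPOP line at the end of the output above.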

Keywords: Java Back-end lua redisson

Added by SapAuthor on Tue, 22 Feb 2022 13:27:39 +0200