I'll take you to deeply analyze the principle of Redisson's implementation of distributed locks

Principle of Redis implementing distributed lock

We talked about the application of Redis in the actual business scenario. Now let's learn about the application of Redisson functional scenario, that is, the implementation scenario of distributed locks that we often use.

  • Introduce redisson dependency

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.0</version>
    </dependency>
    
  • Write simple test code

    public class RedissonTest {
    
        private static RedissonClient redissonClient;
    
        static {
            Config config=new Config();
            config.useSingleServer().setAddress("redis://192.168.221.128:6379");
            redissonClient=Redisson.create(config);
        }
    
        public static void main(String[] args) throws InterruptedException {
            RLock rLock=redissonClient.getLock("updateOrder");
            //Wait up to 100 seconds and unlock automatically after locking for 10 seconds
            if(rLock.tryLock(100,10,TimeUnit.SECONDS)){
                System.out.println("Lock acquisition succeeded");
            }
            Thread.sleep(2000);
            rLock.unlock();
    
            redissonClient.shutdown();
        }
    }
    
    

Implementation principle of Redisson distributed lock

You will find that the functions we need can be easily realized through redisson. Of course, this is just the tip of redisson's iceberg. The most powerful thing about redisson is that it provides common tool classes with distributed features. The toolkit originally used to coordinate concurrent programs of single machine multithreaded concurrent programs has the ability to coordinate Distributed Multi-level multithreaded concurrent systems, which reduces the difficulty for programmers to solve distributed problems in a distributed environment. The implementation principle of RedissonLock is analyzed below

RedissonLock.tryLock

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    //Try to acquire the lock through the tryAcquire method
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) { //Indicates that the lock has been obtained successfully and returns directly
        return true;
    }
	//Omit part of the code
}

tryAcquire

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //leaseTime is the lease time, which is the expiration time of the redis key.
    if (leaseTime != -1) { //If the expiration time is set
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {//If the expiration time is not set, get the key timeout from the configuration. The default is 30s expiration
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    //When the tryLockInnerAsync execution ends, the following callback is triggered
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //Description: an exception occurred and returned directly
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //Indicates that the lock key is set for the first time
            if (leaseTime != -1) { //Indicates that the timeout has been set, updates internalLockLeaseTime, and returns
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1, start the Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync

The locking operation is implemented through lua script

  1. Judge whether the lock key exists. If not, directly call hset to store the current thread information and set the expiration time. Return nil and tell the client to directly obtain the lock.

  2. Judge whether the lock key exists. If it exists, increase the number of reentries by 1, reset the expiration time, return to nil, and tell the client to obtain the lock directly.

  3. If it is locked by another thread, return the remaining time of the lock validity period and tell the client to wait.

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

We will explain the Lua script later.

unlock lock release process

The process of releasing the lock, the script looks a little more complicated

  1. If the lock key does not exist, a message is sent through the publish command to indicate that the lock is available.

  2. nil if the lock is not locked by the current thread

  3. Since reentry is supported, the reentry times need to be reduced by 1 when unlocking

  4. If the calculated number of reentries is > 0, reset the expiration time

  5. If the calculated number of reentries is < = 0, a message will be sent saying that the lock is available

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

RedissonLock has competition

In the case of competition, the lua scripts on the redis side are the same, but different redis commands are executed under different conditions. When it is found that the lock is applied by other threads through tryAcquire, it needs to enter the waiting contention logic

  1. this.await returns false, indicating that the waiting time has exceeded the maximum waiting time for obtaining the lock. Unsubscribe and return failure to obtain the lock

  2. this.await returns true and enters the loop to try to obtain the lock.

Continue to look at the code of the second half of RedissonLock.tryLock as follows:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//Omit some codes
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        current = System.currentTimeMillis();
       // Subscribe to listen to redis messages and create redisonlockentry
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
      // Block the result object waiting for the future of the subscribe. If the subscribe method call exceeds time, it indicates that the maximum wait time set by the client has been exceeded, then it directly returns false, cancels the subscription and does not continue to apply for locks.
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) { //Unsubscribe
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId); //Indicates that preemption of the lock failed
            return false; //Return false
        }
        try {
            //Judge whether it timed out. If the wait timed out, it returns that the obtained lock failed
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            //Try competing locks again through the while loop
            while (true) { 
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //Contention lock, return lock timeout
                // lock acquired
                if (ttl == null) { //If the timeout is null, the lock is obtained successfully
                    return true;
                }
                //Judge whether the timeout occurs. If the timeout occurs, the lock acquisition fails
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // Block through semaphore (shared lock) and wait for unlocking message. (reduce the frequency of requesting lock calls)
				// If the remaining time (ttl) is less than the wait time, a license is obtained from the semaphore of the Entry within the ttl time (unless interrupted or there is no license available).
				// Otherwise, it will wait within the wait time range to pass the semaphore
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                // Update latency (maximum latency - elapsed blocking time)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) { //Failed to acquire lock
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId); //Unsubscribe
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

What if the lock expires?

Generally speaking, when we obtain a distributed lock, in order to avoid deadlock, we will set a timeout for the lock. However, in one case, if the current thread does not finish executing within the specified time, and the lock is released due to the lock timeout, other threads will get the lock, resulting in some failures.

In order to avoid this situation, Redisson introduces a Watch Dog mechanism to automatically renew the lock for distributed locks. In short, if the current thread that obtains the lock has not finished executing, Redisson will automatically extend the timeout for the target key in Redis.

By default, the watchdog renewal time is 30s, which can also be specified by modifying Config.lockWatchdogTimeout.

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);  //leaseTime=-1
}

In fact, when we do not pass the timeout through the tryLock method, a timeout of 30s will be set by default to avoid deadlock.

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) { 
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else { //When leaseTime is - 1, leaseTime=internalLockLeaseTime, which is 30s by default, indicating the expiration time of the current lock.
        
        //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //Description: an exception occurred and returned directly
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //Indicates that the lock key is set for the first time
            if (leaseTime != -1) { //Indicates that the timeout has been set, updates internalLockLeaseTime, and returns
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1, start the Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

Since an expiration time of 30s is set by default, in order to prevent the current thread from executing after expiration, the expiration time is renewed through scheduled tasks.

  • First, we will judge whether there is an entryName in the expirationRenewalMap, which is a map structure, mainly to judge whether the lock key of the locking client in this service instance exists,
  • If it already exists, return directly; The main reason is that RedissonLock is a reentrant lock.
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {// It will be called when locking for the first time, and the WatchDog will be started internally
        entry.addThreadId(threadId);
        renewExpiration();
    
    }
}

Define a timed task that calls the renewExpirationAsync method to renew the contract.

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
	//The time wheel mechanism is used
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync renewal
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//1 / 3 of the lease term at each interval

    ee.setTimeout(task);
}

Execute the Lua script to renew the specified key.

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

Lua script

Lua is an efficient lightweight scripting language (similar to JavaScript). It is written in standard C language and open in the form of source code. Its design purpose is to embed in the application, so as to provide flexible extension and customization functions for the application. Lua means "Moon" in Portuguese. Its logo form is satellite, which means that Lua is a "satellite language" and can be easily embedded into other languages; In fact, many common frameworks have the function of embedding Lua scripts, such as OpenResty, Redis, etc.

Benefits of using Lua scripts:

  1. To reduce network overhead, multiple commands can be run in the same script in Lua script

  2. For atomic operation, redis will execute the entire script as a whole without being inserted by other commands. In other words, there is no need to worry about race conditions during scripting

  3. Reusability. The scripts sent by the client will always be stored in redis, which means that other clients can reuse this pin to complete the same logic

Download and installation of Lua

Lua is an independent scripting language, so it has a special compilation and execution tool. Let's take you to install it.

If an error is reported, saying that readline/readline.h cannot be found, you can install it using the yum command

yum -y install readline-devel ncurses-devel

Finally, you can enter the Lua console directly by entering the Lua command. Lua script has its own syntax, variables, logical operators, functions, etc. I won't explain it too much here. Students who have used JavaScript should only spend a few hours to learn all of them. A simple demonstration of two cases is as follows.

array = {"Lua", "mic"}
for i= 0, 2 do
   print(array[i])
end
array = {"mic", "redis"}

for key,value in ipairs(array)
do
   print(key, value)
end

Redis and Lua

Redis integrates Lua compiler and executor, so we can define Lua script in redis to execute. At the same time, in Lua script, you can directly call redis commands to operate the data in redis.

redis.call('set','hello','world')

local value=redis.call('get','hello') 

The return value of the redis.call function is the execution result of the redis command. As described earlier, the types of returned values of the five types of data in redis are also different. The redis.call function will convert the return values of these five types into the corresponding Lua data types

In many cases, we need the script to have a return value. After all, this script is also a command set we have written. We can call our own script like calling other redis built-in commands, so redis will automatically convert the Lua data type of the script return value into the redis return value type. In the script, you can use the return statement to return the value to the redis client and execute it through the return statement. If you do not execute return, the default return is nil.

Execute Lua script related commands in Redis

After writing the script, the most important thing is to execute the script in the program. Redis provides EVAL commands that enable developers to call scripts like other redis built-in commands.

EVAL command - execute script

[EVAL] [script content] [number of key parameters] [key...] [arg...]

You can pass data to the script through the two parameters key and arg, and their values can use KEYS and ARGV respectively in the script   These two types of global variable access.

For example, we implement a set command by script, and call it through the redis client.

eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

The above script is equivalent to using Lua script to call Redis's set command and store a key=lua, value=hello in Redis.

EVALSHA command

Considering that we execute lua script through Eval, when the script is long, we need to pass the whole script to redis every time we call the script, which takes up more bandwidth. To solve this problem, redis provides the EVALSHA command, which allows developers to execute scripts through the SHA1 summary of script content. The usage of this command is the same as Eval, except that it replaces the script content with the SHA1 summary of the script content

  1. Redis will calculate the SHA1 summary of the script and record it in the script cache when executing the EVAL command

  2. When executing the EVALSHA command, Redis will find the corresponding script content from the script cache according to the provided summary. If found, execute the script. Otherwise, it returns "NOSCRIPT No matching script,Please use EVAL"

# Add the script to the cache and generate the sha1 command
script load "return redis.call('get','lua')"
# ["13bd040587b891aedc00a72458cbf8588a27df90"]
# Pass the value of sha1 to execute the command
evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0

Redisson executes Lua script

Implement an access frequency restriction function through lua script.

The idea is to define a key, which contains the ip address. value is the number of accesses within the specified time, for example, only 3 accesses in 10 seconds.

  • Define Lua scripts.

    local times=redis.call('incr',KEYS[1])
    -- If it is the first time to come in, set an expiration time
    if times == 1 then
       redis.call('expire',KEYS[1],ARGV[1])
    end
    -- If the number of accesses is greater than the specified number within the specified time, 0 is returned, indicating that access is restricted
    if times > tonumber(ARGV[2]) then
       return 0
    end
    -- Return 1 to allow access
    return 1
    
  • Define the controller and provide access test methods

    @RestController
    public class RedissonController {
        @Autowired
        RedissonClient redissonClient;
    
        private final String LIMIT_LUA=
            "local times=redis.call('incr',KEYS[1])\n" +
            "if times == 1 then\n" +
            "   redis.call('expire',KEYS[1],ARGV[1])\n" +
            "end\n" +
            "if times > tonumber(ARGV[2]) then\n" +
            "   return 0\n" +
            "end\n" +
            "return 1";
    
        @GetMapping("/lua/{id}")
        public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException {
            List<Object> keys= Arrays.asList("LIMIT:"+id);
            RFuture<Object> future=redissonClient.getScript().
                evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);
            return future.get().toString();
        }
    
    }
    

It should be noted that there will be problems during the execution of the above script, because the default serialization method of redis causes the value to be converted into an object type when passed to the script. It is necessary to modify the redisson.yml file and increase the serialization method of codec.

  • application.yml

    spring:
      redis:
        redisson:
          file: classpath:redisson.yml
    
  • redisson.yml

    singleServerConfig:
      address: redis://192.168.221.128:6379
    
    codec: !<org.redisson.codec.JsonJacksonCodec> {}
    

Atomicity of Lua script

Redis's script execution is atomic, that is, redis will not execute other commands during script execution. All commands must wait for the script to execute. To prevent a script execution time process from causing redis to fail to provide services. Redis provides the Lua time limit parameter to limit the maximum running time of the script. The default is 5 seconds.

Non transactional operations

When the running time of the script exceeds this limit, Redis will start to accept other commands but will not execute them (to ensure the atomicity of the script), but return a BUSY error. This is demonstrated below.

Open two client windows and execute the loop of lua script in the first window

eval "while true do end" 0

Run get lua in the second window and you will get the following exception.

(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.

We will find that the execution result is Busy, and then we use script kill   The command of terminates the currently executed script, and the display of the second window returns to normal.

There are transactional operations

If the currently executed Lua script modifies Redis data (SET, DEL, etc.), the SCRIPT KILL Command cannot terminate the script operation, because the atomicity of the script operation should be guaranteed. If the script is partially terminated, it violates the requirements of script atomicity. Finally, make sure that the scripts are either executed or not executed

Also open two windows. The first window runs the following command

eval "redis.call('set','name','mic') while true do end" 0

Run in the second window

get lua

The result is the same. It is still busy, but at this time, through the script kill Command, you will find an error and can't kill.

(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.

In this case, you can only forcibly terminate redis through the shutdown nosave command.

The difference between shutdown nosave and shutdown is that shutdown nosave does not perform persistence, which means that database modifications after the last snapshot will be lost.

Redisson's Lua script

After learning about Lua, it's not difficult to understand Redisson's Lua script.

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

Pub/Sub mechanism in Redis

The following is the code for releasing locks in Redisson. In the code, we found a publish instruction redis.call('publish', KEYS[2], ARGV[1]). What is this instruction for?

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Redis provides a set of commands to enable developers to implement the publish / subscribe mode. This mode can also realize message transmission between processes. Its implementation principle is as follows:

  • The publish / subscribe mode contains two roles: publisher and subscriber. Subscribers can subscribe to one or more channels, while publishers can send messages to the specified channels, which will be received by all subscribers subscribing to this channel

  • The command of the publisher to PUBLISH a message is PUBLISH. The usage is

    PUBLISH channel message
    

    For example, send a message to channel.1: hello

    PUBLISH channel.1 "hello"
    

In this way, the message is sent. The return value of the command indicates the number of subscribers who received the message. Because no subscriber has subscribed to the channel when this command is executed, the return is 0. In addition, it is worth noting that the message will not be persistent when it is sent. If there is no subscriber before sending, subsequent subscribers will subscribe to the channel, and the previous message will not be received

The commands for subscribers to subscribe to messages are:

SUBSCRIBE channel [channel ...]

This command can SUBSCRIBE to multiple channels at the same time, such as SUBSCRIBE channel.1. After executing the SUBSCRIBE command, the client will enter the subscription state.

In general, we will not use pub/sub as the message sending mechanism. After all, there are so many MQ technologies.

 

Recent hot article reading

A junior college student entered Tencent in a counter attack. It's really a little cow flying. The cow is forced to heaven!

Summary: InnoDB lock types and their detailed analysis

How should relational database be optimized under ten million level concurrent architecture?  

An in-depth analysis of the persistence mechanism of Redis6 (concise and easy to understand)

Keywords: Java Redis Distribution

Added by ajaybuilder on Tue, 26 Oct 2021 14:04:48 +0300