Redisson distributed lock implementation principle and source code

Source code analysis

Sample business code

This simple controller method is the entry point for the source analysis. Using a distributed lock involves three main calls:

  1. RLock lock = redissonClient.getLock("hpc-lock") obtains an object that implements a reentrant distributed lock
  2. lock.lock() acquires the lock
  3. lock.unlock() releases it

    @GetMapping("/redis/lock")
    public ResResult testDistributedLock() {
        RLock lock = redissonClient.getLock("hpc-lock");
        lock.lock();
        try {
            // Business logic that must run while the lock is held
            System.out.println("Locked business logic, xxx, xxx, xxxx");
        } finally {
            // Always release the lock, even if the business logic throws
            lock.unlock();
        }
        return new ResResult(true, "");
    }
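RLock is used with the same lock() / try / finally unlock() pattern as the JDK's java.util.concurrent.locks.ReentrantLock, and it is reentrant in the same sense. The sketch below is only a single-JVM point of comparison, not a distributed lock. The class name ReentrantPatternDemo is hypothetical. It nests two acquisitions by the same thread and reads the hold count.

```java
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM analogy only (hypothetical demo class): ReentrantLock is not distributed,
// but it follows the same lock() / try / finally unlock() pattern and counts reentrant acquisitions.
final class ReentrantPatternDemo {
    static final ReentrantLock LOCK = new ReentrantLock();

    static int outer() {
        LOCK.lock();
        try {
            return inner(); // re-acquires a lock this thread already holds
        } finally {
            LOCK.unlock();
        }
    }

    static int inner() {
        LOCK.lock();
        try {
            return LOCK.getHoldCount(); // held once by outer() and once by inner()
        } finally {
            LOCK.unlock(); // every lock() needs a matching unlock()
        }
    }
}
```

Redisson keeps an equivalent per-thread counter in Redis. For that reason, each lock() in the business code must be paired with an unlock() in a finally block.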

Source code analysis

Getting the reentrant distributed lock object

redissonClient.getLock("hpc-lock") returns an object that implements a reentrant distributed lock. Stepping into getLock:

@Override  
public RLock getLock(String name) {  
    return new RedissonLock(commandExecutor, name);  
}

This simply instantiates RedissonLock. Tracing into its constructor:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {  
    super(commandExecutor, name);  
    this.commandExecutor = commandExecutor;
    // internalLockLeaseTime comes from the lockWatchdogTimeout setting, which defaults to 30 s,
    // so a lock that is not renewed is released automatically after 30 s
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();  
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();  
}

Lock logic

lock.lock() acquires the lock. Tracing down from lock(), the core logic is in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean).
That method does three things: it tries to acquire the lock; if the attempt fails, the thread subscribes to a Redis channel; then the thread spins, retrying until it obtains the lock.

Attempting to acquire the lock

First, the lock-attempt part of org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean) is:

long threadId = Thread.currentThread().getId();  
// Acquire lock
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);  
// If the lock is obtained successfully, it will be returned directly
if (ttl == null) {  
    return;  
}

Stepping into tryAcquire(-1, leaseTime, unit, threadId) and following it down leads to the core method, org.redisson.RedissonLock#tryAcquireAsync, shown below. It does two things: it acquires the lock, and when no lease time was given, it keeps extending the lock's lifetime.

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {  
    RFuture<Long> ttlRemainingFuture;  
    // Acquire lock
    if (leaseTime != -1) {  
        // The caller passed an explicit lease time: use it as-is
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // No lease time given: use the default lease (internalLockLeaseTime)
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // Callback that runs once the lock script has returned
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {  
        // Locking succeeded
        if (ttlRemaining == null) {  
            if (leaseTime != -1) {  
                internalLockLeaseTime = unit.toMillis(leaseTime);  
            } else {  
                // No lease time was passed in, so the default lease is in use:
                // keep extending the lock's expiration while it is held
                scheduleExpirationRenewal(threadId);  
            }  
        }  
        return ttlRemaining;  
    });  
    return new CompletableFutureWrapper<>(f);  
}
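The leaseTime branch in tryAcquireAsync decides two things: how long the key lives and whether the watchdog runs. The plain-Java sketch below restates that decision. LeasePlan and its fields are hypothetical names, not Redisson API. The renewal interval it computes is the internalLockLeaseTime / 3 delay used by renewExpiration().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Redisson API) restating the leaseTime branch of tryAcquireAsync.
final class LeasePlan {
    final long leaseMillis;      // expiration set on the Redis key
    final boolean watchdog;      // whether scheduleExpirationRenewal would be called
    final long renewEveryMillis; // delay between renewals, 0 when the watchdog is off

    private LeasePlan(long leaseMillis, boolean watchdog, long renewEveryMillis) {
        this.leaseMillis = leaseMillis;
        this.watchdog = watchdog;
        this.renewEveryMillis = renewEveryMillis;
    }

    // leaseTime == -1 means "no lease time given"; unit is not used in that case.
    static LeasePlan of(long leaseTime, TimeUnit unit, long lockWatchdogTimeoutMillis) {
        if (leaseTime != -1) {
            // Explicit lease: the key simply expires, no background renewal
            return new LeasePlan(unit.toMillis(leaseTime), false, 0);
        }
        // Default lease: the watchdog timeout, renewed every timeout / 3
        return new LeasePlan(lockWatchdogTimeoutMillis, true, lockWatchdogTimeoutMillis / 3);
    }
}
```

With the default 30 s watchdog timeout, this gives a 30 s key expiry renewed every 10 s. An explicit lease such as lock(5, TimeUnit.SECONDS) starts no watchdog, so the key expires after 5 s even if the business logic is still running.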
  1. First, the core locking logic is tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG). Its body is:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {  
        // Execute lua script on redis
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,  
             "if (redis.call('exists', KEYS[1]) == 0) then " +  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
                        "return nil; " +  
                        "end; " +  
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
                        "return nil; " +  
                        "end; " +  
                        "return redis.call('pttl', KEYS[1]);",  
             Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));  
    }

    What matters here is the Lua script. Its logic is:

    • Check whether the key getRawName() exists in Redis. (getRawName() returns the name passed to redissonClient.getLock("hpc-lock") in the business code, i.e. hpc-lock.)
    • If the key does not exist, the script creates it as a hash. The hash field is getLockName(threadId), which identifies this client and thread, and its value is 1. The script then sets the key's expiration to unit.toMillis(leaseTime) and returns nil. When the caller passes no lease time, this expiration is the 30 s default seen in the RedissonLock constructor above.
    • If the key exists and already contains this thread's field (a reentrant acquisition), the script increments that field by 1, resets the expiration to unit.toMillis(leaseTime), and returns nil.
    • Otherwise, another thread holds the lock. The script returns the key's remaining time to live in milliseconds (return redis.call('pttl', KEYS[1])). For reference, pttl returns -2 if the key does not exist and -1 if it has no expiration.
      Note: Redis executes a Lua script atomically. No other command runs until the script finishes, so these check-then-write steps cannot interleave with another client's commands.
  2. When the lock has been acquired with the default lease, the callback starts the renewal logic. Its core method is scheduleExpirationRenewal(threadId):

    protected void scheduleExpirationRenewal(long threadId) {  
        ExpirationEntry entry = new ExpirationEntry();  
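        // One ExpirationEntry per lock (getEntryName()). If one already exists, renewal is
        // already scheduled, so only this thread id is recorded (e.g. a reentrant acquisition)
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                // First registration: start the periodic renewal
                renewExpiration();
            } finally {
                // If the thread was interrupted, cancel the renewal (cancelExpirationRenewal is covered below)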
                if (Thread.currentThread().isInterrupted()) {  
                    cancelExpirationRenewal(threadId);  
                }  
            }  
        }  
    }

    Start with the renewal code in renewExpiration(). The method is long, so only its core is shown. As analyzed above, internalLockLeaseTime defaults to 30 s when the caller passes no lease time. The timer task below fires internalLockLeaseTime / 3 (10 s) after it is scheduled. When the renewal succeeds, the task schedules itself again, so a held lock is renewed every 10 s.

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            //......

            // Renew the lease
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            // Runs when the renewal script returns
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // Renewal succeeded: reschedule itself
                    renewExpiration();
                } else {
                    // The lock is no longer held by this thread: stop renewing
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    • Next, renewExpirationAsync(threadId) contains the core renewal logic:

      protected RFuture<Boolean> renewExpirationAsync(long threadId) {  
          return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +  
                          "return 1; " +  
                          "end; " +  
                          "return 0;",  
               Collections.singletonList(getRawName()),  
               internalLockLeaseTime, getLockName(threadId));  
      }

      Again, a Lua script does the work. The script checks whether this thread's field still exists in the hash, which means the thread still holds the lock and is still running its business logic. If it does, the script resets the key's expiration to internalLockLeaseTime (30 s by default, as analyzed earlier) and returns 1. Otherwise, it returns 0.

    • Finally, cancelExpirationRenewal(null) stops the renewal:

      protected void cancelExpirationRenewal(Long threadId) {
          ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
          if (task == null) {
              return;
          }

          if (threadId != null) {
              task.removeThreadId(threadId);
          }

          if (threadId == null || task.hasNoThreads()) {
              Timeout timeout = task.getTimeout();
              if (timeout != null) {
                  // If there is a scheduled renewal task, cancel it
                  timeout.cancel();
              }
              EXPIRATION_RENEWAL_MAP.remove(getEntryName());
          }
      }
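The plain-Java model below shows the two scripts above working together. It is an assumption-laden sketch, not Redisson code. A Java map stands in for the Redis hash, and a fake millisecond clock replaces real key expiry. Lock names such as "uuid-a:1" are made-up values shaped like getLockName(threadId). tryLock mirrors the script in tryLockInnerAsync, and renew mirrors the script in renewExpirationAsync.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the lock and renewal Lua scripts (not Redisson code).
final class LuaLockScriptModel {
    private final Map<String, Map<String, Long>> hashes = new HashMap<>(); // key -> {lockName -> hold count}
    private final Map<String, Long> deadlines = new HashMap<>();           // key -> absolute expiry time
    private long now = 0;                                                   // fake clock in ms

    void advance(long ms) {
        now += ms;
    }

    // Drops a key once its deadline has been reached, then reports whether it exists.
    private boolean exists(String key) {
        Long deadline = deadlines.get(key);
        if (deadline != null && deadline <= now) {
            hashes.remove(key);
            deadlines.remove(key);
        }
        return hashes.containsKey(key);
    }

    // Lock script: null on success (new or reentrant), otherwise the holder's remaining TTL.
    Long tryLock(String key, String lockName, long leaseMs) {
        if (!exists(key) || hashes.get(key).containsKey(lockName)) {
            hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(lockName, 1L, Long::sum); // hincrby +1
            deadlines.put(key, now + leaseMs);                                                  // pexpire
            return null;
        }
        return pttl(key);
    }

    // Renewal script: reset the expiry only while lockName still holds the lock.
    boolean renew(String key, String lockName, long leaseMs) {
        if (exists(key) && hashes.get(key).containsKey(lockName)) {
            deadlines.put(key, now + leaseMs);
            return true;
        }
        return false;
    }

    // Like Redis PTTL: -2 if the key is missing, -1 if it has no expiry.
    long pttl(String key) {
        if (!exists(key)) {
            return -2;
        }
        Long deadline = deadlines.get(key);
        return deadline == null ? -1 : deadline - now;
    }

    long holdCount(String key, String lockName) {
        return exists(key) ? hashes.get(key).getOrDefault(lockName, 0L) : 0L;
    }
}
```

In this model, calling renew every leaseMs / 3 (10 s for a 30 s lease) keeps the key from ever expiring, which is the behavior the watchdog relies on. Once renewals stop, the key expires and a different lock name can acquire it.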
Threads that fail to acquire the lock subscribe to Redis messages

Back in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean): as the locking analysis above showed, a successful attempt returns immediately, and a failed attempt continues with the code below.
Here is the core code with which a thread that failed to acquire the lock subscribes to Redis messages:

// Subscribe to messages
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);  
if (interruptibly) {  
    commandExecutor.syncSubscriptionInterrupted(future);  
} else {  
    commandExecutor.syncSubscription(future);  
}

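Tracing into subscribe(threadId) leads to the core logic below. A per-channel semaphore (AsyncSemaphore) guards the subscription. A timer fails the returned future with RedisTimeoutException if the subscription is not obtained within the configured timeout. Once subscribed, the thread that failed to get the lock waits for the unlock message published on this channel.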

public CompletableFuture<E> subscribe(String entryName, String channelName) {
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
    CompletableFuture<E> newPromise = new CompletableFuture<>();

    int timeout = service.getConnectionManager().getConfig().getTimeout();
    Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
        newPromise.completeExceptionally(new RedisTimeoutException(
                "Unable to acquire subscription lock after " + timeout + "ms. " +
                        "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
    }, timeout, TimeUnit.MILLISECONDS);

    semaphore.acquire(() -> {
        // ......
    });

    return newPromise;
}
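The pattern in subscribe() combines a semaphore that guards the channel with a timer that fails the promise if the guard is not obtained in time. The sketch below imitates it with JDK classes only. It is hypothetical, not Redisson code. SubscribeSketch, the channel name "hpc-lock-channel", and the use of java.util.concurrent.Semaphore in place of Redisson's AsyncSemaphore are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical JDK-only sketch of the subscribe() pattern (not Redisson code).
final class SubscribeSketch {
    // Two daemon threads: one may wait on the semaphore while the other fires timeouts.
    private static final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "subscribe-sketch");
        t.setDaemon(true);
        return t;
    });

    static CompletableFuture<String> subscribe(Semaphore channelSemaphore, String channelName, long timeoutMs) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // Plays the role of lockTimeout: fail the promise if the guard is not obtained in time
        ScheduledFuture<?> lockTimeout = POOL.schedule(() -> {
            promise.completeExceptionally(new TimeoutException(
                    "Unable to acquire subscription lock after " + timeoutMs + "ms."));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        POOL.execute(() -> {
            try {
                // Bounded wait so the worker thread is never parked forever
                if (!channelSemaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                    return; // lockTimeout fails the promise
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                // The real code registers the Redis pub/sub listener here; we only complete the future
                if (promise.complete("subscribed to " + channelName)) {
                    lockTimeout.cancel(false);
                }
            } finally {
                channelSemaphore.release();
            }
        });
        return promise;
    }
}
```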
Threads that fail to acquire the lock retry in a spin loop

Back in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean), the spin loop that retries the lock is:

try {
    while (true) {
        // Attempt to acquire the lock
        ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // If the lock was acquired, leave the loop
        if (ttl == null) {
            break;
        }

        // Wait on the latch: it is released when an unlock message arrives,
        // otherwise the wait ends after ttl ms, when the holder's lease would expire
        if (ttl >= 0) {
            try {
                commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                if (interruptibly) {
                    throw e;
                }
                commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            }
        } else {
            // Negative ttl (e.g. the key has no expiration): wait for an unlock message
            if (interruptibly) {
                commandExecutor.getNow(future).getLatch().acquire();
            } else {
                commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
            }
        }
    }
} finally {
    // Unsubscribe
    unsubscribe(commandExecutor.getNow(future), threadId);
}
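The loop above can be imitated inside a single JVM. In this hypothetical sketch, which is not Redisson code, a synchronized owner field stands in for the Redis hash. A java.util.concurrent.Semaphore plays the role of the latch from getLatch(), and unlock() releases it the way the unlock message does. A waiter retries after it is woken or after the holder's remaining lease (ttl) elapses, whichever comes first. Reentrancy is deliberately left out.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical single-JVM model of the spin loop (not Redisson code; no reentrancy).
final class SpinWaitSketch {
    private final Semaphore latch = new Semaphore(0); // released by "unlock messages"
    private String owner;                             // null when the lock is free
    private long expiresAtMillis;

    // Like the lock script: null on success, otherwise the holder's remaining lease in ms.
    synchronized Long tryAcquire(String who, long leaseMillis) {
        long now = System.currentTimeMillis();
        if (owner == null || expiresAtMillis <= now) {
            owner = who;
            expiresAtMillis = now + leaseMillis;
            return null;
        }
        return expiresAtMillis - now;
    }

    void unlock(String who) {
        synchronized (this) {
            if (!who.equals(owner)) {
                throw new IllegalMonitorStateException("not locked by " + who);
            }
            owner = null;
        }
        latch.release(); // stands in for publishing the unlock message
    }

    void lock(String who, long leaseMillis) throws InterruptedException {
        while (true) {
            Long ttl = tryAcquire(who, leaseMillis);
            if (ttl == null) {
                return; // acquired
            }
            // Wake up on an unlock message or when the holder's lease runs out, then retry
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    }
}
```

Waiting on the latch with a ttl bound, rather than sleeping in a fixed loop, means a waiter reacts as soon as the lock is released. It still recovers if the holder crashes and never publishes an unlock message.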

Logic for releasing locks

In the business code, lock.unlock() releases the lock. Tracing it down leads to this core method:

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // Release the lock
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        // Stop the expiration renewal (cancelExpirationRenewal, covered above)
        cancelExpirationRenewal(threadId);

        if (e != null) {
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            // The script returned nil: the current thread does not hold the lock
            IllegalMonitorStateException cause = new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

The method above does two things: it releases the lock, and it cancels the expiration renewal. cancelExpirationRenewal(threadId) was covered earlier, so here we follow the release itself into unlockInnerAsync(threadId):

protected RFuture<Boolean> unlockInnerAsync(long threadId) {  
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,  
         "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  
                    "return nil;" +  
                    "end; " +  
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +  
                    "if (counter > 0) then " +  
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +  
                    "return 0; " +  
                    "else " +  
                    "redis.call('del', KEYS[1]); " +  
                    "redis.call('publish', KEYS[2], ARGV[1]); " +  
                    "return 1; " +  
                    "end; " +  
                    "return nil;",  
        Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));  
}

Again, the work is done by a Lua script. Its logic is:

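  • If this thread's field does not exist in the hash (the current thread does not hold the lock), the script returns nil. unlockAsync turns that result into an IllegalMonitorStateException.
  • Otherwise, the script decrements this thread's hold count by 1. If the count is still greater than 0 (the lock was acquired reentrantly), it resets the key's expiration and returns 0. If the count has reached 0, it deletes the key, publishes an unlock message on the lock's channel to wake the threads waiting in the spin loop, and returns 1.
  • The trailing return nil is not reached in practice, because the if/else above always returns.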
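A minimal in-memory model of the unlock script makes its outcomes concrete. It is a hypothetical sketch, not Redisson code. Java maps stand in for the Redis hash and the key's lease, and a list records PUBLISH calls. The channel name, message text, and lock names are made up. The Boolean return value assumes the usual EVAL_BOOLEAN conversion: null for nil, false for 0, and true for 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the unlock Lua script (not Redisson code).
final class UnlockScriptModel {
    final Map<String, Map<String, Long>> hashes = new HashMap<>(); // key -> {lockName -> hold count}
    final Map<String, Long> leaseMillis = new HashMap<>();         // key -> last lease set by pexpire
    final List<String> published = new ArrayList<>();              // messages sent with PUBLISH

    // Returns null (nil), false (0) or true (1), like the script's return values.
    Boolean unlock(String key, String channel, String message, long leaseMs, String lockName) {
        Map<String, Long> hash = hashes.get(key);
        if (hash == null || !hash.containsKey(lockName)) {
            return null; // hexists == 0: this thread does not hold the lock
        }
        long counter = hash.merge(lockName, -1L, Long::sum); // hincrby -1
        if (counter > 0) {
            leaseMillis.put(key, leaseMs); // pexpire: still held reentrantly, refresh the lease
            return false;
        }
        hashes.remove(key);                      // del
        leaseMillis.remove(key);
        published.add(channel + ": " + message); // publish wakes the waiting threads
        return true;
    }
}
```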


Added by PallaviDalvi on Fri, 04 Mar 2022 21:10:59 +0200