Redisson distributed lock implementation principle and source code
Source code analysis
Simple business code
This business code is the entry point for the source code analysis. Using the distributed lock involves three main calls:
- RLock lock = redissonClient.getLock("hpc-lock") obtains the class that implements the reentrant distributed lock
- lock.lock() acquires the lock
- lock.unlock() releases the lock
    @GetMapping("/redis/lock")
    public ResResult testDistributedLock() {
        RLock lock = redissonClient.getLock("hpc-lock");
        lock.lock();
        try {
            System.out.println("Locking service, xxx, xxx, xxxx");
        } finally {
            lock.unlock();
        }
        return new ResResult(true, "");
    }
Source code analysis
Getting the class that implements the reentrant distributed lock
redissonClient.getLock("hpc-lock") obtains the class that implements the reentrant distributed lock. Step into the getLock method:
    @Override
    public RLock getLock(String name) {
        return new RedissonLock(commandExecutor, name);
    }
getLock simply instantiates the RedissonLock class, so trace into its constructor:
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        // Tracing into the next line shows that the internal lock lease time is taken from
        // the lock watchdog timeout, which defaults to 30s.
        // That is, without renewal the lock is released automatically after 30s.
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
Lock logic
lock.lock() acquires the lock. Step into lock() and trace down to the core logic in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean).
The core logic has three parts: try to acquire the lock; a thread that fails to acquire it subscribes to a Redis channel; and that thread then retries in a loop (spin) until it obtains the lock.
Try locking logic
First, look at the lock attempt at the start of org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean):
    long threadId = Thread.currentThread().getId();
    // Try to acquire the lock
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // The lock was acquired, so return directly
    if (ttl == null) {
        return;
    }
Step into tryAcquire(-1, leaseTime, unit, threadId) and keep tracing down to the core logic in org.redisson.RedissonLock#tryAcquireAsync, shown below. The method does two things: it acquires the lock, and it schedules renewal of the lock's expiration.
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        // Acquire the lock
        if (leaseTime != -1) {
            // Use the lease time passed in by the caller
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // Use the default lock expiration time
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        // Callback invoked after the lock attempt completes
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // Locking succeeded
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // No lease time was passed in, so the default expiration time is in use.
                    // Once the lock is held, schedule renewal of its expiration.
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
First, step into the core locking method tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG). The method body is as follows:
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // Execute the Lua script on Redis
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }
What matters here is the Lua script. Its logic is briefly explained below:
- Check whether the key getRawName() exists in Redis. (Note: getRawName() returns the name passed to redissonClient.getLock("hpc-lock") in the business code, i.e. hpc-lock.)
- If the key does not exist, create it as a hash and set the field ARGV[2] (the value of getLockName(threadId), which identifies the locking thread) to 1. Then set the key's expiration to unit.toMillis(leaseTime) (30s by default if the caller passes no lease time, as seen in the constructor analysis above) and return nil.
- If the key exists and the hash already contains this thread's field, i.e. the same thread is re-entering the lock, increase the field's value by 1, reset the key's expiration to unit.toMillis(leaseTime), and return nil.
- Otherwise the lock is held by another thread, so return the key's remaining time to live (this is return redis.call('pttl', KEYS[1]) in the Lua script). In general, PTTL returns -2 if the key does not exist and -1 if the key has no expiration.
Note: Redis executes a Lua script atomically. It runs no other command until the script has finished, so the steps inside the script cannot interleave with commands from other clients.
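The three branches of the lock script can be mimicked in plain Java to make the return values concrete. This is only an in-memory sketch under stated assumptions, not Redisson code: the class LockScriptSketch, its method names, and the holder ids are hypothetical, the current time is passed in explicitly instead of being read from Redis, and a single HashMap stands in for the Redis hash stored at the lock key.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the lock script (hypothetical, not Redisson code). */
final class LockScriptSketch {
    // Models the Redis hash at the lock key: field = holder id, value = reentry count.
    private final Map<String, Integer> holders = new HashMap<>();
    // Absolute expiry of the key in milliseconds; only meaningful while holders is non-empty.
    private long expiresAt;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in milliseconds. */
    Long tryLock(String holderId, long leaseMillis, long nowMillis) {
        if (!holders.isEmpty() && nowMillis >= expiresAt) {
            holders.clear(); // the key has expired; Redis would drop it on its own
        }
        // Branch 1: the key does not exist. Branch 2: the key exists and this holder owns it.
        if (holders.isEmpty() || holders.containsKey(holderId)) {
            holders.merge(holderId, 1, Integer::sum); // hincrby KEYS[1] ARGV[2] 1
            expiresAt = nowMillis + leaseMillis;      // pexpire KEYS[1] ARGV[1]
            return null;
        }
        // Branch 3: another holder owns the lock.
        return expiresAt - nowMillis;                 // pttl KEYS[1]
    }

    int holdCount(String holderId) {
        return holders.getOrDefault(holderId, 0);
    }

    public static void main(String[] args) {
        LockScriptSketch lock = new LockScriptSketch();
        System.out.println(lock.tryLock("client-a:1", 30_000, 0));     // null
        System.out.println(lock.tryLock("client-a:1", 30_000, 1_000)); // null (reentrant)
        System.out.println(lock.tryLock("client-b:7", 30_000, 2_000)); // 29000
    }
}
```

The second call succeeds because the same holder re-enters the lock, which also pushes the expiry to 31000 ms; the third caller therefore sees 29000 ms remaining.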
When the lock is acquired without an explicit lease time, the callback runs the lock renewal logic by calling the core method scheduleExpirationRenewal(threadId). The method body is as follows:
    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                // Renewal
                renewExpiration();
            } finally {
                // When the thread is interrupted, cancel the renewal.
                // This is analyzed below and will not be discussed now
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
First, look at the renewal code in renewExpiration(). The method is long, so only the core part is shown. The value of internalLockLeaseTime was analyzed above: if the caller passes no lease time, it defaults to 30s. The code below schedules the task with a delay of internalLockLeaseTime / 3, i.e. 10s by default, and the task reschedules itself after each successful renewal, so the lock's expiration is renewed every 10s while the lock is held.
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            //......
            // Renew the expiration
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            // Callback after the renewal completes
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                if (res) {
                    // Renewal succeeded, so schedule the next one
                    // reschedule itself
                    renewExpiration();
                } else {
                    // Stop the renewal
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
Continue tracing into the core method renewExpirationAsync(threadId) to see how the expiration is actually renewed:
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }
Again a Lua script is executed. If the hash at the key still contains the field for this thread (the thread still holds the lock and is still running the protected business code), the script resets the key's expiration in Redis to internalLockLeaseTime (30s by default, as analyzed earlier) and returns 1. If the field no longer exists in Redis, it returns 0 directly.
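The renewal cadence is easier to follow with numbers. The sketch below models it with a simulated clock, assuming the default 30s lease and a renewal every lease / 3; the class WatchdogSketch and its methods are hypothetical and do not use Redisson's timer or Redis. It only illustrates how each renewal pushes the expiry a full lease ahead, and how the key expires one lease after the last renewal once renewal stops.

```java
/** Deterministic model of the renewal timing (hypothetical, simulated clock). */
final class WatchdogSketch {
    private final long leaseMillis;
    private long expiresAt;      // simulated expiry of the lock key
    private long nextRenewalAt;  // simulated time at which the renewal task fires next
    private boolean renewing = true;

    WatchdogSketch(long leaseMillis, long nowMillis) {
        this.leaseMillis = leaseMillis;
        this.expiresAt = nowMillis + leaseMillis;
        this.nextRenewalAt = nowMillis + leaseMillis / 3;
    }

    /** Stops renewal, as happens when the lock is released. */
    void cancel() {
        renewing = false;
    }

    /** Advances the simulated clock, firing every renewal that falls due on the way. */
    void advanceTo(long nowMillis) {
        while (renewing && nextRenewalAt <= nowMillis && nextRenewalAt < expiresAt) {
            expiresAt = nextRenewalAt + leaseMillis; // pexpire resets the full lease
            nextRenewalAt += leaseMillis / 3;        // the task reschedules itself
        }
    }

    boolean isHeldAt(long nowMillis) {
        return nowMillis < expiresAt;
    }

    long expiresAt() {
        return expiresAt;
    }

    public static void main(String[] args) {
        WatchdogSketch lock = new WatchdogSketch(30_000, 0);
        lock.advanceTo(25_000);               // renewals fire at 10000 and 20000
        System.out.println(lock.expiresAt()); // 50000
        lock.cancel();                        // no further renewals
        System.out.println(lock.isHeldAt(50_000)); // false
    }
}
```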
Next, look at cancelExpirationRenewal(null), which stops the renewal:
    protected void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                // If a scheduled renewal task exists, cancel it directly
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }
Threads that fail to lock subscribe to Redis messages
Back in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean), the analysis above shows that the method returns directly when locking succeeds. When locking fails, execution continues with the code below it.
Here is the core code with which a thread that failed to lock subscribes to Redis messages:
    // Subscribe to messages
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
Step into subscribe(threadId) and keep tracing to the core logic below. The subscription is guarded by a per-channel semaphore, and the returned future is completed with a RedisTimeoutException if the configured timeout elapses first.
    public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture<>();
        int timeout = service.getConnectionManager().getConfig().getTimeout();
        Timeout lockTimeout = service.getConnectionManager().newTimeout(t -> {
            newPromise.completeExceptionally(new RedisTimeoutException(
                    "Unable to acquire subscription lock after " + timeout + "ms. " +
                            "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
        }, timeout, TimeUnit.MILLISECONDS);
        semaphore.acquire(() -> {
            // ......
        });
        return newPromise;
    }
The thread that failed to lock acquires the lock through spin
Back in org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean), the code that obtains the lock by spinning is as follows:
    try {
        while (true) {
            // Try to acquire the lock
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // The lock was acquired, so leave the loop
            if (ttl == null) {
                break;
            }
            // Wait on the latch for an unlock notification, for at most ttl milliseconds
            if (ttl >= 0) {
                try {
                    commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    commandExecutor.getNow(future).getLatch().acquire();
                } else {
                    commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Unsubscribe
        unsubscribe(commandExecutor.getNow(future), threadId);
    }
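The shape of this loop (try, wait on a latch for at most the reported TTL, try again) can be reproduced locally with java.util.concurrent.Semaphore. The sketch below is a minimal single-process analogue and not Redisson's implementation: the class WaitLoopSketch, the fixed TTL hint, and the demo() helper are hypothetical, an AtomicReference stands in for the Redis key, and releasing the semaphore stands in for the published unlock message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Local analogue of the try / wait / retry loop (hypothetical, no Redis involved). */
final class WaitLoopSketch {
    private final AtomicReference<String> owner = new AtomicReference<>(); // stands in for the lock key
    private final Semaphore latch = new Semaphore(0);                      // stands in for the entry latch
    private final long ttlMillis;                                          // fixed TTL hint given to waiters

    WaitLoopSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns null on success, otherwise a TTL hint, mirroring the contract of tryAcquire. */
    Long tryAcquire(String id) {
        return owner.compareAndSet(null, id) ? null : ttlMillis;
    }

    void lock(String id) throws InterruptedException {
        while (true) {
            Long ttl = tryAcquire(id);
            if (ttl == null) {
                return; // lock obtained
            }
            // Sleep until an unlock notification arrives or the TTL hint elapses, then retry.
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    }

    void unlock(String id) {
        if (owner.compareAndSet(id, null)) {
            latch.release(); // plays the role of the published unlock message
        }
    }

    /** Holder "a" takes the lock, waiter "b" takes over after "a" unlocks; returns the final owner. */
    static String demo() {
        WaitLoopSketch lock = new WaitLoopSketch(50);
        try {
            lock.lock("a");
            Thread waiter = new Thread(() -> {
                try {
                    lock.lock("b");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            lock.unlock("a");
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return lock.owner.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // b
    }
}
```

Waiting on the latch with a timeout rather than sleeping for a fixed interval lets a waiter react as soon as the holder unlocks, while still retrying on its own if the notification is missed.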
Logic for releasing locks
lock.unlock() in the business code releases the lock. Tracing down, the core method is as follows:
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        // Release the lock
        RFuture<Boolean> future = unlockInnerAsync(threadId);
        CompletionStage<Void> f = future.handle((opStatus, e) -> {
            // Cancel the expiration renewal; this was covered above and is not explained again here
            cancelExpirationRenewal(threadId);
            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException(
                        "attempt to unlock lock, not locked by current thread by node id: "
                                + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }
            return null;
        });
        return new CompletableFutureWrapper<>(f);
    }
The method above does two things: it releases the lock and it cancels the expiration renewal. cancelExpirationRenewal(threadId) was described above, so here we focus on releasing the lock and step into unlockInnerAsync(threadId):
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()),
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
Again the work is done by a Lua script. Its logic is as follows:
- If the hash at the key has no field for this thread, i.e. the lock is not held by the current thread, return nil directly. This is the case that unlockAsync turns into an IllegalMonitorStateException.
- Otherwise decrement the field's value by 1. If the result is still greater than 0 (the thread still holds the lock through an earlier reentrant call), reset the key's expiration to internalLockLeaseTime and return 0. If the result is not greater than 0, delete the key, publish the unlock message to the channel so that waiting threads retry, and return 1.
- The trailing return nil is only a fallback, since both branches above already return.
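The three outcomes of the unlock script can also be modelled in plain Java. As before, this is a hedged in-memory sketch rather than Redisson code: the class UnlockScriptSketch, the simplified lock method, and the "unlock-message" placeholder are hypothetical, and key expiration is left out. The Boolean return value mirrors how the script result is read by the caller: null for nil, false for 0, true for 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for the unlock script (hypothetical, expiration omitted). */
final class UnlockScriptSketch {
    // Models the Redis hash at the lock key: field = holder id, value = reentry count.
    private final Map<String, Integer> holders = new HashMap<>();
    // Models messages published on the lock channel.
    private final List<String> published = new ArrayList<>();

    /** Simplified acquire used only to set up state; it performs no ownership check. */
    void lock(String holderId) {
        holders.merge(holderId, 1, Integer::sum);
    }

    /** null: caller is not the holder; false: still held after decrement; true: released. */
    Boolean unlock(String holderId) {
        if (!holders.containsKey(holderId)) {
            return null;                                         // hexists == 0, return nil
        }
        int counter = holders.merge(holderId, -1, Integer::sum); // hincrby KEYS[1] ARGV[3] -1
        if (counter > 0) {
            return false;                                        // pexpire, then return 0
        }
        holders.clear();                                         // del KEYS[1]
        published.add("unlock-message");                         // publish KEYS[2] ARGV[1]
        return true;                                             // return 1
    }

    int publishedCount() {
        return published.size();
    }

    public static void main(String[] args) {
        UnlockScriptSketch lock = new UnlockScriptSketch();
        lock.lock("client-a:1");
        lock.lock("client-a:1");                       // reentrant acquire, count is 2
        System.out.println(lock.unlock("client-b:7")); // null
        System.out.println(lock.unlock("client-a:1")); // false
        System.out.println(lock.unlock("client-a:1")); // true
    }
}
```

Only the final unlock deletes the key and publishes a message, which is why a reentrant holder must call unlock once per successful lock call.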