Spring Boot Redis Distributed Reentrant Lock Based on Redisson [From Example to Source Code Analysis]

1. Preface

A distributed lock on Redis usually starts with SET resource-name anystring NX EX max-lock-time to acquire the lock, plus a Lua script to release it so that the check-and-delete is atomic. Implementing this by hand is cumbersome, and the Redis website explicitly recommends Redisson for Java. I worked through the official documentation step by step and am recording what I learned here. This article goes from the official website, to Spring Boot integration, to reading the source code, using a single Redis node as the example. My understanding is written in the code comments; I hope it helps!
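
To make the manual approach concrete, here is a minimal sketch in plain Java. It is a model only: an in-memory ConcurrentHashMap stands in for Redis, expiration is left out, and the class and method names (ManualLockSketch, tryLock, unlock) are assumptions made for illustration, not part of any library. putIfAbsent plays the role of SET ... NX, and remove(key, value) plays the role of the Lua script that deletes the key only when the stored token matches.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ManualLockSketch {
    // Stands in for Redis: lock key -> owner token. Expiration is omitted to keep the model small.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Models SET key token NX: succeeds only if the key is absent. */
    boolean tryLock(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Models the Lua release script: delete only if the stored token is ours, atomically. */
    boolean unlock(String key, String token) {
        return store.remove(key, token);
    }

    public static void main(String[] args) {
        ManualLockSketch redis = new ManualLockSketch();
        String tokenA = UUID.randomUUID().toString();
        String tokenB = UUID.randomUUID().toString();

        System.out.println(redis.tryLock("resource-name", tokenA)); // true: A acquires the lock
        System.out.println(redis.tryLock("resource-name", tokenB)); // false: already held by A
        System.out.println(redis.unlock("resource-name", tokenB));  // false: B is not the owner
        System.out.println(redis.unlock("resource-name", tokenA));  // true: A releases its own lock
    }
}
```

The token check on release is the part that needs atomicity: without it, a client whose lock had already expired could delete a lock that now belongs to someone else.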

2. Why use Redisson

1. Open the Redis website

Redis Chinese official website

2. The official documentation recommends a ready-made implementation for distributed locks

3. Open the official recommendation

4. Find the documentation

Redisson address

5. Redisson structure

3. Integrating Redisson with Spring Boot

1. Import Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

2. Use the official website as a reference for the configuration

3. Write the configuration class

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wangzhenjun
 * @date 2022/2/9 9:57
 */
@Configuration
public class MyRedissonConfig {

    /**
     * All use of Redisson goes through the RedissonClient
     * @return the RedissonClient instance
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson(){
        // 1. Create the configuration
        Config config = new Config();
        // The address must start with redis://
        config.useSingleServer().setAddress("redis://192.168.17.130:6379");
        // 2. Create a RedissonClient instance from the config
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }
}

4. The locking example from the official website

5. Write a simple Controller endpoint based on the official example

@ResponseBody
@GetMapping("/hello")
public String hello(){
    // 1. Get the lock; the same lock name always refers to the same lock
    RLock lock = redisson.getLock("my-lock");
    // 2. Lock
    lock.lock(); // Blocks until the lock is acquired; with no lease time given, the default expiration is 30s
    // Variant with parameters
    // lock.lock(10, TimeUnit.SECONDS); // Auto-unlocks after 10s; the lease time must be longer than the business logic takes
    try {
        System.out.println("Lock succeeded: " + Thread.currentThread().getId());
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 3. Unlock
        lock.unlock();
        System.out.println("Unlock succeeded: " + Thread.currentThread().getId());
    }
    return "hello";
}

6. Testing

4. lock.lock() source code analysis

1. Open the RedissonLock implementation class

2. Find the implementation

@Override
public void lock() {
    try {
        // When no lease time is passed, leaseTime defaults to -1
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

3. Ctrl-click into the lock method

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // Get the current thread id; the hash field that identifies the lock holder is UUID:threadId
    long threadId = Thread.currentThread().getId();
    // Try to acquire the lock
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // A null TTL means the lock was acquired, so return
    if (ttl == null) {
        return;
    }
    // Acquiring the lock failed: subscribe to this lock's release notifications
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }

    try {
        // Loop here and keep trying to acquire the lock
        while (true) {
            // Try to acquire the lock again
            ttl = tryAcquire(leaseTime, unit, threadId);
            // Lock acquired
            if (ttl == null) {
                // Leave the loop
                break;
            }

            // Wait for an unlock message, for at most the lock's remaining TTL, then retry
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Unsubscribe
        unsubscribe(future, threadId);
    }
//    get(lockAsync(leaseTime, unit));
}
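
The retry-and-wait pattern above can be tried out in isolation. The following is a rough sketch under stated assumptions, not Redisson code: an AtomicBoolean stands in for the lock key in Redis, a java.util.concurrent.Semaphore stands in for the latch that the subscription releases when an unlock message arrives, and TTL_MILLIS is a made-up constant playing the role of the TTL returned by tryAcquire(). All names (WaitLoopSketch, demo) are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WaitLoopSketch {
    private final AtomicBoolean held = new AtomicBoolean(false); // stands in for the lock key
    private final Semaphore latch = new Semaphore(0);            // released on each "unlock message"
    private static final long TTL_MILLIS = 200;                  // assumed stand-in for the returned TTL

    /** Returns null when the lock is acquired, otherwise a TTL hint, mirroring tryAcquire(). */
    private Long tryAcquire() {
        return held.compareAndSet(false, true) ? null : TTL_MILLIS;
    }

    void lock() throws InterruptedException {
        Long ttl = tryAcquire();
        if (ttl == null) {
            return; // fast path: acquired on the first attempt
        }
        while (true) {
            ttl = tryAcquire();
            if (ttl == null) {
                break;
            }
            // Park until an unlock message arrives or the TTL elapses, then retry
            latch.tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
    }

    void unlock() {
        held.set(false);
        latch.release(); // models the published unlock message waking a waiter
    }

    /** Two-thread scenario: the waiter stays blocked while the lock is held and proceeds after unlock. */
    static boolean demo() {
        WaitLoopSketch lock = new WaitLoopSketch();
        try {
            lock.lock();
            Thread waiter = new Thread(() -> {
                try {
                    lock.lock();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiter.start();
            Thread.sleep(50);
            boolean blockedWhileHeld = waiter.isAlive();
            lock.unlock();
            waiter.join(2_000);
            return blockedWhileHeld && !waiter.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Waiting with a timeout equal to the TTL means a waiter still retries even if the unlock message is lost, because the key would have expired by then.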

4. Step into the tryAcquire method

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // Delegates directly to the asynchronous method
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // leaseTime != -1 means a lease time was passed explicitly
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No lease time was passed: acquire the lock with the watchdog's default timeout as the expiration
    // private long lockWatchdogTimeout = 30 * 1000; the watchdog default is 30s
    // Acquiring the lock and setting the expiration must be atomic, so this method runs a Lua script; see below
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // Once the attempt completes, start a timed task that keeps refreshing the expiration
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // Lock acquired
        if (ttlRemaining == null) {
            // Schedule the expiration renewal; covered in detail below
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

5. View tryLockInnerAsync() method

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // First check whether the lock exists
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // It does not exist, so acquire it: set the field for this thread to 1
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                // Then set the expiration time
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // hexists checks whether the hash field exists: the lock exists and is held by the current thread
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                // hincrby increments the hold count by one
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // A count greater than 1 means a reentrant acquisition; reset the expiration time
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // The lock exists and is held by another thread: return its remaining TTL
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
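
To check the three branches of the script without a Redis server, here is a small plain-Java model of it. This is only a sketch under assumptions: two HashMaps stand in for the Redis hash and its expiration, the current time is passed in explicitly so the result is deterministic, and the names (LockScriptModel, tryLock, holdCount) are invented for illustration. As in the script, a null return means the lock was acquired and any other value is the remaining TTL in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

class LockScriptModel {
    // lock key -> (field "uuid:threadId" -> hold count); models the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // lock key -> absolute expiry time in ms; models PEXPIRE / PTTL
    private final Map<String, Long> expiresAt = new HashMap<>();

    /** Returns null when the lock is acquired, otherwise the remaining TTL in ms. */
    synchronized Long tryLock(String key, String owner, long leaseMillis, long nowMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) { // expired keys disappear, as in Redis
            hashes.remove(key);
            expiresAt.remove(key);
        }
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null) {                              // exists == 0
            hash = new HashMap<>();
            hash.put(owner, 1);                          // hset field 1
            hashes.put(key, hash);
            expiresAt.put(key, nowMillis + leaseMillis); // pexpire
            return null;
        }
        if (hash.containsKey(owner)) {                   // hexists == 1
            hash.merge(owner, 1, Integer::sum);          // hincrby 1 (reentrant acquisition)
            expiresAt.put(key, nowMillis + leaseMillis); // pexpire again
            return null;
        }
        return expiresAt.get(key) - nowMillis;           // pttl: held by someone else
    }

    synchronized int holdCount(String key, String owner) {
        Map<String, Integer> hash = hashes.get(key);
        return hash == null ? 0 : hash.getOrDefault(owner, 0);
    }

    public static void main(String[] args) {
        LockScriptModel redis = new LockScriptModel();
        System.out.println(redis.tryLock("my-lock", "uuid:1", 30_000, 0));      // null: acquired
        System.out.println(redis.tryLock("my-lock", "uuid:1", 30_000, 1_000));  // null: reentered
        System.out.println(redis.holdCount("my-lock", "uuid:1"));               // 2
        System.out.println(redis.tryLock("my-lock", "uuid:2", 30_000, 5_000));  // 26000: remaining TTL
        System.out.println(redis.tryLock("my-lock", "uuid:2", 30_000, 40_000)); // null: old lock expired
    }
}
```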

6. Step into scheduleExpirationRenewal(), which schedules the renewal task

Follow the calls step by step: scheduleExpirationRenewal -> renewExpiration

According to the source code below, the renewal task runs every internalLockLeaseTime / 3, which is one third of the watchdog timeout. With the default of 30s, the expiration is refreshed every 10s.

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
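
The timing described above can be worked through with a short calculation. This sketch is an illustration only: WatchdogTimeline and renewalTimes are hypothetical names, and it assumes every renewal succeeds and fires exactly on schedule, which a real timer does not guarantee. It lists the moments (in ms after acquisition) at which the renewal task would run while the lock is still held.

```java
import java.util.ArrayList;
import java.util.List;

class WatchdogTimeline {
    /**
     * Returns the times (ms after acquisition) at which the renewal task fires,
     * assuming it runs every leaseMillis / 3 for as long as the lock is held.
     */
    static List<Long> renewalTimes(long leaseMillis, long holdMillis) {
        long interval = leaseMillis / 3;
        if (interval <= 0) {
            throw new IllegalArgumentException("leaseMillis must be at least 3");
        }
        List<Long> times = new ArrayList<>();
        for (long t = interval; t < holdMillis; t += interval) {
            times.add(t); // each renewal resets the expiration to t + leaseMillis
        }
        return times;
    }

    public static void main(String[] args) {
        // Default watchdog timeout of 30s, business logic holding the lock for 35s
        System.out.println(renewalTimes(30_000, 35_000)); // [10000, 20000, 30000]
    }
}
```

Because each renewal pushes the expiration back to a full 30s, the key cannot expire while the holder is alive and the renewals keep succeeding.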

5. lock.lock(10, TimeUnit.SECONDS) Source Analysis

1. Open the implementation class

@Override
public void lock(long leaseTime, TimeUnit unit) {
    try {
        // The lease time here is the 10 seconds we passed in
        lock(leaseTime, unit, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

2. The lock() implementation is the same as the source shown in step 3 of section 4

3. Go straight to the tryAcquireAsync() method

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // leaseTime is -1 only when no lease time was passed; here it is 10
    if (leaseTime != -1) {
        // So execution takes this branch
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // The rest is omitted here; it was explained in detail earlier.
}
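
The branch in tryAcquireAsync() boils down to one decision, which this sketch isolates. It is a simplified model with invented names (LeasePlan, plan), not Redisson code: given the caller's leaseTime and the watchdog timeout, it reports the expiration that will be sent to Redis and whether a renewal task would be scheduled.

```java
import java.util.concurrent.TimeUnit;

class LeasePlan {
    final long expirationMillis; // expiration passed to the Lua script
    final boolean watchdog;      // whether renewal would be scheduled after acquiring

    private LeasePlan(long expirationMillis, boolean watchdog) {
        this.expirationMillis = expirationMillis;
        this.watchdog = watchdog;
    }

    /** Mirrors the leaseTime != -1 check: an explicit lease disables the watchdog. */
    static LeasePlan plan(long leaseTime, TimeUnit unit, long watchdogTimeoutMillis) {
        if (leaseTime != -1) {
            return new LeasePlan(unit.toMillis(leaseTime), false);
        }
        return new LeasePlan(watchdogTimeoutMillis, true);
    }

    public static void main(String[] args) {
        LeasePlan explicit = plan(10, TimeUnit.SECONDS, 30_000);
        System.out.println(explicit.expirationMillis + " " + explicit.watchdog); // 10000 false

        LeasePlan defaults = plan(-1, null, 30_000);
        System.out.println(defaults.expirationMillis + " " + defaults.watchdog); // 30000 true
    }
}
```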

4. Open tryLockInnerAsync() method

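This is the same method as in the case where no lease time is passed; only the leaseTime value differs. Because tryAcquireAsync() returns directly in this branch, scheduleExpirationRenewal() is never called, so no watchdog renews the lock and it expires once the lease time has elapsed.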

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            // First check whether the lock exists
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                // It does not exist, so acquire it: set the field for this thread to 1
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                // Then set the expiration time
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // hexists checks whether the hash field exists: the lock exists and is held by the current thread
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                // hincrby increments the hold count by one
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // A count greater than 1 means a reentrant acquisition; reset the expiration time
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // The lock exists and is held by another thread: return its remaining TTL
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

6. lock.unlock() source code analysis

1. Open the method implementation

@Override
public void unlock() {
    try {
        // Step into the method that releases the lock
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
    
// Future future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}

2. Open the unlockAsync() method

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // The actual unlock; expanded below
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // When the unlock script completes
    future.onComplete((opStatus, e) -> {
        if (e != null) {
            // Cancel the expiration renewal
            cancelExpirationRenewal(threadId);
            // Mark the result as failed and notify listeners
            result.tryFailure(e);
            return;
        }
        // A null status means the unlocking thread is not the thread that holds the lock
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });

    return result;
}

3. Open the unlockInnerAsync() method

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // Check whether the thread releasing the lock is the thread that holds it; if not, return nil
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // Each release decrements the hold count by one
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // Check whether the remaining count is greater than 0
            "if (counter > 0) then " +
                // Still held (reentrant): refresh the expiration time
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // Fully released: delete the key and publish the unlock message
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}
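
The unlock script has three possible outcomes, which the following plain-Java model reproduces. It is a sketch under assumptions rather than Redisson code: a HashMap stands in for the Redis hash, a list records what would be published, refreshing the expiration is left out, and the names (UnlockScriptModel, hold, unlock) are hypothetical. The return values follow the script: null when the caller is not the holder, false when the lock is still held after decrementing, true when it is fully released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnlockScriptModel {
    // lock key -> (field "uuid:threadId" -> hold count); models the Redis hash
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    // channels that received an unlock message; models PUBLISH
    final List<String> published = new ArrayList<>();

    /** Helper for the demo: sets the hold count directly instead of running the lock script. */
    void hold(String key, String owner, int count) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(owner, count);
    }

    /** null: not the holder; false: still held (reentrant); true: fully released. */
    Boolean unlock(String key, String channel, String owner) {
        Map<String, Integer> hash = hashes.get(key);
        if (hash == null || !hash.containsKey(owner)) { // hexists == 0
            return null;
        }
        int counter = hash.merge(owner, -1, Integer::sum); // hincrby -1
        if (counter > 0) {
            return false; // the real script also refreshes the expiration here
        }
        hashes.remove(key);     // del
        published.add(channel); // publish the unlock message to waiters
        return true;
    }

    public static void main(String[] args) {
        UnlockScriptModel redis = new UnlockScriptModel();
        redis.hold("my-lock", "uuid:1", 2); // the lock was acquired twice by the same thread

        System.out.println(redis.unlock("my-lock", "my-lock-channel", "uuid:2")); // null
        System.out.println(redis.unlock("my-lock", "my-lock-channel", "uuid:1")); // false
        System.out.println(redis.unlock("my-lock", "my-lock-channel", "uuid:1")); // true
        System.out.println(redis.published);                                      // [my-lock-channel]
    }
}
```

The null case is what unlockAsync() turns into the IllegalMonitorStateException shown earlier.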

7. Summary

That completes our walk through the underlying source code. Do you feel you could do it yourself now? Going through it once taught me a lot. I used to be afraid to click into source code and had to force myself to start, so be bold and take that first step. If you have read this far, a like, a favorite, and a share would be much appreciated!


Keywords: server IT computer

Added by Akenatehm on Wed, 02 Mar 2022 19:30:03 +0200