Python WeChat Subscription Applet Course Video
https://edu.csdn.net/course/detail/36074
Python Actual Quantitative Transaction Finance System
https://edu.csdn.net/course/detail/35475
1. Preface
We are implementing distributed locks using Redis, starting with SET resource-name anystring NX EX max-lock-time, and releasing locks using Lua scripts to guarantee atomicity. This manual implementation is cumbersome, and the Redis website makes it clear that the Java version uses Redisson. Xiao Bian also saw the official website to slowly explore clearly, close-up this record. From the official website to the integration of Springboot to source code interpretation, taking a single node as an example, the understanding of the small edition is in the notes, I hope it can help you!!
2. Why use Redisson
1. Let's open the website
Chinese official website in redis
2. We can see that the authorities let us use the other
3. Open Official Recommendations
4. Find Documents
Redisson address
5. Redisson structure
3. Springboot Integration Redisson
1. Import Dependency
<dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-data-redisartifactId> dependency> <dependency> <groupId>redis.clientsgroupId> <artifactId>jedisartifactId> dependency> <dependency> <groupId>org.redissongroupId> <artifactId>redissonartifactId> <version>3.12.0version> dependency>
2. Take the official website as an example to see how to configure it
3. Write configuration classes
import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author wangzhenjun * @date 2022/2/9 9:57 */ @Configuration public class MyRedissonConfig { /** * All use of redisson is handled through the RedissonClient * @return */ @Bean(destroyMethod="shutdown") public RedissonClient redisson(){ // 1. Create Configuration Config config = new Config(); // Be sure to add redis:// config.useSingleServer().setAddress("redis://192.168.17.130:6379"); // 2. Create a redissonClient instance from config RedissonClient redissonClient = Redisson.create(config); return redissonClient; } }
4. Examples of official website test locking
5. Write according to the simple Controller interface of the official website
@ResponseBody @GetMapping("/hello") public String hello(){ // 1. Acquire a lock, the same lock as long as the lock name is the same RLock lock = redisson.getLock("my-lock"); // 2. Locking lock.lock();// Blocking Trial Waiting for Default Add is 30s // Case with parameters // lock.lock(10, TimeUnit.SECONDS);// 10s auto-unlock, the auto-unlock time must be greater than the execution time of the business. try { System.out.println("Lock Success" + Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 3. Unlock System.out.println("Unlock succeeded:" + Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
6. Testing
4. lock.lock() source code analysis
1. Open the RedissonLock implementation class
2. Find implementation
@Override public void lock() { try { // We found that the default expiration time for non-expiration time sources is -1 lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
3. Hold Ctrl in to lock method
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { // Gets the id of the thread and the value of field when holding the lock is UUID:Thread number id long threadId = Thread.currentThread().getId(); // Attempting to acquire a lock Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired gets the lock, returns if (ttl == null) { return; } // This means that if acquiring a lock fails, the lock is subscribed to through the thread id RFuture future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { // Spin here, keep trying to get the lock while (true) { // Continue trying to acquire locks ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired succeeded if (ttl == null) { // Return directly, pick the spin break; } // Wait for message to continue waiting for lock if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // unsubscribe unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
4. Go in and try to get the lock method
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { // Direct Into Asynchronous Method return get(tryAcquireAsync(leaseTime, unit, threadId)); } private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // Here you can tell if the parameter leaseTime = -1 is not set if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL\_LONG); } // This method acquires the lock and the expiration time is the default time for the watchdog // private long lockWatchdogTimeout = 30 * 1000; Watchdog default expiration time is 30s // Lock and expiration times must be atomic. This method must be called after the execution of the Lua script. Let's see below RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL\_LONG); // Open a timed task to refresh the expiration time continuously ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired gets lock if (ttlRemaining == null) { // Refresh expiration time method, let's go into more detail next scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
5. View tryLockInnerAsync() method
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // First determine if a lock exists "if (redis.call('exists', KEYS[1]) == 0) then " + // Acquire locks if present "redis.call('hset', KEYS[1], ARGV[2], 1); " + // Then set the expiration time "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // hexists checks to see if the specified field of the hash table exists, locks exist, and locks are held by the current thread "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // hincrby adds one "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // The value of a lock is greater than 1, indicating that it is re-entrainable and that the reset expiration time is "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // Lock already exists and is not the thread, then expiration time TL is returned "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
6. Enter the scheduleExpirationRenewal() method of the remaining timed task
Step by step to find the source: scheduleExpirationRenewal - >renewExpiration
According to the following source code, the timer task refresh time is: internalLockLeaseTime / 3, which is 1/3 of the watchdog, that is, refresh once for 10s
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
5. lock.lock(10, TimeUnit.SECONDS) Source Analysis
1. Open the implementation class
@Override public void lock(long leaseTime, TimeUnit unit) { try { // The expiration time here is the 10 we entered lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
2. Method lock() implementation display, same as 3.3 source code
3. Go directly to the try to get the lock tryAcquireAsync() method
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // Let's make a judgement here that if the parameter leaseTime = -1 is not set, then we're 10 if (leaseTime != -1) { // Come to this method return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL\_LONG); } // The latter is omitted here, preceded by a detailed explanation. }
4. Open tryLockInnerAsync() method
It's not difficult to see that, like no expiration time passed, the leaseTime value has changed.
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // First determine if a lock exists "if (redis.call('exists', KEYS[1]) == 0) then " + // Acquire locks if present "redis.call('hset', KEYS[1], ARGV[2], 1); " + // Then set the expiration time "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // hexists checks to see if the specified field of the hash table exists, locks exist, and locks are held by the current thread "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // hincrby adds one "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // The value of a lock is greater than 1, indicating that it is re-entrainable and that the reset expiration time is "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // Lock already exists and is not the thread, then expiration time TL is returned "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
6. lock.unlock() source code analysis
1. Open Method Implementation
@Override public void unlock() { try { // Click to enter the release lock method get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } // Future future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); }
2. Open the unlockAsync() method
@Override public RFuture unlockAsync(long threadId) { RPromise result = new RedissonPromise(); // Unlock method, expanded later RFuture future = unlockInnerAsync(threadId); // complete future.onComplete((opStatus, e) -> { if (e != null) { // Cancel expired renewal cancelExpirationRenewal(threadId); // Mark this future as a failure and notify everyone result.tryFailure(e); return; } // The state is empty, indicating that the unlocked thread and the current lock are not the same thread if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; }
3. Open the unlockInnerAsync() method
protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL\_BOOLEAN, // Determines if the thread releasing the lock and the thread that already has the lock are the same thread, not returning null "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // Once the lock is released, the number of locks is reduced by one "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // Determine if the remaining quantity is greater than 0 "if (counter > 0) then " + // Greater than 0, refresh expiration time "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // Release locks, delete key s, and publish lock release messages "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK\_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
7. Summary
In this way, we followed Xiaobian through the low-level source code, whether you feel you can do it again, haha. Xiao Bian came down once and thought that the receipt of goods was still barren. Before he dared not point in the source code, he was forced to go in, so people should boldly take the first step forward. Come on, I'm sorry to make up a little bit of it when I see there's not one key in a row!!
By the way, promote your own website!!!
Click to visit! Welcome to visit, there are also many good articles in it!