Interviewer: How can a Redis distributed lock be renewed automatically?

1. Implementing a distributed lock with Redis

  • Choose a key as the lock marker, store it in Redis, and use a unique client ID as its value.

  • Set the value only when the key does not exist, so that only one client process can hold the lock at a time. This gives mutual exclusion.

  • Set an expiration time so that the key does not stay forever when a system failure prevents it from being deleted. This prevents deadlock.

  • After the business logic finishes, delete the key to release the lock. Check the value before deleting: only the client that acquired the lock may release it.
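
The four rules above can be sketched in a few lines. This is only an in-memory stand-in, not a real Redis client: FakeRedis, setIfAbsent and deleteIfValueMatches are hypothetical names invented for this illustration, and the clock is advanced by hand. Against a real server, acquisition is typically a single SET key value NX PX ttl command, and the check-then-delete on release is usually wrapped in a Lua script so that it is atomic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** In-memory stand-in for a Redis server (hypothetical, for illustration only). */
final class FakeRedis {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;
        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> data = new HashMap<>();
    long nowMillis = 0; // manual clock so that expiry is deterministic

    private void evictIfExpired(String key) {
        Entry e = data.get(key);
        if (e != null && e.expiresAtMillis <= nowMillis) {
            data.remove(key);
        }
    }

    /** Mimics SET key value NX PX ttlMillis: succeeds only if the key is absent. */
    synchronized boolean setIfAbsent(String key, String value, long ttlMillis) {
        evictIfExpired(key);
        if (data.containsKey(key)) {
            return false;
        }
        data.put(key, new Entry(value, nowMillis + ttlMillis));
        return true;
    }

    /** Mimics an atomic "if GET key == value then DEL key" script. */
    synchronized boolean deleteIfValueMatches(String key, String value) {
        evictIfExpired(key);
        Entry e = data.get(key);
        if (e == null || !e.value.equals(value)) {
            return false;
        }
        data.remove(key);
        return true;
    }
}

final class SimpleLockDemo {
    public static void main(String[] args) {
        FakeRedis redis = new FakeRedis();
        String clientA = UUID.randomUUID().toString();
        String clientB = UUID.randomUUID().toString();

        System.out.println(redis.setIfAbsent("order:lock", clientA, 30_000));  // true: A holds the lock
        System.out.println(redis.setIfAbsent("order:lock", clientB, 30_000));  // false: mutual exclusion
        System.out.println(redis.deleteIfValueMatches("order:lock", clientB)); // false: B is not the owner
        System.out.println(redis.deleteIfValueMatches("order:lock", clientA)); // true: the owner releases
    }
}
```

The unique value is what makes the last two calls behave differently: without it, any client could delete a lock it does not own.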

2. The problem

Suppose the lock expires after 30 seconds but the business operation takes longer, say 40 seconds. When the operation reaches the 30-second mark, the lock expires and another client acquires it while the first one is still working. What should we do?
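
The timeline can be replayed with a tiny local model. ExpiringLock is a hypothetical helper written for this example (no Redis involved), and time is passed in explicitly in seconds so the result does not depend on a real clock.

```java
/** Single in-memory lock with a TTL; time is supplied by the caller (hypothetical helper). */
final class ExpiringLock {
    private String owner;          // null when the lock is free
    private long expiresAtSeconds;

    boolean tryLock(String candidate, long ttlSeconds, long nowSeconds) {
        if (owner != null && nowSeconds < expiresAtSeconds) {
            return false; // still held by someone
        }
        owner = candidate;
        expiresAtSeconds = nowSeconds + ttlSeconds;
        return true;
    }

    boolean unlock(String candidate, long nowSeconds) {
        boolean held = owner != null && nowSeconds < expiresAtSeconds;
        if (!held || !owner.equals(candidate)) {
            return false; // expired, or owned by another client
        }
        owner = null;
        return true;
    }
}

final class ExpiryRaceDemo {
    public static void main(String[] args) {
        ExpiringLock lock = new ExpiringLock();
        System.out.println(lock.tryLock("A", 30, 0));  // true: A holds the lock until t=30
        System.out.println(lock.tryLock("B", 30, 29)); // false: A's lock is still live
        System.out.println(lock.tryLock("B", 30, 30)); // true: A's lock expired, B takes it
        System.out.println(lock.unlock("A", 40));      // false: A finishes at t=40 but no longer owns the lock
    }
}
```

Between t=30 and t=40 both A and B are inside the critical section, which is exactly the failure the rest of the article tries to avoid.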

We could choose an expiration time long enough for the business logic to finish, but picking a good LockTime is hard.

  • If LockTime is too small, the lock is more likely to time out in the middle of the operation and stop protecting it.

  • If LockTime is too large, a client that fails to release the lock normally blocks everyone else for longer.

In practice it can only be configured from experience: take the historical average duration of the operation and add some buffer. Even so, no fixed value is safe for every run.

We could also set no expiration at all and unlock only when the business operation ends, but if the client exits abnormally or crashes, the lock is never released and becomes a deadlock.

3. Automatic renewal

We can first set a LockTime on the lock, then start a daemon thread that resets the lock's LockTime at intervals.

It looks simple, but it is not easy to implement.

  • As when releasing a lock, we must check whether the client holding the lock has changed. Otherwise the daemon thread resets the LockTime no matter who holds the lock.

  • The daemon thread should renew at a reasonable interval. Renewing continuously wastes resources.

  • Once the thread holding the lock has finished its business logic, the daemon thread must be stopped as well. Otherwise it keeps running after the operation is over and wastes resources.
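
A minimal sketch of these three requirements, assuming an in-memory lease instead of Redis. LeaseStore and LeaseRenewer are hypothetical names, and renewing every third of the lease time is just one reasonable choice of interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory stand-in for one Redis lock key (hypothetical, for illustration only). */
final class LeaseStore {
    private String owner;
    private long expiresAtMillis;

    private boolean live() {
        return owner != null && System.currentTimeMillis() < expiresAtMillis;
    }

    synchronized boolean acquire(String candidate, long ttlMillis) {
        if (live()) return false;
        owner = candidate;
        expiresAtMillis = System.currentTimeMillis() + ttlMillis;
        return true;
    }

    /** Extends the TTL only if the caller still owns the lock. */
    synchronized boolean renewIfOwner(String candidate, long ttlMillis) {
        if (!live() || !owner.equals(candidate)) return false;
        expiresAtMillis = System.currentTimeMillis() + ttlMillis;
        return true;
    }

    synchronized boolean releaseIfOwner(String candidate) {
        if (!live() || !owner.equals(candidate)) return false;
        owner = null;
        return true;
    }
}

/** Renews a lease on a daemon thread until stopped or until ownership is lost. */
final class LeaseRenewer {
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "lease-renewer");
            t.setDaemon(true); // must not keep the JVM alive
            return t;
        });

    final AtomicInteger renewals = new AtomicInteger();
    private volatile ScheduledFuture<?> task;

    void start(LeaseStore store, String owner, long ttlMillis) {
        long period = ttlMillis / 3; // renew well before expiry, but not constantly
        task = TIMER.scheduleAtFixedRate(() -> {
            if (store.renewIfOwner(owner, ttlMillis)) {
                renewals.incrementAndGet();
            } else {
                stop(); // lock lost or released: nothing left to renew
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    void stop() {
        ScheduledFuture<?> t = task;
        if (t != null) t.cancel(false);
    }
}
```

Business code would call start(...) right after acquiring the lock and stop() in a finally block before releasing it, so the renewer never outlives the work it protects.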

4. Watchdog

Redisson's watchdog is a ready-made implementation of this automatic renewal.

Redisson tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  long time = unit.toMillis(waitTime);
  long current = System.currentTimeMillis();
  long threadId = Thread.currentThread().getId();
  // 1. Try to acquire the lock
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return true;
  }
  
  // Subtract the time already spent; if the maximum waiting time is used up, acquiring the lock fails
  time -= System.currentTimeMillis() - current;
  if (time <= 0) {
    acquireFailed(threadId);
    return false;
  }
  
  current = System.currentTimeMillis();
  
  /**
  * 2. Subscribe to the lock release event and block in await() until the lock is released.
  * This avoids wasting resources on repeated, futile lock attempts:
  * the wait is semaphore-based. When the lock is held by someone else, the current thread
  * subscribes to the lock release event on a Redis channel. Once the lock is released,
  * a message notifies the waiting threads so that they can compete for it.
  *
  * If await() returns false, the wait exceeded the maximum waiting time: cancel the
  * subscription and report that the lock was not acquired.
  * If await() returns true, enter the loop and try to acquire the lock.
  */
  RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  // Inside await(), a CountDownLatch blocks until the asynchronous subscribe completes (Netty's Future is used)
  if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
      subscribeFuture.onComplete((res, e) -> {
        if (e == null) {
          unsubscribe(subscribeFuture, threadId);
        }
      });
    }
    acquireFailed(threadId);
    return false;
  }
  
  try {
    // Subtract the time spent waiting for the subscription; if the maximum waiting time is used up, acquiring the lock fails
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
      acquireFailed(threadId);
      return false;
    }
  
    /**
    * 3. After the lock release signal arrives, keep retrying within the maximum waiting time.
    * If the lock is acquired, return true immediately.
    * If it is not acquired within the maximum waiting time, return false and leave the loop.
    */
    while (true) {
      long currentTime = System.currentTimeMillis();
  
      // Try to acquire the lock again
      ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        return true;
      }
      // If the maximum waiting time is exceeded, false is returned to end the cycle, and lock acquisition fails
      time -= System.currentTimeMillis() - currentTime;
      if (time <= 0) {
        acquireFailed(threadId);
        return false;
      }
  
      /**
      * 4. Block and wait for the lock (on a semaphore, until the unlock message arrives):
      */
      currentTime = System.currentTimeMillis();
      if (ttl >= 0 && ttl < time) {
        // The lock's remaining lifetime (ttl) is shorter than the remaining wait time: wait on the entry's semaphore for at most ttl (returns early if a permit arrives, throws if interrupted)
        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
      } else {
        // Otherwise wait on the semaphore for at most the remaining wait time
        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
      }
  
      // Update the remaining waiting time (subtract the time spent blocked)
      time -= System.currentTimeMillis() - currentTime;
      if (time <= 0) {
        acquireFailed(threadId);
        return false;
      }
    }
  } finally {
    // 5. Unsubscribe from the unlock message whether or not the lock was acquired
    unsubscribe(subscribeFuture, threadId);
  }
  // No return is needed here: the while (true) loop above only exits via return.
}

  • tryAcquire returns null when the lock is acquired. A non-null value means the lock is already held, and ttl is its remaining time to live.

  • If client 2 fails to acquire the lock at this point, it uses its thread ID (in effect, an identifier for the requesting process) to subscribe to the lock release event on a Redis channel. If the wait exceeds the maximum waiting time, lock acquisition fails and false is returned (the "return false" after the failed subscribeFuture.await(...) check). Otherwise the client enters a loop that repeatedly tries to acquire the lock.

  • On each pass through the loop, the client tries to acquire the lock and reads the remaining lifetime of the existing lock. If the retry succeeds, it returns immediately. If the lock is still held, the thread blocks on a Semaphore until the unlock message arrives. When the lock is released and the unlock message is published, the Semaphore's release() method is called, and one of the threads blocked on the Semaphore wakes up and tries again.

  • While the lock is held by someone else, a waiting client does not spin through the while(true) loop issuing lock requests. It blocks in await and on the Semaphore, driven by Redis publish/subscribe, which avoids wasting resources on futile lock attempts.
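
The "block until notified, then retry" pattern can be sketched locally. This is not Redisson's code: NotifyingLock is a hypothetical class, the lock lives in an AtomicReference rather than in Redis, and a plain Semaphore stands in for the unlock message that would arrive over the Redis channel.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Local sketch of waiting for an unlock signal instead of spinning (no Redis involved). */
final class NotifyingLock {
    private final AtomicReference<String> owner = new AtomicReference<>();
    // Stand-in for the unlock message on the Redis channel: one permit per unlock.
    private final Semaphore unlockSignal = new Semaphore(0);

    boolean tryLock(String candidate, long waitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        while (true) {
            if (owner.compareAndSet(null, candidate)) {
                return true; // lock acquired
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // wait budget used up
            }
            try {
                // Block until an unlock is signalled or the wait budget runs out.
                unlockSignal.tryAcquire(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    boolean unlock(String candidate) {
        if (!owner.compareAndSet(candidate, null)) {
            return false; // only the owner may unlock
        }
        unlockSignal.release(); // wake one waiter so it can retry
        return true;
    }
}
```

A permit left over from an unlock that nobody was waiting for only costs a later waiter one extra pass through the loop, so the sketch stays correct even when signals and waiters do not line up exactly.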

5. How the watchdog renews automatically

Redisson starts a Watch Dog as soon as the client acquires the lock successfully, provided no lease time was specified.

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

  • leaseTime must be -1 for the Watch Dog to be enabled. In that case the lock is taken with the watchdog timeout (lockWatchdogTimeout), which defaults to 30s.

  • If you pass your own lease time, the lock is released automatically once that time elapses and is not renewed.
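
The branch in tryAcquireAsync boils down to a small decision, restated here as a standalone sketch. LeasePlan and its fields are hypothetical names, and the 30_000 ms constant is assumed to match the default watchdog timeout mentioned above.

```java
import java.util.concurrent.TimeUnit;

/** Restates the leaseTime branch of tryAcquireAsync; all names except leaseTime are illustrative. */
final class LeasePlan {
    // Assumed to match the default lockWatchdogTimeout of 30s.
    static final long DEFAULT_WATCHDOG_TIMEOUT_MILLIS = 30_000;

    final long ttlMillis;          // expiration applied to the lock key
    final boolean watchdogEnabled; // whether renewal gets scheduled after acquisition

    private LeasePlan(long ttlMillis, boolean watchdogEnabled) {
        this.ttlMillis = ttlMillis;
        this.watchdogEnabled = watchdogEnabled;
    }

    static LeasePlan forLeaseTime(long leaseTime, TimeUnit unit) {
        if (leaseTime != -1) {
            // Explicit lease: use it as the TTL and never renew.
            return new LeasePlan(unit.toMillis(leaseTime), false);
        }
        // No lease given: use the watchdog timeout and keep renewing.
        return new LeasePlan(DEFAULT_WATCHDOG_TIMEOUT_MILLIS, true);
    }
}

final class LeasePlanDemo {
    public static void main(String[] args) {
        LeasePlan auto = LeasePlan.forLeaseTime(-1, TimeUnit.SECONDS);
        LeasePlan fixed = LeasePlan.forLeaseTime(10, TimeUnit.SECONDS);
        System.out.println(auto.ttlMillis + " " + auto.watchdogEnabled);   // 30000 true
        System.out.println(fixed.ttlMillis + " " + fixed.watchdogEnabled); // 10000 false
    }
}
```

In other words, asking for a specific lease time is also a decision to give up automatic renewal.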

6. Renewal principle

Renewal works by running a Lua script that resets the lock's expiration to the lease time (30s by default), but only if the lock is still held by the same client thread.

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}

  • The Watch Dog is in fact a background scheduled task. After the lock is acquired successfully, the thread holding the lock is put into a redissonlock EXPIR
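
How such a scheduled task might re-arm itself can be sketched as follows. This is a simplified model, not Redisson's implementation: WatchdogSketch and its methods are hypothetical, a ScheduledExecutorService replaces Redisson's own timer, a BooleanSupplier stands in for the Lua renewal script, and the interval of one third of the lease time is an assumption based on my reading of the Redisson source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** One-shot timer that re-arms itself after each successful renewal (illustrative names). */
final class WatchdogSketch {
    private static final ScheduledExecutorService TIMER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "watchdog-sketch");
            t.setDaemon(true);
            return t;
        });

    // Lock name -> marker that the lock is still held locally
    // (plays the role of the renewal map in scheduleExpirationRenewal).
    private final Map<String, Boolean> renewalMap = new ConcurrentHashMap<>();
    private final long leaseMillis;

    WatchdogSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    /** renewOnServer stands in for the Lua script: it returns true if the TTL was reset. */
    void schedule(String lockName, BooleanSupplier renewOnServer) {
        // Only the first registration for a lock name starts the renewal chain.
        if (renewalMap.putIfAbsent(lockName, Boolean.TRUE) == null) {
            arm(lockName, renewOnServer);
        }
    }

    /** Called on unlock: the next timer tick sees the missing entry and stops. */
    void cancel(String lockName) {
        renewalMap.remove(lockName);
    }

    boolean isScheduled(String lockName) {
        return renewalMap.containsKey(lockName);
    }

    private void arm(String lockName, BooleanSupplier renewOnServer) {
        TIMER.schedule(() -> {
            if (!renewalMap.containsKey(lockName)) {
                return; // unlocked in the meantime
            }
            if (renewOnServer.getAsBoolean()) {
                arm(lockName, renewOnServer); // renewed: schedule the next round
            } else {
                renewalMap.remove(lockName);  // lock no longer ours: stop renewing
            }
        }, leaseMillis / 3, TimeUnit.MILLISECONDS);
    }
}
```

Because each round is scheduled only after the previous renewal succeeded, the chain ends by itself when the lock disappears or the holder unlocks.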

     

Keywords: Java Redis Distribution

Added by jay_bo on Thu, 23 Dec 2021 22:37:26 +0200