How to implement Redis distributed locks with annotations in SpringBoot?

1, Business background

Some business requests are time-consuming and must be locked, so that concurrent requests cannot operate on the same database data at the same time and interfere with the request already in progress.

2, Analysis process

Redis is used as the distributed lock. The lock state is kept in Redis and maintained centrally, which solves the problem that the JVMs in a cluster cannot see each other's state, enforces an order of operations and protects the correctness of user data.

Sort out the design process

  1. Create a new annotation with @interface and define the input parameters in the annotation
  2. Add an AOP pointcut that scans for the specific annotation
  3. Define an @Aspect aspect, register it as a bean and intercept the specific methods
  4. Take a ProceedingJoinPoint as the method parameter and intercept before and after pjp.proceed()
  5. Lock before the pointcut and delete the key after the task has executed

Core steps: locking, unlocking and renewal

Lock

Use the opsForValue().setIfAbsent method of RedisTemplate to check whether the key already exists, and use UUID.randomUUID().toString() to generate a random string as the value.

After obtaining the lock from Redis, set an expiration time on the key, so that the lock is released automatically when it expires.

With this design, only the first request that successfully sets the key can carry on with the data operations. Later requests cannot obtain the 🔐 resource and end in failure.
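The "first request wins" rule can be tried out without a Redis server. The following is a minimal sketch, assuming an in-memory ConcurrentHashMap as a stand-in for Redis, where putIfAbsent plays the role of setIfAbsent. The names InMemoryLockDemo, tryLock and unlock are hypothetical, and key expiry is left out.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for Redis: putIfAbsent plays the role of setIfAbsent.
class InMemoryLockDemo {
    private static final ConcurrentMap<String, String> STORE = new ConcurrentHashMap<>();

    /** Returns the random lock value on success, or null if the key is already taken. */
    static String tryLock(String key) {
        String value = UUID.randomUUID().toString();
        // putIfAbsent returns null only for the caller that actually inserted the key
        return STORE.putIfAbsent(key, value) == null ? value : null;
    }

    static void unlock(String key) {
        STORE.remove(key);
    }

    public static void main(String[] args) {
        String first = tryLock("Business1:1024");
        String second = tryLock("Business1:1024");
        System.out.println("first request locked:  " + (first != null));
        System.out.println("second request locked: " + (second != null));
    }
}
```

The second call fails because the key is already present, which is the behaviour the aspect relies on to reject concurrent requests.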

Timeout problem

The concern is that the method executed at the pointcut through pjp.proceed() takes too long, so that the key in Redis times out and is released early.

For example, thread A obtains the lock first. The method it runs takes longer than the lock timeout, so the lock expires and is released. At this point another thread B successfully obtains the Redis lock. The two threads then operate on the same batch of data at the same time, which leads to inaccurate data.
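The scenario with threads A and B can be replayed with a simulated clock. This is only an illustrative sketch: ExpiringLockDemo is a hypothetical in-memory lock with a TTL, not real Redis, and the 3 s timeout and the points in time are made-up values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory lock with a TTL and a manually advanced clock (not real Redis).
class ExpiringLockDemo {
    private final Map<String, Long> expiresAt = new HashMap<>();
    private long nowMs = 0;

    void advance(long ms) {
        nowMs += ms;
    }

    /** Succeeds if the key is absent or its TTL has already run out. */
    boolean tryLock(String key, long ttlMs) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry > nowMs) {
            return false; // still held by someone else
        }
        expiresAt.put(key, nowMs + ttlMs);
        return true;
    }

    public static void main(String[] args) {
        ExpiringLockDemo redis = new ExpiringLockDemo();
        boolean a = redis.tryLock("Business1:1024", 3_000);      // thread A at t = 0 s
        redis.advance(2_000);
        boolean bEarly = redis.tryLock("Business1:1024", 3_000); // thread B at t = 2 s
        redis.advance(2_000);
        boolean bLate = redis.tryLock("Business1:1024", 3_000);  // thread B at t = 4 s, A is still working
        System.out.println(a + " " + bEarly + " " + bLate);
    }
}
```

Thread B is rejected at 2 s but succeeds at 4 s, although thread A has not finished its work. From that moment on both threads believe they hold the lock.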

Solution: add a "continuation" (lock renewal)

As long as the task has not finished, the lock is not released:

A scheduled thread pool (ScheduledExecutorService) is maintained. Every 2 s it scans the tasks that were added to the queue and checks whether their expiration time is approaching. The formula is: [expiration time] <= [current time] + [expiration interval (one third of the timeout)]

/**
 * Thread pool. Each JVM uses one thread to maintain keyAliveTime and runs the task periodically
 */
private static final ScheduledExecutorService SCHEDULER =
        new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
static {
    SCHEDULER.scheduleAtFixedRate(() -> {
        // do something to extend time
    }, 0, 2, TimeUnit.SECONDS);
}
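The "extend time" step needs a rule for deciding when a lock is due for renewal. One possible form is sketched below, assuming the renewal interval is one third of the lock time and that a lock is renewed once that interval has passed since its last renewal. RenewalCheck and its method names are hypothetical.

```java
// Hypothetical helper: decides whether a lock's expiry should be pushed back.
class RenewalCheck {
    /** Renewal interval in ms: one third of the lock time given in seconds. */
    static long renewalPeriodMs(long lockTimeSeconds) {
        return lockTimeSeconds * 1000 / 3;
    }

    /** True once at least one renewal interval has passed since the last renewal. */
    static boolean shouldRenew(long lastRenewalMs, long lockTimeSeconds, long nowMs) {
        return lastRenewalMs + renewalPeriodMs(lockTimeSeconds) <= nowMs;
    }

    public static void main(String[] args) {
        // 30 s lock, last renewed at t = 0: the renewal interval is 10 000 ms
        System.out.println(shouldRenew(0, 30, 8_000));  // false
        System.out.println(shouldRenew(0, 30, 10_000)); // true
    }
}
```

Renewing well before the key expires leaves a safety margin in case a scan is delayed.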

3, Design scheme

Based on the analysis above, my colleague Xiao 🐟 designed this scheme.

The overall process has been described earlier. Here are some core steps:

  • Intercept the annotation @RedisLockAnnotation and obtain the necessary parameters
  • Locking operation
  • Renewal ("continuation") operation
  • End the business and release the lock

4, Practical operation

How to use AOP was covered in an earlier article, which you can refer to.

Related attribute class configuration

Business attribute enumeration settings

public enum RedisLockTypeEnum {
    /**
     * Custom key prefix
     */
    ONE("Business1", "Test1"),
    
    TWO("Business2", "Test2");
    private String code;
    private String desc;
    RedisLockTypeEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }
    public String getCode() {
        return code;
    }
    public String getDesc() {
        return desc;
    }
    public String getUniqueKey(String key) {
        return String.format("%s:%s", this.getCode(), key);
    }
}

Parameters saved in the task queue

public class RedisLockDefinitionHolder {
    /**
     * Business unique key
     */
    private String businessKey;
    /**
     * Locking time (s)
     */
    private Long lockTime;
    /**
     * Last update time (ms)
     */
    private Long lastModifyTime;
    /**
     * Save current thread
     */
    private Thread currentTread;
    /**
     * Total attempts
     */
    private int tryCount;
    /**
     * Current attempts
     */
    private int currentCount;
    /**
     * Update period (ms), formula = lock time (converted to ms) / 3
     */
    private Long modifyPeriod;
    public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime, Thread currentTread, int tryCount) {
        this.businessKey = businessKey;
        this.lockTime = lockTime;
        this.lastModifyTime = lastModifyTime;
        this.currentTread = currentTread;
        this.tryCount = tryCount;
        this.modifyPeriod = lockTime * 1000 / 3;
    }
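    // Getters and setters are omitted here. The aspect relies on them (getBusinessKey(),
    // setLastModifyTime(...) and so on). Generating them, for example with Lombok's @Data, is an assumption.
}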

Define the annotation to be intercepted

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface RedisLockAnnotation {
    /**
     * Index of the method argument used to build the lock key. Index 0 is used by default
     */
    int lockFiled() default 0;
    /**
     * Maximum number of renewals before the request is treated as timed out
     */
    int tryCount() default 3;
    /**
     * Custom lock type
     */
    RedisLockTypeEnum typeEnum();
    /**
     * Time after which the lock is released, in seconds
     */
    long lockTime() default 30;
}
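How these annotation values turn into a lock key can be shown with plain reflection. The sketch below is a simplified, hypothetical stand-in: DemoLock replaces RedisLockAnnotation, a plain prefix string replaces the enum, and the update method exists only for the demonstration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationReadDemo {
    // Simplified, hypothetical copy of the annotation (the enum is replaced by a plain prefix)
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoLock {
        int lockFiled() default 0;
        String prefix();
        long lockTime() default 30;
    }

    @DemoLock(prefix = "Business1", lockTime = 3)
    public void update(Long userId, String remark) {
    }

    /** Looks up the annotated demo method by reflection. */
    static Method updateMethod() {
        try {
            return AnnotationReadDemo.class.getMethod("update", Long.class, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds "prefix:argument" from the annotation on the method and the call arguments. */
    static String businessKey(Method method, Object[] args) {
        DemoLock lock = method.getAnnotation(DemoLock.class);
        return String.format("%s:%s", lock.prefix(), args[lock.lockFiled()]);
    }

    public static void main(String[] args) {
        System.out.println(businessKey(updateMethod(), new Object[] {1024L, "note"})); // Business1:1024
    }
}
```

RetentionPolicy.RUNTIME is what makes the annotation visible to getAnnotation at run time. Without it the lookup would return null.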

Core aspect interception

The class RedisLockAspect.java is described in three parts, one for each function

Pointcut settings

/**
 * The path inside @annotation(...) names the specific annotation to intercept
 */
@Pointcut("@annotation(cn.sevenyuan.demo.aop.lock.RedisLockAnnotation)")
public void redisLockPC() {
}

Locking and releasing the lock around the pointcut with @Around

The previous step defined the pointcut we want to intercept. The next step is to run some custom operations before and after the pointcut:

@Around(value = "redisLockPC()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
    // Parse the annotation and the method arguments
    Method method = resolveMethod(pjp);
    RedisLockAnnotation annotation = method.getAnnotation(RedisLockAnnotation.class);
    RedisLockTypeEnum typeEnum = annotation.typeEnum();
    Object[] params = pjp.getArgs();
    String ukString = params[annotation.lockFiled()].toString();
    // Many parameter validations and null checks are omitted here
    String businessKey = typeEnum.getUniqueKey(ukString);
    String uniqueValue = UUID.randomUUID().toString();
    // Lock. This is done outside the try block, so that a failed attempt never reaches the
    // finally block and deletes a lock held by another request. The value and the expiry are
    // set in one call (available since Spring Data Redis 2.1), so the key is never left without a timeout.
    Boolean isSuccess = redisTemplate.opsForValue()
            .setIfAbsent(businessKey, uniqueValue, annotation.lockTime(), TimeUnit.SECONDS);
    if (!Boolean.TRUE.equals(isSuccess)) {
        throw new Exception("You can't do it, because another request has got the lock =-=");
    }
    Thread currentThread = Thread.currentThread();
    RedisLockDefinitionHolder holder = new RedisLockDefinitionHolder(businessKey, annotation.lockTime(),
            System.currentTimeMillis(), currentThread, annotation.tryCount());
    Object result = null;
    try {
        // Add the task information to the "delay" queue so that the lock can be renewed
        holderList.add(holder);
        // Perform the business operation
        result = pjp.proceed();
        // If the thread was interrupted, throw an exception to abort the request
        if (currentThread.isInterrupted()) {
            throw new InterruptedException("You had been interrupted =-=");
        }
    } catch (InterruptedException e) {
        log.error("Interrupt exception, rollback transaction", e);
        throw new Exception("Interrupt exception, please send request again");
    } catch (Exception e) {
        log.error("has some error, please check again", e);
        // Rethrow so that the caller sees the business failure instead of a null result
        throw e;
    } finally {
        // Stop renewing, then release the lock, but only if this request still owns it.
        // Note: get followed by delete is not atomic, a Lua script would close that gap.
        holderList.remove(holder);
        if (uniqueValue.equals(redisTemplate.opsForValue().get(businessKey))) {
            redisTemplate.delete(businessKey);
        }
        log.info("release the lock, businessKey is [" + businessKey + "]");
    }
    return result;
}

The above process is briefly summarized as follows:

  • Parse the annotation parameters to obtain the annotation values and the argument values of the method
  • Lock in Redis and set the timeout
  • Add the task information to the "delay" queue so that the lock is renewed and not released early
  • Add a check of the thread interrupt flag
  • End the request and release the lock in finally
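One detail of the release step is worth illustrating: a request should only delete the key if the stored value is still its own UUID, otherwise it could remove a lock that another request obtained after a timeout. The sketch below shows the idea with an in-memory map as a stand-in for Redis. OwnerCheckedRelease and the values "uuid-of-A" and "uuid-of-B" are hypothetical. With real Redis the compare-and-delete is usually done in a small Lua script so that it is atomic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for Redis, used only to show the ownership check.
class OwnerCheckedRelease {
    private static final ConcurrentMap<String, String> STORE = new ConcurrentHashMap<>();

    static boolean tryLock(String key, String ownerValue) {
        return STORE.putIfAbsent(key, ownerValue) == null;
    }

    /** Deletes the key only if it still holds this owner's value. */
    static boolean release(String key, String ownerValue) {
        // remove(key, value) compares and deletes in one atomic step
        return STORE.remove(key, ownerValue);
    }

    public static void main(String[] args) {
        tryLock("Business1:1024", "uuid-of-A");
        System.out.println(release("Business1:1024", "uuid-of-B")); // false: B does not own the lock
        System.out.println(release("Business1:1024", "uuid-of-A")); // true: A releases its own lock
    }
}
```

This is the reason the lock value is a random UUID rather than a constant: the value identifies the owner.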

Renewal ("continuation") operation

A ScheduledExecutorService is used here to maintain one thread that keeps checking the tasks in the task queue and extends their timeouts:

// Task queue that is scanned by the scheduler
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue<>();
/**
 * Thread pool, maintain keyAliveTime
 */
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
    // The "continuation" (renewal) operation is performed once every two seconds
    SCHEDULER.scheduleAtFixedRate(() -> {
        // The try-catch is required here. Otherwise the scheduled task is never run again after an error =-=
        try {
            Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
            while (iterator.hasNext()) {
                RedisLockDefinitionHolder holder = iterator.next();
                // Null check
                if (holder == null) {
                    iterator.remove();
                    continue;
                }
                // Check whether the key still exists. If not, remove the task
                if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
                    iterator.remove();
                    continue;
                }
                // If the number of renewals exceeds the limit, interrupt the thread
                if (holder.getCurrentCount() > holder.getTryCount()) {
                    holder.getCurrentTread().interrupt();
                    iterator.remove();
                    continue;
                }
                // Renew once a third of the lock time has passed since the last renewal
                long curTime = System.currentTimeMillis();
                boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
                if (shouldExtend) {
                    holder.setLastModifyTime(curTime);
                    redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
                    log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
                    holder.setCurrentCount(holder.getCurrentCount() + 1);
                }
            }
        } catch (Exception e) {
            log.error("renewing the lock failed", e);
        }
    }, 0, 2, TimeUnit.SECONDS);
}

This code implements the idea in the dotted box of the design diagram, so that a very time-consuming request does not cause the lock to be released early.

"Thread interruption" through Thread#interrupt is added here, in the hope that the thread can be interrupted once the number of retries is exceeded (not rigorously tested, for reference only, ha ha)

However, if you run into such a time-consuming request, it is still recommended to find the root cause, analyze where the time is spent, and optimize the business or handle it in another way, so that these time-consuming operations are avoided.

So remember to write plenty of logs, which makes problems faster to analyze. See also the related article: How to use SpringBoot AOP to record operation logs and exception logs?
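Logging also matters inside the renewal task itself. If a task submitted with scheduleAtFixedRate throws, all later executions are suppressed and nothing is reported unless the task catches and logs the error. The sketch below demonstrates this with a task that always fails. ScheduledFailureDemo, the 10 ms period and the 300 ms observation window are made-up values for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledFailureDemo {
    /** Schedules an always-failing task every 10 ms for about 300 ms and returns how often it ran. */
    static int countRuns(boolean catchInsideTask) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            if (catchInsideTask) {
                try {
                    throw new IllegalStateException("simulated Redis failure");
                } catch (RuntimeException e) {
                    // swallowed here (a real task would log it), so the next run still happens
                }
            } else {
                // escapes the task: the scheduler suppresses every later execution
                throw new IllegalStateException("simulated Redis failure");
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("without try-catch: " + countRuns(false) + " run(s)");
        System.out.println("with try-catch:    " + countRuns(true) + " run(s)");
    }
}
```

Without the try-catch the task runs exactly once. In the lock aspect that would mean no lock is ever renewed again after the first Redis error.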

5, Start test

The annotation is applied to an entry method, and Thread#sleep simulates a time-consuming request in the business logic

@GetMapping("/testRedisLock")
@RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
public Book testRedisLock(@RequestParam("userId") Long userId) {
    try {
        log.info("Before sleep execution");
        Thread.sleep(10000);
        log.info("After sleep execution");
    } catch (Exception e) {
        // log error
        log.info("has some error", e);
    }
    return null;
}

To use it, add the annotation to the method and set the corresponding parameters. Different businesses are distinguished by typeEnum, so that the same business cannot be operated concurrently.

Test results:

2020-04-04 14:55:50.864  INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController       : Before sleep execution
2020-04-04 14:55:52.855  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect        : businessKey : [Business1:1024], try count : 0
2020-04-04 14:55:54.851  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect        : businessKey : [Business1:1024], try count : 1
2020-04-04 14:55:56.851  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect        : businessKey : [Business1:1024], try count : 2
2020-04-04 14:55:58.852  INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect        : businessKey : [Business1:1024], try count : 3
2020-04-04 14:56:00.857  INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController       : has some error
java.lang.InterruptedException: sleep interrupted
 at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]

What is tested here is the scenario in which the number of retries is exceeded and the request fails. If you reduce the sleep time, the business executes normally.
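The timeline in the log can be reproduced on paper. The following sketch replays the renewal loop with a simulated clock, assuming the first scan happens 2 s after the lock is taken and using lockTime = 3 and tryCount = 3 as in the test. RenewalTimeline is a hypothetical name, and no Redis or threads are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the renewal loop with a simulated clock (no Redis, no threads).
class RenewalTimeline {
    /** Returns one event per scan that does something, ending with the interrupt. */
    static List<String> simulate(long lockTimeSeconds, int tryCount) {
        List<String> events = new ArrayList<>();
        long period = lockTimeSeconds * 1000 / 3; // same formula as modifyPeriod
        long lastModify = 0;
        int currentCount = 0;
        for (long now = 2_000; ; now += 2_000) {  // one scan every 2 s
            if (currentCount > tryCount) {
                events.add(now / 1000 + "s interrupt");
                return events;
            }
            if (lastModify + period <= now) {
                lastModify = now;
                events.add(now / 1000 + "s renew, try count " + currentCount);
                currentCount++;
            }
        }
    }

    public static void main(String[] args) {
        simulate(3, 3).forEach(System.out::println);
    }
}
```

The replay yields renewals at 2 s, 4 s, 6 s and 8 s with try counts 0 to 3, and the interrupt at 10 s, which matches the two-second spacing of the log lines and the InterruptedException at the end.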

If you send the same request concurrently, you will see the error message for the failed lock acquisition.

This shows that our lock 🔐 did take effect and blocked the repeated request.

6, Summary

For time-consuming business and core data, repeated requests must not operate on the same data at the same time, otherwise the data becomes incorrect, so a distributed lock should be used to protect it.

Let's sort out the design process:

  1. Create a new annotation with @interface and define the input parameters in the annotation
  2. Add an AOP pointcut that scans for the specific annotation
  3. Define an @Aspect aspect, register it as a bean and intercept the specific methods
  4. Take a ProceedingJoinPoint as the method parameter and intercept before and after pjp.proceed()
  5. Lock before the pointcut and delete the key after the task has executed

In this exercise I learned how a distributed lock is implemented by reviewing a colleague's code design, and rewrote a simplified version of the business processing based on his design. For the "continuation" (renewal) operation, which I had not considered before, a daemon thread checks the locks periodically and extends the timeout so that a lock is not released early.

Therefore, three knowledge points are reviewed at the same time:

1. Implementation and common methods of AOP

2. Usage and parameter meaning of scheduled thread pool ScheduledExecutorService

3. The meaning and usage of Thread#interrupt (this is very interesting and can be further studied)
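As a starting point for the third item, here is a minimal, self-contained sketch of what Thread#interrupt does to a thread that is blocked in Thread.sleep. InterruptDemo and runAndInterrupt are hypothetical names, and the latch is only there to make sure the worker is running before it is interrupted.

```java
import java.util.concurrent.CountDownLatch;

// Minimal demo: interrupting a thread that is blocked in Thread.sleep.
class InterruptDemo {
    /** Starts a sleeping worker, interrupts it and returns what the worker observed. */
    static String runAndInterrupt() {
        final String[] outcome = {"not finished"};
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
                outcome[0] = "completed";
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws
                outcome[0] = "interrupted, flag=" + Thread.currentThread().isInterrupted();
            }
        });
        worker.start();
        try {
            started.await();    // make sure the worker is running
            worker.interrupt(); // wakes the sleeping worker, or makes its next sleep fail at once
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(runAndInterrupt()); // interrupted, flag=false
    }
}
```

Interruption is cooperative. A thread that is busy in non-blocking code only gets its flag set and keeps running until the code checks the flag itself.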

The complete code is in my earlier SpringBoot learning project. If you are interested, clone it and try it out with Redis:

https://github.com/Vip-Augus/springboot-note/blob/master/src/main/java/cn/sevenyuan/demo/aop/lock/RedisLockAspect.java

Added by gibigbig on Fri, 17 Dec 2021 12:18:21 +0200