ReentrantLock usage and source code analysis

Basic introduction

Compared with synchronized, it has the following characteristics

  1. Interruptible
  2. Can be set to fair lock
  3. You can set the timeout
  4. Multiple condition variables are supported, that is, threads that do not meet the conditions can be placed in different collections to wait

Like synchronized, reentrant is supported

Basic grammar

// Acquire lock
reentrantLock.lock();
try {
 // Critical zone
} finally {
 // Release lock
 reentrantLock.unlock();
}

principle

When we create a ReentrantLock object, the bottom layer will help us create a new NonfairSync object by default. NonfairSync/FairSync are implemented based on AQS queue. AbstractQueuedSynchronizer is called AQS queue (queue synchronizer) for short.

It is a wait queue based on FIFO. The AQS queue is realized by a two-way linked list composed of Node nodes. All operations are in the AQS queue. If a thread obtains a lock, it will succeed directly. If it fails, it will be placed in the wait queue.

Acquire lock

  • CAS operates to preempt the lock. If the preemption is successful, modify the status of the lock to 1 and set the owner of the lock

    unfair:

    Fairness:

  • Preemption unsuccessful

    • Try to acquire the lock through tryAcquire first:

      If the lock status is 0, CAS will preempt the lock. If the preemption is successful, modify the lock status to 1 and set the lock owner

      If the lock state is not 0, judge whether it is a reentrant thread. If so, obtain the lock again and add the lock state + 1

      unfair:


      Fairness:

    • If tryAcquire does not acquire a lock:

      addWaiter(Node): create a new Node and insert it into the tail of the current AQS queue. If there is no AQS queue, create a new Node and insert it.

      Acquirequeueueueued (node): acquire the lock by spin; Or delete the node in the status of CANCELLED in the previous node, and then obtain the lock or block the thread through park.

      Note: node s are stateful, including canceled, SIGNAL, CONDITION and promote. The states involved in ReentrantLock are SIGNAL (waiting to be awakened) and canceled (CANCELLED).

      private Node enq(final Node node) {
          //spin
          for (;;) {
              Node t = tail;
              //If the tail node is empty, the AQS linked list has not been initialized yet
              if (t == null) { // Must initialize
                  //cas initializes the head node of AQS successfully. After initializing the head node, cas also initializes the tail node
                  //Note that here we can see that the head node does not store thread information, that is, the head node is equivalent to a virtual node
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  //If the tail node is not empty, you can directly add it to the tail of the linked list
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                  }
              }
          }
      }
      

      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              //Entry spin
              for (;;) {
                  //Gets the previous node of the current node
                  final Node p = node.predecessor();
                  // If the previous node is the head and the attempt to obtain the lock again is successful, remove the node from the AQS queue and replace the head, and return the interrupt flag
                  if (p == head && tryAcquire(arg)) {
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      //Note that this for loop will jump out only when the lock is preempted
                      return interrupted;
                  }
                  //Remove the node with status CANCELLED and the blocking thread is blocked here
                  //Note that after the thread is awakened, it continues to execute the for loop to try to preempt the lock. If it is not preempted, it will block again
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  //If it fails, change the node status to CANCELLED
                  cancelAcquire(node);
          }
      }
      
      
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
          int ws = pred.waitStatus;
          if (ws == Node.SIGNAL)
              //If the node is in the SIGNAL state, it does not need to be processed and returns directly
              return true;
          if (ws > 0) {
              //If the node status is > 0, it means that the node is in the cancelled status. The nodes in this status need to be cleared. Use the do while loop to clear the previous continuous nodes in the cancelled status
              do {
                  node.prev = pred = pred.prev;
              } while (pred.waitStatus > 0);
              pred.next = node;
          } else {
              //Under normal circumstances, cas is used to replace the state of the previous node with the SIGNAL state, that is - 1
              //Note that the status of all nodes in the queue is - 1 except the last one, including the head node
              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
          }
          return false;
      }
      
      private final boolean parkAndCheckInterrupt() {
          //Suspends the current thread and returns the interrupt flag locksupport Park (thread) will call unsafe The park () method blocks the thread (which is a native method)
          LockSupport.park(this);
          return Thread.interrupted();
      }
      

Release lock

  • Get the status value of the lock and set the status value to - 1
  • Judge whether the current thread is the owner thread of the lock. If not, throw an exception.
  • If the status value is 0, the owner of the lock is set to null
  • Wake up the next node whose status is not CANCELLED through unpark
public final boolean release(int arg) {
    //Only tryRelease returns true, indicating that the lock has been released. It is necessary to wake up the blocked thread. Otherwise, it is not necessary to wake up other threads
    if (tryRelease(arg)) {
        Node h = head;
        //If the head node is not empty and the status is not 0, it means that the synchronization queue has been initialized and there are nodes to wake up
        //Note that the head node of the synchronization queue is equivalent to a virtual node, which we can clearly know in the code for building the node
        //In the shouldParkAfterFailedAcquire method, the state of the head node will be changed to - 1
        //If the status of the head is 0, it means that there are no elements in the queue that need to be awakened, and it directly returns true
        if (h != null && h.waitStatus != 0)
            //Wake up the next node of the head node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    //Reduce reentry times
    int c = getState() - releases;
    //If the thread that obtains the lock is not the current thread, an exception is thrown
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //If the state is 0, it indicates that the lock has been completely released and returns true
    if (c == 0) {
        free = true;
        //Set the thread that gets the lock to null
        setExclusiveOwnerThread(null);
    }
    //Reset the value of state
    setState(c);
    //Returns true if the lock is released; otherwise, returns false
    return free;
}
private void unparkSuccessor(Node node) {
    //Get the state of the head node. Setting the head node state to 0 means that a thread is waking up. If the head state is 0, it will not enter this method
    int ws = node.waitStatus;
    if (ws < 0)
        //Set the header node status to 0
        compareAndSetWaitStatus(node, ws, 0);
    //The next state of the wake-up header node is not a cancelled node (because the header node does not store blocked threads)
    Node s = node.next;
    //The current node is null or cancelled
    if (s == null || s.waitStatus > 0) {
        s = null;
        //Start traversing from the tail of the aqs linked list and find the non empty state closest to the head node. The node that is not cancelled is assigned to s. here, why do you traverse from the tail node, but the head node should be because the prev of the node is initialized when adding the node. When traversing from the tail node, there will be no prve without assignment 
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //Call locksupport Unpark() wakes up the specified thread
        LockSupport.unpark(s.thread);
}	

Reentrant

Reentrant means that if the same thread obtains the lock for the first time, it has the right to obtain the lock again because it is the owner of the lock. If it is a non reentrant lock, it will be blocked by the lock the second time it obtains the lock

static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
 	method1();
}
public static void method1() {
     lock.lock();
     try {
     	log.debug("execute method1");
    	 method2();
     } finally {
     	lock.unlock();
     }
}
public static void method2() {
     lock.lock();
     try {
     	log.debug("execute method2");
     	method3();
     } finally {
    	 lock.unlock();
     }
}
public static void method3() {
     lock.lock();
         try {
         log.debug("execute method3");
     } finally {
     	lock.unlock();
     }
}

Interruptible

  • lock.lockInterruptibly(); It can be interrupted.

  • lock.lock() cannot be interrupted.

lock.lockInterruptibly() the underlying will call:

ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> { 
	log.debug("start-up...");
     try {
     	lock.lockInterruptibly();
     } catch (InterruptedException e) {
         e.printStackTrace();
         log.debug("The process of waiting for the lock is interrupted");
     	return;
     }
     try {
    	 log.debug("Got the lock");
     } finally {
    	 lock.unlock();
     }
}, "t1");
lock.lock();
log.debug("Got the lock");
t1.start();
try {
     sleep(1);
     t1.interrupt();
     log.debug("Execution interrupt");
} finally {
 	lock.unlock();
}

Output:

18:02:40.520 [main] c.TestInterrupt - Got the lock
18:02:40.524 [t1] c.TestInterrupt - start-up... 
18:02:41.530 [main] c.TestInterrupt - Execution interrupt
java.lang.InterruptedException 
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898) 
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222) 
 at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) 
 at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17) 
 at java.lang.Thread.run(Thread.java:748) 
18:02:41.532 [t1] c.TestInterrupt - The process of waiting for the lock is interrupted

Fair lock

In the synchronized lock, the lock waiting in the entrylist is not obtained on a first come first served basis during competition. Therefore, the synchronized lock is unfair;

The ReentranLock lock is unfair by default, but it can be set to achieve a fair lock.

ReentrantLock lock = new ReentrantLock(true);

The original intention is to solve the hunger problem mentioned earlier, but fair lock is generally not necessary, which will reduce the concurrency, and trylock is better.

In fact, there are only two differences between the two implementations( We have already talked about the previous principle of acquiring locks):

  1. At the beginning of the lock() method, the unfair lock will try cas to seize the lock and insert it into the queue once;
  2. When the tryAcquire() method finds that the state is 0, the non fair lock will preempt the lock once, and the fair lock will judge whether there are waiting threads in the AQS linked list. Only those threads that do not wait will preempt the lock.

Lock timeout

private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        log.debug("Attempt to acquire lock");
        try {
            lock.lockInterruptibly();
            //It timed out after waiting for 2s without obtaining the lock
            if (! lock.tryLock(2, TimeUnit.SECONDS)) {
                log.debug("Cannot acquire lock");
                return;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.debug("Cannot acquire lock");
            return;
        }
        try {
            log.debug("Get lock");
        } finally {
            lock.unlock();
        }
    }, "t1");

    lock.lock();
    log.debug("Get lock");
    t1.start();
    sleep(1);
    log.debug("The lock was released");
    lock.unlock();
}

Conditional variable

There are also conditional variables in synchronized, that is, the waitSet lounge when we talk about the principle. When the conditions are not met, enter the waitSet and wait
The advantage of ReentrantLock's conditional variables over synchronized is that it supports multiple conditional variables, which is like

  1. synchronized means that those threads that do not meet the conditions are waiting for messages in a lounge
  2. ReentrantLock supports multiple lounges, including a lounge dedicated to waiting for cigarettes and a lounge dedicated to waiting for breakfast. It also wakes up according to the lounge when waking up

Key points of use:

  1. You need to obtain a lock before await
  2. After await is executed, it will release the lock and enter the conditionObject to wait
  3. The await thread is awakened (or interrupted, or timed out) to re compete for the lock lock. The wake-up thread must obtain the lock first
  4. After the contention lock is successful, the execution continues after await
static final Object room = new Object();
static boolean hasCigarette = false;
static boolean hasTakeout = false;
static ReentrantLock ROOM = new ReentrantLock();
// Waiting room for cigarettes
static Condition waitCigaretteSet = ROOM.newCondition();
// Waiting room for takeout
static Condition waitTakeoutSet = ROOM.newCondition();

public static void main(String[] args) {
    new Thread(() -> {
        ROOM.lock();
        try {
            log.debug("Any smoke?[{}]", hasCigarette);
            while (!hasCigarette) { 
                log.debug("No smoke, take a break!");
                try {
                    waitCigaretteSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("You can start working");
        } finally {
            ROOM.unlock();
        }
    }, "Xiaonan").start();

    new Thread(() -> {
        ROOM.lock();
        try {
            log.debug("Did you deliver the takeout?[{}]", hasTakeout);
            while (!hasTakeout) {
                log.debug("No takeout, take a break!");
                try {
                    waitTakeoutSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("You can start working");
        } finally {
            ROOM.unlock();
        }
    }, "my daughter").start();

    sleep(1);
    new Thread(() -> {
        ROOM.lock();
        try {
            hasTakeout = true;
            waitTakeoutSet.signal();
        } finally {
            ROOM.unlock();
        }
    }, "Delivery").start();

    sleep(1);

    new Thread(() -> {
        ROOM.lock();
        try {
            hasCigarette = true;
            waitCigaretteSet.signal();
        } finally {
            ROOM.unlock();
        }
    }, "Cigarette delivery").start();
}

Keywords: Java Back-end Concurrent Programming

Added by brendandonhue on Sun, 09 Jan 2022 05:17:50 +0200