ReentrantLock: the reentrant lock

synchronized is called an implicit lock, also known as a JVM built-in lock, because it acquires and releases the lock implicitly, without developer intervention. Java 5 introduced a new locking mechanism built around the Lock interface:

public interface Lock {
	// Lock
	void lock();
	// Unlock
	void unlock();
	// The lock can be obtained by interrupt, and the interrupt operation can be responded to when obtaining the lock
	void lockInterruptibly() throws InterruptedException;
	// If you try to obtain a lock without blocking, you can get it and return true; otherwise, you will return false
	boolean tryLock();
	// If you try to acquire the lock according to the time, you can get it and return true; otherwise, you will return false
	boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
	// Gets the wait notification component, which is bound to the current lock
	Condition newCondition();
}

It is used like this:

Lock lock = new ReentrantLock();
lock.lock();
try {
	// Shared code block
} finally {
	lock.unlock();
}

To ensure that the lock is always released, lock.unlock() must be called inside a finally block.
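Beyond plain lock()/unlock(), the interface above also offers non-blocking acquisition. The sketch below (the class and method names are made up for the example) uses the timed tryLock() so a thread can give up instead of blocking forever:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {
    private static final Lock lock = new ReentrantLock();

    public static String doWork() throws InterruptedException {
        // Wait at most 100 ms for the lock instead of blocking indefinitely
        if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                return "acquired"; // critical section would go here
            } finally {
                lock.unlock(); // always release in finally
            }
        }
        return "gave up"; // lock stayed busy for the whole timeout
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(doWork());
    }
}
```

Since no other thread holds the lock here, doWork() takes the success branch; under contention the timeout path would be taken instead.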

ReentrantLock is implemented on top of the AQS queue synchronizer, whose full name is AbstractQueuedSynchronizer. The abstract class looks like this:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
	// 0 means the lock is free; a nonzero value means it is held (for ReentrantLock it counts reentries)
	private volatile int state;
	// Point to synchronization queue header
	private transient volatile Node head;
	// Point to the end of the synchronization queue
	private transient volatile Node tail;
	// Other attributes are omitted
}

When a thread calls the lock() method to grab the lock, a state value of 0 means the lock is not currently held, so the acquisition succeeds and state is set to 1. Otherwise the grab fails, and the thread is wrapped in a Node object and placed in the synchronization queue to wait. Node is an inner class of AQS; its structure is as follows:

static final class Node {
    // Shared mode
    static final Node SHARED = new Node();
    // Exclusive mode
    static final Node EXCLUSIVE = null;
    // Wait status
    volatile int waitStatus;
    // Predecessor node in the synchronization queue
    volatile Node prev;
    // Successor node in the synchronization queue
    volatile Node next;
    // The thread requesting the lock
    volatile Thread thread;
    // Successor node in the wait queue
    Node nextWaiter;
}

Shared mode means the lock can be held by multiple threads at the same time; exclusive mode means at most one thread can hold it. ReentrantLock uses exclusive mode, while Semaphore, discussed later, uses shared mode.
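Shared mode can be glimpsed ahead of that later discussion with a minimal Semaphore sketch (the class and method names are illustrative):

```java
import java.util.concurrent.Semaphore;

public class SharedModeDemo {
    public static int permitsAfterAcquire() throws InterruptedException {
        // Three permits: up to three holders at once, i.e. shared mode
        Semaphore semaphore = new Semaphore(3);
        semaphore.acquire(2);                         // take two permits in one call
        int remaining = semaphore.availablePermits(); // one permit left
        semaphore.release(2);                         // give both back
        return remaining;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(permitsAfterAcquire());
    }
}
```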

waitStatus indicates the status of the current Node and takes one of five values:

  • CANCELLED (1): terminal state. The thread timed out or was interrupted while waiting and needs to be removed from the synchronization queue
  • SIGNAL (-1): wake-up state. When this node releases the lock, it must wake its successor's thread
  • CONDITION (-2): wait state. The thread called await() on a Condition object and entered the wait queue
  • PROPAGATE (-3): related to shared mode; a thread in this state can run
  • 0: initial state

From prev and next we can see that the synchronization queue maintained by AQS is doubly linked. nextWaiter points to the successor node in the wait queue. It is necessary to distinguish the synchronization queue from the wait queue: a thread in the wait queue cannot acquire the lock until it is moved back into the synchronization queue.
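The two queues can be seen in action through the Condition bound to a lock: await() moves a thread from the lock into the wait queue, and signal() moves it back into the synchronization queue to compete for the lock again. A minimal sketch (names are illustrative, and the short sleep is a demo-only way to let the waiter park first):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {
    public static boolean demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        AtomicBoolean flag = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                // awaitUninterruptibly() releases the lock and parks this
                // thread in the wait queue until it is signalled
                while (!flag.get()) {
                    ready.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        Thread.sleep(100); // let the waiter park (demo only)

        lock.lock();
        try {
            flag.set(true);
            // signal() moves the waiter from the wait queue back
            // into the synchronization queue
            ready.signal();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return flag.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```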

ReentrantLock uses AQS through the inner classes Sync, FairSync and NonfairSync. Sync extends AbstractQueuedSynchronizer, and FairSync and NonfairSync extend Sync. Sync implements the tryRelease() unlock method, while FairSync and NonfairSync each implement the tryAcquire() lock method.

FairSync and NonfairSync represent the fair lock and the nonfair lock, respectively. A fair lock grants the lock in the order in which threads requested it; synchronized, by contrast, is a nonfair lock. ReentrantLock can be made fair or nonfair via its constructor; the default is the nonfair lock, which is more efficient.

// Default unfair lock
public ReentrantLock() {
    sync = new NonfairSync();
}
// Create lock type based on parameters
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

The main template methods provided by AQS include:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{
    // Exclusive Mode 
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // Sharing mode
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // Determine whether to exclusive lock
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
}

The benefit of this design is obvious. As a base component, AQS handles queue maintenance and synchronization-state control, while the concrete acquire and release operations are implemented in subclasses. As a common template, AQS can back many kinds of locks, such as exclusive and shared locks, and we can even build custom locks on top of it.
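To make the template idea concrete, here is a sketch of a custom, non-reentrant mutex built on AQS. The class is hypothetical and implements only the exclusive-mode hooks from the template methods above:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A minimal non-reentrant mutex: state 0 = free, state 1 = held
public class SimpleMutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // CAS state 0 -> 1: the lock was free and we grabbed it
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0); // mark the lock as free
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }  // AQS handles queuing/parking
    public void unlock()      { sync.release(1); }  // AQS wakes the successor
    public boolean isLocked() { return sync.isHeldExclusively(); }
}
```

AQS itself takes care of enqueueing and parking contending threads; the subclass only decides what the state transitions mean.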

AQS implements thread synchronization through a doubly linked queue of Nodes and the synchronization state (state). Fair and nonfair locks are implemented differently; let's look at the source code of the nonfair lock first:

// Where the lock() method begins to execute
static final class NonfairSync extends Sync {
    final void lock() {
    	// CAS attempts to acquire lock
        if (compareAndSetState(0, 1))
        	// The lock is obtained successfully. The AQS exclusive lock thread is set to the current thread
          	setExclusiveOwnerThread(Thread.currentThread());
        else
        	// Failed to acquire the lock; the argument 1 is the state value to set once the lock is acquired
            acquire(1);
    }
}
public final void acquire(int arg) {
	// Try to acquire the lock again. If the lock acquisition fails, encapsulate the current thread as a Node object and put it into the synchronization queue
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // The thread was interrupted while waiting; re-assert its interrupt status
        selfInterrupt();
}

Here, acquire() is an AQS method. The parameter arg is the state value to set, usually 1, indicating a lock acquisition.

static final class NonfairSync extends Sync {
	// Unfair attempt to acquire lock method
    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }
 }
abstract static class Sync extends AbstractQueuedSynchronizer {
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      // Judge whether AQS synchronization status is 0 (lock can be obtained)
      if (c == 0) {
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      // The current thread is the thread that holds the lock and can re-enter the lock
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc);
          return true;
      }
      return false;
  }
  // Omit other codes
}

As the code above shows, ReentrantLock is a reentrant lock, just like synchronized. It tracks the number of reentries in the state value; when the value is 0, the lock is not held.
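Reentrancy can be observed directly with getHoldCount(), which mirrors the state value; a small sketch (names are illustrative):

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    public static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                     // state goes 0 -> 1
        lock.lock();                     // same thread re-enters: state 1 -> 2
        int count = lock.getHoldCount(); // reports the reentry depth
        lock.unlock();                   // state 2 -> 1, lock still held
        lock.unlock();                   // state 1 -> 0, lock released
        return count;
    }

    public static void main(String[] args) {
        System.out.println(nestedHoldCount()); // 2
    }
}
```

Each lock() must be matched by an unlock(); the lock is only released when the count returns to 0.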

When a thread fails its first acquisition attempt, it tries to grab the lock again. Another thread may have just released the lock at that moment, so the current thread can seize it without ever entering the synchronization queue; this is exactly what makes a nonfair lock unfair. If the acquisition fails again, the current thread is wrapped in a Node object and placed in the synchronization queue:

private Node addWaiter(Node mode) {
    // Wrap the current thread in a node
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
	// If the queue already has a tail, try the fast-path append
    if (pred != null) {
        node.prev = pred;
        // Use CAS to try to add nodes at the end of the queue
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // Queue not yet initialized, or the CAS failed
    enq(node);
    return node;
}
private Node enq(final Node node) {
	// Spin until the node is enqueued
    for (;;) {
         Node t = tail;
         // Initialize queue
         if (t == null) {
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
}

The code above uses CAS inside an infinite loop to put the Node into the synchronization queue: if the queue is empty it is initialized first, otherwise the Node is appended at the tail. As the code shows, the head node of the queue is a dummy that holds no data; the real waiters start from the second node.

After adding to the synchronization queue, the node enters the spin to judge whether it meets the synchronization conditions:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        	// Get the predecessor node
            final Node p = node.predecessor();
            // Try to acquire the synchronization state only when p is the head node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                // Clear the link to help GC
                p.next = null;
                failed = false;
                return interrupted;
            }
            // The predecessor is not head, or acquisition failed; decide whether to park the thread
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
    	// Acquisition ultimately failed (e.g. due to an exception); cancel the node
        if (failed)
            cancelAcquire(node);
    }
}
private void setHead(Node node) {
	head = node;
	node.thread = null;
	node.prev = null;
}

The setHead() method means the thread of the current Node has obtained the lock, so the node's thread and prev references are released because they are no longer needed; the head of the AQS synchronization queue still holds no data. The lock is attempted if and only if the current node's predecessor is the head node; otherwise the code decides whether to park the thread.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	// The predecessor is already in SIGNAL state, so the current thread can safely park
	if (ws == Node.SIGNAL)
	    return true;
	// Skip predecessors in the cancelled state
	if (ws > 0) {
	    do {
	        node.prev = pred = pred.prev;
	    } while (pred.waitStatus > 0);
	    pred.next = node;
	} else {
		// Try to set the predecessor's state to SIGNAL
	    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}
private final boolean parkAndCheckInterrupt() {
	// Park the current thread
	LockSupport.park(this);
	// Return and clear the thread's interrupt status
	return Thread.interrupted();
}

This code decides whether the thread needs to be parked. If the predecessor is in the SIGNAL state, the current thread is parked. Cancelled predecessors are skipped, meaning those threads no longer need handling and are removed from the synchronization queue. Otherwise, the predecessor's state is set to SIGNAL.

Note that even when the predecessor is head, acquiring the lock may fail and the thread may still be parked: because the lock is nonfair, a thread that has not yet entered the synchronization queue may happen to grab the lock first. In that case another iteration of the loop is needed.

The nonfair locking process is shown above, and the code comments spell out each step. To summarize:

  1. CAS on the synchronization state; on success, set the exclusive-owner thread to the current thread; on failure, go to step 2
  2. Try to acquire the synchronization state again, checking for reentrancy; on success, update the state value and the exclusive-owner thread; on failure, go to step 3
  3. Wrap the thread in a Node and check whether its predecessor is head. If so, spin trying to acquire the lock; if not, decide whether to park, and keep looping

There is also an interruptible way to acquire the lock, lockInterruptibly(), whose core is implemented as follows:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
	    for (;;) {
	        final Node p = node.predecessor();
	        if (p == head && tryAcquire(arg)) {
	            setHead(node);
	            p.next = null; 
	            failed = false;
	            return;
	        }
	        if (shouldParkAfterFailedAcquire(p, node) &&
	            parkAndCheckInterrupt())
	            // Throw an exception directly, aborting the thread's request for the synchronization state
	            throw new InterruptedException();
	    }
	} finally {
	    if (failed)
	        cancelAcquire(node);
	}
}

The biggest difference is that after deciding the thread should park, an exception is thrown as soon as the thread is interrupted, aborting its request for the synchronization state and removing it from the synchronization queue. That completes the locking operations.
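The interruptible behaviour can be demonstrated end to end. In the sketch below (names are illustrative, and the short sleep is a demo-only way to let the second thread park in the synchronization queue), a thread blocked in lockInterruptibly() is woken by interrupt() with an InterruptedException instead of the lock:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptibleLockDemo {
    public static boolean demo() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch interrupted = new CountDownLatch(1);

        lock.lock(); // main thread holds the lock
        Thread t = new Thread(() -> {
            try {
                // Blocks in the sync queue; throws as soon as t is interrupted
                lock.lockInterruptibly();
                lock.unlock(); // not reached in this demo
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });
        t.start();
        Thread.sleep(100); // let t park in the sync queue (demo only)
        t.interrupt();     // t wakes with InterruptedException instead of the lock

        boolean sawInterrupt = interrupted.await(2, TimeUnit.SECONDS);
        lock.unlock();
        t.join();
        return sawInterrupt;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

A plain lock() call in the same situation would keep waiting and only record the interrupt status.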

The release operation for both the fair and the nonfair lock is implemented in the shared parent class Sync, which provides the tryRelease() method. After the synchronization state is released, unparkSuccessor() is called:

private void unparkSuccessor(Node node) {
	// node is generally the node of the thread that currently holds the lock
    int ws = node.waitStatus;
    // Reset the node's wait status to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // Find the next node s to wake up
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // Wake up thread
    if (s != null)
        LockSupport.unpark(s.thread);
}

The code above wakes up the thread closest to the head of the synchronization queue, skipping cancelled nodes (any node in a non-cancelled state is eligible). The awakened thread resumes the acquisition loop described in the locking operation above.

Acquiring a fair lock is similar to acquiring a nonfair lock; the main difference is in the following code:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // First, check whether there are queued predecessors in the synchronization queue
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

When a fair lock tries to acquire, it first checks whether the synchronization queue has queued predecessors; this is the biggest difference from the nonfair lock. In general, the nonfair lock is more efficient and fits most expectations, while the fair lock is mainly used in scenarios where execution order matters.
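The queued predecessors that hasQueuedPredecessors() looks for can be observed from outside via getQueueLength(). A sketch (names are illustrative, and the short sleep is a demo-only way to let the second thread enqueue; getQueueLength() is documented as an estimate):

```java
import java.util.concurrent.locks.ReentrantLock;

public class QueueLengthDemo {
    public static int queuedWhileHeld() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        lock.lock();                                  // main thread holds the lock

        Thread t = new Thread(lock::lock);            // will block in the sync queue
        t.start();
        Thread.sleep(200);                            // let t park (demo only)

        int queued = lock.getQueueLength();           // t is waiting in the queue
        lock.unlock();                                // t acquires next, in FIFO order
        t.join();
        return queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queuedWhileHeld());
    }
}
```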


Added by kjl-php on Mon, 07 Feb 2022 23:16:38 +0200