The basic process of acquiring and releasing locks in AQS -- Based on ReentrantLock

AQS

The following content draws on the Meituan technical team's article: The principle and application of AQS from the implementation of ReentrantLock

When it comes to ReentrantLock, we have to talk about AQS, because ReentrantLock is implemented on top of AQS. So, what is AQS?

AQS stands for AbstractQueuedSynchronizer, a class in JUC (java.util.concurrent). It provides a simple framework for atomically managing synchronization state, blocking and waking threads, and maintaining a wait queue. It underlies ReentrantLock, Semaphore, CountDownLatch and ThreadPoolExecutor.

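A brief look at the AQS class source code

First, let's take a look at some properties and methods in the AQS class and what they do. The listings in this article match the JDK 8 source; later JDK versions restructure some of these methods.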

Properties:

  • state: synchronization status. AQS itself only stores and atomically updates this int; each subclass decides what it means. In ReentrantLock, state=0 means the lock is free and state=n (n >= 1) means the owning thread holds the lock n times (reentrancy). In Semaphore, state is the number of permits still available. Let's take a look at how ReentrantLock and Semaphore set this property (taking the nonfair versions as an example).

    ReentrantLock: the NonfairSync inner class extends Sync, which in turn extends AQS. A NonfairSync is created by default when the lock is constructed, so you only need to look at the source code of this class. When the lock() method is called, state is set as follows:

    final void lock() {
         // Modify 0 to 1 using CAS
         if (compareAndSetState(0, 1))
              setExclusiveOwnerThread(Thread.currentThread());
         else
              acquire(1);
    }
    

    Semaphore: the constructor calls setState() to store the initial number of permits, as follows:

    // Construction method
    Sync(int permits) {
          setState(permits);
    }   
    // setState method in parent class (AQS)
    protected final void setState(int newState) {
            state = newState;
    }
    
  • head: the head node of the queue, which will be discussed in the next section.

  • tail: the tail node of the queue, which will be discussed in the next section.

  • spinForTimeoutThreshold: the number of nanoseconds below which it is faster to spin than to use a timed park.

  • Field offsets: memory offsets of the fields above, used for the CAS operations; not explained in detail.
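
To make the state field concrete, here is a small sketch using only public JDK methods. ReentrantLock.getHoldCount() and Semaphore.availablePermits() both report values that these classes keep in the AQS state; the class name StateDemo and its helper methods are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class StateDemo {
    /** Hold count after the same thread locks twice. */
    static int holdCountAfterTwoLocks() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                       // re-entry by the same thread
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

    /** Permits left after taking one permit out of three. */
    static int permitsAfterOneAcquire() {
        Semaphore semaphore = new Semaphore(3);
        semaphore.tryAcquire();            // takes one permit without blocking
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("hold count = " + holdCountAfterTwoLocks());
        System.out.println("permits    = " + permitsAfterOneAcquire());
    }
}
```

The same int therefore counts holds in one class and remaining permits in the other.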

Methods:

AQS has many methods. Here are just some of the common ones.

  • void acquire(int arg): acquires the lock resource. It calls tryAcquire(); if that succeeds it returns directly, and if it fails the thread is added to the wait queue.
  • boolean tryAcquire(int arg): makes one attempt to acquire the lock resource. AQS does not implement it; subclasses must override it.
  • Node addWaiter(Node mode): wraps the current thread in a node and adds it to the wait queue.
  • boolean acquireQueued(final Node node, int arg): acquires in exclusive, uninterruptible mode for a thread that is already in the queue.
  • void cancelAcquire(Node node): cancels an ongoing attempt to acquire the lock resource.
  • Node enq(final Node node): adds the node to the queue, initializing the queue if necessary.
  • boolean release(int arg): releases the lock resource; it calls tryRelease().
  • boolean tryRelease(int arg): AQS does not implement it; subclasses must override it.
  • ......
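
As a minimal sketch of how these methods fit together, the following non-reentrant mutex overrides only tryAcquire() and tryRelease() and lets AQS do the queuing in acquire() and release(). SimpleMutex and its method names are hypothetical; this is an illustration, not how ReentrantLock itself is written.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One attempt: succeed only if state is still 0
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);                   // back to "free"
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }        // queues the thread if tryAcquire fails
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }        // wakes a queued successor if needed
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println("locked: " + mutex.isLocked());
        mutex.unlock();
        System.out.println("locked: " + mutex.isLocked());
    }
}
```

Because this sketch never increments state past 1, a second tryLock() by the owner fails; ReentrantLock adds the reentrancy check described later in this article.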

A brief look at the inner class Node

There is a very important inner class in AQS: Node. As the name suggests, it represents a node. So what needs nodes? Here we need to talk about the CLH (Craig, Landin, and Hagersten) queue in AQS. It is a virtual doubly linked queue whose entries are Node objects. AQS includes a Sync Queue and a Condition Queue. The Condition Queue is used only when a Condition is used, so it is not introduced here.

The head and tail attributes of AQS mentioned above are the head node and the tail node of the synchronization queue. Note: the head node of the synchronization queue does not store any valid information; it only serves as the initialization node of the queue.

The main attributes in Node are as follows:

        // Marker for a node waiting in shared mode
        static final Node SHARED = new Node();
        // Marker for a node waiting in exclusive mode
        static final Node EXCLUSIVE = null;

        // waitStatus values
        // The thread has been cancelled; the only status value greater than 0
        static final int CANCELLED =  1;
        // The successor's thread needs unparking
        static final int SIGNAL    = -1;
        // The thread is waiting on a condition
        static final int CONDITION = -2;
        // The next acquireShared should propagate unconditionally
        static final int PROPAGATE = -3;
        // Current wait status: 0 (initial) or one of the four values above
        volatile int waitStatus;

        // Predecessor node
        volatile Node prev;
        // Successor node
        volatile Node next;
        // The thread that this node holds
        volatile Thread thread;
        // Next node waiting on the condition
        Node nextWaiter;

You mainly need to understand the values that waitStatus can take; 0 is the default value of a newly created node.

Analyze the principle by example -- the process of obtaining lock

The following steps may look tedious, but they are easy to follow if you step through them in an IDE.

Layer 1: ReentrantLock acquire lock

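ReentrantLock lock = new ReentrantLock();   // Create ReentrantLock instance object
lock.lock();   // Acquire lock (outside try, so a failed lock() never reaches unlock())
try {
    // ... critical section ...
} finally {
    lock.unlock();   // Release lock; covered in the second half of this article
}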

Layer 2: operations in ReentrantLock class

We can see from the source code that the lock created by default is a nonfair lock, so we take the nonfair lock as an example here. For other details of the ReentrantLock class, please refer to the source code. Initialization is as follows:

// ReentrantLock constructor
public ReentrantLock() {
    sync = new NonfairSync();    // Nonfair lock
}

Therefore, subsequent lock operations can be said to rely on the internal class NonfairSync, as follows:

// The lock method in NonfairSync above will be called
public void lock() {
    sync.lock();
}

Layer 3: operation of NonfairSync internal class

// Methods in the NonfairSync inner class
final void lock() {
    if (compareAndSetState(0, 1))
        // The lock is obtained successfully. Set the current thread as the exclusive thread of the lock
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // Otherwise, execute acquire logic
        acquire(1);
}

Here you can see that the if condition uses CAS to set state (note: NonfairSync ultimately inherits from AQS, so it inherits the state attribute). As mentioned earlier, state represents the status of the lock, so this changes it from 0 (free) to 1 (held once). If the CAS succeeds, the lock has been acquired and the method returns directly; otherwise the acquire() logic is executed.
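
The fast path can be sketched with AtomicInteger standing in for the AQS state field. That substitution is an assumption made for illustration (AQS uses its own compareAndSetState()), and CasFastPathDemo is a hypothetical name. The point is that only the first CAS from 0 to 1 succeeds.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasFastPathDemo {
    // Stand-in for the AQS state field: 0 = free, 1 = held
    private final AtomicInteger state = new AtomicInteger(0);

    /** Same shape as `if (compareAndSetState(0, 1))`: succeeds only while state is 0. */
    boolean tryFastPath() {
        return state.compareAndSet(0, 1);
    }

    int currentState() {
        return state.get();
    }

    public static void main(String[] args) {
        CasFastPathDemo demo = new CasFastPathDemo();
        System.out.println("first attempt:  " + demo.tryFastPath());
        System.out.println("second attempt: " + demo.tryFastPath());
        System.out.println("state:          " + demo.currentState());
    }
}
```

In the real lock, the failed second attempt is where acquire(1) takes over.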

Layer 4: acquire() method of AQS.

The acquire() method is one of the core methods of AQS. It acquires the lock resource in exclusive mode, ignoring interrupts. The source code is as follows:

// acquire method in AQS class
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&   // Attempt to acquire lock resource
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Add node to synchronization queue
        selfInterrupt();
}

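There are two pieces of logic here: tryAcquire() attempts to acquire the lock resource, and acquireQueued() lets the node in the synchronization queue keep trying to acquire it. If the thread was interrupted while waiting, acquireQueued() returns true and selfInterrupt() restores the interrupt flag.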

If tryAcquire() returns true, it indicates that obtaining the lock resource is successful, and subsequent operations are not required.
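
"Ignoring interrupts" can be observed from outside with public ReentrantLock methods. In this sketch (AcquireInterruptDemo and run() are hypothetical names), a waiter is interrupted while queued: it stays in the queue, and once it finally gets the lock its interrupt flag is set again.

```java
import java.util.concurrent.locks.ReentrantLock;

class AcquireInterruptDemo {
    /** Returns {stillQueuedAfterInterrupt, interruptFlagSeenAfterLock}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] flagSeen = new boolean[1];
        Thread waiter = new Thread(() -> {
            lock.lock();                   // waits in the queue; an interrupt does not abort it
            try {
                flagSeen[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        boolean stillQueued = false;
        try {
            lock.lock();                   // the calling thread holds the lock first
            try {
                waiter.start();
                while (!lock.hasQueuedThread(waiter)) Thread.sleep(1);
                waiter.interrupt();        // interrupt while the waiter is queued
                Thread.sleep(100);
                stillQueued = lock.hasQueuedThread(waiter);
            } finally {
                lock.unlock();             // now the waiter can acquire
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] { stillQueued, flagSeen[0] };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("still queued after interrupt: " + r[0]);
        System.out.println("interrupt flag after lock():  " + r[1]);
    }
}
```

Callers that want an interrupt to abort the wait use lockInterruptibly() instead of lock().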

Layer 5: implementation of tryAcquire() method

tryAcquire() must be implemented by subclasses. NonfairSync's tryAcquire() simply delegates to nonfairTryAcquire() in its parent class Sync, so that is the method to look at.

// tryAcquire in AQS is not implemented
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// nonfairTryAcquire in the inner class Sync of ReentrantLock; NonfairSync.tryAcquire delegates to it
// The parameter is 1: one hold of the exclusive lock
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();    // Get synchronization status
    if (c == 0) {          
        if (compareAndSetState(0, acquires)) {
             // If the status is 0 and the CAS setting status is successful, it means that the lock is obtained successfully, and return true
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // If the state is not 0, some thread already holds the lock, so check whether the owner is the current thread. If so, the current thread is re-entering, and the state is increased by acquires (+1). This is how reentrancy works.
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // If the acquisition fails, false is returned to perform subsequent operations
    return false;
}

Two points to take away from the above:

  1. If the lock resource is not held, tryAcquire() tries to set the synchronization state with CAS to obtain the lock.
  2. If the current thread already holds the lock and acquires it again, the synchronization state is simply increased by 1; this is how the lock implements reentrancy.

If true is returned, the operations after && in layer 4 are skipped; otherwise they are executed.
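
Both branches can be exercised through the public tryLock() method, which in the JDK 8 source calls nonfairTryAcquire(1) directly. A sketch (TryAcquireDemo and run() are hypothetical names): the owner re-enters successfully, while another thread gets false.

```java
import java.util.concurrent.locks.ReentrantLock;

class TryAcquireDemo {
    /** Returns a one-line summary of what the owner and another thread observed. */
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] other = new boolean[1];
        Thread t = new Thread(() -> {
            other[0] = lock.tryLock();     // state != 0 and this thread is not the owner
            if (other[0]) lock.unlock();
        });
        lock.lock();                       // state 0 -> 1, owner = calling thread
        try {
            boolean reentered = lock.tryLock();   // owner again: state 1 -> 2
            int holds = lock.getHoldCount();
            if (reentered) lock.unlock();         // state 2 -> 1
            t.start();
            t.join();
            return "reentered=" + reentered + " holds=" + holds + " other=" + other[0];
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();                 // state 1 -> 0
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```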

Layer 6: nodes in the synchronization queue obtain lock resources

If tryAcquire() in layer 5 fails, this step is executed. Here we need to focus on two methods: addWaiter() and acquireQueued().

Let's first look at the addWaiter() method:

// addWaiter method in AQS class
// The parameter is Node.EXCLUSIVE (i.e. null), meaning exclusive mode
private Node addWaiter(Node mode) {
    // Encapsulate the current thread as a node of a synchronization queue and set the mode to exclusive
    Node node = new Node(Thread.currentThread(), mode);
    // Gets and temporarily saves the tail node
    Node pred = tail;
    // If the tail node is not empty, the synchronization queue already exists
    if (pred != null) {
        // Set the precursor of the current thread node as the tail node
        node.prev = pred;
        // Use CAS to set the node of the current thread as the tail node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The queue is not initialized yet, or the CAS above lost a race: fall back to enq()
    enq(node);
    return node;
}

///////////////////////enq method///////////////////////////
// Parameter is the node encapsulated by the current thread
private Node enq(final Node node) {
    // Infinite loop until the node is successfully added to the synchronization queue
   for (;;) {
       Node t = tail;
       // Initialize synchronization queue
       if (t == null) { // Must initialize
           // If the tail node is empty, initialize an empty node, which is the head node and tail node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {   // Adds the node of the current thread to the end of the synchronization queue
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
    }
}

From the two methods above we can see that the main operation is to wrap the current thread in a Node and add it to the tail of the synchronization queue. enq() handles the cases the fast path in addWaiter() cannot: the queue has not been initialized yet, or the CAS on the tail lost a race with another thread.
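
The lazy initialization in enq() can be tried out in isolation. This is a simplified sketch, not the AQS code: AtomicReference replaces the CAS helpers, a String name replaces the Thread field, and EnqSketch is a hypothetical class. It shows that the first enqueue creates an empty head node and that enq() returns the predecessor.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqSketch {
    static final class Node {
        final String name;                 // stand-in for the Thread field; null for the empty head
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    /** Appends node to the queue, creating the empty head node on first use. Returns the predecessor. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized: install an empty node as both head and tail, then retry
                if (head.compareAndSet(null, new Node(null)))
                    tail.set(head.get());
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    public static void main(String[] args) {
        EnqSketch queue = new EnqSketch();
        Node a = new Node("A");
        Node pred = queue.enq(a);
        System.out.println("predecessor of A is the empty head: " + (pred == queue.head.get()));
        System.out.println("tail is A: " + (queue.tail.get() == a));
    }
}
```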

Let's look at the acquireQueued() method:

// acquireQueued method in AQS
// Parameter: current thread node, 1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // Interrupt flag
        boolean interrupted = false;
        for (;;) {
            // Gets the precursor node of the current thread node
            final Node p = node.predecessor();
            // If the predecessor is the head node, let the current thread try to acquire the lock resource (see layer 5 for tryAcquire)
            if (p == head && tryAcquire(arg)) {
                // If the current thread acquires the lock resource successfully, set the current thread node as the head node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Judge whether to block the current thread according to the waiting state of the precursor node p
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // failed is still true only if an exception was thrown: cancel the node (mark it CANCELLED)
        if (failed)
            cancelAcquire(node);
    }
}

This method is also easy to understand: after the current thread's node has been added to the synchronization queue, check whether it is the second node of the queue (the first node holds no valid information). If so, let the current thread try to obtain the lock resource. If that succeeds, we are done; otherwise the method falls through to its last resort.

That last resort is shouldParkAfterFailedAcquire() followed by parkAndCheckInterrupt().

Therefore, the core of acquireQueued() is to let nodes in the synchronization queue keep trying to obtain the lock resource, parking between attempts.

Layer 7: the last step in obtaining lock resources

Here, we mainly focus on the shouldParkAfterFailedAcquire method and parkAndCheckInterrupt method.

Let's first look at the shouldParkAfterFailedAcquire() method:

// shouldParkAfterFailedAcquire method in AQS
// Parameter: precursor node of current thread node, current thread node
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the wait status of the predecessor node (see the Node attribute definitions above)
    int ws = pred.waitStatus;
    // If the predecessor is already SIGNAL, the current thread can safely park
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // If the predecessor is CANCELLED (ws > 0), skip it in the synchronization queue
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // Point the current node's prev at the predecessor's predecessor, i.e. skip the cancelled predecessor
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);  // The loop skips every consecutive CANCELLED node before the current node
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // Otherwise, set the predecessor's wait status to SIGNAL so that the current thread may park on the next pass
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // Return false
    return false;
}

The main job of this method is to check the wait status of the predecessor node, as follows:

  • If the wait status of the current node's predecessor is SIGNAL, return true so that parkAndCheckInterrupt() is executed next.
  • If the wait status of the predecessor is CANCELLED, skip all consecutive cancelled nodes before the current node.
  • Otherwise, set the wait status of the predecessor to SIGNAL. In the last two cases the method returns false.

The flow chart is as follows (from the Meituan technical team):

Let's look at the parkAndCheckInterrupt() method:

// parkAndCheckInterrupt method of AQS
private final boolean parkAndCheckInterrupt() {
    // park current thread
    LockSupport.park(this);
    return Thread.interrupted();
}

This method is very simple: it parks (blocks) the current thread and, once the thread wakes up, returns whether it was interrupted (Thread.interrupted() also clears the interrupt flag). Parking only after shouldParkAfterFailedAcquire() has returned true means a waiting thread blocks instead of looping forever, which avoids wasting CPU resources.
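
That a queued thread really blocks rather than spins can be checked from another thread: a thread parked by LockSupport.park() reports Thread.State.WAITING. A small sketch, with ParkDemo and waiterIsParked() as hypothetical names:

```java
import java.util.concurrent.locks.ReentrantLock;

class ParkDemo {
    /** Returns true if the queued thread was observed in state WAITING (parked). */
    static boolean waiterIsParked() {
        ReentrantLock lock = new ReentrantLock();
        Thread waiter = new Thread(() -> {
            lock.lock();                   // will queue up and park
            lock.unlock();
        });
        boolean parked = false;
        try {
            lock.lock();                   // the calling thread holds the lock
            try {
                waiter.start();
                long deadline = System.nanoTime() + 5_000_000_000L;   // give up after 5 s
                while (!parked && System.nanoTime() < deadline) {
                    parked = lock.hasQueuedThread(waiter)
                            && waiter.getState() == Thread.State.WAITING;
                    Thread.sleep(1);
                }
            } finally {
                lock.unlock();             // unparks the waiter
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return parked;
    }

    public static void main(String[] args) {
        System.out.println("waiter parked: " + waiterIsParked());
    }
}
```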

After the basic locking operation is completed, control returns to acquireQueued(). The logic is shown in the following figure (from the Meituan technical team):

However, there is one more place to think about: the finally block in acquireQueued(). From the source code we can see that under normal circumstances cancelAcquire() in finally is never executed, because failed is set to false before the method returns; it runs only when an exception is thrown. In that case the current thread's acquire request is cancelled and its node is set to the CANCELLED state. The source code is omitted. To sum up, there are three situations:

  • The current node is the tail node: remove the current node directly;
  • The current node is neither the tail node nor the successor of the head node: skip the current node by pointing its predecessor's next at the current node's successor;
  • The current node is the successor of the head node: wake up the current node's successor. If the next pointer does not lead to a usable node, the nearest non-cancelled node is found by traversing from the tail node toward the head.
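
Cancellation is hard to trigger through lock() itself. As a related illustration (a different entry point from acquireQueued(), chosen here because it is easy to observe), a timed tryLock(timeout, unit) that gives up also cancels its queue node in the JDK implementation. The sketch below checks that the queue is empty again afterwards; CancelDemo and run() are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class CancelDemo {
    /** Returns {acquiredByWaiter, queueStillHasThreads}. */
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                // Queues up, waits at most 50 ms, then gives up and cancels its node
                acquired[0] = lock.tryLock(50, TimeUnit.MILLISECONDS);
                if (acquired[0]) lock.unlock();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        lock.lock();                       // the calling thread keeps the lock the whole time
        try {
            waiter.start();
            waiter.join();
            return new boolean[] { acquired[0], lock.hasQueuedThreads() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("waiter acquired: " + r[0]);
        System.out.println("queue non-empty: " + r[1]);
    }
}
```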

Summary of lock acquisition process:

  1. Calling lock.lock() invokes the lock() method in NonfairSync, which first tries to CAS the synchronization state from 0 to 1 to obtain the lock resource. If that succeeds, the lock is acquired; otherwise go to the next step.
  2. Call the acquire() method in AQS. It first calls tryAcquire() to try to obtain the lock resource (implemented by the AQS subclass), which also checks whether the current thread is re-entering a lock it already holds. If it succeeds, it returns directly; if it fails, go to the next step.
  3. addWaiter() adds the current thread to the tail of the synchronization queue, and then acquireQueued() lets the node in the queue try to obtain the lock resource: if the current node is the successor of the head node (the second node), it tries to acquire; on success it becomes the new head node and the method returns, otherwise go to the next step.
  4. Call shouldParkAfterFailedAcquire(). If the predecessor of the current node is in the SIGNAL state, call parkAndCheckInterrupt() to block the current thread. If it is in the CANCELLED state, skip the cancelled nodes before the current node. Otherwise, set the predecessor to SIGNAL, then go back to the previous step and try to obtain the lock resource again.

Scenario simulation: thread A acquires the lock first, then thread B tries to acquire it, and finally thread C tries to acquire it.

  • Thread A sets the synchronization state through lock() and obtains the lock;
  • Thread B fails the CAS in lock(), calls acquire(), fails tryAcquire(), calls addWaiter() to join the synchronization queue, and then calls acquireQueued(). Because B is the successor of the head node, it tries to obtain the lock resource and returns if that succeeds. Otherwise it calls shouldParkAfterFailedAcquire(), which sets the head node to SIGNAL; the next time that method runs, thread B is blocked;
  • Thread C also fails to get the lock and enters acquireQueued(), but it is not the successor of the head node, so it goes straight to shouldParkAfterFailedAcquire(), which sets thread B's node to SIGNAL; in the next loop iteration parkAndCheckInterrupt() blocks thread C.
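
The A, B, C scenario can be replayed with the real ReentrantLock. In this sketch the calling thread plays thread A; QueueOrderDemo and run() are hypothetical names. B is started first and C only after B is queued, so the queue order is known, and the queue length is read through the public getQueueLength() method.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

class QueueOrderDemo {
    /** Returns the queue length seen by A followed by the order in which B and C got the lock. */
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        List<String> events = new CopyOnWriteArrayList<>();
        Runnable task = () -> {
            lock.lock();                   // queues up behind thread A
            try {
                events.add(Thread.currentThread().getName());
            } finally {
                lock.unlock();             // wakes the next queued thread
            }
        };
        Thread b = new Thread(task, "B");
        Thread c = new Thread(task, "C");
        try {
            lock.lock();                   // the calling thread acts as thread A
            try {
                b.start();
                while (!lock.hasQueuedThread(b)) Thread.sleep(1);   // B is queued first
                c.start();
                while (!lock.hasQueuedThread(c)) Thread.sleep(1);   // then C, behind B
                events.add("queued=" + lock.getQueueLength());
            } finally {
                lock.unlock();             // A releases; the head's successor (B) is unparked
            }
            b.join();
            c.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return String.join(",", events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Even with a nonfair lock, threads that are already queued are served in queue order; nonfairness only lets a newly arriving thread barge in ahead of the queue.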

Analyze the principle by example -- the process of releasing the lock

Step 1: ReentrantLock releases the lock

try {
    // ... critical section ...
} finally {
    lock.unlock();
}

unlock() then calls release() on the Sync object:

public void unlock() {
    sync.release(1);  // In fact, it is the method of the parent AQS
}

Step 2: release in AQS

// release in AQS
// Parameter: 1
public final boolean release(int arg) {
    // Call tryRelease to try to release the lock
    if (tryRelease(arg)) {
        Node h = head;
        // If the head node is not null and its wait status is not 0
        if (h != null && h.waitStatus != 0)
            // Wake up the thread of the head node's successor; this calls LockSupport.unpark()
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease() releases the lock:

Similar to tryAcquire(), tryRelease() in AQS is not implemented and must be provided by a subclass. See the tryRelease() override in Sync:

// Overridden method in Sync
// Parameter: 1
protected final boolean tryRelease(int releases) {
    // Releasing the lock, so subtract releases (1) from the synchronization state
    int c = getState() - releases;
    // If the current thread is not the owner, throw IllegalMonitorStateException
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // If 0, no thread holds the lock
    if (c == 0) {
        free = true;
        // Set exclusive thread to null
        setExclusiveOwnerThread(null);
    }
    // Set new synchronization status
    setState(c);
    return free;
}

To sum up, the process of releasing the lock is relatively simple: after the synchronization state has been updated successfully, you only need to check the status of the head node to decide whether to wake up a waiting thread.
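
The owner check in tryRelease() is visible through the public API: calling unlock() from a thread that does not hold the lock throws IllegalMonitorStateException. A minimal sketch, with ReleaseOwnerDemo as a hypothetical name:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseOwnerDemo {
    /** Returns true if unlock() on a lock we do not hold is rejected. */
    static boolean unlockWithoutOwningIsRejected() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();                 // the current thread is not the owner
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + unlockWithoutOwningIsRejected());
    }
}
```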

Question 1: why check (h != null && h.waitStatus != 0)?

  • h != null is easy to understand: it prevents a null pointer when the queue has never been initialized;
  • h.waitStatus != 0: when a thread is wrapped in a new Node, waitStatus defaults to 0. It is set to SIGNAL only when a successor calls shouldParkAfterFailedAcquire() while acquiring the lock. So when waitStatus == 0, the successor cannot have parked yet and there is no need to wake it up; otherwise a wake-up is required.

Question 2: will c in tryRelease() be greater than 1?

  • Yes, this is what makes the lock reentrant. If a thread has acquired the lock n times, the state is n, and each unlock() lowers it by 1. true is returned only when c == 0, indicating that the lock resource is fully released.
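
A short sketch of this countdown, using only public ReentrantLock methods (ReentrantReleaseDemo and run() are hypothetical names): the lock stays held until the last matching unlock().

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantReleaseDemo {
    /** Returns {holdCountAfterThreeLocks, lockedAfterTwoUnlocks (1/0), lockedAfterThreeUnlocks (1/0)}. */
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.lock();                       // state is now 3
        int holds = lock.getHoldCount();
        lock.unlock();
        lock.unlock();                     // state 1: tryRelease returned false both times
        boolean lockedAfterTwo = lock.isLocked();
        lock.unlock();                     // state 0: tryRelease returns true, lock is free
        boolean lockedAfterThree = lock.isLocked();
        return new int[] { holds, lockedAfterTwo ? 1 : 0, lockedAfterThree ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("holds=" + r[0] + " lockedAfterTwo=" + r[1] + " lockedAfterThree=" + r[2]);
    }
}
```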

The above is the general process of acquiring and releasing locks through AQS in ReentrantLock. There is much more to it; for the details, study the source code further.

Keywords: Java Multithreading

Added by billcoker on Tue, 18 Jan 2022 18:21:47 +0200