[JDK source code] conditional lock of synchronous series AQS

brief introduction

Condition lock refers to a lock that is used when you find that the current business scenario cannot be processed by yourself after obtaining the lock, but you need to wait for a condition to continue processing.

Note that the condition here must wait after obtaining the lock. The condition lock corresponding to ReentrantLock can only be called after obtaining the lock.

In java, conditional locks are implemented in the ConditionObject class of AQS, and ConditionObject implements the Condition interface

Use example

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // Declare a reentry lock
        ReentrantLock lock = new ReentrantLock();
        // Declaring a conditional lock ReentrantLockTest does not implement AQS, but sync in ReentrantLockTest implements AQS, so sync is called to initialize
        /** ReentrantLock
            public Condition newCondition() {
       			 return sync.newCondition();
    		}
    	 *	sync
		  	final ConditionObject newCondition() {
                return new ConditionObject();
        	}
        */
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // The waiting condition is actually blocking
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // The purpose of sleeping 1000ms here is to let the above thread obtain the lock first
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // Here, 2000ms represents the time required for this thread to execute business
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // Notification condition established
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

The above code is very simple. One thread waits for a condition and the other thread notifies that the condition has been established. The following numbers represent the actual running order of the code. In fact, they are blocking and wake-up, that is, Object wait and notify

Source code analysis

Main properties of ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. First node of condition queue*/
    private transient Node firstWaiter;
    /** Last node of condition queue.Last node of condition queue */
    private transient Node lastWaiter;
}

You can see that a queue is also maintained in the condition lock. In order to distinguish it from the AQS queue, I call it the condition queue here. The firstWaiter is the head node of the queue and the lastWaiter is the tail node of the queue

lock.newCondition() method

Create a new conditional lock. ReentrantLockTest does not implement AQS, but sync in ReentrantLockTest implements AQS, so call sync to initialize

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

Create a new condition lock, and finally call the ConditionObject class in AQS to instantiate the condition lock.

condition.await() method

  • The condition.await() method indicates that you have to wait for the condition to appear, that is, block the thread, and release all the locks obtained by the thread. addConditionWaiter, fullyRelease and isOnSyncQueue are mentioned
// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // If the thread breaks, an exception is thrown
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add a node to the queue of Condition and return the node
    Node node = addConditionWaiter();
    // Completely release the lock acquired by the current thread
    // Because the lock is reentrant, all the acquired locks should be released here
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Is it blocked in the synchronization queue
    while (!isOnSyncQueue(node)) {
        // Block current thread
        LockSupport.park(this);
        
        // In the upper part, when await() is called, it releases the lock it holds, blocks itself, and waits for the condition to appear
        // *************************Demarcation line//
        // The following part is that the condition has occurred and attempts to obtain the lock
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // Try to acquire locks. Note the second parameter savedState, which is how many locks are held
    // If not, it will block again
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Clear cancelled nodes
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // Thread interrupt related
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • The condition.addConditionWaiter method queues the node, and the waiting state is CONDITION, and returns this thread node
 // AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If the tail node of the condition queue is cancelled, all cancelled nodes are cleared from the beginning node
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // Retrieve tail node
        t = lastWaiter;
    }
    // Create a new node whose waiting state is CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // If the tail node is empty, the new node is assigned to the head node (equivalent to initializing the queue)
    // Otherwise, assign the new node to the nextWaiter pointer of the tail node
    if (t == null)
        // The head node is the thread node, so the head node contains threads, which is different from AQS queue
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // The tail node points to the new node
    lastWaiter = node;
    // Return to new node
    return node;
}
  • AQS.fullyRelease releases all locks acquired by the secondary thread node and returns the number of locks acquired
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // Obtain the value of the state variable and repeatedly obtain the lock. This value will always accumulate
        // Therefore, this value also represents the number of times to obtain the lock
        int savedState = getState();
        // Release all acquired locks at once
        if (release(savedState)) {
            failed = false;
            // Returns the number of times the lock was acquired
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
  • The AQS.isOnSyncQueue method determines whether the node is in the AQS queue
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // If the wait state is CONDITION or the previous pointer is null, false is returned
    // Description has not been moved to the AQS queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If the next pointer has a value, it indicates that it has been moved to the AQS queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // Start from the tail node of AQS and look forward to see if the current node can be found. If it is found, it means that it is already in the AQS queue
    return findNodeFromTail(node);
}

Explanation:

(1) The Condition queue is not exactly the same as the AQS queue;

// The AQS queue header node is a virtual node without any value;

// The queue header node of Condition is a real node that stores real element values. It is mentioned in addConditionWaiter

(2) Changes of various waiting status;

// First, in the condition queue, the initial waiting state of the new node is CONDITION(-2);
// Secondly, when moving to the AQS queue, the waiting state will change to 0 (the initial waiting state of the AQS queue node is 0);
// Then, if blocking is required in the AQS queue, the waiting state of the previous node will be set to SIGNAL(-1), as mentioned in shouldParkAfterFailedAcquire in AQS
// Finally, whether in the Condition queue or AQS queue, the waiting status of the cancelled node will be set to CANCELLED(1);

(3) Similar names;

// The next node in AQS is next, and the previous node is prev; The next node in the Condition is nextWaiter, and there is no previous node.

The general flow of await() method:

(1) Create a new node and add it to the condition queue to addConditionWaiter

(2) Completely release the lock held by the current thread; fullyRelease

(3) Block the current thread and wait for the condition to appear; In while of addConditionWaiter

(4) When the condition has appeared (at this time, the node has been moved to the AQS queue), try to obtain the lock; After the while of addConditionWaiter

In other words, the await() method is actually a process of releasing the lock first - > waiting conditions - > acquiring the lock again.

condition.signal() method

  • The condition.signal() method notifies that the condition has occurred. Make a judgment first. If there is a header node in the condition queue, notify the header node, but not necessarily wake up the header node
// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // If the current thread does not hold the lock, call this method to throw an exception
    // Note that signal() should also be executed after obtaining the lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // Header node of condition queue
    Node first = firstWaiter;
    // If there is a node with a waiting condition, it is notified that the condition has been established
    if (first != null)
        doSignal(first);
}
  • The condition.doSignal method transfers the head node of the condition queue to the AQS queue, but it does not necessarily join. If this node is in the cancelled state, it will directly find the next node
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // Move to the next bit of the head node of the condition queue. If the head node has no subsequent nodes, the tail node is directly set to empty
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // This is equivalent to dequeuing the head node from the queue
        first.nextWaiter = null;
        // Transfer the node to the AQS queue. If the transfer fails, find the next node
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
  • The AQS.transferForSignal method enters the node into the AQS queue
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // Change the state of the node to 0 (that is, the default state), that is, it will be moved to the AQS queue
    // If it fails, it indicates that the node has been changed to cancel status
    // false is returned. It can be seen from the above loop that the next available node will be found
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // Call the AQS queue method to move the node to the AQS queue
    // Note that the return value of enq() here is the previous node of node, that is, the old tail node
    Node p = enq(node);
    // Wait state of the previous node
    int ws = p.waitStatus;
    // If the previous node has been cancelled, or the update status is SIGNAL failed (it also indicates that the previous node has been cancelled)
    // Wake up the thread corresponding to the current node directly
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // If the waiting status of the last node is updated to SIGNAL, it is successful
    // Returns true. At this time, the above loop is not established and exits the loop, that is, only one node is notified
    // At this time, the current node is still blocked
    // That is, calling signal() does not necessarily wake up a node
    // Just move the node from the condition queue to the AQS queue
    return true;
}

The general flow of the signal() method is as follows:

(1) Start from the head node of the condition queue to find a node in a non cancelled state;

(2) Move it from the condition queue to the AQS queue;

(3) And only one node is moved;

Note that calling the signal() method here does not necessarily wake up a node, so when does it wake up a node?

In the first example, after the signal() method, the lock.unlock() method will be executed finally. Only then will a node really wake up. If the awakened node was once a conditional node, it will continue to execute the code under the "boundary" of await() method.

If it is represented by a diagram, the following diagram can roughly represent it

summary

(1) Reentry lock refers to a lock that can be acquired repeatedly, that is, when a thread attempts to acquire a lock after acquiring the lock, it will automatically acquire the lock;

(2) In ReentrantLock, reentrant lock is realized by continuously accumulating the value of state variable;

(3) The release of ReentrantLock should match the acquisition, that is, it should be released several times after acquisition;

(4) ReentrantLock defaults to the unfair mode because the unfair mode is more efficient;

(5) Conditional lock refers to a lock used to wait for a condition to occur;

(6) The classic usage scenario of conditional lock is to block the conditional notEmpty when the queue is empty;

(7) The conditional lock in ReentrantLock is implemented through the ConditionObject internal class of AQS;

(8) Both await() and signal() methods must be used after obtaining the lock and before releasing the lock;

(9) await() method will create a new node and put it in the condition queue, then release the lock completely, then block the current thread and wait for the condition to appear;

(10) The signal() method will find the first available node in the condition queue and move it to the AQS queue;

(11) Only when the thread calling the signal() method calls the unlock() method can the node blocking the condition really wake up (at this time, the node is already in the AQS queue);

(12) After that, the node will try to acquire the lock again. The following logic is basically the same as that of lock().

Keywords: Java Back-end source code

Added by brucensal on Wed, 17 Nov 2021 06:03:10 +0200