Java synchronizers: ReentrantLock source code analysis (part 2)

Questions

(1) What is a condition lock?

(2) Which scenarios are condition locks suited to?

(3) Does a thread blocked in await() on a condition lock wake up as soon as another thread calls signal()?

Brief introduction

A condition lock is used when a thread has acquired the lock but finds that it cannot proceed yet, and must wait for some condition to become true before it can continue.

For example, in a blocking queue, an element cannot be taken while the queue is empty. The taking thread blocks on the condition notEmpty until another thread puts an element into the queue and signals notEmpty, after which the taking thread can continue and take the element.
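The blocking-queue scenario can be sketched roughly as follows. TinyBlockingQueue and its demo() method are hypothetical names made up for illustration, and the queue is unbounded to keep the sketch short. It is not how the JDK's own blocking queues are written.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration only; not part of the JDK.
final class TinyBlockingQueue<E> {
    private final ReentrantLock lock = new ReentrantLock();
    // Signalled whenever an element has been added
    private final Condition notEmpty = lock.newCondition();
    private final Deque<E> items = new ArrayDeque<>();

    void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal(); // the condition "queue is not empty" now holds
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop, because await() is allowed to return spuriously
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    // A consumer thread calls take(); the calling thread then puts one element.
    static String demo() {
        TinyBlockingQueue<String> queue = new TinyBlockingQueue<>();
        String[] result = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                result[0] = queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("hello");
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }
}
```

The while loop around await() is the usual idiom: the waiting thread checks the actual state again after every wake-up instead of assuming that the condition holds.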

Note that a thread can wait on a condition only after it has acquired the lock. For the condition lock that belongs to a ReentrantLock, the await() method can be called only while holding the lock.
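A small sketch of what happens when this rule is broken: calling await() on a ReentrantLock condition without holding the lock fails with IllegalMonitorStateException. The class and method names are made up for this example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class AwaitWithoutLockDemo {
    // Returns true if await() rejected the call because the lock was not held.
    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(); // lock.lock() was never called
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```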

In Java, condition locks are implemented by the ConditionObject class inside AQS (AbstractQueuedSynchronizer), which implements the Condition interface. Let's start with an example.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        // Condition bound to the lock
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock(); // 1
            try {
                System.out.println("before await"); // 2
                condition.await(); // 3
                System.out.println("after await"); // 10
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // 11
            }
        }).start();

        // Give the new thread time to acquire the lock and call await()
        Thread.sleep(1000);
        lock.lock(); // 4
        try {
            Thread.sleep(2000); // 5
            System.out.println("before signal"); // 6
            // Notify the waiting thread that the condition now holds
            condition.signal(); // 7
            System.out.println("after signal"); // 8
        } finally {
            lock.unlock(); // 9
        }
    }
}

The code above is simple: one thread waits on the condition and another thread signals that the condition now holds. The numbers in the comments give the actual execution order. If you can explain that order, you already understand the basics of condition locks.
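To check the order without reading console output, the same flow can be recorded in a list. This is only a sketch: AwaitSignalOrderDemo is a made-up name, and the fixed sleeps are replaced by polling lock.hasWaiters(condition), which tells the signalling thread that the other thread is already waiting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class AwaitSignalOrderDemo {
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                events.add("before await");
                condition.await();
                events.add("after await");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            boolean signalled = false;
            // Poll until the waiter is parked on the condition, then signal once
            while (!signalled && waiter.isAlive()) {
                lock.lock();
                try {
                    if (lock.hasWaiters(condition)) {
                        events.add("before signal");
                        condition.signal();
                        // Still holding the lock, so the waiter cannot run yet
                        events.add("after signal");
                        signalled = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!signalled) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }
}
```

Barring a spurious wake-up, "after await" is always recorded last, because the waiting thread has to reacquire the lock, and the signalling thread releases it only after recording "after signal".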

Source code analysis

Main properties of ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

As you can see, a condition lock maintains a queue of its own. To distinguish it from the AQS queue, it is called the condition queue here. firstWaiter is the head node of the queue and lastWaiter is its tail node.

lock.newCondition() method

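Creates a new condition lock.

    // ReentrantLock.newCondition()
    public Condition newCondition() {
        return sync.newCondition();
    }
    // ReentrantLock.Sync.newCondition()
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // AbstractQueuedSynchronizer.ConditionObject constructor
    public ConditionObject() { }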

Creating a new condition lock ultimately just instantiates the ConditionObject class in AQS.
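This can be confirmed from the outside with a short check. The class and method names below are made up for the example. The second method relies on the fact that every call to newCondition() constructs a new object.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class NewConditionDemo {
    // The Condition returned by ReentrantLock is an AQS ConditionObject.
    static boolean isAqsConditionObject() {
        Condition condition = new ReentrantLock().newCondition();
        return condition instanceof AbstractQueuedSynchronizer.ConditionObject;
    }

    // Each call creates a separate condition with its own condition queue.
    static boolean eachCallCreatesNewObject() {
        ReentrantLock lock = new ReentrantLock();
        return lock.newCondition() != lock.newCondition();
    }
}
```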

condition.await() method

The condition.await() method makes the current thread wait until the condition occurs.

    // AbstractQueuedSynchronizer.ConditionObject.await
   public final void await() throws InterruptedException {
        //Throw an exception if the thread is interrupted
        if (Thread.interrupted())
            throw new InterruptedException();
        //Add to the Condition queue and return the node
        Node node = addConditionWaiter();
        //Completely release the lock acquired by the current thread
        //Because the lock is reentrant, all the acquired locks should be released here
        int savedState = fullyRelease(node);
        //0 means the thread was not interrupted while waiting
        int interruptMode = 0;
        //Loop until the node has been moved to the sync (AQS) queue
        while (!isOnSyncQueue(node)) {
            //Block the current thread
            LockSupport.park(this);
            //Up to this point, await() has released the lock held by the thread and blocked it to wait for the condition

            //From here on, the thread has been woken up (the condition occurred or the thread was interrupted) and will try to reacquire the lock
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        }
        //Reacquire the lock, restoring the hold count saved before waiting
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //Clear cancelled nodes
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        //Throw InterruptedException or re-interrupt the thread, depending on interruptMode
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If the tail node of the condition queue has been cancelled, unlink all cancelled nodes, starting from the head node
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            // Re-read the tail node
            t = lastWaiter;
        }
        // Create a new node whose waiting state is CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // If the tail node is empty, the new node is assigned to the head node (equivalent to initializing the queue)
        // Otherwise, assign the new node to the nextWaiter pointer of the tail node
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        // The tail node points to the new node
        lastWaiter = node;
        // Return the new node
        return node;
    }
    // AbstractQueuedSynchronizer.fullyRelease
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // Read the state variable. Each reentrant acquisition increments it,
            // so its value is the number of times the lock has been acquired
            int savedState = getState();
            // Release all acquired locks at once
            if (release(savedState)) {
                failed = false;
                // Returns the number of times the lock was acquired
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    // AbstractQueuedSynchronizer.isOnSyncQueue
    final boolean isOnSyncQueue(Node node) {
        // If the wait status is CONDITION or prev is null, return false:
        // the node has not been moved to the AQS queue yet
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // If the next pointer has a value, it indicates that it has been moved to the AQS queue
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // Start from the tail node of AQS and look forward to see if the current node can be found. If it is found, it means that it is already in the AQS queue
        return findNodeFromTail(node);
    }

A few points here are hard to grasp:

1. The condition queue is not quite the same as the AQS queue. The head node of the AQS queue is a dummy node that holds no thread, whereas every node in the condition queue, including the first, represents a real waiting thread.

2. How the wait status changes:

First, a new node in the condition queue starts with the wait status CONDITION (-2);

Second, when the node is moved to the AQS queue, its wait status changes to 0 (the initial wait status of an AQS queue node);

Then, if the node has to block in the AQS queue, the wait status of its predecessor is set to SIGNAL (-1);

Finally, whether in the condition queue or the AQS queue, a cancelled node has its wait status set to CANCELLED (1).

In addition, there is one more wait status, PROPAGATE (-3), which will be covered later with shared locks.

3. Similar names:

In the AQS queue, the next node is next and the previous node is prev;

In the condition queue, the next node is nextWaiter, and there is no pointer to the previous node.
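The three points above can be put together in a single-threaded toy model. Everything here is hypothetical (ToyQueues, addWaiter, transferFirst): it has no locking, parking, or CAS, and only mirrors the links and status values described above.

```java
// Toy model for illustration only; the names are made up and nothing here is thread-safe.
final class ToyQueues {
    static final int SIGNAL = -1;
    static final int CONDITION = -2;

    static final class Node {
        final String name;
        int waitStatus;
        Node prev;        // used only in the sync (AQS) queue
        Node next;        // used only in the sync (AQS) queue
        Node nextWaiter;  // used only in the condition queue

        Node(String name, int waitStatus) {
            this.name = name;
            this.waitStatus = waitStatus;
        }
    }

    // Sync queue: starts with a dummy head that represents no waiting thread
    Node head = new Node("dummy", 0);
    Node tail = head;
    // Condition queue: every node is a real waiter, there is no dummy node
    Node firstWaiter;
    Node lastWaiter;

    // Append a new CONDITION node to the condition queue via nextWaiter.
    Node addWaiter(String name) {
        Node node = new Node(name, CONDITION);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
        return node;
    }

    // Move the first waiter from the condition queue to the sync queue.
    Node transferFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        first.waitStatus = 0;       // CONDITION (-2) becomes 0
        Node pred = tail;
        first.prev = pred;
        pred.next = first;
        tail = first;
        pred.waitStatus = SIGNAL;   // the predecessor is marked SIGNAL (-1)
        return first;
    }
}
```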

The await() method can be summarized as follows:

(1) Create a new node and add it to the condition queue;

(2) Fully release the lock held by the current thread;

(3) Block the current thread and wait for the condition to occur;

(4) Once the condition has occurred (by then the node has been moved to the AQS queue), try to reacquire the lock.

In other words, await() is a process of releasing the lock -> waiting for the condition -> acquiring the lock again.
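The release and reacquire steps can be observed through getHoldCount(). In this sketch (FullyReleaseDemo is a made-up name), one thread acquires the lock twice before calling await(). A second thread can still acquire the lock while the first one waits, and the first thread's hold count is back at 2 when await() returns.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class FullyReleaseDemo {
    // Returns {waiter hold count before await,
    //          signaller hold count while the waiter is waiting,
    //          waiter hold count after await}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        int[] result = new int[3];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquisition, state is now 2
            try {
                result[0] = lock.getHoldCount();
                condition.await(); // releases both holds, later reacquires both
                result[2] = lock.getHoldCount();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        try {
            boolean signalled = false;
            while (!signalled && waiter.isAlive()) {
                lock.lock(); // succeeds although the waiter acquired the lock twice
                try {
                    if (lock.hasWaiters(condition)) {
                        result[1] = lock.getHoldCount();
                        condition.signal();
                        signalled = true;
                    }
                } finally {
                    lock.unlock();
                }
                if (!signalled) {
                    Thread.sleep(10);
                }
            }
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```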

condition.signal() method

    // AbstractQueuedSynchronizer.ConditionObject.signal: notifies that the condition has occurred
    public final void signal() {
        //If the current thread does not hold the lock, calling this method throws an exception
        //That is, signal() may be called only after the lock has been acquired
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //Head node of the condition queue
        Node first = firstWaiter;
        //If a node is waiting on the condition, notify it that the condition has occurred
        if (first != null)
            doSignal(first);
    }
    // AbstractQueuedSynchronizer.ConditionObject.doSignal
    private void doSignal(Node first) {
        do {
            //Advance firstWaiter; if no node follows the head, the queue becomes empty, so clear lastWaiter too
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //Detach the old head node, i.e. dequeue it from the condition queue
            first.nextWaiter = null;
            //Transfer the node to the AQS queue; if that fails (the node was cancelled), try the next node
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    // AbstractQueuedSynchronizer.transferForSignal
    final boolean transferForSignal(Node node) {
        //CAS the wait status from CONDITION to 0, since the node is about to move to the AQS queue
        //If the CAS fails, the node has been cancelled,
        //so return false; the loop in doSignal() then moves on to the next available node
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //Call the AQS enq() method to append the node to the AQS queue
        //Note that enq() returns the predecessor of node, i.e. the old tail node
        Node p = enq(node);
        //Wait status of the predecessor
        int ws = p.waitStatus;
        //If the predecessor has been cancelled, or setting its status to SIGNAL fails (its status changed concurrently, e.g. it was cancelled),
        //wake up the thread of the current node directly
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        // Otherwise the predecessor's status is now SIGNAL and the current node's thread is still blocked
        // Either way return true, so the loop in doSignal() exits: only one node is signalled
        // In other words, in the normal case signal() does not itself wake up the waiting thread;
        // it only moves the node from the condition queue to the AQS queue

        return true;
    }

The general flow of the signal() method is as follows:

(1) Starting from the head of the condition queue, find a node that has not been cancelled;

(2) Move it from the condition queue to the AQS queue;

(3) Only one node is moved.

Note that in the normal case signal() does not itself wake up the waiting thread. It only moves the node to the AQS queue, and the thread wakes up later, when the lock is released.
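The move from one queue to the other can be watched with the monitoring methods of ReentrantLock. The sketch below uses a made-up class name and two waiting threads. It reads getWaitQueueLength(condition) for the condition queue and getQueueLength() for the AQS queue before and after a single signal(). The Javadoc describes both values as estimates, but in this controlled setup nothing else touches the queues while they are read. The signalAll() call at the end only lets the second waiter finish.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class SignalMovesOneNodeDemo {
    // Returns {condition queue before, AQS queue before,
    //          condition queue after, AQS queue after} one signal() call.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Runnable waitTask = () -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        };
        Thread t1 = new Thread(waitTask);
        Thread t2 = new Thread(waitTask);
        t1.start();
        t2.start();

        int[] result = null;
        try {
            // Poll until both threads are parked on the condition
            while (result == null && t1.isAlive() && t2.isAlive()) {
                lock.lock();
                try {
                    int waitingBefore = lock.getWaitQueueLength(condition);
                    if (waitingBefore == 2) {
                        int queuedBefore = lock.getQueueLength();
                        condition.signal();
                        result = new int[] {
                            waitingBefore,
                            queuedBefore,
                            lock.getWaitQueueLength(condition),
                            lock.getQueueLength()
                        };
                        // Release the remaining waiter so that the demo can finish
                        condition.signalAll();
                    }
                } finally {
                    lock.unlock();
                }
                if (result == null) {
                    Thread.sleep(10);
                }
            }
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```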

Summary

(1) A reentrant lock is a lock that the same thread can acquire repeatedly: when a thread that already holds the lock tries to acquire it again, it succeeds immediately;

(2) ReentrantLock implements reentrancy by incrementing the state variable on each acquisition;

(3) Releases of a ReentrantLock must match acquisitions: the lock must be released as many times as it was acquired;

(4) ReentrantLock defaults to nonfair mode because nonfair mode is more efficient;

(5) A condition lock is a lock used to wait for a condition to occur;

(6) The classic use case for a condition lock is a blocking queue, where a taking thread blocks on the condition notEmpty while the queue is empty;

(7) The condition lock of ReentrantLock is implemented by ConditionObject, an inner class of AQS;

(8) Both await() and signal() must be called after acquiring the lock and before releasing it;

(9) await() creates a new node, puts it in the condition queue, fully releases the lock, and then blocks the current thread until the condition occurs;

(10) signal() finds the first available node in the condition queue and moves it to the AQS queue;

(11) The thread waiting on the condition is really woken up only when the thread that called signal() calls unlock() (by then its node is already in the AQS queue);

(12) After that, the woken thread tries to reacquire the lock; the logic from there on is basically the same as in lock().
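Points (1) to (4) can be checked with a few lines. The class and method names are made up for the example. getHoldCount() and isFair() are existing ReentrantLock methods.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for illustration only.
final class ReentrancyDemo {
    // Hold count of the current thread after each lock() and unlock() call.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentrant acquisition
        lock.unlock();
        counts[2] = lock.getHoldCount(); // still held once
        lock.unlock();
        counts[3] = lock.getHoldCount(); // fully released
        return counts;
    }

    // The no-argument constructor creates a nonfair lock.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }
}
```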

Added by kybaker on Fri, 31 Dec 2021 08:47:51 +0200