ReentrantLock AQS source code analysis

When it comes to AQS, the classic ReentrantLock is its best embodiment.
Before diving in, though, let's briefly go over the characteristics of AQS.

What is AQS?

AbstractQueuedSynchronizer, the abstract queued synchronizer

All requesting threads form a CLH queue. When a thread finishes its work and calls unlock, its successor node is woken up. The thread that is currently executing is not in the queue, and every thread waiting in the queue is blocked.
A thread first tries to acquire the lock through CAS. If another thread already holds the lock, the thread joins the tail of the CLH queue and is suspended. When the lock is released, the thread at the front of the CLH queue is woken up and tries to acquire the lock through CAS again. At that moment:

  1. Unfair lock: if another thread arrives and tries to acquire the lock at the same time, that newcomer may get the lock first;

  2. Fair lock: if another thread arrives at the same time, it finds that it is not at the front of the queue, so it joins the end of the queue and the thread at the front of the queue gets the lock.

AQS maintains a volatile int variable, state, that represents the shared resource, and uses a built-in FIFO queue to queue the threads competing for it. This built-in synchronization queue is called the "CLH" queue: a doubly linked, non-circular list with a dummy head node. The queue is made up of Node objects. Each Node holds a prev reference and a next reference, pointing to its predecessor and successor respectively. AQS itself keeps two pointers, head and tail, which point to the head and tail of the queue.

Other usage scenarios

Commonly used classes such as CountDownLatch, ReentrantLock and Semaphore each define an inner class called Sync. Sync extends the AQS abstract class and overrides the methods it needs.

For the reentrant lock ReentrantLock, state indicates how many times the thread holding the lock has acquired it (the reentry count);

For the read-write lock ReentrantReadWriteLock, the high 16 bits of state hold the read lock (shared) hold count, and the low 16 bits hold the write lock (exclusive) reentry count;

For Semaphore, state indicates the number of currently available permits;

For CountDownLatch, state represents the current value of the counter.
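The meanings of state listed above can be observed from the outside. The following is only a sketch: the helper names (sharedCount, exclusiveCount, permitsAfterOneAcquire, countAfterOneCountDown) and the two constants are my own illustrative choices that follow the 16/16 split described above, and the Semaphore and CountDownLatch parts only use public methods rather than reading state directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

class StateMeaningDemo {
    // Illustrative constants for the 16/16 split described above (names are assumptions)
    static final int SHARED_SHIFT = 16;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // High 16 bits: read (shared) hold count
    static int sharedCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Low 16 bits: write (exclusive) reentry count
    static int exclusiveCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    // Semaphore: acquiring one permit lowers the number of available permits by one
    static int permitsAfterOneAcquire(int initialPermits) {
        Semaphore semaphore = new Semaphore(initialPermits);
        semaphore.acquireUninterruptibly();
        return semaphore.availablePermits();
    }

    // CountDownLatch: each countDown() lowers the counter by one
    static long countAfterOneCountDown(int initialCount) {
        CountDownLatch latch = new CountDownLatch(initialCount);
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        int state = (3 << SHARED_SHIFT) | 2; // 3 read holds, 2 write reentries
        System.out.println(sharedCount(state));
        System.out.println(exclusiveCount(state));
        System.out.println(permitsAfterOneAcquire(5));
        System.out.println(countAfterOneCountDown(3));
    }
}
```

Packing two counters into one int is what lets a single CAS on state update read and write bookkeeping atomically.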

Two synchronization modes (bottom support)

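1. Exclusive

ReentrantLock

Mutual exclusion: only one thread can hold the resource at a time

2. Shared

CountDownLatch, ReadWriteLock (the read lock)

Several threads can hold the resource at once, for example concurrent reads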

Custom synchronizer

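Extend the AbstractQueuedSynchronizer class

Override the corresponding methods (tryAcquire/tryRelease for exclusive mode, tryAcquireShared/tryReleaseShared for shared mode)

Call the AQS template methods (such as acquire and release) from our component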
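The three steps above can be sketched as a tiny non-reentrant mutex. This is only an illustration under my own assumptions: the class name SimpleMutex and its method names are hypothetical, and it deliberately leaves out reentrancy, conditions and fairness.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Step 1: extend AbstractQueuedSynchronizer
    private static final class Sync extends AbstractQueuedSynchronizer {
        // Step 2: override the exclusive-mode hooks
        @Override
        protected boolean tryAcquire(int arg) {
            // state 0 -> 1 means "free -> held"; no reentry in this sketch
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS may wake the successor
        }

        boolean held() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // Step 3: call the AQS template methods from our component
    public void lock()        { sync.acquire(1); }
    public void unlock()      { sync.release(1); }
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public boolean isLocked() { return sync.held(); }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        mutex.lock();
        System.out.println(mutex.isLocked());
        System.out.println(mutex.tryLock()); // second attempt fails: not reentrant
        mutex.unlock();
        System.out.println(mutex.isLocked());
    }
}
```

All the queuing, suspending and waking is inherited from AQS; the subclass only decides what state means.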

The waiting queue of AQS is shown in the figure below

The head node is not part of the blocking queue; the blocking queue is everything behind it

ReentrantLock is typically used like this

import java.util.concurrent.locks.ReentrantLock;

public class OrderService {
    // static, so that every thread shares the same lock. (A Spring MVC service is a singleton by default, so this is not strictly necessary there.)
    // Passing true creates a fair lock
    private static final ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // For example, we only allow one thread to create orders at the same time
        reentrantLock.lock();
        // Typically, lock is followed by a try statement
        try {
            // Only one thread can enter this code at a time (the thread that obtains the lock),
            // Other threads block on the lock() method and wait for the lock to be obtained before entering
            // Execute code
        } finally {
            // Release lock
            reentrantLock.unlock();
        }
    }
}
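To see that the lock/try/finally pattern above really lets only one thread at a time into the protected code, here is a small sketch. The class LockedCounter and its methods are hypothetical names of mine, not part of the original example.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock(true); // fair lock
    private int count; // only touched while holding the lock

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of threads, each incrementing perThread times
    static int runThreads(int threads, int perThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    counter.increment();
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runThreads(4, 1000)); // no increments are lost
    }
}
```

Without the lock, count++ from several threads could lose updates; with it, the total always equals threads * perThread.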

The source code analysis below follows the fair lock path

Let's look at the fields first

// The head node. The easiest way to think of it is as the thread that currently holds the lock
private transient volatile Node head;

// The tail of the blocking queue. Every new node is appended here, which forms the linked list
private transient volatile Node tail;

// The most important field: the state of the lock. 0 means the lock is free; greater than 0 means a thread holds it
// The value can be greater than 1 because the lock is reentrant: each reentry adds 1
private volatile int state;

// The thread that currently holds the exclusive lock. Its most important use is reentrancy:
// reentrantLock.lock() can be nested, so each call checks whether the current thread already owns the lock
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; // Inherited from AbstractOwnableSynchronizer

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    // Compete for the lock
    final void lock() {
        acquire(1);
    }
}

// If tryAcquire(arg) returns true, we are done: the lock was acquired directly and there is no need to queue.
// Even a fair lock does not queue (suspend and wake up) when nobody holds the lock and nobody is waiting.
// If tryAcquire(arg) fails, addWaiter wraps the current thread in a node and appends it to the blocking queue,
// and acquireQueued suspends the thread until it gets the lock.
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire looks like this

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0: no thread holds the lock right now
        if (c == 0) {
            // The lock is free, but this is a fair lock, so first check whether another thread is already waiting.
            // If nobody is waiting, try to take the lock with CAS. If the CAS fails,
            // another thread grabbed the lock at the same moment
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // Lock acquired: record the current thread as the owner
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The current thread already owns the lock: this is a reentry, so state += acquires
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // The lock was not acquired
    // This method wraps the current thread in a Node and appends it to the queue
    // The mode parameter here is Node.EXCLUSIVE, meaning exclusive mode
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // The lines below try to append the node to the end of the linked list, that is, the end of the blocking queue
        Node pred = tail;

        // tail != null => the queue is not empty (strictly, the queue is also empty when tail == head, but that doesn't matter here)
        if (pred != null) {
            // Make the current tail the predecessor of this node
            node.prev = pred;
            // Use CAS to make this node the new tail. On success, tail == node and the node is the new tail of the blocking queue
            if (compareAndSetTail(pred, node)) {
                // Getting here means the CAS succeeded and tail == node.
                // node.prev = pred was set above; the line below completes the two-way link with the old tail
                pred.next = node;
                // The thread is in the queue, so we can return
                return node;
            }
        }
        // If we get here, either pred == null (the queue is empty)
        // or the CAS failed (another thread is competing to enqueue)
        enq(node);
        return node;
    }

    // Enqueue by spinning
    // As mentioned, there are only two ways to reach this method: the waiting queue is empty, or other threads are competing to enqueue.
    // Spinning here means: if the CAS on tail loses the race, try again, as many times as it takes, until the node is enqueued
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // The queue is empty
            if (t == null) { // Must initialize
                // Initialize the head node
                // Both head and tail start out as null
                // This is again a CAS, because many threads may arrive here at the same time
                if (compareAndSetHead(new Node()))
                    // For later: the waitStatus of this head node is 0, see the new Node() constructor

                    // Now there is a head, but tail is still null, so point tail at head.
                    // Don't worry, the enqueuing node takes over tail on the next iteration.
                    // Note: this only sets tail = head and does not return,
                    // so the for loop continues and takes the else branch next time
                    tail = head;
            } else {
                // The lines below are the same as in addWaiter,
                // except that they run inside an infinite loop: the node is appended to the end of the queue, retrying whenever it loses a race
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
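The spin-and-CAS idea behind enq can be tried out in isolation. The sketch below is not the AQS implementation: it replaces the internal compareAndSetHead/compareAndSetTail with AtomicReference, and the class CasQueueSketch, its Node and the helper methods are hypothetical names of mine.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasQueueSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
        final String name;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the tail, retrying until the CAS wins; returns the predecessor
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily create the dummy head; only one thread wins this CAS
                if (head.compareAndSet(null, new Node("dummy")))
                    tail.set(head.get());
            } else {
                node.prev = t; // set prev before publishing the node as tail
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Counts queued nodes by walking prev links from tail, excluding the dummy head
    int length() {
        int n = 0;
        for (Node t = tail.get(); t != null && t != head.get(); t = t.prev)
            n++;
        return n;
    }

    static int concurrentEnqueue(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    queue.enq(new Node("waiter"));
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.length();
    }

    public static void main(String[] args) {
        System.out.println(concurrentEnqueue(4, 500)); // every node ends up in the list
    }
}
```

Because prev is assigned before the CAS on tail, the chain of prev links is always complete, which is why AQS walks the queue backward from tail when it needs a reliable traversal.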

    // Back to this code: time to look at acquireQueued
    // if (!tryAcquire(arg)
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //     selfInterrupt();

    // The node parameter went through addWaiter(Node.EXCLUSIVE) after failing to get the lock, so it is already in the blocking queue
    // Note: if acquireQueued(addWaiter(Node.EXCLUSIVE), arg) returns true,
    // the code above calls selfInterrupt(), so normally this method should return false
    // This method is very important: suspending the thread and acquiring the lock after wake-up both happen here
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // Get the predecessor node
                final Node p = node.predecessor();
                // p == head means that although the node is in the blocking queue, it is the first one there, because its predecessor is head
                // Note that the blocking queue does not include the head node: head generally stands for the thread holding the lock, and everything behind head is the blocking queue
                // So the current node may try to grab the lock
                // Why is it worth trying?
                // First, the node is at the front of the queue. Second, the current head may be a node that was only just initialized:
                // as mentioned in enq(node), head is lazily initialized, and new Node() does not set a thread
                // In other words, the current head may not belong to any thread, so the node at the front of the queue should give it a try
                // tryAcquire was analyzed above (look back if you have forgotten): it simply tries to change state with CAS
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Getting here means the if branch above did not succeed: either the node is not first in the queue,
                // or tryAcquire(arg) lost the race
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // When is failed true? When tryAcquire() throws an exception
            if (failed)
                cancelAcquire(node);
        }
    }

    // We get here because the lock was not acquired. This method answers: "the current thread failed to get the lock; should it be suspended?"
    // The first parameter is the predecessor node, the second is the node for the current thread
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // waitStatus == -1 on the predecessor: the predecessor is in a normal state and will signal us, so the current thread should be suspended. Return true directly
        if (ws == Node.SIGNAL)
            return true;

        // waitStatus > 0 on the predecessor means the predecessor has cancelled its wait.
        // Keep in mind: a thread in the blocking queue is suspended, and waking it up is the job of its predecessor.
        // So the code below re-points prev of the current node at a node with waitStatus <= 0.
        // In short, it looks for a reliable "parent", because the node depends on it for wake-up. If the predecessor has cancelled,
        // take the predecessor's predecessor instead; walking forward always finds one
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Think about what reaching this branch means:
            // the predecessor's waitStatus is neither -1 nor 1, so it can only be 0, -2 or -3
            // Nothing in the code so far sets waitStatus, so every new node enters the queue with waitStatus 0
            // Normally the predecessor is the previous tail, so its waitStatus should be 0
            // Use CAS to set the predecessor's waitStatus to Node.SIGNAL (that is, -1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // Returning false sends the caller around its for loop once more;
        //     it then enters this method again and returns true from the first branch
        return false;
    }

    // Suspend the current thread; after wake-up, report whether it was interrupted
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

Analysis: if shouldParkAfterFailedAcquire returns true, parkAndCheckInterrupt suspends the thread.
If it returns false (on the first pass ws is always 0, because nothing has set it yet), compareAndSetWaitStatus(pred, ws, Node.SIGNAL) sets the predecessor's status to -1, and the spin loop in acquireQueued calls the method again, at which point it returns true.
Why doesn't the thread get suspended directly when shouldParkAfterFailedAcquire(p, node) returns false?
=> Because after this method runs, the node may already be the direct successor of head (for example, once cancelled predecessors have been skipped), so it is worth one more tryAcquire before suspending.
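The suspension itself is nothing more than LockSupport.park, and the wake-up is LockSupport.unpark. A minimal sketch of that handshake, with a hypothetical class ParkDemo and a flag of my own to guard against spurious returns from park:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    // A waiter parks until it is signalled; returns true once it has resumed
    static boolean parkUntilSignalled() {
        AtomicBoolean signalled = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so re-check the condition in a loop,
            // just as acquireQueued re-checks the lock inside for (;;)
            while (!signalled.get())
                LockSupport.park();
            resumed.set(true);
        });
        waiter.start();

        signalled.set(true);
        // If unpark runs before park, the permit is remembered and park returns at once
        LockSupport.unpark(waiter);

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(parkUntilSignalled());
    }
}
```

The loop around park is the important design point: a woken thread must always re-check its condition rather than assume it now owns the resource.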

Let's see how to release the lock

    public final boolean release(int arg) {
        //Modify the lock counter. If the counter is 0, the lock is released
        if (tryRelease(arg)) {
            Node h = head;
            //The waitStatus of the head node is not equal to 0, indicating that the thread corresponding to the successor node of the head node is blocking and waiting to be awakened
            if (h != null && h.waitStatus != 0)
                //Wake up the successor node
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // Whether the lock is fully released
        boolean free = false;
        // This is about reentrancy: if c == 0 there are no nested holds left and the lock can be released; otherwise it is still held
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
   
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // Successor node
        Node s = node.next;
        // The code below wakes up the successor, but the successor may have cancelled its wait (waitStatus == 1)
        // In that case, walk backward from tail and find the frontmost node with waitStatus <= 0. There is no break in the loop, so s ends up as the frontmost such node
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //Wake up the successor node
            LockSupport.unpark(s.thread);
    }
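The backward search in unparkSuccessor is easy to check on a hand-built list. The sketch below is a simplified model, assuming a plain Node class of my own (no volatile fields, no threads) and a hypothetical method findWakeTarget that only returns the node that would be unparked.

```java
class SuccessorSearchSketch {
    static final int CANCELLED = 1;

    static final class Node {
        int waitStatus; // 0 = freshly queued, > 0 = cancelled
        Node prev;
        Node next;
        final String name;
        Node(String name) { this.name = name; }
    }

    // Appends b after a and returns b
    static Node link(Node a, Node b) {
        a.next = b;
        b.prev = a;
        return b;
    }

    // Same search as above: prefer head.next, otherwise scan backward from tail
    static Node findWakeTarget(Node head, Node tail) {
        Node s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != head; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t; // no break: keep going so the frontmost match wins
        }
        return s;
    }

    // head <-> a (cancelled) <-> b <-> c
    static String demo() {
        Node head = new Node("head");
        Node a = link(head, new Node("a"));
        Node b = link(a, new Node("b"));
        Node c = link(b, new Node("c"));
        a.waitStatus = CANCELLED;
        return findWakeTarget(head, c).name;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // the first non-cancelled node behind head
    }
}
```

With a cancelled, the scan visits c, then b, skips a and stops at head, so b is the node that gets woken.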

The details are already explained in the code comments, so nothing more needs to be added here.
Throughout the whole process, setting state via CAS is crucial, so what does state represent?

What state means

  • 0: no thread holds the lock
  • CAS sets it to 1: the lock has been acquired
  • Each lock reentry adds 1 to state
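These three values can be watched through the public API. A small sketch, assuming a hypothetical class HoldCountDemo; getHoldCount() reports the number of holds by the current thread, which matches state while that thread owns the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Records the hold count before locking, after each lock(), and after full release
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock(true);
        int[] counts = new int[4];

        counts[0] = lock.getHoldCount(); // nobody holds the lock
        lock.lock();
        counts[1] = lock.getHoldCount(); // first acquisition
        lock.lock();
        counts[2] = lock.getHoldCount(); // reentry adds one
        lock.unlock();
        lock.unlock();                   // one unlock per lock
        counts[3] = lock.getHoldCount(); // fully released again
        return counts;
    }

    public static void main(String[] args) {
        for (int c : trace())
            System.out.println(c);
    }
}
```

The lock only becomes available to other threads once every lock() has been matched by an unlock().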

Here is the process chart

You can see that the waitStatus of the predecessor node is set to -1

So much for the fair lock; the unfair lock is roughly the same. What is the difference between the two?
There are only two differences between fair locks and unfair locks:

  1. After lock is called, the unfair lock first uses CAS to try to grab the lock. If the lock happens to be free at that moment, the thread acquires it directly and returns.
  2. If that CAS fails, the unfair lock enters tryAcquire just like the fair lock. In tryAcquire, if the lock turns out to be free (state == 0), the unfair lock grabs it directly with CAS, whereas the fair lock checks whether any thread is waiting in the queue; if so, it does not grab the lock and queues up behind them.
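The second difference boils down to one extra condition. The sketch below is a deliberately simplified model, not the real tryAcquire: the class FairnessSketch and its two methods are hypothetical, they ignore reentrancy, and they only answer whether a thread would even attempt the CAS on a lock whose state it has just read.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessSketch {
    // Fair: the lock must be free AND nobody may be queued ahead of us
    static boolean fairMayAcquire(int state, boolean hasQueuedPredecessors) {
        return state == 0 && !hasQueuedPredecessors;
    }

    // Unfair: a free lock is enough, waiting threads are ignored
    static boolean nonfairMayAcquire(int state, boolean hasQueuedPredecessors) {
        return state == 0;
    }

    public static void main(String[] args) {
        // The no-argument constructor creates an unfair lock; true asks for a fair one
        System.out.println(new ReentrantLock().isFair());
        System.out.println(new ReentrantLock(true).isFair());

        // Lock is free, but another thread is already waiting in the queue
        System.out.println(fairMayAcquire(0, true));
        System.out.println(nonfairMayAcquire(0, true));
    }
}
```

Letting newcomers barge in usually gives the unfair lock better throughput, at the cost of possibly making queued threads wait longer.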

About interrupts

lockInterruptibly() is implemented internally as follows

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // This is the difference: once an interrupt is detected, end the method immediately by throwing an exception.
                // The interrupt is no longer just recorded and reported through the return value;
                // the exception is thrown directly, nothing in the outer layers catches it, and it propagates all the way out of lockInterruptibly
                throw new InterruptedException();
        }
    } finally {
        // If we leave through the InterruptedException, failed is still true
        if (failed)
            cancelAcquire(node);
    }
}

lock() only records the interrupt when one wakes the thread from park; the caller has to check for itself whether it was interrupted. lockInterruptibly(), by contrast, throws InterruptedException as soon as it detects the interrupt.
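The two behaviours can be compared side by side. This is a sketch under my own assumptions: the class InterruptibleLockDemo and its methods are hypothetical, and hasQueuedThread is polled only to make sure the waiter is really queued before it is interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Joins a thread without leaking a checked exception to the caller
    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Waiter uses lockInterruptibly(): the interrupt ends its wait with an exception
    static boolean interruptibleWaiterGetsException() {
        ReentrantLock lock = new ReentrantLock(true);
        AtomicBoolean sawException = new AtomicBoolean(false);
        lock.lock(); // the calling thread holds the lock for the whole experiment
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not expected to be reached
                } catch (InterruptedException e) {
                    sawException.set(true);
                }
            });
            waiter.start();
            while (!lock.hasQueuedThread(waiter))
                Thread.yield(); // wait until the waiter is queued
            waiter.interrupt();
            joinQuietly(waiter);
        } finally {
            lock.unlock();
        }
        return sawException.get();
    }

    // Waiter uses lock(): it keeps waiting and only finds the flag set afterwards
    static boolean plainWaiterKeepsInterruptFlag() {
        ReentrantLock lock = new ReentrantLock(true);
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock(); // does not throw when interrupted
            try {
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        while (!lock.hasQueuedThread(waiter))
            Thread.yield();
        waiter.interrupt();
        lock.unlock(); // only now can the waiter get the lock
        joinQuietly(waiter);
        return flagAfterAcquire.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptibleWaiterGetsException());
        System.out.println(plainWaiterKeepsInterruptFlag());
    }
}
```

In the second case the waiter still gets the lock; the interrupt is merely re-asserted by selfInterrupt() so that the caller can notice it later.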

Responding to interrupts

Each thread has an associated interrupt status, a boolean that is initially false.
In the following three situations, a thread notices automatically when it is interrupted:

  1. wait(), wait(long) and wait(long, int) from the Object class:
    these methods throw InterruptedException
    and reset the interrupt status to false.
  2. Blocking I/O operations in classes that implement the InterruptibleChannel interface, such as the connect and receive methods of DatagramChannel:
    the channel is closed and the thread receives ClosedByInterruptException; the interrupt status stays set
  3. The select method of Selector (see the NIO article I wrote):
    once interrupted, the method returns immediately

If a thread is blocked (suspended) in LockSupport.park(Object obj), an interrupt also wakes it up, but the interrupt status is not reset after wake-up.
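The contrast between park and wait() is easy to verify. A minimal sketch, assuming a hypothetical class ParkInterruptDemo; the thread interrupts itself first so that the outcome does not depend on timing.

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    // Returns { flag after park, result of Thread.interrupted(), flag afterwards, flag after wait() threw }
    static boolean[] observe() {
        boolean[] out = new boolean[4];
        Thread t = new Thread(() -> {
            Thread self = Thread.currentThread();

            self.interrupt();
            LockSupport.park();               // returns immediately: the thread is interrupted
            out[0] = self.isInterrupted();    // park did not reset the status
            out[1] = Thread.interrupted();    // reports the status and clears it
            out[2] = self.isInterrupted();    // now cleared

            self.interrupt();
            Object monitor = new Object();
            synchronized (monitor) {
                try {
                    monitor.wait(10);
                } catch (InterruptedException e) {
                    out[3] = self.isInterrupted(); // wait() reset the status before throwing
                }
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        for (boolean b : observe())
            System.out.println(b);
    }
}
```

This is why parkAndCheckInterrupt calls Thread.interrupted() right after park: it has to read and clear the status itself, otherwise the next park would return immediately.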

Finally, here is the original article. It is very well written, and I recommend reading it carefully: https://javadoop.com/post/AbstractQueuedSynchronizer


Added by bedted on Mon, 07 Feb 2022 13:06:47 +0200