Disk by Disk AQS and ReentrantLock

What is AQS?

AQS (AbstractQueued Synchronizer) is the basic framework for Java to build locks and other synchronization components concurrently. Many synchronization class implementations rely on it, such as ReentrantLock/ReentrantReadWriterLock/CountDownLatch, etc.
 
AQS provides two ways of resource sharing: Exclusive and Share.
acquire(acquireShare)/release(releaseShare).  
acquire: Get the resource, return it directly if the current resource meets the criteria, otherwise suspend the current thread and join it in the queue.
release: release resources, wake up suspended threads
 
 

AQS queue

AQS Queue Diagram

 

Main attributes in AQS queues

//  Waiting for the head of the queue
private transient volatile Node head;

// Waiting for the tail of the queue
private transient volatile Node tail;

// Lock status (1 for successful locking, 0 for unlocking, reentry and reentry)+1)
private volatile int state;

//  The thread currently holding the lock, note that this property is from AbstractOwnableSynchronizer Inheritance
private transient Thread exclusiveOwnerThread;

 

Main attributes in Node classes

static final class Node {
    // The tag indicates that the node is waiting in shared mode
    static final Node SHARED = new Node();
    // The tag indicates that the node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // The waiting state of the node also has an initialization state 0 which does not belong to the following four states
    // Express Node The current thread represented has cancelled the queue, that is to say, the acquisition lock has been abandoned.
    static final int CANCELLED =  1;
    // When a node's waitStatus Be set as SIGNAL,It means that its next node (that is, its successor node) has been suspended (or will be suspended soon).
    // As long as the predecessor node releases the lock, the notification is identified as SIGNAL Thread execution of successor nodes of state
    static final int SIGNAL    = -1;
    // Nodes are in a waiting queue
    // When other threads pair Condition Called signal()After that, the node will be transferred from the waiting queue to the synchronization queue and joined in the acquisition of synchronization status.
    static final int CONDITION = -2;
    // Represents that the next shared synchronization state acquisition will be propagated unconditionally
    static final int PROPAGATE = -3;

    // The waiting state of the node is initialized to 0.
    volatile int waitStatus;

        // Pre-node of current node
    volatile Node prev;

    // Postnode of current node
    volatile Node next;

        // Line information queued on this node
    volatile Thread thread;
}

 

Implementation of ReentrantLock

Before introducing the ReentrantLock implementation, I'd like to take a look at Doug Lea, the author of util.concurrent package, which is more difficult to read than others. Mr. Doug Lea, who always has a modest and shy smile on his face, uses a lot of relatively complex logical judgments, such as multiple or multiple methods in a judgment condition, which makes it difficult for you to keep up with his rhythm and to figure out his design ideas. It's not that I'm too dishy to force myself in a low voice, leaving behind unskilled tears.

 

Inheritance Diagram

ReentrantLock, an implementation class of Lock interface, is a reentrant exclusive lock.
ReentrantLock implements the API of the AQS framework (AbstractQueued Synchronizer) through internal classes to realize the function of exclusive locks.

 

Main attributes

private final Sync sync;

// The inside of the fair lock is FairSync,Unfair Lock Internal is NonfairSync. 
// Both are inherited. Sync Indirect inheritance from AbstractQueuedSynchronizer This abstract class
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    
    // Lock up
    abstract void lock();

    // Attempt to acquire locks
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Trying to release the lock
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}

 

Construction method

//Create an unfair lock by default
public ReentrantLock() {
    sync = new NonfairSync();
}

//afferent true Create a fair lock. false Unfair Lock
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

 

ReentrantLock Fair Lock

We take fair lock as an example to analyze the source code of important methods.

// Inherited Sync,So it inherited indirectly. AbstractQueuedSynchronizer This abstract class
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

       // Lock up
    final void lock() {
        //call AQS in acquire Method
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // CAS Operational settings state
                // Set the current thread to a thread with a lock
                setExclusiveOwnerThread(current);
                return true;
            }
        }

        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

 

Source code analysis of acquire method

public final void acquire(int arg) {
    // tryAcquire(arg)Attempt to lock, if the lock fails, it will be called acquireQueued Method joins the queue to queue and will not be invoked if the lock is successful
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The acquire method does so many things.
1. tryAcquire() tries to get resources, and returns directly if it succeeds.
2. addWaiter() adds the thread to the waiting queue and updates the AQS queue chain information
3. acquireQueued() causes threads to retrieve resources in a waiting queue until they retrieve resources. If the whole waiting process is interrupted, return true, otherwise return false.
4. selfInterrupt() interrupts itself. If the thread is interrupted while waiting, it will not respond. The interruption will be filled only after the resource is obtained.
 

tryAcquire method

protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Obtain lock Object's lock state, if the lock is free state=0,If locked, it is 1, greater than 1 means reentry.
    int c = getState();

    // c=0 Represents that no lock is occupied, and the current thread can directly access lock resource execution
    if (c == 0) {
        // Here's an introduction hasQueuedPredecessors()Method: Determine if you need to queue up
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // CAS Operational settings state
            // Set the current thread to a thread with a lock
            setExclusiveOwnerThread(current);
            return true;
        }
    }

    // Non-reentrant locks return directly false,Failure to lock
    else if (current == getExclusiveOwnerThread()) {
        // If re-entry lock, state Plus 1 (acquires)
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

 

HasQueued Predecessors Method

public final boolean hasQueuedPredecessors() {
    // Getting the information of the queue's head and tail nodes
    Node t = tail; 
    Node h = head;
    Node s;
    // h != t There are several situations.
    // 1,The queue has not yet been initialized. The first thread acquires the lock resource.
    //    here h and t All are null, h != t Return fasle Initialization queue
    // 2,The queue has been initialized, and other threads try to get resources.
    //    At this time, the head and tail nodes are different. h!=t Return true,
    //    Continue to judge s.thread != Thread.currentThread() The thread currently competing for the lock is the same thread as the first queued thread.,You need to queue up.
    // 3,The queue has been initialized, but there is only one data in the queue due to lock release.
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

 

addWaiter method

private Node addWaiter(Node mode) {
    // AQS The element type in the queue is Node,You need to encapsulate the current thread into one Node object
    Node node = new Node(Thread.currentThread(), mode);

    // tail For the end of the team, assign a value to pred
    Node pred = tai
    // judge pred Whether the queue is empty or not is actually to determine whether there are nodes at the end of the queue. In fact, as long as the queue is initialized, the end of the queue is certainly not empty.
    if (pred != null) {
        // Assemble node Queue Chain Processes
        // Directly encapsulating the current thread node The last node is set to pred That's the end of the original team.
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // pred The next node is set to node
            pred.next = node;
            return node;
        }
    }

    // Splicing aqs Queue Chain
    enq(node);
    return node;
}


private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 

acquireQueued method

final boolean acquireQueued(final Node node, int arg) {
    // Does the tag succeed in getting resources?
    boolean failed = true;
    try {
        // Whether the marker has been interrupted during the waiting process
        boolean interrupted = false;
        // spin
        for (;;) {
            final Node p = node.predecessor();
            // Determine whether you are the second node in the queue
            // Being the second node in the queue qualifies for resources
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // Have you been interrupted during the return wait?
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 

Main differences between fair locks and unfair locks

For the convenience of comparison, the source code of the locking process of two kinds of locks is listed here. Note the red identification fragment.

// Fair Locking Process
final void lock() {
    //call AQS in acquire Method
    acquire(1);
}  

 

// Unfair locking process
final void lock() {
    // Attempt to acquire the lock, unlock unsuccessfully queue up. The only chance to jump before queuing.
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

 

 

summary

1. If the first thread tries to get resources, it has nothing to do with the AQS queue, and the thread holds locks directly. And the queue will not be initialized. If the next thread is executed alternately, then it has nothing to do with the AQS queue and all threads hold locks directly.
2. The AQS queue is initialized only when there is resource competition among threads. The head of the AQS queue is always a virtual node whose Thread is NULL.
3. Threads that fail to retrieve resources will be in the park state, where only the second node in the queue waits to be awakened to try to retrieve resources. Other nodes do not compete for resources, which is the essence of the AQS queue and reduces CPU usage.
4. The lock of fair lock is to judge whether it needs queuing or not; instead, fair lock is to modify CAS counter directly to see if it can lock successfully; if the lock is unsuccessful, it will queue up (call acquire); so it is fair or unfair; as long as it enters the AQS queue, it will queue up; once queued up, it will queue up. Always queue up!

Keywords: Java Fragment

Added by theda on Mon, 19 Aug 2019 11:43:01 +0300