What is AQS?
AQS (AbstractQueuedSynchronizer) is the basic framework in Java's concurrency package for building locks and other synchronization components. Many synchronizer implementations rely on it, such as ReentrantLock, ReentrantReadWriteLock, and CountDownLatch.
AQS provides two modes of resource sharing: exclusive and shared.
The corresponding methods are acquire (acquireShared) and release (releaseShared).
acquire: acquires the resource. It returns immediately if the resource is available to the current thread; otherwise it suspends the current thread and adds it to the queue.
release: releases the resource and wakes up a suspended thread.
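To make the acquire/release pair concrete, here is a minimal sketch of a non-reentrant exclusive lock built on AQS. The class name SimpleMutex and its helper methods are made up for illustration; only the AbstractQueuedSynchronizer methods it calls (acquire, release, getState, setState, compareAndSetState, setExclusiveOwnerThread) are real JDK API.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS.
class SimpleMutex {
    // state == 0 means unlocked, state == 1 means locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Succeeds only if the CAS flips state from 0 to 1.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // true lets AQS wake up the next queued thread
        }

        boolean locked() { return getState() == 1; }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }                  // may park the caller in the queue
    boolean tryLock() { return sync.tryAcquire(1); }  // never blocks
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    // Increments a shared counter from several threads under the mutex.
    static int countWithMutex(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 10_000)); // prints 40000
    }
}
```

The subclass only decides whether the state can be taken or given back; queuing, parking and waking up are all handled by AQS itself.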
AQS queue
AQS Queue Diagram
Main attributes in AQS queues
// Head of the wait queue
private transient volatile Node head;
// Tail of the wait queue
private transient volatile Node tail;
// Lock state: 0 when unlocked, 1 when locked, incremented by 1 on each reentry
private volatile int state;
// The thread currently holding the lock; note that this field is inherited from AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread;
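The state field itself is not publicly readable, but for the owning thread ReentrantLock.getHoldCount() reflects it. The following sketch (the class HoldCountDemo is an illustrative name) shows the count growing by 1 on each reentry:

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Locks the same ReentrantLock `depth` times on the calling thread and
    // returns the hold count observed at the deepest point.
    static int holdCountAfter(int depth) {
        ReentrantLock lock = new ReentrantLock();
        for (int i = 0; i < depth; i++) {
            lock.lock();                 // each reentry adds 1 to the count
        }
        int observed = lock.getHoldCount();
        for (int i = 0; i < depth; i++) {
            lock.unlock();               // each unlock subtracts 1
        }
        if (lock.isLocked()) {
            throw new IllegalStateException("lock should be free again");
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(holdCountAfter(1)); // prints 1
        System.out.println(holdCountAfter(3)); // prints 3
    }
}
```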
Main attributes in Node classes
static final class Node {
    // Marker indicating that the node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating that the node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // Besides the four wait states below, a node also has an initial state of 0.
    // The thread represented by this node has cancelled its wait, i.e. it has given up acquiring the lock.
    static final int CANCELLED = 1;
    // When a node's waitStatus is SIGNAL, its successor has been suspended (or will be suspended soon).
    // When a node marked SIGNAL releases the lock, it wakes up the thread of its successor.
    static final int SIGNAL = -1;
    // The node is in a condition queue.
    // After another thread calls signal() on the Condition, the node is transferred from the condition queue
    // to the synchronization queue, where it takes part in acquiring the synchronization state.
    static final int CONDITION = -2;
    // Indicates that the next shared-mode acquisition should propagate unconditionally
    static final int PROPAGATE = -3;

    // Wait state of the node, initialized to 0
    volatile int waitStatus;
    // Predecessor of the current node
    volatile Node prev;
    // Successor of the current node
    volatile Node next;
    // The thread queued on this node
    volatile Thread thread;
}
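The Node fields are private to AQS, but the queue they form can be observed from outside through ReentrantLock's monitoring methods hasQueuedThread() and getQueueLength(). A small sketch, with QueueInspectionDemo as an illustrative name:

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {
    // Holds the lock on the calling thread, starts `waiters` threads that block
    // in lock(), and returns the queue length once all of them are enqueued.
    static int queuedWhileHeld(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock();   // blocks: the lock is held by the caller
                    lock.unlock();
                });
                threads[i].start();
            }
            // Wait until every waiter has a node in the synchronization queue.
            for (Thread t : threads) {
                while (!lock.hasQueuedThread(t)) {
                    Thread.yield();
                }
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock();         // lets the waiters through one by one
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(queuedWhileHeld(3)); // prints 3
    }
}
```

The dummy head node is not counted, because getQueueLength() only counts nodes that carry a thread.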
Implementation of ReentrantLock
Before introducing the ReentrantLock implementation, a word about Doug Lea, the author of the java.util.concurrent package, whose source code is harder to read than most. Mr. Doug Lea, who always has a modest and shy smile on his face, uses a lot of fairly complex conditional logic, such as several method calls chained together in a single condition, which makes it hard to keep up with his rhythm and figure out his design ideas. All I can do is quietly admit that I am not good enough and shed the tears of the unskilled.
Inheritance Diagram
ReentrantLock, an implementation of the Lock interface, is a reentrant exclusive lock.
ReentrantLock implements the API of the AQS framework (AbstractQueuedSynchronizer) through inner classes to provide exclusive locking.
Main attributes
private final Sync sync;

// A fair lock uses FairSync internally; an unfair lock uses NonfairSync.
// Both extend Sync, so they indirectly extend the abstract class AbstractQueuedSynchronizer.
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    // Acquire the lock
    abstract void lock();

    // Try to acquire the lock (unfair version)
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Try to release the lock
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
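Two properties of tryRelease are easy to check from outside: a thread that does not own the lock gets an IllegalMonitorStateException, and the lock only becomes free when the count drops back to 0. A small sketch, with ReleaseSemanticsDemo as an illustrative name:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReleaseSemanticsDemo {
    // Returns true if unlock() by a thread that does not hold the lock
    // throws IllegalMonitorStateException.
    static boolean unlockWithoutOwningThrows() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Locks twice, unlocks once, and checks that the lock is still held
    // until the second unlock.
    static boolean freedOnlyAtZero() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();                          // count 2 -> 1, still held
        boolean heldAfterFirstUnlock = lock.isLocked();
        lock.unlock();                          // count 1 -> 0, now free
        return heldAfterFirstUnlock && !lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(unlockWithoutOwningThrows()); // prints true
        System.out.println(freedOnlyAtZero());           // prints true
    }
}
```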
Constructors
// Creates an unfair lock by default
public ReentrantLock() {
    sync = new NonfairSync();
}

// Pass true to create a fair lock, false to create an unfair lock
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
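The effect of the two constructors can be checked with the public isFair() method. A tiny sketch, with FairnessFlagDemo as an illustrative name:

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessFlagDemo {
    // Fairness of a lock created with the no-argument constructor.
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }

    // Fairness of a lock created with the boolean constructor.
    static boolean explicitIsFair(boolean fair) {
        return new ReentrantLock(fair).isFair();
    }

    public static void main(String[] args) {
        System.out.println(defaultIsFair());       // prints false
        System.out.println(explicitIsFair(true));  // prints true
        System.out.println(explicitIsFair(false)); // prints false
    }
}
```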
ReentrantLock Fair Lock
We take the fair lock as an example and analyze the source code of its important methods.
// Extends Sync, so it indirectly extends the abstract class AbstractQueuedSynchronizer
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    // Acquire the lock
    final void lock() {
        // Calls the acquire method in AQS
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) { // CAS to set state
                // Record the current thread as the thread holding the lock
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
Source code analysis of acquire method
public final void acquire(int arg) {
    // tryAcquire(arg) attempts to lock. If it fails, acquireQueued is called to put the thread in the queue;
    // if locking succeeds, acquireQueued is not called.
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The acquire method does the following:
1. tryAcquire() tries to acquire the resource and returns immediately if it succeeds.
2. addWaiter() adds the thread to the wait queue and updates the links of the AQS queue.
3. acquireQueued() makes the thread wait in the queue until it has acquired the resource. It returns true if the thread was interrupted at any point while waiting, otherwise false.
4. selfInterrupt() re-asserts the interrupt. If the thread is interrupted while waiting, it does not respond at that point; the interrupt is made up for only after the resource has been acquired.
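Steps 3 and 4 can be observed with a plain ReentrantLock: a thread that is interrupted while queued in lock() keeps waiting, and finds its interrupt flag set once it finally holds the lock. A sketch under these assumptions, with DeferredInterruptDemo as an illustrative name:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class DeferredInterruptDemo {
    // Interrupts a thread while it is queued in lock() and returns true if that
    // thread still acquired the lock and then saw its interrupt flag set.
    static boolean interruptSeenAfterAcquire() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        AtomicBoolean flagAfterLock = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            lock.lock();                 // uninterruptible wait
            try {
                acquired.set(true);
                flagAfterLock.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });

        lock.lock();
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();          // wait until the waiter is in the queue
            }
            waiter.interrupt();          // interrupt while it is still waiting
        } finally {
            lock.unlock();               // now the waiter can take the lock
        }
        try { waiter.join(); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired.get() && flagAfterLock.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptSeenAfterAcquire()); // prints true
    }
}
```

Callers that need to react to the interrupt while still waiting would use lockInterruptibly() instead of lock().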
tryAcquire method
protected final boolean tryAcquire(int acquires) {
    // Get the current thread
    final Thread current = Thread.currentThread();
    // Read the state of the lock: 0 if the lock is free, 1 if it is locked, greater than 1 on reentry
    int c = getState();
    // c == 0 means nobody holds the lock, so the current thread may try to take it directly
    if (c == 0) {
        // hasQueuedPredecessors(), covered below, determines whether the thread needs to queue
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) { // CAS to set state
            // Record the current thread as the thread holding the lock
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The lock is held. If the holder is not the current thread (not a reentry),
    // fall through and return false: locking failed.
    else if (current == getExclusiveOwnerThread()) {
        // Reentry: add 1 (acquires) to state
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
hasQueuedPredecessors method
public final boolean hasQueuedPredecessors() {
    // Read the tail and head nodes of the queue
    Node t = tail;
    Node h = head;
    Node s;
    // There are several cases for h != t:
    // 1. The queue has not been initialized yet and the first thread is acquiring the lock.
    //    Here h and t are both null, so h != t is false and the method returns false.
    // 2. The queue has been initialized and other threads are trying to acquire the lock.
    //    The head and tail nodes differ, so h != t is true and the check continues with
    //    s.thread != Thread.currentThread(): if the first queued thread is not the thread
    //    currently competing for the lock, the current thread has to queue.
    // 3. The queue has been initialized, but after locks were released only one node is left in it.
    //    Here h == t, so the method returns false.
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
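Since hasQueuedPredecessors() is a public method of AQS, the three cases in the comments can be probed with a small custom synchronizer. The classes QueuedPredecessorsDemo and Sync below are illustrative; the sketch assumes a simple non-reentrant exclusive synchronizer.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueuedPredecessorsDemo {
    // Hypothetical minimal exclusive synchronizer, used only to reach the
    // public AQS methods hasQueuedPredecessors() and isQueued().
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    // Returns the value of hasQueuedPredecessors() for the calling thread
    // {before any contention, while another thread is queued, after it left}.
    static boolean[] probe() {
        Sync sync = new Sync();
        boolean before = sync.hasQueuedPredecessors(); // case 1: queue not initialized

        sync.acquire(1);
        Thread waiter = new Thread(() -> {
            sync.acquire(1);   // blocks behind the caller and gets queued
            sync.release(1);
        });
        waiter.start();
        while (!sync.isQueued(waiter)) {
            Thread.yield();
        }
        boolean during = sync.hasQueuedPredecessors(); // case 2: someone else is first in line

        sync.release(1);
        try { waiter.join(); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean after = sync.hasQueuedPredecessors();  // case 3: only the head node is left
        return new boolean[] { before, during, after };
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints false true false
    }
}
```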
addWaiter method
private Node addWaiter(Node mode) {
    // Elements of the AQS queue are of type Node, so the current thread is wrapped in a Node object
    Node node = new Node(Thread.currentThread(), mode);
    // tail is the end of the queue; assign it to pred
    Node pred = tail;
    // Checking whether pred is null really checks whether the queue has a tail node.
    // Once the queue has been initialized, the tail is never null.
    if (pred != null) {
        // Link node into the queue:
        // set the predecessor of the node wrapping the current thread to pred, the old tail
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // Set the successor of pred to node
            pred.next = node;
            return node;
        }
    }
    // Link the node into the AQS queue
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
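The enqueue logic can be modelled outside the JDK with two AtomicReference fields. This is a simplified sketch of the same idea (lazy dummy head, backward link first, CAS on the tail, forward link last), not the actual AQS code; SketchQueue and QNode are made-up names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of the AQS enqueue path; not the JDK implementation.
class SketchQueue {
    static final class QNode {
        final Thread thread;   // null for the dummy head
        volatile QNode prev;
        volatile QNode next;
        QNode(Thread thread) { this.thread = thread; }
    }

    final AtomicReference<QNode> head = new AtomicReference<>();
    final AtomicReference<QNode> tail = new AtomicReference<>();

    // Appends a node for the given thread and returns it.
    QNode enqueue(Thread thread) {
        QNode node = new QNode(thread);
        for (;;) {
            QNode t = tail.get();
            if (t == null) {
                // Lazy initialization: install a dummy head, then retry.
                if (head.compareAndSet(null, new QNode(null))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                      // backward link first
                if (tail.compareAndSet(t, node)) {  // publish as the new tail
                    t.next = node;                  // forward link last
                    return node;
                }
            }
        }
    }

    // Counts nodes by walking backwards from the tail (dummy head included).
    int length() {
        int n = 0;
        for (QNode p = tail.get(); p != null; p = p.prev) n++;
        return n;
    }

    // Enqueues from several threads at once and returns the final length.
    static int lengthAfterConcurrentEnqueue(int threads) {
        SketchQueue q = new SketchQueue();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> q.enqueue(Thread.currentThread()));
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return q.length();
    }

    public static void main(String[] args) {
        System.out.println(lengthAfterConcurrentEnqueue(8)); // prints 9
    }
}
```

Because the backward link is set before the CAS and the forward link after it, a traversal from the tail through prev always sees every published node, while next may briefly be null.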
acquireQueued method
final boolean acquireQueued(final Node node, int arg) {
    // Marks whether acquiring the resource has failed
    boolean failed = true;
    try {
        // Marks whether the thread was interrupted while waiting
        boolean interrupted = false;
        // Spin
        for (;;) {
            final Node p = node.predecessor();
            // Check whether this node is the second node in the queue (its predecessor is the head).
            // Only the second node in the queue is entitled to try for the resource.
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // Return whether the thread was interrupted while waiting
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
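The waiting inside the loop is built on LockSupport.park(). The sketch below imitates what a helper like parkAndCheckInterrupt() has to do, assuming it parks and then reads and clears the interrupt flag; ParkSemanticsDemo is an illustrative name, and the example runs on a single thread so that it cannot hang.

```java
import java.util.concurrent.locks.LockSupport;

class ParkSemanticsDemo {
    // Parks the calling thread, then reports (and clears) its interrupt flag.
    static boolean parkAndCheckInterrupt() {
        LockSupport.park();
        return Thread.interrupted();
    }

    // A permit handed out in advance makes the next park() return at once.
    static boolean afterUnpark() {
        LockSupport.unpark(Thread.currentThread());
        return parkAndCheckInterrupt(); // woken by the permit, not by an interrupt
    }

    // A pending interrupt also makes park() return at once; the flag is then
    // consumed, which is why acquire() has to re-assert it later.
    static boolean afterInterrupt() {
        Thread.currentThread().interrupt();
        boolean wasInterrupted = parkAndCheckInterrupt();
        boolean flagCleared = !Thread.currentThread().isInterrupted();
        return wasInterrupted && flagCleared;
    }

    public static void main(String[] args) {
        System.out.println(afterUnpark());    // prints false
        System.out.println(afterInterrupt()); // prints true
    }
}
```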
Main differences between fair locks and unfair locks
To make comparison easier, the source code of the locking path of both kinds of lock is listed here. Note the compareAndSetState call at the start of the unfair version.
// Fair locking process
final void lock() {
    // Calls the acquire method in AQS
    acquire(1);
}

// Unfair locking process
final void lock() {
    // Try to take the lock with a CAS right away; if that fails, queue up via acquire.
    // This is a chance to jump ahead before queuing.
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
Summary
1. When the first thread acquires the lock, the AQS queue is not involved: the thread takes the lock directly and the queue is not initialized. If later threads run alternately, without overlapping, the queue is still not involved and every thread takes the lock directly.
2. The AQS queue is initialized only when threads actually compete for the lock. The head of the AQS queue is always a dummy node whose thread is null.
3. Threads that fail to acquire the resource are parked. Only the second node in the queue is woken up to try for the resource; the other nodes do not compete, which is the essence of the AQS queue and reduces CPU usage.
4. When locking, a fair lock first checks whether it needs to queue, whereas an unfair lock directly tries to modify the state counter with a CAS to see whether it can lock; if that fails, it queues up (calls acquire). So whether the lock is fair or unfair, once a thread has entered the AQS queue it waits its turn. Once queued, always queued!
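The "once queued, always queued" behaviour in point 4 can be checked with a fair lock: threads that enter the queue one after another get the lock in exactly that order. A sketch, with FifoOrderDemo as an illustrative name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FifoOrderDemo {
    // Queues `n` threads one at a time behind a held fair lock and returns
    // the order in which they acquire it afterwards.
    static List<Integer> acquisitionOrder(int n) {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[n];
        lock.lock();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    lock.lock();
                    try { order.add(id); } finally { lock.unlock(); }
                });
                threads[i].start();
                // Start the next thread only after this one is in the queue.
                while (!lock.hasQueuedThread(threads[i])) {
                    Thread.yield();
                }
            }
        } finally {
            lock.unlock(); // hands the lock to the first queued thread
        }
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(acquisitionOrder(4)); // prints [0, 1, 2, 3]
    }
}
```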