Reference link: https://www.bilibili.com/video/BV12K411G7Fg
Through CAS, we can realize optimistic lock operation to synchronize threads. However, through the source code of CAS, we find that CAS can only modify a value in memory, rather than synchronize objects. So how to synchronize objects? At the same time, when multiple threads compete for unified resources, how can we manage all threads that need the resources? Therefore, AQS came into being.
Reference: going deep into the Java virtual machine
AbstractQueuedSynchronizer Abstract synchronization queue is called AQS for short. It is the basic component to implement the synchronizer, and the underlying lock in the contract is implemented through AQS. The structure is as follows
attribute
int state
In the shared mode, you need to indicate the number of threads holding the shared lock.
Shared lock and exclusive lock (exclusive lock)
Shared lock: this lock can be held by multiple threads. Shared locks only support reading data. If a thread adds a shared lock to the data, other data can only add a shared lock to the data.
Exclusive lock: only one thread can acquire the lock.
Shared lock and exclusive lock are different implementations of AQS
Node head & Node tail
It is used to maintain a two-way linked list of FIFO. The two Node nodes point to the head Node and the tail Node respectively
Node
The node structure in the queue is shown in the figure above
Method (taking exclusive mode as an example)
tryAcquire(int arg)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
An attempt to acquire a lock is made, and the failure to acquire the lock is returned directly.
This method only throws an exception. AQS inheritance class needs to inherit this method to give the upper open space so that users can write business logic.
acquire(int arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
After the tryAcquire method fails, it will enter the waiting queue
addWaiter(Node.EXCLUSIVE), arg)
The main function is to create a new Node and insert the Node into the waiting queue.
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Get the current tail node. Tail is the AQS attribute Node pred = tail; if (pred != null) { node.prev = pred; // An attempt was made to make the current node the tail node through an atomic operation // In fact, after obtaining the pred, other threads may also modify the tail // compareAndSetTail(Node expect, Node update) will read the offset of tail // Judge whether the current pred is the end of the queue (the period may be modified by other threads). If so, update the end of the queue to the current node if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // Call the complete queue method, which will be entered when the above attempt to quickly queue fails // For example, the tail is modified enq(node); return node; }
acquireQueued(final Node node, int arg)
After joining the queue, spin the lock in the queue.
As can be seen from the code, the nodes behind the head node form a waiting queue
When the first node after the head obtains the lock, the node will become a new head node
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // Optional operation for (;;) { final Node p = node.predecessor(); // The front node of the current node is the head node & & the current thread successfully obtains the lock if (p == head && tryAcquire(arg)) { // The current node acts as the head node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // Determine whether the thread needs to be suspended according to the current node state if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
boolean tryRelease(int arg)
The same as tryAcquire, as a method open to the upper layer
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
boolean release(int arg)
Release the lock and notify the queue to change the thread state in the waiting queue
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // Pass in the head and wake up the node in the waiting queue unparkSuccessor(h); return true; } return false; }
unparkSuccessor(Node node)
After the head node has finished operating resources, it notifies the nodes in the waiting queue
In the operation of the following code, why don't you wake up from the beginning node
The search here is not atomic. Search from back to front may be because the queue construction order is not correct
- The back node pre points to the front node
- The front node next will point to the rear node
From front to back, the search may be interrupted because step 2 is not completed
private void unparkSuccessor(Node node) { // Sets the status of the header node int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // Start searching from the tail node, the node that is the most front after the head and has waitstatus < = 0 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // Wake up the found node. After waking up, the acquire method will be executed to obtain the lock if (s != null) LockSupport.unpark(s.thread); }