Most tools in the JUC package (java.util.concurrent) are built on AQS, so let's start with how AQS works.
1.AQS principle
1.1 overview:
AQS stands for AbstractQueuedSynchronizer, a framework for building blocking locks and related synchronizers
1.2 features:
- The state attribute represents the state of the resource (used in either exclusive or shared mode). Subclasses define how this state is maintained and thereby control how the lock is acquired and released
  - getState - get the state
  - setState - set the state
  - compareAndSetState - set the state atomically with CAS
  - Exclusive mode allows only one thread to access the resource, while shared mode allows multiple threads to access it
- A FIFO waiting queue is provided, similar to the EntryList of a Monitor
- Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported, each similar to the WaitSet of a Monitor
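To make the state attribute and shared mode concrete, here is a minimal sketch of a one-shot latch built on AQS. The class name OneShotLatch and its methods are made up for illustration; only the AQS calls (getState, setState, acquireSharedInterruptibly, releaseShared) come from the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, state 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success; a negative result makes AQS queue the caller.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true tells AQS to wake the queued waiters
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

    void open() { sync.releaseShared(1); }

    // Starts the given number of waiting threads, opens the latch, and returns how many got through.
    static int demo(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + demo(3));
    }
}
```

Because the latch is acquired in shared mode, a single open() lets every waiting thread through, which an exclusive-mode synchronizer would not do.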
1.3 implementation mode:
Define a class that extends AQS and implement the following methods in the subclass as needed (by default they throw UnsupportedOperationException):
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
// Lock acquisition pattern
// If acquiring the lock fails
if (!tryAcquire(arg)) {
    // Enqueue the current thread; it can then be blocked with park and later woken with unpark
}

// Lock release pattern
// If the lock is released successfully
if (tryRelease(arg)) {
    // Let a blocked thread resume running
}
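The default behaviour of the hook methods can be checked with a short sketch. The synchronizer below (ExclusiveOnly, a made-up name) overrides only the exclusive-mode hooks, so calling a shared-mode entry point falls through to the AQS default and throws UnsupportedOperationException.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class DefaultHooksDemo {
    // Hypothetical synchronizer that implements only the exclusive-mode hooks.
    static final class ExclusiveOnly extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // Returns true if the shared-mode entry point rejects the call.
    static boolean sharedAcquireIsUnsupported() {
        ExclusiveOnly sync = new ExclusiveOnly();
        try {
            sync.acquireShared(1); // calls tryAcquireShared, which was not overridden
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    // Returns true if an uncontended acquire/release pair works in exclusive mode.
    static boolean exclusiveRoundTrip() {
        ExclusiveOnly sync = new ExclusiveOnly();
        sync.acquire(1);        // tryAcquire succeeds, so nothing is queued
        return sync.release(1); // tryRelease returns true
    }

    public static void main(String[] args) {
        System.out.println("shared mode unsupported: " + sharedAcquireIsUnsupported());
        System.out.println("exclusive round trip ok: " + exclusiveRoundTrip());
    }
}
```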
1.4 using AQS to implement a non-reentrant lock
Custom exclusive lock:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// Custom lock (non-reentrant)
class MyLock implements Lock {

    // AQS already implements most of the logic; we only need to implement a few key methods
    // Synchronizer class implementing an exclusive lock
    class MySync extends AbstractQueuedSynchronizer {

        @Override // Attempt to acquire the lock
        protected boolean tryAcquire(int arg) {
            // Try to change state from 0 to 1; success means the lock has been acquired
            // Other threads may be modifying state at the same time, so CAS is used to guarantee atomicity
            if (compareAndSetState(0, 1)) {
                // The lock is held; record the current thread as the owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override // Attempt to release the lock
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            // state is volatile, so it is written last: the volatile write acts as a barrier
            // that makes the preceding write visible to other threads
            setState(0);
            return true;
        }

        @Override // Whether the exclusive lock is held
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Return a condition variable
        public Condition newCondition() {
            // ConditionObject is an inner class of AQS
            return new ConditionObject();
        }
    }

    private MySync mySync = new MySync();

    @Override // Lock (on failure the thread enters the waiting queue)
    public void lock() {
        // Try to lock; on failure the thread is put into the waiting queue
        // tryAcquire cannot be used here because it only tries once
        mySync.acquire(1);
    }

    @Override // Lock, interruptible
    public void lockInterruptibly() throws InterruptedException {
        mySync.acquireInterruptibly(1);
    }

    @Override // Try to lock once
    public boolean tryLock() {
        return mySync.tryAcquire(1);
    }

    @Override // Try to lock, with timeout
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return mySync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // Unlock
    public void unlock() {
        mySync.release(1);
    }

    @Override // Create a condition variable
    public Condition newCondition() {
        return mySync.newCondition();
    }
}
Test:
public static void main(String[] args) {
    MyLock lock = new MyLock();
    new Thread(() -> {
        lock.lock();
        try {
            log.debug("locking.....");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("unlocking.....");
            lock.unlock();
        }
    }, "t1").start();

    new Thread(() -> {
        lock.lock();
        try {
            log.debug("locking.....");
        } finally {
            log.debug("unlocking.....");
            lock.unlock();
        }
    }, "t2").start();
}
Result:
14:21:09.947 [t1] DEBUG aqs - locking.....
14:21:10.961 [t1] DEBUG aqs - unlocking.....
14:21:10.961 [t2] DEBUG aqs - locking.....
14:21:10.961 [t2] DEBUG aqs - unlocking.....
Test reentry:
public static void main(String[] args) {
    MyLock lock = new MyLock();
    new Thread(() -> {
        lock.lock();
        try {
            log.debug("locking.....");
            Thread.sleep(1000);
            lock.lock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("unlocking.....");
            lock.unlock();
        }
    }, "t1").start();
}
Result: t1 prints "locking....." and then blocks forever on the second lock() call. This shows that the lock is non-reentrant: a thread that already holds the lock cannot acquire it again
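The same property can be checked without hanging a thread by using a single, non-blocking attempt. The sketch below uses a stripped-down synchronizer (NonReentrantSync, a made-up name that mirrors the tryAcquire/tryRelease logic of MySync) and compares it with ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

class ReentryCheck {
    // Hypothetical synchronizer with the same acquire/release logic as the custom lock above.
    static final class NonReentrantSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean tryOnce() {
            return tryAcquire(1);
        }
    }

    // Returns {first, second} attempts for the custom sync, then {first, second} for ReentrantLock.
    static boolean[] demo() {
        NonReentrantSync sync = new NonReentrantSync();
        boolean first = sync.tryOnce();   // state 0 -> 1
        boolean second = sync.tryOnce();  // state is already 1, so the CAS fails
        sync.release(1);

        ReentrantLock reentrant = new ReentrantLock();
        boolean rFirst = reentrant.tryLock();
        boolean rSecond = reentrant.tryLock(); // same owner, so the hold count goes to 2
        reentrant.unlock();
        reentrant.unlock();
        return new boolean[] {first, second, rFirst, rSecond};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("custom sync:   first=" + r[0] + ", second=" + r[1]);
        System.out.println("ReentrantLock: first=" + r[2] + ", second=" + r[3]);
    }
}
```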
2.ReentrantLock principle
From the class diagram, we can see:
- ReentrantLock implements the Lock interface
- It maintains an abstract synchronizer, Sync, internally, and Sync extends AQS
- Sync has two implementations:
- NonfairSync, the unfair synchronizer
- FairSync, the fair synchronizer
2.1 implementation principle of unfair lock:
2.1.1 lock and unlock process:
We start the walk-through of locking and unlocking at the constructor. By default it creates the unfair lock implementation
public ReentrantLock() {
    sync = new NonfairSync();
}
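The default can be confirmed through the public API. This small sketch relies only on ReentrantLock.isFair() and the ReentrantLock(boolean) constructor; the class name FairnessDefault is made up.

```java
import java.util.concurrent.locks.ReentrantLock;

class FairnessDefault {
    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();     // no-arg constructor
    }

    static boolean explicitIsFair() {
        return new ReentrantLock(true).isFair(); // fairness requested explicitly
    }

    public static void main(String[] args) {
        System.out.println("default lock fair?  " + defaultIsFair());
        System.out.println("explicit lock fair? " + explicitIsFair());
    }
}
```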
NonfairSync extends Sync, which in turn extends AQS
The source code of NonfairSync (JDK 8):
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock. Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // Try to change state from 0 to 1
        if (compareAndSetState(0, 1))
            // Set the owner thread to the current thread
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
1. When there is no competition:
- Try to change state from 0 to 1
- The CAS succeeds because no other thread is competing, so the owner thread is set to the current thread
2. When the first competition appears: Thread-0 has already changed state to 1, and Thread-1 calls lock:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
- The CAS from 0 to 1 fails, so execution enters the else branch and calls acquire(1):
- tryAcquire is called, which again tries to change state from 0 to 1 and fails
- Execution then enters addWaiter, which builds a Node object and adds it to the waiting queue
- The yellow triangle in the figure indicates the waitStatus of the Node, where 0 is the default (normal) status
- Node creation is lazy
- The first time, two Node objects are created
- The first Node is called the dummy (sentinel) node. It is only a placeholder and is not associated with any thread
- The second Node is the one associated with Thread-1, and it is placed at the tail of the linked list
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
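The lazy creation of the queue can be observed from outside through the public AQS method hasContended(), which reports whether any thread has ever had to queue. The sketch below is an illustration under assumptions: the class names LazyQueueDemo and Sync are made up, and the busy-wait with Thread.yield() is only there to make the demo deterministic.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class LazyQueueDemo {
    // Hypothetical minimal exclusive synchronizer.
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // Returns {contended before any waiter, contended while a waiter is queued,
    //          contended after the queue drained, queue empty at the end}.
    static boolean[] demo() {
        Sync sync = new Sync();
        sync.acquire(1);                       // uncontended: no queue node is created
        boolean before = sync.hasContended();

        Thread waiter = new Thread(() -> {
            sync.acquire(1);                   // fails at first, so the thread is enqueued
            sync.release(1);
        });
        waiter.start();
        while (!sync.isQueued(waiter)) {
            Thread.yield();                    // wait until the waiter's node is in the queue
        }
        boolean whileQueued = sync.hasContended();

        sync.release(1);                       // hand the synchronizer to the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {before, whileQueued, sync.hasContended(), !sync.hasQueuedThreads()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("contended before any waiter: " + r[0]);
        System.out.println("contended while queued:      " + r[1]);
        System.out.println("contended after drain:       " + r[2]);
        System.out.println("queue empty at the end:      " + r[3]);
    }
}
```

The head node stays in place after the queue drains, which is why hasContended() keeps reporting true even though no thread is waiting any more.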
- The current thread then enters acquireQueued:
- acquireQueued keeps trying to acquire the lock in an endless loop, and parks (blocks) after failing
- If the node is right behind head (in second place), it calls tryAcquire again. state is still 1 at this point, so the attempt fails
- It enters shouldParkAfterFailedAcquire, changes the waitStatus of its predecessor node (head) to -1, and returns false this time
- After shouldParkAfterFailedAcquire returns, execution goes back to the loop in acquireQueued and tries to acquire the lock again. state is still 1, so it fails again
- On entering shouldParkAfterFailedAcquire again, it returns true this time because the waitStatus of the predecessor node is already -1
- It enters parkAndCheckInterrupt and Thread-1 parks (shown in gray in the figure)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node (the node of Thread-1)
            final Node p = node.predecessor();
            // Check whether the predecessor is head, i.e. whether node is in second place
            // If so, try to acquire the lock again
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Decide whether to park after the failed attempt, then park
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // block
    return Thread.interrupted();
}
3. When several more threads go through the same process and fail to acquire the lock, the queue looks like this
The nodes form a linked list. The head is the node with no associated thread, and the node of the most recent thread is the tail. Every node except the last one has waitStatus -1
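The build-up of the queue can be observed with the monitoring methods of ReentrantLock (hasQueuedThread and getQueueLength). In this sketch the class name QueueLengthDemo is made up, and the yield loop is only a demo device to wait until each thread is really queued.

```java
import java.util.concurrent.locks.ReentrantLock;

class QueueLengthDemo {
    // Holds the lock, starts the given number of contending threads,
    // and returns the queue length once all of them are enqueued.
    static int demo(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                lock.lock();   // fails while the main thread holds the lock, so the thread queues
                lock.unlock();
            }, "Thread-" + (i + 1));
            threads[i] = t;
            t.start();
            while (!lock.hasQueuedThread(t)) {
                Thread.yield(); // wait until this thread's node is in the queue
            }
        }
        int queued = lock.getQueueLength();
        lock.unlock();          // lets the queued threads proceed one after another
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queued;
    }

    public static void main(String[] args) {
        System.out.println("queued threads: " + demo(3));
    }
}
```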
4. Thread-0 releases the lock and enters the tryRelease process. If it succeeds:
- exclusiveOwnerThread is set to null and state = 0
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}
- In release (called from unlock), if the queue is not empty (head != null) and the waitStatus of head is -1, enter unparkSuccessor:
- unparkSuccessor finds the non-cancelled Node closest to head in the queue and unparks it so that it resumes running, which is Thread-1 in this example
- Execution returns to the acquireQueued loop of Thread-1. If locking succeeds (no competition), acquireQueued sets:
- exclusiveOwnerThread to Thread-1, state = 1
- head to the Node that Thread-1 was in, and that Node clears its thread reference
- The original head is disconnected from the linked list and can be garbage collected
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

// The part of acquireQueued that the woken thread returns to
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}
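The blocking and waking in these steps is done with LockSupport.park and LockSupport.unpark. The following sketch shows the two calls in isolation; the class name ParkUnparkDemo and the released flag are made up for the example. The waiter re-checks its condition in a loop because park is allowed to return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    // Returns true if the waiting thread resumed after being unparked.
    static boolean demo() {
        AtomicBoolean released = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // Re-check the condition after every wake-up, as the AQS loops do
            while (!released.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });
        waiter.start();

        released.set(true);         // publish the condition first
        LockSupport.unpark(waiter); // then grant the permit; safe even if the waiter has not parked yet

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + demo());
    }
}
```

Setting the flag before calling unpark avoids a lost wake-up: either the waiter sees the flag and never parks, or it parks and consumes the permit.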
5. If other threads are competing:
If another thread competes for the lock at this moment (this is where the unfairness shows), for example Thread-4 arrives right now
If Thread-4 gets in first:
- exclusiveOwnerThread is set to Thread-4, state = 1
- Thread-1 goes through the acquireQueued loop again, fails to acquire the lock, and parks again
2.1.2 lock related source code:
// Sync inherits from AQS
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // Lock implementation
    final void lock() {
        // First try (only once) to CAS state from 0 to 1; success means the exclusive lock has been obtained
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // If the attempt fails, enter (1)
            acquire(1);
    }

    // (1) Method inherited from AQS, placed here for easier reading
    public final void acquire(int arg) {
        // (2) tryAcquire
        if (
            !tryAcquire(arg) &&
            // When tryAcquire returns false, addWaiter (4) is called first, followed by acquireQueued (5)
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // (2) Delegates to (3)
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // (3) Method inherited from Sync, placed here for easier reading
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // If the lock is not held
        if (c == 0) {
            // Try to obtain it with CAS. This is where the unfairness shows: the AQS queue is not checked
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock is held and the owner is the current thread, this is a lock re-entry
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Failed to acquire; return to the caller
        return false;
    }

    // (4) Method inherited from AQS, placed here for easier reading
    private Node addWaiter(Node mode) {
        // Associate the current thread with a Node object in exclusive mode
        // waitStatus of the new Node is 0, because it is a member variable and is initialized to 0 by default
        Node node = new Node(Thread.currentThread(), mode);
        // If tail is not null, try to CAS the Node onto the tail of the AQS queue
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // Doubly linked list
                pred.next = node;
                return node;
            }
        }
        // If tail is null (or the CAS failed), add the Node through enq, see (6)
        enq(node);
        return node;
    }

    // (6) Method inherited from AQS, placed here for easier reading
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // No queue yet: set head to a sentinel node (no thread, status 0)
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // Try to CAS the Node onto the tail of the AQS queue
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // (5) Method inherited from AQS, placed here for easier reading
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // The predecessor is head, so it is this node's turn (the node of the current thread); try to acquire
                if (p == head && tryAcquire(arg)) {
                    // On success, make this node (the node of the current thread) the new head
                    setHead(node);
                    // Detach the previous head
                    p.next = null;
                    failed = false;
                    // Return the interrupt flag (false if the thread was never interrupted)
                    return interrupted;
                }
                if (
                    // Decide whether to park, see (7)
                    shouldParkAfterFailedAcquire(p, node) &&
                    // Park and wait, see (8). By now the predecessor's status has been set to Node.SIGNAL
                    parkAndCheckInterrupt()
                ) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // (7) Method inherited from AQS, placed here for easier reading
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the status of the predecessor node
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // The predecessor is already set to signal this node, so this node can safely block
            return true;
        }
        // > 0 means cancelled
        if (ws > 0) {
            // The predecessor is cancelled: unlink all cancelled nodes in front and return to the outer loop to retry
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Do not block this time
            // If the next retry also fails, blocking is required, so set the predecessor's status to Node.SIGNAL now
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // (8) Block the current thread
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}
2.1.3 unlock relevant source code:
// Sync inherits from AQS
static final class NonfairSync extends Sync {

    // Unlock implementation
    public void unlock() {
        sync.release(1);
    }

    // Method inherited from AQS, placed here for easier reading
    public final boolean release(int arg) {
        // Try to release the lock, see (1)
        if (tryRelease(arg)) {
            // unpark from the head node of the queue
            Node h = head;
            if (
                // The queue is not null
                h != null &&
                // waitStatus == Node.SIGNAL means an unpark is required
                h.waitStatus != 0
            ) {
                // unpark a thread waiting in the AQS queue, see (2)
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // (1) Method inherited from Sync, placed here for easier reading
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Re-entry is supported: the release only succeeds when state drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // (2) Method inherited from AQS, placed here for easier reading
    private void unparkSuccessor(Node node) {
        // If the status is Node.SIGNAL, try to reset it to 0
        // (once the woken thread obtains the lock, this head node is discarded anyway)
        // It is fine if the CAS fails
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // Find the node to unpark. Detaching the current (head) node from the AQS queue is left to the woken node
        Node s = node.next;
        // Skip cancelled nodes: scan from tail toward head to find the front-most node that needs an unpark
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}
2.2 reentrant principle:
When the owner re-enters the lock, state is incremented; each unlock decrements state, and the lock is only released when state reaches 0
static final class NonfairSync extends Sync {
    // ...

    // Method inherited from Sync, placed here for easier reading
    // The argument is 1
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First acquisition of the lock
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock is held and the owner is the current thread, this is a lock re-entry
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Method inherited from Sync, placed here for easier reading
    // The argument is 1
    protected final boolean tryRelease(int releases) {
        // state--, e.g. 2 - 1; c is the remaining re-entry count
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Re-entry is supported: the release only succeeds when state drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
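The counting can be watched through ReentrantLock.getHoldCount() and isLocked(). A small sketch (the class name HoldCountDemo is made up):

```java
import java.util.concurrent.locks.ReentrantLock;

class HoldCountDemo {
    // Returns "holds after two locks, holds after one unlock, locked?, holds after second unlock, locked?".
    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                               // re-entry: state goes from 1 to 2
        int afterTwoLocks = lock.getHoldCount();
        lock.unlock();                             // state goes from 2 to 1, lock still held
        int afterOneUnlock = lock.getHoldCount();
        boolean stillLocked = lock.isLocked();
        lock.unlock();                             // state reaches 0, lock is released
        return afterTwoLocks + "," + afterOneUnlock + "," + stillLocked
                + "," + lock.getHoldCount() + "," + lock.isLocked();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2,1,true,0,false
    }
}
```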
2.3 interruptible principle
2.3.1 non-interruptible mode: in this mode, a thread that is interrupted still stays in the AQS queue until it obtains the lock. Only then does it learn that it was interrupted
// Sync inherits from AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // If the interrupt flag is already true, park returns immediately without blocking
        LockSupport.park(this);
        // interrupted() clears the interrupt flag
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // The interrupt status is only returned after the lock has been obtained
                    return interrupted;
                }
                if (
                    shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()
                ) {
                    // If the thread was woken by an interrupt, record the interrupt status as true
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (
            !tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // If the interrupt status is true
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // Re-assert the interrupt. If the thread is running normally (not in sleep or another
        // blocking state), interrupt() only sets the flag and does not throw
        Thread.currentThread().interrupt();
    }
}
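This behaviour can be reproduced with the public ReentrantLock API. In the sketch below (the class name UninterruptibleLockDemo is made up), a thread blocked in lock() is interrupted, stays in the queue, and finds its interrupt flag set once it finally obtains the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleLockDemo {
    // Returns {still queued after interrupt, eventually acquired, interrupt flag set after acquiring}.
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);

        lock.lock();
        Thread t = new Thread(() -> {
            lock.lock(); // non-interruptible acquisition
            try {
                acquired.set(true);
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });
        t.start();
        while (!lock.hasQueuedThread(t)) {
            Thread.yield(); // wait until t is in the AQS queue
        }

        t.interrupt();                                  // t wakes up, fails to acquire, and waits again
        boolean stillQueued = lock.hasQueuedThread(t);  // the main thread still holds the lock

        lock.unlock();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {stillQueued, acquired.get(), flagAfterAcquire.get()};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("still queued after interrupt: " + r[0]);
        System.out.println("eventually acquired:          " + r[1]);
        System.out.println("interrupt flag after acquire: " + r[2]);
    }
}
```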
2.3.2 interruptible mode:
static final class NonfairSync extends Sync {

    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // If the lock is not obtained, enter (1)
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // (1) Interruptible lock acquisition
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) {
                    // If the thread is interrupted while parked, execution gets here
                    // An exception is thrown instead of going around for (;;) again
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
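For comparison, here is the interruptible counterpart as a sketch on the public API (the class name InterruptibleLockDemo is made up). The waiting thread gives up with InterruptedException while the main thread still holds the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Returns true if the waiting thread left the queue with InterruptedException.
    static boolean demo() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean sawInterruptedException = new AtomicBoolean(false);

        lock.lock();
        try {
            Thread t = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached in this demo: the lock is never handed over
                } catch (InterruptedException e) {
                    sawInterruptedException.set(true);
                }
            });
            t.start();
            while (!lock.hasQueuedThread(t)) {
                Thread.yield(); // wait until t is in the AQS queue
            }
            t.interrupt();      // t stops waiting and throws instead of retrying
            try {
                t.join();       // t finishes although the lock is still held here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            lock.unlock();
        }
        return sawInterruptedException.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter saw InterruptedException: " + demo());
    }
}
```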
2.4 fair lock principle:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    // Method inherited from AQS, placed here for easier reading
    public final void acquire(int arg) {
        if (
            !tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // The main difference from the unfair lock is the implementation of tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First check whether the AQS queue has predecessor nodes; only compete if it has none
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Method inherited from AQS, placed here for easier reading
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h != t means there are nodes in the queue
        return h != t &&
            (
                // (s = h.next) == null means the second node is not linked in yet
                (s = h.next) == null ||
                // Or the second node in the queue does not belong to the current thread
                s.thread != Thread.currentThread()
            );
    }
}
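The effect of the fair policy can be sketched with three threads that are queued in a known order. The class name FairOrderDemo and the thread names t1 to t3 are made up; the yield loop only makes sure each thread is enqueued before the next one starts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FairOrderDemo {
    // Queues t1, t2, t3 behind the main thread and returns the order in which they got the lock.
    static List<String> demo() {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        List<String> order = Collections.synchronizedList(new ArrayList<>());

        lock.lock();
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            String name = "t" + (i + 1);
            Thread t = new Thread(() -> {
                lock.lock();
                try {
                    order.add(name);
                } finally {
                    lock.unlock();
                }
            }, name);
            threads[i] = t;
            t.start();
            while (!lock.hasQueuedThread(t)) {
                Thread.yield(); // fix the queue order: t1, then t2, then t3
            }
        }
        lock.unlock();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("acquisition order: " + demo());
    }
}
```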
2.5 implementation principle of condition variables:
Each condition variable corresponds to its own waiting queue (a singly linked list connected through nextWaiter). The implementation class is ConditionObject. The queue holds threads whose condition is not yet met and that need to wait, similar to the WaitSet of synchronized
1. await process:
Initially Thread-0 holds the lock and calls await. It enters addConditionWaiter of ConditionObject, which creates a new Node with status -2 (Node.CONDITION), associates it with Thread-0, and appends it to the tail of the condition's waiting queue
Next, it enters the fullyRelease process of AQS, which releases the lock held on the synchronizer
The next node in the AQS queue is unparked and competes for the lock. Assuming no other threads are competing, Thread-1 wins
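That await gives up every hold, not just one, can be checked with a re-entrant acquisition. This is a sketch on the public API; the class name AwaitReleasesAllHolds and the flag array are made up, and awaitUninterruptibly is used only to avoid checked exceptions in the demo.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReleasesAllHolds {
    // The waiter locks twice and awaits; returns its hold count after it wakes up again.
    static int demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};                  // guarded by lock
        AtomicInteger holdsAfterAwait = new AtomicInteger(-1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock();                           // hold count 2
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly();  // releases both holds while waiting
                }
                holdsAfterAwait.set(lock.getHoldCount());
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        for (;;) {
            lock.lock(); // only possible once the waiter has fully released the lock
            try {
                if (lock.hasWaiters(ready)) {
                    flag[0] = true;
                    ready.signal();
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holdsAfterAwait.get();
    }

    public static void main(String[] args) {
        System.out.println("hold count restored after await: " + demo());
    }
}
```

The main thread can only observe a waiter on the condition while holding the lock itself, so seeing one already shows that both holds were released. After the signal, the saved state is re-acquired in full.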
2. signal process:
Suppose Thread-1 wants to wake up Thread-0
It enters the doSignal process of ConditionObject and takes the first Node in the waiting queue, that is, the Node of Thread-0
It executes transferForSignal, which adds the Node to the tail of the AQS queue, changes the waitStatus of Thread-0's Node to 0, and changes the waitStatus of its predecessor in the AQS queue (Thread-3 in this example) to -1
Thread-1 then releases the lock and goes through the unlock process
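The transfer from the condition queue to the AQS queue can be observed with the monitoring methods getWaitQueueLength and hasQueuedThread of ReentrantLock. A sketch, with the made-up class name SignalTransferDemo:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    // Returns "condition queue length before, in AQS queue before, condition queue length after, in AQS queue after".
    static String demo() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        String result = "";
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    int before = lock.getWaitQueueLength(cond);
                    boolean queuedBefore = lock.hasQueuedThread(waiter);
                    go[0] = true;
                    cond.signal(); // moves the waiter's node to the AQS queue
                    int after = lock.getWaitQueueLength(cond);
                    boolean queuedAfter = lock.hasQueuedThread(waiter);
                    result = before + "," + queuedBefore + "," + after + "," + queuedAfter;
                    break;
                }
            } finally {
                lock.unlock(); // only now can the waiter re-acquire the lock
            }
            Thread.yield();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1,false,0,true
    }
}
```

Note that signal does not hand over the lock. The signalled thread only competes for it after the signalling thread calls unlock.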
3. Source code:
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    // First waiting node
    private transient Node firstWaiter;

    // Last waiting node
    private transient Node lastWaiter;

    public ConditionObject() { }

    // (1) Add a Node to the waiting queue
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // Remove all cancelled Nodes from the queue's linked list, see (2)
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a new Node associated with the current thread and append it to the tail of the queue
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // Wake up - transfer the first node that has not been cancelled to the AQS queue
    private void doSignal(Node first) {
        do {
            // first was the last node, so the queue is now empty
            if ( (firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (
            // Transfer the node from the waiting queue to the AQS queue, see (3); if that fails and nodes remain, keep looping
            !transferForSignal(first) &&
            // The queue still has nodes
            (first = firstWaiter) != null
        );
    }

    // Method of the outer class, placed here for easier reading
    // (3) If the node has been cancelled, return false to indicate that the transfer failed; otherwise the transfer succeeds
    final boolean transferForSignal(Node node) {
        // CAS the node status from CONDITION to 0 (it is about to go to the tail of the AQS queue)
        // If the status is no longer Node.CONDITION, the node was cancelled
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Append to the tail of the AQS queue; enq returns the predecessor
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
            // The predecessor is cancelled
            ws > 0 ||
            // The predecessor's status cannot be set to Node.SIGNAL
            !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
        ) {
            // unpark the thread so that it can resynchronize its state
            LockSupport.unpark(node.thread);
        }
        return true;
    }

    // Wake up all - transfer every node in the waiting queue to the AQS queue
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // (2)
    private void unlinkCancelledWaiters() {
        // ...
    }

    // Wake up - the caller must hold the lock, so doSignal does not need any locking of its own
    public final void signal() {
        // If the lock is not held, an exception is thrown
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    // Wake up all - the caller must hold the lock, so doSignalAll does not need any locking of its own
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    // Non-interruptible wait - until awakened
    public final void awaitUninterruptibly() {
        // Add a Node to the waiting queue, see (1)
        Node node = addConditionWaiter();
        // Release the lock held by the node, see (4)
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // Block as long as the node has not been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // park blocks
            LockSupport.park(this);
            // If interrupted, only record the interrupt status
            if (Thread.interrupted())
                interrupted = true;
        }
        // After waking up, compete for the lock; on failure, block in the AQS queue
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }

    // Method of the outer class, placed here for easier reading
    // (4) A thread may have re-entered the lock, so the whole state must be released:
    // read the current state and release all of it at once
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // release also wakes up the next node in the AQS queue
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // Interrupt mode - re-assert the interrupt status when exiting the wait
    private static final int REINTERRUPT = 1;
    // Interrupt mode - throw an exception when exiting the wait
    private static final int THROW_IE = -1;

    // Determine the interrupt mode
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    // (5) Apply the interrupt mode
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

    // Wait - until awakened or interrupted
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the waiting queue, see (1)
        Node node = addConditionWaiter();
        // Release the lock held by the node, see (4)
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Block as long as the node has not been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // park blocks
            LockSupport.park(this);
            // If interrupted, leave the waiting queue
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // After leaving the waiting queue, the lock still has to be re-acquired through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Remove all cancelled Nodes from the queue's linked list, see (2)
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode, see (5)
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // Wait - until awakened, interrupted, or timed out
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the waiting queue, see (1)
        Node node = addConditionWaiter();
        // Release the lock held by the node, see (4)
        int savedState = fullyRelease(node);
        // Compute the deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // Block as long as the node has not been transferred to the AQS queue
        while (!isOnSyncQueue(node)) {
            // Timed out: leave the waiting queue
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // park for a limited time; spinForTimeoutThreshold is 1000 ns
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // If interrupted, leave the waiting queue
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // After leaving the waiting queue, the lock still has to be re-acquired through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Remove all cancelled Nodes from the queue's linked list, see (2)
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode, see (5)
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    // Wait - until awakened, interrupted, or timed out; the logic is similar to awaitNanos
    public final boolean awaitUntil(Date deadline) throws InterruptedException {
        // ...
    }

    // Wait - until awakened, interrupted, or timed out; the logic is similar to awaitNanos
    public final boolean await(long time, TimeUnit unit) throws InterruptedException {
        // ...
    }

    // Utility methods omitted
}