Preface
This article discusses the AbstractQueuedSynchronizer (AQS) framework class provided in the JDK and one of the locks built on top of it, ReentrantLock. It is based on notes taken from the book "The Art of Java Concurrent Programming" and from the Dark Horse multithreading video course.
1. AQS
1. Concept
- Overview: the full name is AbstractQueuedSynchronizer. It is a framework for building blocking locks and related synchronizer tools
- Characteristics:
- A state attribute represents the state of the resource. Subclasses define how this state is maintained and therefore how the lock is acquired and released
ⅰ. getState - get the state
ⅱ. setState - set the state
ⅲ. compareAndSetState - set the state atomically with CAS
ⅳ. Exclusive mode allows only one thread to access the resource, while shared mode allows multiple threads to access it
- A FIFO waiting queue is provided, similar to the EntryList of a Monitor
- Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported, each similar to the WaitSet of a Monitor
Subclasses mainly implement the following methods (each throws UnsupportedOperationException by default)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
// Idiom for acquiring the lock
// If lock acquisition fails
if (!tryAcquire(arg)) {
    // Join the queue; the current thread may then be blocked.
    // AQS uses park and unpark to suspend and resume threads
}

// Idiom for releasing the lock
// If the lock is released successfully
if (tryRelease(arg)) {
    // Let a blocked thread resume running
}
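The idiom above is written for exclusive mode; shared mode follows the same shape through tryAcquireShared and tryReleaseShared. As a minimal sketch (the class names OneShotLatch and OneShotLatchDemo are made up for illustration and are not part of the JDK), a one-shot latch can treat state 0 as closed and state 1 as open:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, state 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // Non-negative: acquired. Negative: AQS queues and parks the caller.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true lets AQS wake the queued shared waiters
        }
    }

    private final Sync sync = new Sync();

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    void open() {
        sync.releaseShared(1);
    }
}

class OneShotLatchDemo {
    // Returns how many waiting threads got past the latch.
    static int run() {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Runnable waiter = () -> {
            try {
                latch.await();
                passed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(waiter, "a");
        Thread b = new Thread(waiter, "b");
        a.start();
        b.start();
        latch.open(); // shared mode: a single release lets every waiter through
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2
    }
}
```

Note that the helper methods live on the outer class and only call public AQS methods; getState and setState are protected, so they are used inside the Sync subclass only.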
2. Code
We first implement a non-reentrant lock ourselves, and then look at the source code
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;

// Custom non-reentrant lock
@Slf4j
class MyLoack implements Lock {

    // Synchronizer class implementing an exclusive lock
    class MySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                // The lock is taken; record the current thread as the owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // Mark the lock as owned by no thread.
            // exclusiveOwnerThread is the current owner in exclusive mode
            setExclusiveOwnerThread(null);
            // Write the volatile state last so that the preceding
            // setExclusiveOwnerThread is visible to other threads
            setState(0);
            return true;
        }

        // Whether the lock is held exclusively
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    // Lock
    @Override
    public void lock() {
        sync.acquire(1);
    }

    // Lock, interruptibly
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // Attempt to acquire the lock once
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    // Attempt to acquire the lock within the given time
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    // Unlock
    @Override
    public void unlock() {
        // release also wakes up a waiting thread
        sync.release(1);
    }

    // Create a new condition variable
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
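Because tryAcquire above never checks who the owner is, the lock refuses a second acquisition even from the thread that already holds it. The following sketch makes that visible without risking a deadlock by using tryLock only. SimpleMutex is a hypothetical, cut-down stand-in for the class above so that the example compiles on its own:

```java
import java.util.Arrays;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-in for the custom lock, reduced to what the demo needs.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // no owner check, so even the holder is refused
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);
    }
}

class NonReentrantDemo {
    // Returns the results of three tryLock calls made by the same thread.
    static boolean[] run() {
        SimpleMutex mutex = new SimpleMutex();
        boolean first = mutex.tryLock();   // lock is free
        boolean second = mutex.tryLock();  // same thread, lock already held
        mutex.unlock();
        boolean third = mutex.tryLock();   // free again after unlock
        mutex.unlock();
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [true, false, true]
    }
}
```

Calling lock() twice from the same thread on such a lock would park the thread forever, which is why the demo sticks to tryLock.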
Test:
MyLoack lock = new MyLoack();

new Thread(() -> {
    lock.lock();
    try {
        log.debug("Locked successfully");
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        log.debug("Unlocked successfully");
        lock.unlock();
    }
}, "t1").start();

new Thread(() -> {
    lock.lock();
    try {
        log.debug("Locked successfully");
    } finally {
        log.debug("Unlocked successfully");
        lock.unlock();
    }
}, "t2").start();
Result output:
2. ReentrantLock principle
1. Implementation principle of the non-fair lock
Judging from its constructors, ReentrantLock uses a non-fair lock by default
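The default can be checked through the public API rather than by reading the constructors. A small sketch (FairnessDefaultDemo is a name chosen for this example):

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class FairnessDefaultDemo {
    // Returns isFair() for the no-arg, false, and true constructors.
    static boolean[] run() {
        return new boolean[] {
            new ReentrantLock().isFair(),      // default constructor
            new ReentrantLock(false).isFair(), // explicitly non-fair
            new ReentrantLock(true).isFair()   // explicitly fair
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [false, false, true]
    }
}
```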
1. Locking process
1. Successful process
// ReentrantLock
public void lock() {
    sync.lock();
}

// NonfairSync
final void lock() {
    // Try to change state from 0 to 1
    if (compareAndSetState(0, 1))
        // On success, set the owner to the current thread
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
2. Failure process
Building on the above, when another thread attempts to acquire the lock, compareAndSetState(0, 1) fails and the thread enters the acquire(1) process
public final void acquire(int arg) {
    // First call tryAcquire to try the lock again.
    // If that fails, enter the acquireQueued process
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
This method first calls tryAcquire to attempt the lock once more. If that fails, addWaiter creates a node for the current thread and appends it to the waiting queue, and acquireQueued then keeps trying to acquire the lock from inside the queue. The process is as follows:
Thread-1 executes the following:
- In the lock method, CAS tries to change the state from 0 to 1 and fails
- The lock method then calls acquire, which enters the tryAcquire logic. The state is already 1 here, so this attempt fails as well
- Next, acquire enters the addWaiter logic to build the Node queue
- The yellow triangle in the figure indicates the waitStatus of a Node, where 0 is the default normal status
- Nodes are created lazily
- The first Node is called the dummy or sentinel node. It only occupies a position and is not associated with any thread
private Node addWaiter(Node mode) {
    // Create a node and associate it with the current thread
    Node node = new Node(Thread.currentThread(), mode);
    // Get the tail node
    Node pred = tail;
    // Fast path; enq below does the same thing in a loop
    if (pred != null) {
        // Link the new node behind the tail. Note that this is a doubly linked list
        node.prev = pred;
        // Connect tail <-> node
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // The tail is null (the queue has not been initialized) or the CAS failed:
    // enq initializes the head node if necessary and appends the new node
    enq(node);
    return node;
}
Next, the current thread enters the acquireQueued logic of the acquire method
- acquireQueued keeps trying to acquire the lock in an endless loop and parks the thread after a failed attempt
- If the node is right behind the head (in second place), it tries to acquire the lock again. The state is still 1 here, so the attempt fails
- It enters the shouldParkAfterFailedAcquire logic, changes the waitStatus of the predecessor node (the head) to -1, and returns false this time. A waitStatus of -1 means that the node is responsible for waking up its successor
- After shouldParkAfterFailedAcquire returns, acquireQueued tries to acquire the lock again. The state is still 1, so this attempt fails too
- On entering shouldParkAfterFailedAcquire again, it returns true this time because the waitStatus of the predecessor is already -1
- It enters parkAndCheckInterrupt, and Thread-1 parks (gray indicates blocked)
final boolean acquireQueued(final Node node, int arg) {
    // Stays true unless the lock is acquired
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node
            final Node p = node.predecessor();
            // If the predecessor is the head (placeholder) node, try to acquire the lock again
            if (p == head && tryAcquire(arg)) {
                // Lock acquired: the current node becomes the new head, so the head stands for
                // the thread holding the lock and the waiting nodes start from the second node,
                // as the figure above shows
                setHead(node);
                p.next = null; // Help GC reclaim the old head node
                failed = false;
                // Report whether the thread was interrupted while it was waiting
                return interrupted;
            }
            // The lock was not acquired; decide whether the thread should be suspended
            if (shouldParkAfterFailedAcquire(p, node) &&
                // Suspend the current thread. It blocks here until the previous thread
                // unparks it or it is interrupted; true is returned if it was interrupted
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // Reached only when an exception was thrown
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // Get the status of the predecessor node
    int ws = pred.waitStatus;
    // The status is -1
    if (ws == Node.SIGNAL)
        // The predecessor is already set to signal its successor on release, so it is safe to park
        return true;
    if (ws > 0) {
        // Greater than 0 means the predecessor was cancelled: skip over cancelled predecessors
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // The status is 0 (the initial value) or PROPAGATE: set the predecessor's status to -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// Parks the thread. After it wakes up, returns whether it was interrupted
// and clears the interrupt flag
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
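The waiting queue built by addWaiter and acquireQueued is private to AQS, but ReentrantLock exposes a few monitoring methods that let us observe it from outside. The sketch below (QueueInspectionDemo is an illustrative name) holds the lock in the main thread, lets a second thread block in lock(), and reads the queue length before and after:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class QueueInspectionDemo {
    // Returns {queue length while t1 waits, queue length after t1 has finished}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            lock.lock();   // fails the fast path and enqueues itself
            lock.unlock();
        }, "t1");

        int whileWaiting;
        lock.lock();
        try {
            t1.start();
            // Wait until t1's node has been appended to the AQS queue
            while (!lock.hasQueuedThread(t1)) {
                Thread.yield();
            }
            whileWaiting = lock.getQueueLength();
        } finally {
            lock.unlock(); // unparks t1
        }

        try {
            t1.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {whileWaiting, lock.getQueueLength()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [1, 0]
    }
}
```

getQueueLength is documented as an estimate, so it is meant for monitoring rather than for synchronization decisions. In this controlled setup with a single waiter the value is stable.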
2. Unlocking process
public void unlock() { sync.release(1); }
1. Successfully unlocked
Now suppose that several threads have competed for the lock and failed, so they are all waiting in the queue
Thread-0 releases the lock and enters the tryRelease process. If it succeeds:
- exclusiveOwnerThread is set to null
- state = 0
- If the head Node of the queue is not null and the waitStatus of the head is -1, the unparkSuccessor process is entered: unparkSuccessor finds the Node closest to the head that has not been cancelled and unparks it so that it resumes running, which is Thread-1 in this example
- Thread-1 then resumes inside the acquireQueued method, enters the for loop again, and tries to lock
public final boolean release(int arg) {
    // The details of tryRelease are skipped for now:
    // it sets exclusiveOwnerThread to null and state to 0
    if (tryRelease(arg)) {
        // Get the head node
        Node h = head;
        // If the head is not null and its waitStatus is not 0
        if (h != null && h.waitStatus != 0)
            // Wake up the successor, which continues in acquireQueued and tries to lock
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /** Earlier code omitted */
    if (s != null)
        // Call unpark to wake up the thread of that node
        LockSupport.unpark(s.thread);
}
If the lock is acquired successfully (no other thread competes with Thread-1), the following is set in the acquireQueued method:
- exclusiveOwnerThread is Thread-1, state = 1
- The head points to the Node that Thread-1 was in, and that Node clears its Thread reference
- The original head is disconnected from the linked list and can be garbage collected
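The handoff between release and acquireQueued rests on LockSupport.park and LockSupport.unpark. One property worth seeing in isolation is that unpark hands out a permit, so the wake-up is not lost even if it arrives before the target thread has actually parked. A minimal sketch (ParkPermitDemo is an illustrative name):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {
    // Returns true once the waiter has resumed after being unparked.
    static boolean run() {
        AtomicBoolean resumed = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            // Blocks unless a permit is already available. park may also return
            // for no reason at all, which is why AQS re-checks its condition in a loop.
            LockSupport.park();
            resumed.set(true);
        }, "waiter");

        waiter.start();
        // Safe whether the waiter has already parked or not: if it has not,
        // the permit is stored and the later park returns immediately.
        LockSupport.unpark(waiter);

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```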
2. Failed to unlock
If another thread competes for the lock at this moment (this is where the unfairness shows), for example Thread-4 arrives and happens to get ahead of Thread-1:
- Thread-4 is set as exclusiveOwnerThread, state = 1
- Thread-1 goes through the acquireQueued loop again, fails to acquire the lock, and parks again
2. Reentrant principle
static final class NonfairSync extends Sync {
    // ...

    // Reentrant lock acquisition
    // Method inherited from Sync, placed here for easier reading
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // Get the state first
        int c = getState();
        // 0 means that no thread holds the lock
        if (c == 0) {
            // Change the state from 0 to 1
            if (compareAndSetState(0, acquires)) {
                // Then set exclusiveOwnerThread to the current thread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The lock is already held and the owner is the current thread: this is a lock reentry
        else if (current == getExclusiveOwnerThread()) {
            // state++
            // For example, on the second acquisition c is 1, so c + acquires means state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Reentrant lock release
    // Method inherited from Sync, placed here for easier reading
    protected final boolean tryRelease(int releases) {
        // state--: one level of reentry is removed. For example, if it is 2 now, it becomes 2 - 1
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Reentry is supported: the lock is really released only when the state drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        // For a reentered lock, false is returned until the reentry count has dropped to 0;
        // only then is true returned to indicate that the release succeeded
        return free;
    }
}
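The state counter manipulated by nonfairTryAcquire and tryRelease is visible from outside as the hold count. As a small sketch (ReentryDemo is an illustrative name), a single thread can lock twice and watch the count go up and down:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentryDemo {
    // Records the hold count and locked status after each step.
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        StringBuilder trace = new StringBuilder();

        lock.lock();
        lock.lock(); // reentry: state goes from 1 to 2
        trace.append(lock.getHoldCount()).append(' ').append(lock.isLocked());

        lock.unlock(); // state 2 -> 1, tryRelease returns false, lock still held
        trace.append(" | ").append(lock.getHoldCount()).append(' ').append(lock.isLocked());

        lock.unlock(); // state 1 -> 0, owner cleared, lock really released
        trace.append(" | ").append(lock.getHoldCount()).append(' ').append(lock.isLocked());

        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2 true | 1 true | 0 false
    }
}
```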
3. Interruptible principle
1. Non interruptible mode
Non-interruptible mode: in this mode, even if the thread is interrupted it stays in the AQS queue and cannot continue until it obtains the lock. The interruption only sets the interrupt flag to true and has no other effect on the waiting thread
Look at the following code: acquire tries to take the lock and then enters acquireQueued, both of which were described above. Inside acquireQueued, a thread that fails to get the lock is parked. It wakes up either when the previous thread releases the lock and calls unpark, or when it is interrupted. If it was interrupted, parkAndCheckInterrupt returns true and the thread executes interrupted = true, but it then continues the for loop until it obtains the lock. Only then does it pass the if (p == head && tryAcquire(arg)) check and return interrupted = true to acquire, which calls selfInterrupt() to set the interrupt flag again. This is necessary because parkAndCheckInterrupt uses Thread.interrupted(), which clears the interrupt flag
// Sync inherits from AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // If the interrupt flag is already true, park returns immediately
        LockSupport.park(this);
        // interrupted() clears the interrupt flag so that the next park can block.
        // With the flag still set, park would not block
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // The interrupt status is returned only after the lock has been obtained
                    return interrupted;
                }
                if (
                    shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()
                ) {
                    // The thread was woken up by an interrupt: remember it
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (
            // The if body is entered only when acquireQueued returned true
            !tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // The thread was interrupted while it was waiting
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // Set the interrupt flag again. If the thread is running normally and is not
        // in sleep or a similar state, interrupt() does not throw anything
        Thread.currentThread().interrupt();
    }
}
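The behaviour described above can be observed with a plain ReentrantLock. In the following sketch (UninterruptibleLockDemo is an illustrative name), the main thread holds the lock, interrupts a thread that is blocked in lock(), and checks two things: the thread is still queued after the interrupt, and once it finally gets the lock its interrupt flag is set, which is the effect of selfInterrupt():

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class UninterruptibleLockDemo {
    // Returns {still queued after interrupt, interrupt flag seen after acquiring}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean();

        Thread t1 = new Thread(() -> {
            lock.lock(); // does not give up when interrupted
            try {
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "t1");

        boolean stillQueued;
        lock.lock();
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) {
                Thread.yield();
            }
            t1.interrupt();
            // The interrupt only wakes t1 briefly; its node stays in the queue
            stillQueued = t1.isAlive() && lock.hasQueuedThread(t1);
        } finally {
            lock.unlock();
        }

        try {
            t1.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] {stillQueued, flagAfterAcquire.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [true, true]
    }
}
```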
2. Interruptible mode
To make lock acquisition interruptible, acquireInterruptibly is called instead of acquire. In this method, if doAcquireInterruptibly fails to obtain the lock, the thread still parks. The difference is that when the thread is woken up by an interrupt, it throws InterruptedException directly instead of entering the loop again. Therefore, a thread that is interrupted while waiting in the queue stops waiting and throws an exception.
static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // If the lock is not obtained, enter the interruptible acquisition process
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // Interruptible lock acquisition process
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) {
                    // Reached when the thread is interrupted while parked.
                    // An exception is thrown instead of entering for (;;) again,
                    // so a thread interrupted while waiting in the queue gives up directly
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
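From the caller's side this path is reached through ReentrantLock.lockInterruptibly(). The sketch below (InterruptibleLockDemo is an illustrative name) keeps the lock held by the main thread for the whole time, so the only way the second thread can leave the queue is through the InterruptedException:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Returns true if the waiting thread left the queue with InterruptedException.
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean gaveUp = new AtomicBoolean();

        Thread t1 = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // not reached here: main never releases before t1 ends
            } catch (InterruptedException e) {
                gaveUp.set(true); // the wait was abandoned, the lock was never held
            }
        }, "t1");

        lock.lock();
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) {
                Thread.yield();
            }
            t1.interrupt();
            try {
                t1.join(); // t1 terminates while main still holds the lock
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            lock.unlock();
        }
        return gaveUp.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true
    }
}
```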
4. Fair lock
1. The non-fair lock, for comparison
The code is the same as in the reentrancy section. Note that the non-fair lock does not check the AQS queue and goes straight to compareAndSetState.
// Method inherited from Sync, placed here for easier reading
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // The lock is not held by anyone
    if (c == 0) {
        // Try to obtain it with CAS. This is where the unfairness shows:
        // the AQS queue is not checked
        if (compareAndSetState(0, acquires)) {
            // Set exclusiveOwnerThread to the current thread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // The lock is already held and the owner is the current thread: this is a lock reentry
    else if (current == getExclusiveOwnerThread()) {
        // state++
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // Acquisition failed; return to the caller
    return false;
}
2. Fair lock
In the tryAcquire method, the current thread is allowed to compete for the lock with CAS only if it has no predecessor node in the AQS queue
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    // Method inherited from AQS, placed here for easier reading
    public final void acquire(int arg) {
        if (
            !tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // The main difference from the non-fair lock is the implementation of tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First check whether there are predecessor nodes in the AQS queue.
            // Compete for the lock only if there are none
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // The following branch is the reentry logic described above
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Method inherited from AQS, placed here for easier reading
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h != t (the head differs from the tail) means that there are nodes in the queue.
        // The current thread has a predecessor unless it owns the second node,
        // which is the first one in line behind the head
        return h != t &&
            (
                // There is no second node yet (another thread is in the middle of enqueuing)
                (s = h.next) == null ||
                // or the second node does not belong to the current thread
                s.thread != Thread.currentThread()
            );
    }
}
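The FIFO hand-over that the fair lock guarantees can be observed in a controlled setup. In this sketch (FairOrderDemo is an illustrative name), the main thread holds a fair lock, makes sure t1 is queued before t2 is even started, and then releases; the threads record the order in which they obtain the lock. The demo shows only the queue order, not the barging that hasQueuedPredecessors prevents, because provoking barging reliably depends on timing:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

class FairOrderDemo {
    // Returns the names of the threads in the order they acquired the lock.
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock(true); // fair
        List<String> order = new CopyOnWriteArrayList<>();
        Runnable task = () -> {
            lock.lock();
            try {
                order.add(Thread.currentThread().getName());
            } finally {
                lock.unlock();
            }
        };
        Thread t1 = new Thread(task, "t1");
        Thread t2 = new Thread(task, "t2");

        lock.lock();
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) {
                Thread.yield(); // t1 is enqueued first
            }
            t2.start();
            while (!lock.hasQueuedThread(t2)) {
                Thread.yield(); // t2 is enqueued behind t1
            }
        } finally {
            lock.unlock(); // wakes the first waiter
        }

        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [t1, t2]
    }
}
```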
5. Implementation principle of condition variables
Each condition variable corresponds to its own waiting queue, and its implementation class is ConditionObject
1. await process
At the start, Thread-0 holds the lock and calls await. It enters the addConditionWaiter process of ConditionObject (a singly linked list), creates a new Node with status -2 (Node.CONDITION) associated with Thread-0, and appends it to the tail of the condition queue
Next, it enters the fullyRelease process of AQS, which releases the lock held on the synchronizer, including all reentrant holds
fullyRelease also unparks the next node in the AQS queue so that it can compete for the lock. Assuming that there are no other competing threads, Thread-1 acquires it
Thread-0 then parks and blocks
public final void await() throws InterruptedException {
    // If the thread has already been interrupted, throw an exception
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add a node to the queue of this ConditionObject
    Node node = addConditionWaiter();
    // fullyRelease: release every hold this thread has on the lock, including reentrant holds
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Loop while the node has not yet been transferred to the AQS queue
    while (!isOnSyncQueue(node)) {
        // The thread blocks here
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// Singly linked list implementation
private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Create a node with its status set to -2 (Node.CONDITION)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // Append it to the singly linked list
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

// Release all holds on the lock
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // The state read here is the number of reentries
        int savedState = getState();
        // Release the lock by that count. release also wakes up the successor node in the AQS queue
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
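The effect of fullyRelease and of the acquireQueued(node, savedState) call at the end of await can be checked from outside: a thread that waits while holding the lock twice gives up both holds, and gets both back when the wait returns. The sketch below (AwaitReleaseDemo is an illustrative name) uses awaitUninterruptibly only to avoid exception handling in the demo; it goes through the same fullyRelease step:

```java
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitReleaseDemo {
    // Returns the waiter's hold count {before waiting, after waiting}.
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock
        int[] holds = new int[2];

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentry: hold count is 2
            try {
                holds[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    ready.awaitUninterruptibly(); // gives up both holds while waiting
                }
                holds[1] = lock.getHoldCount(); // both holds are back
            } finally {
                lock.unlock();
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                // Seeing a waiter while we hold the lock proves that it released everything
                if (lock.hasWaiters(ready)) {
                    signalled[0] = true;
                    ready.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                Thread.yield();
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return holds;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [2, 2]
    }
}
```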
2. signal process
Suppose Thread-1 wants to wake up Thread-0
It enters the doSignal process of ConditionObject and takes the first Node in the condition queue, which is the Node of Thread-0
It executes the transferForSignal process, which appends the Node to the tail of the AQS queue, changes the waitStatus of Thread-0's Node to 0, and changes the waitStatus of Thread-3's Node (the previous tail) to -1
Thread-1 then releases the lock and enters the unlock process
public final void signal() {
    // First check whether the thread calling signal holds the lock.
    // Only the owner thread is allowed to wake up waiters
    if (!isHeldExclusively())
        // If not, throw an exception
        throw new IllegalMonitorStateException();
    // Take the first node of the condition queue
    Node first = firstWaiter;
    // Call doSignal
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // firstWaiter = first.nextWaiter: the next node becomes the first node.
        // If it is null, the queue is empty, so lastWaiter is set to null as well
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // transferForSignal moves the node to the AQS queue.
        // If the transfer fails and there are more nodes, loop and try the next one.
        // A transfer fails when the waiting node was cancelled, for example by an interrupt
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    // Change waitStatus from CONDITION to 0. Why 0? Because the last node added to the
    // AQS queue normally has status 0; the nodes before it are changed to -1
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // enq appends the node to the AQS queue and returns its predecessor,
    // that is, the original tail node
    Node p = enq(node);
    // Get the status of the predecessor node
    int ws = p.waitStatus;
    // Try to change the predecessor's status to -1, which obliges it to wake up the new tail.
    // If the predecessor was cancelled or the CAS fails, wake up the new node's thread directly.
    // In the example above: p = Thread-3, node = Thread-0
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
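The transfer from the condition queue to the AQS queue can be watched through the monitoring methods of ReentrantLock. In this sketch (SignalTransferDemo is an illustrative name), the main thread inspects both queues immediately before and after signal() while it still holds the lock:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalTransferDemo {
    // Returns "<condition queue length>,<waiter in AQS queue>" before and after signal().
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!signalled[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        String result = null;
        while (result == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    int before = lock.getWaitQueueLength(cond);
                    boolean queuedBefore = lock.hasQueuedThread(waiter);
                    signalled[0] = true;
                    cond.signal(); // moves the node; the waiter still needs the lock to return
                    int after = lock.getWaitQueueLength(cond);
                    boolean queuedAfter = lock.hasQueuedThread(waiter);
                    result = before + "," + queuedBefore + " -> " + after + "," + queuedAfter;
                }
            } finally {
                lock.unlock();
            }
            if (result == null) {
                Thread.yield();
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 1,false -> 0,true
    }
}
```

The waiter returns from its wait only after the main thread unlocks, because leaving await requires reacquiring the lock through acquireQueued.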