I. AQS Concept
1. The queue synchronizer (AbstractQueuedSynchronizer, AQS) is the basic framework for building locks and other synchronization components. It uses an int variable to represent the synchronization state and a built-in FIFO queue to manage the threads waiting for that state.
2. The JDK 8 documentation introduces AQS as follows.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc.) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.
This class supports either or both a default exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not "understand" these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play, for example in a ReadWriteLock. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.
In summary:
(1) A subclass manages the synchronization state by extending AQS and overriding its protected methods. It reads and changes the state through getState(), setState(int newState) and compareAndSetState(int expect, int update); the CAS operation ensures that a contended state change is atomic.
(2) The subclass should be defined as a static nested class of the custom synchronization component. The synchronizer itself does not implement any synchronization interface; it only defines methods for acquiring and releasing the synchronization state, which the custom synchronization component then uses.
(3) The synchronizer supports both exclusive and shared acquisition of the synchronization state, which is how different kinds of synchronization components (ReentrantLock, ReentrantReadWriteLock, CountDownLatch, and others) are built on it.
3. The synchronizer is the key to implementing a lock. A lock aggregates a synchronizer in its implementation and uses it to realize the lock's semantics.
II. Interface and Example of AQS
1. Design and Implementation Principle of Synchronizer
To use the synchronizer, extend it and override the specified methods, then aggregate the subclass in the custom synchronization component and call the template methods that the synchronizer provides (these template methods call the overridden methods). When overriding the specified methods, use getState(), setState(int newState) and compareAndSetState(int expect, int update) to read or update the synchronization state. Below are the declaration of the state variable and the definitions of these three methods in the source code:
/**
 * The synchronization state.
 */
private volatile int state;

/**
 * Returns the current value of synchronization state.
 * This operation has memory semantics of a {@code volatile} read.
 * @return current state value
 */
protected final int getState() {
    return state;
}

/**
 * Sets the value of synchronization state.
 * This operation has memory semantics of a {@code volatile} write.
 * @param newState the new state value
 */
protected final void setState(int newState) {
    state = newState;
}

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
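The retry pattern that subclasses usually build on these three accessors can be sketched as follows. This is only an illustration: StateCounter, add and run are hypothetical names, and the sketch treats the AQS state as a plain counter (not as a lock) purely to show why compareAndSetState is used in a loop.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class: uses the AQS state purely as a counter to show the CAS retry loop.
class StateCounter extends AbstractQueuedSynchronizer {

    // Atomically adds delta to the state and returns the new value.
    int add(int delta) {
        for (;;) {
            int current = getState();                 // volatile read
            int next = current + delta;
            if (compareAndSetState(current, next)) {  // succeeds only if state is still 'current'
                return next;
            }
            // CAS lost: another thread changed the state first, so re-read and retry
        }
    }

    // Starts 'threads' workers that each add 1 'perThread' times; returns the final state.
    static int run(int threads, int perThread) {
        StateCounter counter = new StateCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.add(1);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getState();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

A plain setState(getState() + 1) would lose updates under contention, because the read and the write are two separate steps; the CAS loop makes the read-modify-write atomic.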
2. Overridable methods provided by AQS
/**
 * Attempts to acquire in exclusive mode. An implementation should query the current state,
 * check whether it permits an exclusive acquire, and if so set the state with CAS.
 */
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Attempts to release the synchronization state in exclusive mode. After a release,
 * threads waiting for the synchronization state get a chance to acquire it.
 */
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

/**
 * Attempts to acquire in shared mode. This method should query whether the state of the
 * object permits a shared acquire, and if so acquire it. It is always invoked by the
 * thread performing the acquire. If it reports failure, the acquire method may queue the
 * thread (if it is not already queued) until it is signalled by a release from another thread.
 * Returns a negative value on failure; zero if this acquire succeeded but no subsequent
 * shared-mode acquire can succeed; and a positive value if this acquire succeeded and
 * subsequent shared-mode acquires might also succeed, in which case a subsequent waiting
 * thread must check availability.
 */
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException(); // thrown if shared mode is not supported
}

/**
 * Attempts to set the state to reflect a release in shared mode.
 * This method is always invoked by the thread performing the release.
 */
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException(); // thrown if shared mode is not supported
}

/**
 * Returns whether the synchronizer is held in exclusive mode; implementations generally
 * report whether it is held by the current thread.
 */
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException(); // thrown if conditions are not supported
}
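To make the exclusive-mode hooks concrete, here is a minimal sketch of a non-reentrant mutex. SimpleMutex and its run helper are hypothetical names chosen for this illustration, and the state encoding (0 = unlocked, 1 = locked) is an assumption of the sketch, not something AQS prescribes.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical non-reentrant mutex for illustration: state 0 = unlocked, 1 = locked.
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {               // only one thread can win this CAS
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);                                  // only the owner gets here, so no CAS needed
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    public void lock()       { sync.acquire(1); }         // template method, calls tryAcquire
    public boolean tryLock() { return sync.tryAcquire(1); }
    public void unlock()     { sync.release(1); }         // template method, calls tryRelease

    // Increments a plain int under the mutex from several threads; returns the final count.
    static int run(int threads, int perThread) {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = new int[1];                       // guarded by mutex
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

The outer class only delegates: lock() and unlock() call the template methods acquire and release, which in turn call the overridden tryAcquire and tryRelease.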
3. Template methods provided by the synchronizer
When implementing a custom synchronization component, you override the methods above, and the template methods below call those overridden methods. The template methods provided by the synchronizer are:
/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented by invoking tryAcquire(int)
 * at least once, returning on success. Otherwise the thread is queued, possibly repeatedly
 * blocking and unblocking, invoking tryAcquire(int) until success.
 */
public final void acquire(int arg) {...}

/**
 * Acquires in exclusive mode, aborting if interrupted. Implemented by first checking the
 * interrupt status, then invoking tryAcquire(int) at least once, returning on success.
 * Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking
 * tryAcquire(int) until success or the thread is interrupted.
 */
public final void acquireInterruptibly(int arg) throws InterruptedException {...}

/**
 * Attempts to acquire in exclusive mode, aborting if interrupted and failing if the given
 * timeout elapses. Implemented by first checking the interrupt status, then invoking
 * tryAcquire(int) at least once, returning on success. Otherwise the thread is queued,
 * possibly repeatedly blocking and unblocking, invoking tryAcquire(int) until success,
 * interruption, or timeout.
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...}

/**
 * Acquires in shared mode, ignoring interrupts. Implemented by first invoking
 * tryAcquireShared(int) at least once, returning on success. Otherwise the thread is queued,
 * possibly repeatedly blocking and unblocking, invoking tryAcquireShared(int) until success.
 */
public final void acquireShared(int arg) {...}

/**
 * Acquires in shared mode, aborting if interrupted. Implemented by first checking the
 * interrupt status, then invoking tryAcquireShared(int) at least once, returning on success.
 * Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking
 * tryAcquireShared(int) until success or the thread is interrupted.
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {...}

/**
 * Attempts to acquire in shared mode, aborting if interrupted and failing if the given
 * timeout elapses. Implemented by first checking the interrupt status, then invoking
 * tryAcquireShared(int) at least once, returning on success. Otherwise the thread is queued,
 * possibly repeatedly blocking and unblocking, invoking tryAcquireShared(int) until success,
 * interruption, or timeout.
 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {...}

/**
 * Releases the synchronization state in exclusive mode. After the release, wakes up the
 * thread of the first waiting node in the synchronization queue.
 */
public final boolean release(int arg) {...}

/**
 * Releases the synchronization state in shared mode.
 */
public final boolean releaseShared(int arg) {...}

/**
 * Returns a collection containing the threads that may be waiting in the queue.
 */
public final Collection<Thread> getQueuedThreads() {...}
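The difference between the plain and the timed template methods can be seen with a small sketch. BinarySync and TimedAcquireDemo are hypothetical names; the sync is deliberately non-reentrant, so a second acquisition attempt from the same thread has to wait and then times out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TimedAcquireDemo {

    // Hypothetical minimal exclusive sync: state 0 = free, 1 = held (no owner tracking).
    static final class BinarySync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    // Returns { result of a timed acquire while held, result of a timed acquire after release }.
    static boolean[] run() {
        BinarySync sync = new BinarySync();
        long timeout = TimeUnit.MILLISECONDS.toNanos(50);
        sync.acquire(1);                                   // state 0 -> 1, returns immediately
        try {
            // The state is already held, so this call queues, parks, and gives up after ~50 ms.
            boolean whileHeld = sync.tryAcquireNanos(1, timeout);
            sync.release(1);                               // state 1 -> 0
            // Now the first tryAcquire inside the template method succeeds right away.
            boolean afterRelease = sync.tryAcquireNanos(1, timeout);
            return new boolean[] { whileHeld, afterRelease };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```

Calling sync.acquire(1) a second time instead of tryAcquireNanos would block forever in this sketch, which is exactly the case the timed variant guards against.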
III. Implementation and Analysis of the Queue Synchronizer
1. Synchronization queue
a) Implementation Principle of the Synchronization Queue
AQS relies on an internal synchronization queue (a FIFO doubly linked queue) to manage the synchronization state. When the current thread fails to acquire the synchronization state, AQS wraps the thread and its wait status in a Node, appends the node to the synchronization queue, and blocks the thread. When the holding thread releases the synchronization state, the thread in the first waiting node is woken up so that it can try to acquire the state again. Each node stores a reference to a thread that failed to acquire the synchronization state, its wait status, and its predecessor and successor nodes. The attributes of Node are analyzed below:
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /**
     * waitStatus value indicating that the thread has cancelled: it timed out or was
     * interrupted while waiting in the synchronization queue. A node never leaves this
     * state once it has entered it.
     */
    static final int CANCELLED = 1;
    /**
     * waitStatus value indicating that the successor's thread needs unparking: the
     * successor is (or will soon be) blocked, so this node must wake it up when it
     * releases the synchronization state or is cancelled.
     */
    static final int SIGNAL = -1;
    /**
     * waitStatus value indicating that the thread is waiting on a Condition: the node sits
     * in a condition queue, and when another thread calls signal() on that Condition the
     * node is transferred to the synchronization queue to compete for the synchronization state.
     */
    static final int CONDITION = -2;
    /**
     * waitStatus value indicating that the next shared acquisition should propagate
     * unconditionally.
     */
    static final int PROPAGATE = -3;

    /**
     * One of the wait-status values above, or 0 for the initial state.
     */
    volatile int waitStatus;

    /**
     * Predecessor node, assigned when the node is added to the synchronization queue.
     */
    volatile Node prev;

    /**
     * Successor node.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node, i.e. the thread waiting for the synchronization state.
     */
    volatile Thread thread;

    /**
     * Next node waiting on a condition, or the special value SHARED. The field does double
     * duty: in a condition queue it links to the next waiter, and in the synchronization
     * queue it records the node's mode (shared or exclusive).
     */
    Node nextWaiter;

    /**
     * Returns true if the node is waiting in shared mode (nextWaiter is then the SHARED constant).
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns the predecessor node, or throws NullPointerException if it is null.
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish the initial head or the SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
b) Synchronization Queue Diagram and Simple Analysis
(1) Synchronization queue diagram: while one thread holds the synchronization state, other threads cannot acquire it; each of them is wrapped in a Node and appended to the tail of the synchronization queue. The append uses CAS to remain thread-safe.
(2) The synchronization queue follows FIFO order. The head node is the node whose thread currently holds the synchronization state. When that thread releases the synchronization state, it wakes up its successor node, and the successor sets itself as the new head once it has acquired the synchronization state.
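The FIFO behaviour can be observed from outside with an AQS-based lock. The sketch below uses java.util.concurrent.locks.ReentrantLock and its hasQueuedThread method; FifoOrderDemo and the thread names t1 and t2 are hypothetical. It waits until each thread is actually queued before starting the next one, so the queue order is known before the lock is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FifoOrderDemo {

    // Returns the order in which the two queued threads obtained the lock.
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        List<String> order = new ArrayList<>();            // only modified while holding the lock
        Runnable task = () -> {
            lock.lock();
            try {
                order.add(Thread.currentThread().getName());
            } finally {
                lock.unlock();
            }
        };
        Thread t1 = new Thread(task, "t1");
        Thread t2 = new Thread(task, "t2");

        lock.lock();                                       // main holds the lock, so both threads must queue
        try {
            t1.start();
            while (!lock.hasQueuedThread(t1)) Thread.yield();  // wait until t1's node is in the queue
            t2.start();
            while (!lock.hasQueuedThread(t2)) Thread.yield();  // t2's node is now behind t1's
        } finally {
            lock.unlock();                                 // wakes the first queued thread (t1)
        }
        join(t1);
        join(t2);
        return String.join(",", order);
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "t1,t2"
    }
}
```

A queued thread only attempts to acquire when its predecessor is the head node, which is why t2 cannot overtake t1 here even though the lock is not a fair lock.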
2. Acquisition and Release of Exclusive Synchronization State
(1) As mentioned earlier, the synchronizer's acquire() method acquires the synchronization state without responding to interruption. That is, a thread that fails to acquire the synchronization state is wrapped in a node and added to the synchronization queue, and a later interrupt of that thread does not remove the node from the queue.
/**
 * ① First, call tryAcquire to try to get the synchronization state. If that succeeds, return.
 * ② If it fails, construct a node in exclusive mode and append it to the tail of the
 *    synchronization queue with addWaiter.
 * ③ Then call acquireQueued, which spins to get the synchronization state.
 * ④ While the state cannot be obtained, the thread in the node is blocked. It is woken up
 *    mainly when its predecessor node dequeues (releases) or when the blocked thread is interrupted.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
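The "does not respond to interruption" behaviour of acquire can be checked with a short sketch. It uses ReentrantLock.lock(), which is an uninterruptible acquisition built on AQS; AcquireInterruptDemo and the field names are hypothetical. The interrupted worker stays in the queue, eventually gets the lock, and finds its interrupt flag set again (the effect of selfInterrupt()).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class AcquireInterruptDemo {

    // Returns { worker still queued after interrupt, worker acquired the lock, interrupt flag was set }.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);
        AtomicBoolean flagSeen = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            lock.lock();                                   // uninterruptible acquisition
            try {
                acquired.set(true);
                // The interrupt was not thrown as an exception, but the flag is restored.
                flagSeen.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        });

        boolean stillQueued;
        lock.lock();                                       // main holds the lock first
        try {
            worker.start();
            while (!lock.hasQueuedThread(worker)) Thread.yield();
            worker.interrupt();                            // interrupt while the worker is waiting
            stillQueued = lock.hasQueuedThread(worker);    // the node is not removed from the queue
        } finally {
            lock.unlock();
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { stillQueued, acquired.get(), flagSeen.get() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true true true"
    }
}
```

With lockInterruptibly() instead of lock(), the worker would leave the queue with an InterruptedException, which corresponds to the acquireInterruptibly template method.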
(2) The following is the implementation of addWaiter, enq, and acquireQueued. addWaiter and enq wrap a thread that failed to acquire the synchronization state in a node and append it to the tail of the synchronization queue; acquireQueued then spins to acquire the synchronization state.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Fast path: try to append directly at the tail of the queue
    Node pred = tail; // read the synchronizer's current tail node
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // The tail was not null and the CAS succeeded, so the new node is now the tail
            pred.next = node;
            return node;
        }
    }
    // Fall back to enq, which spins until the node has been added to the synchronization queue
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) { // loop until the node has been added successfully
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // Return only after the CAS has added the node; otherwise keep retrying.
            // Under contention, this CAS effectively serializes the concurrent additions.
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * By the time this method runs, tryAcquire() has failed and addWaiter() has placed the
 * thread's node at the tail of the synchronization queue. The thread blocks until it is
 * woken up, either because its predecessor released the synchronization state or because
 * it was interrupted, and then tries to acquire again.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // The thread tries to get the synchronization state in an endless loop
        for (;;) {
            final Node p = node.predecessor(); // get the predecessor node
            // Acquisition may only be attempted when the predecessor is the head node
            if (p == head && tryAcquire(arg)) {
                setHead(node); // on success, make this node the new head
                p.next = null; // help GC: the old head is done, so unlink it from the queue
                failed = false;
                return interrupted;
            }
            // Acquisition failed: decide whether to park, then park and check for interruption
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true; // remember that an interrupt arrived while waiting
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
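The CAS-based tail append in enq can be reproduced outside AQS with AtomicReference. The sketch below is a simplified stand-in, not the AQS code: CasQueueSketch, its Node class, and the size and run helpers are hypothetical, and parking, wait status, and cancellation are left out entirely.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the AQS queue: only the lazy initialization and the CAS tail append.
class CasQueueSketch {

    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    void enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialized yet: install a dummy head, then retry the append.
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // link backwards first
                if (tail.compareAndSet(t, node)) {  // only one thread wins for a given tail
                    t.next = node;                  // forward link is set after the CAS
                    return;
                }
            }
        }
    }

    // Counts real nodes by walking backwards from the tail (the dummy head has prev == null).
    int size() {
        int n = 0;
        for (Node p = tail.get(); p != null && p.prev != null; p = p.prev) {
            n++;
        }
        return n;
    }

    // Appends nodes from several threads concurrently and returns the resulting queue size.
    static int run(int threads, int perThread) {
        CasQueueSketch queue = new CasQueueSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node("n" + id + "-" + j));
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Because prev is assigned before the CAS and next only after it, a backward walk from the tail always sees a complete chain, whereas next may briefly be null for a node that has just been appended.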
(3) The whole process of acquiring synchronization state exclusively
(4) Release of the exclusive synchronization state: when release() succeeds, it wakes up the thread of the head node's successor.
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head; // head node
        // Wake up the thread of the head node's successor
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
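Note that release only wakes a successor when tryRelease returns true. A reentrant synchronizer makes this visible: with the state used as a hold count, only the final release reports true. The sketch below is an illustration under that assumption; ReentrantReleaseDemo and ReentrantSync are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class ReentrantReleaseDemo {

    // Hypothetical reentrant exclusive sync: state = number of holds by the owner thread.
    static final class ReentrantSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                setState(c + acquires);            // re-entry by the owner: no CAS needed
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - releases;
            boolean free = (c == 0);               // fully released only when the count reaches 0
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }

    // Acquires twice, releases twice, and returns what each release(1) call reported.
    static boolean[] run() {
        ReentrantSync sync = new ReentrantSync();
        sync.acquire(1);                           // hold count 1
        sync.acquire(1);                           // hold count 2
        boolean first = sync.release(1);           // count 2 -> 1: tryRelease returns false
        boolean second = sync.release(1);          // count 1 -> 0: tryRelease returns true
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints "false true"
    }
}
```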
3. Acquisition and Release of Shared Synchronization State
(1) The main difference between shared and exclusive acquisition is whether multiple threads can hold the synchronization state at the same time. As the figure shows, when the state is held in shared mode, other shared acquisitions are allowed; when it is held in exclusive mode, all other acquisitions, whether shared or exclusive, are blocked.
(2) Shared acquisition of the synchronization state
/**
 * Top-level entry point for acquiring the synchronization state in shared mode. It first
 * tries to get the synchronization state and returns directly on success. On failure, the
 * thread enters the waiting queue and keeps trying (the body of doAcquireShared) until
 * tryAcquireShared returns a value greater than or equal to 0. Interrupts are ignored
 * throughout.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

/**
 * "Spins" trying to get the synchronization state.
 */
private void doAcquireShared(int arg) {
    // Wrap the thread (thread reference, wait status, predecessor and successor) in a Node
    // and add it to the waiting queue in shared mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false; // whether the current thread was interrupted while waiting
        for (;;) {
            final Node p = node.predecessor(); // get the predecessor node
            if (p == head) {
                // The predecessor is the head node, so try a shared acquisition
                int r = tryAcquireShared(arg);
                // Check the return value of tryAcquireShared
                if (r >= 0) {
                    // r >= 0 means the acquisition succeeded: make this node the head
                    // and propagate the wake-up to subsequent nodes
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC: unlink the old head
                    if (interrupted)
                        selfInterrupt(); // restore the interrupt flag
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
From the source code we can follow the whole process of acquiring the synchronization state in shared mode.
The synchronizer first calls tryAcquireShared to try to get the synchronization state and uses the return value to decide whether the acquisition succeeded (a return value greater than or equal to 0 means success). If the first attempt fails, the thread enters the "spin" in doAcquireShared and keeps trying: each time it finds that its predecessor is the head node, it attempts the acquisition again, and once tryAcquireShared returns a value greater than or equal to 0 it exits the spin.
Note also that a thread in the waiting queue may only attempt acquisition when its predecessor is the head node, that is, when it is the second node in the queue. When the head node releases all of its resources, the waiting threads may each need a different number of resources to run, as shown in the following figure.
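The three possible kinds of return value from tryAcquireShared (positive, zero, negative) can be seen with a small permit-counting sync in which threads may request different numbers of resources. This is a sketch under stated assumptions: SharedReturnValueDemo, PermitSync, and the probe helper are hypothetical, and probe calls tryAcquireShared directly only to expose its return value (normal code would go through acquireShared).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SharedReturnValueDemo {

    // Hypothetical permit-counting sync: state = number of permits still available.
    static final class PermitSync extends AbstractQueuedSynchronizer {
        PermitSync(int permits) {
            setState(permits);
        }

        @Override
        protected int tryAcquireShared(int wanted) {
            for (;;) {
                int available = getState();
                int remaining = available - wanted;
                // Negative: not enough permits, fail without changing the state.
                // Otherwise: take the permits with CAS and report how many are left.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        // Exposes the raw return value for demonstration purposes only.
        int probe(int wanted) {
            return tryAcquireShared(wanted);
        }
    }

    // With 3 permits: request 2, then 1, then 1 more.
    static int[] run() {
        PermitSync sync = new PermitSync(3);
        int first = sync.probe(2);   // 1: success, later shared acquires may also succeed
        int second = sync.probe(1);  // 0: success, but nothing is left for later acquires
        int third = sync.probe(1);   // -1: failure, the caller would be queued
        return new int[] { first, second, third };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "1 0 -1"
    }
}
```

The return value is what setHeadAndPropagate receives as its second argument in doAcquireShared, so it decides whether the wake-up should continue to the next waiting node.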
(3) Release of Shared Synchronization State
For a shared synchronization component (one that multiple threads can access at the same time), the main difference from the exclusive case is that tryReleaseShared must release the synchronization state in a thread-safe way. Because multiple threads can hold the state, multiple threads can also release it at the same time, so the release is generally done with a CAS retry loop.
/**
 * Top-level entry point for releasing the synchronization state in shared mode. It releases
 * the specified amount of resources, and if the release succeeds and allows waiting threads
 * to be woken up, it wakes up the threads in the waiting queue so they can acquire resources.
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // wake up the successor nodes
        return true;
    }
    return false;
}
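A one-shot latch is a small way to see releaseShared waking every waiting thread rather than just one. In this sketch, OneShotLatch, waiting, and run are hypothetical names, and the state encoding (0 = closed, 1 = open) is an assumption of the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot latch: state 0 = closed, 1 = open.
class OneShotLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1;   // positive once open, so every waiter may pass
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                        // opening is idempotent, so no CAS loop is needed here
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
    public void signal() { sync.releaseShared(1); }
    int waiting() { return sync.getQueueLength(); }

    // Returns { threads that passed before signal(), threads that passed after signal() }.
    static int[] run(int waiters) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        while (latch.waiting() < waiters) Thread.yield();  // wait until every thread is queued
        int before = passed.get();
        latch.signal();                                     // a single release opens the latch for all
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { before, passed.get() };
    }

    public static void main(String[] args) {
        int[] r = run(3);
        System.out.println(r[0] + " " + r[1]); // prints "0 3"
    }
}
```

Each woken thread gets a positive value from tryAcquireShared, so the wake-up keeps propagating down the queue until all waiters have passed.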
IV. Implementation of a Custom Synchronization Component
1. Implementation of a Shared Lock
(1) Define a custom (shared) synchronization component that lets at most two threads access at the same time; any further thread is blocked.
(2) Because it is a shared synchronization component, it uses the shared template methods that AQS provides, such as acquireShared, and its nested class must extend AQS and override the shared-mode methods (tryAcquireShared(), tryReleaseShared(), etc.).
(3) Because two threads can access at the same time, the state takes the values 0, 1 and 2. The state starts at 2; it decreases by 1 when a thread acquires the synchronization state and increases by 1 when a thread releases it. When the state is 0, any other thread that tries to acquire the synchronization state is blocked. State changes must go through CAS to be atomic.
Shared lock:
package cn.source.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TestAQS implements Lock {

    // At most two threads may hold the lock at the same time
    private final Sync sync = new Sync(2);

    private static class Sync extends AbstractQueuedSynchronizer {

        Sync(int num) {
            if (num <= 0) {
                throw new IllegalArgumentException("num must be greater than 0");
            }
            setState(num); // state = number of threads that may still acquire
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int currentState = getState();
                int newState = currentState - arg;
                // Fail (negative result) if nothing is left; otherwise take it with CAS
                if (newState < 0 || compareAndSetState(currentState, newState)) {
                    return newState;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int currentState = getState();
                int newState = currentState + arg;
                // Several holders may release concurrently, so retry until the CAS succeeds
                if (compareAndSetState(currentState, newState)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    // ...... (the remaining Lock methods are omitted here)
}
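To check that such a component really caps concurrency at two, the following self-contained sketch runs several threads through a two-permit lock and records the highest number of threads seen inside at once. It is a variant written for this demonstration, not the class above: TwoPermitLockDemo, TwoPermitLock, and run are hypothetical names, and the lock exposes only lock() and unlock() instead of implementing the full Lock interface.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class TwoPermitLockDemo {

    // Hypothetical two-permit lock with the same state scheme: 2 = free, 0 = fully occupied.
    static final class TwoPermitLock {
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int permits) {
                setState(permits);
            }

            @Override
            protected int tryAcquireShared(int arg) {
                for (;;) {
                    int current = getState();
                    int next = current - arg;
                    if (next < 0 || compareAndSetState(current, next)) {
                        return next;
                    }
                }
            }

            @Override
            protected boolean tryReleaseShared(int arg) {
                for (;;) {
                    int current = getState();
                    if (compareAndSetState(current, current + arg)) {
                        return true;
                    }
                }
            }
        }

        private final Sync sync = new Sync(2);

        void lock()   { sync.acquireShared(1); }
        void unlock() { sync.releaseShared(1); }
    }

    // Runs 'threads' workers through the lock; returns the maximum number inside simultaneously.
    static int run(int threads) {
        TwoPermitLock lock = new TwoPermitLock();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                lock.lock();
                try {
                    int now = inside.incrementAndGet();
                    maxInside.accumulateAndGet(now, Math::max);  // remember the peak
                    Thread.sleep(10);                             // stay inside for a moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    lock.unlock();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println("max threads inside at once: " + run(6));
    }
}
```

The reported maximum never exceeds 2, because a third thread gets a negative value from tryAcquireShared and is queued until one of the holders calls unlock().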