In Java development, applications often use multithreading to improve performance. When multiple threads access shared variables, concurrency problems can arise, and concurrency locks are needed to solve them. Java provides two main concurrency control mechanisms: the synchronized keyword and the AQS framework. Each has its own advantages, but when the locking and unlocking scenarios need to be flexible, we usually turn to the AQS framework. This article briefly introduces the structure and source code of the AQS framework in Java. Most of this article is based on this blog.
AQS structure
The full name of AQS is AbstractQueuedSynchronizer (abstract queued synchronizer). AQS defines a synchronizer framework for multi-threaded access to shared resources, and many synchronization classes are built on top of it, such as ReentrantLock, Semaphore and CountDownLatch.
As shown in the figure below, AQS mainly consists of two parts: the shared resource and the wait queue. The AQS base class already provides many methods for both parts.
- Shared resource: the shared resource is a volatile variable of type int.
- Wait queue: the wait queue is a thread-safe queue. When a thread cannot get the lock, it is parked and put into the queue.
- New thread: under the unfair policy, a new thread first tries to obtain the resource directly, and enters the queue only if that attempt fails.
Core idea
The core methods of a synchronizer are the acquire and release operations, and the idea behind them is relatively simple and clear.
The acquire operation is as follows:
```
// acquire operation
while (the current synchronizer state does not allow the acquire) {
    if the current thread is not in the queue, enqueue it
    block the current thread
}
if the thread is in the queue, dequeue it
```
The release operation is as follows:
```
// release operation
update the synchronizer state
if (the new state allows a blocked thread to succeed)
    unblock one or more threads in the queue
```
From these two operations we can extract three key concerns: changing the synchronizer state, blocking and unblocking threads, and entering and leaving the queue. Therefore, implementing the two operations requires coordinating three basic components derived from these concerns:
- Atomicity management of synchronizer state;
- Thread blocking and unblocking;
- Queue management;
Atomicity management of synchronizer state
The AQS class uses a single int (32 bits) to hold the synchronization state and exposes getState, setState and compareAndSetState operations to read and update it. The state field is declared volatile, and compareAndSetState is implemented with a CAS instruction, so the state is set to a new value atomically if and only if it currently holds the expected value. This management of the synchronization state guarantees its atomicity, visibility and ordering.
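For illustration, here is a sketch of how a subclass typically manipulates the state; the permit-counting convention and the class name CountingSync are assumptions made for this example, but the CAS-retry loop mirrors the pattern used by counting synchronizers such as Semaphore:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of how a subclass manipulates the synchronization state atomically.
// The permit-counting convention is only for illustration.
class CountingSync extends AbstractQueuedSynchronizer {
    CountingSync(int permits) {
        setState(permits);                  // plain write is fine before publication
    }

    @Override
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();     // volatile read
            int remaining = available - acquires;
            // CAS succeeds only if no other thread changed the state in between.
            if (remaining < 0 || compareAndSetState(available, remaining))
                return remaining;
        }
    }

    @Override
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (compareAndSetState(current, next))
                return true;
        }
    }
}
```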
Thread blocking and unblocking
Before JSR 166, apart from the built-in monitors there was no Java API for blocking and unblocking threads. The only candidates were Thread.suspend and Thread.resume, but they have an unsolvable race problem: if the resume is issued before the target thread has called suspend, the resume has no effect and the thread blocks forever, so they cannot be used. These methods have essentially been deprecated; the official documentation explains the reasons in detail.
The j.u.c.locks package provides the LockSupport class to solve this problem. LockSupport.park blocks the current thread until LockSupport.unpark is called. Unpark calls are not counted, so calling unpark several times before a park only releases a single park. In addition, these operations act on a thread, not on a synchronizer. A thread calling park on a new synchronizer may return immediately, because a leftover unpark may have been issued earlier; without such an unpark, the next park will block. Although redundant unpark calls could be explicitly cleared, it is not worth it; it is more efficient simply to call park again when needed. park also supports optional relative or absolute timeouts and cooperates with JVM thread interruption: interrupting a thread via Thread.interrupt unparks it.
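A small, self-contained sketch of this park/unpark behavior (the class name and the 500 ms sleep are only for demonstration; production code should re-check its condition in a loop, because park may also return spuriously):

```java
import java.util.concurrent.locks.LockSupport;

public class LockSupportDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking until someone unparks me");
            LockSupport.park();          // blocks the current thread
            System.out.println("worker: resumed");
        });
        worker.start();

        Thread.sleep(500);               // give the worker time to park
        LockSupport.unpark(worker);      // hands the worker its single "permit"

        // unpark may also be called before park; the next park then returns
        // immediately, but the permit does not accumulate beyond one.
        worker.join();
    }
}
```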
Queue management
The core of the whole framework is how to manage the queue of blocked threads, which is a strict FIFO queue, so the framework does not support priority-based synchronization. The best choice for the synchronization queue is a non-blocking data structure that needs no lower-level locks. There are two main candidates: MCS locks and CLH locks. CLH locks are normally used for spinning, but compared with MCS they handle cancellation and timeout more easily, so CLH was chosen as the basis of the synchronization queue.
The CLH queue does not really behave like an ordinary queue; its enqueue and dequeue operations are closely tied to the way the synchronizer is actually used. It is a linked queue accessed through two AQS fields, the head node and the tail node. Both fields are volatile and initially point to a dummy (empty) node.
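For reference, here is a simplified sketch of the node type behind this queue; in the JDK (Java 8) this is the static nested class AbstractQueuedSynchronizer.Node, shown with the condition-related fields and markers omitted:

```java
// Simplified sketch of the CLH-style queue node used by AQS (JDK 8 names).
class Node {
    static final int CANCELLED =  1; // the waiting thread has cancelled its acquire
    static final int SIGNAL    = -1; // the successor's thread needs to be unparked
    static final int CONDITION = -2; // the node is waiting in a condition queue
    static final int PROPAGATE = -3; // the next acquireShared should propagate unconditionally

    volatile int waitStatus;         // one of the constants above, or 0 (initial)
    volatile Node prev;              // predecessor in the synchronization queue
    volatile Node next;              // successor in the synchronization queue
    volatile Thread thread;          // the thread enqueued on this node
}
```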
Conditional queue
The queue in the previous section is the AQS synchronization queue; the queue in this section is the condition queue. Besides the synchronization queue, queue management also covers condition queues. AQS has only one synchronization queue, but there can be multiple condition queues. The AQS framework provides the ConditionObject class for synchronizers that maintain exclusive synchronization and conform to the Lock interface.
The ConditionObject class implements the Condition interface, which provides methods similar to the Object monitor methods, such as await, signal and signalAll, and extends them with timeout, detection and monitoring variants. The ConditionObject class effectively combines conditions with other synchronization operations. It only supports Java-style monitor access rules, under which a condition operation is legal only when the current thread holds the lock and the condition being operated on belongs to that lock. Thus a ConditionObject associated with a ReentrantLock behaves like the built-in monitor (via Object.wait, etc.); the only differences are the method names, the extra functionality, and the fact that users can declare multiple conditions per lock.
The ConditionObject class shares the internal node type with AQS but maintains its own separate condition queue. The signal operation is implemented by transferring a node from the condition queue to the synchronization queue, without necessarily waking the signalled thread before it reacquires the lock.
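A minimal sketch of how such a condition is used in practice, here with a ConditionObject obtained from ReentrantLock.newCondition (the single-slot buffer is an assumed example, not code from the JDK):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// A one-item buffer: conditions may only be used while holding the owning lock.
public class OneSlotBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private T item;

    public void put(T value) throws InterruptedException {
        lock.lock();
        try {
            while (item != null)
                notFull.await();      // releases the lock and waits in the condition queue
            item = value;
            notEmpty.signal();        // transfers one waiter to the synchronization queue
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (item == null)
                notEmpty.await();
            T value = item;
            item = null;
            notFull.signal();
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```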
Source code analysis
We will mainly look at how AQS is implemented by examining the acquisition and release of the synchronization state in both exclusive and shared mode.
Acquisition of exclusive synchronization status
Exclusive acquisition of the synchronization state is done through the acquire method; the code is as follows:
```java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
```
The code above covers acquiring the synchronization state, constructing a node, joining the synchronization queue and spinning in it. The main logic is: first call the tryAcquire method implemented by the subclass, which acquires the synchronization state in a thread-safe way. If that fails, an exclusive node is constructed (at any time only one thread can hold the exclusive state) and appended to the tail of the synchronization queue by the addWaiter method. Finally acquireQueued is invoked so that the node tries to obtain the synchronization state by spinning; if it cannot, the node's thread is blocked, and a blocked thread is woken up mainly when its predecessor releases and leaves the queue, or when the blocked thread is interrupted.
Let's first look at how a node is constructed and added to the synchronization queue. The code is as follows:
```java
private Node addWaiter(Node mode) {
    // Wrap the current thread in a Node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // Try to append the new node directly after the tail to improve efficiency
    Node pred = tail;
    if (pred != null) {
        // The tail is not empty; the new node's predecessor points to the tail
        node.prev = pred;
        // Under concurrency the tail may have changed, so a CAS update is required
        if (compareAndSetTail(pred, node)) {
            // CAS succeeded: the current node is the new tail,
            // and the old tail's successor is the current node
            pred.next = node;
            return node;
        }
    }
    // Fall back to enq when the queue is empty or appending after the tail fails
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // The tail is empty: initialize the queue so that head and tail
            // point to the same empty node the first time through
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // Once the first node exists, thread nodes are appended in turn
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
```
After the node enters the synchronization queue, it enters a spin loop. Each node keeps checking its own situation; when the conditions are met and the synchronization state is obtained, it exits the spin loop, otherwise it stays in the loop and the node's thread is blocked. The code is as follows:
```java
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current thread's node
            final Node p = node.predecessor();
            // The predecessor is the head node and the synchronization state is acquired
            if (p == head && tryAcquire(arg)) {
                // Make the current node the new head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Should the current thread be parked?
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
```
Let's see how shouldParkAfterFailedAcquire and parkAndCheckInterrupt block the current thread. The code is as follows:
```java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // The state of the predecessor node determines the behavior of its successor
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* The predecessor is SIGNAL (-1), so the successor can be blocked.
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled.  Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* The predecessor is in the initial or PROPAGATE state;
         * set it to SIGNAL (-1) so that its successor can be blocked.
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // Block the current thread
    LockSupport.park(this);
    return Thread.interrupted();
}
```
Example of exclusive synchronous lock acquisition
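To see how acquire and release are used in practice, here is a minimal non-reentrant exclusive lock, modeled on the Mutex example in the AbstractQueuedSynchronizer Javadoc; the convention that state 0 means unlocked and 1 means locked comes from that example, not from AQS itself:

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A non-reentrant mutual-exclusion lock: state 0 = unlocked, 1 = locked.
public class Mutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Atomically flip the state from free to held
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }
    public void unlock()      { sync.release(1); }
    public boolean tryLock()  { return sync.tryAcquire(1); }
    public boolean isLocked() { return sync.isHeldExclusively(); }
}
```

A thread simply calls lock() and unlock(); if the lock is already held, lock() parks the thread in the synchronization queue exactly as described above.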
Release of exclusive synchronization state
When the synchronization state is obtained successfully, the current thread returns from the acquire method; for a concurrency component such as a lock, this means the current thread has obtained the lock. Where there is a method to acquire the synchronization state, there is a corresponding method to release it, namely release. Let's look at its implementation. The code is as follows:
```java
public final boolean release(int arg) {
    if (tryRelease(arg)) { // Synchronization state released successfully
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // Wake up the successor of the head node
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /* Find a qualified successor node.
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the successor node
        LockSupport.unpark(s.thread);
}
```
Exclusive release is simple and straightforward.
To summarize exclusive acquisition and release: when acquiring the synchronization state, the synchronizer maintains a synchronization queue; threads that fail to acquire the state are added to the queue and spin there. A node leaves the queue when its predecessor is the head node and it successfully acquires the synchronization state. When releasing the synchronization state, the synchronizer calls the tryRelease method and then wakes up the successor of the head node.
Acquisition of shared synchronization status
Shared acquisition of the synchronization state is done through the acquireShared method; the code is as follows:
```java
public final void acquireShared(int arg) {
    // A return value >= 0 means the synchronization state was acquired;
    // a negative value means it could not be acquired and the thread must queue and wait
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // The same enqueue operation as in exclusive mode
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // Spin
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // The predecessor is the head node and the synchronization state
                    // was acquired, so the node can leave the spin loop
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // The node leaving the spin loop becomes the head node
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
```
Like exclusive acquisition, shared acquisition needs a corresponding release of the synchronization state, which is done by calling the releaseShared method. The code is as follows:
```java
public final boolean releaseShared(int arg) {
    // Release the synchronization state
    if (tryReleaseShared(arg)) {
        // Wake up subsequent waiting nodes
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // Spin
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake up the successor node
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
```
The unparkSuccessor method is the same as in the exclusive case.
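As a shared-mode counterpart to the Mutex shown earlier, the following sketch is a one-shot latch, modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc: state 0 means closed, a non-zero state means open, and once opened every waiting thread is released.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A one-shot latch built on shared mode: state 0 = closed, 1 = open.
public class BooleanLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        @Override
        protected int tryAcquireShared(int ignore) {
            // A positive return value lets every waiting thread through once open
            return isSignalled() ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);   // open the latch
            return true;   // let doReleaseShared propagate the wake-up
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled()                     { return sync.isSignalled(); }
    public void signal()                             { sync.releaseShared(1); }
    public void await() throws InterruptedException  { sync.acquireSharedInterruptibly(1); }
}
```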
Application of AQS
AQS is widely used in synchronization tools; a short usage sketch of several of them follows this list.
ReentrantLock: the ReentrantLock class uses the AQS synchronization state to record the number of times the lock has been acquired by its holder. When the lock is acquired, ReentrantLock also records the owning thread, so that it can detect reentrant acquisition and throw an IllegalMonitorStateException when the wrong thread attempts to unlock. ReentrantLock also uses the ConditionObject provided by AQS and exposes other monitoring and inspection methods.
ReentrantReadWriteLock: the ReentrantReadWriteLock class uses 16 bits of the AQS synchronization state to hold the write-lock hold count and the remaining 16 bits to hold the read-lock hold count. The WriteLock is built in the same way as ReentrantLock; the ReadLock uses the acquireShared method to support multiple concurrent reader threads.
Semaphore: the Semaphore class uses the AQS synchronization state to hold the current permit count. Its tryAcquireShared method decrements the count, or blocks the thread if the count is non-positive; its tryReleaseShared method increments the count, potentially unblocking waiting threads.
CountDownLatch: the CountDownLatch class uses the AQS synchronization state to represent the count. When the count reaches 0, all acquire operations (corresponding to the await method of CountDownLatch) can pass.
FutureTask: the FutureTask class uses the AQS synchronization state to represent the run state of an asynchronous computation (initial, running, cancelled, completed). Setting (FutureTask.set) or cancelling (FutureTask.cancel) a FutureTask invokes the AQS release operation, and threads waiting for the result are unblocked via the AQS acquire operation.
SynchronousQueue: the SynchronousQueue class uses internal wait nodes to coordinate producers and consumers, and uses the AQS synchronization state to allow a producer to proceed once a consumer has taken the current item, and vice versa.
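To make the mapping to AQS concrete, here is a small usage sketch combining three of the tools above; the class name AqsToolsDemo, the permit count and the worker count are arbitrary choices for the example:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class AqsToolsDemo {
    private static final ReentrantLock lock = new ReentrantLock(); // exclusive: state = hold count
    private static final Semaphore permits = new Semaphore(2);     // shared: state = permit count
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        int workers = 4;
        CountDownLatch done = new CountDownLatch(workers);         // shared: state = remaining count

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    permits.acquire();          // shared acquire: blocks if no permit is left
                    try {
                        lock.lock();            // exclusive acquire
                        try {
                            counter++;
                        } finally {
                            lock.unlock();      // exclusive release
                        }
                    } finally {
                        permits.release();      // shared release: returns the permit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();           // shared release: decrements the latch
                }
            }).start();
        }

        done.await();                           // blocks until the count reaches 0
        System.out.println("counter = " + counter);
    }
}
```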
In addition to the tools provided by j.u.c, you can also customize the synchronizer that meets your own needs based on AQS.
I am Fox God; welcome to follow my WeChat official account.
This article was first published on my WeChat official account. All rights reserved; please do not reprint.