When analyzing the source of the Java concurrency package java.util.concurrent, we first need to understand AbstractQueuedSynchronizer (abbreviated as AQS below), because it is the basic utility class of the package and the foundation on which ReentrantLock, CountDownLatch, Semaphore, FutureTask and other classes are built.
A Google search turns up plenty of introductions to AQS, but many of them are not very clear, because most articles never pin down some of the key details.
Starting from the fair-lock source code of ReentrantLock, this article analyzes how the AbstractQueuedSynchronizer class works, hoping to offer you some straightforward help.
A few points up front:
- This article is a bit long, but it is still fairly simple. It is mainly aimed at beginners in concurrent programming and at developers who want to read the source of the Java concurrency package. For beginners it may take a few hours to fully digest, but the time is certainly worth it.
- The source is from JDK 1.7 (it is unchanged in 1.8). Wherever something seems unclear or doubtful, it is best to open the source and look for yourself; Doug Lea's code really is excellent.
- This article does not analyze shared mode, which spares readers a lot of burden; the third article in this series covers shared mode. It also skips the Condition part, so it should be easy to follow.
- This article explains AQS through ReentrantLock, the lock we use most often, which strictly speaking is not accurate. Readers should be clear that AQS is not only used to implement reentrant locks; the hope is simply that a familiar lock helps you relate AQS to its usage scenarios and reduces the reading pressure.
- There is only one difference between fair and unfair locks in ReentrantLock; it is covered in the second article in this series.
- Readers in the comments have pointed out that walking straight through the code is unfriendly and that there should be more flow charts. That criticism is fair. But speaking from experience, for AQS the form really does not matter; what matters is getting the details straight.
AQS structure
Let's first look at the fields of AQS; once you know them, you can more or less guess how AQS works.
```java
// Head node, which you can roughly think of as the thread currently holding the lock
private transient volatile Node head;

// Tail of the blocking queue; every new node that comes in is inserted at the end, forming a linked list
private transient volatile Node tail;

// The most important field: the state of the lock. 0 means the lock is not held; greater than 0 means a thread holds it.
// The value can be greater than 1 because the lock is reentrant: each reentry adds 1.
private volatile int state;

// The thread currently holding the exclusive lock. Its most important use case is reentrancy:
// reentrantLock.lock() can be called in a nested way, so this field is used each time to check whether the current thread already holds the lock.
// if (currentThread == getExclusiveOwnerThread()) { state++ }
private transient Thread exclusiveOwnerThread; // inherited from AbstractOwnableSynchronizer
```
Well, that should look simple enough; after all, there are only four fields.
The wait queue of AbstractQueuedSynchronizer is shown below. Note that in the analysis that follows, the blocking queue does not include the head node.
Each thread in the wait queue is wrapped into a Node instance, and the underlying data structure is a linked list. Let's look at the source code together.
```java
static final class Node {
    // Marker indicating the node is waiting in shared mode
    static final Node SHARED = new Node();
    // Marker indicating the node is waiting in exclusive mode
    static final Node EXCLUSIVE = null;

    // ======== The following int constants are values for waitStatus ========

    /** waitStatus value to indicate thread has cancelled */
    // The thread has cancelled its contention for this lock
    static final int CANCELLED = 1;

    /** waitStatus value to indicate successor's thread needs unparking */
    // The official description: the thread of this node's successor needs to be woken up
    static final int SIGNAL = -1;

    /** waitStatus value to indicate thread is waiting on condition */
    // This article does not analyze Condition, so skip it; it will be covered in a later article
    static final int CONDITION = -2;

    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // Also not analyzed here; skip it
    static final int PROPAGATE = -3;

    // =====================================================

    // Takes one of the values above: 1, -1, -2, -3, or 0 (more on 0 later).
    // For now it is enough to know that a value greater than 0 means the thread has cancelled its wait.
    // ps: if a thread cannot grab the lock for too long, ReentrantLock can specify a timeout...
    volatile int waitStatus;

    // Reference to the predecessor node
    volatile Node prev;

    // Reference to the successor node
    volatile Node next;

    // The thread this node wraps
    volatile Thread thread;
}
```
Node's data structure is actually quite simple: just four fields, thread + waitStatus + prev + next. Keep that picture in mind.
That is the basic knowledge; it will be used many times later, so keep it, and the structure diagram, in mind. Next, let's start from ReentrantLock's fair lock. Again, the blocking queue I talk about does not include the head node.
First, let's look at how ReentrantLock works.
```java
// Using the "service" concept from web development as an example
public class OrderService {
    // static so that every thread sees the same lock. (A Spring MVC service is a singleton by default anyway; don't get hung up on this.)
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // Suppose we allow only one thread to create an order at a time
        reentrantLock.lock();
        // A lock() call is typically followed by a try block
        try {
            // Only one thread at a time (the one that acquired the lock) can execute this code.
            // Other threads block in the lock() method, waiting to acquire the lock before coming in.
            // Execute code...
            // Execute code...
            // Execute code...
        } finally {
            // Release the lock
            reentrantLock.unlock();
        }
    }
}
```
ReentrantLock manages the lock through the inner class Sync, so the real acquiring and releasing of the lock is controlled by Sync's subclasses.
```java
abstract static class Sync extends AbstractQueuedSynchronizer {
}
```
Sync has two implementations: NonfairSync (unfair lock) and FairSync (fair lock). Let's look at the FairSync section.
```java
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
```
Threads contending for the lock
By now some readers are probably tired of all the preamble, so let's follow the code below; I'll keep the rest of the talk inside the comments.
```java
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    // Contend for the lock
    final void lock() {
        acquire(1);
    }

    // This method comes from the parent class AQS; I paste it here directly, and will do the same below,
    // so readers don't have to jump back and forth.
    // Notice that if tryAcquire(arg) returns true, this method is done.
    // Otherwise, acquireQueued pushes the thread into the queue.
    public final void acquire(int arg) { // arg == 1 here
        // First call tryAcquire(1). As the name says, this is just a "try",
        // because it may well succeed right away, in which case there is no need to queue at all.
        // The semantics of a fair lock: if nobody holds the lock and nobody is queued, there is no need to queue up (park and later unpark).
        if (!tryAcquire(arg) &&
            // tryAcquire(arg) failed, so the current thread needs to be put into the blocking queue and parked
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // Try to acquire the lock directly; the boolean return value says whether the lock was acquired.
    // Returns true when: 1. no thread is waiting for the lock; or 2. this is a reentry: the thread already holds the lock,
    // so naturally it can acquire it again.
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 means no thread holds the lock at this moment
        if (c == 0) {
            // Although the lock is available at this moment, this is a fair lock, and fairness means first come, first served:
            // check whether anyone else has already been waiting in the queue.
            if (!hasQueuedPredecessors() &&
                // No thread is waiting, so try to grab the lock with CAS.
                // If the CAS fails, it can only mean one thing: at almost the same instant some other thread got there first,
                // because we just checked that nobody was queued.
                compareAndSetState(0, acquires)) {
                // Here we have the lock. Mark the current thread as the owner: "I am occupying the lock now."
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // Entering this branch means reentry: all that is needed is state = state + acquires.
        // There is no concurrency problem here.
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Getting here means neither the if nor the else if returned true: the lock was not acquired.
        // Go back to the outer caller and carry on:
        //     if (!tryAcquire(arg)
        //         && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //         selfInterrupt();
        return false;
    }

    // Assuming tryAcquire(arg) returned false, the code goes on to execute:
    //     acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    // which first needs: addWaiter(Node.EXCLUSIVE)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    // The purpose of this method is to wrap the current thread into a node and enqueue it.
    // The parameter mode is Node.EXCLUSIVE here, representing exclusive mode.
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // The following lines try to add the current node to the end of the linked list, i.e. the end of the blocking queue
        Node pred = tail;
        // tail != null  =>  the queue is not empty (when tail == head the queue is empty, but never mind that here)
        if (pred != null) {
            // Set the current tail as this node's predecessor
            node.prev = pred;
            // Use CAS to set this node as the new tail; if it succeeds, tail == node, the new tail of the blocking queue
            if (compareAndSetTail(pred, node)) {
                // Getting here means the CAS succeeded: the current node (now tail) has attached itself behind the old tail.
                // Together with node.prev = pred above, this line completes the two-way link with the previous tail node.
                pred.next = node;
                // The thread is queued and we can return
                return node;
            }
        }
        // Look closely at the code above: if we get here,
        // it means either pred == null (the queue is empty) or the CAS failed (other threads were competing to enqueue).
        // Readers must keep up with the idea here; if not, stop and re-read carefully, otherwise you will waste time later.
        enq(node);
        return node;
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    // Enqueue by spinning.
    // As said before, there are only two ways to reach this method: the queue was empty, or threads were competing to enqueue.
    // "Spinning" here means: when the CAS on tail loses a race, compete again and again until the node is queued.
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // As said before, an empty queue also ends up here
            if (t == null) { // Must initialize
                // Initialize the head node.
                // Careful readers will recall that head and tail start out as null.
                // This is also a CAS step; after all, many threads may arrive here at the same time.
                if (compareAndSetHead(new Node()))
                    // For later use: at this point the head node's waitStatus == 0; see the Node() constructor.
                    // There is a head now, but tail is still null, so set it:
                    // point tail at head; don't worry, a thread will show up shortly and take over tail.
                    // Note: this branch only sets tail = head and does NOT return,
                    // so after setting it, the for loop continues and the next pass takes the else branch below.
                    tail = head;
            } else {
                // The following lines are the same as in addWaiter above,
                // just wrapped in an endless loop: keep queuing the current node at the tail no matter what.
                // Even with thread contention, no node ends up enqueued twice.
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // Now, back to this code:
    //     if (!tryAcquire(arg)
    //         && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //         selfInterrupt();
    // In the method below, the parameter node has gone through addWaiter(Node.EXCLUSIVE) and is already in the blocking queue.
    // Note: if acquireQueued(addWaiter(Node.EXCLUSIVE), arg) returns true,
    // the code above goes on to selfInterrupt(); so normally it should return false.
    // This method is very important. It is fair to say that the real parking of the thread,
    // and the later waking up to acquire the lock, all happen in this method.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // p == head means the current node is the first one in the blocking queue, because its predecessor is head.
                // Remember, the blocking queue does not include the head node: head generally represents the thread holding the lock,
                // and the nodes behind head are what we call the blocking queue.
                // So the current node may try to grab the lock.
                // Why may it try?
                // First, it is at the front of the queue; that is the precondition. Second, the current head may be the node that was just initialized:
                // the enq(node) method showed that head is lazily initialized and that new Node() sets no thread,
                // which means the current head may not belong to any thread. So, being at the front of the queue, it might as well try.
                // tryAcquire was analyzed above; if you have forgotten, look back: it simply tries to CAS the state.
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Getting here means the if branch above did not succeed: either the current node is not at the front of the queue,
                // or tryAcquire(arg) lost the race against someone else. Keep reading.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // When can failed be true here?
            // When the tryAcquire() method throws an exception.
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    // As just said, the lock was not grabbed at this point. This method asks:
    // "The current thread failed to grab the lock; does it need to be parked?"
    // The first parameter is the predecessor node, the second is the node representing the current thread.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // The predecessor's waitStatus == -1 means the predecessor is in a normal state and the current thread needs to be parked,
        // so we can return true directly.
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // The predecessor's waitStatus > 0 means the predecessor has cancelled its wait.
        // Keep this in mind: a thread that enters the blocking queue will be parked, and the wake-up is performed by its predecessor.
        // So the code below makes the current node's prev point to a node with waitStatus <= 0.
        // Put simply, it is looking for a reliable "parent", because the wake-up depends on it:
        // if the predecessor has cancelled its wait, look at the predecessor's predecessor, and keep walking forward
        // until a good parent is found.
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // Think carefully about what it means to reach this branch:
            // the predecessor's waitStatus is neither -1 nor 1, so it can only be 0, -2 or -3.
            // In the source we have read so far, waitStatus was never set when a node joined the queue,
            // so a newly enqueued node's waitStatus is 0.
            // Normally the predecessor is the previous tail, so its waitStatus should be 0.
            // Use CAS to set the predecessor's waitStatus to Node.SIGNAL (i.e. -1).
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // This method returns false, the caller goes around the for loop once more,
        // then comes back in and returns true from the first branch.
        return false;
    }

    // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
    // That is the end of this method; a brief analysis based on its return value:
    // If it returns true, the predecessor's waitStatus == -1, everything is normal, and the current thread needs to be parked,
    // waiting to be woken up later. As said above, it will be woken by its predecessor:
    // the predecessor gets the lock and, when releasing it, calls on you.
    // If it returns false, there is currently no need to be parked. Why? See below.
    // Jumping back to this code:
    //     if (shouldParkAfterFailedAcquire(p, node) &&
    //         parkAndCheckInterrupt())
    //         interrupted = true;
    // 1. If shouldParkAfterFailedAcquire(p, node) returns true,
    //    then parkAndCheckInterrupt() runs.
    //    That method is very simple: since the previous call returned true, the thread must be parked, and this method does the parking.
    //    It uses LockSupport.park(this) to park the thread, which then stops right here, waiting to be woken up.
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    // 2. Next, the case where shouldParkAfterFailedAcquire(p, node) returns false.
    //    Looking closely at shouldParkAfterFailedAcquire(p, node), you can see that on the first pass it will not return true.
    //    The reason is simple: the predecessor's waitStatus == -1 depends on the successor setting it; that is,
    //    I have not yet set -1 on my predecessor, so how could it be true? But note that this method is called inside a loop,
    //    so the second time around the status is already -1.
    //    Why doesn't shouldParkAfterFailedAcquire(p, node) park the thread directly when it returns false?
    //    => Because the node may already be the direct successor of head; the rest is left for readers to think about.
}
```
At this point, I suggest reading the `final boolean acquireQueued(final Node node, int arg)` method a few more times: work out how each branch is reached, what happens in each case, and where execution goes next.
Unlocking operation
Finally, we need to look at the wake-up step. As we saw, under normal circumstances a thread that fails to acquire the lock is parked by LockSupport.park(this) and stops there, waiting to be woken up.
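As an aside, here is a minimal, self-contained sketch (not from the original article; class and variable names are my own) of how LockSupport.park/unpark behave on their own: one thread parks itself and stays blocked until another thread unparks it, which is exactly the mechanism AQS relies on.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking, waiting to be unparked");
            LockSupport.park();              // blocks here, like a thread waiting in the AQS queue
            System.out.println("worker: unparked, continuing");
        });
        worker.start();

        Thread.sleep(1000);                  // give the worker time to reach park()
        System.out.println("main: unparking worker");
        LockSupport.unpark(worker);          // wakes the parked thread, as unparkSuccessor does in AQS
        worker.join();
    }
}
```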
```java
// The wake-up code is relatively simple. If you've understood the locking path above, you can probably guess what happens without reading on.
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // See tryRelease below first
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Back in ReentrantLock, the tryRelease method:
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // Whether the lock is fully released
    boolean free = false;
    // This is the reentrancy question again: only when c == 0, i.e. there are no nested holds left, is the lock really released;
    // otherwise it is not released yet.
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// Wake up the successor node.
// As the call above shows, the argument node is the head node.
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // If the head node's waitStatus < 0, set it back to 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // The code below wakes up the successor node, but the successor may have cancelled its wait (waitStatus == 1),
    // so search backwards from the tail and find the frontmost node with waitStatus <= 0.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // Traverse from the tail backwards; read the code carefully and don't worry about a waitStatus == 1 node in the middle.
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // Wake up the thread
        LockSupport.unpark(s.thread);
}
```
After the wake-up, the woken thread continues from the following code:
```java
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // the thread was parked right here just now
    return Thread.interrupted();
}
// Then it goes back into acquireQueued(final Node node, int arg), at which point node's predecessor is head.
```
Okay, I won't analyze the source code any further here. If you still have questions, go back and read the code carefully.
Summary
Let's sum up.
In concurrent environments, locking and unlocking require coordination of three components:
- Lock status. We need to know whether the lock is already held by another thread; that is what state is for. When it is 0, no thread holds the lock and we can contend for it; if the CAS succeeds, we have grabbed the lock and other threads cannot. On reentry, state is simply incremented by 1, and each unlock decrements it by 1, until state becomes 0 again and the lock is released, which is why lock() and unlock() must come in pairs. Releasing the lock then wakes up the first thread in the wait queue so it can take the lock. (A small sketch tying these pieces together follows this list.)
- Blocking and waking threads. AQS uses LockSupport.park(thread) to park a thread and LockSupport.unpark(thread) to wake it up.
- A blocking queue. Many threads may contend for the lock, but only one can hold it at a time; the others have to wait, so a queue is needed to manage them. AQS uses a FIFO queue implemented as a linked list in which each node holds a reference to its successor. AQS is based on a variant of the CLH lock; interested readers can look up an introduction to CLH, which is short and clear.
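To tie the three pieces together, here is a minimal, hedged sketch of a non-reentrant exclusive lock built on AQS, close in spirit to the Mutex example in the AQS Javadoc (the class name SimpleMutex is mine): state carries the lock status, tryAcquire/tryRelease flip it with CAS, and AQS itself supplies the blocking queue plus the park/unpark machinery.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// A minimal, non-reentrant exclusive lock: state 0 = free, 1 = held.
public class SimpleMutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // CAS state from 0 to 1: success means we grabbed the lock
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // otherwise AQS enqueues and parks the thread for us
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);  // release; AQS then unparks the first queued thread
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); }
    public void unlock() { sync.release(1); }
}
```

Compared with ReentrantLock's FairSync, this sketch skips reentrancy and fairness: its tryAcquire never checks getExclusiveOwnerThread() or hasQueuedPredecessors(), which is exactly what those two features add.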
Example Diagram Analysis
Let's finish with a review using a simple example. If something above was not clear, this is another chance to pin it down.
First, thread 1 calls reentrantLock.lock(). Looking back at the code, tryAcquire(1) returns true straight away and that is the end of it: state is simply set to 1, head is not even initialized, let alone a blocking queue. If thread 1 then calls unlock() before thread 2 arrives, the two threads never interact at all, so where does AQS even come in?
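A tiny sketch of this uncontended, reentrant path (the class name is mine; the printed values follow from the analysis above):

```java
import java.util.concurrent.locks.ReentrantLock;

public class UncontendedDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true); // fair lock

        lock.lock();   // tryAcquire succeeds via CAS: state 0 -> 1, no queue is built
        lock.lock();   // reentry: state 1 -> 2
        System.out.println("hold count: " + lock.getHoldCount());         // 2
        System.out.println("queued threads? " + lock.hasQueuedThreads()); // false: nobody is waiting

        lock.unlock(); // state 2 -> 1
        lock.unlock(); // state 1 -> 0, lock fully released
        System.out.println("locked? " + lock.isLocked());                 // false
    }
}
```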
Now suppose thread 1 has not called unlock() when thread 2 calls lock(). What happens?
Thread 2 initializes head with new Node(), then enqueues itself into the blocking queue and parks (note that this is a for loop: the part that sets head and tail does not return; the loop only exits once the node has been successfully enqueued).
```java
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
```
First, thread 2 initializes the head node; at this point head == tail and the head's waitStatus == 0.
Thread 2 then queues up:
At the same time, we also need to look at the waitStatus of the nodes at this point. The head node was initialized by thread 2 without setting waitStatus, so Java defaults it to 0. But when thread 2 runs the shouldParkAfterFailedAcquire method, it sets the waitStatus of its predecessor, i.e. the head, to -1.
And what is the waitStatus of thread 2's own node at this point? It was never set, so it is 0.
If thread 3 arrives now, it is simply appended after thread 2. Thread 3's node has waitStatus 0, and in shouldParkAfterFailedAcquire it sets the waitStatus of its predecessor, thread 2's node, to -1.
Here we can say a little more about the SIGNAL (-1) value of waitStatus. Doug Lea's comment says it indicates that the successor node needs to be woken up. In other words, this waitStatus describes not the node's own state but that of its successor. We have seen that when a node joins the queue it sets its predecessor's status to SIGNAL, then blocks and waits for the predecessor to wake it up. Two issues are involved here, threads that cancel queuing and the wake-up operation, and in essence they are handled the same way. Readers can re-read the source code with the idea that "waitStatus describes the successor" in mind.
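To make the scenario above concrete, here is a small, hedged demo (thread and class names are mine): thread 1 holds the lock while threads 2 and 3 call lock(), fail tryAcquire, and are parked in the blocking queue; ReentrantLock's monitoring method getQueueLength() lets us observe that two threads are queued before thread 1 releases the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

public class QueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // fair lock, as in the article

        Thread t1 = new Thread(() -> {
            lock.lock();                // thread 1 acquires the lock directly: state goes 0 -> 1
            try {
                Thread.sleep(2000);     // hold the lock so the others have to queue
            } catch (InterruptedException ignored) {
            } finally {
                lock.unlock();          // release the lock and unpark the first queued thread
            }
        }, "thread-1");

        Runnable waiter = () -> {
            lock.lock();                // fails tryAcquire, enqueues itself, and parks
            try {
                System.out.println(Thread.currentThread().getName() + " got the lock");
            } finally {
                lock.unlock();
            }
        };

        t1.start();
        Thread.sleep(200);              // let thread 1 grab the lock first
        Thread t2 = new Thread(waiter, "thread-2");
        Thread t3 = new Thread(waiter, "thread-3");
        t2.start();
        t3.start();

        Thread.sleep(500);              // give threads 2 and 3 time to park
        System.out.println("queued threads: " + lock.getQueueLength()); // typically prints 2
        t1.join(); t2.join(); t3.join();
    }
}
```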
(End of the text)