A hands-on walkthrough of the AQS source code
1, Overview
This article uses ReentrantLock as an example to walk through the AQS source code. It is really not as hard as it looks. Below is a small fair-lock demo that you can run yourself to get a feel for it; after that we will read the source code bit by bit.
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author VernHe
 * @date December 2, 2021
 */
public class TestAQS {

    /** true means a fair lock, false means a non-fair lock */
    static ReentrantLock reentrantLock = new ReentrantLock(true);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new Target(i)).start();
        }
    }

    static class Target implements Runnable {
        // Number of each task
        private int num;

        public Target(int num) {
            this.num = num;
        }

        @Override
        public void run() {
            for (int i = 0; i < 2; i++) {
                reentrantLock.lock();
                try {
                    System.out.println("Task " + this.num + " is running");
                } finally {
                    // The unlock() call must be placed in a finally block
                    reentrantLock.unlock();
                }
            }
        }
    }
}
2, The source code
ReentrantLock
Ctrl-click the lock() method to step into it:
public void lock() {
    sync.lock();
}
Here you will find that it delegates to the lock() method of sync. Ctrl-click sync and you will see that it is actually an abstract static inner class:
public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // Omitted here
    }
}
As the English comment explains (translate it if you need to), ReentrantLock implements both the fair and the non-fair lock on top of the Sync class, and the state field in AQS represents the number of holds on the lock. AQS is short for AbstractQueuedSynchronizer. Keep clicking into the lock() method, choose FairSync, and you will see the following source code:
static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    // ... omitted
}
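For comparison, the non-fair version does not wait its turn on the fast path: it first tries a single CAS to grab the lock directly and only then falls back to acquire(1). Roughly (quoting the JDK 8 source from memory, so treat it as a sketch):

static final class NonfairSync extends Sync {
    final void lock() {
        // Barge in: try to grab the lock immediately with one CAS ...
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // ... otherwise fall back to the normal acquire path
            acquire(1);
    }
    // ... omitted
}

Either way, both versions eventually go through acquire(int).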
Enter acquire(1)
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The if condition chains three methods:
- tryAcquire()
As the name implies, it tries to acquire the lock once. The logic differs slightly between the fair and the non-fair lock:
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Get the value of state
    int c = getState();
    if (c == 0) {                              // state == 0 means nobody holds the lock right now
        if (!hasQueuedPredecessors() &&        // Fair lock: check whether other threads are queued ahead; the non-fair lock skips this check
            compareAndSetState(0, acquires)) { // CAS operation, trying to grab the lock
            setExclusiveOwnerThread(current);  // Mark the current thread as the exclusive owner
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // The lock is held, and the holder is the current thread (reentrant)
        int nextc = c + acquires;              // Each lock() call increases state by 1
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);                       // Update state
        return true;
    }
    return false;
}
To sum up, the method is actually quite simple. It returns true only when either [nobody holds the lock and the current thread grabs it with a successful CAS] or [the lock is already held by the current thread, i.e. a reentrant acquisition]. Put simply: it checks whether the current thread can take (or already has) exclusive ownership.
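A quick way to see the reentrant branch in action is the hold count, which is exactly the state value described above. The following little demo is my own, not part of the article's example:

import java.util.concurrent.locks.ReentrantLock;

// Each nested lock() call bumps the hold count (the AQS state),
// and each unlock() decrements it back toward 0.
public class HoldCountDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();                              // state: 0 -> 1
        lock.lock();                              // state: 1 -> 2 (reentrant)
        System.out.println(lock.getHoldCount());  // prints 2
        lock.unlock();                            // state: 2 -> 1
        lock.unlock();                            // state: 1 -> 0, lock released
        System.out.println(lock.isLocked());      // prints false
    }
}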
- addWaiter()
Put bluntly, once the attempt to acquire the lock above has failed, the current thread is added to the waiting queue:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // Create a node for the current thread with the given mode
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;                        // The Node at the tail of the waiting queue
    if (pred != null) {                      // If some thread is already waiting
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;                // Link in behind the last Node
            return node;                     // Queued successfully: return the Node for the current thread
        }
    }
    enq(node); // If this thread is the first to queue, or the CAS above failed, keep retrying until the node is enqueued
    return node;                             // Queued successfully: return the Node for the current thread
}
To sum up, in plain terms: the thread gets in line.
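To make the "get in line" part concrete, here is a simplified, hypothetical sketch (not the JDK code) of the idea behind the enq() fallback: spin on a CAS of the tail pointer until the new node is linked at the end of the doubly linked queue.

import java.util.concurrent.atomic.AtomicReference;

// A toy queue showing only the CAS-retry tail insertion pattern.
// The real enq() also lazily initializes a dummy head node; that part is omitted here.
class TinyQueue {
    static class Node {
        volatile Node prev;
        volatile Node next;
        final Thread thread = Thread.currentThread();
    }

    private final AtomicReference<Node> tail = new AtomicReference<>();

    Node enqueue(Node node) {
        for (;;) {                                // spin until the CAS succeeds
            Node pred = tail.get();
            node.prev = pred;
            if (tail.compareAndSet(pred, node)) { // only one thread wins each round
                if (pred != null) {
                    pred.next = node;             // complete the doubly linked structure
                }
                return node;
            }
        }
    }
}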
- acquireQueued()
This method spins: it keeps trying to acquire the lock, and once an attempt fails and the node is ready to wait, the current thread is parked (blocked):
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;                        // Records whether the acquire failed
    try {
        boolean interrupted = false;              // Records whether the thread was interrupted
        for (;;) {                                // Loop until the lock is acquired
            final Node p = node.predecessor();    // Get the previous Node
            if (p == head && tryAcquire(arg)) {   // If this node is right behind the head and the acquire succeeds
                setHead(node);                    // Make the current Node the new head
                p.next = null;                    // help GC
                failed = false;
                return interrupted;               // Return and leave the loop
            }
            if (shouldParkAfterFailedAcquire(p, node) && // Check whether this thread should be parked
                parkAndCheckInterrupt())          // Block the current thread (a debugger will stop at this step)
                interrupted = true;               // When the thread is woken up again it resumes here and continues the loop
        }
    } finally {
        if (failed)
            cancelAcquire(node);                  // If the acquire failed, cancel this node so the following nodes can be woken up
    }
}
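The blocking inside parkAndCheckInterrupt() is done with LockSupport. The following small demo (mine, not AQS code) shows the two primitives involved: park() suspends the current thread, and unpark(t) wakes it up again, which is what happens when the lock is released:

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking");
            LockSupport.park();                // blocks here, like a queued AQS node
            System.out.println("worker: woken up");
        });
        worker.start();

        Thread.sleep(500);                     // give the worker time to park
        LockSupport.unpark(worker);            // roughly what a release does for the next waiter
        worker.join();
    }
}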
AbstractQueuedSynchronizer
Components
1. The waiting queue (CLH queue), which is essentially a doubly linked list
2. The state variable, which stores the synchronization state
3. The head and tail pointers, which track the head and tail of the waiting queue
4. The inner Node class, which links to its predecessor and successor Node through two pointers (prev and next)
5. Methods for operating on the queue, plus a series of CAS methods backed by native code (see the sketch below)
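To see how these components fit together, here is a minimal, non-reentrant exclusive lock built on AQS, a sketch in the spirit of the example in the AQS Javadoc. Only tryAcquire/tryRelease are written by hand; the queue, parking and waking all come from the framework:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// state 0 = unlocked, state 1 = locked
public class SimpleMutex {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Try to flip state from 0 to 1 with a single CAS
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            setExclusiveOwnerThread(null);
            setState(0);                      // back to unlocked
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock()   { sync.acquire(1); } // queues and parks on contention
    public void unlock() { sync.release(1); } // wakes the next queued thread
}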
3, Summary
In my view, the AQS framework abstracts out what many synchronization strategies have in common. Semaphores, mutexes, and the various kinds of locks all have situations where some threads must wait, so AQS provides a blocking queue (the CLH queue) to hold blocked threads until they are woken up later. Different synchronization strategies allow different numbers of threads to proceed at the same time, so AQS provides the state variable. On top of that sits a series of CAS methods for operating on the queue and the state, which ultimately come down to native atomic operations.
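As a final illustration of that idea, here is a sketch of a one-shot latch in shared mode (again modeled loosely on the example in the AQS Javadoc). It reuses the same state variable and the same waiting queue, only with a different rule for when threads may proceed:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// state 0 = closed, state 1 = open
public class SimpleLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1;  // >= 0 means "go ahead", < 0 means "queue up"
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                      // open the latch
            return true;                      // wake up all queued threads
        }
    }

    private final Sync sync = new Sync();

    public void await() { sync.acquireShared(1); } // blocks until the latch opens
    public void open()  { sync.releaseShared(1); } // opens the latch and releases all waiters
}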