AQS detailed explanation and source code annotation analysis

AQS detailed explanation and source code analysis

1. General

What is AQS? The full name is AbstractQueuedSynchronizer, which is an abstract class in JDK. First, let's look at the classes that inherit it:

Basically, all classes in JUC concurrent packages are related to it. AQS is a heavyweight basic framework used to build locks or other synchronizer components (read-write locks, etc.) and the cornerstone of the whole JUC system. It completes the queuing of resource acquisition threads through the built-in FIFO queue, and represents the state of holding locks through an int type variable.

Concurrent operations on shared resources can be maintained through AQS.

2. Composition

Through this UML diagram, we can know the source and internal composition of AQS. Two internal classes are also maintained in the AQS abstract class: node and conditionobject, which play a key role in the maintenance of shared resources. Node (1) is used for data encapsulation and user status maintenance of connectionobject. It can be preliminarily considered that AQS is a FIFO queue composed of synchronizer and two-way linked list.

As shown below:

State maintains the state of shared resources. Through the definition of state, it can describe the state of shared resources, whether they are held by threads or can be obtained. And changes to state are atomic.

(1) : here is a two-way linked list queue FIFO, which encapsulates threads into nodes and adds them to the queue. The Node is composed of waitstatus + front and rear pointers. Waitstatus is used to identify a waiting state of threads in the queue.

3.LockSupport

3.1 general

Before studying the source code of AQS, you need to understand LockSupport, the basic thread blocking primitive used to create locks and other synchronization classes, and the upgraded version of thread waiting wake-up mechanism.

LockSupport class uses a concept called permission to block and wake up threads. Each thread has a permission. Permission has only two values: 1 and 0. Each thread defaults to 0.

Permission can be regarded as a (0,1) Semaphore, but different from Semaphore (an auxiliary synchronization class of JUC), the cumulative online value of permission is 1.

The licenses in Semaphore, the auxiliary synchronization tool in JUC, can be accumulated, and LockSupport is up to 1, no matter how many copies are sent to the specified thread

3.2 waiting and wake-up mode

  • Use the wait() method in the Object class to block the thread, and notify() and allnotify() wake up the blocked thread
  • Use the await() method of Condition in the ReentrantLock class in the JUC package to block the thread, and the single() method notifies to wake up the specific thread
  • The LockSupport class can be used to block the current thread and wake up the specified blocked threads park() and unpark()

3.3 park() and unpark()

3.3.1 park()

When the thread calls park() by default, because the default value of the current permission is 0, the current thread will block. The blocked thread will not wake up until other threads (unpark) set the permission of the current thread to 1. Then a permit will be consumed.

The bottom layer is implemented through the unsafe class (operating system primitive)

3.3.2 unpark()

After the unpark() method is called, the permission of the specified thread will be increased by 1. However, multiple calls will not increase the permission (not accumulate), and will automatically wake up the thread blocked by the park and return.

The bottom layer is implemented through the unsafe class (operating system primitive)

3.3.3 examples

public class LockSupportDemo {
   
    public static void main(String[] args) {
        Thread a = new Thread(() -> {
            // Block thread
            LockSupport.park();
            // Waiting to be awakened
            System.out.println(Thread.currentThread().getName()+"\t Awakened");
        },"Thread one");
       
        // Here, let's start the wake-up thread and test whether it needs to be blocked before wake-up
        a.start();

        Thread b = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                // The unpark method needs to pass a specified thread
                LockSupport.unpark(a);
                System.out.println(Thread.currentThread().getName()+"\t Unlock thread 1");
            } catch (InterruptedException e) {
                System.out.println(e);
            }
        },"Thread two");

        b.start();
    }
}
/*
* Operation results:
* Thread two 	  Unlock thread 1
* Thread one 	  As soon as the thread is awakened
* */

3.3.4 compare the other two blocking and wake-up modes

  1. wait and notify cannot be used separately from synchronized

    An illegal monitor exception will pop up

  2. notify must be executed after wait to wake up successfully

    When waking up, there must be waiting. If it is blocked after waking up, the currently blocked thread will not wake up

    That is, the wake-up time is limited to this moment, and the threads blocked later will not be awakened

  3. await and single in ReentrantLock also have the same problems as above

  4. The use of park and unpark in LockSupport does not need to be used with locks. It can simply block and wake up threads, and there is no necessary sequence between blocking and wake-up

    No order is needed because it will be awakened as long as permission is issued, so it has nothing to do with the time of notification, as long as there is one.

3.4 source code analysis

public static void unpark(Thread thread) {
   if (thread != null)
      UNSAFE.unpark(thread);
}

public static void park(Object blocker) {
   Thread t = Thread.currentThread();
   setBlocker(t, blocker);
   UNSAFE.park(false, 0L);
   setBlocker(t, null);
}

// View unsafe Unpark and unsafe park
public native void unpark(Object var1);

public native void park(boolean var1, long var2);

The bottom layer is the primitive level control through the native code in the unsafe class. (1)

Each call to park will consume a permit. If the permit is 0, there is nothing to consume, which will cause the thread to block at the moment. (default = 0)

unpark is the permission issued to the specified thread, but the issued permission cannot be accumulated. As mentioned earlier, the permission has only two states: 1 and 0.

You can let the thread block for many times in different time periods (without lock blessing), and then issue permission again at a specified time to wake up the thread, so as to block and wake up for many times.

(1) : the bottom layer of the unsafe class is implemented in C language. You can't view it in the Java source code, but look for sun.exe in the jar package under the JDK installation directory misc. The unsafe path can be found.

4. Source code analysis

As mentioned earlier, many locks and synchronizers in JUC are inherited from AQS, so AQS is an important framework cornerstone of JUC.

4.1 General

AQS is composed of synchronizer and queue in data structure. The queue is a virtual two-way queue at the bottom.

Principle overview: threads that grab resources directly use resources and process business logic. Threads that do not grab resources must involve a queuing mechanism (queue implementation). The thread that failed to preempt continues to queue, but the waiting thread still retains the possibility of acquiring the lock, and the process of acquiring the lock continues.

The synchronizer manages the waiting threads in the queue.

If shared resources are occupied, a certain blocking waiting wake-up mechanism is needed to ensure the allocation of locks. This mechanism is mainly implemented by using the variant of CHL (with a graph at the beginning and a two-way linked list queue). The thread that cannot obtain the lock temporarily is added to the queue and maintained through AQS (with a head node and tail node). This queue is the abstract expression of AQS.

It encapsulates the thread requesting shared resources into the node of the queue (the node has front and rear node pointing and waitstate identification, etc.), through CAS, spin and locksupport Park () maintains the state of the state (int type modified by volatile) variable (when the variable state is greater than 0, it indicates that a thread has obtained the shared resources), so as to achieve the control result of synchronization.

4.2 process details

Through the whole process of multiple threads competing for shared resources in ReentrantLock, this paper analyzes how AQS realizes the management of multithreading concurrency.

####4.2.1 try to lock

The first incoming thread will directly acquire the lock and set the lock of the shared resource object to the current thread. It will also modify the current state value to 1, indicating that the resource has been occupied.

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

4.2.2 locking failure

If the incoming thread finds that the shared resource object has been occupied by threads, it will try to get it again first.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
1.tryAcquire(arg)

After the first failure to obtain resources, you will try to lock

protected final boolean tryAcquire(int acquires) {
   return nonfairTryAcquire(acquires);
}
========
final boolean nonfairTryAcquire(int acquires) {
   final Thread current = Thread.currentThread();
   int c = getState();
   // 1. If the current resource status is 0, it indicates that the previous thread using the resource has been completed
   if (c == 0) {
      // 1.1 try to lock and set the acquisition thread of the current shared resource as itself
      if (compareAndSetState(0, acquires)) {
         setExclusiveOwnerThread(current);
         return true;
      }
   }
   // 2. The following judgment is to judge whether the thread currently operating shared resources is the thread executing the current method.
   // 2. If yes, the value on the flag bit state will be increased by 1 (1)
   else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
         throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
   }
   return false;
}

If the attempt fails and returns false, the following acquirequeueueueueued() method will be executed. Before entering, the system will create a new node (sentinel node) for the waiting queue without storing any information to help occupy the space, and then the line that needs to occupy shared resources will enter, as shown in the figure:

(1) : the reason for this judgment is that the previous thread occupying the resource has just finished using it, but it will continue to use it later, so the resource will be preempted later = = > reentrant lock

2.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

addWaiter(Node.EXCLUSIVE) method. After the previous attempt to obtain the lock again fails, this method will be executed to add the current thread to the waiting queue. Source code:

private Node addWaiter(Node mode) {
   // 1. Create node
   Node node = new Node(Thread.currentThread(), mode);
   // Try the fast path of enq; backup to full enq on failure
   // 2. Get queue tail node
   Node pred = tail;
   // 3. Judge whether the current queue is empty
   // 3.1 not null
   if (pred != null) {
      // 3.1.1 add the current node to the end of the queue through CAS
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
         pred.next = node;
         return node;
      }
   }
   // 3.2 if the queue is empty, execute enq(node) method to initialize a queue
   enq(node);
   return node;
}

If the thread does not attempt to obtain the lock through the add. Queue method, it will not be blocked for another time. If the thread does not attempt to obtain the lock through the add. Queue method, it will not wake up again.

And set the waitstate of the sentinel node to - 1. Source code:

final boolean acquireQueued(final Node node, int arg) {
   // The default retry is failure
   boolean failed = true;
   try {
      // Default non interruptible
      boolean interrupted = false;
      // for(;;)  Equivalent to while(true), a spin
      for (;;) {
         final Node p = node.predecessor();
         // 1. Judge that the current node is the head node of the queue. If it is the head node, try to obtain the lock
         if (p == head && tryAcquire(arg)) {
            // 1.1 acquire lock successfully
            setHead(node);
            // 1.2 departure
            p.next = null; // help GC
            failed = false;
            return interrupted;
         }
         // 2. If the acquisition fails, shouldParkAfterFailedAcquire(p, node) performs park blocking
         // 2.parkAndCheckInterrupt() determines whether the current thread can be interrupted
         if (shouldParkAfterFailedAcquire(p, node) &&
             parkAndCheckInterrupt())
            interrupted = true;
      }
   } finally {
      if (failed)
         cancelAcquire(node);
   }
}

shouldParkAfterFailedAcquire(p, node) and parkAndCheckInterrupt() methods process the current head node through the state value of the sentinel node. If the lock cannot be obtained, it will be park and blocked. Source code:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   int ws = pred.waitStatus;
   if (ws == Node.SIGNAL)
      		/*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
      return true;
   if (ws > 0) {
      		/*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
      do {
         node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
   } else {
      		/*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
   }
   return false;
}
=========
private final boolean parkAndCheckInterrupt() {
     // Block the thread and wait for it to wake up
     LockSupport.park(this);
     return Thread.interrupted();
}

At this point, the thread that cannot get the resource is really blocked and queued.

4.2.3 unlocking

When the thread resource is used up, it will be unlocked. The source code of Reentrant unlock() method is as follows:

public void unlock() {
   sync.release(1);
}

The release(1) method performs the real unlocking operation in the AQS abstract class. Source code:

public final boolean release(int arg) {
   // 1. Try to unlock the resource
   if (tryRelease(arg)) {
      // 1.1 after the resource is unlocked, obtain the header node
      Node h = head;
      // 1.2 determine whether there are waiting threads
      if (h != null && h.waitStatus != 0)
         // 1.3 unlocking head node
         unparkSuccessor(h);
      return true;
   }
   return false;
}

tryRelease(arg) method to truly unlock resources. Source code:

protected final boolean tryRelease(int releases) {
   // 1. Get the status of resource status-1
   int c = getState() - releases;
   // 2. Judge whether the current thread is a thread holding resources
   if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
   boolean free = false;
   // 3. Set the resource status to 0 
   if (c == 0) {
      // 3.1 setting resources successfully unlocked
      free = true;
      // 3.2 set the holding thread of the resource to null
      setExclusiveOwnerThread(null);
   }
   // 4. Set resource status
   setState(c);
   return free;
}

4.3 summary

  1. No threads occupy shared resources and are locked directly

    • If so, subsequent threads that need to preempt shared resources will join the queue for queuing

      This queue is managed by the AQS synchronizer

  2. Unlock after use

    • The queue in the queue will be preempted together with the thread currently executing the resource
    • If the thread in the queue preempts resources, the sentinel node in the queue will leave the queue, and the original first waiting node will become a new sentinel node

Through AQS, we know how to manage threads that forcibly share resources in Java to ensure concurrency. synchronized and ReentrantLock are non fair locks by default, and the resource contention of subsequent threads is random, but ReentrantLock can be set as fair lock by setting the parameter to true when instantiating, Then locks will be obtained in sequence according to the queue entry order, which will reduce the concurrency performance.

The difference between fair lock and non fair lock lies in the internal class selection problem when ReentrantLock executes the locking method.

The fair lock will select the static final class fairsync extensions sync internal class, while the non fair lock will select the static final class nonfairsync extensions sync, so as to reach the waiting queue to obtain resources.

Keywords: Java Back-end JUC

Added by Hypnos on Wed, 09 Feb 2022 06:30:36 +0200