Exclusion lock analysis of AQS series

Catalogue of series articles

AbstractQueuedSynchronizer basic analysis of AQS series
Exclusion lock analysis of AQS series

1, Get resources

1. acquire resource

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

This method is the entrance to obtain resources. The detailed method will be introduced later. Here we mainly describe a large process.

  1. An attempt to obtain a resource ends if successful
  2. Failed to get resources. Create a new node and insert it into the queue
  3. Queue in the queue and get resources
  4. If a thread interrupt occurs during the wait, interrupt itself
    You can see the following:
    (1) The first is an attempt to obtain resources. The specific attempt logic is implemented by each implementation class. If it fails, it will be added to the queue
    (2) This method does not respond to the interrupt. If the thread is interrupted during the waiting process, the thread will not respond. It will only interrupt itself after the waiting is over

2. Join the queue addWaiter

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //Failed cycle spin attempt
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //In a special scenario, the head and tail pointer nodes are initialized without initialization
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

This method is used to queue thread nodes. The queue logic is not complex. Initialize a node in exclusion mode to join the tail of the queue, and the tail points to the node.
This method makes a quick attempt first, and then spins in a loop after failure. This implementation mode is very classic. Many JDK and contract source codes are implemented in this way. This method can also be seen in the subsequent AQS logic.

3. Queued to obtain resources acquirequeueueueueueueueued

final boolean acquireQueued(final Node node, int arg) {
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
         	 // Get precursor node
             final Node p = node.predecessor();
             // Queue to the first and try to acquire the lock
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             //In the queue, confirm whether blocking is required
             //If necessary, block and retry after being awakened or interrupted
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
         	//Canceling queuing is only applicable to the scenario where exceptions are thrown during queuing
             cancelAcquire(node);
     }
 }

This method is the core logic, which realizes the logic from a node queuing and blocking in the queue to the success of obtaining resources. It has a return value and returns whether the thread in the queuing process has been interrupted.
Finally, considering the scenario of throwing exceptions in the queuing process, a finally processing is added to cancel the queuing for such manufacturers to obtain resources. The main logic is to release resources, which is basically equivalent to the subsequent release logic of releasing resources.

4. Blocking check shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     int ws = pred.waitStatus;
     //The only case that fits
     if (ws == Node.SIGNAL)
         /*
          * This node has already set status asking a release
          * to signal it, so it can safely park.
          */
         return true;
     //What if it doesn't meet the requirements
     if (ws > 0) {
     	//Ws > 0 indicates that the node is invalid. Adjust the queuing position and find a valid precursor node forward
         /*
          * Predecessor was cancelled. Skip over predecessors and
          * indicate retry.
          */
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else {
         /*
          * waitStatus must be 0 or PROPAGATE.  Indicate that we
          * need a signal, but don't park yet.  Caller will need to
          * retry to make sure it cannot acquire before parking.
          */
         //Modify precursor node status
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
 }

This method is very important. It is used for checking and processing before blocking queuing. Blocking can only be performed if the conditions are met, otherwise it will continue to retry in the loop.
You can see the overall logic:

  1. Meet the blocking conditions: the status of the precursor node is singal
  2. What to do if it does not comply: adjust the queue, find a valid precursor node, and modify the status of the precursor node
    It can be seen that this is consistent with the previous explanation of the signal state (the successor node of the signal node handles the blocked or about to be blocked state).
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

The above is a specific blocking method. You can exit blocking only when you are awakened or the thread is interrupted. The last sentence checks and clears the interrupt flag, so if the acquire method is interrupted, it needs to be self interrupted once.

2, Release resources

1. release resources

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //If there are nodes in the queue, you need to wake up
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

Similarly, corresponding to trying to obtain resources, tryRelease trying to release resources is also handed over to the implementation class for implementation.

2. Wake up the successor node unparksuccess

private void unparkSuccessor(Node node) {
     /*
      * If status is negative (i.e., possibly needing signal) try
      * to clear in anticipation of signalling.  It is OK if this
      * fails or if status is changed by waiting thread.
      */
     int ws = node.waitStatus;
     //Modify node status, marking
     if (ws < 0)
         compareAndSetWaitStatus(node, ws, 0);

     /*
      * Thread to unpark is held in successor, which is normally
      * just the next node.  But if cancelled or apparently null,
      * traverse backwards from tail to find the actual
      * non-cancelled successor.
      */
     //Find the node to wake up
     Node s = node.next;
     if (s == null || s.waitStatus > 0) {
         s = null;
         //Note that if the successor node of the current node does not exist or is invalid
         //Look forward from the tail node to find the first valid node after the current node
         for (Node t = tail; t != null && t != node; t = t.prev)
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)
         LockSupport.unpark(s.thread);
 }

The method is not complicated

  1. Modify the status flag of the current node
  2. Find the next node and wake up the thread if it exists

3, Total process of obtaining & releasing resources

4, Other methods of obtaining resources

1. Response interrupt acquireinterruptible

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    //1. Perform an interrupt check when trying to get resources
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
     	throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //1. Perform an interrupt check when trying to get resources
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The main difference is that the exception thrown when the thread is interrupted is completely the same as others

  1. An interrupt check was performed when trying to get a resource
  2. Exception thrown by interrupt when blocking

2. Timeout parameter tryAcquireNanos

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    //1. When queuing to obtain resources, first judge whether it has timed out
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            //2. Each time you judge whether blocking is required, you should first judge whether it is timeout
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
            	//3. When blocking, check is added. If the time is insufficient, only spin is performed to reduce the performance consumption of blocking
                nanosTimeout > spinForTimeoutThreshold)
                //4. Block the specified time when blocking
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

First of all, this method responds to interrupts, so how to implement the timeout logic compared with the acquireinterruptible method

  1. When queuing to obtain resources, first judge whether it has timed out
  2. Each time you judge whether blocking is required, you first judge whether it times out
  3. When blocking, check is added. If the time is insufficient, only spin is performed to reduce the performance consumption of blocking
  4. Blocking for a specified time

Keywords: Java JDK Back-end

Added by prime on Sun, 26 Dec 2021 09:55:52 +0200