AQS Source Exploration-JUC Series


Shared mode

doAcquireShared

This method is entered when an acquire in shared mode fails: the thread is enqueued and parked, and after being woken up it keeps spinning in this method until it obtains the resource.

/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
  	// Enqueue a new node in SHARED mode
    final Node node = addWaiter(Node.SHARED);
  	// Flag: acquisition has not yet succeeded
    boolean failed = true;
    try {
      	// Identifies if the thread is interrupted
        boolean interrupted = false;
        for (;;) {
          	// Get the predecessor of the current node
            final Node p = node.predecessor();
          	// The predecessor is the head node
            if (p == head) {
              	// Try to get resources [1]
                int r = tryAcquireShared(arg);
              	// Return value greater than or equal to 0 is successful
                if (r >= 0) {
                  	// Set Head Node [2]
                    setHeadAndPropagate(node, r);
                  	// Drop the old head's next reference
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
          	// Cancel node trying to acquire lock
            cancelAcquire(node);
    }
}

  • [1] The queue-and-wait logic here is basically the same as in exclusive mode. Shared mode uses tryAcquireShared and its int return value to indicate success or failure; if the acquisition succeeds, the node is set as the new head and the old head leaves the queue
  • [2] In exclusive mode the code calls setHead after a successful acquisition, while shared mode calls setHeadAndPropagate, which executes setHead and then calls doReleaseShared() to trigger an attempt to wake the next node. Triggering a wake-up when a resource is released is easy to accept, but why does a wake-up also need to be triggered after the head's successor has successfully acquired the resource?

A closer analysis of this point:

+ A release wakes nodes by looking backwards from head, and it finds and wakes only one node at a time, never several at once.
+ In exclusive mode, release only has to check that head is not null and that head's waitStatus is not the cancelled state before waking the successor, because only one thread holds the resource at a time: only the first valid node after head needs to be woken, and that woken node never has to decide whether to wake its own successor; the next wake-up comes only from its own release.
+ In shared mode, several threads can acquire resources successfully at the same time, which also means several threads can release at the same time, so relying on "look backwards from head and wake one valid node" is no longer enough. Suppose several threads are already queued waiting; two resources are released almost at the same instant, and both releasing threads try to wake the head's successor. The two releasing threads see the same head, so they cannot wake the two valid nodes after head at this moment, yet the expectation is that two nodes wake up. This is the key to understanding why exclusive and shared modes handle wake-ups differently. With that in mind, it is enough to trigger a wake-up whenever the head node changes: the queued nodes then wake up in turn, each becoming the new head and continuing to wake the node behind it, which looks like a propagation (propagate).
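
Observed from the outside, this propagation can be seen with a small illustrative sketch (the class name and timings below are mine): three threads block on an empty Semaphore, and a single release of two permits ends up waking two of them, one after the other, as each new head passes the wake-up along.

import java.util.concurrent.Semaphore;

public class PropagateDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(0);      // no permits: every acquire has to queue

        for (int i = 0; i < 3; i++) {
            int id = i;
            Thread t = new Thread(() -> {
                try {
                    sem.acquire();             // parks inside doAcquireShared
                    System.out.println("thread-" + id + " acquired a permit");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);                 // so the still-waiting thread does not block JVM exit
            t.start();
        }

        Thread.sleep(500);                     // give all three threads time to enqueue
        sem.release(2);                        // one release of two permits: the first woken node
                                               // becomes head and propagates the wake-up, so two
                                               // of the three waiters print; one keeps waiting
        Thread.sleep(500);
    }
}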

setHeadAndPropagate

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node); // Set the head node [1]
    /*
 * Try to signal next queued node if:
 * Propagation was indicated by caller,
 * or was recorded (as h.waitStatus either before
 * or after setHead) by a previous operation
 * (note: this uses sign-check of waitStatus because
 * PROPAGATE status may transition to SIGNAL.)
 * and
 * The next node is waiting in shared mode,
 * or we don't know, because it appears null
 *
 * The conservatism in both of these checks may cause
 * unnecessary wake-ups, but only when there are multiple
 * racing acquires/releases, so most need signals now or soon
 * anyway.
 */
  	// Determine if wake-up action is required [2]
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

  • [1] Set the head and then propagate (doReleaseShared), exactly as the method name says

  • [2] The first if condition breaks down as follows:

    • 1. propagate > 0: there are still resources available, so enter the if directly
    • 2. h == null || h.waitStatus < 0: this h is the old head saved before setHead was called; enter the if when the old head's waitStatus is less than 0 (the null check only guards against a null pointer)
    • 3. (h = head) == null || h.waitStatus < 0: if neither of the first two holds, the new head is checked the same way and the if is entered when its waitStatus is less than 0. The second if then calls doReleaseShared only when the successor is in shared mode, or when the successor is unknown (null).

releaseShared

/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 * {@link #tryReleaseShared} but is otherwise uninterpreted
 * and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
@ReservedStackAccess
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

When tryReleaseShared succeeds, releaseShared calls doReleaseShared to finish releasing the resource; the focus here is on the doReleaseShared code.

doReleaseShared

As seen above, doReleaseShared is called from two places: when a resource is released (releaseShared), and when a successor of the head node acquires the resource and becomes the new head (setHeadAndPropagate).

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
 * Ensure that a release propagates, even if there are other
 * in-progress acquires/releases. This proceeds in the usual
 * way of trying to unparkSuccessor of head if it needs
 * signal. But if it does not, status is set to PROPAGATE to
 * ensure that upon release, propagation continues.
 * Additionally, we must loop in case a new node is added
 * while we are doing this. Also, unlike other uses of
 * unparkSuccessor, we need to know if CAS to reset status
 * fails, if so rechecking.
 */
  	// Execute within loop [1]
    for (;;) {
        Node h = head;
      	// [2]
        if (h != null && h != tail) {
            int ws = h.waitStatus;
          	// Head node waitStatus is SIGNAL [3]
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
          	// Head node waitStatus is 0 [4]
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
      	// The only exit condition: head has not changed during this pass
        if (h == head)                   // loop if head changed
            break;
    }
}

  • [1] The job of this method is to wake up the head's successor, but it is wrapped in an infinite loop whose exit condition is that head did not change during this pass. In other words, whether or not unparkSuccessor was triggered, as long as head has changed the thread must loop again and keep trying to wake the head's successor. It is not the case that a single thread enters this method, wakes one node and is done: while head keeps changing, several threads may be running this loop at the same time without exiting, and head only stops changing once a woken thread fails to acquire (tryAcquireShared returns a negative value). Picture many threads executing this loop at once while tryAcquireShared also keeps succeeding: the waiting nodes are woken and obtain resources faster.
  • [2] The check h != null && h != tail is meant to tell whether there are nodes in the queue waiting to be woken, and only then does execution enter the logic behind it. Strictly speaking there is another scenario, highlighted in the previous section, where the head has just been initialised with a CAS and tail is still null; that scenario can be ignored here when trying to wake the head's successor.
  • [3] If the head's waitStatus is SIGNAL, the next node is eligible to be woken, but the CAS that resets the waitStatus to 0 must succeed first. If the CAS fails, another thread has already done the same thing, so there is no need to check below whether head has changed; just continue the loop directly.
  • [4] If the head's waitStatus is 0, CAS it to PROPAGATE. If the CAS fails, another thread again got there first, and as before the loop continues directly. The PROPAGATE state is analysed in detail later.

Interrupt mode

Looking through the AQS comments and APIs, there are uninterruptible and interruptible modes, distinguished by whether an InterruptedException can be thrown. For example, the essential difference between acquire(int arg) and acquireInterruptibly(int arg), or between acquireShared(int arg) and acquireSharedInterruptibly(int arg), is whether the execution responds to interruption.

Take acquireSharedInterruptibly as an example

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

The method checks the thread's interrupt flag right at the entrance; it clearly cares very much about whether the calling thread has been interrupted.
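
A small illustrative sketch of that pre-check (Semaphore.acquire delegates to acquireSharedInterruptibly): if the calling thread's interrupt flag is already set, the method throws before it even tries to acquire, even though a permit is available. The class name and output here are mine.

import java.util.concurrent.Semaphore;

public class InterruptPrecheckDemo {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(1);      // a permit is available

        Thread.currentThread().interrupt();    // set our own interrupt flag first
        try {
            sem.acquire();                     // delegates to acquireSharedInterruptibly(1)
            System.out.println("acquired");    // not reached
        } catch (InterruptedException e) {
            // thrown immediately by the Thread.interrupted() check at the top of the method
            System.out.println("InterruptedException before any queueing");
        }
    }
}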

doAcquireSharedInterruptibly method:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // [1]
        }
    } finally {
        if (failed)
            cancelAcquire(node); // [3]
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // [2] Return the thread's interrupt flag
}

  • [1] [2] [3] The place where the thread actually waits is LockSupport.park(this) inside parkAndCheckInterrupt; when the thread is woken it continues executing from the line after it. A key detail is that LockSupport.park(this) calls the native Unsafe park method, and since no wait time is set there are only two ways the thread can wake up: 1. someone calls unpark; 2. the thread is interrupted. When the thread wakes up it cannot tell by itself whether it was unparked or interrupted, so in either case the first thing it does after waking is read the interrupt flag at [2]; if the thread was interrupted, parkAndCheckInterrupt returns true and an InterruptedException is thrown at [1].

Throwing the exception exits the spin, and the finally block executes cancelAcquire(node) at [3]. As detailed in the previous section, this method sets the waitStatus of the given node to the CANCELLED state.

The uninterruptible mode, by contrast, simply calls Thread.currentThread().interrupt() after waking with the interrupt flag set: it restores the interrupt status but does not handle it, so callers can check the thread's interrupt status afterwards and handle it themselves.
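
As an illustrative sketch of that behaviour (class name and timings are mine), Semaphore.acquireUninterruptibly goes through the uninterruptible path: an interrupt while parked does not abort the acquisition, but the flag is restored so the caller can detect it afterwards.

import java.util.concurrent.Semaphore;

public class UninterruptibleDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(0);              // nothing available yet

        Thread worker = new Thread(() -> {
            sem.acquireUninterruptibly();              // parks; an interrupt only wakes it to re-check
            // the acquisition still succeeded; the interrupt was recorded via selfInterrupt()
            System.out.println("acquired, interrupt flag = "
                    + Thread.currentThread().isInterrupted());   // prints true
        });
        worker.start();

        Thread.sleep(200);
        worker.interrupt();                            // wakes the parked thread, but it keeps waiting
        Thread.sleep(200);
        sem.release();                                 // now the worker can actually acquire
        worker.join();
    }
}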

Timeout mechanism

In addition to interrupts, AQS also provides an API with a timeout mechanism based on the Unsafe park(boolean isAbsolute, long time) method.

For example, the doAcquireSharedNanos method in AQS:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)// Parameter Judgment
        return false;
  	// Calculate expiration time [1]
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
          	// Calculate Remaining Time [2]
            nanosTimeout = deadline - System.nanoTime();
          	// Return false if it has already timed out [3]
            if (nanosTimeout <= 0L)
                return false;
          	// Park only if the remaining time exceeds the spin threshold [4]
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
          	// Handling interrupts [5]
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  • [1] Note that the deadline is calculated once, before entering the for loop, and does not change afterwards
  • [2], [3], [4] add remaining-time bookkeeping to the original spin logic. Each iteration first recomputes the remaining time from the deadline and returns false once it has run out. The loop may run several times and each pass costs time, so the timeout may already have expired before the park ever happens. The condition before parking also requires the remaining time to exceed spinForTimeoutThreshold (1000 nanoseconds): if less than that is left, parking the thread is not worth it; the loop simply keeps spinning, burning the remaining time, until [3] exits
  • [5] Both doAcquireSharedNanos and doAcquireNanos in AQS also handle interrupts; a usage sketch follows this list
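
As a usage sketch, Semaphore.tryAcquire(timeout, unit) ends up in exactly this path (tryAcquireSharedNanos, then doAcquireSharedNanos); the class name and output below are only illustrative.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        Semaphore sem = new Semaphore(0);   // no permits, so the timed acquire has to wait

        long start = System.nanoTime();
        // Semaphore.tryAcquire(timeout, unit) -> tryAcquireSharedNanos -> doAcquireSharedNanos
        boolean got = sem.tryAcquire(200, TimeUnit.MILLISECONDS);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        System.out.println("acquired = " + got + ", waited ~" + waitedMs + " ms"); // acquired = false
    }
}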

Shared Mode Synchronizer

The following describes two synchronizers extended using shared mode: Semaphore and CountDownLatch.

Semaphore

Semaphore is often used to limit the number of threads accessing resources.

Take a parking lot as an example. For simplicity, assume the lot has only three parking spaces, all empty at first. If five cars arrive at the same time, the gatekeeper lets three of them in and then lowers the barrier; the remaining cars have to wait at the entrance, as do any cars that arrive later. When one car leaves the lot, the gatekeeper raises the barrier and lets one waiting car in; if two more leave, two more can enter, and so on.

In this parking lot system, parking spaces are public resources, and each car is like a thread, and the gatekeeper is the semaphore.

- Baidu Encyclopedia

Like the ReentrantLock implementation, Semaphore has a Sync class inside, and the subclasses NonfairSync and FairSync implement the unfair and fair modes respectively.

NonfairSync.tryAcquireShared

Semaphore implements the tryAcquireShared method on its own.

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
    for (;;) { // Spin [1]
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // CAS [2]
            return remaining;
    }
}

  • [1], [2] nonfairTryAcquireShared is a loop + CAS operating on the state field in AQS: read the current state value, then subtract the requested number of permits to get the remainder. If the remainder is less than 0, return that negative value directly, meaning the acquisition failed; if it is greater than or equal to 0 there is a chance to succeed, and CAS attempts to update the state value. ABA is not a concern here because even an ABA situation is still correct for this logic. If the CAS succeeds, the acquisition succeeds; if the CAS fails, nothing changes and the loop continues. The method therefore returns in only two ways: a successful CAS returning a value greater than or equal to 0, or a computed remainder less than 0 returning a negative value.

FairSync.tryAcquireShared

For the fair case, Semaphore follows the same pattern as ReentrantLock:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

The core is the hasQueuedPredecessors method, analysed in detail in the previous section; the rest of the code matches the unfair version.
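
As a small usage note (the snippet below is illustrative), the choice between FairSync and NonfairSync is made in the Semaphore constructor:

import java.util.concurrent.Semaphore;

public class FairnessChoice {
    public static void main(String[] args) {
        // fair = true selects FairSync: waiting threads obtain permits in FIFO (queue) order
        Semaphore fair = new Semaphore(3, true);
        // the single-argument constructor selects NonfairSync, so barging is allowed
        Semaphore nonfair = new Semaphore(3);
        System.out.println(fair.isFair() + " / " + nonfair.isFair()); // true / false
    }
}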

Sync.tryReleaseShared

Whether fair or unfair, releasing a resource runs through the AQS releaseShared method down to tryReleaseShared.

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

As with the acquire operation above, the release modifies the state value with a loop + CAS, throwing an Error if the int value would overflow.
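
One consequence of this implementation, shown here as a small illustrative sketch: a release is not tied to a prior acquire, so releasing more permits than were acquired simply increases the count, and only an int overflow is rejected.

import java.util.concurrent.Semaphore;

public class ReleaseGrowsPermits {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(1);
        sem.release();      // no matching acquire is required
        sem.release(3);     // state is simply incremented by 3
        System.out.println(sem.availablePermits()); // prints 5
    }
}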

Semaphore example

The following example creates a Semaphore with an initial value of 2 and submits 5 tasks. The first two threads acquire successfully while the other three enter the queue and wait. After 5 seconds the two holders release one after another, the waiting threads are woken, the first two in the queue succeed, and the last one keeps waiting until another thread releases.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

    static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            executorService.submit(new Task());
        }
        executorService.shutdown(); // let the pool exit once all tasks have run
    }
    
    static class Task implements Runnable{

        @Override
        public void run() {
            try {
                semaphore.acquire();
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName()+" semaphore release");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The process runs as follows:

1. Three threads enter the wait queue. The state value drops from 2 to 0 after the first two threads succeed. Note that the node pointed to by head is a dummy node, so there are actually four nodes in the queue at this point. The last node's waitStatus is 0 because it has no successor; the other nodes' waitStatus is SIGNAL.

2. After 5 seconds, the two threads that succeeded execute release one after another, which runs down to doReleaseShared to wake the head's successor. Assume a concurrent scenario: the first releasing thread successfully executes compareAndSetWaitStatus(h, Node.SIGNAL, 0) in doReleaseShared and wakes the successor thread T3, at which point the head's waitStatus is 0. The second releasing thread executes compareAndSetWaitStatus(h, 0, Node.PROPAGATE), updating the head's status to PROPAGATE; assuming the woken thread has not yet replaced the head, the second thread sees h == head as true and exits. The woken thread T3 continues the spin in doAcquireShared; since a resource has been released it succeeds, then runs setHeadAndPropagate to set the new head and keep waking successors. Because in this scenario the old head's status is PROPAGATE and the new head's status is SIGNAL, either one satisfies the condition for waking the successor. T4 is woken, also succeeds, and executes setHeadAndPropagate; the new head's status is SIGNAL, so it runs doReleaseShared, meets the wake-up condition and wakes T5. Note that only two resources were released, yet one extra thread is woken: T5 runs the spin in doAcquireShared but fails to acquire, then sets the head node's status back to SIGNAL and keeps waiting.

CountDownLatch

CountDownLatch allows one or more threads to wait until a set of operations being performed in other threads completes. Its javadoc describes it as a latch: all threads calling await block at the latch until countDown has been called the required number of times. A CountDownLatch initialised to N can be used to make a thread wait until N threads have each completed an action, or until some action has been completed N times.

Implementing CountDownLatch on top of AQS should also be straightforward: use the AQS state as the count and the synchronization queue to block waiting threads.

The source makes no distinction between fair and unfair; internally a single Sync subclass of AbstractQueuedSynchronizer is implemented.

Sync.tryAcquireShared

This tryAcquireShared differs from the previous synchronizer implementations: the acquires argument is not used and the state value is not changed. Instead, the acquisition succeeds when the state is 0 and fails otherwise.

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

Sync.tryReleaseShared

The custom release operation ensures that the state value can only be reduced towards 0, never to a negative number, and is again a loop + CAS implementation.

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

await and countDown

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void countDown() {
    sync.releaseShared(1);
}


await is the acquire, countDown is the release. When a CountDownLatch is created, a count must be passed in, indicating how many times countDown must be called before await can pass. The state is initialised to that count, and each execution of countDown reduces the state value by 1.

From the implementation code above: after the CountDownLatch is initialised, a thread that executes await before the state reaches 0 enters the synchronization queue and waits; each countDown decreases the state by 1, and the waiting threads are woken when the state becomes 0.
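
Tying those pieces together, here is a minimal one-shot latch sketched directly on AQS. It is illustrative only (the class name and structure are mine, not the JDK's), but the shape matches what was shown above: state holds the remaining count, tryAcquireShared only succeeds once the count is zero, and tryReleaseShared decrements it with a loop + CAS.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);                     // state = remaining countDown calls
        }

        @Override
        protected int tryAcquireShared(int unused) {
            return getState() == 0 ? 1 : -1;     // succeed only once the count hits zero
        }

        @Override
        protected boolean tryReleaseShared(int unused) {
            for (;;) {                           // loop + CAS, never going below zero
                int c = getState();
                if (c == 0)
                    return false;
                int next = c - 1;
                if (compareAndSetState(c, next))
                    return next == 0;            // true triggers doReleaseShared -> wake waiters
            }
        }
    }

    private final Sync sync;

    public SimpleLatch(int count) {
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);      // blocks until state == 0
    }

    public void countDown() {
        sync.releaseShared(1);                   // decrements; wakes waiters at zero
    }
}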

Two typical uses of the CountDownLatch class are described in its javadoc:

  • 1. One thread needs to wait for one or more other threads to finish before it proceeds: the thread executes await first, and countDown is then called the required number of times, by a single thread or by several depending on the initial count; once the state drops to zero the waiting thread is woken.
  • 2. Multiple threads can be made to start executing in parallel: they all wait on await before starting, and a single countDown then wakes all of the waiting threads almost simultaneously (see the sketch after this list).
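
A minimal sketch of the second pattern (the class and thread names are illustrative): worker threads block on a start latch, a single countDown releases them all, and a second latch lets the main thread wait for them to finish.

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final int workers = 3;
        CountDownLatch startSignal = new CountDownLatch(1);       // released once by main
        CountDownLatch doneSignal  = new CountDownLatch(workers); // counted down by each worker

        for (int i = 0; i < workers; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    startSignal.await();                          // all workers park here
                    System.out.println("worker-" + id + " running");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown();                       // state--, wakes main at zero
                }
            }).start();
        }

        startSignal.countDown();   // state 1 -> 0: wakes all workers almost simultaneously
        doneSignal.await();        // main waits until every worker has counted down
        System.out.println("all workers finished");
    }
}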

About PROPAGATE

The PROPAGATE value of waitStatus in shared mode feels a bit abrupt. Searching online turns up a bug story, and looking closely at the bug scenario and the fix is very helpful for understanding the race that arises in shared mode between a release waking the head's successor and that woken thread, after a successful acquisition, waking its own successor.

bug description: link

Repair code record: link

The bug description contains reproducible test code and a detailed explanation, which can be read together with the above to aid understanding.

Doug Lea said:

There was a race condition that could prevent a releaseShared from being propagated through to a parked thread, leaving the thread parked while a permit existed.

A little explanation here:

Look at the original (pre-fix) code: whether release wakes a successor was determined by h != null && h.waitStatus != 0, and whether setHeadAndPropagate wakes a successor was determined by propagate > 0 && node.waitStatus != 0.

Now suppose a release happens and the woken node takes the last resource (propagate = 0); the head's waitStatus is now 0, so a second release arriving at this moment does not satisfy the condition for waking a successor and simply exits, and the previously woken thread, executing setHeadAndPropagate with propagate = 0, does not wake a successor either. A resource is available at this point, yet no wake-up of the successor can be triggered.

So the PROPAGATE state was introduced: when the head's waitStatus is found to be 0 while nodes are still waiting in the queue, meaning the head's successor has already been woken and is executing, the head is changed to the PROPAGATE state. Then, even if the woken thread took the last resource (propagate = 0), the h.waitStatus < 0 check still triggers a wake-up. In unparkSuccessor any status less than 0 is reset to 0, so PROPAGATE does not affect other flows.

Summary

1. The implementation difference between shared mode and exclusive mode is that shared mode has to handle concurrent releases and concurrent wake-ups, while exclusive mode only has to handle a single release and a single wake-up. That is why a successful acquisition in shared mode also triggers a wake-up of the next node.

2. AQS supports both interruptible and uninterruptible modes, and the timed API also handles interrupts. In the interrupt scenarios a node's status is changed to CANCELLED rather than the node being removed immediately; these cancelled nodes are removed from the queue later, during subsequent wake-ups.

3. Semaphore and CountDownLatch, two synchronizers based on AQS shared mode, were described above. Implementing them on top of the internal AQS machinery is very simple, which is precisely the advantage of AQS.

4. This is the second article on the AQS source code, and two more are planned; the next one focuses on the implementation of the condition queue.
