Line by line source code analysis clearly AbstractQueuedSynchronizer

1, Foreword

This is the last article in the AQS series. In the first article, we analyzed the core of AQS through ReetrantLock fair locks. In the second article, we talked about unfair locks and conditions, and wrote some knowledge about thread interrupts. In this article, we will explain the use of AQS sharing mode. With the previous knowledge, I believe you will feel very relaxed.

This article first uses CountDownLatch to clarify the sharing mode, and then follows the source code of other AQS related classes CyclicBarrier and Semaphore.

2, CountDownLatch

CountDownLatch is a typical AQS sharing mode. It is a high-frequency class. latch means door bolt and fence in Chinese. I won't talk nonsense about how to explain it. Please feel free to see two examples to know where and how to use it.

Let's take a look at the example given by Doug Lea in java doc. This example is very practical. I often write this code.

Suppose we have N (N > 0) task, then we will initialize a CountDownLatch with N, and then pass the reference of latch to each thread. After each thread completes the task, calling latch.countDown() represents a task completed. The thread calling the method of latch.await() will block until all tasks are completed.

class Driver2 { // ...
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = Executors.newFixedThreadPool(8);

        // Create N tasks and submit them to the thread pool for execution
        for (int i = 0; i < N; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        // This method will not return until all tasks are completed
        doneSignal.await();           // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
    private final CountDownLatch doneSignal;
    private final int i;

    WorkerRunnable(CountDownLatch doneSignal, int i) {
        this.doneSignal = doneSignal;
        this.i = i;
    }

    public void run() {
        try {
            doWork(i);
            // When the task of this thread is completed, call the countDown method
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

Therefore, CountDownLatch is very practical. We often split a large task, and then start multiple threads for execution. After all threads are executed, we can perform other operations. In this example, only the main thread called the await method.

Let's look at another example. This example is typical. It uses two CountDownLatch:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        // Insert some code here to ensure that each thread above starts first before executing the following code.
        doSomethingElse();            // don't let run yet
        // Because here N == 1, all await methods can pass as long as they are called once
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        // Wait for all tasks to finish
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            // In order to make all threads start the task at the same time, we let all threads block here first
            // When everyone is ready, open the door bolt
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

In this example, doneSignal is the same as the first example. Let's talk about startSignal here. N newly opened threads call startSignal.await() to block and wait. They block on the fence. Only when the conditions are met (startSignal.countDown()), can they pass through the fence at the same time. The purpose is to make all threads stand on the starting line.

If only one thread calls the await method to wait for the task to complete, CountDownLatch will be much simpler. Therefore, readers of subsequent source code analysis must build a scenario in their mind: there are m threads doing tasks and n threads waiting for these m threads to complete tasks on a fence until all m tasks are completed, n threads pass through the fence at the same time.

We use the following program to analyze the source code. t1 and t2 call the countDown() method, and t3 and t4 call the await method to block:

import java.util.concurrent.CountDownLatch;

/**
 * @author chenbjf
 * @version 1.0
 * @date 2021/10/28 13:42
 */
public class CountDownLatchExample {

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }
        });

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }

        });

        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println("thread3 from await Wake up in");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread thread4 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    latch.await();
                    System.out.println("thread4 from await Wake up in");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        });
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
    }
}

After about 10 seconds, the above program will return the following results (the order may not be the following):

thread3 from await Wake up in
thread4 from await Wake up in

Next, let's take a look at the constructor of CountDownLatch, as follows:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

// The old routine encapsulates a Sync class internally, which inherits from AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    //The state attribute in AQS is set to the count value of the input parameter.
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
All codes are routines. First analyze the routines: AQS Inside state It's an integer value. Use one here int count Parameter initialization actually sets this value, and all calls await The wait thread of the method will hang, and then some other threads will do it state = state - 1 Operation, when state While decreasing to 0, that will state The thread reduced to 0 will be responsible for waking up all calls await Method. It's all routine, just Doug Lea The routine is very deep and the code is very clever. Otherwise, we don't need to analyze the source code.

For CountDownLatch, we only need to care about two methods, one is the countDown() method and the other is the await() method. Every time the countDown() method is called, the state will be reduced by 1 until the value of state is 0; Await is a blocking method. When the state is reduced to 0, the await method will return. Await can be called by multiple threads. Readers should have a diagram in mind at this time: all threads calling await method block in the blocking queue of AQS, wait for the conditions to be met (state == 0), and wake up the threads one by one from the queue.

Next, we follow the process step by step: wait for await first, then wake up, and the await method returns. First, let's look at the await() method, which represents thread blocking, waiting for the value of state to be reduced to 0.

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // This is also an old routine. I said it in the interruption section of Chapter 2
    if (Thread.interrupted())
        throw new InterruptedException();
    // When await is called by t3 and t4, the state is greater than 0 (state is 2 at this time).
    // That is, the if returns true, and then look inside
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

//You can know from the name that this method is to obtain the shared lock, and this method is interruptible (throw InterruptedException to exit this method when interrupting).
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //First add the thread to the blocking queue
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // As above, this method returns - 1 as long as state is not equal to 0
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //Then come here. Now park is here
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Let's carefully analyze this method. After thread t3 joins the queue after step 1 addWaiter, we should get this:

Since the tryAcquireShared method will return - 1, the if (r > = 0) branch will not go in. When shouldParkAfterFailedAcquire is reached, t3 set the waitStatus value of head to - 1, as follows:

Then, when entering parkAndCheckInterrupt, t3 hangs. Let's analyze t4 queuing. t4 will set the waitStatus of the node where the precursor node t3 is located to - 1. After t4 queuing, it should be like this:

Then, t4 also hangs. Next, t3 and t4 wait to wake up. Next, let's look at the wake-up process. To enrich the following diagram, let's assume that CountDownLatch is initialized with 10.

Let's look at the specific process step by step. First, let's look at the countDown() method:

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    // Tryrereleaseshared returns true only when state is reduced to 0
    // Otherwise, simply state = state - 1, and the countDown() method ends
    //    The operation that reduces the state to 0 is the most complex. Continue
    if (tryReleaseShared(arg)) {
        // Wake up the await thread
        doReleaseShared();
        return true;
    }
    return false;
}

// This method is very simple. Use the spin method to reduce state by 1
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

//The countDown method is to reduce the state value by 1 every call. If the state is reduced to 0, call the following method to wake up the threads in the blocking queue:
// When this method is called, state == 0
// This method does not need to look at all the code first. Go down to the place where I write comments according to the train of thought. We will go through a process first and analyze others carefully later
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
             // When t3 joins the queue, the waitStatus of the head node has been set to Node.SIGNAL (- 1)
            if (ws == Node.SIGNAL) {
                // Set the waitStatue of head to 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Here, wake up the successor node of the head, that is, the first node in the blocking queue
                // Here, that is, wake up t3
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Once t3 is awakened, we continue to return to the code of await and parkAndCheckInterrupt returns. We don't consider the interrupt first:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 2. Here is the next step
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 1. This method returns after waking up
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

//Next, t3 will go to the setHeadAndPropagate(node, r) method, first occupy the head, and then wake up other threads in the queue:
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // The following is to wake up the node after the current node, that is, t3 has woken up and wake up t4 immediately
    // Similarly, if there is t5 after t4, when t4 wakes up, it wakes up t5 immediately
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // This method is used again, but now the head is not the original empty node, but the t3 node
            doReleaseShared();
    }
}

Let's go back to this method. Next, let's analyze the method of doReleaseShared. According to the process, the head node is t3 node at this time:

// When this method is called, state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 1. h == null: indicates that the blocking queue is empty
        // 2. h == tail: indicates that the header node may be the header node just initialized,
        //   Or an ordinary thread node, but since this node is the head node, it means that it has been awakened and there are no other nodes in the blocking queue
        // Therefore, it is not necessary to wake up the successor node in these two cases
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t4 set the waitStatus of the head node (T3 at this time) to Node.SIGNAL (- 1)
            if (ws == Node.SIGNAL) {
                // For the scenario of CAS failure here, please see the following interpretation
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Here, wake up the successor node of the head, that is, the first node in the blocking queue
                // Here, that is, wake up t4
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     // The scenario of CAS failure is: when the execution reaches here, just one node joins the queue, and the ws will be set to - 1
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // If the previous wake-up thread has occupied the head when we get here, then recycle
        // Otherwise, if the head does not change, exit the loop,
        // Does exiting the loop mean that other nodes in the blocking queue will not wake up? Of course not. This method will still be called after waking up the thread
        if (h == head)                   // loop if head changed
            break;
    }
}

Let's analyze the last if statement before we can explain why the first CAS may fail:

  • h == head: indicates that the head node has not been occupied by the thread just awakened with unparksuccess (t4 here). At this time, break exits the loop.
  • h != head: the head node is occupied by the thread that just woke up (t4 in this case). Then, the next cycle will be resumed to wake up the next node (t4 in this case). We know that when t4 is awakened, it will actually take the initiative to awaken t5, t6, t7... So why do we need to carry out the next cycle to awaken t5? I think it's because of throughput.

If the scenario 2 above is satisfied, we can know why the CAS operation compareAndSetWaitStatus(h, Node.SIGNAL, 0) above fails? Because when the current for loop thread arrives here, the newly awakened thread t4 may just arrive here, so CAS may fail. In the first round of the for loop, t4 will wake up. After t4 wakes up, it will set itself as the head node. If the for loop runs to if (h == head) after t4 sets the head node, it will return false and the for loop will enter the next round. After t4 wakes up, it will also enter this method. Then the second round of the for loop and t4 may meet in this CAS, and only one will succeed.

3, CyclicBarrier

It literally means "reusable fence" or "periodic fence". In short, it is not useless after using it once. Compared with CountDownLatch, CyclicBarrier is much simpler. Its source code has nothing profound. It is a combination of ReentrantLock and Condition. Look at the following schematic diagram. Is CyclicBarrier very similar to CountDownLatch, but CyclicBarrier can have more than one fence because its fence can be reused (Cyclic).

Firstly, the source code implementation of CyclicBarrier is quite different from that of CountDownLatch. CountDownLatch uses AQS based sharing mode, while CyclicBarrier is implemented based on Condition.

Because the source code of CyclicBarrier is relatively much simpler, as long as readers are familiar with the previous analysis on Condition, the source code here is no pressure, just a few special concepts.

First, use a diagram to describe some concepts in CyclicBarrier and its basic use process:

Looking at the figure, we also know that the most important source code of CyclicBarrier is the await() method.

public class CyclicBarrier {
    // We said that the CyclicBarrier can be reused. We regard each use from the beginning to passing through the fence as a "generation" or "a cycle"
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();

    // CyclicBarrier is based on Condition
    // Condition means "condition". The waiting thread of CyclicBarrier passes through the barrier. The "condition" means that everyone is on the fence
    private final Condition trip = lock.newCondition();

    // Number of threads participating
    private final int parties;

    // If this is set, it means that the corresponding operation should be performed before crossing the fence
    private final Runnable barrierCommand;

    // Current "generation"
    private Generation generation = new Generation();

    // The number of threads that have not reached the fence. This value is initially parties, and then decreases
    // Number of threads that have not reached the fence = parties - number of threads that have reached the fence
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

First, let's look at how to start a new generation:

// Start a new generation. When the last thread reaches the fence, call this method to wake up other threads and initialize the "next generation"
private void nextGeneration() {
    // First, you need to wake up all the threads waiting on the fence
    trip.signalAll();
    // Update the value of count
    count = parties;
    // Regenerate "next generation"
    generation = new Generation();
}

Opening a new generation is similar to re instantiating a CyclicBarrier instance to see how to break a fence:

private void breakBarrier() {
    // Set the status broken to true
    generation.broken = true;
    // Reset count to the initial value parties
    count = parties;
    // Wake up all threads that are already waiting
    trip.signalAll();
}

These two methods are used later. Now start to analyze the most important wait through the fence method await method:

/ Without timeout mechanism
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
// With timeout mechanism, TimeoutException exception will be thrown if timeout occurs
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

Keep looking inside:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // First get the lock, and then remember to release the lock in finally
    // If we remember the condition section, we know that condition await() will release the lock, and we need to re acquire the lock when waking up by signal()
    lock.lock();
    try {
        final Generation g = generation;
        // Check whether the fence is broken. If it is broken, throw a BrokenBarrierException
        if (g.broken)
            throw new BrokenBarrierException();
        // Check the interrupt status. If it is interrupted, throw an InterruptedException exception
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // index is the return value of this await method
        // Note here that this is the value obtained after decrement of count
        int index = --count;

        // If it is equal to 0, it means that all threads are on the fence and ready to pass
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // If the operation to be performed before passing through the fence is specified during initialization, it will be executed here
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // If ranAction is true, there is no abnormal exit when executing command.run()
                ranAction = true;
                // Wake up the waiting thread and start a new generation
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // If an exception occurs during the execution of the specified operation, the fence needs to be broken
                    // As we said before, breaking the fence means waking up all waiting threads, setting the broken to true and resetting the count to parties
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // If await is called by the last thread, the above returns
        // The following operations are performed for threads that are not the last to reach the fence
        for (;;) {
            try {
                // If there is a timeout mechanism, call the await method of the Condition with timeout and wait until the last thread calls await
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // If so, it means that the waiting thread is interrupted during await (Condition await)
                if (g == generation && ! g.broken) {
                    // Break the fence
                    breakBarrier();
                    // After breaking the fence, re throw the InterruptedException exception to the method called by the outer layer
                    throw ie;
                } else {
                    // Here, explain G= Generation indicates that a new generation has been generated, that is, the await execution of the last thread is completed,
                    // There is no need to throw an InterruptedException exception at this time. Just record the interrupt information
                    // Or if the fence has been broken, the InterruptedException exception should not be thrown,
                    // Instead, a BrokenBarrierException is thrown later
                    Thread.currentThread().interrupt();
                }
            }

              // After waking up, check whether the fence is "broken"
            if (g.broken)
                throw new BrokenBarrierException();

            // Except for exceptions, this for loop will exit from here
            // We should be clear that the last thread will call nextGeneration to start a new generation after executing the specified task (if any)
            // Then release the lock, and other threads get the lock from the await method of Condition and return. When they get here, they will actually meet G= Generation
            // When is not satisfied? If an exception is thrown during the execution of barrier command, the barrier breaking operation will be executed,
            // Set the broken to true and wake up these threads. These threads will throw a BrokenBarrierException from the if (g.broken) branch above and return
            // Of course, there is a last possibility, that is, await timeout. In this case, it will not be returned from the if branch exception above or from here, and the following code will be executed
            if (g != generation)
                return index;

            // If you wake up and find the timeout, break the fence and throw an exception
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

Well, I think I should make it clear. I almost didn't miss any line of code, did I?

Let's start the finishing work.

First, let's see how many threads have reached the fence and are waiting:

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

It is very simple to judge whether a fence has been broken. Just look at the value of broken:

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

When we talked about await, we almost made it clear when the fence will be broken. The summary is as follows:

  1. Interrupt, we said that if an interrupt occurs to a waiting thread, it will break the fence and throw an InterruptedException exception;
  2. Timeout, break the fence and throw a TimeoutException exception;
  3. The specified operation threw an exception, which we also said earlier.

Finally, let's see how to reset a fence:

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

Let's imagine that if thread parties = 4 is specified during initialization, the first three threads call await, and we call reset method before the fourth thread calls await, what will happen?

First, break the fence, which means that all waiting threads (three waiting threads) will wake up, and the await method will return by throwing a BrokenBarrierException exception. Then start a new generation, reset count and generation, which is equivalent to zero.

How about CyclicBarrier? The source code is very simple.

4, Semaphore

With the foundation of CountDownLatch, it will be much easier to analyze Semaphore. What is Semaphore? It is similar to a resource pool (readers can compare it to a thread pool). Each thread needs to call the acquire() method to obtain resources before execution. After execution, it needs to release resources to other threads.

You can probably guess that Semaphore is actually the use of shared locks in AQS, because each thread shares a pool.

Routine interpretation: when creating a Semaphore instance, you need a parameter permissions, which can basically be determined to be set to the state of AQS. Then, when each thread calls acquire, execute state = state - 1, and when releasing, execute state = state + 1. Of course, when acquiring, if state = 0, it indicates that there are no resources, You need to wait for other threads to release.

Constructor

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Similar to ReentrantLock, fair strategy and unfair strategy are used here. See acquire method:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

These methods are also old routines. You basically understand them. There are two more acquire methods that can pass parameters, but you also understand them. If we need to obtain more than one resource at a time, we will use this.

Next, let's look at the acquireunteruptibly () method that does not throw an InterruptedException exception:

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

As mentioned earlier, Semaphore can be divided into fair strategy and unfair strategy. Let's compare the two tryAcquireShared methods:

// Equity Strategy:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // The difference is whether it will judge whether there are threads queuing before CAS subtraction
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// Unfair strategy:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

It's also an old routine, so from the perspective of source code analysis, we don't really need to care about whether it's fair strategy or unfair strategy. The difference between them is often just one or two lines.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

When tryAcquireShared(arg) returns less than 0, it indicates that the state is less than 0 (there are no resources). At this time, acquire cannot get the resources immediately. It needs to enter the blocking queue and wait. Although many codes are posted, I don't care about this:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

I won't introduce this method. After the thread is suspended, wait for resources to be released. Next, we'll look at the release method:

// Task introduction, release a resource
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // Overflow, of course, we usually don't use such a large number
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

The tryrereleaseshared method always returns true, followed by dorreleaseshared. This is also a familiar method. I'll post the code and don't analyze it. This method is used to wake up all waiting threads:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

5, Summary

After writing here, I finally finished AbstractQueuedSynchronizer. Doug lea really exists like a God for Java concurrency. In the future, we will come into contact with a lot of Doug Lea's code. I hope all of us can constantly polish our technology in the direction of the great God, less tall architecture and more real excellent code.

Keywords: Java

Added by Josh18657 on Fri, 29 Oct 2021 09:18:50 +0300