Java JUC: An Analysis of AQS, the Abstract Queued Synchronizer

AQS - underlying support for locks

AbstractQueuedSynchronizer, the abstract queued synchronizer, is called AQS for short. It is the basic building block for implementing synchronizers, and the locks in the java.util.concurrent package are implemented on top of it. Let's take a look at the class diagram structure of AQS.

As the figure shows, AQS is a FIFO doubly linked queue. Its head and tail fields record the first and last nodes of the queue, and the queue element type is Node.

In Node, the thread field stores the thread that entered the AQS queue. SHARED marks a thread that was blocked and put into the AQS queue while acquiring a shared resource; EXCLUSIVE marks a thread that was suspended and put into the AQS queue while acquiring an exclusive resource. waitStatus records the waiting status of the current thread, which can be CANCELLED (the thread was cancelled), SIGNAL (the thread needs to be woken up), CONDITION (the thread is waiting in a condition queue) or PROPAGATE (other nodes need to be notified when a shared resource is released). prev records the predecessor of the current node, and next records its successor.

AQS maintains a single piece of state information, state, whose value can be read and modified through the getState, setState and compareAndSetState methods.

  • For ReentrantLock, state represents the number of times the current thread has acquired the lock;
  • For the read-write lock ReentrantReadWriteLock, the high 16 bits of state hold the read status, that is, the number of times the read lock has been acquired, and the low 16 bits hold the reentry count of the thread that holds the write lock;
  • For Semaphore, state is the number of currently available permits;
  • For CountDownLatch, state is the current value of the counter.
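
The 16/16 split described for ReentrantReadWriteLock comes down to plain bit arithmetic. The following is a minimal sketch of that idea only; the class name StateBits and its helper methods are assumptions made for illustration, not the JDK's internal code.

```java
// Illustrative sketch: packing a read count and a write reentry count into one int.
class StateBits {
    static final int SHARED_SHIFT = 16;                        // the read count lives in the high 16 bits
    static final int SHARED_UNIT = 1 << SHARED_SHIFT;          // adding this raises the read count by one
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // selects the low 16 bits

    // Number of read holds encoded in the state value.
    static int readCount(int state) {
        return state >>> SHARED_SHIFT;
    }

    // Write-lock reentry count encoded in the state value.
    static int writeCount(int state) {
        return state & EXCLUSIVE_MASK;
    }

    public static void main(String[] args) {
        int state = 0;
        state += SHARED_UNIT; // first read hold
        state += SHARED_UNIT; // second read hold
        System.out.println("read holds: " + readCount(state) + ", write holds: " + writeCount(state));
    }
}
```

Keeping both counters in one int means a single CAS on state can update either of them atomically.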

AQS has an internal class ConditionObject, which is used to realize thread synchronization in combination with locks. ConditionObject can directly access variables inside the AQS object, such as state value and queue.

ConditionObject is a condition variable. Each condition variable has its own condition queue (a singly linked list), which stores the threads blocked after calling the condition variable's await method. firstWaiter is the first element of this queue and lastWaiter is the last.

Here are the statuses that waitStatus can represent.

  • CANCELLED (value: 1): the node's thread has cancelled waiting. A timeout or an interrupt (when the acquisition responds to interrupts) triggers the change to this state, and a node that enters this state never changes again.
  • SIGNAL (value: -1): the successor node is waiting for the current node to wake it up. When a successor joins the queue, it updates the status of its predecessor to SIGNAL.
  • CONDITION (value: -2): the node is waiting on a Condition. When another thread calls the Condition's signal() method, the node in the CONDITION state is transferred from the condition queue to the synchronization queue, where it waits to acquire the lock.
  • PROPAGATE (value: -3): in shared mode, the predecessor node wakes up not only its successor but possibly also the successor's successor.
  • 0: the default state of a new node when it joins the queue.
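
As a compact reference, the values in the list above can be written down as constants. This is only a sketch: the class WaitStatusNames and its helpers are hypothetical names for this article, and the numeric values are the ones quoted in the list.

```java
// Illustrative sketch: the waitStatus values from the list above, plus helpers to name them.
class WaitStatusNames {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // Maps a waitStatus value to a readable name.
    static String describe(int waitStatus) {
        switch (waitStatus) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case 0:         return "0 (initial)";
            default: throw new IllegalArgumentException("unknown waitStatus: " + waitStatus);
        }
    }

    // CANCELLED is the only positive value, so a sign check is enough to detect cancellation.
    static boolean isCancelled(int waitStatus) {
        return waitStatus > 0;
    }

    public static void main(String[] args) {
        for (int ws : new int[] {1, 0, -1, -2, -3}) {
            System.out.println(ws + " -> " + describe(ws) + ", cancelled=" + isCancelled(ws));
        }
    }
}
```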

For AQS, the key to thread synchronization is operating on the state value. Depending on whether the state belongs to a single thread, state is operated on in either exclusive mode or shared mode.

In exclusive mode, resources are acquired through void acquire(int arg) and void acquireInterruptibly(int arg).

In exclusive mode, resources are released through boolean release(int arg).

In shared mode, resources are acquired through void acquireShared(int arg) and void acquireSharedInterruptibly(int arg).

In shared mode, resources are released through boolean releaseShared(int arg).

In exclusive mode, the resource is bound to a specific thread. Once a thread acquires the resource, the resource is marked as owned by that thread; other threads that operate on state to acquire it find that they are not the owner, fail, and are blocked.

Take the exclusive lock ReentrantLock as an example. When a thread acquires the lock, AQS first uses CAS to change state from 0 to 1 and then records the current thread as the lock holder. When the same thread acquires the lock again, it finds that it is already the holder and changes state from 1 to 2, which records the reentry count. When another thread tries to acquire the lock and finds that it is not the holder, it is put into the AQS blocking queue and suspended.
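
The reentry count can be observed from outside through ReentrantLock.getHoldCount(), which reports how many holds the calling thread has on the lock. The sketch below is a small demonstration; the class name ReentryDemo and the method holdCounts are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class ReentryDemo {
    // Returns the hold counts seen after the first lock(), after the second lock(), and after one unlock().
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();                          // first acquisition
        try {
            counts[0] = lock.getHoldCount();
            lock.lock();                      // same thread acquires again: reentry
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();                // undo the reentry
            }
            counts[2] = lock.getHoldCount();
        } finally {
            lock.unlock();                    // fully release the lock
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCounts()));
    }
}
```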

Resources acquired in shared mode are not tied to a specific thread. When multiple threads request the resource, they compete for it with CAS. After one thread has acquired the resource, another thread that requests it only needs to acquire it with CAS, as long as the remaining resource can satisfy the request.

Semaphore is an example. When a thread calls acquire, it first checks whether the number of available permits satisfies the request. If not, the current thread is put into the blocking queue; if so, the thread takes the permits with a spinning CAS.
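
The permit accounting is easy to see with the non-blocking tryAcquire() method. This is a minimal sketch; PermitDemo and its two methods are hypothetical names, and it uses tryAcquire rather than acquire so that the failing request returns instead of blocking.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class PermitDemo {
    // Tries to take three permits from a semaphore that only has two.
    static boolean[] tryThree() {
        Semaphore permits = new Semaphore(2);
        return new boolean[] {
            permits.tryAcquire(),  // takes the first permit
            permits.tryAcquire(),  // takes the second permit
            permits.tryAcquire()   // no permits left: fails without blocking
        };
    }

    // Takes both permits, returns one, and reports how many are available afterwards.
    static int availableAfterRelease() {
        Semaphore permits = new Semaphore(2);
        permits.acquireUninterruptibly(2);
        permits.release();
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tryThree()));
        System.out.println(availableAfterRelease());
    }
}
```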

In exclusive mode, the process of obtaining and releasing resources is as follows:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

When a thread calls acquire to obtain an exclusive resource, it first tries tryAcquire, which sets the value of state, and returns immediately on success. On failure, the current thread is wrapped in a Node of type Node.EXCLUSIVE, inserted at the tail of the AQS blocking queue, and suspended by calling LockSupport.park(this).

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

When a thread calls release, it tries to release the resource with tryRelease, which again sets the value of state, and then calls LockSupport.unpark(thread) to wake up one blocked thread in the AQS queue. The woken thread uses tryAcquire to check whether the current state value satisfies its request. If so, it continues executing; otherwise it is put back into the queue and suspended.
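
The suspend and wake-up steps rest on LockSupport.park and LockSupport.unpark. The sketch below shows just that handshake outside of AQS; the class ParkDemo and the released flag, which stands in for "the state now allows the acquire", are assumptions made for illustration.

```java
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    private volatile boolean released = false; // stands in for "state now allows the acquire"
    private volatile boolean resumed = false;

    // Parks a worker thread until the flag is set, then wakes it; returns true if it resumed.
    boolean run() {
        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so re-check the condition in a loop
            while (!released) {
                LockSupport.park(this);
            }
            resumed = true;
        });
        waiter.start();
        released = true;            // change the state first ...
        LockSupport.unpark(waiter); // ... then wake the waiting thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed;
    }

    public static void main(String[] args) {
        System.out.println("waiter resumed: " + new ParkDemo().run());
    }
}
```

The order matters: the state is changed before unpark, mirroring how release calls tryRelease before waking the successor.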

📢 Note that the AQS class does not provide usable tryAcquire and tryRelease methods; they must be implemented by concrete subclasses. Depending on the scenario, a subclass uses CAS to try to modify state, and it also defines what an increase or decrease of state means.

Take ReentrantLock, an exclusive lock built on AQS. A state of 0 means the lock is free, and 1 means the lock is held. Its tryAcquire override uses CAS to check whether the current state is 0. If it is, CAS sets it to 1, the current thread is recorded as the lock holder, and the method returns true. If the CAS fails, it returns false.

In shared mode, the process of obtaining and releasing resources is as follows:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

When a thread calls acquireShared to obtain a shared resource, it first tries tryAcquireShared, which sets the value of state, and returns immediately on success. On failure, the current thread is wrapped in a Node of type Node.SHARED, inserted at the tail of the AQS blocking queue, and suspended with LockSupport.park(this).

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

When a thread calls releaseShared, it tries to release the resource with tryReleaseShared, which also sets the value of state, and then uses LockSupport.unpark(thread) to wake up a blocked thread in the AQS blocking queue. The woken thread uses tryAcquireShared to check whether the current state satisfies its request. If so, it continues executing; otherwise it is put back into the AQS queue and suspended.

📢 Note also that the AQS class does not provide usable tryAcquireShared and tryReleaseShared methods; they must be implemented by subclasses.

For example, the read lock of ReentrantReadWriteLock, which is built on AQS, overrides tryAcquireShared so that it first checks whether the write lock is held by another thread. If so, it returns -1 immediately to signal failure. Otherwise, it uses CAS to increment the high 16 bits of state (in ReentrantReadWriteLock, the high 16 bits of state are the number of times the read lock has been acquired).
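
To make the two shared-mode hooks concrete, here is a minimal sketch of a one-shot latch, modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. It is not how ReentrantReadWriteLock is implemented; the names OneShotLatch, open and releaseWaiters are chosen for this article.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a one-shot latch in shared mode: state 0 = closed, state 1 = open.
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // a negative value means failure, so the caller is queued and parked
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the latch
            return true;  // true tells AQS to wake up queued waiters
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void await() { sync.acquireShared(1); }
    void open() { sync.releaseShared(1); }
    boolean isOpen() { return sync.isOpen(); }

    // Starts the given number of waiting threads, opens the latch, and counts how many got through.
    static int releaseWaiters(int threads) {
        OneShotLatch latch = new OneShotLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            workers[i].start();
        }
        latch.open();
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + releaseWaiters(3));
    }
}
```

Because the resource is not owned by any one thread, every waiter passes once the state is 1, which is the behaviour that distinguishes shared mode from exclusive mode.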

⚠️ In addition to overriding the methods described above, a lock built on AQS also needs to override isHeldExclusively, which reports whether the lock is held exclusively by the current thread.

You may have noticed that acquireInterruptibly(int arg) and acquireSharedInterruptibly(int arg) both carry the Interruptibly suffix. What is the difference between the methods with and without it?

A method without the Interruptibly suffix does not respond to interrupts. If another thread interrupts a thread that is acquiring the resource through such a method, or that has been suspended after failing to acquire it, the thread does not throw an exception. It keeps trying to acquire the resource or stays suspended; in other words, it ignores the interrupt while waiting (the selfInterrupt() call in acquire only re-asserts the interrupt status after the acquisition has completed).

A method with the Interruptibly suffix throws InterruptedException and returns when the thread is interrupted.
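
The difference can be observed through ReentrantLock, whose lock() and lockInterruptibly() methods delegate to the two kinds of acquire. The following is a sketch under that assumption; InterruptDemo and its method names are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class InterruptDemo {
    // Returns true if a thread waiting in lockInterruptibly() is aborted by an interrupt.
    static boolean interruptibleAcquireThrows() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] threw = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                lock.lockInterruptibly();
                lock.unlock(); // only reached if the lock was acquired, which this demo prevents
            } catch (InterruptedException e) {
                threw[0] = true;
            }
        });
        lock.lock(); // the calling thread holds the lock, so the worker cannot acquire it
        try {
            worker.start();
            worker.interrupt();
            join(worker);
        } finally {
            lock.unlock();
        }
        return threw[0];
    }

    // Returns true if a thread interrupted while waiting in lock() still acquires the lock
    // and finds its interrupt flag set afterwards.
    static boolean plainAcquireKeepsWaiting() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquiredWithFlag = new boolean[1];
        Thread worker = new Thread(() -> {
            lock.lock(); // does not throw when interrupted
            try {
                acquiredWithFlag[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        lock.lock();
        try {
            worker.start();
            worker.interrupt();
        } finally {
            lock.unlock(); // let the worker acquire the lock
        }
        join(worker);
        return acquiredWithFlag[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("lockInterruptibly threw: " + interruptibleAcquireThrows());
        System.out.println("lock acquired with flag set: " + plainAcquireKeepsWaiting());
    }
}
```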

Next, let's look at how AQS maintains its queue, focusing on the enqueue operation.

When a thread fails to acquire the lock, it is wrapped in a Node, which is then inserted into the AQS blocking queue by the enq(final Node node) method.

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;//(1)
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))//(2)
                tail = head;
        } else {
            node.prev = t;//(3)
            if (compareAndSetTail(t, node)) {//(4)
                t.next = node;
                return t;
            }
        }
    }
}

In the first iteration of the loop above, the AQS queue is in its default state as shown in the figure: head and tail both point to null. Code (1) makes t point to the tail node, so the queue is as shown in step (1) of the figure, and t is null. Code (2) then uses CAS to install a sentinel node as the head. If that succeeds, tail is pointed at the sentinel node as well, and the queue is as shown in step (2) of the figure.

The node itself still has to be inserted, so the second iteration executes code (1) again, and the queue is as shown in step (3). Code (3) sets the predecessor of node to the tail node, giving step (4). Code (4) then uses CAS to make node the new tail; after the CAS succeeds, the queue is as shown in step (5). Finally, the successor of the original tail is set to node, which completes the doubly linked list as shown in step (6).
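
The same steps can be reproduced outside of AQS with AtomicReference. The sketch below mirrors the control flow of enq but is a simplified stand-in, not the JDK code: SimpleSyncQueue, its Node class and the name field are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the AQS queue: head and tail are updated with CAS.
class SimpleSyncQueue {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node to the queue and returns its predecessor.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // empty queue: install a sentinel as head, then point tail at it
                if (head.compareAndSet(null, new Node("sentinel"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;                    // link backwards first
                if (tail.compareAndSet(t, node)) {
                    t.next = node;                // then complete the forward link
                    return t;
                }
                // CAS lost to another thread: loop and retry against the new tail
            }
        }
    }

    public static void main(String[] args) {
        SimpleSyncQueue queue = new SimpleSyncQueue();
        Node pred = queue.enq(new Node("thread-A"));
        System.out.println("predecessor of thread-A: " + pred.name);
    }
}
```

Setting prev before the CAS and next after it means a node that has become the tail is always reachable by walking backwards, even if its predecessor's next link has not been written yet.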

AQS - support for condition variables

Just as notify and wait work together with the built-in synchronized lock to synchronize threads, the signal and await methods of a condition variable work together with a lock implemented on AQS. The difference is that synchronized can cooperate with the notify and wait methods of only one shared variable at a time, while one AQS lock can have multiple condition variables.

Next, let's look at an example.

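import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public static void main(String[] args) throws InterruptedException {
    final ReentrantLock lock = new ReentrantLock(); // (1)
    final Condition condition = lock.newCondition(); // (2)

    // The signalling side must run on another thread; otherwise await() at (4) never returns.
    Thread signaller = new Thread(() -> {
        lock.lock(); // (6)
        try {
            System.out.println("begin signal...");
            condition.signal(); // (7)
            System.out.println("end signal...");
        } finally {
            lock.unlock(); // (8)
        }
    });

    lock.lock(); // (3)
    try {
        System.out.println("begin wait...");
        signaller.start(); // it can only get the lock once await() has released it
        condition.await(); // (4)
        System.out.println("end wait...");
    } finally {
        lock.unlock(); // (5)
    }
}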

The code first creates a ReentrantLock, an exclusive lock that is also implemented on AQS.

Step (2) uses the lock's newCondition() method to create a ConditionObject, which is a condition variable belonging to this lock.

📢 A Lock object can create multiple condition variables.

Step (3) acquires the exclusive lock, and step (4) calls the condition variable's await() method, which blocks and suspends the current thread. When another thread calls the condition variable's signal() method, the blocked thread returns from await(). Note that, just like Object.wait(), calling await() without holding the lock throws IllegalMonitorStateException. Step (5) releases the acquired lock.

In the code above, lock.newCondition() creates a ConditionObject, a class declared inside AQS. As an inner class of AQS, ConditionObject can access the fields (such as state) and methods of AQS. Each condition variable maintains a condition queue (a singly linked list) that stores the threads blocked by calling its await() method. Note that this condition queue is not the same thing as the AQS queue.

Let's take a look at the source code of await() method:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();

    // Create a new node and insert it at the end of the condition queue (1)
    Node node = addConditionWaiter();
    // Release the lock held by the current thread and return the saved state value (2)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Call the park method to block and suspend the current thread (3)
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //...
}

When a thread calls the condition variable's await() method, a node of type Node.CONDITION is constructed internally and inserted at the end of the condition queue. The current thread then releases the lock it holds (that is, it modifies the state value) and is blocked and suspended.

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

When another thread calls the condition variable's signal() method (it must first call the lock's lock() method to acquire the lock), a thread node is removed from the head of the condition queue and put into the AQS blocking queue, and that thread is then woken up once it gets the chance to re-acquire the lock.

📢 Note that AQS only provides the implementation of ConditionObject; it does not provide a newCondition method. That method has to be supplied by the subclass.

Let's look at how a thread is put into the condition queue when it blocks in await().

private Node addConditionWaiter() {
    // Get the tail node
    Node t = lastWaiter;
    // If the tail node is no longer in CONDITION status, it has been cancelled
    if (t != null && t.waitStatus != Node.CONDITION) {
        // Traverse the condition queue and unlink the cancelled nodes
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // Build a node that represents the current thread
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node; // Empty queue: the new node becomes the head
    else
        t.nextWaiter = node; // Otherwise link the new node after the old tail
    lastWaiter = node; // The new node is now the tail
    return node;
}

📢 Note: when multiple threads call lock.lock() at the same time, only one thread acquires the lock. The other threads are wrapped in Node objects, inserted into the AQS blocking queue of that lock, and keep trying to acquire the lock.

If the thread that holds the lock calls the await method of one of its condition variables, it releases the lock, is wrapped in a Node, and is inserted into the condition queue of that condition variable.

When another thread calls signal or signalAll on the condition variable, one node or all nodes in the condition queue are moved to the AQS blocking queue, where they wait for the chance to acquire the lock.

Summary: one lock corresponds to one AQS blocking queue and any number of condition variables, and each condition variable has its own condition queue.
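
The move from the condition queue to the AQS blocking queue can be observed with the monitoring methods of ReentrantLock, getWaitQueueLength(Condition) and hasQueuedThreads(). The sketch below is only a demonstration; QueueTransferDemo and observe are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class QueueTransferDemo {
    // Returns {condition waiters before signal, condition waiters after signal,
    //          1 if a thread is queued on the lock after signal, else 0}.
    static int[] observe() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        CountDownLatch holdingLock = new CountDownLatch(1);
        boolean[] signalled = new boolean[1]; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                holdingLock.countDown();
                while (!signalled[0]) {
                    ready.awaitUninterruptibly(); // joins the condition queue, then releases the lock
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        int[] seen = new int[3];
        try {
            holdingLock.await();
            lock.lock(); // succeeds only after the waiter has released the lock inside await
            try {
                seen[0] = lock.getWaitQueueLength(ready);  // waiter is in the condition queue
                signalled[0] = true;
                ready.signal();                            // moves the node to the lock's queue
                seen[1] = lock.getWaitQueueLength(ready);
                seen[2] = lock.hasQueuedThreads() ? 1 : 0; // waiter now waits for the lock itself
            } finally {
                lock.unlock();
            }
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```

The signalled flag is there so that the waiter re-checks its condition in a loop, which is the usual way to call await.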

Implementing a custom exclusive lock

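import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**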
 * @author Mystery jack
 * Official account: Java rookie programmer
 * @date 2022/1/20 Implement custom exclusive locks
 * @Description
 */
public class NonReentrantLock implements Lock, Serializable {

    // Custom synchronizer built on AQS
    private static class Sync extends AbstractQueuedSynchronizer {
        // Whether the lock is currently held
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Try to acquire the lock: succeeds only if state is 0
        @Override
        protected boolean tryAcquire(int arg) {
            assert arg == 1;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Try to release the lock by setting state back to 0
        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Expose a condition variable for this lock
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

Implementing producer-consumer with the custom lock

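import java.util.Queue;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

/**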
 * @author Mystery jack
 * Official account: Java rookie programmer
 * @date 2022/1/20
 * @Description Producer consumer model
 */
public class LockTest {

    final static NonReentrantLock lock = new NonReentrantLock();
    final static Condition consumerCondition = lock.newCondition();
    final static Condition producerCondition = lock.newCondition();
    final static Queue<String> QUEUE = new LinkedBlockingQueue<>();
    final static int QUEUE_SIZE = 10;

    public static void main(String[] args) {
        LockTest lockTest = new LockTest();
        // Start consumer thread
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 2; j++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " consumed " + lockTest.get());
                }
            }, "consumer_" + i).start();
        }
        // Start producer thread
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lockTest.put("goods-" + new Random().nextInt(1000));
                }
            }, "product-" + i).start();
        }

    }

    private void put(String name) {
        //Get exclusive lock
        lock.lock();
        try {
            //If the queue is full, wait
            while (QUEUE.size() == QUEUE_SIZE) {
                producerCondition.await();
            }
            QUEUE.add(name);
            System.out.println(Thread.currentThread().getName() + " produced " + name);
            //Wake up consumer thread
            consumerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private String get() {
        String ret = "";
        //Get exclusive lock
        lock.lock();
        try {
            //If the queue is empty, wait
            while (QUEUE.size() == 0) {
                consumerCondition.await();
            }
            //Consume an element
            ret = QUEUE.poll();
            //Wake up production thread
            producerCondition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return ret;
    }

}

Keywords: Java Concurrent Programming JUC aqs

Added by budder on Fri, 21 Jan 2022 01:02:47 +0200