Abstractqueuedsynchronizer (AQS) for Java Concurrent Programming

Abstractqueuedsynchronizer (AQS):

  1. The JDK concurrency package (package name: java.util.concurrent, hereinafter referred to as JUC) provides many tool classes for concurrent operations, such as ReentrantLock, CountDownLatch, etc. The foundation of these concurrency utility classes is abstractqueuedsynchronizer

  2. *AQS maintains a shared resource and two queues: * one is a synchronization queue; One is the condition queue.

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        //Head node of synchronization queue
        private transient volatile Node head;
    
        //Tail node of synchronization queue
        private transient volatile Node tail;
    
        //Synchronize the shared resources of the queue and the synchronization status of the queue
        private volatile int state;
    }
    
  3. Main information of Node class;

    static final class Node {
        //Static variable that identifies the node waiting for resources in shared mode
        static final Node SHARED = new Node();
        //The identity node waits for resources in exclusive mode
        static final Node EXCLUSIVE = null;
        //In the waiting state, the node cancels waiting for resources
        static final int CANCELLED =  1;
        //Wait state, identifying that the successor node needs to wake up
        static final int SIGNAL    = -1;
        //Wait state, which indicates that the thread is in a conditional wait state
        static final int CONDITION = -2;
        //The wait state identifies that the thread obtains resources in the shared mode. The behavior of releasing the lock will be propagated to subsequent nodes. This state acts on the head node
        static final int PROPAGATE = -3;
        //The node waiting state is one of the above 4 states, or 0
        volatile int waitStatus;
    
        //Precursor node of node
        volatile Node prev;
    
        //Successor node of node
        volatile Node next;
    
        //Thread corresponding to node
        volatile Thread thread;
    
        //During conditional waiting, identify the node of the next waiting condition and point to the next node in the condition queue
        //Alternatively, identify the sharing mode
        Node nextWaiter;
    }
    
  4. Synchronization queue is realized by two-way linked list. It is mainly used to record threads waiting to obtain shared resources

  5. The conditional queue is a one-way linked list structure, and the elements in the linked list are also nodes, except that the elements in the conditional queue use the nextWaiter of Node to point to the next element.

  6. Start with the protected method provided by AQS, analyze the working principle of AQS:

    /**
        *An attempt was made to acquire a shared resource in exclusive mode
        *@param arg Indicates the number of resources to obtain
        *@return true Indicates that the acquisition was successful and false indicates that the acquisition failed
        **/
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *Attempt to free a shared resource in exclusive mode
        *@param arg Indicates the number of resources released
        *@return true Indicates that the release was successful, false indicates that the release failed
        **/
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *Try to get shared resources in shared mode
        *@param arg Indicates the number of resources obtained
        *@return true Indicates that the acquisition was successful and false indicates that the acquisition failed
        **/
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        *Attempt to free a shared resource in shared mode
        *@param arg Indicates the number of resources released
        *@return true Indicates that the release was successful, false indicates that the release failed
        **/
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    /**
        * Whether the current thread obtains shared resources in exclusive mode. This method is used inside the condition object ConditionObject. If conditional waiting is not required, this method does not need to be implemented
        *@return true Indicates yes, false indicates No
        **/
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    

    In the mode of template method, AQS provides an algorithm framework for multiple threads to operate on shared resources. The above five protected methods are mainly used to try to obtain and release shared resources without blocking the current thread. AQS is left to specific business operation classes (such as ReentrantLock).

  7. Specific codes of AQS methods for obtaining and releasing resources:

    /**
        *Obtain the shared resources in exclusive mode, and return if the acquisition is successful. Otherwise, block the current thread, and put the current thread into the synchronization queue to wait for the resources to be obtained
        *@param arg Indicates the number of resources to obtain
        **/
    public final void acquire(int arg) {
        //First, call the tryAcquire method implemented by the subclass. If the method returns true, it means that the acquisition is successful and no subsequent judgment is made
        //Otherwise, call the acquirequeueueueued method to put the current thread into the synchronization queue and wait for resources
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
        *Wrap the current thread as a Node and put it into the synchronization queue
        *@param mode Mode, shared mode or exclusive mode
        *@return The node where the current thread is located
        **/
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // First try to put the node at the end of the queue. If it is successful, it will return. Otherwise, the node will be queued
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //Put the current node into the synchronization queue, and cas sets the header and tail nodes
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /**
        *Keep trying to get resources in the queue
        *@param node Node of current thread
        *@param arg Get the number of resources
        *@return Is the thread interrupted while waiting for resources
        **/
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //Gets the precursor node of the current node
                final Node p = node.predecessor();
                //Only when the precursor is the head node can you attempt to obtain the lock, because the head node is the node of the thread that currently holds the resource. If the precursor is not the head node, it is not necessary to attempt to obtain the lock
                if (p == head && tryAcquire(arg)) {
                    //After successful acquisition, set the current node as the head node
                    setHead(node);
                    //Release the original header node
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //If the node precursor is not the head node or the acquisition of resources fails, the current thread will be blocked
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
        *Free shared resources in exclusive mode
        *@param arg Indicates the number of resources released
        *@return true Indicates that the release was successful, false indicates that the release failed
        **/
    public final boolean release(int arg) {
        //Attempts to free resources, and returns directly if it fails
        if (tryRelease(arg)) {
            //After the release is successful, wake up the successor node
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    The exclusive mode in AQS is used to obtain and release resources. These two methods can be used to realize the function of lock. In fact, ReentrantLock is implemented based on the above methods. The method of obtaining and releasing resources in shared mode is similar to exclusive mode

  8. Wait and wake method of condition object:

    public class ConditionObject implements Condition, java.io.Serializable {    
    	//Node for the first conditional wait
        private transient Node firstWaiter;
        //Last conditional waiting node
        private transient Node lastWaiter;
    
        /**
          * Wake up the first waiting thread in the condition queue. At this time, the thread will enter the synchronization queue and wait for resources again
          */
        public final void signal() {
            //Determine whether the current thread occupies resources in exclusive mode
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                //Put the first thread in the condition queue back into the synchronization queue
                doSignal(first);
        }
    
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    
        final boolean transferForSignal(Node node) {
    
            //If the CAS operation fails, it means that the thread cancels obtaining shared resources. In this case, false is returned, and doSignal will try to put the next node into the synchronization queue
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            //Put nodes in synchronization queue
            Node p = enq(node);
            int ws = p.waitStatus;
            //Set the status of the precursor node of the node to Node.SIGNAL
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
        //Causes the thread that obtains the shared resource to wait and enter the condition queue. If the current thread is interrupted, it exits
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //Add a node to the condition queue
            Node node = addConditionWaiter();
            //Release the shared resources obtained by the current thread
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //Judge whether the current node is in the synchronization queue. If not, pause the current thread
            while (!isOnSyncQueue(node)) {
                //Pause the current thread, and the method responds to the interrupt; When the thread calling the signal() method releases the shared resource, execution continues from there
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //Wait again for the lock to be acquired
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
      }
    
  9. Summary: returns when the thread obtains the shared resource successfully. Otherwise, it enters the synchronization queue and waits for the precursor node to wake up. At this time, the current thread is blocked (the LockSupport.park method blocks the thread); After releasing the shared resources, the predecessor node will wake up (the LockSupport.unpark method wakes up the thread) the successor node. It should be noted that the thread that successfully obtains the shared resources must be the thread where the head node is located. When a thread that obtains shared resources calls the Condition.await() method, the current thread will enter the condition queue and wait; When other threads call the Condition.signal() method and release the shared resources, the current thread will re-enter the synchronization queue and wait to obtain the shared resources.

Keywords: Java data structure linked list JUC

Added by dr_freak on Fri, 15 Oct 2021 01:23:24 +0300