AQS(AbstractQueuedSynchronizer) source code

A few days ago, when comparing the relationship and difference between Synchronized and ReentrantLock, and learning to use Semaphore, CountDownLatch and CyclicBarrier, I found that there is such a synchronizer at the bottom layer. This makes me feel that to learn their underlying principles, I have to learn the underlying principles of AQS itself. Then, let's come.

Here's a reference Detailed explanation of queue synchronizer (AQS) And This is illustrated: I wrote more than 10000 words to let you understand how AQS works

catalogue

Member variable

Node

Design of exclusive synchronization component

Template method provided by synchronizer

Method for subclass override

Specific method implementation

acquire method

Release

Member variable

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
		Create a new AbstractQueuedSynchronizer instance with an initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }
  
    
    static final class Node{...} //The nodes in the data structure are described in detail below.
  
  
    /**
		Wait for the head of the queue and delay initialization. In addition to initialization, it can only be modified through setHead method. Note: if the head exists, its waitStatus will not be CANCELLED.
     */
    private transient volatile Node head;

    /**
		Wait for the end of the queue to delay initialization. Method enq modification only to add a new wait node
     */
    private transient volatile Node tail;

    /**
		Synchronization status.
     */
    private volatile int state;
  
}

It can be seen that the synchronizer itself is actually a two-way linked list. The only nodes we can get directly are head and tail. The main thing is that we need to examine the meaning of this Node. Where the head Node is the Node of the running thread.

         

Node

static final class Node {
        /** Flag indicating that the node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Flag indicating that the node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value indicating that the thread has been canceled*/
        static final int CANCELLED =  1;
        /** Indicates the waitStatus value that the successor node needs to wake up */
        static final int SIGNAL    = -1;
        /** waitStatus Value indicates that the thread is waiting for a condition (entering the waiting queue) */
        static final int CONDITION = -2;
        /**Indicates the waitStatus value that the next acquireShared should propagate unconditionally*/
        static final int PROPAGATE = -3;


        volatile int waitStatus;

        /**
        Precursor node
         */
        volatile Node prev;

        /**
		  Successor node
         */
        volatile Node next;

        /**
            The thread loaded by this node is initialized at construction time and zeroed after use.
         */
        volatile Thread thread;

        /**
The node linked to the next wait condition, or the special value SHARED. Because conditional queues are only accessible in exclusive mode, we only need a simple link queue to save nodes because they are waiting for conditions. They are then moved to the queue to retrieve them. And because conditions can only be exclusive, we save fields by using special values to represent the sharing mode.
         */
        Node nextWaiter;

        /**
			Returns true if the node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

         /** Previous node*/
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

The code Lee defines a field waitStatus representing the waiting status of the current Node. The values include CANCELLED(-1), SIGNAL(-1), CONDITION(-2), PROPAGATE(-3) and 0. These five values represent different specific scenarios.

  • CANCELLED: indicates that the current node has CANCELLED scheduling. When a timeout or is interrupted (in response to an interrupt), it will trigger a change to this state, and the node will not change after entering this state.
  • SIGNAL: indicates that the successor node is waiting for the current node to wake up. When the successor node joins the queue, the status of the predecessor node will be updated to SIGNAL (remember the value of - 1, because we will often mention it later)
  • Condition: indicates that the node is waiting on the condition. When other threads call the signal() method of the condition, the node in the condition state will be transferred from the waiting queue to the synchronization queue, waiting to obtain the synchronization lock. (Note: condition is a component of AQS, which will be described in detail later)
  • PROPAGATE: in the sharing mode, the predecessor node will wake up not only its successor nodes, but also the successor nodes.
  • 0: the default status when new nodes join the queue.

When waitStatus is negative, it indicates that the node is in a valid waiting state. When it is positive, it indicates that the node has been cancelled.

Design of exclusive synchronization component

Template method provided by synchronizer

//Exclusively obtain the synchronization status. If the current thread obtains the synchronization status successfully, it will return immediately. Otherwise, it will enter the synchronization queue and wait,
//This method will repeatedly call the overridden tryAcquire(int arg) method
public final void acquire(int arg) {...}


//It is basically the same as acquire(int arg), but this method responds to an interrupt.
public final void acquireInterruptibly(int arg){...}



//Exclusively release the synchronization state. This method will wake up the thread contained in the first node in the synchronization queue after releasing the synchronization state
public final boolean release(int arg) {...}

Method for subclass override

//Exclusively obtain the synchronization status. To implement this method, you need to query the current status and judge whether the synchronization status meets the expectation, and then set the synchronization status by CAS
protected boolean tryAcquire(int arg)
 
//Exclusively release the synchronization state, and the thread waiting to obtain the synchronization state will have the opportunity to obtain the synchronization state
protected boolean tryRelease(int arg)
 
//Whether the current synchronizer is occupied by the thread in exclusive mode. Generally, this method indicates whether it is exclusive by the current thread
protected boolean isHeldExclusively()

Specific method implementation

acquire method

/**
Get in exclusive mode, ignoring interrupts. By calling tryAcquire at least once, success is returned. Otherwise, the thread will queue, may block and unblock repeatedly, and call tryAcquire until it succeeds. This method can be used to implement the lock method lock . 
*/
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && //If tryacquire fails
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//Generate nodes and join the synchronization queue
            selfInterrupt();
    }

/**
Creates a node for the current thread and the given mode and queues it
*/
private Node addWaiter(Node mode) { //Sharing / exclusivity, null is exclusive
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //
            node.prev = pred;//After connecting to the tail
            if (compareAndSetTail(pred, node)) { //CAS settings. It is expected that the current tail is pred. If yes, it is changed to node.
               //If the CAS attempt is successful, it means that there is no other thread between "setting the precursor of the current node" and "CAS setting tail" 								 success
                //Just point the successor node of the "previous tail" to the node
                pred.next = node;
                return node;
            }
        }
        enq(node);//Otherwise, the correct addition of nodes is guaranteed through an endless loop
        return node;
    }


/**
Each node determines whether it is the head node in an endless loop.

*/
    final boolean acquireQueued(final Node node, int arg) { //spin
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//The precursor node is the head node and obtains the synchronization status
                    setHead(node); //Set its first node
                    p.next = null; // help GC break reference
                    failed = false;
                    return interrupted; //Spin exit
                }
                if (shouldParkAfterFailedAcquire(p, node) && //Judge whether to block or interrupt after failed to obtain the synchronization status
                    parkAndCheckInterrupt()) //Block current thread
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

The above logic can be represented by the following figure:

When the current thread fails to obtain the synchronization status, the synchronizer will construct the current thread, waiting status and other information into a Node and add it to the synchronizer queue. At the same time, it will block the current thread.

At the same time, it can be seen that the nodes in each synchronizer queue are spinning. Introspectively observe whether their precursor node is a head node. If so, try tryAcquire and exit the spin after success.

It can be seen that nodes do not communicate with each other in the process of cyclic inspection, but simply judge whether their precursor is the head node, so that the release rules of nodes comply with FIFO. It also facilitates the processing of premature notification (premature notification means that the thread of the precursor node that is not the head node is awakened due to interruption).  

Release

When the thread of the head node releases the synchronization state, it will wake up the subsequent nodes, and the subsequent nodes will set themselves as the head node when they successfully obtain the synchronization state.

 

public final boolean release(int arg) {
        if (tryRelease(arg)) { //If the synchronization status is released successfully
            Node h = head;
            if (h != null && h.waitStatus != 0) //Exclusive is SIGNAL -1. Wake up the following
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


private void unparkSuccessor(Node node) {

        int ws = node.waitStatus; //Current node waitStatus
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//Update to 0

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) //Look forward from the tail
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //awaken
    }

Keywords: Java Concurrent Programming JUC FIFO

Added by ghazianibros on Sun, 16 Jan 2022 14:35:53 +0200