AQS (AbstractQueuedSynchronizer): implementation of the exclusive lock

Reference link: https://www.bilibili.com/video/BV12K411G7Fg

With CAS we can implement optimistic locking to synchronize threads. However, the CAS source code shows that CAS can only atomically modify a single value in memory; it cannot synchronize access to a whole object. So how do we synchronize objects? And when multiple threads compete for the same resource, how do we manage all the threads waiting for it? AQS was created to answer these questions.
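To make the limitation concrete, here is a minimal sketch of an optimistic update built on CAS. The class name CasCounter is made up for this illustration; AtomicInteger.compareAndSet is the standard JDK call. Note that the retry loop protects exactly one int and nothing else.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: an optimistic counter guarding a single int.
class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Read the current value, compute the next one, then publish it with CAS.
    // If another thread changed the value in between, the CAS fails and we retry.
    int increment() {
        for (;;) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    int get() {
        return value.get();
    }
}
```

A failed CAS here just means "try again". There is no way to put the losing thread to sleep or to line threads up in order, which is the gap AQS fills.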

Reference: Going Deep into the Java Virtual Machine

AbstractQueuedSynchronizer, AQS for short, is the basic framework for implementing synchronizers; the locks in the java.util.concurrent package are built on it. Its structure is as follows

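Attributes

int state

The synchronization state. Its meaning is defined by the subclass: in exclusive mode it usually records whether the lock is held, and in shared mode it can record the number of threads holding the shared lock.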

Shared lock and exclusive lock

Shared lock: the lock can be held by multiple threads at once. Shared locks are typically used for reading data. If a thread holds a shared lock on the data, other threads can only take a shared lock on it, not an exclusive lock.

Exclusive lock: only one thread can hold the lock at a time.

Shared and exclusive locks are two different modes that AQS supports.
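The difference between the two modes can be observed with ReentrantReadWriteLock from the JDK, whose read lock is shared and whose write lock is exclusive. The sketch below is only an illustration; the class and method names are invented for it. One thread holds the read lock while tasks on other threads probe both locks with tryLock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of AQS.
class SharedVsExclusiveDemo {
    // Returns {anotherThreadGotReadLock, anotherThreadGotWriteLock},
    // measured while the calling thread holds the read (shared) lock.
    static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            // supplyAsync runs the probe on a different thread.
            boolean read = CompletableFuture.supplyAsync(() -> {
                boolean ok = rw.readLock().tryLock();
                if (ok) rw.readLock().unlock();
                return ok;
            }).join();
            boolean write = CompletableFuture.supplyAsync(() -> {
                boolean ok = rw.writeLock().tryLock();
                if (ok) rw.writeLock().unlock();
                return ok;
            }).join();
            return new boolean[] { read, write };
        } finally {
            rw.readLock().unlock();
        }
    }
}
```

The second reader gets in (first element true), while the writer is refused (second element false) for as long as any shared holder remains.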

Node head & Node tail

These two fields maintain a FIFO doubly linked list: head points to the head node and tail points to the tail node.

Node

The node structure in the queue is shown in the figure above

Methods (taking exclusive mode as an example)

tryAcquire(int arg)

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

Attempts to acquire the lock once. If the attempt fails, it returns false immediately instead of waiting.

In AQS itself this method only throws an exception. A class that extends AQS must override it; this is the hook that lets users supply their own acquisition logic.
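As an illustration of overriding this hook, here is a minimal sketch of a non-reentrant mutex, along the lines of the example in the AbstractQueuedSynchronizer Javadoc. The names SimpleMutex and Sync are chosen for this sketch, and the convention that state 0 means free and 1 means held is an assumption of the example, not something AQS prescribes. tryRelease, described later, is overridden too so the lock can be unlocked.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant exclusive lock.
class SimpleMutex {
    // Assumed convention: state == 0 means free, state == 1 means held.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One CAS attempt; no waiting here. Queuing is handled by acquire().
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isHeld() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.isHeld(); }
}
```

The subclass only decides whether the state may change; everything about queuing and blocking stays in AQS.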

acquire(int arg)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

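If tryAcquire fails, the thread is added to the waiting queue by addWaiter and then waits in acquireQueued. If acquireQueued reports that the thread was interrupted while waiting, selfInterrupt() restores the interrupt flag.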

addWaiter(Node mode)

The main function is to create a new Node and insert the Node into the waiting queue.

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Read the current tail node; tail is a field of AQS
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // Fast path: try to make this node the new tail with one atomic operation.
            // Another thread may have changed tail since pred was read, so
            // compareAndSetTail(Node expect, Node update) only updates tail
            // if it still equals pred.
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        // Slow path: the full enqueue method, taken when the queue is empty (tail == null)
        // or when the fast path lost the race because tail was modified
        enq(node);
        return node;
}
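The enq method is not listed here, so the following is a simplified, self-contained sketch of the same idea: retry a CAS on the tail until it succeeds, creating a dummy head the first time. It uses AtomicReference instead of the internal compareAndSetTail, and the names CasQueueSketch, getHead and getTail are invented for the illustration; it is not the JDK source.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a CAS-based enqueue on a doubly linked list.
class CasQueueSketch {
    static final class Node {
        final String name;
        volatile Node prev;
        volatile Node next;
        Node(String name) { this.name = name; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends node and returns its predecessor, retrying until the CAS wins.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Empty queue: install a dummy head first, then loop again.
                Node dummy = new Node("dummy");
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // link backward first
                if (tail.compareAndSet(t, node)) {  // publish as the new tail
                    t.next = node;                  // link forward last
                    return t;
                }
            }
        }
    }

    Node getHead() { return head.get(); }
    Node getTail() { return tail.get(); }
}
```

Because the forward link is written only after the CAS, there is a short window in which a node is already the tail but its predecessor's next is still null.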

acquireQueued(final Node node, int arg)

After joining the queue, the node loops inside the queue, trying to acquire the lock and parking between attempts.

As the code shows, the nodes behind the head node form the waiting queue.

When the first node after the head acquires the lock, it becomes the new head node.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // Spin loop
            for (;;) {
                final Node p = node.predecessor();
                // The predecessor is the head node && the current thread acquires the lock
                if (p == head && tryAcquire(arg)) {
                    // The current node becomes the new head node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Decide from the predecessor's wait status whether this thread should be parked
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
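The waiting queue can be observed from outside through ReentrantLock, which is built on AQS and exposes hasQueuedThread and getQueueLength. The sketch below is an illustration only; the class and method names are invented for it. The calling thread holds the lock, a second thread blocks in lock(), and the queue is inspected while that thread waits.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class showing a thread parked in the AQS queue.
class QueueObservation {
    static int queueLengthWithOneWaiter() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        Thread waiter = new Thread(() -> {
            lock.lock();   // blocks in the queue until the lock is released
            lock.unlock();
        });
        waiter.start();
        // Wait until the waiter's node has actually been enqueued.
        while (!lock.hasQueuedThread(waiter)) {
            Thread.yield();
        }
        int length = lock.getQueueLength();
        lock.unlock();     // release wakes the queued thread
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return length;
    }
}
```

getQueueLength is documented as an estimate, but with the lock held and a single waiter the method returns 1.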

boolean tryRelease(int arg)

Like tryAcquire, this is a hook that subclasses must override.

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}
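What tryRelease returns matters: release only wakes a waiter when it returns true. The sketch below shows one possible pairing of the two hooks for a reentrant lock, where state is used as a hold count and tryRelease returns true only when the count drops to zero. The class name ReentrantSyncSketch and the holdCount helper are assumptions of this example; the approach resembles ReentrantLock but is not its source.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical sketch: state is used as a reentrant hold count.
class ReentrantSyncSketch extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: race for it with CAS.
            if (compareAndSetState(0, arg)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            // Re-entry by the owner: no CAS needed, only the owner writes here.
            setState(c + arg);
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - arg;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free; // true only when the lock is fully released
    }

    int holdCount() {
        return getState();
    }
}
```

With this pairing, two calls to acquire(1) must be matched by two calls to release(1); only the second one reports the lock as free.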

boolean release(int arg)

Releases the lock and, if the release succeeds, wakes the next waiting thread in the queue.

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // Pass in the head; unparkSuccessor wakes the node waiting behind it
                unparkSuccessor(h);
            return true;
        }
        return false;
}

unparkSuccessor(Node node)

After the thread at the head node has finished with the resource, it notifies the next node in the waiting queue.

In the code below, why does the search for a node to wake start from the tail rather than from the head?

Enqueueing a node is not atomic. It links the node in two separate steps, with the CAS on tail between them:

  1. The new node's prev is pointed at the old tail
  2. The old tail's next is pointed at the new node

A node is already part of the queue before step 2 completes, so a front-to-back search along next may stop early and miss it. A back-to-front search along prev reaches every enqueued node.

private void unparkSuccessor(Node node) {
        // Reset the head node's wait status to 0 if it is negative (for example SIGNAL)
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Search backward from the tail for the frontmost node after the head with waitStatus <= 0
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // Wake the thread of the node that was found; it resumes in acquireQueued and retries the lock
        if (s != null)
            LockSupport.unpark(s.thread);
}
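The backward search can be tried in isolation. The following sketch copies only the search logic onto a stripped-down, single-threaded Node class. BackwardScanSketch and findSuccessor are names made up for the example, and waitStatus > 0 is taken to mean a cancelled node, as in the listing above.

```java
// Hypothetical sketch of the successor search, detached from AQS.
class BackwardScanSketch {
    static final class Node {
        final String name;
        int waitStatus;   // > 0 is treated as cancelled
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Finds the frontmost non-cancelled node behind head.
    static Node findSuccessor(Node head, Node tail) {
        Node s = head.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // Walk prev links from the tail; the last match is the one closest to head.
            for (Node t = tail; t != null && t != head; t = t.prev) {
                if (t.waitStatus <= 0) {
                    s = t;
                }
            }
        }
        return s;
    }
}
```

If head.next is still null because the forward link has not been written yet, the prev chain from the tail still leads to the first waiter. If the first waiter is cancelled, the scan skips it and returns the next live node.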

Added by phpmoron on Fri, 29 Oct 2021 11:38:44 +0300