AQS java concurrency cornerstone

AQS is a basic package under the java locks package. It implements the serializable interface and inherits the AbstractOwnableSynchronizer. Its important class attributes include state representing its state, Node head tail representing queue, and exclusiveOwnerThread of AbstractOwnableSynchronizer.

The bottom layer of the lock method of ReentrantLock uses AQS. For example, Thread 1 and thread 2 want to compete for a lock at the same time, that is, they want to execute the lock method at the same time. As a result, only one method can enter the code block. How does the bottom AQS operate? First, during lock, the state will be set from 0 to 1 through cas. We know that only one thread will be set successfully, so we set exclusiveOwnerThread to this thread. After that, the thread will fail to set, but it will still try to set it again, acquire(1)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire uses nonFairTryAcquire(int arg) to attempt to acquire a lock. cas sets the state from 0 to 1. If the setting is successful, this thread will be set as the exclusive thread that owns the lock. If it fails, it will judge whether it already owns the lock. If it already owns the lock, it needs to set the state to state+1. Otherwise, the attempt to obtain the lock fails.

The nonfairTryAcquire() method is as follows

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

After that, it will acquire queued (addwaiter (node. Exclusive), Arg)
addWaiter() encapsulates the failed thread as a Node and adds it to the waiting queue, that is, it is inserted into the two-way list The specific logic is to first encapsulate the thread into a Node node, and then cas sets tail as Node. After success, the encapsulated Node is returned, and if failure, the tail Node is updated in a circular manner.

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

If cas fails or tail is empty, set the loop

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueue is to judge the Node to be added to the queue. If the previous Node is the head Node, it represents that the current or previous thread has obtained the lock, then you can try to obtain the lock. If not, you need to block and wait. Blocking and waiting is also a saying. For example, you can't put the new Node into the next Node of any Node, For example, if the waitstatus of the previous Node is 0 or greater than 0, you need to change the state of the previous Node or delete the previous Node directly until the waitstatus of the previous Node is less than 0 In this way, the thread can be effectively awakened. This is the purpose of shouldParkAfterFailedAcquire, and then you can park. It is worth noting that after the thread parks, it cannot respond to the interrupt. After waking up, it will pass the thread Interrupted() returns whether it was interrupted by another thread before and resets the interrupt flag. After that, self interruption is carried out. Pay attention to thread isInterrupted() function and thread Similarities and differences of interrupted() function. isInterrupted only determines whether the thread has been interrupted by other threads, and the interrupt flag will not be reset.

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

Keywords: Java

Added by writer on Fri, 18 Feb 2022 04:54:15 +0200