Multithreaded concurrency tools -- the J.U.C concurrency package -- AQS and ReentrantLock principles

Most of the tools in the JUC package are built on AQS, so let's learn how AQS works first

1.AQS principle

1.1 overview:

AQS stands for AbstractQueuedSynchronizer. It is a framework for building blocking locks and related synchronizers

1.2 features:

  • The state attribute represents the state of the resource. Subclasses define how this state is maintained, and thereby control how the lock is acquired and released

    • getState - get state
    • setState - set state
    • compareAndSetState - set state atomically with CAS
    • Two modes are supported: exclusive mode allows only one thread to access the resource, while shared mode allows multiple threads to access it
  • A FIFO wait queue is provided, similar to the EntryList of a Monitor

  • Condition variables implement the wait and wake-up mechanism. Multiple condition variables are supported, each similar to the WaitSet of a Monitor

1.3 implementation mode:

Define a class that extends AQS and override the methods it needs from the following list (by default each one throws UnsupportedOperationException):

  1. tryAcquire
  2. tryRelease
  3. tryAcquireShared
  4. tryReleaseShared
  5. isHeldExclusively

// Typical way to acquire the lock
// If acquiring the lock fails
if (!tryAcquire(arg)) {
 // Enqueue the current thread; it can optionally be blocked with park (and later woken with unpark)
}

// Typical way to release the lock
// If the lock is released successfully
if (tryRelease(arg)) {
 // Let a blocked thread resume running
}
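The listings in this article only use exclusive mode. As a complement, here is a minimal sketch of shared mode, loosely modeled on the BooleanLatch example in the AbstractQueuedSynchronizer Javadoc. The class name OneShotLatch and its methods are illustrative assumptions, not part of the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a latch that starts closed and can be opened once
class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // state == 0 means closed, state == 1 means open
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result means success; a negative one makes AQS queue and park the caller
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true; // true tells AQS to wake up the queued threads
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    // Blocks (ignoring interrupts) until the latch is open
    public void awaitOpen() {
        sync.acquireShared(1);
    }

    // Opens the latch and releases every waiting thread
    public void open() {
        sync.releaseShared(1);
    }

    public boolean isOpen() {
        return sync.isOpen();
    }
}
```

Any number of threads can pass awaitOpen() once open() has been called, which is the difference from exclusive mode.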

1.4 using AQS to implement a non-reentrant lock

Custom exclusive lock:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

// Custom lock (non-reentrant)
class MyLock implements Lock {

    // AQS already implements most of the logic. We only need to override a few key methods
    // Synchronizer for an exclusive lock
    class MySync extends AbstractQueuedSynchronizer {
        @Override // Attempt to acquire the lock
        protected boolean tryAcquire(int arg) {
            // Changing state from 0 to 1 means the lock has been acquired
            // Other threads may be trying to modify state at the same time, so CAS is used to guarantee atomicity
            if (compareAndSetState(0, 1)) {
                // Lock acquired: record the current thread as the owner
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override // Attempt to release the lock
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            // state is volatile: writing it last makes the owner update above visible to other threads
            setState(0);
            return true;
        }

        @Override // Whether to hold exclusive lock
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Return a condition variable
        public Condition newCondition() {
            // ConditionObject is an inner class of AQS
            return new ConditionObject();
        }
    }

    private final MySync mySync = new MySync();

    @Override // Lock (on failure, the thread enters the wait queue)
    public void lock() {
        // acquire tries to take the lock and enqueues the thread if that fails
        // tryAcquire cannot be used here because it only tries once
        mySync.acquire(1);
    }

    @Override // Lock, interruptible
    public void lockInterruptibly() throws InterruptedException {
        mySync.acquireInterruptibly(1);
    }

    @Override // Try locking once
    public boolean tryLock() {
        return mySync.tryAcquire(1);
    }

    @Override // Try locking with timeout
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return mySync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override // Unlock
    public void unlock() {
        mySync.release(1);
    }

    @Override // Create condition variable
    public Condition newCondition() {
        return mySync.newCondition();
    }
}

Test:

public static void main(String[] args) {
    MyLock lock = new MyLock();
    new Thread(() -> {
       lock.lock();
       try {
           log.debug("locking.....");
           Thread.sleep(1000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           log.debug("unlocking.....");
           lock.unlock();
       }
    }, "t1").start();
    new Thread(() -> {
        lock.lock();
        try {
            log.debug("locking.....");
        } finally {
            log.debug("unlocking.....");
            lock.unlock();
        }
    }, "t2").start();
}

Result:

14:21:09.947 [t1] DEBUG aqs - locking.....
14:21:10.961 [t1] DEBUG aqs - unlocking.....
14:21:10.961 [t2] DEBUG aqs - locking.....
14:21:10.961 [t2] DEBUG aqs - unlocking.....

Test reentry:

public static void main(String[] args) {
    MyLock lock = new MyLock();
    new Thread(() -> {
       lock.lock();
       try {
           log.debug("locking.....");
           Thread.sleep(1000);
           lock.lock();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           log.debug("unlocking.....");
           lock.unlock();
       }
    }, "t1").start();
}

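Result: the second lock() call never returns. This shows that the lock is non-reentrant: a thread that already holds the lock cannot acquire it again, so t1 blocks itself forever and "unlocking....." is never printed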
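The reentry test above hangs by design. A minimal sketch that checks the same property without blocking is shown here: it uses a single tryAcquire attempt instead of a second lock(), and compares the result with ReentrantLock. The class ReentryCheck and its method names are hypothetical, and its Sync is a reduced copy of MySync.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper class for this illustration
class ReentryCheck {
    // Same idea as MySync above, reduced to the methods this check needs
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Expose one non-blocking attempt
        boolean tryLock() {
            return tryAcquire(1);
        }
    }

    // Can the thread that holds the non-reentrant lock take it a second time?
    static boolean nonReentrantSecondAcquire() {
        Sync sync = new Sync();
        sync.acquire(1);
        try {
            return sync.tryLock(); // the CAS from 0 to 1 fails because state is already 1
        } finally {
            sync.release(1);
        }
    }

    // The same check against ReentrantLock
    static boolean reentrantSecondAcquire() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            boolean again = lock.tryLock();
            if (again) {
                lock.unlock(); // undo the reentrant acquire
            }
            return again;
        } finally {
            lock.unlock();
        }
    }
}
```

Calling ReentryCheck.nonReentrantSecondAcquire() returns false, while ReentryCheck.reentrantSecondAcquire() returns true.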

2.ReentrantLock principle

From the class diagram, we can see:

  1. It implements the Lock interface
  2. It holds an abstract synchronizer, Sync, which extends AQS
  3. Sync has two implementations:
    1. NonfairSync, the non-fair synchronizer
    2. FairSync, the fair synchronizer

2.1 implementation principle of the non-fair lock:

2.1.1 lock and unlock process:

Start from the constructor: by default, ReentrantLock uses the non-fair implementation

public ReentrantLock() {
	sync = new NonfairSync();
}

NonfairSync extends Sync, which in turn extends AQS
The source code of NonfairSync (JDK 8):

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // Try to change state from 0 to 1
        if (compareAndSetState(0, 1))
            // Success: make the current thread the owner
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

1. When there is no contention:
  1. Try to change state from 0 to 1
  2. The CAS succeeds because no other thread is competing, so the owner thread is set to the current thread
2. When contention first appears: Thread-0 has already changed state to 1, and now Thread-1 calls lock():
  1. The CAS from 0 to 1 fails, so Thread-1 enters the else branch and calls acquire(1)
  2. acquire calls tryAcquire, which tries once more to change state from 0 to 1 and fails again

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  3. Thread-1 enters addWaiter, which builds a Node object and appends it to the wait queue
    1. Each Node has a waitStatus; 0 is the default, normal status
    2. Nodes are created lazily
    3. The first time a thread is enqueued, two Node objects are created
    4. The first Node is called the dummy or sentinel node. It is only a placeholder and is not associated with any thread
    5. The second Node is associated with Thread-1 and is placed at the tail of the linked list

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

  4. Thread-1 then enters acquireQueued:
    1. acquireQueued keeps trying to acquire the lock in an endless loop and parks the thread after a failed attempt
    2. If the node is right behind head (second in the queue), it calls tryAcquire again. state is still 1 here, so the attempt fails
    3. It enters shouldParkAfterFailedAcquire, which changes the waitStatus of the predecessor node (head) to -1 and returns false this time
    4. Back in acquireQueued, it tries to acquire the lock again. state is still 1, so this fails too
    5. It enters shouldParkAfterFailedAcquire again, which now returns true because the waitStatus of the predecessor is already -1
    6. It enters parkAndCheckInterrupt, and Thread-1 parks (blocks)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of the current node (the node of Thread-1)
            final Node p = node.predecessor();
            // If the predecessor is head, this node is second in the queue,
            // so try to acquire the lock again
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // Acquisition failed: decide whether to park, and park if so
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // block
    return Thread.interrupted();
}


3. When more threads go through the process above and also fail to acquire the lock, the queue grows

The nodes form a linked list. head is the node without an associated thread, and the node of the most recently queued thread is the tail. Every node except the tail has waitStatus -1 (Node.SIGNAL)
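The queue that builds up behind a held lock can be observed from outside through ReentrantLock's monitoring methods. The following is a minimal sketch, assuming a hypothetical class QueueLengthDemo. It relies on getQueueLength(), which the Javadoc describes as an estimate; here the value is stable because no thread can leave the queue while the caller holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class QueueLengthDemo {
    // Holds the lock on the calling thread, starts `waiters` threads that call lock(),
    // and returns the queue length once all of them are enqueued
    static int queuedWhileHeld(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Thread[] threads = new Thread[waiters];
        int observed;
        lock.lock();
        try {
            for (int i = 0; i < waiters; i++) {
                threads[i] = new Thread(() -> {
                    lock.lock(); // fails to acquire, so the thread is queued
                    lock.unlock();
                }, "Thread-" + (i + 1));
                threads[i].start();
            }
            // Poll until every waiter has added its Node to the AQS queue
            while (lock.getQueueLength() < waiters) {
                Thread.yield();
            }
            observed = lock.getQueueLength();
        } finally {
            lock.unlock(); // wakes the first waiter; the rest follow one by one
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return observed;
    }
}
```

The sentinel head node is not counted, because getQueueLength() only counts nodes that have an associated thread.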

4. Thread-0 releases the lock and enters tryRelease. If it succeeds:
  1. exclusiveOwnerThread is set to null and state is set to 0

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	// Wake up the successor node
            unparkSuccessor(h);
        return true;
    }
    return false;
}

  2. Back in release (called from unlock), if the queue is not empty (head != null) and the waitStatus of head is -1, enter unparkSuccessor:
    1. unparkSuccessor finds the node closest to head that has not been cancelled and unparks its thread so that it resumes running. In this example that is Thread-1
    2. Thread-1 resumes in acquireQueued. If it acquires the lock (no contention), acquireQueued sets the following:
      1. exclusiveOwnerThread is Thread-1, state = 1
      2. head points to the Node that Thread-1 was in, and the thread field of that Node is cleared
      3. The original head is unlinked from the list, so it can be garbage collected

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

5. If other threads are competing:

Suppose another thread that is not in the queue, say Thread-4, arrives at this moment (this is where the lock is non-fair)

If Thread-4 gets in first:

  1. exclusiveOwnerThread is set to Thread-4, state = 1
  2. Thread-1 goes around the acquireQueued loop again, fails to acquire the lock, and parks again

2.1.2 lock-related source code:

// Sync extends AQS
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // Lock implementation
    final void lock() {
        // First use CAS to try (only once) to change state from 0 to 1. Success means the exclusive lock has been acquired
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // If that fails, go to (1)
            acquire(1);
    }

    // (1) Inherited from AQS; placed here for easier reading
    public final void acquire(int arg) {
        // (2) tryAcquire
        if (
                !tryAcquire(arg) &&
                // When tryAcquire returns false, addWaiter (4) is called first, then acquireQueued (5)
                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // (2) delegates to (3)
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // (3) Inherited from Sync; placed here for easier reading
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // If no thread holds the lock
        if (c == 0) {
            // Try to acquire with CAS. This is where the lock is non-fair: the AQS queue is not checked
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock is held and the owner is the current thread, this is a reentrant acquire
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // Acquisition failed. Return to the caller
        return false;
    }

    // (4) Inherited from AQS; placed here for easier reading
    private Node addWaiter(Node mode) {
        // Wrap the current thread in a Node in exclusive mode. The new Node's waitStatus is 0,
        // because waitStatus is an int field and defaults to 0
        Node node = new Node(Thread.currentThread(), mode);
        // If tail is not null, use CAS to append the Node to the tail of the AQS queue
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // Doubly linked list
                pred.next = node;
                return node;
            }
        }
        // If tail is null (or the CAS failed), enqueue through enq (6)
        enq(node);
        return node;
    }

    // (6) Inherited from AQS; placed here for easier reading
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // The queue is empty: install a sentinel node as head (no thread, waitStatus 0)
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // Use CAS to append the Node to the tail of the AQS queue
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // (5) Inherited from AQS; placed here for easier reading
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // The predecessor is head, so it is this node's turn (the node of the current thread) to try to acquire
                if (p == head && tryAcquire(arg)) {
                    // Acquired: make this node the new head
                    setHead(node);
                    // Unlink the old head to help GC
                    p.next = null;
                    failed = false;
                    // Return whether the thread was interrupted while waiting
                    return interrupted;
                }
                if (
                    // Decide whether to park, see (7)
                    shouldParkAfterFailedAcquire(p, node) &&
                    // Park and wait, see (8). By now the predecessor's status has been set to Node.SIGNAL
                    parkAndCheckInterrupt()
                ) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // (7) Inherited from AQS; placed here for easier reading
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // Get the status of the predecessor
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // The predecessor will signal this node when it releases, so this thread can park safely
            return true;
        }
        // > 0 means the predecessor is cancelled
        if (ws > 0) {
            // Skip over all cancelled predecessors, relink, and return to the outer loop to retry
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // Do not park this time
            // If the next retry also fails, parking is needed, so set the predecessor's status to Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // (8) Block the current thread
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}

2.1.3 unlock-related source code:

// Sync extends AQS
static final class NonfairSync extends Sync {
    // Unlock implementation
    public void unlock() {
        sync.release(1);
    }

    // Inherited from AQS; placed here for easier reading
    public final boolean release(int arg) {
        // Try to release the lock, see (1)
        if (tryRelease(arg)) {
            // Unpark the successor of the queue's head node
            Node h = head;
            if (
                // The queue is not empty
                h != null &&
                // waitStatus is Node.SIGNAL, so an unpark is needed
                h.waitStatus != 0
            ) {
                // Unpark a thread waiting in the AQS queue, see (2)
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // (1) Inherited from Sync; placed here for easier reading
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Lock reentry is supported. It can be released successfully only when the state is reduced to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // (2) Inherited from AQS; placed here for easier reading
    private void unparkSuccessor(Node node) {
        // If the status is Node.SIGNAL, try to reset it to 0. If the woken thread acquires the lock, this head node is discarded anyway
        // It is fine if the CAS fails
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // Find the node to unpark. Removing that node from the AQS queue is done by the woken thread itself
        Node s = node.next;
        // Skip cancelled nodes: scan from tail backwards to find the frontmost node that needs to be unparked
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

2.2 reentrancy principle:

On a reentrant lock() call state is incremented, and on each unlock() it is decremented. The lock is only released when state drops back to 0

static final class NonfairSync extends Sync {
    // ...

    // Inherited from Sync; placed here for easier reading
    // acquires is 1
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { // Acquire lock for the first time
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // If the lock has been obtained and the thread is still the current thread, it indicates that lock reentry has occurred
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Inherited from Sync; placed here for easier reading
    // releases is 1
    protected final boolean tryRelease(int releases) {
        // state--, e.g. 2 - 1
        int c = getState() - releases; // Remaining reentrancy count
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // Lock reentry is supported. It can be released successfully only when the state is reduced to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
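The state counter described above is visible through ReentrantLock.getHoldCount(). A minimal sketch, assuming a hypothetical class HoldCountDemo, records the count after each lock() and unlock() call on a single thread:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class HoldCountDemo {
    // Returns the hold counts seen after: lock, lock, unlock, unlock
    static int[] trace() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        lock.lock();
        counts[0] = lock.getHoldCount(); // first acquire: state goes from 0 to 1
        lock.lock();
        counts[1] = lock.getHoldCount(); // reentry: state is incremented
        lock.unlock();
        counts[2] = lock.getHoldCount(); // state is decremented, the lock is still held
        lock.unlock();
        counts[3] = lock.getHoldCount(); // state is back to 0, the lock is released
        return counts;
    }

    // Is the lock still held after one of two unlock() calls?
    static boolean lockedAfterFirstUnlock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();
        lock.unlock();
        boolean stillLocked = lock.isLocked();
        lock.unlock();
        return stillLocked;
    }
}
```

HoldCountDemo.trace() returns {1, 2, 1, 0}, which matches the rule that tryRelease only reports success when state reaches 0.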

2.3 interruptible principle

2.3.1 non-interruptible mode: in this mode, a thread that is interrupted still stays in the AQS queue until it acquires the lock. Only then does it learn that it was interrupted

// Sync extends AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // If the interrupt flag is already set, park returns immediately
        LockSupport.park(this);
        // Thread.interrupted() returns the interrupt flag and clears it
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // The interrupt status is only returned once the lock has been acquired
                    return interrupted;
                }
                if (
                        shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt()
                ) {
                    // Woken up by an interrupt: record it and keep looping
                    interrupted = true;
                }
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // If the thread was interrupted while waiting
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // Re-assert the interrupt. The thread is running normally (not in sleep or a similar blocked state), so interrupt() only sets the flag and throws nothing
        Thread.currentThread().interrupt();
    }
}
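The non-interruptible behavior can be checked from the outside with the public ReentrantLock API. The following is a minimal sketch, assuming a hypothetical class UninterruptibleLockDemo: a thread queued in lock() is interrupted, stays in the queue, and only sees the interrupt flag after it finally acquires the lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class UninterruptibleLockDemo {
    // Returns {still queued right after the interrupt, interrupt flag set after acquiring}
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean flagAfterAcquire = new AtomicBoolean(false);
        boolean stillQueued;
        Thread waiter = new Thread(() -> {
            lock.lock(); // does not throw on interrupt; keeps waiting in the queue
            try {
                // The interrupt recorded while waiting was re-asserted before lock() returned
                flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            } finally {
                lock.unlock();
            }
        }, "waiter");
        lock.lock(); // the calling thread holds the lock first
        try {
            waiter.start();
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
            stillQueued = lock.hasQueuedThread(waiter);
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {stillQueued, flagAfterAcquire.get()};
    }
}
```

Both values are true: the interrupt does not remove the waiter from the queue, and it is not lost either.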
2.3.2 interruptible mode:
static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // If the lock is not acquired, go to (1)
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // (1) Interruptible lock acquisition
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    // Reached when the thread is interrupted while parked
                    // An exception is thrown here instead of going around for (;;) again
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
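By contrast, a thread waiting in lockInterruptibly() gives up as soon as it is interrupted. A minimal sketch, assuming a hypothetical class InterruptDemo and using only public ReentrantLock methods:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class InterruptDemo {
    // Returns true if a thread queued in lockInterruptibly() left with InterruptedException
    static boolean waiterIsInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        lock.lock(); // the calling thread holds the lock for the whole scenario
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    lock.unlock(); // not reached in this scenario
                } catch (InterruptedException e) {
                    interrupted.set(true); // thrown from the acquire loop
                }
            }, "waiter");
            waiter.start();
            // Wait until the waiter has been added to the AQS queue
            while (!lock.hasQueuedThread(waiter)) {
                Thread.yield();
            }
            waiter.interrupt();
            try {
                waiter.join(); // returns while the lock is still held by the caller
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            lock.unlock();
        }
        return interrupted.get();
    }
}
```

The waiter terminates even though the lock is never released to it, which is exactly what the throw inside the loop achieves.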

2.4 fair lock principle:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }

    // The method inherited from AQS is easy to read and placed here
    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }
    // The main difference from the non-fair lock is the implementation of tryAcquire
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // First check whether the AQS queue has a predecessor node. Compete only if there is none
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Inherited from AQS; placed here for easier reading
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h != t means there are nodes in the queue
        return h != t &&
                (
                        // (s = h.next) == null means the second node is not linked in yet
                        (s = h.next) == null || // or the second node's thread is not the current thread
                                s.thread != Thread.currentThread()
                );
    }
    }
}
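A fair lock is selected by passing true to the ReentrantLock constructor. The sketch below, which assumes a hypothetical class FairOrderDemo, queues threads one at a time behind a held fair lock and records the order in which they acquire it. Note that in this controlled setup a non-fair lock would show the same order, because no new thread arrives to barge in; the fair lock is the one that guarantees it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class FairOrderDemo {
    // Queues n threads in a known order and returns the order in which they acquire the lock
    static List<Integer> acquisitionOrder(int n) {
        ReentrantLock lock = new ReentrantLock(true); // true selects FairSync
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        lock.lock();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(id);
                    } finally {
                        lock.unlock();
                    }
                });
                t.start();
                // Wait until this thread is enqueued before starting the next one,
                // so the queue order is known to be 0, 1, ..., n-1
                while (!lock.hasQueuedThread(t)) {
                    Thread.yield();
                }
                threads.add(t);
            }
        } finally {
            lock.unlock();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(order);
    }

    static boolean defaultIsFair() {
        return new ReentrantLock().isFair();
    }
}
```

FairOrderDemo.acquisitionOrder(3) returns [0, 1, 2], and FairOrderDemo.defaultIsFair() returns false because the no-argument constructor creates a NonfairSync.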

2.5 implementation principle of condition variables:

Each condition variable corresponds to its own wait queue (a singly linked list chained through nextWaiter). Its implementation class is ConditionObject. The queue holds threads whose condition is not yet met and which need to wait, similar to the waitSet of synchronized

1. await process:

Initially Thread-0 holds the lock. It calls await and enters the addConditionWaiter process of ConditionObject, which creates a new Node with waitStatus -2 (Node.CONDITION), associates it with Thread-0, and appends it to the tail of the wait queue

Next, it enters the fullyRelease process of AQS and releases the lock held on the synchronizer

This unparks the next node in the AQS queue so that it can compete for the lock. Assuming no other threads compete, Thread-1 wins the lock, and Thread-0 parks

2. signal process:

Suppose Thread-1 wants to wake up Thread-0


It enters the doSignal process of ConditionObject and takes the first Node in the wait queue, which is the Node of Thread-0

It executes the transferForSignal process, which appends that Node to the tail of the AQS queue, sets the waitStatus of Thread-0's Node to 0, and sets the waitStatus of its predecessor (Thread-3 in this example) to -1

Thread-1 releases the lock and enters the unlock process
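The await and signal steps above can be followed from the outside with ReentrantLock.hasWaiters and getWaitQueueLength, which inspect the wait queue of a condition. A minimal sketch, assuming a hypothetical class ConditionDemo with a single waiter thread:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for this illustration
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Starts one waiter, signals it, and returns
    // {wait queue length before signal, wait queue length after signal}
    int[] run() {
        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag) { // the loop guards against spurious wakeups
                    ready.awaitUninterruptibly(); // releases the lock fully, then parks
                }
            } finally {
                lock.unlock();
            }
        }, "Thread-0");
        waiter.start();

        int[] seen = null;
        while (seen == null) {
            lock.lock();
            try {
                // Both query methods require the caller to hold the lock
                if (lock.hasWaiters(ready)) {
                    int before = lock.getWaitQueueLength(ready);
                    flag = true;
                    ready.signal(); // moves the waiter's Node to the AQS queue
                    seen = new int[] {before, lock.getWaitQueueLength(ready)};
                }
            } finally {
                lock.unlock(); // unparks the transferred waiter
            }
            if (seen == null) {
                Thread.yield(); // the waiter has not called await yet
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

new ConditionDemo().run() returns {1, 0}: the Node leaves the condition's wait queue at signal time, but the waiter only continues after the signalling thread unlocks.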

3. Source code:
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    // First waiting node
    private transient Node firstWaiter;

    // Last waiting node
    private transient Node lastWaiter;
    public ConditionObject() { }
    // Add a Node to the wait queue
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // Unlink all cancelled nodes from the queue, see (2)
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // Create a new Node for the current thread and append it to the tail of the queue
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    // Wake up - transfer the first node that has not been cancelled to the AQS queue
    private void doSignal(Node first) {
        do {
            // first was the last node, so the queue is now empty
            if ( (firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (
            // Transfer the node from the wait queue to the AQS queue, see (3). If that fails and nodes remain, keep looping
                !transferForSignal(first) &&
                        // The wait queue still has nodes
                        (first = firstWaiter) != null
        );
    }
    }

    // Method of the outer class (AbstractQueuedSynchronizer), placed here for readability
    // (3) Returns false if the node was cancelled (the transfer failed); otherwise the transfer succeeds
    final boolean transferForSignal(Node node) {
        // CAS the node status from Node.CONDITION to 0 (it is about to join the tail of the AQS queue); if the status is no longer Node.CONDITION, the node was cancelled
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // Join the end of AQS queue
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
            // The predecessor of the inserted node has been cancelled
                ws > 0 ||
                        // The predecessor's status could not be set to Node.SIGNAL
                        !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
        ) {
            // unpark the thread so that it can resynchronize its own state
            LockSupport.unpark(node.thread);
        }
        return true;
    }
    // Wake up all - transfer every node in the condition queue to the AQS queue
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // (2) Unlink all cancelled nodes from the condition queue (body omitted)
    private void unlinkCancelledWaiters() {
        // ...
    }
    // Wake up - the caller must hold the lock, so doSignal needs no extra synchronization
    public final void signal() {
        // If the lock is not held, an exception is thrown
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    // Wake up all - the caller must hold the lock, so doSignalAll needs no extra synchronization
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    // Non interruptible wait - until awakened
    public final void awaitUninterruptibly() {
        // Add a Node to the condition queue; see (1), addConditionWaiter
        Node node = addConditionWaiter();
        // Fully release the lock held by the current thread; see (4), fullyRelease
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // If the node has not been transferred to the AQS queue, it will be blocked
        while (!isOnSyncQueue(node)) {
            // park blocking
            LockSupport.park(this);
            // If interrupted, only record the fact and keep waiting
            if (Thread.interrupted())
                interrupted = true;
        }
        // After being transferred, compete for the lock in the AQS queue (blocking there if necessary), then restore the interrupt status if an interrupt was recorded
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    // Method of the outer class (AbstractQueuedSynchronizer), placed here for readability
    // (4) Because the lock may have been re-entered, read the whole state and release all of it at once
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // Release the whole state, which wakes up the next node in the AQS queue
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    // Interrupt mode - re-assert the interrupt status when exiting the wait
    private static final int REINTERRUPT = 1;
    // Interrupt mode - throw InterruptedException when exiting the wait
    private static final int THROW_IE = -1;
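    // Determine the interrupt mode: THROW_IE if interrupted before being signalled, REINTERRUPT if interrupted after, 0 if not interrupted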
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }
    // (5) Apply the interrupt mode
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    // Wait - until awakened or interrupted
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the condition queue; see (1), addConditionWaiter
        Node node = addConditionWaiter();
        // Fully release the lock held by the current thread; see (4), fullyRelease
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // If the node has not been transferred to the AQS queue, it will be blocked
        while (!isOnSyncQueue(node)) {
            // park blocks the thread
            LockSupport.park(this);
            // If interrupted, leave the condition queue
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // After leaving the condition queue, the lock must still be reacquired through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Unlink all cancelled nodes from the condition queue; see (2), unlinkCancelledWaiters
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode; see (5), reportInterruptAfterWait
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    // Wait - until awakened or interrupted or timed out
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // Add a Node to the condition queue; see (1), addConditionWaiter
        Node node = addConditionWaiter();
        // Fully release the lock held by the current thread; see (4), fullyRelease
        int savedState = fullyRelease(node);
        // Get deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // If the node has not been transferred to the AQS queue, it will be blocked
        while (!isOnSyncQueue(node)) {
            // Timed out: cancel the wait and leave the condition queue
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // park for at most the remaining time; below spinForTimeoutThreshold (1000 ns) the loop spins instead of parking
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // If interrupted, leave the condition queue
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // After leaving the condition queue, the lock must still be reacquired through the AQS queue
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // Unlink all cancelled nodes from the condition queue; see (2), unlinkCancelledWaiters
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // Apply the interrupt mode; see (5), reportInterruptAfterWait
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    // Wait - until awakened or interrupted or timed out, the logic is similar to awaitNanos
    public final boolean awaitUntil(Date deadline) throws InterruptedException {
        // ...
    }
    // Wait - until awakened or interrupted or timed out, the logic is similar to awaitNanos
    public final boolean await(long time, TimeUnit unit) throws InterruptedException {
        // ...
    }
    // Utility methods omitted
}
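
The reason fullyRelease reads the whole state before releasing can be seen with a re-entered lock. The sketch below is an illustration under stated assumptions, not code from the JDK: FullyReleaseDemo, run, and the variable names are hypothetical. The main thread locks twice and then waits; the helper thread can only acquire the lock if the wait released both holds, and once the wait returns the hold count should be back at 2.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the ReentrantLock/Condition calls are real JDK API
class FullyReleaseDemo {

    // Returns the hold count before the wait and after the wait
    static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] signalled = {false}; // guarded by the lock
        int[] holds = new int[2];

        lock.lock();
        lock.lock(); // re-entered: the AQS state is now 2
        try {
            holds[0] = lock.getHoldCount();

            Thread helper = new Thread(() -> {
                lock.lock(); // succeeds only after the waiter has released both holds
                try {
                    signalled[0] = true;
                    cond.signal();
                } finally {
                    lock.unlock();
                }
            }, "helper");
            helper.start();

            while (!signalled[0]) {
                // Releases the full state (2 -> 0) and restores it before returning
                cond.awaitUninterruptibly();
            }
            holds[1] = lock.getHoldCount();
        } finally {
            lock.unlock();
            lock.unlock();
        }
        return holds;
    }

    public static void main(String[] args) {
        int[] holds = run();
        System.out.println("hold count before wait = " + holds[0] + ", after wait = " + holds[1]);
    }
}
```

If the wait released only one hold, the helper thread would never get the lock and the demo would hang, which is exactly the situation fullyRelease avoids.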

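The two interrupt modes, THROW_IE and REINTERRUPT, differ only in whether the interrupt arrives before or after the node has been signalled. The following sketch tries to make both cases visible; InterruptModeDemo, run, and the parameter interruptBeforeSignal are hypothetical names, and the single await call without a predicate loop is a simplification for the demo (real code should wait in a loop on a condition predicate).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the ReentrantLock/Condition/Thread calls are real JDK API
class InterruptModeDemo {

    // Parks one thread in await(), then either interrupts it before any signal,
    // or signals it first and interrupts it afterwards
    static String run(boolean interruptBeforeSignal) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await();
                outcome[0] = "returned normally, interrupt flag = "
                        + Thread.currentThread().isInterrupted();
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException, interrupt flag = "
                        + Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        }, "waiter");
        waiter.start();

        boolean delivered = false;
        while (!delivered) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    if (interruptBeforeSignal) {
                        waiter.interrupt(); // node is still in the condition queue
                    } else {
                        cond.signal();      // node is moved to the AQS queue first
                        waiter.interrupt(); // the interrupt arrives after the signal
                    }
                    delivered = true;
                }
            } finally {
                lock.unlock();
            }
            if (!delivered) {
                Thread.yield(); // the waiter has not reached await yet, try again
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt before signal: " + run(true));
        System.out.println("interrupt after signal:  " + run(false));
    }
}
```

In the first case await throws and the interrupt flag is cleared; in the second case await returns normally and the flag is set again, so the caller can still notice the interrupt.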
Keywords: Java C Multithreading

Added by mikerh9 on Mon, 21 Feb 2022 11:26:05 +0200