Detailed explanation of Java concurrency conditions

1, Introduction

1. What is a Condition

Any Java Object has a set of monitor methods (defined on java.lang.Object), mainly including wait(), wait(long timeout), notify() and notifyAll() methods. These methods cooperate with the synchronized synchronization keyword to realize the wait / notification mode. The Condition interface also provides a monitor method similar to Object. In combination with Lock, the wait / notify mode can be realized, but there are still differences in usage and functional characteristics between the two—— From the art of Java Concurrent Programming

The following is a comparison between Condition and Object monitor methods (from the art of Java Concurrent Programming)

Comparison itemObject Monitor MethodsCondition
PreconditionsGet lock of object1. Call lock Lock() get lock
2. Call lock Newcondition() gets the Condition object
Call modeCall directly.
Such as object wait()
Call directly.
Such as condition await()
Number of waiting queuesOneMultiple
The current thread releases the lock and enters the waiting statesupportsupport
The current thread releases the lock and enters the waiting state, and does not respond to interrupts in the waiting stateI won't support itsupport
The current thread releases the lock and enters a timeout wait statesupportsupport
The current thread releases the lock and enters the wait state until some time in the futureI won't support itsupport
Wake up a thread in the waiting queuesupportsupport
Wake up all threads in the waiting queuesupportsupport

2. Condition interface

Let's take a look at the definition of the Condition interface

public interface Condition {

    //Wait. The current thread is in a waiting state until it receives the signal or before it is interrupted
    void await() throws InterruptedException;

    //Wait. The current thread is in a waiting state until it receives the signal and does not respond to the interrupt
    void awaitUninterruptibly();

    //Wait. The current thread is in a waiting state before receiving the signal, or before interrupting, or before reaching the specified waiting time. The return value = nanosTimeout - the actual time consumed. The return value < = 0 indicates timeout
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    //Wait. The current thread is in a waiting state before receiving the signal, or before interrupting, or before reaching the specified waiting time. The boolean type is returned to indicate whether the acceptance signal is obtained within the specified time, and false indicates timeout.
    //It is different from the previous method in that the timeout time unit can be customized, which is equivalent to awaitnanos (unit. Tonanos (time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    //Wait: the current thread is interrupted or wakes up a waiting thread before receiving the signal. If all threads are waiting for this condition, select one of them to wake up. The thread must reacquire the lock before returning from await. Waiting until the deadline
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //Wake up a waiting thread. If all threads are waiting for this Condition, select one of them to wake up. Before returning from await, the thread must re acquire the Condition related lock.
    void signal();

    //Wake up all waiting threads. If all threads are waiting for this Condition, wake up all threads. Before returning from await, each thread must re acquire the Condition related lock.
    void signalAll();
}

Condition is a generalized condition queue. It provides a more flexible wait / notify mode for the thread. The thread performs the suspend operation after calling the await method, and will not be awakened until a condition that the thread waits for is true. Condition must be used with locks because access to shared state variables occurs in a multithreaded environment. An instance of condition must be bound to a Lock, so condition is generally used as the internal implementation of Lock.

2, Basic use

Take the basic B thread waking up A thread as an example

1. Object waiting for wake-up

public static void main(String[] args) throws InterruptedException {
    //Object lock
    Object obj=new Object();

    //Create thread A
    Thread threadA = new Thread(()->{
        System.out.println("A Attempt to acquire lock...");
        synchronized (obj){
            System.out.println("A Lock acquisition succeeded!");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("A Start releasing the lock and start waiting...");
                //Thread A starts waiting
                obj.wait();
                System.out.println("A Be informed to continue running until the end.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    });
    //Create thread B
    Thread threadB = new Thread(()->{
        System.out.println("B Attempt to acquire lock...");
        synchronized (obj){
            System.out.println("B Lock acquisition succeeded!");
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("B Start releasing lock...");
                //Thread B starts waking up thread A
                obj.notify();
                System.out.println("B Random notification lock A thread in the waiting queue of the object!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    //Start thread A
    threadA.start();
    //Here, in order to prevent thread A from executing notify first, thread B will never wake up
    TimeUnit.SECONDS.sleep(1);
    //Start thread B
    threadB.start();
}

The implementation results are as follows

A Attempt to acquire lock...
A Lock acquisition succeeded!
A Start releasing the lock and start waiting...
B Attempt to acquire lock...
B Lock acquisition succeeded!
B Start releasing lock...
B Random notification lock A thread in the waiting queue of the object!
A Be informed to continue running until the end.

Process finished with exit code 0

2. Condition waiting for wakeup

public static void main(String[] args) throws InterruptedException {
    //Create lock object
    Lock lock = new ReentrantLock();
    //New condition
    Condition condition = lock.newCondition();

    //Create thread A
    Thread threadA = new Thread(()->{
        System.out.println("A Attempt to acquire lock...");
        lock.lock();
        try {
            System.out.println("A Lock acquisition succeeded!");
            TimeUnit.SECONDS.sleep(1);
            System.out.println("A Start releasing the lock and start waiting...");
            //Thread A starts waiting
            condition.await();
            System.out.println("A Notified to continue running...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
            System.out.println("A The thread has released the lock and the execution is over!");
        }

    });
    //Create thread B
    Thread threadB = new Thread(()->{
        System.out.println("B Attempt to acquire lock...");
        lock.lock();
        try {
            System.out.println("B Lock acquisition succeeded!");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("B Start releasing lock...");
            //Thread B starts waking up thread A
            condition.signal();
            System.out.println("B Random notification lock A thread in the waiting queue of the object!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
            System.out.println("B The thread has released the lock and the execution is over!");
        }
    });
    //Start thread A
    threadA.start();
    //Here, in order to prevent thread A from executing notify first, thread B will never wake up
    TimeUnit.SECONDS.sleep(1);
    //Start thread B
    threadB.start();
}

The implementation results are as follows

A Attempt to acquire lock...
A Lock acquisition succeeded!
A Start releasing the lock and start waiting...
B Attempt to acquire lock...
B Lock acquired successfully!
B Start releasing lock...
B Random notification lock A thread in the waiting queue of the object!
B The thread has released the lock and the execution is over!
A Notified to continue running...
A The thread has released the lock and the execution is over!

Process finished with exit code 0

3, Implementation principle

1. ConditionObject class

ConditionObject is the specific implementation of Condition in java concurrency. Since the operation of Condition needs to obtain relevant locks, and AQS is the implementation basis of synchronous locks, ConditionObject is defined as the internal class of AQS.

1.1 class inheritance

ConditionObject is defined as follows

public class ConditionObject implements Condition, java.io.Serializable {
}

Implement Condition and Serializable interfaces.

1.2 properties of class

It mainly contains the following attributes

//Serialization version number
private static final long serialVersionUID = 1173984872572414699L;
//The first node of the conditional (waiting) queue
private transient Node firstWaiter;
//The last node of the conditional (waiting) queue
private transient Node lastWaiter;

2. Waiting queue

Each ConditionObject contains a FIFO queue. The Node type in the queue is the internal class of AQS - Node class. Each Node contains a thread reference, which is the thread waiting on the Condition object. Different from CLH synchronization queue, the waiting queue of Condition is a one-way queue, that is, each Node only contains references to the next Node, as shown in the following figure

3. Analysis of await method

Calling the await() method of Condition will make the current thread enter the waiting state, join the Condition waiting queue and release the lock at the same time. The await method code is as follows

public final void await() throws InterruptedException {
    //Response interrupt
    if (Thread.interrupted())
        throw new InterruptedException();
    //Adds the current thread to the waiting queue
    Node node = addConditionWaiter();
    //Release lock
    int savedState = fullyRelease(node);
    //Interrupt mode
    int interruptMode = 0;
    //Check whether this node is in the synchronization queue. If it is not, it means that the thread is not qualified to compete for the lock. Continue to wait until it is detected that this node is in the synchronization queue
    while (!isOnSyncQueue(node)) {
        //Suspend thread
        LockSupport.park(this);
        //If it is found that the thread has been interrupted, exit directly
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

Let's analyze it step by step. First, judge the response interrupt. If no interrupt exception is thrown, the addConditionWaiter method is called to add the current thread to the waiting queue. The method is as follows

private Node addConditionWaiter() {
    //Get tail node
    Node t = lastWaiter;
    //If t is not empty and its waiting state is not CONDITION, it means that the node is not in the waiting state and needs to be cleared
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        //Get new tail node again
        t = lastWaiter;
    }
    //Create a node containing the current thread with the status of CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //If t is empty, the queue is empty, so the new node is the first node
    if (t == null)
        //Set header node
        firstWaiter = node;
    //Otherwise, the queue is not empty, and the node is inserted to the last position
    else
        //Insert node to last position
        t.nextWaiter = node;
    //Set tail node
    lastWaiter = node;
    //Return node
    return node;
}

//Clear all nodes in the Condition queue whose status is not Condition
private void unlinkCancelledWaiters() {
    //Get header node
    Node t = firstWaiter;
    Node trail = null;
    //Clear the nodes whose waiting state is not CONDITION from beginning to end
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

After the current node is successfully added to the waiting queue, the fullyRelease method will be called to completely release the lock

final int fullyRelease(Node node) {
    //Fail flag
    boolean failed = true;
    try {
        //Get synchronization status
        int savedState = getState();
        //If the lock is released successfully
        if (release(savedState)) {
            //The failure flag is No
            failed = false;
            //Return to the original synchronization status
            return savedState;
        }  
        //If the lock release fails, an exception is thrown directly
        else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        //If the operation fails, set the node state to invalid
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

After the lock is released successfully, it will spin. During the spin, it will constantly judge whether the node is in the synchronization queue. If not, it means that the thread is not qualified to compete for the lock. Continue to wait until it is detected that the node is in the synchronization queue. The code is as follows

while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    //After the thread is awakened, you need to judge whether there has been an interrupt when it is suspended. If there has been an interrupt, you can exit the loop directly
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

//---------------------------The following is a detailed explanation of the method called by the above code-----------------------------------
//Returns whether an interrupt has occurred and the mode of the interrupt
//THROW_IE: indicates to interrupt again when exiting waiting
//REINTERRUPT: indicates that an InterruptedException exception is thrown when exiting the wait
private int checkInterruptWhileWaiting(Node node) {
    //If the thread is interrupted, execute the method transferaftercanceledwait, otherwise return 0
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}

//??? I don't understand what this method does, and I don't dare to talk nonsense. If I know, my little partner can confide in me
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

Let's first look at the implementation of the isOnSyncQueue method

final boolean isOnSyncQueue(Node node) {
    //For the nodes that can be in the CLH queue, the status of the node will not be CONDITION (the spin is modified to SIGNAL), and the precursor cannot be empty (there must be a precursor for non head nodes in the CLH queue). If one of them is not satisfied, it means that the node must not be in the CLH queue, and false is returned
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //If the successor of the node exists, it means that it must be in the team
    //Why? Because the node node is inserted at the end of the waiting queue before calling this method, the next field must be null. If it is not empty, it means it is definitely not in the waiting queue, that is, in the CLH queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //Look up the CLH queue from the tail to the head to see if there are node nodes
    return findNodeFromTail(node);
}

//Judge whether there are node nodes in CLH queue
private boolean findNodeFromTail(Node node) {
    //Get node
    Node t = tail;
    //Traverse backward and forward to find out whether there is a node node
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

Why should we judge whether it is in the CLH queue? Because the node waiting in the queue will be transferred to the CLH queue when performing the notification operation, which means that this node can be awakened. Any node that needs to be awakened in the waiting queue will queue in the CLH queue to obtain the lock. So far, there are all processes in which the current thread is suspended. When other threads notify the thread to wake up, it will continue to execute. The remaining code is three if judgments, as shown below

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
  • The first if judgment: it calls the acquirequeueueueueueueued method, which is a spin process, that is, when each thread enters the synchronization queue, it will spin to observe whether it meets the conditions and obtain the synchronization state, and then it can exit from the spin process, Otherwise, continue to spin (the code details have been explained in detail in the previous AQS article and will not be repeated here. For details, refer to the article: In depth understanding of AQS implementation principle ), the returned result indicates whether the current interrupt has occurred. At the same time, judge whether the interrupt mode is not THROW_IE, if not, the interruptMode will be set to REINTERRUPT, that is, self interrupt after waiting.

  • The second if judgment: if the next waiting person of the node is not empty, all nodes in the status of cancelled will be cleared.

  • The third if judgment: interrupt the operation according to the interrupt mode

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        //If interrupt mode is THROW_IE, throw exception directly
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        //If the interrupt mode is REINTERRUPT, self interrupt
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    

So far, the whole process of await is finished. Let's summarize the execution process of await. The premise is to obtain the lock (the detailed operation of interruption is ignored here)

  • 1) Wrap the current thread as a node and insert it into the end of the waiting queue of Condition
  • 2) Release lock
  • 3) Spin suspend: when other threads wake up the current thread, the exit condition is that the node has been moved to the CLH queue or the thread corresponding to the current node has been interrupted, otherwise continue to suspend
  • 4) Obtain the lock for subsequent operations

4. signal method analysis

Calling the signal() method of Condition will wake up the node waiting in the waiting queue for the longest time (the first node in the Condition queue). Before waking up the node, it will move the node to the CLH synchronization queue. The code is as follows

public final void signal() {
    //Judge whether the current thread is the thread holding the lock. If not, throw an exception
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //Get header node
    Node first = firstWaiter;
    //If the header node is not empty, wake up the header node
    if (first != null)
        doSignal(first);
}

To call this method, you need to judge whether the current thread has obtained the lock, otherwise you will throw an exception directly, and then call the method doSignal to wake up the head node in the queue. The code is as follows

private void doSignal(Node first) {
    //Because it has been determined in the outer layer that first is not empty, use do while..
    do {
        //Remove the first and modify the head node. At the same time, if the queue is empty, you need to modify the tail node
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

//Move the node to the CLH synchronization queue and return whether the move succeeded or not 0
final boolean transferForSignal(Node node) {
    //Change the node state from CONDITION to initial state 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))		//If CAS modification fails, false is returned
        return false;

    //Adding a node to the CLH queue returns the node in front of the node node in the CLH queue
    Node p = enq(node);
    //Get the status of p
    int ws = p.waitStatus;
    //If the state of node P is greater than 0 (cancel state), directly wake up the thread corresponding to the node node. Otherwise, CAS modifies the state of node p to SIGNAL, and wakes up if the modification fails. Otherwise, it will not wake up
    //The guess is that if the setting fails to prevent other nodes from waking up the thread, wake it up first
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //Wake up thread
        LockSupport.unpark(node.thread);
    return true;
}

The whole signal process is relatively simple. Let's summarize it

  • Judge whether the current thread is the thread holding the lock. If not, throw an exception, otherwise continue to execute
  • Move the head node of the waiting queue corresponding to the Condition to the CLH queue

5. Summary

In general, the implementation of Condition is to add a waiting queue. When await method is called, the current thread will be placed at the end of the waiting queue. Meanwhile, the current thread will hang and wait for other threads to wake up. When other threads call the signal method, a thread will be taken from the waiting queue and inserted into the CLH queue, followed by the conventional AQS lock acquisition process.

Other methods are as follows:

public final void awaitUninterruptibly(){...}
public final long awaitNanos(long nanosTimeout){...}
public final boolean await(long time, TimeUnit unit){...}
public final boolean awaitUntil(Date deadline){...}
public final void signalAll(){...}

The logic of these methods is basically the same as await and signal, but some interrupt neglect and timeout judgment are added. The space is limited, so I won't repeat it here. Interested partners can study it by themselves.

Keywords: Java Back-end JUC

Added by MitchEvans on Sat, 05 Mar 2022 06:42:03 +0200