1, Introduction
1. What is a Condition
Every Java object has a set of monitor methods (defined on java.lang.Object), chiefly wait(), wait(long timeout), notify() and notifyAll(). Used together with the synchronized keyword, these methods implement the wait/notify pattern. The Condition interface provides similar monitor methods that, combined with a Lock, implement the same pattern, but the two differ in usage and in features. (From The Art of Java Concurrent Programming)
The following table compares the Object monitor methods with Condition (from The Art of Java Concurrent Programming)
Comparison item | Object monitor methods | Condition |
---|---|---|
Preconditions | Acquire the object's lock | 1. Call lock.lock() to acquire the lock 2. Call lock.newCondition() to get the Condition object |
Call mode | Called directly, e.g. object.wait() | Called directly, e.g. condition.await() |
Number of waiting queues | One | Multiple |
The current thread releases the lock and enters the waiting state | Supported | Supported |
The current thread releases the lock and enters the waiting state, and does not respond to interrupts while waiting | Not supported | Supported |
The current thread releases the lock and enters a timed waiting state | Supported | Supported |
The current thread releases the lock and waits until a given point in time in the future | Not supported | Supported |
Wake up one thread in the waiting queue | Supported | Supported |
Wake up all threads in the waiting queue | Supported | Supported |
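The "Number of waiting queues" row is the difference that matters most in practice. The following is a minimal sketch, not taken from the book: a small bounded buffer in which producers and consumers wait on two separate Condition objects of the same lock. The class name TwoConditionBuffer and its methods are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
public class TwoConditionBuffer {
    private final Lock lock = new ReentrantLock();
    // Two waiting queues on one lock: one for producers, one for consumers.
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<Integer> items = new ArrayDeque<>();
    private final int capacity;

    public TwoConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void put(int value) throws InterruptedException {
        lock.lock();
        try {
            // Always re-check the predicate in a loop after await returns.
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(value);
            notEmpty.signal(); // wakes a consumer only, never another producer
        } finally {
            lock.unlock();
        }
    }

    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            int value = items.removeFirst();
            notFull.signal(); // wakes a producer only
            return value;
        } finally {
            lock.unlock();
        }
    }

    // One producer thread puts 1..5; the calling thread consumes and sums them.
    public static int demo() {
        TwoConditionBuffer buffer = new TwoConditionBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sum = " + demo());
    }
}
```

With a single Object monitor, producers and consumers would share one wait set, so notifyAll() would typically be needed to be sure the right kind of thread wakes up.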
2. Condition interface
Let's take a look at the definition of the Condition interface

    public interface Condition {

        // Wait until the thread is signalled or interrupted.
        void await() throws InterruptedException;

        // Wait until the thread is signalled; does not respond to interrupts.
        void awaitUninterruptibly();

        // Wait until the thread is signalled or interrupted, or the given time elapses.
        // Return value = nanosTimeout - the time actually consumed.
        // A return value <= 0 indicates a timeout.
        long awaitNanos(long nanosTimeout) throws InterruptedException;

        // Wait until the thread is signalled or interrupted, or the given time elapses.
        // Returns a boolean: false indicates a timeout.
        // Unlike the previous method, the time unit can be chosen by the caller.
        // Equivalent to awaitNanos(unit.toNanos(time)) > 0.
        boolean await(long time, TimeUnit unit) throws InterruptedException;

        // Wait until the thread is signalled or interrupted, or the deadline passes.
        // The thread must reacquire the lock before returning.
        boolean awaitUntil(Date deadline) throws InterruptedException;

        // Wake up one waiting thread. If several threads are waiting on this Condition,
        // one of them is selected. Before returning from await, that thread must
        // reacquire the lock associated with the Condition.
        void signal();

        // Wake up all waiting threads. Before returning from await, each thread must
        // reacquire the lock associated with the Condition.
        void signalAll();
    }

Condition is a generalized condition queue. It gives threads a more flexible wait/notify mechanism: a thread suspends itself by calling await and is not woken up until the condition it is waiting for becomes true. A Condition must be used with a lock, because the shared state that makes up the condition is accessed from multiple threads. Every Condition instance is bound to a Lock, which is why Condition is generally implemented inside the Lock implementation.
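As a hedged illustration of "waiting until the condition is true", the sketch below waits for a boolean flag with a timeout, using the value returned by awaitNanos as the remaining time for the next iteration. The class TimedFlag and its method names are hypothetical and exist only for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; not part of the JDK.
public class TimedFlag {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flagSet = lock.newCondition();
    private boolean flag; // shared state, only touched while holding the lock

    public void set() {
        lock.lock();
        try {
            flag = true;
            flagSet.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the flag was set within the timeout, false on timeout.
    public boolean awaitSet(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // time budget used up
                }
                // awaitNanos returns an estimate of the time still remaining.
                nanos = flagSet.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Nobody sets the flag, so the wait is expected to time out.
    public static boolean demoTimeout() {
        try {
            return new TimedFlag().awaitSet(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // Another thread sets the flag, so the wait is expected to succeed.
    public static boolean demoSignalled() {
        TimedFlag f = new TimedFlag();
        new Thread(f::set).start();
        try {
            return f.awaitSet(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout case: " + demoTimeout());
        System.out.println("signalled case: " + demoSignalled());
    }
}
```

The loop matters: the predicate is re-checked after every return from awaitNanos, so the method behaves correctly whether the flag was set before the wait started or while it was in progress.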
2, Basic use
Take the basic case of thread B waking up thread A as an example
1. Object waiting for wake-up

    // Requires: import java.util.concurrent.TimeUnit;
    public static void main(String[] args) throws InterruptedException {
        // Object used as the lock
        Object obj = new Object();
        // Create thread A
        Thread threadA = new Thread(() -> {
            System.out.println("A Attempt to acquire lock...");
            synchronized (obj) {
                System.out.println("A Lock acquisition succeeded!");
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("A Start releasing the lock and start waiting...");
                    // Thread A starts waiting
                    obj.wait();
                    System.out.println("A Be informed to continue running until the end.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Create thread B
        Thread threadB = new Thread(() -> {
            System.out.println("B Attempt to acquire lock...");
            synchronized (obj) {
                System.out.println("B Lock acquisition succeeded!");
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("B Start releasing lock...");
                    // Thread B wakes up thread A
                    obj.notify();
                    System.out.println("B Random notification lock A thread in the waiting queue of the object!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Start thread A
        threadA.start();
        // Give thread A time to acquire the lock first. If thread B called notify
        // before thread A was waiting, thread A would never be woken up
        TimeUnit.SECONDS.sleep(1);
        // Start thread B
        threadB.start();
    }

The output is as follows

    A Attempt to acquire lock...
    A Lock acquisition succeeded!
    A Start releasing the lock and start waiting...
    B Attempt to acquire lock...
    B Lock acquisition succeeded!
    B Start releasing lock...
    B Random notification lock A thread in the waiting queue of the object!
    A Be informed to continue running until the end.
    Process finished with exit code 0

2. Condition waiting for wakeup

    // Requires: import java.util.concurrent.TimeUnit;
    //           import java.util.concurrent.locks.Condition;
    //           import java.util.concurrent.locks.Lock;
    //           import java.util.concurrent.locks.ReentrantLock;
    public static void main(String[] args) throws InterruptedException {
        // Create the lock object
        Lock lock = new ReentrantLock();
        // Create a condition bound to the lock
        Condition condition = lock.newCondition();
        // Create thread A
        Thread threadA = new Thread(() -> {
            System.out.println("A Attempt to acquire lock...");
            lock.lock();
            try {
                System.out.println("A Lock acquisition succeeded!");
                TimeUnit.SECONDS.sleep(1);
                System.out.println("A Start releasing the lock and start waiting...");
                // Thread A starts waiting
                condition.await();
                System.out.println("A Notified to continue running...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("A The thread has released the lock and the execution is over!");
            }
        });
        // Create thread B
        Thread threadB = new Thread(() -> {
            System.out.println("B Attempt to acquire lock...");
            lock.lock();
            try {
                System.out.println("B Lock acquisition succeeded!");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("B Start releasing lock...");
                // Thread B wakes up thread A
                condition.signal();
                System.out.println("B Random notification lock A thread in the waiting queue of the object!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("B The thread has released the lock and the execution is over!");
            }
        });
        // Start thread A
        threadA.start();
        // Give thread A time to acquire the lock first. If thread B called signal
        // before thread A was waiting, thread A would never be woken up
        TimeUnit.SECONDS.sleep(1);
        // Start thread B
        threadB.start();
    }

The output is as follows

    A Attempt to acquire lock...
    A Lock acquisition succeeded!
    A Start releasing the lock and start waiting...
    B Attempt to acquire lock...
    B Lock acquisition succeeded!
    B Start releasing lock...
    B Random notification lock A thread in the waiting queue of the object!
    B The thread has released the lock and the execution is over!
    A Notified to continue running...
    A The thread has released the lock and the execution is over!
    Process finished with exit code 0

3, Implementation principle
1. ConditionObject class
ConditionObject is the concrete implementation of Condition in the Java concurrency package. Because operating on a Condition requires holding the associated lock, and AQS is the foundation on which the synchronization locks are built, ConditionObject is defined as an inner class of AQS.
1.1 class inheritance
ConditionObject is defined as follows
public class ConditionObject implements Condition, java.io.Serializable { }
It implements the Condition and Serializable interfaces.
1.2 properties of class
It mainly contains the following attributes

    // Serialization version number
    private static final long serialVersionUID = 1173984872572414699L;
    // The first node of the condition (waiting) queue
    private transient Node firstWaiter;
    // The last node of the condition (waiting) queue
    private transient Node lastWaiter;

2. Waiting queue
Each ConditionObject contains a FIFO queue. The nodes in the queue are instances of Node, an inner class of AQS. Each Node holds a reference to a thread, namely the thread waiting on the Condition object. Unlike the CLH synchronization queue, the waiting queue of a Condition is a singly linked queue: each Node only holds a reference to the next Node, as shown in the following figure
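To make the shape of this queue concrete, here is a simplified model. It is only a sketch and not the AQS source: the class ConditionQueueSketch is hypothetical, a String stands in for the thread reference, and cancellation is ignored. The field names firstWaiter, lastWaiter and nextWaiter mirror the ones used by ConditionObject.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a condition queue; not the AQS source.
public class ConditionQueueSketch {

    static final class Node {
        final String threadName; // stands in for the waiting thread reference
        Node nextWaiter;         // single forward link; there is no prev pointer

        Node(String threadName) {
            this.threadName = threadName;
        }
    }

    private Node firstWaiter;
    private Node lastWaiter;

    // Append at the tail, which is where a newly waiting thread is placed.
    void addWaiter(String threadName) {
        Node node = new Node(threadName);
        if (lastWaiter == null) {
            firstWaiter = node;
        } else {
            lastWaiter.nextWaiter = node;
        }
        lastWaiter = node;
    }

    // Detach the head, which is the longest-waiting node; null if the queue is empty.
    String removeFirst() {
        Node first = firstWaiter;
        if (first == null) {
            return null;
        }
        firstWaiter = first.nextWaiter;
        if (firstWaiter == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
        return first.threadName;
    }

    // Adds A, B, C and drains the queue to show the FIFO order.
    public static List<String> demo() {
        ConditionQueueSketch queue = new ConditionQueueSketch();
        queue.addWaiter("A");
        queue.addWaiter("B");
        queue.addWaiter("C");
        List<String> order = new ArrayList<>();
        for (String name = queue.removeFirst(); name != null; name = queue.removeFirst()) {
            order.add(name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No synchronization is needed in the real queue either, because it is only modified by a thread that holds the lock.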
3. Analysis of await method
Calling the await() method of Condition will make the current thread enter the waiting state, join the Condition waiting queue and release the lock at the same time. The await method code is as follows

    public final void await() throws InterruptedException {
        // Respond to a pending interrupt
        if (Thread.interrupted())
            throw new InterruptedException();
        // Add the current thread to the waiting queue
        Node node = addConditionWaiter();
        // Release the lock
        int savedState = fullyRelease(node);
        // Interrupt mode
        int interruptMode = 0;
        // Check whether this node is in the synchronization queue. If it is not, the thread
        // is not yet eligible to compete for the lock and keeps waiting until the node is
        // detected in the synchronization queue
        while (!isOnSyncQueue(node)) {
            // Suspend the thread
            LockSupport.park(this);
            // If the thread was interrupted, leave the loop
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

Let's analyze it step by step. First, the method checks for a pending interrupt and throws InterruptedException if there is one. Otherwise, it calls addConditionWaiter to add the current thread to the waiting queue. The method is as follows

    private Node addConditionWaiter() {
        // Get the tail node
        Node t = lastWaiter;
        // If t is not null and its status is not CONDITION, the node is no longer
        // waiting and needs to be cleaned out
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            // Get the new tail node
            t = lastWaiter;
        }
        // Create a node for the current thread with status CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // If t is null, the queue is empty, so the new node becomes the first node
        if (t == null)
            firstWaiter = node;
        // Otherwise, the queue is not empty and the node is appended at the end
        else
            t.nextWaiter = node;
        // Set the tail node
        lastWaiter = node;
        return node;
    }

    // Remove all nodes in the Condition queue whose status is not CONDITION
    private void unlinkCancelledWaiters() {
        // Get the head node
        Node t = firstWaiter;
        Node trail = null;
        // Walk from head to tail and unlink nodes whose status is not CONDITION
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

After the current node is successfully added to the waiting queue, the fullyRelease method will be called to completely release the lock

    final int fullyRelease(Node node) {
        // Failure flag
        boolean failed = true;
        try {
            // Get the synchronization state
            int savedState = getState();
            // If the lock is released successfully
            if (release(savedState)) {
                // Clear the failure flag
                failed = false;
                // Return the saved synchronization state
                return savedState;
            }
            // If releasing the lock fails, throw an exception
            else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            // If the operation failed, mark the node as cancelled
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

After the lock is released successfully, the thread enters a loop. On each iteration it checks whether the node is in the synchronization queue. If it is not, the thread is not yet eligible to compete for the lock, so it parks again. The loop ends once the node is detected in the synchronization queue or the thread is interrupted. The code is as follows
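The effect of fullyRelease can be observed from outside with a ReentrantLock that is held twice by the same thread. The sketch below is an illustration under that assumption, with a made-up class name FullReleaseDemo: the main thread can only acquire the lock if await gave up both holds, and the waiter sees its hold count restored once await returns.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK.
public class FullReleaseDemo {

    // Returns the waiter's hold count {before await, after await}.
    public static int[] run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        int[] counts = new int[2];
        boolean[] signalled = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            lock.lock(); // reentrant acquisition: hold count is now 2
            try {
                counts[0] = lock.getHoldCount();
                while (!signalled[0]) {
                    cond.awaitUninterruptibly(); // releases both holds while waiting
                }
                counts[1] = lock.getHoldCount(); // both holds are back
            } finally {
                lock.unlock();
                lock.unlock();
            }
        });
        waiter.start();

        // Poll until the waiter is on the condition queue. Acquiring the lock here
        // while the waiter is waiting is only possible after a full release.
        boolean done = false;
        while (!done) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    signalled[0] = true;
                    cond.signal();
                    done = true;
                }
            } finally {
                lock.unlock();
            }
            if (!done) {
                LockSupport.parkNanos(1_000_000L); // back off for about 1 ms
            }
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before await: " + counts[0] + ", after await: " + counts[1]);
    }
}
```

The saved state returned by fullyRelease is what allows the later reacquisition to restore the same hold count rather than a single hold.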

    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // After the thread is woken up, check whether it was interrupted while suspended.
        // If it was, leave the loop
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    // ---------- The methods called by the code above are explained below ----------

    // Returns whether an interrupt occurred and, if so, the interrupt mode
    // THROW_IE: throw an InterruptedException when exiting the wait
    // REINTERRUPT: interrupt the thread again when exiting the wait
    private int checkInterruptWhileWaiting(Node node) {
        // If the thread was interrupted, call transferAfterCancelledWait, otherwise return 0
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    // Transfers the node to the synchronization queue after an interrupted wait.
    // Returns true if the interrupt came before any signal (the CAS from CONDITION to 0
    // succeeds, so this thread enqueues the node itself). Returns false if a signal
    // won the race; in that case wait until the signalling thread has finished
    // enqueuing the node
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

Let's first look at the implementation of the isOnSyncQueue method

    final boolean isOnSyncQueue(Node node) {
        // A node in the CLH queue never has status CONDITION (the status is reset to 0
        // when the node is transferred), and its predecessor cannot be null (every
        // non-head node in the CLH queue has a predecessor). If either check fails,
        // the node is definitely not in the CLH queue, so return false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // If the node has a successor, it must be in the CLH queue.
        // Why? The waiting queue links its nodes through nextWaiter, so next stays null
        // while the node is only in the waiting queue. A non-null next means the node
        // has been linked into the CLH queue
        if (node.next != null) // If has successor, it must be on queue
            return true;
        // Search the CLH queue from tail to head for the node
        return findNodeFromTail(node);
    }

    // Check whether the node is in the CLH queue
    private boolean findNodeFromTail(Node node) {
        // Start from the tail node
        Node t = tail;
        // Walk from tail to head looking for the node
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

Why check whether the node is in the CLH queue? Because a signal operation transfers the waiting node from the waiting queue to the CLH queue, which means the node is now allowed to wake up. Every node that is to be woken up from the waiting queue must queue in the CLH queue to acquire the lock. That covers everything that happens while the current thread is suspended. When another thread signals it, the thread continues to execute. The remaining code consists of three if statements, as shown below

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

- The first if: it calls the acquireQueued method, which is a loop. Each thread that has entered the synchronization queue repeatedly checks whether it meets the conditions to acquire the synchronization state. Once it acquires the state it leaves the loop, otherwise it keeps trying (the code was explained in detail in the previous AQS article and is not repeated here. For details, refer to the article: In depth understanding of AQS implementation principle). The return value indicates whether the thread was interrupted while acquiring. If it was, and the interrupt mode is not THROW_IE, interruptMode is set to REINTERRUPT, that is, the thread interrupts itself after the wait ends.
- The second if: if the node still has a next waiter, all cancelled nodes are removed from the waiting queue.
- The third if: handle the interrupt according to the interrupt mode

    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        // If the interrupt mode is THROW_IE, throw the exception directly
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        // If the interrupt mode is REINTERRUPT, interrupt the current thread again
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

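The THROW_IE path can be observed from user code. The following is a minimal sketch, assuming a ReentrantLock and a waiter that nobody ever signals; the class name AwaitInterruptDemo is made up for the example. The waiting thread is interrupted while it sits in the condition queue, so await is expected to end with an InterruptedException.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK.
public class AwaitInterruptDemo {

    // Returns true if await() ended with an InterruptedException.
    public static boolean interruptedWhileWaiting() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] threw = {false};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                cond.await(); // nobody ever signals in this demo
            } catch (InterruptedException e) {
                threw[0] = true; // interrupted before any signal
            } finally {
                lock.unlock(); // await reacquired the lock before throwing
            }
        });
        waiter.start();

        // Poll until the waiter has released the lock and joined the condition queue.
        boolean waiting = false;
        while (!waiting) {
            lock.lock();
            try {
                waiting = lock.hasWaiters(cond);
            } finally {
                lock.unlock();
            }
            if (!waiting) {
                LockSupport.parkNanos(1_000_000L); // back off for about 1 ms
            }
        }

        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println("await threw InterruptedException: " + interruptedWhileWaiting());
    }
}
```

Note that the finally block can safely call unlock: even on the exception path, await only returns after the lock has been reacquired.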
That completes the await process. Let's summarize how await executes. The precondition is that the lock is held (the details of interrupt handling are ignored here)
- 1) Wrap the current thread in a node and insert it at the end of the Condition's waiting queue
- 2) Release the lock
- 3) Park in a loop: each time the thread is woken up, it leaves the loop if its node has been moved to the CLH queue or the thread has been interrupted, otherwise it parks again
- 4) Reacquire the lock and continue with subsequent operations
4. signal method analysis
Calling the signal() method of Condition will wake up the node waiting in the waiting queue for the longest time (the first node in the Condition queue). Before waking up the node, it will move the node to the CLH synchronization queue. The code is as follows

    public final void signal() {
        // Check whether the current thread holds the lock. If not, throw an exception
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // Get the head node
        Node first = firstWaiter;
        // If the head node is not null, wake it up
        if (first != null)
            doSignal(first);
    }

This method first checks whether the current thread holds the lock and throws an exception if it does not. It then calls doSignal to wake up the head node of the waiting queue. The code is as follows

    private void doSignal(Node first) {
        // The caller has already checked that first is not null, so use do-while
        do {
            // Remove first and update the head node. If the queue becomes empty,
            // the tail node must be cleared as well
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    // Move the node to the CLH synchronization queue and return whether the move succeeded
    final boolean transferForSignal(Node node) {
        // Change the node status from CONDITION to the initial status 0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            // If the CAS fails, the node was cancelled, so return false
            return false;
        // Add the node to the CLH queue. enq returns the node's predecessor in the CLH queue
        Node p = enq(node);
        // Get the status of p
        int ws = p.waitStatus;
        // If the status of p is greater than 0 (cancelled), wake up the node's thread directly.
        // Otherwise try to CAS the status of p to SIGNAL, and wake the thread up only if
        // that CAS fails. In both cases the predecessor cannot be relied on to wake this
        // node later, so the thread is woken now and resynchronizes in acquireQueued
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // Wake up the thread
            LockSupport.unpark(node.thread);
        return true;
    }

The whole signal process is relatively simple. Let's summarize it
- Judge whether the current thread is the thread holding the lock. If not, throw an exception, otherwise continue to execute
- Move the head node of the waiting queue corresponding to the Condition to the CLH queue
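The first step of this summary is easy to check from user code. The sketch below assumes a ReentrantLock, whose Condition performs the ownership check described above; the class name SignalOwnershipDemo is made up for the example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK.
public class SignalOwnershipDemo {

    // Calls signal() without holding the lock; returns true if it was rejected.
    public static boolean signalWithoutLockThrows() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls signal() while holding the lock; with no waiters this is simply a no-op.
    public static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            cond.signal();
            return true;
        } catch (IllegalMonitorStateException e) {
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock rejected: " + signalWithoutLockThrows());
        System.out.println("with lock accepted: " + signalWithLockSucceeds());
    }
}
```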
5. Summary
In general, Condition is implemented by adding a waiting queue. When await is called, the current thread is placed at the end of the waiting queue, releases the lock, and is suspended until another thread wakes it up. When another thread calls signal, the first node is taken from the waiting queue and inserted into the CLH queue, after which the conventional AQS lock acquisition process follows.
Other methods are as follows:

    public final void awaitUninterruptibly(){...}
    public final long awaitNanos(long nanosTimeout){...}
    public final boolean await(long time, TimeUnit unit){...}
    public final boolean awaitUntil(Date deadline){...}
    public final void signalAll(){...}

The logic of these methods is basically the same as that of await and signal, with extra handling for ignoring interrupts and for timeouts. They are not covered here for reasons of space. Interested readers can study them on their own.
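As a small usage illustration of two of these methods, the sketch below lets several threads wait with awaitUninterruptibly until a gate opens and then releases all of them with a single signalAll. It only shows the API from the caller's side, not the internals; the class name SignalAllDemo and the gate variable are made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK.
public class SignalAllDemo {

    // Starts the given number of waiters, opens the gate, and returns how many woke up.
    public static int wakeAll(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition opened = lock.newCondition();
        boolean[] open = {false}; // guarded by lock
        AtomicInteger woken = new AtomicInteger();

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    // Re-check the predicate, so it does not matter whether the
                    // gate was opened before or after this thread started waiting.
                    while (!open[0]) {
                        opened.awaitUninterruptibly();
                    }
                    woken.incrementAndGet();
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        lock.lock();
        try {
            open[0] = true;
            opened.signalAll(); // moves every waiting node to the synchronization queue
        } finally {
            lock.unlock();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return woken.get();
    }

    public static void main(String[] args) {
        System.out.println("woken threads: " + wakeAll(3));
    }
}
```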