The source code of wait, notify, and notifyAll in Java's Object class is as follows:
/** * Thread waiting * @param var1 millisecond * @param var3 nanosecond */ public final void wait(long var1, int var3) throws InterruptedException { if (var1 < 0L) { throw new IllegalArgumentException("timeout value is negative"); } else if (var3 >= 0 && var3 <= 999999) { //Nanosecond > 0, millisecond direct++ if (var3 > 0) { ++var1; } //Call native method this.wait(var1); } else { throw new IllegalArgumentException("nanosecond timeout value out of range"); } } /** * native Method thread wait */ public final native void wait(long var1) throws InterruptedException; /** * native Method single thread wake-up */ public final native void notify(); /** * native Method wakes up all threads in the waiting pool */ public final native void notifyAll();
Conditions before parsing the source code:
The object lock (ObjectMonitor) has two kinds of queues: the wait queue (_WaitSet) and the synchronization queue (_cxq / _EntryList).
wait method:
The thread adds itself to the wait queue, releases the object lock, and then parks until another thread unparks it while releasing the lock.
synchronized (a) { a.wait(); }
Equivalent to
monitor.enter // Acquire the object lock { 1. Check that the current thread owns the lock 2. Check the interrupt status 3. Create a node and add it to the wait queue 4. monitor.exit (depending on the policy, take the head node's thread from the synchronization queue, then wake that thread with event.unpark) 5. This thread calls event.park and waits for another thread to wake it 6. After waking up, throw InterruptedException if the wake-up was caused by an interrupt rather than by notify } monitor.exit // Release the lock and wake up the next thread in the synchronization queue
- CHECK_OWNER checks that the current thread owns the monitor. If it does not (for example, wait is called outside a synchronized block), an IllegalMonitorStateException is thrown
// CHECK_OWNER: verify that the calling thread (THREAD) owns this monitor.
//  - If _owner already equals THREAD, nothing happens.
//  - If THREAD->is_lock_owned(_owner) holds, _owner is rewritten to THREAD,
//    _recursions is reset to 0 and OwnerIsThread is set to 1.
//  - Otherwise java.lang.IllegalMonitorStateException is thrown.
#define CHECK_OWNER() do { if (THREAD != _owner) { if (THREAD->is_lock_owned((address) _owner)) { _owner = THREAD ; /* Convert from basiclock addr to Thread addr */ _recursions = 0; OwnerIsThread = 1 ; } else { TEVENT (Throw IMSX) ; THROW(vmSymbols::java_lang_IllegalMonitorStateException()); } } } while (false)
- Call is_interrupted() to test and clear the thread's interrupt status. If the thread has been interrupted, an InterruptedException is thrown and wait returns immediately
//Call is_interrupted() judges and clears the thread interrupt status. If the interrupt status is true, an interrupt exception is thrown and the thread ends if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { ... TEVENT (Wait - Throw IEX) ; THROW(vmSymbols::java_lang_InterruptedException()); return ; }
- Create a node and, while holding the _WaitSetLock spin lock, add it to the wait queue
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") AddWaiter (&node) Thread::SpinRelease (&_WaitSetLock)
- Exit monitor (self)
intptr_t save = _recursions; // Record old recursion times _waiters++; // waiters self increasing _recursions = 0; // Set recurrence level to be 1 exit (Self) ; // Exit monitor
- Block in ParkEvent::park() until the thread is signalled (or the timeout elapses)
if (millis <= 0) { // Call the park() method to block the thread Self->_ParkEvent->park () ; } else { // Call the park() method to block the thread within the timeout ret = Self->_ParkEvent->park (millis) ; }
- After ParkEvent::unpark() wakes the thread, determine whether the wake-up came from notify or from an interrupt; if it was not a notify and the thread was interrupted, throw InterruptedException
if (!WasNotified) { if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) { TEVENT (Wait - throw IEX from epilog) ; THROW(vmSymbols::java_lang_InterruptedException()); } }
Under the hood, wait calls the wait method of ObjectMonitor:
// Monitor-side implementation of wait: the calling thread adds itself to the
// WaitSet, fully releases the monitor, parks, and re-acquires the monitor
// before returning.
//
//   millis        - timeout passed to park(); a value <= 0 parks without a timeout.
//   interruptible - when true, a pending interrupt makes the call throw
//                   java.lang.InterruptedException (before parking, or after
//                   waking up without having been notified).
//   TRAPS         - supplies THREAD, the calling thread.
//
// Throws IllegalMonitorStateException (via CHECK_OWNER) if the caller does not
// own the monitor.
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  Thread * const Self = THREAD ;
  assert(Self->is_Java_thread(), "Must be Java thread!");
  JavaThread *jt = (JavaThread *)THREAD;

  DeferredInitialize () ;

  // Throw IMSX or IEX.
  CHECK_OWNER();

  // Test and clear the interrupt status. If the thread was already
  // interrupted, throw InterruptedException without ever waiting.
  if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
    // Post the "monitor waited" event (past tense: the wait is already over).
    if (JvmtiExport::should_post_monitor_waited()) {
      // 'false' is passed because the wait did not time out; it ended
      // because of the thread interrupt.
      JvmtiExport::post_monitor_waited(jt, this, false);
    }
    TEVENT (Wait - Throw IEX) ;
    THROW(vmSymbols::java_lang_InterruptedException());
    return ;
  }
  TEVENT (Wait) ;

  assert (Self->_Stalled == 0, "invariant") ;
  Self->_Stalled = intptr_t(this) ;
  jt->set_current_waiting_monitor(this);

  // Create a node to be put into the queue.
  // Critically, after we reset() the event but prior to park(), we must check
  // for a pending interrupt.
  ObjectWaiter node(Self);
  node.TState = ObjectWaiter::TS_WAIT ;
  Self->_ParkEvent->reset() ;
  OrderAccess::fence();

  // The WaitSet is a circular doubly-linked list here, but it could equally
  // be a priority queue or any other data structure.
  // _WaitSetLock protects the WaitSet. Normally only the monitor owner
  // accesses the WaitSet, except when park() returns because of a timeout
  // or an interrupt. Contention is very low, so a spin lock is used instead
  // of a heavyweight blocking lock.
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
  AddWaiter (&node) ;
  Thread::SpinRelease (&_WaitSetLock) ;

  if ((SyncFlags & 4) == 0) {
    _Responsible = NULL ;
  }
  intptr_t save = _recursions; // remember the recursion count; restored after re-entry
  _waiters++;                  // one more waiter on this monitor
  _recursions = 0;             // zero the recursion count before calling exit()
  exit (Self) ;                // exit the monitor
  guarantee (_owner != Self, "invariant") ;

  // As soon as ownership is dropped in the exit() call above, another thread
  // can enter the monitor, call notify() and exit() it. If that thread's
  // exit() picks this thread as successor and its unpark() happens while this
  // thread is posting a MONITOR_CONTENDED_EXIT event, the event handler may
  // consume the unpark() through RawMonitors.
  // To avoid that, the unpark() is re-posted here. This is harmless even if
  // the original unpark() was not consumed, because this thread is already
  // the chosen successor for the monitor.
  if (node._notified != 0 && _succ == Self) {
    node._event->unpark();
  }

  // The thread is on the WaitSet list - now park() it.
  // On MP systems it's conceivable that a brief spin before we park
  // could be profitable.
  //
  // TODO-FIXME: change the following logic to a loop of the form
  //   while (!timeout && !interrupted && _notified == 0) park()

  int ret = OS_OK ;
  int WasNotified = 0 ;
  { // State transition wrappers
    OSThread* osthread = Self->osthread();
    OSThreadWaitState osts(osthread, true);
    {
      ThreadBlockInVM tbivm(jt);
      // Thread is in thread_blocked state and oop access is unsafe.
      jt->set_suspend_equivalent();

      if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
        // Intentionally empty
      } else if (node._notified == 0) {
        if (millis <= 0) {
          // No timeout: block in park() until unparked.
          Self->_ParkEvent->park () ;
        } else {
          // Timed wait: block in park() for at most millis.
          ret = Self->_ParkEvent->park (millis) ;
        }
      }

      // were we externally suspended while we were waiting?
      if (ExitSuspendEquivalent (jt)) {
        // TODO-FIXME: add -- if succ == Self then succ = null.
        jt->java_suspend_self();
      }

    } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm

    // If the node is still in TS_WAIT, this thread must unlink itself from
    // the WaitSet. TState is checked once without the lock and again under
    // _WaitSetLock, so the lock is only taken when it may be needed.
    if (node.TState == ObjectWaiter::TS_WAIT) {
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
      if (node.TState == ObjectWaiter::TS_WAIT) {
        DequeueSpecificWaiter (&node) ; // unlink from WaitSet
        assert(node._notified == 0, "invariant");
        node.TState = ObjectWaiter::TS_RUN ;
      }
      Thread::SpinRelease (&_WaitSetLock) ;
    }

    // From this thread's perspective the node's TState is now stable:
    // no other thread will modify it asynchronously.
    guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
    OrderAccess::loadload() ;
    if (_succ == Self) _succ = NULL ;
    WasNotified = node._notified ;

    // Reentry phase -- reacquire the monitor.
    // Re-enter the contested monitor after object wait().
    // Retain OBJECT_WAIT state until re-enter successfully completes.
    // Thread state is thread_in_vm and oop access is again safe,
    // although the raw address of the object may have changed.
    // (Don't cache naked oops over safepoints, of course).

    // Post the "monitor waited" event (past tense: the wait is over).
    if (JvmtiExport::should_post_monitor_waited()) {
      JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
    }
    OrderAccess::fence() ;

    assert (Self->_Stalled != 0, "invariant") ;
    Self->_Stalled = 0 ;

    assert (_owner != Self, "invariant") ;
    ObjectWaiter::TStates v = node.TState ;
    if (v == ObjectWaiter::TS_RUN) {
      // Node is on no queue: contend for the monitor through enter().
      enter (Self) ;
    } else {
      // Node is on the EntryList or cxq: re-acquire via ReenterI().
      guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
      ReenterI (Self, &node) ;
      node.wait_reenter_end(this);
    }

    // Self has reacquired the lock.
    // Lifecycle - the node representing Self must not appear on any queues.
    // Node is about to go out of scope, but even if it were immortal we
    // wouldn't want residual elements associated with this thread left on
    // any lists.
    guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
    assert (_owner == Self, "invariant") ;
    assert (_succ != Self , "invariant") ;
  } // OSThreadWaitState()

  jt->set_current_waiting_monitor(NULL);

  guarantee (_recursions == 0, "invariant") ;
  _recursions = save; // restore the old recursion count
  _waiters--;         // decrement the number of waiters

  // Verify a few postconditions
  assert (_owner == Self , "invariant") ;
  assert (_succ != Self , "invariant") ;
  assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

  if (SyncFlags & 32) {
    OrderAccess::fence() ;
  }

  // Check whether a notification happened. If the thread came back from
  // park() without being notified, test and clear the interrupt status and
  // throw InterruptedException if it was set.
  if (!WasNotified) {
    // no, it could be timeout or Thread.interrupt() or both
    // check for interrupt event, otherwise it is timeout
    if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
      TEVENT (Wait - throw IEX from epilog) ;
      THROW(vmSymbols::java_lang_InterruptedException());
    }
  }

  // NOTE: a spurious wake-up is treated as a timeout.
  // Monitor notifications take precedence over thread interrupts.
}
notify method: takes the first node from the wait queue and moves it to the synchronization queue. notify itself does not release the lock; the lock is released when the enclosing synchronized block exits (important)
synchronized (a) { a.notify(); }
amount to
monitor.enter // Acquire the object lock { 1. Check that the current thread owns the lock 2. Take the first node from the wait queue 3. Depending on the policy, add the node to the cxq or the EntryList (the synchronization queue) } monitor.exit // Release the lock and wake up the next thread in the synchronization queue
- CHECK_OWNER checks that the current thread owns the monitor. If it does not (for example, notify is called outside a synchronized block), an IllegalMonitorStateException is thrown
// CHECK_OWNER: verify that the calling thread (THREAD) owns this monitor.
//  - If _owner already equals THREAD, nothing happens.
//  - If THREAD->is_lock_owned(_owner) holds, _owner is rewritten to THREAD,
//    _recursions is reset to 0 and OwnerIsThread is set to 1.
//  - Otherwise java.lang.IllegalMonitorStateException is thrown.
#define CHECK_OWNER() do { if (THREAD != _owner) { if (THREAD->is_lock_owned((address) _owner)) { _owner = THREAD ; /* Convert from basiclock addr to Thread addr */ _recursions = 0; OwnerIsThread = 1 ; } else { TEVENT (Throw IMSX) ; THROW(vmSymbols::java_lang_IllegalMonitorStateException()); } } } while (false)
- Remove the first node from the waiting queue
ObjectWaiter * iterator = DequeueWaiter() ;
- Depending on the policy, add the node taken from the wait queue to the synchronization queue
if (Policy == 0) { // Prepend to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } }......
Under the hood, notify calls the notify method of ObjectMonitor:
// Monitor-side implementation of notify: dequeue one waiter from the WaitSet
// and, depending on Knob_MoveNotifyee, move it to the EntryList or the cxq,
// or unpark it directly. The monitor itself is not released here.
//
//   TRAPS - supplies THREAD, the calling thread.
//
// Throws IllegalMonitorStateException (via CHECK_OWNER) if the caller does not
// own the monitor. Returns immediately when the WaitSet is empty.
void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
    TEVENT (Empty-Notify) ;
    return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  // Policy selects where the notified waiter is placed:
  //   0 = prepend to EntryList, 1 = append to EntryList,
  //   2 = prepend to cxq,       3 = append to cxq,
  //   anything else = unpark the waiter directly.
  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
  if (iterator != NULL) {
    TEVENT (Notify1 - Transfer) ;
    guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
    guarantee (iterator->_notified == 0, "invariant") ;
    if (Policy != 4) {
      iterator->TState = ObjectWaiter::TS_ENTER ;
    }
    iterator->_notified = 1 ;

    ObjectWaiter * List = _EntryList ;
    if (List != NULL) {
      assert (List->_prev == NULL, "invariant") ;
      assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
      assert (List != iterator, "invariant") ;
    }

    if (Policy == 0) { // Prepend to EntryList
      if (List == NULL) {
        iterator->_next = iterator->_prev = NULL ;
        _EntryList = iterator ;
      } else {
        List->_prev = iterator ;
        iterator->_next = List ;
        iterator->_prev = NULL ;
        _EntryList = iterator ;
      }
    } else if (Policy == 1) { // Append to EntryList
      if (List == NULL) {
        iterator->_next = iterator->_prev = NULL ;
        _EntryList = iterator ;
      } else {
        // CONSIDER: finding the tail currently requires a linear-time walk
        // of the EntryList. Tail access could be made constant-time by
        // converting to a CDLL instead of using the current DLL.
        ObjectWaiter * Tail ;
        for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
        assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
        Tail->_next = iterator ;
        iterator->_prev = Tail ;
        iterator->_next = NULL ;
      }
    } else if (Policy == 2) { // prepend to cxq
      // With an empty EntryList the waiter becomes the EntryList; otherwise
      // it is pushed onto the front of the cxq with a CAS retry loop.
      if (List == NULL) {
        iterator->_next = iterator->_prev = NULL ;
        _EntryList = iterator ;
      } else {
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
          ObjectWaiter * Front = _cxq ;
          iterator->_next = Front ;
          if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
            break ;
          }
        }
      }
    } else if (Policy == 3) { // Append to cxq
      iterator->TState = ObjectWaiter::TS_CXQ ;
      for (;;) {
        ObjectWaiter * Tail ;
        Tail = _cxq ;
        if (Tail == NULL) {
          // Empty cxq: install the waiter as the only element via CAS.
          iterator->_next = NULL ;
          if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
            break ;
          }
        } else {
          // Non-empty cxq: walk to the tail and link the waiter after it.
          while (Tail->_next != NULL) Tail = Tail->_next ;
          Tail->_next = iterator ;
          iterator->_prev = Tail ;
          iterator->_next = NULL ;
          break ;
        }
      }
    } else {
      // Any other policy: mark the waiter runnable and unpark it directly.
      ParkEvent * ev = iterator->_event ;
      iterator->TState = ObjectWaiter::TS_RUN ;
      OrderAccess::fence() ;
      ev->unpark() ;
    }

    if (Policy < 4) {
      iterator->wait_reenter_begin(this);
    }

    // _WaitSetLock protects the wait queue, not the EntryList. We could
    // move the add-to-EntryList operation, above, outside the critical section
    // protected by _WaitSetLock. In practice that's not useful. With the
    // exception of wait() timeouts and interrupts the monitor owner
    // is the only thread that grabs _WaitSetLock. There's almost no contention
    // on _WaitSetLock so it's not profitable to reduce the length of the
    // critical section.
  }

  Thread::SpinRelease (&_WaitSetLock) ;

  // Count the notification only when a waiter was actually transferred.
  if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
    ObjectMonitor::_sync_Notifications->inc() ;
  }
}
notifyAll method: similar to notify, except that it uses a for loop to move every node in the wait queue to the synchronization queue. It does not release the lock either; the lock is released when the enclosing synchronized block exits
// Monitor-side implementation of notifyAll: repeatedly dequeue waiters from
// the WaitSet until it is empty and, depending on Knob_MoveNotifyee, move each
// one to the EntryList or the cxq, or unpark it directly. The monitor itself
// is not released here.
//
//   TRAPS - supplies THREAD, the calling thread.
//
// Throws IllegalMonitorStateException (via CHECK_OWNER) if the caller does not
// own the monitor. Returns immediately when the WaitSet is empty.
void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  ObjectWaiter* iterator;
  if (_WaitSet == NULL) {
    TEVENT (Empty-NotifyAll) ;
    return ;
  }
  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);

  // Policy selects where each notified waiter is placed:
  //   0 = prepend to EntryList, 1 = append to EntryList,
  //   2 = prepend to cxq,       3 = append to cxq,
  //   anything else = unpark the waiter directly.
  int Policy = Knob_MoveNotifyee ;
  int Tally = 0 ; // number of waiters transferred by this call
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;

  for (;;) {
    iterator = DequeueWaiter () ;
    if (iterator == NULL) break ;
    TEVENT (NotifyAll - Transfer1) ;
    ++Tally ;

    // Disposition - what might we do with iterator ?
    // a.  add it directly to the EntryList - either tail or head.
    // b.  push it onto the front of the _cxq.
    // For now we use (a).

    guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
    guarantee (iterator->_notified == 0, "invariant") ;
    iterator->_notified = 1 ;
    if (Policy != 4) {
      iterator->TState = ObjectWaiter::TS_ENTER ;
    }

    ObjectWaiter * List = _EntryList ;
    if (List != NULL) {
      assert (List->_prev == NULL, "invariant") ;
      assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
      assert (List != iterator, "invariant") ;
    }

    if (Policy == 0) { // prepend to EntryList
      if (List == NULL) {
        iterator->_next = iterator->_prev = NULL ;
        _EntryList = iterator ;
      } else {
        List->_prev = iterator ;
        iterator->_next = List ;
        iterator->_prev = NULL ;
        _EntryList = iterator ;
      }
    } else if (Policy == 1) { // append to EntryList
      if (List == NULL) {
        iterator->_next = iterator->_prev = NULL ;
        _EntryList = iterator ;
      } else {
        // CONSIDER: finding the tail currently requires a linear-time walk of
        // the EntryList. We can make tail access constant-time by converting to
        // a CDLL instead of using our current DLL.
        ObjectWaiter * Tail ;
        for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
        assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
        Tail->_next = iterator ;
        iterator->_prev = Tail ;
        iterator->_next = NULL ;
      }
    } else if (Policy == 2) { // prepend to cxq
      // Push the waiter onto the front of the cxq with a CAS retry loop.
      iterator->TState = ObjectWaiter::TS_CXQ ;
      for (;;) {
        ObjectWaiter * Front = _cxq ;
        iterator->_next = Front ;
        if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
          break ;
        }
      }
    } else if (Policy == 3) { // append to cxq
      iterator->TState = ObjectWaiter::TS_CXQ ;
      for (;;) {
        ObjectWaiter * Tail ;
        Tail = _cxq ;
        if (Tail == NULL) {
          // Empty cxq: install the waiter as the only element via CAS.
          iterator->_next = NULL ;
          if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
            break ;
          }
        } else {
          // Non-empty cxq: walk to the tail and link the waiter after it.
          while (Tail->_next != NULL) Tail = Tail->_next ;
          Tail->_next = iterator ;
          iterator->_prev = Tail ;
          iterator->_next = NULL ;
          break ;
        }
      }
    } else {
      // Any other policy: mark the waiter runnable and unpark it directly.
      ParkEvent * ev = iterator->_event ;
      iterator->TState = ObjectWaiter::TS_RUN ;
      OrderAccess::fence() ;
      ev->unpark() ;
    }

    if (Policy < 4) {
      iterator->wait_reenter_begin(this);
    }

    // _WaitSetLock protects the wait queue, not the EntryList. We could
    // move the add-to-EntryList operation, above, outside the critical section
    // protected by _WaitSetLock. In practice that's not useful. With the
    // exception of wait() timeouts and interrupts the monitor owner
    // is the only thread that grabs _WaitSetLock. There's almost no contention
    // on _WaitSetLock so it's not profitable to reduce the length of the
    // critical section.
  }

  Thread::SpinRelease (&_WaitSetLock) ;

  // Add the number of transferred waiters to the notification counter.
  if (Tally != 0 && ObjectMonitor::_sync_Notifications != NULL) {
    ObjectMonitor::_sync_Notifications->inc(Tally) ;
  }
}
Question 1: earlier, wait blocks in ParkEvent::park() until it is woken up, but notify only moves nodes from the wait queue to the synchronization queue. The synchronization queue may hold many nodes, so which one is chosen, and where is ParkEvent::unpark() called to wake the thread so that it can continue?
Question 2: the wait method simply exits the object lock. How does it hand the lock over to another thread? The lock is transferred only between the thread calling wait and the thread calling notify, with no third party coordinating, so how does the object lock flow between them?
In fact, both questions come down to one: how is the object lock transferred?
Key point: the wait method itself calls ObjectMonitor::exit once, and leaving a synchronized block also calls ObjectMonitor::exit.
// Excerpt of ObjectMonitor::exit: the part that picks a successor from the
// synchronization queues and wakes it through ExitEpilog(). The "......"
// lines are elisions made by this article, not code.
void ATTR ObjectMonitor::exit(TRAPS) {
  ......
  // Pick the node to wake from the synchronization queues according to QMode.
  if (QMode == 2 && _cxq != NULL) {
    // QMode == 2 : cxq has precedence over EntryList.
    // Try to directly wake a successor from the cxq.
    // If successful, the successor will need to unlink itself from cxq.
    w = _cxq ;
    assert (w != NULL, "invariant") ;
    assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
    ExitEpilog (Self, w) ;
    return ;
  }

  if (QMode == 3 && _cxq != NULL) {
    // Aggressively drain cxq into EntryList at the first opportunity.
    // This policy ensure that recently-run threads live at the head of EntryList.
    // Drain _cxq into EntryList - bulk transfer.
    // First, detach _cxq.
    // The following loop is tantamount to: w = swap (&cxq, NULL)
    w = _cxq ;
    for (;;) {
      assert (w != NULL, "Invariant") ;
      ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
      if (u == w) break ;
      w = u ;
    }
    assert (w != NULL , "invariant") ;

    // Mark every detached node TS_ENTER and fill in its _prev link.
    ObjectWaiter * q = NULL ;
    ObjectWaiter * p ;
    for (p = w ; p != NULL ; p = p->_next) {
      guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
      p->TState = ObjectWaiter::TS_ENTER ;
      p->_prev = q ;
      q = p ;
    }

    // Append the RATs to the EntryList
    // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
    ObjectWaiter * Tail ;
    for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
    if (Tail == NULL) {
      _EntryList = w ;
    } else {
      Tail->_next = w ;
      w->_prev = Tail ;
    }

    // Fall thru into code that tries to wake a successor from EntryList
  }
  ......
  // Wake the head of the EntryList, if there is one.
  w = _EntryList ;
  if (w != NULL) {
    guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
    ExitEpilog (Self, w) ;
    return ;
  }
  }
}
The key step is to pick the appropriate node and call the ExitEpilog method to wake up the waiting thread
// Excerpt of ObjectMonitor::ExitEpilog: wakes the chosen successor (Wakee) by
// unparking its ParkEvent. The "...." lines are elisions made by this article,
// not code.
//
//   Self  - the exiting thread; asserted to be the current owner.
//   Wakee - the queue node of the thread to wake up.
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) { {
  assert (_owner == Self, "invariant") ;
  ParkEvent * Trigger = Wakee->_event ;
  ....
  // This is the counterpart of the ParkEvent::park() call made in wait():
  // unpark() wakes up the waiting thread.
  Trigger->unpark() ;
  .....
  if (ObjectMonitor::_sync_Parks != NULL) {
    ObjectMonitor::_sync_Parks->inc() ;
  }
}
wait method process