I Introduction to SynchronousQueue
- The SynchronousQueue is unbounded and a non buffered waiting Queue. However, due to the characteristics of the Queue itself, you must wait for other threads to remove elements before adding them; It can be considered that SynchronousQueue is a blocking Queue with a cache value of 1, but isEmpty() method always returns true, remainingCapacity() method always returns 0, remove() and removeAll() methods always return false, iterator() method always returns null, and peek() method always returns null.
- The difference between fair mode and unfair mode: if fair mode is adopted, SynchronousQueue will adopt fair lock and cooperate with a FIFO queue to block redundant producers and consumers, so as to the overall fairness strategy of the system;
- In case of unfair mode (SynchronousQueue default): SynchronousQueue adopts unfair lock and cooperates with a LIFO queue to manage redundant producers and consumers.
II SynchronousQueue data structure
- Because SynchronousQueue supports fair policy and unfair policy, there may be two data structures at the bottom: queue (realizing fair policy) and stack (realizing unfair Policy). Both queue and stack are realized through linked list. The specific data structure is as follows
- There are two types of data structures, stack and queue; The stack has a head node, and the queue has a head node and a tail node; Stack is used to implement unfair policies, and queue is used to implement fair policies.
III Source code analysis of synchronous queue
①. Class inheritance
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
- SynchronousQueue inherits the AbstractQueue abstract class, which defines the basic operations on the queue; At the same time, the BlockingQueue interface is implemented. BlockingQueue represents a blocking queue, and its operation on the queue may throw exceptions; It also implements the Searializable interface, which can be serialized.
②. Properties of class
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // Version serial number private static final long serialVersionUID = -3223113410248163686L; // Available processors static final int NCPUS = Runtime.getRuntime().availableProcessors(); // Maximum idle time static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; // Maximum idle time of infinite waiting static final int maxUntimedSpins = maxTimedSpins * 16; // Hyperspace spin waiting threshold static final long spinForTimeoutThreshold = 1000L; // For serialization private ReentrantLock qlock; private WaitQueue waitingProducers; private WaitQueue waitingConsumers;
③. Class constructor
public SynchronousQueue() { // Unfair strategy (first in, last out) this(false); } public SynchronousQueue(boolean fair) { // Generate different structures according to the specified policy transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
④. Inner class of class_ Transferer
- Transferer is a public class of TransferStack stack and TransferQueue queue, which defines the public operation of transferring data. It is implemented by TransferStack and TransferQueue. WaitQueue, LifoWaitQueue and FifoWaitQueue are expressed to be compatible with jdk1 Legacy of the serialization policy of SynchronousQueue in version 5
abstract static class Transferer<E> { // Transfer data, put or take operation abstract E transfer(E e, boolean timed, long nanos); }
- Transferer defines the transfer operation, which is used to take or put data. The transfer method is implemented by subclasses.
⑤. Inner class of class_ TransfererStack
1. Class attributes and inheritance relationship
/** Dual stack */ static final class TransferStack<E> extends Transferer<E> { // Represents the consumer of the consumption data static final int REQUEST = 0; // Represents the producer of production data static final int DATA = 1; // Indicates matching another producer or consumer static final int FULFILLING = 2; // Head node volatile SNode head;
- TransferStack inherits the Transferer abstract class and implements the transfer method.
- TransferStack has three different statuses: REQUEST, which represents the consumer of consumption DATA; DATA, the producer of production DATA; Fullfilling, which means matching another producer or consumer. The operation of any thread on TransferStack belongs to one of the above three states. It also contains a head field, which represents the head node.
2. Internal class of class_ SNode class
static final class SNode { // Next node volatile SNode next; // Matching nodes volatile SNode match; // Waiting thread volatile Thread waiter; // Prime item Object item; // pattern int mode; // The item and mode fields do not need to be decorated with volatile because they are written before and read after volatile/atomic operations // Unsafe mechanics // Reflection mechanism private static final sun.misc.Unsafe UNSAFE; // Memory offset address of match field private static final long matchOffset; // Offset address of next field private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } }
- SNode class represents the nodes in the stack, and uses reflection mechanism and CAS to ensure atomicity and change the corresponding domain values.
3. Class constructor
SNode(Object item) { this.item = item; }
- This constructor only sets the item field of SNode, and other fields are the default values.
4. tryMatch function
boolean tryMatch(SNode s) { // The match field of this node is null, and the match field is successfully compared and replaced if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // Get the waiting thread of this node Thread w = waiter; if (w != null) { // There are waiting threads // Reset the waiting thread of this node to null waiter = null; // unpark wait thread LockSupport.unpark(w); } return true; } // If match is not null or CAS setting fails, compare whether the match field is equal to the s node. If it is equal, it means that the matching has been completed and the matching is successful return match == s; }
Match the s node with this node. If the matching is successful, unpark waits for the thread. The specific process is as follows
-
① Judge whether the match field of this node is null. If it is null, go to step ②; otherwise, go to step ⑤
-
② CAS sets the match field of this node as s node. If successful, go to step ③; otherwise, go to step ⑤
-
③ Judge whether the waiter field of this node is null. If not, go to step ④; otherwise, go to step ⑤
-
④ Reset the waiter field of this node to null, and the waiting thread represented by the unparkwaiter field. Go to step ⑥
-
⑤ Compare whether the match field of this node is this node. If yes, go to step ⑥; otherwise, go to step ⑦
-
⑥ Return true
-
⑦ Return false
5. isFulfilling function
- Indicates whether to include the filling tag.
// Indicates whether to include the filling tag. static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
6. transfer function
- This function is used to produce or consume an element, and the transfer function calls the awaitcomplete function.
// Used to produce or consume an element E transfer(E e, boolean timed, long nanos) { SNode s = null; // Determine the transfer mode according to e (put or take) int mode = (e == null) ? REQUEST : DATA; for (;;) { // Infinite loop // Save header node SNode h = head; if (h == null || h.mode == mode) { // The head node is null or the mode of the head node is the same as that of this transfer if (timed && nanos <= 0) { // If timed is set and the waiting time is less than or equal to 0, it means that you can't wait and need to operate immediately if (h != null && h.isCancelled()) // The head node is not null and the head node is cancelled casHead(h, h.next); // Reset the header node (the header node before Pop-Up) else // The header node is null or has not been cancelled // Return null return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // Generate an SNode node; Set the original head node as the next node of the node; Set the head node as this node // Spin or block until the s node is matched by the FulFill operation SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // The matching node is s node (s node is cancelled) // Clean up s node clean(s); // return return null; } if ((h = head) != null && h.next == s) // h is re assigned as the head node and is not null; The next field of the head node is the s node, which indicates that the matching is completed before a node is inserted into the s node // Compare and replace the head field (remove the nodes inserted before s and s nodes) casHead(h, s.next); // Return the element according to the type of this transfer return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // No fullfilling tag, trying to match if (h.isCancelled()) // Cancelled // Compare and replace the head field (pop up the head node) casHead(h, h.next); else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // Generate an SNode node; Set the original head node as the next node of the node; Set the head node as this node for (;;) { // Infinite loop // Save next node of s SNode m = s.next; if (m == null) { // The next field is null // Compare and replace the head field casHead(s, null); // Assignment s is null s = null; break; } // next field of m node SNode mn = m.next; if (m.tryMatch(s)) { // Try to match and succeed // Compare and replace the head field (pop up s node and m node) casHead(s, mn); // Return the element according to the type of this transfer return (E) ((mode == REQUEST) ? m.item : s.item); } else // Matching failed // Compare and replace the next field (pop up m node) s.casNext(m, mn); } } } else { // Header matching // Save the next field of the header node SNode m = h.next; // m and h can match if (m == null) // The next field is null // Compare and replace the head field (m is matched by other nodes, and h needs to be popped up) casHead(h, null); else { // The next field is not null // Get the next field of m node SNode mn = m.next; if (m.tryMatch(h)) // m and h match successfully // Compare and replace the head field (pop up h and m nodes) casHead(h, mn); else // Matching failed // Compare and replace the next field (remove m nodes) h.casNext(m, mn); } } } }
7. awaitFulfill function
- This function indicates that the current thread spins or blocks until the node is matched. The awaitfull function called the shouldSpin function
//Indicates that the current thread spins or blocks until the node is matched SNode awaitFulfill(SNode s, boolean timed, long nanos) { // Calculate the deadline according to the timed ID final long deadline = timed ? System.nanoTime() + nanos : 0L; // Get current thread Thread w = Thread.currentThread(); // Determine the idle waiting time according to s int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // Infinite loop to ensure successful operation if (w.isInterrupted()) // The current thread was interrupted // Cancel s node s.tryCancel(); // Get the match field of s node SNode m = s.match; if (m != null) // m is not null, there is a matching node // Return m node return m; if (timed) { // timed is set // Determine how long to wait nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // The waiting time is less than or equal to 0, and the waiting timeout // Cancel s node s.tryCancel(); // Skip the rest and continue continue; } } if (spins > 0) // Idle waiting time is greater than 0 // Do you really need to continue idling and waiting spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) // The waiting thread is null // Set the waiter thread as the current thread s.waiter = w; // establish waiter so can park next iter else if (!timed) // The timed ID is not set // Disables the current thread and sets the blocker LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // The waiting time is greater than the threshold // Disables the current thread to wait up to the specified wait time unless a license is available LockSupport.parkNanos(this, nanos); } }
8. shouldSpin function
boolean shouldSpin(SNode s) { // Get header node SNode h = head; // s is the head node, or the head node is null, or h contains the filling flag, and returns true return (h == s || h == null || isFulfilling(h.mode)); }
This function indicates that the thread contained in the current node (current thread) is waiting for idling. Idling waiting is required in the following cases
-
① The current node is the head node
-
② The header node is null
-
③ The header node is matching
9. clean function
- This function is used to remove all cancelled nodes from the top node of the stack to the node (excluding).
void clean(SNode s) { // The item of the s node is set to null s.item = null; // The waiter field is set to null s.waiter = null; // Get the next field of s node SNode past = s.next; if (past != null && past.isCancelled()) // The next field is not null and the next field is cancelled // Reset past past = past.next; // Absorb cancelled nodes at head SNode p; while ((p = head) != null && p != past && p.isCancelled()) // From the top node of the stack to the past node (not included), remove the continuous cancellation nodes // Compare and replace the head field (pop up the cancelled node) casHead(p, p.next); // Unsplice embedded nodes while (p != null && p != past) { // Remove discontinuous cancellation nodes that were not removed in the previous step // Get the next field of p SNode n = p.next; if (n != null && n.isCancelled()) // n is not null and n is cancelled // Compare and replace the next field p.casNext(n, n.next); else // Set p to n p = n; } }
⑥. Inner class_ TransferQueue
1. Inheritance relationship and attribute of class
static final class TransferQueue<E> extends Transferer<E> { // Queue header transient volatile QNode head; // Tail node of queue transient volatile QNode tail; // Point to a cancelled node. When a node is last inserted into the queue, it may not leave the queue when it is cancelled transient volatile QNode cleanMe;
- The queue has a head node and a tail node, indicating the head and tail of the queue respectively, and also contains a domain indicating the cancellation node.
2. Internal class of class_ QNode class
static final class QNode { // Next node volatile QNode next; // Prime item volatile Object item; // Wait thread volatile Thread waiter; // Is it data final boolean isData; // Constructor QNode(Object item, boolean isData) { // Initialize item field this.item = item; // Initialize isData domain this.isData = isData; } // Compare and replace the next field boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Compare and replace the item field boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } // Cancel this node and set the item field to itself void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // Cancelled boolean isCancelled() { // Is the item field equal to itself return item == this; } // Is it not in the queue boolean isOffList() { // Is next equal to itself return next == this; } // Reflection mechanism private static final sun.misc.Unsafe UNSAFE; // Offset address of item field private static final long itemOffset; // Offset address of next field private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
3. Class constructor
TransferQueue() { // Initialize a sentinel node QNode h = new QNode(null, false); // initialize to dummy node. // Set header node head = h; // Set tail node tail = h; }
- The constructor is used to initialize a queue and a sentinel node. Both the head node and the tail node point to the sentinel node.
4. transfer function
- This function is used to produce or consume an element
// This function is used to produce or consume an element E transfer(E e, boolean timed, long nanos) { QNode s = null; // Determine the type of transfer (put or take) boolean isData = (e != null); for (;;) { // Infinite loop to ensure successful operation // Get tail node QNode t = tail; // Get header node QNode h = head; if (t == null || h == null) // See uninitialized head and tail nodes / / saw uninitialized value // Skip the rest and continue continue; // spin if (h == t || t.isData == isData) { // The head node is equal to the tail node, or the tail node has the same mode as the current node. / / empty or same mode // Get the next field of the tail node QNode tn = t.next; if (t != tail) // t is not the tail node. It is inconsistent. Try again. / / inconsistent read continue; if (tn != null) { // tn is not null. Other threads have added tn nodes / / lagging tail // Set the new tail node to tn advanceTail(t, tn); // Skip the rest and continue continue; } if (timed && nanos <= 0) // If timed is set and the waiting time is less than or equal to 0, it means that you can't wait and need to operate immediately. / / can't wait // Return null return null; if (s == null) // s is null // Create a new node and assign it to s s = new QNode(e, isData); if (!t.casNext(null, s)) // Failed to set the next field of node t. / / failed to link in // Skip the rest and continue continue; // Set new tail node advanceTail(t, s); // swing tail and wait // Spins/blocks until node s is fulfilled // Spin or block until the s node is matched Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // x is equal to s, indicating that it has been cancelled / / wait was cancelled // eliminate clean(t, s); // Return null return null; } if (!s.isOffList()) { // Node s has not left the queue yet. / / not already unlinked // Set new header node advanceHead(t, s); // unlink if head if (x != null) // x is not null / / and forget fields // Set the item of s node s.item = s; // Set the waiter field of s node to null s.waiter = null; } return (x != null) ? (E)x : e; } else { // Complementary mode / / complementary mode // Get the next field of the header node (matching node) QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) // t is not the tail node or m is null or h is not the head node (inconsistent) // Skip the rest and continue continue; // inconsistent read // Get the element field of m node Object x = m.item; if (isData == (x != null) || // M nodes are matched / / m already completed x == m || // m node canceled / / m cancelled !m.casItem(x, e)) { // CAS operation failed / / lost CAS advanceHead(h, m); // The queue header node exits the queue and tries again. / / dequeue and retry continue; } // Match succeeded. Set a new header node advanceHead(h, m); // successfully fulfilled // Wait thread corresponding to unpark m node LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } }
5. awaitFulfill function
- This function indicates that the current thread spins or blocks until the node is matched
//This function indicates that the current thread spins or blocks until the node is matched Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // Calculate the deadline according to the timed ID final long deadline = timed ? System.nanoTime() + nanos : 0L; // Get current thread Thread w = Thread.currentThread(); // Calculate spin time int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // Infinite loop to ensure successful operation if (w.isInterrupted()) // The current thread was interrupted // cancel s.tryCancel(e); // Get element field of s Object x = s.item; if (x != e) // Element is not e // return return x; if (timed) { // timed is set // Calculate the waiting time nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // The waiting time is less than or equal to 0 // cancel s.tryCancel(e); // Skip the rest and continue continue; } } if (spins > 0) // Idle time greater than 0 // Reduce idling time --spins; else if (s.waiter == null) // The waiting thread is null // Set wait thread s.waiter = w; else if (!timed) // The timed ID is not set // Disables the current thread and sets the blocker LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // The waiting time is greater than the threshold // Disables the current thread to wait up to the specified wait time unless a license is available LockSupport.parkNanos(this, nanos); } }
6. clean function
- This function is used to remove nodes that have been cancelled
// This function is used to remove nodes that have been cancelled void clean(QNode pred, QNode s) { // Set the wait thread to null s.waiter = null; /** * At any time, the last inserted node cannot be deleted in order to meet this condition * If the s node cannot be deleted, we set the precursor of the s node as the cleanMe node * Delete the previously saved version. At least s nodes or previously saved nodes can be deleted * So it always ends */ while (pred.next == s) { // The next field of pred is s / / return early if already unlinked // Get header node QNode h = head; // Gets the next field of the header node QNode hn = h.next; // Absorb cancelled first node as head if (hn != null && hn.isCancelled()) { // hn is not null and hn is cancelled // Set new header node advanceHead(h, hn); // Skip the rest and continue continue; } // Obtain the tail node to ensure the read consistency of the tail node QNode t = tail; // Ensure consistent read for tail if (t == h) // The tail node is the head node, indicating that the queue is empty // return return; // Get the next field of the tail node QNode tn = t.next; if (t != tail) // t is not the tail node. It is inconsistent. Try again // Skip the rest and continue continue; if (tn != null) { // tn is not null // Set new tail node advanceTail(t, tn); // Skip the rest and continue continue; } if (s != t) { // S is not a tail node, remove s / / if not tail, try to unsplice QNode sn = s.next; if (sn == s || pred.casNext(s, sn)) return; } // Get cleanMe node QNode dp = cleanMe; if (dp != null) { // dp is not null. Disconnect the previously cancelled node // Get the next field of dp QNode d = dp.next; QNode dn; if (d == null || // d is gone or d == dp || // d is off list or !d.isCancelled() || // d not cancelled or (d != t && // d not tail and (dn = d.next) != null && // has successor dn != d && // that is on list dp.casNext(d, dn))) // d unspliced casCleanMe(dp, null); if (dp == pred) return; // s is already saved node } else if (casCleanMe(null, pred)) return; // Postpone cleaning s } }
⑦. Core function analysis
// Add the specified element to this queue and wait for another thread to receive it if necessary public void put(E e) throws InterruptedException { // If e is null, an exception is thrown if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { // Transfer operation // Interrupt current thread Thread.interrupted(); throw new InterruptedException(); } } // Inserts the specified element into this queue and, if necessary, waits for the specified time for another thread to receive it public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // If e is null, an exception is thrown if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) // Transfer operation return true; if (!Thread.interrupted()) // The current thread is not interrupted // return return false; throw new InterruptedException(); } // If another thread is waiting to receive the specified element, the specified element is inserted into this queue public boolean offer(E e) { // If e is null, an exception is thrown if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; // Transfer operation } // Gets and removes the header of this queue, waiting for another thread to insert it if necessary public E take() throws InterruptedException { // Transfer operation E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } // Gets and removes the header of this queue and waits for the specified time if necessary for another thread to insert it public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) // The element is not null or the current thread is not interrupted return e; throw new InterruptedException(); } // Gets and removes the header of the queue if another thread is currently using an element public E poll() { return transferer.transfer(null, true, 0); } // Always return true public boolean isEmpty() { return true; } // Always return 0 public int size() { return 0; } // Always return 0 public int remainingCapacity() { return 0; } // Do nothing public void clear() { } // Always return false public boolean contains(Object o) { return false; } // Always return false public boolean remove(Object o) { return false; } // Returns false unless the given collection is empty public boolean containsAll(Collection<?> c) { return c.isEmpty(); } // Always return false public boolean removeAll(Collection<?> c) { return false; } // Always return false public boolean retainAll(Collection<?> c) { return false; } // Always return null public E peek() { return null; } // Returns an empty iterator, where hasNext always returns false public Iterator<E> iterator() { return Collections.emptyIterator(); } // public Spliterator<E> spliterator() { return Spliterators.emptySpliterator(); } // Returns an array of length 0 public Object[] toArray() { return new Object[0]; } // Sets the 0th element of the specified array to null (if the array has a length other than 0) and returns it public <T> T[] toArray(T[] a) { if (a.length > 0) a[0] = null; return a; } // Remove all available elements from this queue and add them to the given collection public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; for (E e; (e = poll()) != null;) { c.add(e); ++n; } return n; } // Removes up to a given number of available elements from this queue and adds them to the given collection public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); int n = 0; for (E e; n < maxElements && (e = poll()) != null;) { c.add(e); ++n; } return n; }
- The functions of SynchronousQueue largely rely on the transfer function of TransferStack or TransferQueue. Therefore, you can understand the principle of SynchronousQueue by understanding the transfer function.
IV SynchronousQueue case analysis_ Unfair strategy
package com.xizi.queue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>(); Producer p1 = new Producer("p1", queue, 10); Producer p2 = new Producer("p2", queue, 50); Consumer c1 = new Consumer("c1", queue); Consumer c2 = new Consumer("c2", queue); c1.start(); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } c2.start(); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } p1.start(); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } p2.start(); } static class Producer extends Thread { private SynchronousQueue<Integer> queue; private int n; public Producer(String name, SynchronousQueue<Integer> queue, int n) { super(name); this.queue = queue; this.n = n; } public void run() { System.out.println(getName() + " offer result " + queue.offer(n)); } } static class Consumer extends Thread { private SynchronousQueue<Integer> queue; public Consumer(String name, SynchronousQueue<Integer> queue) { super(name); this.queue = queue; } public void run() { try { System.out.println(getName() + " take result " + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- If two producers p1 and p2 and two consumers c1 and c2 are started in the order of c1, c2, p1 and p2, and each thread sleeps for 100ms after starting, there may be the following timing diagram
①. c1 executes the take operation. The main function calls are as follows
- Synchronous queue adopts unfair strategy, that is, the bottom layer adopts stack structure.
- After c1 thread enters awaitcomplete, it will idle and wait until the idle time elapses, and it will call locksupport The park function disables the current thread (c1) until the license is available.
②. c2 executes the take operation. The main function calls are as follows
- After the c2 thread enters awaitcomplete, it will idle and wait until the idle time elapses, and it will call locksupport The park function disables the current thread (c2) until the license is available. At this time, there are two nodes in the stack, the node where c2 thread is located and the node where c1 thread is located.
③. p1 thread executes the offer(10) operation. The main function calls are as follows
- After the offer(10) operation, the node where the c2 thread is located matches the head node (production data of the head node and consumption data of the node where the c2 thread is located). The c2 thread is unpark and can continue to run, while the c1 thread is still in the park (unfair policy).
- After the c2 thread is unparked, it continues to run. The main function calls are as follows (since the c2 thread is unparked in the awaitfull function, the recovery is also in the awaitfull function)
- When c2 thread recovers from unpark, the structure is as shown in the above figure. First, it returns from awaitcomplete function, then 10 from transfer function, and then 10 from take function.
④. p2 thread executes the offer(50) operation. The main function calls are as follows
- After the offer(50) operation is executed, the node where the c1 thread is located matches the head node (production data of the head node and consumption data of the node where the c1 thread is located). The c1 thread is unpark and can continue to run.
- After the c1 thread is unparked, it continues to run. The main function calls are as follows (since the c1 thread is unparked in the awaitfull function, the recovery is also in the awaitfull function)
V SynchronousQueue case analysis_ Fair strategy
Vi SynchronousQueue case analysis_ Change the starting sequence between c1, c2, p1 and p2
- Start sequence P1 - > C1 - > P2 - > C2
- Only the c1 thread is matched, the p1 thread stores the elements and returns false directly, because there is no consumer thread waiting at this time, while the p2 thread matches the c1 thread. The p2 thread stores the elements successfully, and the c1 thread obtains the elements successfully. At this time, the c2 thread is still in the park state, and the application cannot end normally.
- Therefore, it can be seen that there must be a fetch operation and then a save operation before the two can match correctly. If the save operation is followed by the fetch operation, the matching cannot succeed at this time and will be blocked. The fetch operation expects the next save operation to match.