SynchronousQueue---JDK1.8 source code analysis

I Introduction to SynchronousQueue

  • SynchronousQueue is a blocking queue with no internal capacity, not even a capacity of one. Each insert operation must wait for a corresponding remove operation by another thread, and vice versa. Because the queue never holds elements, isEmpty() always returns true, remainingCapacity() always returns 0, remove(Object) and removeAll() always return false, iterator() always returns an empty iterator, and peek() always returns null.
  • Fair mode: no lock is involved in JDK 1.8. Waiting producers and consumers are kept in a lock-free FIFO queue, so the thread that has waited longest is matched first.
  • Unfair mode (the SynchronousQueue default): waiting producers and consumers are kept in a lock-free LIFO stack, so the most recently arrived thread is matched first.
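
The collection-style behaviour listed above can be checked directly against the public API. The following is a small sketch; the class name SynchronousQueueBasics and the helper describe are made up for illustration, and no other thread touches the queue while it runs.

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueBasics {
    /** Summarises how an idle SynchronousQueue answers collection-style calls. */
    static String describe(SynchronousQueue<String> q) {
        return "isEmpty=" + q.isEmpty()
            + " remainingCapacity=" + q.remainingCapacity()
            + " peek=" + q.peek()
            + " iterator.hasNext=" + q.iterator().hasNext()
            + " remove=" + q.remove("x")
            + " offer=" + q.offer("x")   // no consumer is waiting, so the hand-off fails at once
            + " poll=" + q.poll();       // no producer is waiting either
    }

    public static void main(String[] args) {
        System.out.println(describe(new SynchronousQueue<>()));
        // prints: isEmpty=true remainingCapacity=0 peek=null iterator.hasNext=false remove=false offer=false poll=null
    }
}
```

Note that offer("x") does not leave "x" behind: with no consumer waiting, the element is simply rejected.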

II SynchronousQueue data structure

  • Because SynchronousQueue supports both a fair and an unfair policy, it has two underlying data structures: a queue (implementing the fair policy) and a stack (implementing the unfair policy). Both are implemented as linked lists.

  • The stack keeps only a head node; the queue keeps a head node and a tail node.
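
The effect of the two structures can be observed from outside. The sketch below is only an illustration: the class FairnessDemo and the method drainOrder are hypothetical names, and the 100 ms sleeps are a crude assumption that each producer has started waiting before the next one is launched. Under that assumption the fair queue usually hands values over in arrival order and the unfair stack in reverse order, but the order is timing-dependent and not guaranteed by this code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class FairnessDemo {
    /** Lets three producers block one after another, then takes all three values. */
    static List<Integer> drainOrder(boolean fair) {
        SynchronousQueue<Integer> q = new SynchronousQueue<>(fair);
        List<Integer> order = new ArrayList<>();
        try {
            for (int i = 1; i <= 3; i++) {
                final int value = i;
                Thread producer = new Thread(() -> {
                    try {
                        q.put(value);              // blocks until a consumer takes the value
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                Thread.sleep(100);                 // crude: give this producer time to start waiting
            }
            for (int i = 0; i < 3; i++) {
                order.add(q.take());
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + drainOrder(true));
        System.out.println("unfair: " + drainOrder(false));
    }
}
```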

III Source code analysis of SynchronousQueue

①. Class inheritance


public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {}
  • SynchronousQueue extends the AbstractQueue abstract class, which provides skeletal implementations of the basic queue operations. It also implements the BlockingQueue interface, whose operations may block the calling thread or time out, and the Serializable interface, so it can be serialized.

②. Properties of class

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
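    // Serialization version UID
    private static final long serialVersionUID = -3223113410248163686L;
    // Number of available processors, used for spin control
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    // Maximum number of spins before blocking in a timed wait (0 on a single processor)
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    // Maximum number of spins before blocking in an untimed wait
    static final int maxUntimedSpins = maxTimedSpins * 16;
    // Timeout in nanoseconds below which spinning is preferred to a timed park
    static final long spinForTimeoutThreshold = 1000L;

    // The transferer (stack or queue) chosen in the constructor
    private transient volatile Transferer<E> transferer;

    // Used only for serialization compatibility with JDK 1.5
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;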

③. Class constructor

    public SynchronousQueue() {
        // Unfair policy by default (LIFO stack)
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        // Generate different structures according to the specified policy
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

④. Inner class: Transferer

  • Transferer is the common abstract superclass of TransferStack and TransferQueue. It declares the shared data transfer operation, which both subclasses implement. WaitQueue, LifoWaitQueue and FifoWaitQueue exist only for compatibility with the serialization form of SynchronousQueue in JDK 1.5.
    abstract static class Transferer<E> {
        // Performs a put (e != null) or a take (e == null); returns null if the operation timed out or was interrupted
        abstract E transfer(E e, boolean timed, long nanos);
    }

  • Transferer defines the transfer operation, which is used to take or put data. The transfer method is implemented by subclasses.

⑤. Inner class: TransferStack

1. Class attributes and inheritance relationship

    /** Dual stack */
    static final class TransferStack<E> extends Transferer<E> {

        // Node represents an unfulfilled consumer
        static final int REQUEST    = 0;

        // Node represents an unfulfilled producer
        static final int DATA       = 1;

        // Node is fulfilling another unfulfilled DATA or REQUEST node
        static final int FULFILLING = 2;

        // Head (top) of the stack
        volatile SNode head;

  • TransferStack extends the Transferer abstract class and implements the transfer method.
  • A node in TransferStack has one of three modes: REQUEST, an unfulfilled consumer; DATA, an unfulfilled producer; and FULFILLING, a bit that is ORed with REQUEST or DATA to mark a node that is in the middle of matching the node below it. The class also holds a head field, which points to the top of the stack.

2. Inner class: SNode

static final class SNode {
            // Next node
            volatile SNode next;
            // Node matched to this one
            volatile SNode match;
            // Waiting thread, used to control park/unpark
            volatile Thread waiter;
            // Data item; null for a REQUEST node
            Object item;
            // Mode: REQUEST or DATA, possibly ORed with FULFILLING
            int mode;
            // The item and mode fields do not need to be volatile because they are always written before, and read after, other volatile/atomic operations

            // Unsafe mechanics: CAS on fields through their memory offsets
            private static final sun.misc.Unsafe UNSAFE;
            // Memory offset address of match field
            private static final long matchOffset;
            // Offset address of next field
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = SNode.class;
                    matchOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("match"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
  • The SNode class represents a node in the stack. It uses Unsafe CAS operations on field offsets, not reflection-based access, to update the match and next fields atomically.

3. Class constructor

SNode(Object item) {
         this.item = item;
 }
  • This constructor only sets the item field of SNode, and other fields are the default values.

4. tryMatch function

            boolean tryMatch(SNode s) {
                // match is still null and the CAS from null to s succeeds
                if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    // Get the waiting thread of this node
                    Thread w = waiter;
                    if (w != null) {    // There are waiting threads
                        // Reset the waiting thread of this node to null
                        waiter = null;
                        // unpark wait thread
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                // If match is not null or CAS setting fails, compare whether the match field is equal to the s node. If it is equal, it means that the matching has been completed and the matching is successful
                return match == s;
            }

Try to match node s to this node. If the match succeeds, unpark the waiting thread. The process is as follows:

  • ① Check whether the match field of this node is null. If it is, go to step ②; otherwise, go to step ⑤

  • ② Use CAS to set the match field of this node to s. If the CAS succeeds, go to step ③; otherwise, go to step ⑤

  • ③ Check whether the waiter field of this node is null. If it is not, go to step ④; otherwise, go to step ⑥

  • ④ Reset the waiter field of this node to null and unpark the thread it referred to. Go to step ⑥

  • ⑤ Check whether the match field of this node is s. If yes, go to step ⑥; otherwise, go to step ⑦

  • ⑥ Return true

  • ⑦ Return false
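
The steps above can be sketched without sun.misc.Unsafe. In this illustration the class MatchNode is a made-up stand-in for SNode, and an AtomicReference replaces the offset-based CAS; it is meant to show the control flow, not to reproduce the JDK class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class MatchNode {
    // Stand-in for SNode.match; AtomicReference replaces the Unsafe CAS
    private final AtomicReference<MatchNode> match = new AtomicReference<>();
    // Thread to wake once a match is installed; null if nobody has parked yet
    volatile Thread waiter;

    /** Tries to install s as this node's match; true if s is (or already was) the match. */
    boolean tryMatch(MatchNode s) {
        if (match.get() == null && match.compareAndSet(null, s)) {
            Thread w = waiter;
            if (w != null) {          // a waiter needs at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match.get() == s;      // a helping thread may already have matched s
    }

    public static void main(String[] args) {
        MatchNode waiting = new MatchNode();
        MatchNode first = new MatchNode();
        MatchNode second = new MatchNode();
        System.out.println(waiting.tryMatch(first));   // true: first wins the CAS
        System.out.println(waiting.tryMatch(first));   // true: already matched to first
        System.out.println(waiting.tryMatch(second));  // false: the match belongs to first
    }
}
```

The second call returning true is what lets a helping thread and the original fulfiller both call tryMatch with the same node safely.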

5. isFulfilling function

  • Returns true if mode m has the FULFILLING bit set.

        // Returns true if m has the FULFILLING bit set
        static boolean isFulfilling(int m) {
            return (m & FULFILLING) != 0;
        }
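
Because FULFILLING is a separate bit, it can be combined with REQUEST or DATA without losing the original mode. The sketch below copies the three constants and isFulfilling from the listing; the class name ModeBits and the helper isData are added here purely for illustration.

```java
public class ModeBits {
    static final int REQUEST    = 0;
    static final int DATA       = 1;
    static final int FULFILLING = 2;

    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

    // Hypothetical helper (not in the JDK): the low bit still says producer or consumer
    static boolean isData(int m) {
        return (m & DATA) != 0;
    }

    public static void main(String[] args) {
        int fulfillingRequest = FULFILLING | REQUEST;  // 2: a consumer matching a waiting producer
        int fulfillingData = FULFILLING | DATA;        // 3: a producer matching a waiting consumer
        System.out.println(fulfillingRequest + " " + isFulfilling(fulfillingRequest) + " " + isData(fulfillingRequest)); // 2 true false
        System.out.println(fulfillingData + " " + isFulfilling(fulfillingData) + " " + isData(fulfillingData));          // 3 true true
        System.out.println(isFulfilling(REQUEST) + " " + isFulfilling(DATA));                                            // false false
    }
}
```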

6. transfer function

  • This function produces or consumes an element. It calls the awaitFulfill function to wait for a match.

 // Used to produce or consume an element
        E transfer(E e, boolean timed, long nanos) {
            SNode s = null;
            // Determine the transfer mode according to e (put or take)
            int mode = (e == null) ? REQUEST : DATA;
            for (;;) { // Infinite loop
                // Save header node
                SNode h = head;
                if (h == null || h.mode == mode) {  // The head node is null or the mode of the head node is the same as that of this transfer
                    if (timed && nanos <= 0) { // If timed is set and the waiting time is less than or equal to 0, it means that you can't wait and need to operate immediately
                        if (h != null && h.isCancelled()) // The head node is not null and the head node is cancelled
                            casHead(h, h.next); // Pop the cancelled head node
                        else // The header node is null or has not been cancelled
                            // Return null
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) { // Generate an SNode node; Set the original head node as the next node of the node; Set the head node as this node
                        // Spin or block until the s node is matched by the FulFill operation
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) { // The matching node is s node (s node is cancelled)
                            // Clean up s node
                            clean(s);
                            // return
                            return null;
                        }
                        if ((h = head) != null && h.next == s) // A fulfilling node was pushed on top of s and matched it, but has not been popped yet
                            // Help the fulfiller: pop both the fulfilling node and s
                            casHead(h, s.next);
                        // Return the element according to the type of this transfer
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // Head is not a fulfilling node: try to fulfill it
                    if (h.isCancelled()) // Cancelled
                        // Compare and replace the head field (pop up the head node)
                        casHead(h, h.next);
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // Generate an SNode node; Set the original head node as the next node of the node; Set the head node as this node
                        for (;;) { // Infinite loop
                            // Save next node of s
                            SNode m = s.next;
                            if (m == null) { // All waiters are gone
                                // Pop the fulfilling node s
                                casHead(s, null);
                                // Discard s so that a new node is created next time
                                s = null;
                                break;
                            }
                            // next field of m node
                            SNode mn = m.next;
                            if (m.tryMatch(s)) { // Try to match and succeed
                                // Compare and replace the head field (pop up s node and m node)
                                casHead(s, mn);
                                // Return the element according to the type of this transfer
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else // Matching failed
                                // Compare and replace the next field (pop up m node)
                                s.casNext(m, mn);
                        }
                    }
                } else { // Head is a fulfilling node: help it complete its match
                    // m is the node that h is trying to match
                    SNode m = h.next;
                    if (m == null) // The waiter is gone
                        // Pop the fulfilling node h
                        casHead(h, null);
                    else { // The next field is not null
                        // Get the next field of m node
                        SNode mn = m.next;
                        if (m.tryMatch(h)) // m and h match successfully
                            // Compare and replace the head field (pop up h and m nodes)
                            casHead(h, mn);
                        else // Matching failed
                            // Compare and replace the next field (remove m nodes)
                            h.casNext(m, mn);
                    }
                }
            }
        }

7. awaitFulfill function

  • This function makes the current thread spin or block until node s is matched. The awaitFulfill function calls the shouldSpin function.

  // Spins or blocks until node s is matched
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            // Compute the deadline if this is a timed wait
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // Get current thread
            Thread w = Thread.currentThread();
            // Choose the spin count: spin only if shouldSpin(s) holds
            int spins = (shouldSpin(s) ?
                    (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) { // Infinite loop to ensure successful operation
                if (w.isInterrupted()) // The current thread was interrupted
                    // Cancel s node
                    s.tryCancel();
                // Get the match field of s node
                SNode m = s.match;
                if (m != null) // m is not null, there is a matching node
                    // Return m node
                    return m;
                if (timed) { // timed is set
                    // Determine how long to wait
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) { // The waiting time is less than or equal to 0, and the waiting timeout
                        // Cancel s node
                        s.tryCancel();
                        // Skip the rest and continue
                        continue;
                    }
                }
                if (spins > 0) // Spins remain
                    // Keep spinning only while shouldSpin(s) still holds
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null) // No waiter recorded yet
                    // Record the current thread so that it can park on the next iteration
                    s.waiter = w; // establish waiter so can park next iter
                else if (!timed) // Untimed wait
                    // Park the current thread, with this stack as the blocker, until it is unparked or interrupted
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold) // Enough time left to make parking worthwhile
                    // Park for at most nanos nanoseconds, unless a permit becomes available first
                    LockSupport.parkNanos(this, nanos);
            }
        }
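
The spin, then record the waiter, then park sequence can be sketched in isolation. The class SpinThenPark below is a hypothetical one-shot slot, not JDK code: MAX_SPINS is an arbitrary budget, and timeout or interrupt simply returns null here, whereas the JDK cancels the node by CAS-ing its match field to the node itself.

```java
import java.util.concurrent.locks.LockSupport;

public class SpinThenPark {
    static final int MAX_SPINS = 32;   // arbitrary spin budget for this sketch
    private volatile Object value;     // written once by the fulfilling thread
    private volatile Thread waiter;    // published before parking so fulfill() can unpark

    /** Waits for fulfill(); returns null if nanos elapse or the thread is interrupted. */
    Object await(long nanos) {
        final long deadline = System.nanoTime() + nanos;
        int spins = MAX_SPINS;
        for (;;) {
            Object v = value;
            if (v != null)
                return v;
            if (Thread.currentThread().isInterrupted())
                return null;
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L)
                return null;
            if (spins > 0)
                --spins;                              // spin a little first
            else if (waiter == null)
                waiter = Thread.currentThread();      // record, then re-check value on the next pass
            else
                LockSupport.parkNanos(this, remaining);
        }
    }

    void fulfill(Object v) {
        value = v;
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);
    }

    public static void main(String[] args) {
        SpinThenPark slot = new SpinThenPark();
        new Thread(() -> {
            LockSupport.parkNanos(50_000_000L);       // simulate a partner arriving about 50 ms later
            slot.fulfill("hello");
        }).start();
        System.out.println(slot.await(2_000_000_000L));
    }
}
```

Recording the waiter on one iteration and parking only on the next gives the loop one more chance to see a value that arrived in between, which is the same reason the JDK loop is structured this way.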

8. shouldSpin function

 boolean shouldSpin(SNode s) {
            // Get header node
            SNode h = head;
            // Spin if s is the head, the stack is empty, or the head is a fulfilling node
            return (h == s || h == null || isFulfilling(h.mode)); 
        }

This function decides whether the thread waiting on node s (the current thread) should spin before blocking. Spinning is worthwhile in the following cases:

  • ① The current node is the head node

  • ② The head node is null

  • ③ The head node is a fulfilling node, that is, a match is in progress

9. clean function

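  • This function unlinks the cancelled node s from the stack. It removes all cancelled nodes from the top of the stack down to, but not including, the node past, which is the successor of s (or the successor's successor if the successor is itself cancelled).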

 void clean(SNode s) {
            // The item of the s node is set to null
            s.item = null;
            // The waiter field is set to null
            s.waiter = null;
            // Get the next field of s node
            SNode past = s.next;
            if (past != null && past.isCancelled()) // The next field is not null and the next field is cancelled
                // Reset past
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled()) // From the top node of the stack to the past node (not included), remove the continuous cancellation nodes
                // Compare and replace the head field (pop up the cancelled node)
                casHead(p, p.next);

            // Unsplice embedded nodes
            while (p != null && p != past) { // Remove discontinuous cancellation nodes that were not removed in the previous step
                // Get the next field of p
                SNode n = p.next;
                if (n != null && n.isCancelled()) // n is not null and n is cancelled
                    // Compare and replace the next field
                    p.casNext(n, n.next);
                else
                    // Set p to n
                    p = n;
            }
        }
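
The two phases of clean can be traced on a small example. The sketch below is single-threaded and deliberately simplified: StackCleanSketch and its Node class are illustrative names, plain assignments stand in for casHead and casNext, and a boolean flag stands in for the JDK's match == this cancellation test.

```java
import java.util.ArrayList;
import java.util.List;

public class StackCleanSketch {
    static final class Node {
        final String name;
        Node next;
        boolean cancelled;   // simplification: the JDK encodes this as match == this
        Node(String name, Node next) { this.name = name; this.next = next; }
    }

    Node head;

    /** Single-threaded rendering of clean(s): plain writes replace the CAS calls. */
    void clean(Node s) {
        Node past = s.next;
        if (past != null && past.cancelled)
            past = past.next;

        // Phase 1: pop cancelled nodes sitting at the top of the stack
        Node p;
        while ((p = head) != null && p != past && p.cancelled)
            head = p.next;

        // Phase 2: unsplice cancelled nodes embedded between p and past
        while (p != null && p != past) {
            Node n = p.next;
            if (n != null && n.cancelled)
                p.next = n.next;
            else
                p = n;
        }
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next)
            out.add(n.name);
        return out;
    }

    /** Builds A -> B -> C -> D with A and C cancelled, cleans C, and returns what is left. */
    static List<String> demo() {
        Node d = new Node("D", null);
        Node c = new Node("C", d);
        Node b = new Node("B", c);
        Node a = new Node("A", b);
        a.cancelled = true;
        c.cancelled = true;
        StackCleanSketch stack = new StackCleanSketch();
        stack.head = a;
        stack.clean(c);
        return stack.names();
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [B, D]
    }
}
```

Here A is removed by the first phase because it sits at the head, and C is removed by the second phase because it is embedded behind the live node B.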

⑥. Inner class: TransferQueue

1. Inheritance relationship and attribute of class

    static final class TransferQueue<E> extends Transferer<E> {

        // Head of the queue
        transient volatile QNode head;
        // Tail of the queue
        transient volatile QNode tail;
        // Marks a cancelled node whose unlinking was postponed because it was the last inserted node when it was cancelled (clean() stores that node's predecessor here)
        transient volatile QNode cleanMe;
  • The queue has a head node and a tail node, marking the two ends of the queue, and a cleanMe field that records a cancelled node whose removal has been postponed.

2. Inner class: QNode

static final class QNode {
            // Next node
            volatile QNode next;
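            // Item; changed by CAS to or from null when the node is fulfilled
            volatile Object item;
            // Waiting thread, used to control park/unpark
            volatile Thread waiter;
            // true if this node holds data (a producer), false if it is a request (a consumer)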
            final boolean isData;

            // Constructor
            QNode(Object item, boolean isData) {
                // Initialize item field
                this.item = item;
                // Initialize isData domain
                this.isData = isData;
            }

            // Compare and replace the next field
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            // Compare and replace the item field
            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }


            // Cancel this node and set the item field to itself
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

            // Cancelled
            boolean isCancelled() {
                // Is the item field equal to itself
                return item == this;
            }


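            // Returns true if this node has been unlinked from the queue
            boolean isOffList() {
                // An unlinked node has its next field pointing to itself
                return next == this;
            }

            // Unsafe mechanics: CAS on fields through their memory offsets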
            private static final sun.misc.Unsafe UNSAFE;
            // Offset address of item field
            private static final long itemOffset;
            // Offset address of next field
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class<?> k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
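
QNode marks cancellation by pointing item at the node itself, which cannot collide with any real element. The following sketch shows why that makes cancellation and fulfilment mutually exclusive. CancellableNode is a made-up stand-in for QNode, and an AtomicReference replaces the Unsafe CAS.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CancellableNode {
    // Stand-in for QNode.item; AtomicReference replaces the Unsafe CAS
    private final AtomicReference<Object> item;

    CancellableNode(Object initial) {
        item = new AtomicReference<>(initial);
    }

    boolean casItem(Object cmp, Object val) {
        return item.get() == cmp && item.compareAndSet(cmp, val);
    }

    /** Cancels only if the item is still cmp, by pointing the item at this node. */
    void tryCancel(Object cmp) {
        item.compareAndSet(cmp, this);
    }

    boolean isCancelled() {
        return item.get() == this;
    }

    public static void main(String[] args) {
        // A waiting consumer (item == null) that times out before anyone arrives
        CancellableNode timedOut = new CancellableNode(null);
        timedOut.tryCancel(null);
        System.out.println(timedOut.isCancelled());           // true
        System.out.println(timedOut.casItem(null, "late"));   // false: a producer can no longer fulfil it

        // A waiting consumer that is fulfilled first
        CancellableNode fulfilled = new CancellableNode(null);
        System.out.println(fulfilled.casItem(null, "data"));  // true
        fulfilled.tryCancel(null);                            // no effect: item is no longer null
        System.out.println(fulfilled.isCancelled());          // false
    }
}
```

Whichever CAS on item happens first wins, so a node is either fulfilled or cancelled, never both.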

3. Class constructor

       TransferQueue() {
            // Initialize a sentinel node
            QNode h = new QNode(null, false); // initialize to dummy node.
            // Set header node
            head = h;
            // Set tail node
            tail = h;
        }
  • The constructor is used to initialize a queue and a sentinel node. Both the head node and the tail node point to the sentinel node.

4. transfer function

  • This function is used to produce or consume an element

  // This function is used to produce or consume an element
        E transfer(E e, boolean timed, long nanos) {
            QNode s = null;
            // Determine the type of transfer (put or take)
            boolean isData = (e != null);

            for (;;) { // Infinite loop to ensure successful operation
                // Get tail node
                QNode t = tail;
                // Get header node
                QNode h = head;
                if (t == null || h == null) // Saw uninitialized head or tail
                    // Skip the rest and retry
                    continue;                       // spin

                if (h == t || t.isData == isData) { // Queue is empty or the tail has the same mode as this request
                    // Get the next field of the tail node
                    QNode tn = t.next;
                    if (t != tail) // Inconsistent read: the tail changed, retry
                        continue;
                    if (tn != null) { // Lagging tail: another thread has already linked tn
                        // Help advance the tail to tn
                        advanceTail(t, tn);
                        // Skip the rest and retry
                        continue;
                    }
                    if (timed && nanos <= 0) // Can't wait: timed with no time left, so fail immediately
                        // Return null
                        return null;
                    if (s == null) // s is null
                        // Create a new node and assign it to s
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s)) // Failed to link s after t
                        // Skip the rest and retry
                        continue;
                    // Swing the tail to s, then wait
                    advanceTail(t, s);
                    // Spin or block until node s is fulfilled
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) { // x == s means the wait was cancelled
                        // Unlink the cancelled node
                        clean(t, s);
                        // Return null
                        return null;
                    }

                    if (!s.isOffList()) { // s has not been unlinked yet
                        // Unlink the predecessor of s if it is the head, making s the new head
                        advanceHead(t, s);          // unlink if head
                        if (x != null) // s was a consumer that received an item
                            // Self-link the item so that the node no longer references the received value
                            s.item = s;
                        // Set the waiter field of s node to null
                        s.waiter = null;
                    }

                    return (x != null) ? (E)x : e;

                } else { // Complementary mode
                    // The node after the head is the one to fulfill
                    QNode m = h.next;                // node to fulfill
                    if (t != tail || m == null || h != head) // Inconsistent read: the tail or head changed, or m is missing
                        // Skip the rest and retry
                        continue;                   // inconsistent read
                    // Get the item field of node m
                    Object x = m.item;
                    if (isData == (x != null) ||    // m has already been fulfilled
                            x == m ||                   // m was cancelled
                            !m.casItem(x, e)) {         // lost the CAS race
                        advanceHead(h, m);          // dequeue m and retry
                        continue;
                    }
                    // Match succeeded. Set a new header node
                    advanceHead(h, m);              // successfully fulfilled
                    // Wait thread corresponding to unpark m node
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

5. awaitFulfill function

  • This function makes the current thread spin or block until node s is matched
 // Spins or blocks until node s is matched
        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            // Calculate the deadline according to the timed ID
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // Get current thread
            Thread w = Thread.currentThread();
            // Choose the spin count: spin only if s is the first node after the head
            int spins = ((head.next == s) ?
                    (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) { // Infinite loop to ensure successful operation
                if (w.isInterrupted()) // The current thread was interrupted
                    // cancel
                    s.tryCancel(e);
                // Get element field of s
                Object x = s.item;
                if (x != e) // The item changed: s was fulfilled, or cancelled (in which case x == s)
                    // return
                    return x;
                if (timed) { // timed is set
                    // Calculate the waiting time
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) { // The waiting time is less than or equal to 0
                        // cancel
                        s.tryCancel(e);
                        // Skip the rest and continue
                        continue;
                    }
                }
                if (spins > 0) // Spins remain
                    // Use up one spin
                    --spins;
                else if (s.waiter == null) // No waiter recorded yet
                    // Record the current thread so that it can park on the next iteration
                    s.waiter = w;
                else if (!timed) // Untimed wait
                    // Park the current thread until it is unparked or interrupted
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold) // Enough time left to make parking worthwhile
                    // Park for at most nanos nanoseconds, unless a permit becomes available first
                    LockSupport.parkNanos(this, nanos);
            }
        }

6. clean function

  • This function is used to remove nodes that have been cancelled

 // This function is used to remove nodes that have been cancelled
        void clean(QNode pred, QNode s) {
            // Set the wait thread to null
            s.waiter = null;
            /**
             * At any given time, exactly one node in the list cannot be deleted:
             * the last inserted node. To accommodate this, if s cannot be deleted,
             * its predecessor is saved as cleanMe, after first deleting the
             * previously saved node. At least one of s or the previously saved
             * node can always be deleted, so this always terminates.
             */
            while (pred.next == s) { // Loop while s is still linked after pred; return early if it is already unlinked
                // Get header node
                QNode h = head;
                // Gets the next field of the header node
                QNode hn = h.next;   // Absorb cancelled first node as head
                if (hn != null && hn.isCancelled()) { // hn is not null and hn is cancelled
                    // Set new header node
                    advanceHead(h, hn);
                    // Skip the rest and continue
                    continue;
                }
                // Obtain the tail node to ensure the read consistency of the tail node
                QNode t = tail;      // Ensure consistent read for tail
                if (t == h) // The tail node is the head node, indicating that the queue is empty
                    // return
                    return;
                // Get the next field of the tail node
                QNode tn = t.next;
                if (t != tail) // t is not the tail node. It is inconsistent. Try again
                    // Skip the rest and continue
                    continue;
                if (tn != null) { // tn is not null
                    // Set new tail node
                    advanceTail(t, tn);
                    // Skip the rest and continue
                    continue;
                }
                if (s != t) { // s is not the tail: try to unsplice it
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))
                        return;
                }
                // Get cleanMe node
                QNode dp = cleanMe;
                if (dp != null) { // A node was saved earlier: try to unlink that cancelled node first
                    // Get the next field of dp
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                            d == dp ||                 // d is off list or
                            !d.isCancelled() ||        // d not cancelled or
                            (d != t &&                 // d not tail and
                                    (dn = d.next) != null &&  //   has successor
                                    dn != d &&                //   that is on list
                                    dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))
                    return;          // Postpone cleaning s
            }
        }

⑦. Core function analysis


    // Adds the specified element to this queue, waiting if necessary for another thread to receive it
    public void put(E e) throws InterruptedException {
        // If e is null, an exception is thrown
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) { // An untimed transfer returns null only if interrupted
            // Clear the interrupted status before throwing
            Thread.interrupted();
            throw new InterruptedException();
        }
    }
    
    // Inserts the specified element into this queue and, if necessary, waits for the specified time for another thread to receive it
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        // If e is null, an exception is thrown
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) // Transfer operation
            return true;
        if (!Thread.interrupted()) // Timed out without being interrupted
            // No consumer arrived within the timeout
            return false;
        throw new InterruptedException();
    }
    
    // Inserts the specified element only if another thread is already waiting to receive it; never blocks
    public boolean offer(E e) {
        // If e is null, an exception is thrown
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null; // Transfer operation
    }
    
    // Retrieves and removes the head of this queue, waiting if necessary for another thread to insert it
    public E take() throws InterruptedException {
        // Transfer operation
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
    
    // Retrieves and removes the head of this queue, waiting up to the specified time if necessary for another thread to insert it
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted()) // The element is not null or the current thread is not interrupted
            return e;
        throw new InterruptedException();
    }
    
    // Retrieves and removes the head of this queue only if another thread is currently making an element available; never blocks
    public E poll() {
        return transferer.transfer(null, true, 0);
    }
    
    // Always return true
    public boolean isEmpty() {
        return true;
    }
    
    // Always return 0
    public int size() {
        return 0;
    }
    
    // Always return 0
    public int remainingCapacity() {
        return 0;
    }
    
    // Do nothing
    public void clear() {
    }
    
    // Always return false
    public boolean contains(Object o) {
        return false;
    }
    
    // Always return false
    public boolean remove(Object o) {
        return false;
    }
    
    // Returns false unless the given collection is empty
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }
    
    // Always return false
    public boolean removeAll(Collection<?> c) {
        return false;
    }
    
    // Always return false
    public boolean retainAll(Collection<?> c) {
        return false;
    }
    
    // Always return null
    public E peek() {
        return null;
    }
    
    // Returns an empty iterator, where hasNext always returns false
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }
    
    // Returns an empty spliterator, whose tryAdvance always returns false
    public Spliterator<E> spliterator() {
        return Spliterators.emptySpliterator();
    }
    
    // Returns an array of length 0
    public Object[] toArray() {
        return new Object[0];
    }
    
    // Sets the 0th element of the specified array to null (if the array has a length other than 0) and returns it
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }
    
    // Remove all available elements from this queue and add them to the given collection
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }
    
    // Removes up to a given number of available elements from this queue and adds them to the given collection
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }
  • Almost every operation of SynchronousQueue delegates to the transfer function of TransferStack or TransferQueue, so understanding transfer is the key to understanding how SynchronousQueue works.
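
  • The contract of the functions listed above can be checked with a short program. This is a minimal sketch, not part of the JDK source; the class name HandoffContractDemo and the string values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class HandoffContractDemo {
    static List<Object> run() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        List<Object> seen = new ArrayList<>();

        // With no partner waiting, the non-blocking calls fail immediately.
        seen.add(queue.offer("a"));            // false
        seen.add(queue.poll());                // null

        // The collection view never exposes an element.
        seen.add(queue.isEmpty());             // true
        seen.add(queue.size());                // 0
        seen.add(queue.peek());                // null
        seen.add(queue.iterator().hasNext());  // false

        // put and take both block, so they meet regardless of who arrives first.
        Thread producer = new Thread(() -> {
            try {
                queue.put("b");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        seen.add(queue.take());                // "b"
        producer.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```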

IV SynchronousQueue case analysis: unfair strategy

package com.xizi.queue;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
        Producer p1 = new Producer("p1", queue, 10);
        Producer p2 = new Producer("p2", queue, 50);
        
        Consumer c1 = new Consumer("c1", queue);
        Consumer c2 = new Consumer("c2", queue);
        
        c1.start();
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        c2.start();
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p1.start();
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p2.start();
        
    }

    static class Producer extends Thread {
        private SynchronousQueue<Integer> queue;
        private int n;
        public Producer(String name, SynchronousQueue<Integer> queue, int n) {
            super(name);
            this.queue = queue;
            this.n = n;
        }
        
        public void run() {
            System.out.println(getName() + " offer result " + queue.offer(n));
        }
    }
    
    static class Consumer extends Thread {
        private SynchronousQueue<Integer> queue;
        public Consumer(String name, SynchronousQueue<Integer> queue) {
            super(name);
            this.queue = queue;
        }
        
        public void run() {
            try {
                System.out.println(getName() + " take result " + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

  • Two producers p1 and p2 and two consumers c1 and c2 are started in the order c1, c2, p1, p2, and the main thread sleeps for 100 ms after each start. One possible sequence of events is described below.

①. c1 executes the take operation. The main call chain is take -> transfer -> awaitFulfill.

  • SynchronousQueue adopts the unfair strategy here, so the underlying structure is a stack.

  • After the c1 thread enters awaitFulfill, it may spin for a while. Once the spin budget is used up, it calls LockSupport.park, which disables the current thread (c1) until a permit becomes available.

②. c2 executes the take operation. The main call chain is again take -> transfer -> awaitFulfill.

  • After the c2 thread enters awaitFulfill, it may spin for a while and then calls LockSupport.park, which disables the current thread (c2) until a permit becomes available. At this point the stack holds two nodes: the node of the c2 thread at the head and the node of the c1 thread beneath it.

③. The p1 thread executes the offer(10) operation. The main call chain is offer -> transfer -> tryMatch, which unparks the matched thread.

  • During offer(10), p1 pushes a data node that becomes the new head, and the node of the c2 thread is matched with it (the head node produces the data and the c2 node consumes it). The c2 thread is unparked and can continue to run, while the c1 thread stays parked (unfair policy: the most recent waiter is served first).
  • After the c2 thread is unparked, it continues to run. Because c2 was parked inside awaitFulfill, it also resumes inside awaitFulfill.

  • The c2 thread first returns from awaitFulfill, then returns 10 from transfer, and finally returns 10 from take.

④. The p2 thread executes the offer(50) operation. The main call chain is the same as for p1.

  • During offer(50), the node of the c1 thread is matched with the new head node (the head node produces the data and the c1 node consumes it). The c1 thread is unparked and can continue to run.
  • After the c1 thread is unparked, it continues to run. Because c1 was parked inside awaitFulfill, it also resumes there, and take returns 50 by the same path as for c2.
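
  • The matching order in steps ① to ④ can be reproduced without relying on sleep timings. The following is a minimal sketch under one assumption: a thread reported as WAITING has already pushed its node and parked inside take. The class name UnfairOrderDemo and the helper parkedConsumer are hypothetical names introduced for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;

class UnfairOrderDemo {
    // Starts a consumer and returns once it is parked inside take().
    static Thread parkedConsumer(String name, SynchronousQueue<Integer> queue,
                                 Map<String, Integer> received) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                received.put(name, queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        return t;
    }

    static Map<String, Integer> run() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // unfair by default
        Map<String, Integer> received = new ConcurrentHashMap<>();
        Thread c1 = parkedConsumer("c1", queue, received);
        Thread c2 = parkedConsumer("c2", queue, received);
        queue.offer(10); // matched with c2, the most recent waiter
        queue.offer(50); // matched with c1, the only waiter left
        c1.join();
        c2.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

  • Polling the thread state replaces the fixed 100 ms sleeps of the original demo, so the LIFO outcome (c2 receives 10, c1 receives 50) does not depend on scheduling luck.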

V SynchronousQueue case analysis: fair strategy
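
  • In fair mode, waiting threads are matched in arrival order (FIFO). The following is a minimal sketch that shows this from the producer side, assuming two producers blocked in put and assuming that a thread reported as WAITING has already enqueued its node. The class name FairOrderDemo and the helper parkedProducer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairOrderDemo {
    // Starts a producer and returns once it is parked inside put().
    static Thread parkedProducer(String name, SynchronousQueue<Integer> queue, int value)
            throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                queue.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        return t;
    }

    static List<Integer> run() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(true); // fair mode
        Thread p1 = parkedProducer("p1", queue, 10);
        Thread p2 = parkedProducer("p2", queue, 50);
        List<Integer> taken = new ArrayList<>();
        taken.add(queue.take()); // served from p1, which has waited longest
        taken.add(queue.take()); // served from p2
        p1.join();
        p2.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```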

VI SynchronousQueue case analysis: changing the start order of c1, c2, p1 and p2

  • Start sequence: p1 -> c1 -> p2 -> c2
  • Only the c1 thread is matched. The offer of the p1 thread returns false immediately because no consumer thread is waiting at that moment, whereas the p2 thread is matched with the c1 thread: the offer of p2 succeeds and c1 obtains the element. The c2 thread remains parked, so the application cannot terminate normally.
  • Therefore, with the non-blocking offer, a take must already be waiting before the offer arrives for the two to match. If the offer comes first, it fails immediately instead of waiting, and the take that follows blocks until a later offer matches it.
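
  • The ordering constraint applies to the non-blocking offer only. The following is a minimal sketch, with a hypothetical class name ProducerFirstDemo and an arbitrary 5 second timeout, that contrasts a producer arriving first with offer(e) and with the timed offer(e, timeout, unit).

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ProducerFirstDemo {
    static String run() throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // Non-blocking offer with nobody waiting: fails immediately.
        boolean plainOffer = queue.offer(10);

        // Timed offer with nobody waiting: the producer parks and waits for a consumer.
        AtomicBoolean timedOffer = new AtomicBoolean();
        Thread producer = new Thread(() -> {
            try {
                timedOffer.set(queue.offer(50, 5, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "p");
        producer.start();
        while (producer.getState() != Thread.State.TIMED_WAITING) {
            Thread.sleep(1);
        }

        // The consumer arrives second; even a non-blocking poll now succeeds.
        Integer polled = queue.poll();
        producer.join();
        return plainOffer + " " + timedOffer.get() + " " + polled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

  • If producers may start before consumers, put or the timed offer is the safer choice; in the same way, replacing take with a timed poll would let a consumer such as c2 give up instead of staying parked forever.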

Added by MrPen on Sun, 16 Jan 2022 04:42:08 +0200