The source code discussed in this article is from JDK 13.
PriorityBlockingQueue: a priority blocking queue
Translation of the official Javadoc
An unbounded blocking queue that uses the same ordering rules as PriorityQueue and provides blocking retrieval operations. Although the queue is logically unbounded, an attempted insertion may still fail when resources are exhausted (resulting in OutOfMemoryError).
This class does not accept null elements. A priority queue that relies on natural ordering also does not permit insertion of non-comparable objects (doing so results in a ClassCastException).
This class and its iterator implement all optional methods of the Collection and Iterator interfaces. The Iterator returned by iterator() and the Spliterator returned by spliterator() are not guaranteed to traverse the elements in any particular order.
If you need ordered traversal, consider using Arrays.sort(pq.toArray()). In addition, the drainTo method can be used to remove some or all of the elements in priority order and place them in another collection.
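The two options mentioned above can be sketched as follows. This is only an illustration: the class name SortedTraversalDemo and its helper methods are made up for this example and are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class SortedTraversalDemo {
    // Returns a sorted snapshot of the queue without removing anything.
    static Object[] sortedSnapshot(PriorityBlockingQueue<Integer> pq) {
        Object[] snapshot = pq.toArray(); // array order is NOT guaranteed to be sorted
        Arrays.sort(snapshot);            // Integer is Comparable, so natural ordering applies
        return snapshot;
    }

    // Removes at most max elements, in priority order, into a new list.
    static List<Integer> drainSmallest(PriorityBlockingQueue<Integer> pq, int max) {
        List<Integer> out = new ArrayList<>();
        pq.drainTo(out, max);
        return out;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> pq =
                new PriorityBlockingQueue<>(List.of(5, 1, 4, 2, 3));
        System.out.println(Arrays.toString(sortedSnapshot(pq))); // [1, 2, 3, 4, 5]
        System.out.println(drainSmallest(pq, 2));                // [1, 2]
        System.out.println(pq.size());                           // 3
    }
}
```

Note that sortedSnapshot leaves the queue untouched, while drainSmallest actually removes elements.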
Operations on this class make no guarantees about the ordering of elements with equal priority. If you need to enforce an order, you can define a custom class or comparator that uses a secondary key to break ties in the primary priority.
For example, here is a class that applies first-in-first-out tie-breaking to comparable elements.
class FIFOEntry<E extends Comparable<? super E>>
        implements Comparable<FIFOEntry<E>> {
    static final AtomicLong seq = new AtomicLong(0);
    final long seqNum;
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() { return entry; }

    public int compareTo(FIFOEntry<E> other) {
        // First compare by the wrapped entry's own compareTo
        int res = entry.compareTo(other.entry);
        // If the priorities tie, break the tie by insertion sequence number
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}
The wrapper implements compareTo in two steps: it first delegates to the compareTo of the wrapped element, and only if the two priorities are equal does it fall back to the internally assigned seqNum, so that earlier entries come first.
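To see the tie-breaking in action, here is a small sketch. The Job class, the FifoTieBreakDemo wrapper and the drainNames helper are hypothetical names introduced for this example; FIFOEntry is copied from the Javadoc sample.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the JDK.
class FifoTieBreakDemo {
    // Hypothetical payload: ordered by priority only, so equal priorities tie.
    static final class Job implements Comparable<Job> {
        final int priority;
        final String name;
        Job(int priority, String name) { this.priority = priority; this.name = name; }
        public int compareTo(Job o) { return Integer.compare(priority, o.priority); }
    }

    // Same idea as the Javadoc sample: the sequence number breaks ties in FIFO order.
    static final class FIFOEntry<E extends Comparable<? super E>>
            implements Comparable<FIFOEntry<E>> {
        static final AtomicLong seq = new AtomicLong(0);
        final long seqNum;
        final E entry;
        FIFOEntry(E entry) { seqNum = seq.getAndIncrement(); this.entry = entry; }
        E getEntry() { return entry; }
        public int compareTo(FIFOEntry<E> other) {
            int res = entry.compareTo(other.entry);
            if (res == 0 && other.entry != this.entry)
                res = (seqNum < other.seqNum ? -1 : 1);
            return res;
        }
    }

    // Enqueues the jobs in list order, then returns their names in dequeue order.
    static List<String> drainNames(List<Job> jobs) {
        PriorityBlockingQueue<FIFOEntry<Job>> pq = new PriorityBlockingQueue<>();
        for (Job j : jobs) pq.add(new FIFOEntry<>(j));
        List<String> names = new ArrayList<>();
        FIFOEntry<Job> e;
        while ((e = pq.poll()) != null) names.add(e.getEntry().name);
        return names;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(new Job(1, "a"), new Job(0, "urgent"),
                                 new Job(1, "b"), new Job(1, "c"));
        System.out.println(drainNames(jobs)); // [urgent, a, b, c]
    }
}
```

Without the wrapper, the relative order of "a", "b" and "c" would be unspecified because they share the same priority.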
This class is a member of the Java Collections Framework.
Source code
Definition
@SuppressWarnings("unchecked") public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
The class extends AbstractQueue, which supplies the basic Queue behavior, and implements the BlockingQueue interface as well as Serializable.
Fields
// The array that actually holds the data, laid out as a binary heap
private transient Object[] queue;
// Number of elements
private transient int size;
// Comparator that defines element priority; null means natural ordering
private transient Comparator<? super E> comparator;
// Lock guarding all public operations
private final ReentrantLock lock = new ReentrantLock();
// Condition that consumers wait on while the queue is empty
private final Condition notEmpty = lock.newCondition();
// Spinlock for array allocation during growth, acquired via CAS
private transient volatile int allocationSpinLock;
// A plain PriorityQueue used only for serialization
private PriorityQueue<E> q;
The elements are stored in an array, size tracks how many are currently present, and the comparator defines the priority between elements (natural ordering is used when it is null).
Constructors
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
    boolean heapify = true; // true if not known to be in heap order
    boolean screen = true;  // true if must screen for nulls
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    Object[] es = c.toArray();
    int n = es.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (es.getClass() != Object[].class)
        es = Arrays.copyOf(es, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (Object e : es)
            if (e == null)
                throw new NullPointerException();
    }
    this.queue = ensureNonEmpty(es);
    this.size = n;
    if (heapify)
        heapify();
}
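There are four constructors. The first three only set the initial capacity and the comparator. The fourth initializes the queue with the elements of a given collection; it reuses the comparator of a SortedSet or another PriorityBlockingQueue and skips heapify when the source is already known to be in a valid order.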
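A short usage sketch of these constructors follows. The class ConstructorDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ConstructorDemo {
    // Capacity + comparator constructor: reverse order turns the queue into a max-heap.
    static PriorityBlockingQueue<Integer> maxFirst(List<Integer> values) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>(
                Math.max(1, values.size()), Comparator.<Integer>reverseOrder());
        pq.addAll(values);
        return pq;
    }

    // Collection constructor: natural ordering, heapified once during construction.
    static PriorityBlockingQueue<Integer> minFirst(List<Integer> values) {
        return new PriorityBlockingQueue<>(values);
    }

    // An initial capacity below 1 is rejected by the constructor.
    static boolean rejectsZeroCapacity() {
        try {
            new PriorityBlockingQueue<Integer>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxFirst(List.of(3, 9, 1)).poll()); // 9
        System.out.println(minFirst(List.of(3, 9, 1)).poll()); // 1
        System.out.println(rejectsZeroCapacity());             // true
    }
}
```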
Enqueue operations
public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    // Acquire the lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] es;
    // Grow the array while it is full
    while ((n = size) >= (cap = (es = queue).length))
        tryGrow(es, cap);
    try {
        // Sift the new element up to its correct position,
        // using the comparator if one was supplied
        final Comparator<? super E> cmp;
        if ((cmp = comparator) == null)
            siftUpComparable(n, e, es);
        else
            siftUpUsingComparator(n, e, es, cmp);
        // Increment the size and signal a thread waiting on notEmpty
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}
The four methods add, offer, put and offer(e, timeout, unit) all end up calling the same offer(e). Why?
Because this priority queue is unbounded, it can never be full. That is why the only wait condition defined earlier is notEmpty; there is no notFull as in the bounded queues.
The method itself is fairly simple:
- If the array is full, grow it
- Place the element at the end of the heap and sift it up, using either the comparator or the element's own compareTo, until it reaches the position its priority requires
- Signal one thread waiting on the notEmpty condition
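The consequence of the steps above is that insertion never blocks, regardless of the initial capacity. A minimal sketch, where NonBlockingInsertDemo and its methods are hypothetical names used only for this example:

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class NonBlockingInsertDemo {
    // Starts with capacity 1 and keeps inserting; the array grows as needed.
    static int fill(int count) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>(1);
        for (int i = count; i > 0; i--) {
            pq.put(i); // put delegates to offer and returns immediately
        }
        return pq.size();
    }

    // The timeout is ignored: the timed offer succeeds at once even when
    // the queue already holds as many elements as its initial capacity.
    static boolean timedOffer() {
        PriorityBlockingQueue<String> pq = new PriorityBlockingQueue<>(1);
        pq.offer("first");
        return pq.offer("second", 1, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) {
        System.out.println(fill(1000));   // 1000
        System.out.println(timedOffer()); // true
    }
}
```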
The two sift-up operations:
private static <T> void siftUpComparable(int k, T x, Object[] es) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    // Walk up from position k toward the root
    while (k > 0) {
        // Index of the parent node
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        // Stop once the new element is not smaller than its parent
        if (key.compareTo((T) e) >= 0)
            break;
        // Otherwise move the parent down one level
        es[k] = e;
        k = parent;
    }
    // k is now the final position for the new element
    es[k] = key;
}

// Same as above, except that the supplied comparator is used
// instead of the element's own compareTo.
private static <T> void siftUpUsingComparator(
    int k, T x, Object[] es, Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = es[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        es[k] = e;
        k = parent;
    }
    es[k] = x;
}
Because the array is organized as a balanced binary heap, the correct position for a new element can be found with the standard heap sift-up operation: the element starts at the end of the array and moves toward the root while it is smaller than its parent.
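The same sift-up logic can be shown on a plain int array. This is a simplified sketch rather than the JDK code: IntMinHeap and its methods are hypothetical, and the doubling growth policy is an assumption chosen for brevity.

```java
import java.util.Arrays;

// Hypothetical minimal min-heap over ints, illustrating sift-up only.
class IntMinHeap {
    private int[] heap = new int[4];
    private int size;

    void insert(int x) {
        if (size == heap.length)
            heap = Arrays.copyOf(heap, size * 2); // simple doubling, an assumption
        int k = size++;                           // start at the first free slot
        while (k > 0) {
            int parent = (k - 1) >>> 1;           // parent index of k
            if (x >= heap[parent])
                break;                            // heap property already holds
            heap[k] = heap[parent];               // move the parent down
            k = parent;
        }
        heap[k] = x;                              // final position of the new value
    }

    int peekMin() {
        if (size == 0) throw new IllegalStateException("empty heap");
        return heap[0];
    }

    // True when every node is >= its parent.
    boolean isValidHeap() {
        for (int k = 1; k < size; k++)
            if (heap[k] < heap[(k - 1) >>> 1]) return false;
        return true;
    }

    public static void main(String[] args) {
        IntMinHeap h = new IntMinHeap();
        for (int v : new int[] {7, 3, 9, 1, 5, 8}) h.insert(v);
        System.out.println(h.peekMin());     // 1
        System.out.println(h.isValidHeap()); // true
    }
}
```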
Dequeue operations
// Returns null if the queue is empty
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}
All of the dequeue methods are built on dequeue(); they differ only in what they do when the queue is empty:
- poll() returns null immediately
- take() blocks until an element becomes available or the thread is interrupted
- poll(timeout, unit) blocks for at most the given time and then returns null
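The three behaviors listed above can be observed with a small sketch. RetrievalDemo and its methods are hypothetical; the 50 ms and 100 ms delays are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class RetrievalDemo {
    // poll() on an empty queue returns null without waiting.
    static String pollEmpty() {
        return new PriorityBlockingQueue<String>().poll();
    }

    // Timed poll on an empty queue waits up to the timeout, then returns null.
    static String timedPollEmpty() {
        try {
            return new PriorityBlockingQueue<String>().poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // take() blocks until a producer thread inserts an element.
    static String takeAfterDelay() {
        PriorityBlockingQueue<String> pq = new PriorityBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100); // arbitrary delay so the consumer blocks first
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            pq.put("hello");       // signals notEmpty and wakes the consumer
        });
        producer.start();
        try {
            String s = pq.take();
            producer.join();
            return s;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollEmpty());      // null
        System.out.println(timedPollEmpty()); // null
        System.out.println(takeAfterDelay()); // hello
    }
}
```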
The core dequeue method is as follows:
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    final Object[] es;
    final E result;

    // Read the first array slot, i.e. the element at the top of the heap
    if ((result = (E) ((es = queue)[0])) != null) {
        final int n;
        // Take the last element and clear its slot
        final E x = (E) es[(n = --size)];
        es[n] = null;
        if (n > 0) {
            // Place it at the top of the heap and sift it down
            // to restore the heap order
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftDownComparable(0, x, es, n);
            else
                siftDownUsingComparator(0, x, es, n, cmp);
        }
    }
    return result;
}
This is the usual heap removal routine: take the element at the top of the heap, move the last element of the heap to the top, and sift it down until the whole heap satisfies the priority order again.
The sift-down code:
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
    // assert n > 0;
    Comparable<? super T> key = (Comparable<? super T>)x;
    int half = n >>> 1;           // loop while a non-leaf
    while (k < half) {
        // Left child of the current node
        int child = (k << 1) + 1; // assume left child is least
        Object c = es[child];
        int right = child + 1;
        // Switch to the right child if it is smaller than the left one
        if (right < n &&
            ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
            c = es[child = right];
        // Stop once the key is not larger than the smaller child
        if (key.compareTo((T) c) <= 0)
            break;
        es[k] = c;
        k = child;
    }
    es[k] = key;
}
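At each level the method picks the smaller of the two children and compares it with the given element. If the element is larger, the child moves up one level and the search continues from the child's position. This repeats in a loop (not recursively) until the element is no larger than both children or a leaf is reached.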
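As a simplified sketch of the same sift-down logic on an int array, the following builds a heap with sift-down and then removes the minimum repeatedly, which yields the values in ascending order. SiftDownDemo and sortedCopy are hypothetical names; the real class works on Object[] with Comparable or a Comparator.

```java
import java.util.Arrays;

// Hypothetical demo class illustrating sift-down on a min-heap of ints.
class SiftDownDemo {
    // Places x at position k of the heap es[0..n) and sifts it down.
    static void siftDown(int[] es, int k, int x, int n) {
        int half = n >>> 1;               // nodes at index >= half are leaves
        while (k < half) {
            int child = (k << 1) + 1;     // assume the left child is smaller
            int right = child + 1;
            if (right < n && es[child] > es[right])
                child = right;            // the right child is smaller
            if (x <= es[child])
                break;                    // x is not larger than either child
            es[k] = es[child];            // move the smaller child up
            k = child;
        }
        es[k] = x;
    }

    // Heapifies a copy of the input, then removes the minimum n times.
    static int[] sortedCopy(int[] input) {
        int[] es = input.clone();
        int n = es.length;
        for (int i = (n >>> 1) - 1; i >= 0; i--)
            siftDown(es, i, es[i], n);    // heapify from the last non-leaf upward
        int[] out = new int[n];
        for (int i = 0; i < out.length; i++) {
            out[i] = es[0];               // top of the heap is the minimum
            int last = es[--n];           // take the last element
            if (n > 0)
                siftDown(es, 0, last, n); // put it on top and sift it down
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortedCopy(new int[] {5, 3, 8, 1})));
        // [1, 3, 5, 8]
    }
}
```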
Summary
PriorityBlockingQueue is a blocking queue with priorities. Ordering can come either from the elements' own compareTo or from a supplied Comparator.
Priorities are implemented with a binary heap, which is why the elements are stored in an array.
Because the queue is designed to be unbounded, the insertion methods (including put) never block; if elements are added faster than they are removed, the queue can eventually exhaust memory. The retrieval methods take and poll(timeout, unit) block on an empty queue, as in other blocking queues.
A ReentrantLock guards reads and writes of the array to ensure thread safety.
Blocking operations use a Condition to wait and to be woken up.
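The lock-plus-condition pattern summarized above can be reduced to a few lines. The sketch below is not the JDK implementation: TinyBlockingBuffer is a hypothetical class that uses an ArrayDeque (FIFO order) instead of a heap, purely to show how ReentrantLock and a notEmpty Condition cooperate.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical unbounded buffer showing the lock + notEmpty condition pattern.
class TinyBlockingBuffer<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Never blocks: the buffer is unbounded, so there is no notFull condition.
    void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal();        // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the buffer is empty.
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty())   // loop guards against spurious wakeups
                notEmpty.await();     // releases the lock while waiting
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    // Hands one value from the calling thread to a consumer thread.
    static String handOff(String value) {
        TinyBlockingBuffer<String> buf = new TinyBlockingBuffer<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = buf.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        buf.put(value);
        try {
            consumer.join();          // join makes received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("ping")); // ping
    }
}
```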
Contact me
Contact email: huyanshi2580@gmail.com