Java Review - Concurrent Programming: Principles of PriorityBlockingQueue & Source Code Analysis

Summary

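PriorityBlockingQueue is an unbounded blocking queue that orders its elements by priority. Each dequeue operation returns the head of the queue, that is, the least element under the queue's ordering. Whether that is the highest- or lowest-priority element depends on how the comparison is defined.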

Internally it is implemented as a balanced binary heap stored in an array, so iterating over the queue directly does not visit the elements in priority order. Only the head is guaranteed to be the least element.
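To make the ordering point concrete, here is a minimal sketch that compares the order seen when iterating with the order returned by poll. The class and method names (HeapOrderSketch, iterationOrder, pollOrder) are illustrative and not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class HeapOrderSketch {
    // Snapshot of the elements in iteration order (heap array order, not sorted).
    static List<Integer> iterationOrder(PriorityBlockingQueue<Integer> q) {
        return new ArrayList<>(q);
    }

    // Removes every element with poll(), which always returns the current head.
    static List<Integer> pollOrder(PriorityBlockingQueue<Integer> q) {
        List<Integer> out = new ArrayList<>();
        Integer head;
        while ((head = q.poll()) != null) {
            out.add(head);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        for (int v : new int[] {5, 1, 4, 2, 3}) {
            q.offer(v);
        }
        // Iteration order reflects the heap layout and need not be sorted.
        System.out.println("iteration: " + iterationOrder(q));
        // poll() drains the queue in ascending order: [1, 2, 3, 4, 5]
        System.out.println("poll:      " + pollOrder(q));
    }
}
```

If sorted traversal is needed without removing elements, one option is to copy the queue into a list and sort the copy.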

By default, elements are ordered by their compareTo method (their natural ordering). To use a different ordering, pass a custom Comparator to the constructor.
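The difference between the two orderings can be sketched as follows. ComparatorSketch and its methods are hypothetical names chosen for illustration; the initial capacity of 11 simply mirrors the default.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ComparatorSketch {
    // Natural ordering: the smallest Integer becomes the head.
    static PriorityBlockingQueue<Integer> naturalOrder() {
        return new PriorityBlockingQueue<>();
    }

    // Custom comparator: reversing the order makes the largest Integer the head.
    static PriorityBlockingQueue<Integer> reversedOrder() {
        return new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> natural = naturalOrder();
        PriorityBlockingQueue<Integer> reversed = reversedOrder();
        for (int v : new int[] {3, 1, 2}) {
            natural.offer(v);
            reversed.offer(v);
        }
        System.out.println(natural.peek());  // prints 1
        System.out.println(reversed.peek()); // prints 3
    }
}
```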

Class diagram structure

As the class diagram shows:

  • PriorityBlockingQueue holds an array, queue, that stores the elements, and a field, size, that records the number of elements.
  • allocationSpinLock is a spin lock acquired with a CAS operation. It ensures that only one thread at a time can grow the array. Its value is 0 or 1: 0 means no expansion is in progress, and 1 means an expansion is in progress.
  • Because this is a priority queue, there is a comparator field used to compare elements.
  • lock is an exclusive lock that ensures only one thread at a time can enqueue or dequeue.
  • The notEmpty condition variable implements the blocking behavior of take. There is no notFull condition variable because put never blocks: the queue is unbounded, so there is no full state to wait on.
  • As the constructors below show, the default initial capacity is 11 and the default comparator is null. With a null comparator, the elements' compareTo method determines their priority, which means the elements must implement the Comparable interface.
    /**
     * Default array capacity.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

   
    /**
     * Creates a {@code PriorityBlockingQueue} with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }


    /**
     * Creates a {@code PriorityBlockingQueue} with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }



    /**
     * Creates a {@code PriorityBlockingQueue} with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if {@code initialCapacity} is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
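The consequences of these fields and constructors can be checked with a short sketch. It assumes nothing beyond the public API; CapacitySketch and its method names are hypothetical. It shows that the initial capacity is only a starting size, that a capacity below 1 is rejected, that non-Comparable elements fail when the comparator is null, and that a consumer waiting on an empty queue gets nothing until a producer adds an element.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class CapacitySketch {
    // put never blocks: the backing array grows past the initial capacity of 1.
    static int fillBeyondInitialCapacity(int n) {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>(1);
        for (int i = 0; i < n; i++) {
            q.put(i);
        }
        return q.size();
    }

    // An initial capacity below 1 is rejected by the constructor.
    static boolean rejectsZeroCapacity() {
        try {
            new PriorityBlockingQueue<Integer>(0);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // With a null comparator, elements must implement Comparable.
    static boolean rejectsNonComparable() {
        PriorityBlockingQueue<Object> q = new PriorityBlockingQueue<>();
        try {
            q.offer(new Object());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // A timed poll on an empty queue waits, then gives up and returns null.
    static boolean timedPollOnEmptyReturnsNull() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        try {
            return q.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(fillBeyondInitialCapacity(100)); // prints 100
        System.out.println(rejectsZeroCapacity());          // prints true
        System.out.println(rejectsNonComparable());         // prints true
        System.out.println(timedPollOnEmptyReturnsNull());  // prints true
    }
}
```

The sketch uses the timed poll rather than take so that it terminates; take on an empty queue would wait indefinitely for a producer.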

Small Demo

First, a small demo shows how PriorityBlockingQueue is used. In this demo, tasks with priorities are put into the queue and then taken out one by one, highest priority first, and executed.
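A minimal sketch of such a demo might look like the following. The Task class, its fields, and the convention that a smaller number means a higher priority are assumptions made for illustration and are not taken from the original demo code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityTaskDemo {
    // Hypothetical task type; a smaller priority value is assumed to be more urgent.
    static final class Task implements Comparable<Task> {
        final int priority;
        final String name;

        Task(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Task other) {
            return Integer.compare(this.priority, other.priority);
        }

        void run() {
            System.out.println("running " + name + " (priority " + priority + ")");
        }
    }

    // Runs every queued task, most urgent first, and returns the names in run order.
    static List<String> runAll(PriorityBlockingQueue<Task> queue) {
        List<String> order = new ArrayList<>();
        Task task;
        while ((task = queue.poll()) != null) {
            task.run();
            order.add(task.name);
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.put(new Task(3, "cleanup"));
        queue.put(new Task(1, "deploy"));
        queue.put(new Task(2, "notify"));
        // Tasks run in the order deploy, notify, cleanup.
        runAll(queue);
    }
}
```

The sketch drains the queue with poll on a single thread to stay short. In a producer and consumer setup, the consumer thread would typically call take, which blocks until a task is available.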

Core methods & source code analysis

offer

poll

put

take

size

Added by mackevin on Sat, 01 Jan 2022 08:38:21 +0200