[Learning Notes - Java Collection - 17] Queue - DelayQueue Source Analysis

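Introduction

DelayQueue is an unbounded blocking queue of delayed elements in the java.util.concurrent package. An element can be taken only after its delay has expired, which makes the queue a common building block for scheduled tasks.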

Inheritance hierarchy

DelayQueue implements BlockingQueue, so it is a blocking queue.

In addition, the class is declared as DelayQueue<E extends Delayed>, so every element stored in a DelayQueue must implement the Delayed interface.

So what is Delayed?

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

Delayed extends Comparable<Delayed> and declares a single method, getDelay(), which returns the remaining delay in the requested time unit. A value less than or equal to zero means the delay has already expired.
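As an illustration, a minimal sketch of a Delayed implementation might look like the following. DelayedTask and its fields are hypothetical names, not part of the JDK, and the sketch assumes the expiry time is tracked on the System.nanoTime() clock.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type used only for illustration.
class DelayedTask implements Delayed {
    private final String name;
    private final long expireAtNanos; // absolute expiry on the System.nanoTime() clock

    DelayedTask(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.expireAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    String name() {
        return name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time converted to the caller's unit; <= 0 means expired.
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by remaining delay so the element that expires first sorts first.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

Note that getDelay() honours the unit argument. DelayQueue always asks for nanoseconds, so an implementation that ignores the unit can cause waits of the wrong length.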

Source code analysis

Main attributes

// Lock that guards all access to the queue
private final transient ReentrantLock lock = new ReentrantLock();
// Priority queue (binary heap) that holds the elements, ordered by delay
private final PriorityQueue<E> q = new PriorityQueue<E>();
// Thread designated to wait for the head element to expire (used only when taking elements)
private Thread leader = null;
// Condition signalled when a new head element arrives or a new leader may be needed
private final Condition available = lock.newCondition();

As the fields show, the delay queue is built on a priority queue and relies on a reentrant lock and a condition for thread safety.

Because the priority queue is unbounded, insertion never has to wait for free space, so a single condition (for waiting consumers) is enough.
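To make the single-condition point concrete, here is a deliberately simplified sketch of an unbounded blocking priority queue guarded by one lock and one condition. SingleConditionQueue is a hypothetical class, not the JDK implementation: it has no delays and no leader thread, and only shows why a "not full" condition is unnecessary.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified class for illustration only.
class SingleConditionQueue<E extends Comparable<E>> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final PriorityQueue<E> q = new PriorityQueue<>();

    // Never blocks: the backing heap grows as needed, so producers never wait.
    void put(E e) {
        lock.lock();
        try {
            q.offer(e);
            available.signal(); // wake one consumer waiting for an element
        } finally {
            lock.unlock();
        }
    }

    // Blocks only while the queue is empty.
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (q.isEmpty())
                available.await();
            return q.poll(); // smallest element first
        } finally {
            lock.unlock();
        }
    }
}
```

A bounded queue would need a second condition so that producers could wait for free space. Here only consumers ever wait.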

Constructors

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

The constructors are simple: a no-argument constructor, and one that initializes the queue by adding every element of the collection c.

Enqueue

Although DelayQueue is a blocking queue, the underlying priority queue is unbounded, so insertion never blocks and never times out. All four insertion methods therefore behave the same way.

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

Enqueueing is straightforward:

  1. Acquire the lock.
  2. Add the element to the priority queue.
  3. If the new element became the head of the heap, set leader to null and signal a thread waiting on the available condition, because the previous leader was waiting for an element that is no longer the head.
  4. Release the lock.
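The following sketch exercises all four insertion methods to show that none of them blocks and that the head is always the element with the smallest remaining delay. EnqueueDemo and Job are hypothetical names introduced for this example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class EnqueueDemo {
    // Hypothetical element type used only for this illustration.
    static class Job implements Delayed {
        final String id;
        final long expireAtNanos;

        Job(String id, long delayMillis) {
            this.id = id;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts through all four methods and returns the id of the resulting head.
    static String headAfterInserts() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("a", 4000));
        queue.put(new Job("b", 3000));
        queue.offer(new Job("c", 2000));
        // The timeout is ignored: the call returns true immediately.
        queue.offer(new Job("d", 1000), 1, TimeUnit.HOURS);
        // peek() returns the head even if it has not expired yet.
        return queue.peek().id;
    }

    public static void main(String[] args) {
        System.out.println(headAfterInserts());
    }
}
```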

Dequeue

Because DelayQueue is a blocking queue, it offers four styles of removal: throwing an exception (remove()), non-blocking (poll()), blocking (take()), and blocking with a timeout (poll(timeout, unit)).

Here we focus on two of them, poll() and take().

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

The poll() method is simple:

  1. Acquire the lock.
  2. Peek at the head element; return null if the queue is empty or the head has not expired yet.
  3. If the head has expired, call poll() on the priority queue to remove and return it.
  4. Release the lock.
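A small sketch of this behaviour: poll() returns null immediately while the head is still pending, and returns the element once its delay has passed. PollDemo and Ticket are hypothetical names, and the sleep duration is an arbitrary value chosen to outlast the delay.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class PollDemo {
    // Hypothetical element that expires delayMillis after it is created.
    static class Ticket implements Delayed {
        final long expireAtNanos;

        Ticket(long delayMillis) {
            expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns {poll() was null before expiry, poll() was non-null after expiry}.
    static boolean[] pollBeforeAndAfterExpiry() {
        DelayQueue<Ticket> queue = new DelayQueue<>();
        queue.add(new Ticket(200));
        boolean nullBefore = queue.poll() == null; // head not expired: null, no blocking
        try {
            Thread.sleep(400); // outlast the 200 ms delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean presentAfter = queue.poll() != null; // head expired: removed and returned
        return new boolean[] { nullBefore, presentAfter };
    }

    public static void main(String[] args) {
        boolean[] result = pollBeforeAndAfterExpiry();
        System.out.println(result[0] + " " + result[1]);
    }
}
```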

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // Head of the heap: the element with the smallest remaining delay
            E first = q.peek();
            // A null head means the queue is empty, so block until an element is added
            if (first == null)
                available.await();
            else {
                // Remaining delay of the head element
                long delay = first.getDelay(NANOSECONDS);
                // A delay <= 0 means the head has expired: remove and return it
                if (delay <= 0)
                    return q.poll();

                // The delay is still positive, so this thread has to wait

                // Drop the local reference before waiting so it cannot keep the element
                // alive if another thread removes it from the queue in the meantime
                first = null; // don't retain ref while waiting
                // If another thread is already the leader, wait indefinitely as a follower
                if (leader != null)
                    available.await();
                else {
                    // No leader yet: the current thread becomes the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait at most the remaining delay, then wake up automatically
                        // and re-enter the loop to check whether the head has expired.
                        // Waking up does not guarantee an element: another thread may
                        // acquire the lock first and remove the head.
                        // Waking from a Condition takes two steps: signal() moves the
                        // thread from the condition queue to the AQS sync queue, and the
                        // thread resumes only after it is unparked via LockSupport.unpark(t)
                        // and reacquires the lock.
                        available.awaitNanos(delay);
                    } finally {
                        // If this thread is still the leader, clear the field so another thread can take over
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // If there is no leader and the queue is not empty, signal the next waiting thread
        if (leader == null && q.peek() != null)
            // signal() only moves a waiter to the AQS sync queue; it does not run yet
            available.signal();
        // Releasing the lock is what actually lets the signalled thread proceed
        lock.unlock();
    }
}

The take() method is slightly more complex:

  1. Acquire the lock (interruptibly).
  2. If the head of the heap is null, the queue is empty, so block and wait.
  3. If the head has expired, poll() it from the priority queue and return it.
  4. If it has not expired and another thread is already the leader, wait indefinitely as a follower.
  5. If there is no leader, become the leader, wait only for the remaining delay, then loop and try to take the element again.
  6. After obtaining the element, signal the next waiting thread if there is no leader and the queue is not empty.
  7. Release the lock.
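The steps above can be observed with two consumer threads blocked in take(): one waits as the leader with a timed wait, the other as a follower, and each ends up with one element. This is only a sketch; TakeDemo, Job, and the delay values are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class TakeDemo {
    // Hypothetical element type used only for this illustration.
    static class Job implements Delayed {
        final String id;
        final long expireAtNanos;

        Job(String id, long delayMillis) {
            this.id = id;
            this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Two consumers block in take(); returns the ids in the order they were recorded.
    static List<String> consumeWithTwoThreads() {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("late", 300));
        queue.add(new Job("early", 100));
        List<String> taken = Collections.synchronizedList(new ArrayList<>());
        Runnable consumer = () -> {
            try {
                taken.add(queue.take().id); // blocks until an element has expired
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(consumer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(consumeWithTwoThreads());
    }
}
```

Only the leader uses a timed wait. The follower sleeps until it is signalled, which avoids having every consumer wake up when the head expires.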

Usage

Consider the following example. The sooner an element expires, the sooner it is dequeued, regardless of the order in which the elements were added.

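import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueTest {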
    public static void main(String[] args) {
        DelayQueue<Message> queue = new DelayQueue<>();

        long now = System.currentTimeMillis();

        // Start a thread to fetch elements from the queue
        new Thread(()->{
            while (true) {
                try {
                    // Print 1000, 2000, 5000, 7000, 8000 in turn
                    System.out.println(queue.take().deadline - now);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // Add 5 elements to the queue
        queue.add(new Message(now + 5000));
        queue.add(new Message(now + 8000));
        queue.add(new Message(now + 2000));
        queue.add(new Message(now + 1000));
        queue.add(new Message(now + 7000));
    }
}

class Message implements Delayed {
    long deadline;

    public Message(long deadline) {
        this.deadline = deadline;
    }

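    @Override
    public long getDelay(TimeUnit unit) {
        // Convert the remaining milliseconds into the unit the caller asked for
        return unit.convert(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the overflow that casting a long difference to int can cause
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }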

    @Override
    public String toString() {
        return String.valueOf(deadline);
    }
}

Summary

  1. DelayQueue is an unbounded blocking queue.
  2. Internally, DelayQueue stores its elements in a priority queue.
  3. DelayQueue uses a reentrant lock and a condition for thread safety.
  4. DelayQueue is often used for scheduled tasks.

Java's thread pools do not use DelayQueue directly to implement scheduled tasks.

ScheduledThreadPoolExecutor uses its own nested class, DelayedWorkQueue. The logic is basically the same, but instead of delegating to PriorityQueue, DelayedWorkQueue implements its own array-based heap. The two are essentially equivalent.
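For comparison, here is a minimal sketch of scheduling one delayed task with ScheduledThreadPoolExecutor, which relies on its internal DelayedWorkQueue rather than DelayQueue. ScheduledDemo and elapsedBeforeRun are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScheduledDemo {
    // Schedules one task and returns how many milliseconds passed before it ran.
    static long elapsedBeforeRun(long delayMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            final long start = System.nanoTime();
            Callable<Long> task = () -> System.nanoTime() - start;
            // The task waits in the executor's internal work queue until it is due.
            ScheduledFuture<Long> future =
                    executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(elapsedBeforeRun(200) + " ms");
    }
}
```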

Added by samsbox on Sat, 24 Aug 2019 12:10:51 +0300