[Concurrent Programming] DelayQueue: an unbounded blocking queue based on a priority queue

What is DelayQueue

  • DelayQueue is a blocking queue that supports delayed retrieval of elements.
  • Internally it stores its elements in a priority queue, and every element must implement the Delayed interface.
  • When creating an element, you specify how long it must wait before it can be retrieved; the element can be taken from the queue only once that delay has expired.
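
As a minimal sketch of what such an element might look like, the class below implements Delayed by remembering an absolute expiry time. The class name DelayedTask and its fields are hypothetical and chosen only for illustration; the two overridden methods are the ones the Delayed interface requires.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element type for illustration; DelayedTask is not a JDK class.
final class DelayedTask implements Delayed {
    private final String name;
    // Absolute expiry time on the System.nanoTime() clock.
    private final long expireAtNanos;

    DelayedTask(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.expireAtNanos = System.nanoTime() + unit.toNanos(delay);
    }

    String name() {
        return name;
    }

    // Remaining delay in the requested unit; zero or negative means "expired".
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Ordering must agree with getDelay: the element that expires first sorts first.
    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedTask) {
            // Same clock, so compare deadlines directly (overflow-safe subtraction).
            return Long.signum(expireAtNanos - ((DelayedTask) other).expireAtNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

Storing an absolute deadline rather than a relative delay is a design choice of this sketch: getDelay then shrinks on its own as time passes, which is what the queue relies on when it re-checks the head element.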

Usage scenarios of DelayQueue

  • Closing e-commerce orders that were not paid within the time limit.
  • Asynchronous SMS notifications.
  • Closing idle connections. A server holds many client connections that need to be closed after they have been idle for a period of time.
  • Cache expiration cleanup. Objects in the cache that have exceeded their lifetime need to be removed from the cache.
  • Task timeout handling. For example, in the request/response interaction of a sliding-window network protocol, requests that receive no response within the timeout need to be handled.

Characteristics of DelayQueue

  • Elements are not ordered first in, first out; they are sorted by delay time, so the element that expires soonest is at the head of the queue.
  • Lock used: ReentrantLock.
  • Data structure used: PriorityQueue. PriorityQueue by itself has no blocking behavior (unlike PriorityBlockingQueue), so DelayQueue implements the blocking on top of it.
  • Condition used for blocking: available.
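
The ordering characteristic can be observed with a small experiment. The sketch below is an illustration under assumptions: the class DelayOrderSketch, the nested Item type, and the chosen delays are all hypothetical. It inserts three elements in one order and records the order in which take() hands them out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayOrderSketch {
    // Hypothetical element: a label plus an absolute deadline in nanoseconds.
    static final class Item implements Delayed {
        final String label;
        final long deadlineNanos;

        Item(String label, long delayMillis) {
            this.label = label;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts three items and returns their labels in the order take() yields them.
    static List<String> drainInExpiryOrder() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("slow", 90));    // inserted first, expires last
        queue.put(new Item("fast", 30));
        queue.put(new Item("medium", 60));

        List<String> labels = new ArrayList<>();
        try {
            while (labels.size() < 3) {
                labels.add(queue.take().label); // blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt, return what we have
        }
        return labels;
    }
}
```

Calling DelayOrderSketch.drainInExpiryOrder() should return the labels as fast, medium, slow, that is, in expiry order rather than insertion order.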

Enqueue and dequeue logic of DelayQueue

  • Enqueue: never blocks, because the queue is unbounded, the same as the underlying priority queue. If the new element becomes the head of the queue, the available condition is signaled.
  • Dequeue: blocks when the queue is empty. If it is not empty, check the remaining delay of the head element. If it is less than or equal to 0, the element is removed and returned. If it is greater than 0, the element has not expired yet and the calling thread blocks.
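
The blocking dequeue described above has non-blocking relatives that make the expiry check easy to see. In the sketch below (PollVersusPeekSketch and Item are hypothetical names), poll() returns null while the head has not expired, peek() returns the head regardless of expiry, and the timed poll waits until the head expires.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class PollVersusPeekSketch {
    // Hypothetical element that expires delayMillis after construction.
    static final class Item implements Delayed {
        final long deadlineNanos;

        Item(long delayMillis) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns four observations: offer succeeded, early poll() was null,
    // peek() saw the unexpired head, and a timed poll() returned the element after expiry.
    static boolean[] observe() {
        DelayQueue<Item> queue = new DelayQueue<>();
        Item item = new Item(100);

        boolean offered = queue.offer(item);             // unbounded, so this does not block
        boolean earlyPollIsNull = queue.poll() == null;  // head exists but has not expired
        boolean peekSeesHead = queue.peek() == item;     // peek() ignores expiry

        boolean latePollReturnsItem;
        try {
            // The timed poll waits up to one second for the head to expire.
            latePollReturnsItem = queue.poll(1, TimeUnit.SECONDS) == item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            latePollReturnsItem = false;
        }
        return new boolean[] {offered, earlyPollIsNull, peekSeesHead, latePollReturnsItem};
    }
}
```

Under normal scheduling all four observations should be true.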

Source code analysis of the data structures of DelayQueue

// Lock that makes queue operations thread-safe
private final transient ReentrantLock lock = new ReentrantLock();
// Priority queue that stores the elements; it keeps the element with the smallest delay at the head
private final PriorityQueue<E> q = new PriorityQueue<E>();
// Leader thread (used only when taking elements): the thread designated to wait for the element at the head of the queue. Only the leader does a timed wait; the other threads wait indefinitely
private Thread leader = null;
// Condition that indicates whether an element may be available: it is signaled when a new element becomes the head of the queue or when a new thread may need to become the leader
private final Condition available = lock.newCondition();

Source code analysis of the constructors of DelayQueue

/**
 * No-argument constructor; does nothing
 */
public DelayQueue() {}

/**
 * Constructor that initializes the queue with the elements of the given collection.
 */
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

/**
 * Add all elements (inherited from AbstractQueue)
 */
public boolean addAll(Collection<? extends E> c) {
    // The collection to add is null: throw an exception
    if (c == null)
        throw new NullPointerException();
    // The queue itself was passed in: throw an exception
    if (c == this)
        throw new IllegalArgumentException();
    // Flag that records whether the queue was modified
    boolean modified = false;
    for (E e : c)
        // Add the element
        if (add(e))
            // Added successfully: set the flag
            modified = true;
    // Return the modified flag
    return modified;
}
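
A short usage sketch of the collection constructor and of the c == this check in the listing above. BulkAddSketch and Item are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class BulkAddSketch {
    // Hypothetical element that expires delayMillis after construction.
    static final class Item implements Delayed {
        final long deadlineNanos;

        Item(long delayMillis) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Builds a queue from a collection; the constructor delegates to addAll().
    static int sizeAfterConstruction() {
        DelayQueue<Item> queue = new DelayQueue<>(Arrays.asList(new Item(500), new Item(100)));
        return queue.size(); // size() counts unexpired elements as well
    }

    // Passing the queue to its own addAll() is rejected with IllegalArgumentException.
    static boolean rejectsSelf() {
        DelayQueue<Item> queue = new DelayQueue<>();
        try {
            queue.addAll(queue);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }
}
```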

Source code analysis of the enqueue methods of DelayQueue

/**
 * Enqueue method of DelayQueue; never blocks because the queue is unbounded
 */
public void put(E e) {
    offer(e);
}

/**
 * Enqueue method with a return value
 */
public boolean offer(E e) {
    // Get the lock of the current queue
    final ReentrantLock lock = this.lock;
    // Acquire the lock
    lock.lock();
    try {
        // Call the offer operation of the priority queue
        q.offer(e);
        // Peek at the priority queue: if the new element is at the head, it has the smallest delay of all elements
        if (q.peek() == e) {
            // Clear the leader: the head has changed, so the current leader's timed wait is out of date
            leader = null;
            // signal() moves one waiter from the condition queue of available to the lock's synchronization queue, ready to be woken up
            available.signal();
        }
        // Return true: the element was enqueued successfully
        return true;
    } finally {
        // Release the lock; only now can the signaled thread actually resume
        lock.unlock();
    }
}
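
The q.peek() == e branch matters when a consumer is already waiting on a head element that expires far in the future. The sketch below illustrates that case under assumptions: NewHeadWakeupSketch, Item, and the delays are hypothetical. A consumer calls take() while the only element is a minute away from expiry, then an element with a much shorter delay is offered and becomes the new head.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class NewHeadWakeupSketch {
    // Hypothetical element: a label plus an absolute deadline in nanoseconds.
    static final class Item implements Delayed {
        final String label;
        final long deadlineNanos;

        Item(String label, long delayMillis) {
            this.label = label;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns the label of the first element a waiting consumer receives.
    static String firstTaken() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.offer(new Item("far", 60_000)); // head expires in one minute

        CompletableFuture<String> taken = CompletableFuture.supplyAsync(() -> {
            try {
                // If this runs before the second offer, the thread waits on "far" as leader.
                return queue.take().label;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        });

        // "near" becomes the new head, so offer() clears the leader and signals available.
        queue.offer(new Item("near", 100));
        return taken.join(); // completes once "near" has expired
    }
}
```

Whichever thread runs first, the consumer should receive "near" after roughly 100 ms instead of waiting a minute for "far".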

Source code analysis of the dequeue method of DelayQueue

/**
 * Dequeue method of DelayQueue: blocks until an expired element is available
 */
public E take() throws InterruptedException {
    // Get the lock of the current queue
    final ReentrantLock lock = this.lock;
    // Acquire the lock, responding to interruption while waiting for it
    lock.lockInterruptibly();
    try {
        for (;;) {
            // Look at the heap top (the element that expires earliest) without removing it
            E first = q.peek();
            // If the heap top is null, the queue is empty, so block until signaled
            if (first == null)
                available.await();
            else {
                // Get the remaining delay of the heap top element
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // Less than or equal to 0 means it has expired: call poll() to remove and return the heap top element
                    return q.poll();

                // From here on, delay is greater than 0, so the thread has to wait
                // Set first to null so that the waiting thread does not keep the element reachable (helps gc)
                first = null; // don't retain ref while waiting
                // Another thread is already the leader: wait indefinitely until signaled
                if (leader != null)
                    available.await();
                else {
                    // There is no leader: make the current thread the leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // Wait for at most the remaining delay of the heap top element
                        available.awaitNanos(delay);
                    } finally {
                        // If the current thread is still the leader, clear it so that another thread can become the leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // On exit (normal return or exception), if there is no leader and the queue is not empty, wake up one waiting thread
        if (leader == null && q.peek() != null)
            // signal() moves one waiter from the condition queue of available to the lock's synchronization queue, ready to be woken up
            available.signal();
        // Release the lock; only now can the signaled thread actually resume
        lock.unlock();
    }
}
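
To see the leader handoff at work with several consumers, the sketch below starts one thread per element and lets each call take() once. MultiConsumerSketch, Item, and the delays are hypothetical, and which thread receives which element is not deterministic; the point is only that every element is delivered exactly once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class MultiConsumerSketch {
    // Hypothetical element: a label plus an absolute deadline in nanoseconds.
    static final class Item implements Delayed {
        final String label;
        final long deadlineNanos;

        Item(String label, long delayMillis) {
            this.label = label;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Starts `consumers` threads that each take() one element; returns the labels received.
    static Set<String> takeWithConsumers(int consumers) {
        DelayQueue<Item> queue = new DelayQueue<>();
        for (int i = 0; i < consumers; i++) {
            queue.put(new Item("item-" + i, 50L * (i + 1)));
        }

        Set<String> received = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // One thread waits as leader with a timeout; the others wait until signaled.
                    received.add(queue.take().label);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return received;
    }
}
```

After a leader returns from take(), the signal in the finally block wakes one of the remaining waiters, which then becomes the leader for the next element.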

Keywords: Concurrent Programming

Added by SnakeO on Fri, 04 Feb 2022 16:16:21 +0200