[Learning Notes - Java Collection - 15] Queue - ArrayBlockingQueue Source Analysis

introduce

ArrayBlockingQueue is the next blocked queue implemented as an array in a java Concurrent package and is thread-safe. See the analysis below for the need for scaling up.

queue

Queues, a linear table, are characterized by FIFO, or FIFO. As we usually queue, first come first, that is, the first person who enters the queue first leaves the queue.

Source Code Analysis

Main attributes

// Storing elements using arrays
final Object[] items;

// Pointer to element
int takeIndex;

// Pointer to Place Element
int putIndex;

// Number of elements
int count;

// Locks guaranteeing concurrent access
final ReentrantLock lock;

// Non-empty condition
private final Condition notEmpty;

// Non-full condition
private final Condition notFull;

From the attributes, we can get the following important information:

  1. Store elements using arrays;
  2. Mark the position of the next operation by placing and taking the pointer;
  3. Use reentrant locks to ensure concurrent security;

Main construction methods

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    // Initialize Array
    this.items = new Object[capacity];
    // Create reentrant locks and two conditions
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

Two conclusions can be drawn from the construction method:

  1. ArrayBlockingQueue initialization must pass in capacity, which is the size of the array;
  2. Constructive methods can be used to control whether the type of re-entry lock is fair or unfair.

Entry

There are four ways to join the team: add (E), offer (E), put (E), offer (E), long timeout (TimeUnit unit). What are the differences between them?

public boolean add(E e) {
    // Call the add(e) method of the parent class
    return super.add(e);
}

// super.add(e)
public boolean add(E e) {
    // Call offer(e) If true is returned successfully, throw an exception if failed
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
    // Element cannot be empty
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // Locking
    lock.lock();
    try {
        if (count == items.length)
            // Return false if the array is full
            return false;
        else {
            // Invoke the queue-entry method and return true if the array is not full
            enqueue(e);
            return true;
        }
    } finally {
        // Unlock
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // Lock, throw an exception if the thread interrupts
    lock.lockInterruptibly();
    try {
        // If the array is full, use notFull to wait
        // notFull waits, which means the queue is full now
        // Queues are not satisfied until an element is removed
        // Then wake up notFull and continue with the logic now
        // Why use while instead of if here
        // It's possible that multiple threads are blocking on the lock
        // Even if it wakes up, it's possible that other threads modified the queue one step before it became full again
        // You need to wait again at this time
        while (count == items.length)
            notFull.await();
        // Entry
        enqueue(e);
    } finally {
        // Unlock
        lock.unlock();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // Locking
    lock.lockInterruptibly();
    try {
        // If the array is full, block nanos for nanoseconds
        // Return false if there is still no space to wake up this thread and time is up
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        // Entry
        enqueue(e);
        return true;
    } finally {
        // Unlock
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    // Place the element directly at the pointer position
    items[putIndex] = x;
    // If you place the pointer at the end of the array, return to the head
    if (++putIndex == items.length)
        putIndex = 0;
    // Quantity plus 1
    count++;
    // Wake up notEmpty, because there is an element in the queue, so it must not be empty
    notEmpty.signal();
}
  1. Throw an exception if the queue is full when add(e);
  2. Return false if the queue is full when offer(e);
  3. Use notFull to wait when the queue is full;
  4. When offer(e, timeout, unit), wait a while if the queue is full and return false if the queue is still full.
  5. Store elements using a pointer-release loop using arrays;

Queue

There are four ways to get out of the queue: remove(), poll(), take(), poll(long timeout, TimeUnit unit). What is the difference between them?

public E remove() {
    // Call poll() method to queue
    E x = poll();
    if (x != null)
        // Return an element if it is queued
        return x;
    else
        // Throw an exception if no elements are queued
        throw new NoSuchElementException();
}

public E poll() {
    final ReentrantLock lock = this.lock;
    // Locking
    lock.lock();
    try {
        // Returns null if the queue has no elements or leaves the queue
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Locking
    lock.lockInterruptibly();
    try {
        // Blocking waits on condition notEmpty if queue has no elements
        while (count == 0)
            notEmpty.await();
        // There are elements before you leave the team
        return dequeue();
    } finally {
        // Unlock
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // Locking
    lock.lockInterruptibly();
    try {
        // Blocking waits for nanos nanoseconds if the queue has no elements
        // Return null if the next time this thread acquires a lock but the queue remains empty and has timed out
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // Take element of pointer position
    E x = (E) items[takeIndex];
    // Set the fetch pointer position to null
    items[takeIndex] = null;
    // Move the pointer forward to return the array front end recycling if the array ends
    if (++takeIndex == items.length)
        takeIndex = 0;
    // Number of elements minus 1
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // Wake up notFull condition
    notFull.signal();
    return x;
}
  1. Throw an exception if the queue is empty when remove();
  2. null is returned if the queue is empty when poll();
  3. take() if the queue is empty, blocking waits on condition notEmpty;
  4. When poll(timeout, unit), if the queue is empty, it will block and wait for a period of time and return null if it is still empty.
  5. Use the fetch pointer loop to fetch elements from the array;

summary

  1. ArrayBlockingQueue does not need to be expanded because it specifies capacity at initialization and recycles arrays;
  2. ArrayBlockingQueue recycles arrays using takeIndex and putIndex;
  3. Entry and Exit define four groups of methods for different purposes.
  4. Use reentrant locks and two conditions to ensure concurrent security;
  5. Queue length is fixed and must be specified at initialization, so capacity must be carefully considered before use.
  6. If the speed of consumption does not keep up with the speed of queue entry, the provider thread will be blocked all the time, and the more blocked, the more dangerous it will be;

Keywords: Java

Added by Negligence on Wed, 21 Aug 2019 19:12:44 +0300