ArrayBlockingQueue illustrated source code analysis

Introduction to ArrayBlockingQueue:

  • ArrayBlockingQueue is an array-based implementation of a blocking queue, and one of the most commonly used. It maintains a fixed-length array to hold the queued elements, plus two int variables that record the positions of the head and the tail of the queue within that array.

  • Producers adding elements and consumers removing them share the same lock object, so the two kinds of operation cannot run in parallel. This is a notable difference from LinkedBlockingQueue, which uses separate locks for putting and taking. In principle ArrayBlockingQueue could also use separate locks so that producers and consumers run fully in parallel. The reason Doug Lea did not do so may be that its insert and remove operations are already so lightweight that a separate locking mechanism would add complexity to the code without any performance benefit.

  • Another clear difference between ArrayBlockingQueue and LinkedBlockingQueue is that the former does not create or destroy any extra object instances when inserting or removing elements, while the latter allocates an additional Node object for every inserted element. In a system that processes large volumes of data concurrently over a long period, this makes a difference to GC pressure. When creating an ArrayBlockingQueue we can also choose whether its internal lock is fair; a non-fair lock is used by default.
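
To make the "fixed-length array plus two int positions" idea concrete, here is a minimal single-threaded sketch. The class `RingSketch` and its method names are made up for this illustration; it leaves out all locking and blocking and only shows how the two indices wrap around the array.

```java
// Hypothetical illustration class, not part of the JDK.
final class RingSketch {
    private final Object[] items; // fixed-length backing array
    private int takeIndex;        // position of the head (next element to remove)
    private int putIndex;         // position of the tail (next free slot)
    private int count;            // number of stored elements

    RingSketch(int capacity) {
        items = new Object[capacity];
    }

    // Stores x at the tail; returns false when the array is full.
    boolean offer(Object x) {
        if (count == items.length) return false;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0; // wrap around
        count++;
        return true;
    }

    // Removes and returns the head, or null when the array is empty.
    Object poll() {
        if (count == 0) return null;
        Object x = items[takeIndex];
        items[takeIndex] = null;                        // drop the reference
        if (++takeIndex == items.length) takeIndex = 0; // wrap around
        count--;
        return x;
    }

    int takeIndex() { return takeIndex; }
    int putIndex() { return putIndex; }

    public static void main(String[] args) {
        RingSketch r = new RingSketch(3);
        r.offer("a"); r.offer("b"); r.offer("c"); // array full, putIndex wraps to 0
        r.poll();                                 // frees slot 0, takeIndex becomes 1
        r.offer("d");                             // reuses slot 0, putIndex becomes 1
        System.out.println(r.takeIndex() + " " + r.putIndex()); // prints "1 1"
    }
}
```

No objects are allocated per element here, which matches the point about GC made above.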

ArrayBlockingQueue inheritance diagram

Let's take a look at how this array works by following one dequeue operation and one enqueue operation.

Core methods of ArrayBlockingQueue:

// Core methods of BlockingQueue:

// Add an element to the queue; throw IllegalStateException if the queue is full
boolean add(E e)

// Add an element to the queue. Return true on success. If the queue is full, return false immediately without blocking the calling thread
boolean offer(E e)

// Add an element to the queue, waiting up to the specified time. If the queue is full, block until a slot becomes free (return true) or the timeout expires (return false)
boolean offer(E e, long timeout, TimeUnit unit)

// Add an element to the queue. If the queue is full, block until a slot becomes free and another thread wakes this one up
void put(E e)

// Get and remove the element at the head of the queue. If the queue is empty, return null immediately without blocking
E poll()

// Get and remove the element at the head of the queue, waiting up to the specified time. If the queue is still empty when the timeout expires, return null
E poll(long timeout, TimeUnit unit)

// Get and remove the element at the head of the queue. If the queue is empty, block until another thread adds an element and wakes this one up. The wait can be interrupted
E take()

// Remove at most maxElements available elements from the queue in one call and add them to the given collection. This is more efficient than polling repeatedly because the lock is acquired only once for the whole batch
int drainTo(Collection<? super E> c, int maxElements)
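
The differences between these methods are easiest to see on a tiny queue. The following sketch uses a capacity of 2; the class name `CoreMethodsDemo` and the trace labels are invented for the example, while every queue call is part of the standard `BlockingQueue` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class CoreMethodsDemo {
    // Runs the non-blocking methods on a queue of capacity 2 and records each result.
    static List<String> run() {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(2);
        List<String> trace = new ArrayList<>();
        trace.add("add=" + q.add(1));
        trace.add("offer=" + q.offer(2));
        trace.add("offerWhenFull=" + q.offer(3)); // full: returns false, no blocking
        try {
            q.add(3);                             // full: throws instead
        } catch (IllegalStateException ex) {
            trace.add("addWhenFull=" + ex.getMessage());
        }
        List<Integer> drained = new ArrayList<>();
        int n = q.drainTo(drained, 1);            // moves at most one element
        trace.add("drainTo=" + n + " " + drained);
        trace.add("poll=" + q.poll());
        trace.add("pollWhenEmpty=" + q.poll());   // empty: returns null
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The blocking variants (`put`, `take` and the timed `offer` and `poll`) need a second thread to be interesting, so they are left out of this sketch.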

Construction method of ArrayBlockingQueue:

    // Build a new ArrayBlockingQueue with the given capacity
    public ArrayBlockingQueue(int capacity) {
        // Delegate to the two-argument constructor with a non-fair lock
        this(capacity, false);
    }

    // Build a new ArrayBlockingQueue with the given capacity and the given lock fairness
    public ArrayBlockingQueue(int capacity, boolean fair) {
        // If the capacity is less than or equal to 0, throw an exception directly
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // Allocate the backing array with the given capacity
        this.items = new Object[capacity];
        // Create the lock. Whether it is fair depends on the passed parameter; false means non-fair
        lock = new ReentrantLock(fair);
        // Create the "queue is not empty" condition. If take() finds the queue empty, the taking thread waits on this condition
        notEmpty = lock.newCondition();
        // Create the "queue is not full" condition. If put() finds the queue full, the putting thread waits on this condition
        notFull = lock.newCondition();
    }

    /**
     * Create a queue and add the elements of the given collection to it
     * @param capacity Initial (and fixed) capacity
     * @param fair Whether a fair lock is used
     * @param c The collection of initial elements. It must not be null and must not contain null elements, otherwise NullPointerException is thrown
     */
    public ArrayBlockingQueue(int capacity, boolean fair,  Collection<? extends E> c) {
        // Call the construction method to initialize the queue according to the passed parameters
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        // Perform locking operation
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            // Define a temporary variable i to traverse the collection
            int i = 0;
            try {
                // Traversing elements in a collection
                for (E e : c) {
                    // If there are null elements in the collection, an exception is thrown directly 
                    checkNotNull(e);
                    // Put the traversed elements into the array inside the queue
                    items[i++] = e;
                }
                // If the collection holds more elements than the queue capacity, throw IllegalArgumentException
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            // The number of elements copied from the collection becomes the element count of the queue
            count = i;
            // Set putIndex, the array position where the next element will be inserted.
            // If i == capacity the queue is already full, so putIndex wraps around to 0. The next insertion then fails
            // (add throws, offer returns false, put blocks) because count is compared with the array length first
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            // Release lock
            lock.unlock();
        }
    }
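
A short sketch of the three-argument constructor in use. `CollectionConstructorDemo` and `tryCreate` are hypothetical names for this example; the behaviour shown (a full queue when the collection exactly fills the capacity, and IllegalArgumentException when it is too large) follows from the constructor code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class CollectionConstructorDemo {
    // Builds a non-fair queue from the given elements; returns null if they do not fit.
    static ArrayBlockingQueue<String> tryCreate(int capacity, List<String> source) {
        try {
            return new ArrayBlockingQueue<>(capacity, false, source);
        } catch (IllegalArgumentException tooManyElements) {
            return null;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> q = tryCreate(3, Arrays.asList("a", "b", "c"));
        System.out.println(q.peek());              // head is the first element of the collection
        System.out.println(q.remainingCapacity()); // the queue starts out full
        System.out.println(q.offer("d"));          // so a further offer is rejected
        System.out.println(tryCreate(2, Arrays.asList("a", "b", "c")) == null);
    }
}
```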

Key core attributes and methods

Next, let's look at the important properties and methods inside ArrayBlockingQueue:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {

    
    private static final long serialVersionUID = -817911632652898426L;

    // The items array stores the queued elements
    final Object[] items;

    // takeIndex is the index of the next element for take, poll, peek or remove
    int takeIndex;

    // putIndex is the index in the array where the next element will be inserted
    int putIndex;

    // Number of elements in the queue
    int count;

    // The lock guarding all access. Adding and removing elements use the same lock
    final ReentrantLock lock;

    // Condition that threads removing elements wait on while the queue is empty
    private final Condition notEmpty;

    // Condition that threads adding elements wait on while the queue is full
    private final Condition notFull;

    // Shared state for the currently active iterators, or null if there are none
    transient Itrs itrs = null;

    
    // Circularly decrement i: return i - 1, wrapping from 0 to the last index of the array
    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

    // Returns the element of the items array at the given index
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }

    // Throws NullPointerException if the argument is null
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

    // Add an element to the items array. The caller must already hold the lock
    private void enqueue(E x) {
        // Cache the items array in a local variable
        final Object[] items = this.items;
        // Store the new element at putIndex
        items[putIndex] = x;
        // If putIndex reaches the end of the array, wrap it around to 0
        if (++putIndex == items.length)
            putIndex = 0;
        // Increment the counter that records the number of elements in the queue
        count++;
        // Wake up one thread that is blocked waiting to take an element
        notEmpty.signal();
    }

    // Remove the head element from the items array. The caller must already hold the lock
    private E dequeue() {
        // Cache the items array in a local variable
        final Object[] items = this.items;
        // Get the element at the head of the queue, located by takeIndex
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Set the slot of the removed element to null to help GC
        items[takeIndex] = null;
        // If takeIndex reaches the length of the items array, the end of the array has been reached, so wrap takeIndex around to 0
        if (++takeIndex == items.length)
            takeIndex = 0;
        // Decrement the number of elements in the queue
        count--;
        // Tell any active iterators that an element was dequeued
        if (itrs != null)
            itrs.elementDequeued();
        // Wake up one thread that is blocked waiting to add an element
        notFull.signal();
        return x;
    }

    // Delete the element at the given index, then wake up one thread that is blocked waiting to add. The caller must already hold the lock
    void removeAt(final int removeIndex) {
        // Cache the items array in a local variable
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // Removing the head: same steps as dequeue()
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // Removing an interior element: slide every later element back by one slot, up to putIndex
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    // i is now the old last element's slot: clear it and make it the new putIndex
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }
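
The shifting loop in the else branch of removeAt is easier to follow on a concrete array. The sketch below copies just that loop into a standalone method; `RemoveAtSketch` and `removeInterior` are names invented here, and the array contents are an arbitrary example whose elements wrap around the end of the array.

```java
import java.util.Arrays;

class RemoveAtSketch {
    // Removes the (non-head) element at removeIndex from a circular array by sliding
    // each later element back one slot. Returns the new putIndex.
    static int removeInterior(Object[] items, int removeIndex, int putIndex) {
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;                // wrap around the end of the array
            if (next != putIndex) {
                items[i] = items[next];  // slide the following element back
                i = next;
            } else {
                items[i] = null;         // slot of the old last element is now free
                return i;
            }
        }
    }

    public static void main(String[] args) {
        // takeIndex = 2 and putIndex = 1, so the logical order is c, d, e, a.
        Object[] items = {"a", null, "c", "d", "e"};
        int newPutIndex = removeInterior(items, 3, 1); // remove "d"
        System.out.println(Arrays.toString(items));    // prints "[null, null, c, e, a]"
        System.out.println(newPutIndex);               // prints "0"
    }
}
```

After the removal the logical order is c, e, a, so FIFO order among the remaining elements is preserved.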

Enqueue operations

add(E e): call the offer(e) method to add the element to the queue. If the queue is full, throw an IllegalStateException

    public boolean add(E e) {
        // Call offer(e) to add the element to the tail of the queue. Return true on success; a false result means the queue is full, so throw an exception
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

offer(E e): add an element to the queue and return true on success. If the queue is full, return false immediately without blocking the calling thread

    public boolean offer(E e) {
        // First check whether the passed element is null. If it is null, an exception is thrown directly
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock. Whether it is fair is decided in the constructor; it is non-fair by default
        lock.lock();
        try {
            // If the queue is full, return false directly
            if (count == items.length)
                return false;
            else {
                // If the queue is not full, call enqueue(e) to add the element to the tail of the queue and return true
                enqueue(e);
                return true;
            }
        } finally {
            // Release lock
            lock.unlock();
        }
    }

offer(E e, long timeout, TimeUnit unit): add an element to the queue, waiting up to the specified time. If the queue is full, block until a slot becomes free or the timeout expires

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // Check whether the passed element is null, and throw an exception directly if it is
        checkNotNull(e);
        // Convert the given timeout into nanoseconds
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, allowing the wait for it to be interrupted
        lock.lockInterruptibly();
        try {
            // While the queue is full, wait on the notFull condition for the remaining time
            while (count == items.length) {
                // If the waiting time has elapsed and the queue is still full, return false directly
                if (nanos <= 0)
                    return false;
                // Wait on the notFull condition for at most nanos nanoseconds. If another thread removes an element and signals, this thread wakes up and can add its element
                nanos = notFull.awaitNanos(nanos);
            }
            // The queue is not full, so call enqueue(e) to add the element to the tail of the queue
            enqueue(e);
            return true;
        } finally {
            // Release lock
            lock.unlock();
        }
    }
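
The two possible outcomes of the timed offer can be reproduced with a queue of capacity 1. This is only a sketch: `TimedOfferDemo`, its method names and the chosen timeouts are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    // Offers to a full queue that nobody consumes from, so the call times out.
    static boolean offerToFullQueue(long timeoutMillis) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.put("first");
        return q.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Offers to a full queue while a consumer thread frees the only slot.
    static boolean offerWithConsumer() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.put("first");
        Thread consumer = new Thread(() -> {
            try {
                q.take(); // removes "first" and signals notFull
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        boolean accepted = q.offer("second", 5, TimeUnit.SECONDS);
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerToFullQueue(50)); // false after roughly 50 ms
        System.out.println(offerWithConsumer());  // true once the consumer has taken
    }
}
```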

put(E e): add an element to the queue. If the queue is full, block until a slot becomes free and another thread wakes this one up. If the added element is null, an exception is thrown

    public void put(E e) throws InterruptedException {
        // First check whether the passed element is null. If it is null, an exception is thrown directly
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, allowing the wait for it to be interrupted
        lock.lockInterruptibly();
        try {
            // Check whether the queue has a free slot. If not, wait on the notFull condition
            while (count == items.length)
                notFull.await();
            // The queue is not full, so add the element to the tail of the queue. enqueue also wakes up a thread that is blocked waiting to take
            enqueue(e);
        } finally {
            // Release lock
            lock.unlock();
        }
    }
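
Because put blocks while the queue is full, a small queue naturally throttles a fast producer. The sketch below pushes a sequence of integers through a single-slot queue; `PutBackpressureDemo` and `transfer` are hypothetical names used only here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutBackpressureDemo {
    // A producer puts 1..total into a capacity-1 queue while the calling thread
    // takes them all. Returns the sum of the values the consumer received.
    static int transfer(int total) throws InterruptedException {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= total; i++)
                    q.put(i); // blocks whenever the single slot is occupied
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < total; i++)
            sum += q.take(); // blocks whenever the queue is empty
        producer.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(100)); // prints "5050"
    }
}
```

No element is lost or duplicated even though the producer can never get more than one element ahead of the consumer.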

Dequeue operations

remove(): remove and return the element at the head of the queue, throwing NoSuchElementException if the queue is empty

    public E remove() {
        // Call poll() to remove the element at the head of the queue
        E x = poll();
        // A non-null result is the removed element, so return it. null means the queue was empty, so throw an exception
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

remove(Object o): remove a single instance of the specified element from the queue, if it is present

    public boolean remove(Object o) {
        // If the passed element is null, return false directly
        if (o == null) return false;
        // Cache the items array in a local variable
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        // Acquire the lock before removing
        lock.lock();
        try {
            // Only search if the queue still contains elements
            if (count > 0) {
                // Cache putIndex in a local variable
                final int putIndex = this.putIndex;
                // Start at takeIndex (the head) and walk through the valid elements looking for the one to delete. Slots outside the range from takeIndex to putIndex are null, so they are never visited
                int i = takeIndex;
                do {
                    // If the current element equals o, delete it and return true
                    if (o.equals(items[i])) {
                        // Delete the element at the given index
                        removeAt(i);
                        return true;
                    }
                    // Wrap around to 0 on reaching the end of the array, just as putIndex does
                    if (++i == items.length)
                        i = 0;
                    // When i == putIndex, the traversal has passed the last valid element in the items array
                } while (i != putIndex);
            }
            // The element to delete was not found in the items array, so return false
            return false;
        } finally {
            // Release lock
            lock.unlock();
        }
    }
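
A small usage sketch of remove(Object) on a queue whose contents wrap around the end of the array. The class name `RemoveObjectDemo` is made up, and the slot numbers in the comments are inferred from the enqueue and dequeue code shown earlier rather than observed directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class RemoveObjectDemo {
    // Removes an interior element and returns the remaining elements in queue order.
    static List<String> removeFromWrappedQueue() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(4);
        for (String s : new String[] {"a", "b", "c", "d"})
            q.add(s);
        q.poll();      // removes "a"
        q.poll();      // removes "b", head is now slot 2
        q.add("e");    // goes into slot 0
        q.add("f");    // goes into slot 1, so the contents wrap around
        q.remove("d"); // interior element, handled by the shifting branch of removeAt
        return new ArrayList<>(q);
    }

    public static void main(String[] args) {
        System.out.println(removeFromWrappedQueue()); // prints "[c, e, f]"
    }
}
```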

E poll(): get and remove the element at the head of the queue. If the queue is empty, return null immediately without blocking

    public E poll() {
        final ReentrantLock lock = this.lock;
        // Lock
        lock.lock();
        try {
            // If the queue is empty, return null. Otherwise remove the element at the head of the queue and return it
            return (count == 0) ? null : dequeue();
        } finally {
            // Release lock
            lock.unlock();
        }
    }

E poll(long timeout, TimeUnit unit): get and remove the element at the head of the queue, waiting up to the specified time. If the queue is still empty when the timeout expires, return null

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert the given timeout into nanoseconds
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, allowing the wait for it to be interrupted
        lock.lockInterruptibly();
        try {
            // While the queue has no elements, wait on the notEmpty condition
            while (count == 0) {
                // The waiting time is up and the queue is still empty, so return null
                if (nanos <= 0)
                    return null;
                // Wait on the notEmpty condition until signalled or until the remaining timeout has elapsed
                nanos = notEmpty.awaitNanos(nanos);
            }
            // The loop has exited, so the queue contains elements. Remove the element at the head of the queue and return it
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  // The actual waiting is done by awaitNanos in AbstractQueuedSynchronizer.ConditionObject (time is limited, so I won't analyze it line by line here; I'll analyze it in depth when I have time)
  public final long awaitNanos(long nanosTimeout) throws InterruptedException {
            // If the thread has already been interrupted, throw an exception directly
            if (Thread.interrupted()) throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }
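
The return value of awaitNanos is what drives the `nanos <= 0` checks in the timed offer and poll methods: it is an estimate of the time still remaining, and a value of zero or less means the wait timed out. A minimal sketch using a plain ReentrantLock and Condition (the class `AwaitNanosDemo` and its method are invented for illustration):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitNanosDemo {
    // Waits on a condition that nobody signals and returns the final value
    // reported by awaitNanos, which is zero or negative after a timeout.
    static long waitWithoutSignal(long timeoutMillis) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock(); // awaitNanos must be called with the lock held
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            // Re-wait with the remaining time in case of a spurious wakeup.
            while (nanos > 0)
                nanos = cond.awaitNanos(nanos);
            return nanos;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitWithoutSignal(20) <= 0); // prints "true"
    }
}
```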

E take(): get and remove the element at the head of the queue. If the queue is empty, block until another thread adds an element and wakes this one up. The wait can be interrupted

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock, allowing the wait for it to be interrupted
        lock.lockInterruptibly();
        try {
            // If the queue is empty
            while (count == 0)
                // Wait on the notEmpty condition until another thread adds an element and signals
                notEmpty.await();
            // Get and remove the element at the head of the queue
            return dequeue();
        } finally {
            // Release lock
            lock.unlock();
        }
    }
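
That take() can be interrupted while blocked is worth seeing once. In the sketch below a consumer thread calls take() on an empty queue and is then interrupted; `TakeInterruptDemo` and `interruptBlockedTake` are hypothetical names for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class TakeInterruptDemo {
    // Interrupts a thread that calls take() on an empty queue and reports
    // whether take() ended with an InterruptedException.
    static boolean interruptBlockedTake() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                q.take(); // the queue stays empty, so this can only end by interruption
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        consumer.start();
        // Works whether the consumer is already waiting or has not reached take() yet,
        // because lockInterruptibly() and await() both check the interrupt status.
        consumer.interrupt();
        consumer.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(interruptBlockedTake()); // prints "true"
    }
}
```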

ArrayBlockingQueue summary:

  • ArrayBlockingQueue is a bounded blocking queue based on an array. Its elements are ordered first in, first out. The size of the queue is fixed and cannot be changed after initialization.
  • Both fair and non-fair locking are supported. The default is a non-fair lock.
  • ArrayBlockingQueue keeps two int index variables, takeIndex and putIndex, which mark the positions of the head and the tail of the queue in the array.
  • Adding and removing elements share a single ReentrantLock, so put and take operations cannot run at the same time.
  • Why is the member variable count of ArrayBlockingQueue not declared volatile? Because count only changes when elements are added or removed, and both are done while holding the lock, so volatile is not needed to ensure visibility.
  • Communication between threads is implemented with Condition. There are two condition variables: notFull (which adding threads wait on when the queue is full) and notEmpty (which taking threads wait on when the queue is empty).

Keywords: Java Back-end Multithreading

Added by Mgccl on Sun, 12 Dec 2021 11:43:46 +0200