Introduction to ArrayBlockingQueue:
ArrayBlockingQueue is an array-based blocking queue and one of the most commonly used blocking queues. Internally it maintains a fixed-length array that holds the queued elements, plus two integer variables that record the positions of the head and the tail of the queue within that array.
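To make the role of the two index variables concrete, here is a minimal single-threaded sketch of a fixed array used as a circular buffer. The class `RingIndexDemo` and its accessors are hypothetical names invented for this illustration; it has no locking and is not the JDK implementation.

```java
// Hypothetical illustration only: no locking, not the JDK source.
final class RingIndexDemo {
    private final Object[] items;
    private int takeIndex; // position of the head (next element to remove)
    private int putIndex;  // position of the tail (next free slot)
    private int count;     // number of stored elements

    RingIndexDemo(int capacity) {
        items = new Object[capacity];
    }

    // Stores x at the tail; returns false when the array is full.
    boolean offer(Object x) {
        if (count == items.length) return false;
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0; // wrap around
        count++;
        return true;
    }

    // Removes and returns the head; returns null when empty.
    Object poll() {
        if (count == 0) return null;
        Object x = items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0; // wrap around
        count--;
        return x;
    }

    int putIndex() { return putIndex; }
    int takeIndex() { return takeIndex; }

    public static void main(String[] args) {
        RingIndexDemo q = new RingIndexDemo(3);
        q.offer("a"); q.offer("b"); q.offer("c");
        System.out.println("putIndex after filling: " + q.putIndex());
        System.out.println("polled: " + q.poll() + ", takeIndex: " + q.takeIndex());
    }
}
```

Because both indices wrap back to 0 at the end of the array, no element ever has to be copied on a normal insert or remove.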
In ArrayBlockingQueue, producers adding elements and consumers removing elements share the same lock object, which means the two kinds of operation cannot truly run in parallel. This is a notable difference from LinkedBlockingQueue, which uses separate locks for the two ends. Judging from the implementation, ArrayBlockingQueue could have used separate locks as well, allowing producer and consumer operations to run fully in parallel. Doug Lea may have chosen not to because the write and read operations of ArrayBlockingQueue are already so lightweight that a second lock would only add complexity to the code without bringing any performance benefit.
Another clear difference between ArrayBlockingQueue and LinkedBlockingQueue is that the former does not create or destroy any additional object instances when inserting or removing elements, while the latter allocates an additional Node object for each inserted element. In a system that must process large volumes of data concurrently and efficiently over a long period, the two therefore differ in their impact on GC. When creating an ArrayBlockingQueue we can also choose whether its internal lock is fair; a non-fair lock is used by default.
ArrayBlockingQueue inheritance diagram
Let's look at how this array works by walking through the queue's enqueue and dequeue operations.
Core methods of ArrayBlockingQueue:
    // Core methods of BlockingQueue:

    // Adds an element to the queue; throws IllegalStateException if the queue is full
    boolean add(E e);

    // Adds an element to the queue. Returns true on success, or false immediately
    // if the queue is full; never blocks the calling thread
    boolean offer(E e);

    // Adds an element to the queue, waiting up to the given time if the queue is full.
    // Returns true once a slot becomes free, or false if the timeout elapses first
    boolean offer(E e, long timeout, TimeUnit unit);

    // Adds an element to the queue. If the queue is full, blocks until a slot
    // becomes free and another thread wakes the caller up
    void put(E e);

    // Retrieves and removes the head of the queue. Returns null immediately
    // if the queue is empty; never blocks
    E poll();

    // Retrieves and removes the head of the queue, waiting up to the given time.
    // Returns null if the queue is still empty when the timeout elapses
    E poll(long timeout, TimeUnit unit);

    // Retrieves and removes the head of the queue. If the queue is empty, blocks
    // until another thread adds an element and wakes the caller up.
    // The wait can be interrupted
    E take();

    // Removes up to maxElements available elements in one call and adds them to
    // the given collection. This is more efficient than polling repeatedly
    // because the lock is acquired only once
    int drainTo(Collection<? super E> c, int maxElements);
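The following small sketch exercises the non-blocking calls from the list above on a queue of capacity 2, so the full and empty cases are easy to see. The class name `NonBlockingCallsDemo` and the log format are made up for this example; only the `ArrayBlockingQueue` calls themselves come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class; only the ArrayBlockingQueue calls are real JDK API.
final class NonBlockingCallsDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(2);

        log.add("offer(1)=" + q.offer(1)); // succeeds: queue has room
        log.add("add(2)=" + q.add(2));     // succeeds: queue is now full
        log.add("offer(3)=" + q.offer(3)); // full: returns false, no blocking

        try {
            q.add(3);                      // full: add throws instead
        } catch (IllegalStateException ex) {
            log.add("add(3) threw " + ex.getClass().getSimpleName());
        }

        // Move everything (at most 10 elements) into a caller-supplied list.
        List<Integer> drained = new ArrayList<>();
        int n = q.drainTo(drained, 10);
        log.add("drainTo=" + n + " " + drained);

        log.add("poll()=" + q.poll());     // empty: returns null, no blocking
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that `drainTo` fills the collection passed in by the caller rather than creating a new one.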
Constructors of ArrayBlockingQueue:
    // Creates an ArrayBlockingQueue with the given fixed capacity
    public ArrayBlockingQueue(int capacity) {
        // Delegate with fair = false, i.e. use a non-fair lock
        this(capacity, false);
    }

    // Creates an ArrayBlockingQueue with the given capacity and lock fairness policy
    public ArrayBlockingQueue(int capacity, boolean fair) {
        // A capacity less than or equal to 0 is rejected with an exception
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // Allocate the backing array with the requested capacity
        this.items = new Object[capacity];
        // Create the lock; true means fair, false means non-fair
        lock = new ReentrantLock(fair);
        // Condition "queue is not empty": a take() on an empty queue waits on it
        notEmpty = lock.newCondition();
        // Condition "queue is not full": a put() on a full queue waits on it
        notFull = lock.newCondition();
    }

    /**
     * Creates a queue initially containing the elements of the given collection
     * @param capacity the capacity of the queue
     * @param fair whether a fair lock is used
     * @param c the collection of initial elements; it must not be null and must not
     *          contain null elements, otherwise NullPointerException is thrown
     */
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        // Initialize the queue through the two-argument constructor
        this(capacity, fair);
        final ReentrantLock lock = this.lock;
        // Lock only for visibility, not mutual exclusion
        lock.lock();
        try {
            // i counts the elements copied so far
            int i = 0;
            try {
                // Iterate over the elements of the collection
                for (E e : c) {
                    // A null element in the collection causes an exception
                    checkNotNull(e);
                    // Copy the element into the queue's internal array
                    items[i++] = e;
                }
            // If the collection holds more elements than the capacity,
            // report it as an illegal argument
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            // The number of copied elements becomes the element count of the queue
            count = i;
            // putIndex is the array position of the next insertion. If i == capacity
            // the queue is already full and putIndex wraps to 0; the next insertion
            // compares count with the capacity first, so add() throws, offer()
            // returns false and put() blocks
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
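As a usage sketch of the constructors above, the example below builds a fair queue from a collection that exactly fills it, and shows that a collection larger than the capacity is rejected. `ConstructorDemo` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class showing the three-argument constructor.
final class ConstructorDemo {
    // Capacity 3, fair lock, pre-filled with exactly 3 elements.
    static ArrayBlockingQueue<Integer> fullFromCollection() {
        return new ArrayBlockingQueue<>(3, true, Arrays.asList(1, 2, 3));
    }

    // Capacity 2 cannot hold 3 initial elements.
    static boolean overflowRejected() {
        try {
            new ArrayBlockingQueue<>(2, false, Arrays.asList(1, 2, 3));
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> q = fullFromCollection();
        System.out.println("remaining capacity: " + q.remainingCapacity());
        System.out.println("offer on full queue: " + q.offer(4));
        System.out.println("overflow rejected: " + overflowRejected());
    }
}
```

The fairness flag only affects the order in which waiting threads acquire the lock, so it has no visible effect in a single-threaded run like this one.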
Core fields and methods
Next, let's look at the important properties and methods inside ArrayBlockingQueue:
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {

        private static final long serialVersionUID = -817911632652898426L;

        // The array that stores the queued elements
        final Object[] items;
        // Index of the element for the next take, poll, peek or remove
        int takeIndex;
        // Index at which the next element will be inserted
        int putIndex;
        // Number of elements in the queue
        int count;
        // The single lock that guards both adding and removing elements
        final ReentrantLock lock;
        // A thread that tries to take from an empty queue waits on this condition
        private final Condition notEmpty;
        // A thread that tries to add to a full queue waits on this condition
        private final Condition notFull;
        // Shared state for currently active iterators; null when there are none
        transient Itrs itrs = null;

        // Circularly decrements i
        final int dec(int i) {
            return ((i == 0) ? items.length : i) - 1;
        }

        // Returns the element of the items array at the given index
        @SuppressWarnings("unchecked")
        final E itemAt(int i) {
            return (E) items[i];
        }

        // Throws NullPointerException if the argument is null
        private static void checkNotNull(Object v) {
            if (v == null)
                throw new NullPointerException();
        }

        // Inserts an element into the items array; the caller must already hold the lock
        private void enqueue(E x) {
            // Read the items field into a local variable
            final Object[] items = this.items;
            // Store the element at the current insert position
            items[putIndex] = x;
            // If putIndex reaches the end of the array, wrap around to 0
            if (++putIndex == items.length)
                putIndex = 0;
            // Increment the element count
            count++;
            // Wake up a thread that is blocked waiting to take an element
            notEmpty.signal();
        }

        // Removes an element from the items array; the caller must already hold the lock
        private E dequeue() {
            // Read the items field into a local variable
            final Object[] items = this.items;
            // Read the element at the head of the queue, located by takeIndex
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            // Null out the vacated slot to help GC
            items[takeIndex] = null;
            // If takeIndex reaches the end of the array, wrap around to 0
            if (++takeIndex == items.length)
                takeIndex = 0;
            // Decrement the element count
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            // Wake up a thread that is blocked waiting to add an element
            notFull.signal();
            return x;
        }

        // Deletes the element at the given index, then wakes up a thread that is
        // blocked waiting to add. The caller must already hold the lock
        void removeAt(final int removeIndex) {
            // Read the items field into a local variable
            final Object[] items = this.items;
            if (removeIndex == takeIndex) {
                // Removing the head: same steps as dequeue
                items[takeIndex] = null;
                if (++takeIndex == items.length)
                    takeIndex = 0;
                count--;
                if (itrs != null)
                    itrs.elementDequeued();
            } else {
                // Removing an interior element: slide the following elements
                // forward by one slot, up to putIndex
                final int putIndex = this.putIndex;
                for (int i = removeIndex;;) {
                    int next = i + 1;
                    if (next == items.length)
                        next = 0;
                    if (next != putIndex) {
                        items[i] = items[next];
                        i = next;
                    } else {
                        items[i] = null;
                        this.putIndex = i;
                        break;
                    }
                }
                count--;
                if (itrs != null)
                    itrs.removedAt(removeIndex);
            }
            notFull.signal();
        }

        // ... remaining methods are covered in the following sections
    }
Enqueue operations
add(E e): calls the offer(e) method to add an element to the queue. If the queue is full, an IllegalStateException is thrown
    public boolean add(E e) {
        // Delegate to offer(e) to append the element to the tail of the queue.
        // Return true on success; failure means the queue is full, so throw
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
offer(E e): adds an element to the queue. Returns true on success; if the queue is full it returns false immediately and does not block the calling thread
    public boolean offer(E e) {
        // A null element is rejected with NullPointerException
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock. Fairness was chosen in the constructor; non-fair by default
        lock.lock();
        try {
            // If the queue is full, return false immediately
            if (count == items.length)
                return false;
            else {
                // Otherwise append the element to the tail and return true
                enqueue(e);
                return true;
            }
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
offer(E e, long timeout, TimeUnit unit): adds an element to the queue, waiting up to the given time. If the queue is full, the caller blocks until a slot becomes free or the timeout elapses, in which case the method returns false
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        // A null element is rejected with NullPointerException
        checkNotNull(e);
        // Convert the timeout to nanoseconds
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, responding to interruption
        lock.lockInterruptibly();
        try {
            // While the queue is full, wait on the notFull condition
            while (count == items.length) {
                // The waiting time has run out and the queue is still full
                if (nanos <= 0)
                    return false;
                // Wait on notFull for at most the remaining time. If another thread
                // removes an element and signals, this thread can proceed
                nanos = notFull.awaitNanos(nanos);
            }
            // The queue has room: append the element to the tail
            enqueue(e);
            return true;
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
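A short sketch of the timed `offer` in use: with a queue of capacity 1 that is already full and no consumer, the call waits for the timeout and then gives up. `TimedOfferDemo` is a hypothetical helper; the checked `InterruptedException` is wrapped only to keep the example compact.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timed offer.
final class TimedOfferDemo {
    // Tries to add a second element to a full capacity-1 queue.
    static boolean offerToFullQueue(long timeoutMillis) {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        try {
            q.put("first"); // fills the only slot
            // Nobody takes, so this waits for the timeout and returns false.
            return q.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + offerToFullQueue(100));
    }
}
```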
put(E e): adds an element to the queue. If the queue is full, the caller blocks until a slot becomes free and another thread wakes it up. If the element is null, a NullPointerException is thrown
    public void put(E e) throws InterruptedException {
        // A null element is rejected with NullPointerException
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, responding to interruption
        lock.lockInterruptibly();
        try {
            // While the queue has no free slot, wait on the notFull condition
            while (count == items.length)
                notFull.await();
            // The queue has room: append the element to the tail. enqueue also
            // wakes up a thread that is blocked waiting to take
            enqueue(e);
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
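To illustrate the blocking behaviour of `put`, here is a minimal producer/consumer sketch with a queue of capacity 1, so the producer has to wait for the consumer after every element. `BlockingPutDemo` and `handOff` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class: one producer, one consumer, capacity 1.
final class BlockingPutDemo {
    static List<Integer> handOff(int n) {
        ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    q.put(i); // blocks whenever the single slot is occupied
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                received.add(q.take()); // blocks whenever the queue is empty
            }
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return received; // FIFO order is preserved
    }

    public static void main(String[] args) {
        System.out.println(handOff(5));
    }
}
```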
Dequeue operations
remove(): removes an element from the head of the queue
    public E remove() {
        // Delegate to poll() to remove and return the head of the queue
        E x = poll();
        // A non-null result is returned; null means the queue was empty, so throw
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
remove(Object o): removes a single instance of the given element from the queue if it is present
    public boolean remove(Object o) {
        // A null argument can never match, so return false
        if (o == null) return false;
        // Read the items field into a local variable
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        // Acquire the lock for the removal
        lock.lock();
        try {
            // Only search when the queue contains elements
            if (count > 0) {
                // Read putIndex
                final int putIndex = this.putIndex;
                // Start at takeIndex and scan the occupied slots. Slots outside
                // the range from takeIndex to putIndex are empty and are skipped
                int i = takeIndex;
                do {
                    // If the element in this slot matches, delete it and return true
                    if (o.equals(items[i])) {
                        // Delete the element at index i
                        removeAt(i);
                        return true;
                    }
                    // At the end of the array, wrap i around to 0
                    if (++i == items.length)
                        i = 0;
                // When i reaches putIndex, every valid element has been visited
                } while (i != putIndex);
            }
            // The element was not found in the items array
            return false;
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
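As a usage sketch, the example below removes an element from the middle of a queue whose contents wrap around the end of the array, and also contrasts `remove()` with `poll()` on an empty queue. `RemoveDemo` is a hypothetical class; the slot numbers in the comments assume the array layout shown in the source above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class for remove(Object) and remove().
final class RemoveDemo {
    static List<String> removeFromMiddle() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(4);
        q.addAll(Arrays.asList("a", "b", "c", "d"));
        q.poll();      // removes "a"; the head moves to slot 1
        q.offer("e");  // goes into slot 0, so the contents now wrap around
        q.remove("c"); // interior removal: "d" and "e" shift forward
        q.remove("x"); // not present: returns false, queue unchanged
        return new ArrayList<>(q); // snapshot in queue order
    }

    // remove() throws on an empty queue, whereas poll() would return null.
    static boolean removeOnEmptyThrows() {
        try {
            new ArrayBlockingQueue<String>(1).remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(removeFromMiddle());
        System.out.println("remove() on empty throws: " + removeOnEmptyThrows());
    }
}
```

Removing an interior element is the one operation here that copies elements, so it costs time proportional to the number of elements behind the removed one.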
E poll(): retrieves and removes the element at the head of the queue. If the queue is empty it returns null immediately and does not block
    public E poll() {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            // If the queue is empty return null; otherwise remove the head
            // element from the queue and return it
            return (count == 0) ? null : dequeue();
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
E poll(long timeout, TimeUnit unit): retrieves and removes the element at the head of the queue, waiting up to the given time. If the queue is still empty when the timeout elapses, it returns null
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert the timeout to nanoseconds
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock, responding to interruption
        lock.lockInterruptibly();
        try {
            // While the queue is empty, wait on the notEmpty condition
            while (count == 0) {
                // The waiting time has run out and the queue is still empty
                if (nanos <= 0)
                    return null;
                // Wait on notEmpty for at most the remaining time
                nanos = notEmpty.awaitNanos(nanos);
            }
            // The loop exited, so the queue has elements: remove and return the head
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    // The actual waiting is done by awaitNanos of the condition object
    // (not analyzed line by line here for lack of time; a deeper analysis
    // is left for later)
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        // If the thread is already interrupted on entry, throw immediately
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
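The sketch below shows both outcomes of the timed `poll`: it returns an element that a producer delivers during the wait, and it returns null when nothing arrives in time. `TimedPollDemo`, the 50 ms delay and the timeouts are arbitrary choices made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timed poll.
final class TimedPollDemo {
    // A producer delivers after a short delay, well inside the timeout.
    static String pollWithLateProducer() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50);
                q.put("late");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return q.poll(5, TimeUnit.SECONDS); // wakes up as soon as "late" arrives
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    // Nobody produces, so the wait times out and null comes back.
    static String pollEmpty() {
        try {
            return new ArrayBlockingQueue<String>(1).poll(20, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("with producer: " + pollWithLateProducer());
        System.out.println("without producer: " + pollEmpty());
    }
}
```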
E take(): retrieves and removes the element at the head of the queue. If the queue is empty, the caller blocks until another thread adds an element and wakes it up. The wait can be interrupted
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock, responding to interruption
        lock.lockInterruptibly();
        try {
            // While the queue is empty, wait on the notEmpty condition until
            // another thread adds an element and signals
            while (count == 0)
                notEmpty.await();
            // Remove and return the element at the head of the queue
            return dequeue();
        } finally {
            // Release the lock
            lock.unlock();
        }
    }
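Since `take` uses `lockInterruptibly` and `await`, a consumer stuck on an empty queue can be released by interrupting it. The following sketch demonstrates that; `InterruptTakeDemo` is a hypothetical class name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: interrupting a consumer blocked in take().
final class InterruptTakeDemo {
    static boolean takeWasInterrupted() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                q.take(); // the queue stays empty, so this can only end by interrupt
            } catch (InterruptedException ex) {
                interrupted.set(true);
            }
        });
        consumer.start();

        // Whether the interrupt lands before or during the wait,
        // take() reacts with InterruptedException.
        consumer.interrupt();
        try {
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("take() interrupted: " + takeWasInterrupted());
    }
}
```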
ArrayBlockingQueue summary:
- ArrayBlockingQueue is a bounded blocking queue backed by an array. Its elements are ordered first in, first out. The capacity is fixed and cannot be changed after initialization.
- Both fair and non-fair locking are supported. The default is a non-fair lock.
- ArrayBlockingQueue keeps two integer index variables, takeIndex and putIndex, which mark the positions of the head and the tail of the queue in the array.
- Adding and removing elements share a single ReentrantLock, so a put and a take cannot execute at the same time.
- Why is the member variable count of ArrayBlockingQueue not declared volatile? Because count only changes when elements are added or removed, and both operations run while holding the lock, the lock already guarantees visibility and volatile is unnecessary.
- Communication between threads is implemented with Condition. There are two condition variables: notFull (threads wait on it when the queue is full) and notEmpty (threads wait on it when the queue is empty).
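The points above can be tied together in one compact sketch: a single ReentrantLock, two Condition objects, and a plain int counter that is only touched while the lock is held. `TinyBoundedBuffer` is a simplified teaching class written for this summary, not the JDK source; it omits null checks, timeouts, iterators and fairness configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching class: a simplified sketch, not the JDK implementation.
final class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex;
    private int count; // guarded by lock, so no volatile is needed
    private final ReentrantLock lock = new ReentrantLock(); // non-fair
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // full: wait
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await(); // empty: wait
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal(); // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Sends 0..n-1 through a capacity-2 buffer from a producer thread.
    static List<Integer> roundTrip(int n) {
        TinyBoundedBuffer<Integer> buf = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) buf.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> out = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) out.add(buf.take());
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(6));
    }
}
```

The waits sit inside `while` loops rather than `if` statements so that a thread rechecks the state after waking up, which is the same pattern the real `put` and `take` use.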