Introduction
ArrayBlockingQueue is a thread-safe blocking queue in the java.util.concurrent package, backed by an array. Whether it ever needs to grow that array is covered in the analysis below.
Queue
A queue is a linear data structure characterized by first in, first out (FIFO). It works like an everyday line of people: first come, first served, so the first person to join the line is the first to leave it.
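To make the FIFO property concrete, here is a small sketch using ArrayBlockingQueue itself. The class name FifoDemo and the helper drainOrder() are made up for this illustration; only offer and poll come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class FifoDemo {
    // Hypothetical helper: enqueue three strings, then dequeue them all
    // and report the order in which they came out.
    static String drainOrder() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.offer("first");
        queue.offer("second");
        queue.offer("third");

        StringBuilder order = new StringBuilder();
        String head;
        // poll() returns null once the queue is empty
        while ((head = queue.poll()) != null) {
            if (order.length() > 0) order.append(' ');
            order.append(head);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // first second third
    }
}
```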
Source Code Analysis
Main attributes
    // The array that stores the elements
    final Object[] items;
    // Index of the next element to take
    int takeIndex;
    // Index at which the next element will be put
    int putIndex;
    // Number of elements in the queue
    int count;
    // Lock guarding all concurrent access
    final ReentrantLock lock;
    // Condition for "queue is not empty"
    private final Condition notEmpty;
    // Condition for "queue is not full"
    private final Condition notFull;
From the attributes, we can get the following important information:
- Elements are stored in an array;
- A put index and a take index mark the position of the next enqueue and the next dequeue;
- A reentrant lock, with two conditions, guarantees thread safety.
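How the two indices and the counter cooperate can be sketched without any locking. The class below is a single-threaded illustration only, not the JDK implementation: the name RingSketch and its accessor are hypothetical, and the lock and conditions are deliberately left out.

```java
public class RingSketch {
    private final Object[] items;
    private int takeIndex; // next slot to read
    private int putIndex;  // next slot to write
    private int count;     // number of stored elements

    public RingSketch(int capacity) {
        items = new Object[capacity];
    }

    // Store x at putIndex, wrapping to 0 at the end of the array.
    public boolean offer(Object x) {
        if (count == items.length) return false; // full
        items[putIndex] = x;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        return true;
    }

    // Read from takeIndex, wrapping to 0 at the end of the array.
    public Object poll() {
        if (count == 0) return null; // empty
        Object x = items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        return x;
    }

    public int putIndex() { return putIndex; }

    public static void main(String[] args) {
        RingSketch q = new RingSketch(3);
        q.offer("a");
        q.offer("b");
        q.offer("c");                      // putIndex has wrapped back to 0
        System.out.println(q.offer("d"));  // false, the array is full
        System.out.println(q.poll());      // a
        System.out.println(q.offer("d"));  // true, reuses slot 0
        System.out.println(q.putIndex());  // 1
    }
}
```

Because the indices wrap around, a freed slot at the front of the array is reused by the next insertion, so the array never has to be resized or shifted.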
Main constructors
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // Initialize the array
        this.items = new Object[capacity];
        // Create the reentrant lock and the two conditions
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
Two conclusions can be drawn from the constructors:
- A capacity must be passed when creating an ArrayBlockingQueue, and it becomes the size of the array;
- A constructor argument controls whether the reentrant lock is fair or non-fair (non-fair by default).
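A short usage sketch of both constructors follows. The class name ConstructorDemo and the helper rejectsNonPositiveCapacity() are hypothetical; the constructors and remainingCapacity() are the JDK's.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class ConstructorDemo {
    // Hypothetical helper: a capacity of 0 is rejected by the constructor.
    static boolean rejectsNonPositiveCapacity() {
        try {
            new ArrayBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Non-fair lock (the default)
        ArrayBlockingQueue<String> nonFair = new ArrayBlockingQueue<>(2);
        // Fair lock: waiting threads acquire the lock in arrival order
        ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(2, true);

        System.out.println(nonFair.remainingCapacity());    // 2
        System.out.println(fair.remainingCapacity());       // 2
        System.out.println(rejectsNonPositiveCapacity());   // true
    }
}
```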
Enqueue
There are four ways to enqueue an element: add(E e), offer(E e), put(E e), and offer(E e, long timeout, TimeUnit unit). What are the differences between them?
    public boolean add(E e) {
        // Delegate to the parent class's add(e)
        return super.add(e);
    }

    // super.add(e), defined in AbstractQueue
    public boolean add(E e) {
        // Call offer(e): return true on success, throw an exception on failure
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    public boolean offer(E e) {
        // The element must not be null
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            if (count == items.length)
                // The array is full: return false
                return false;
            else {
                // The array is not full: enqueue and return true
                enqueue(e);
                return true;
            }
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Acquire the lock; throws InterruptedException if the thread is interrupted
        lock.lockInterruptibly();
        try {
            // If the array is full, wait on notFull.
            // Waiting on notFull means the queue is currently full;
            // it stops being full only once an element is removed,
            // at which point notFull is signalled and execution resumes here.
            // Why while rather than if?
            // Several threads may be blocked on the lock, so even after this
            // thread is woken, another thread may have refilled the queue first,
            // in which case this thread has to wait again.
            while (count == items.length)
                notFull.await();
            // Enqueue
            enqueue(e);
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lockInterruptibly();
        try {
            // If the array is full, block for up to nanos nanoseconds.
            // Return false if the thread wakes up, there is still no space,
            // and the time has run out.
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            // Enqueue
            enqueue(e);
            return true;
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        // Place the element at the put index
        items[putIndex] = x;
        // If the put index reaches the end of the array, wrap around to the start
        if (++putIndex == items.length)
            putIndex = 0;
        // Increment the element count
        count++;
        // Signal notEmpty: an element was just added, so the queue cannot be empty
        notEmpty.signal();
    }
- add(e) throws an exception if the queue is full;
- offer(e) returns false if the queue is full;
- put(e) blocks on the notFull condition while the queue is full;
- offer(e, timeout, unit) waits for up to the given time if the queue is full and returns false if it is still full when the time runs out;
- Elements are stored in the array circularly: the put index wraps back to the start when it reaches the end.
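The four behaviours listed above can be observed on a queue of capacity 1. This is a sketch only: the class name EnqueueDemo and the helpers fullQueue() and addThrowsWhenFull() are invented for the example, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EnqueueDemo {
    // Hypothetical helper: a queue of capacity 1 that is already full.
    static ArrayBlockingQueue<String> fullQueue() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.offer("a");
        return q;
    }

    // add(e) on a full queue throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        try {
            fullQueue().add("b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> q = fullQueue();
        System.out.println(addThrowsWhenFull());                      // true
        System.out.println(q.offer("b"));                             // false
        System.out.println(q.offer("b", 50, TimeUnit.MILLISECONDS));  // false, after the timeout

        // put(e) would block on a full queue, so free a slot first.
        q.poll();
        q.put("b");
        System.out.println(q.peek());                                 // b
    }
}
```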
Dequeue
There are four ways to dequeue an element: remove(), poll(), take(), and poll(long timeout, TimeUnit unit). What are the differences between them?
    public E remove() {
        // Call poll() to dequeue
        E x = poll();
        if (x != null)
            // An element was dequeued: return it
            return x;
        else
            // Nothing was dequeued: throw an exception
            throw new NoSuchElementException();
    }

    public E poll() {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lock();
        try {
            // Return null if the queue is empty, otherwise dequeue
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lockInterruptibly();
        try {
            // While the queue is empty, block on the notEmpty condition
            while (count == 0)
                notEmpty.await();
            // There is an element now: dequeue it
            return dequeue();
        } finally {
            // Release the lock
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // Acquire the lock
        lock.lockInterruptibly();
        try {
            // If the queue is empty, block for up to nanos nanoseconds.
            // Return null if, once this thread holds the lock again,
            // the queue is still empty and the time has run out.
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        final Object[] items = this.items;
        // Read the element at the take index
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        // Clear the slot at the take index
        items[takeIndex] = null;
        // Advance the take index, wrapping around to the start at the end of the array
        if (++takeIndex == items.length)
            takeIndex = 0;
        // Decrement the element count
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // Signal the notFull condition
        notFull.signal();
        return x;
    }
- remove() throws an exception if the queue is empty;
- poll() returns null if the queue is empty;
- take() blocks on the notEmpty condition while the queue is empty;
- poll(timeout, unit) waits for up to the given time if the queue is empty and returns null if it is still empty when the time runs out;
- Elements are read from the array circularly: the take index wraps back to the start when it reaches the end.
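The dequeue side can be exercised the same way on an empty queue. Again this is only a sketch: DequeueDemo and removeThrowsWhenEmpty() are hypothetical names, and the 50 ms timeout is arbitrary.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DequeueDemo {
    // remove() on an empty queue throws NoSuchElementException.
    static boolean removeThrowsWhenEmpty() {
        try {
            new ArrayBlockingQueue<String>(1).remove();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        System.out.println(removeThrowsWhenEmpty());            // true
        System.out.println(q.poll());                           // null
        System.out.println(q.poll(50, TimeUnit.MILLISECONDS));  // null, after the timeout

        // take() would block on an empty queue, so add an element first.
        q.offer("a");
        System.out.println(q.take());                           // a
    }
}
```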
Summary
- ArrayBlockingQueue never needs to grow, because its capacity is fixed at construction and the array is reused circularly;
- takeIndex and putIndex wrap around the array so that freed slots are reused;
- Enqueue and dequeue each offer four methods for different needs: throw an exception, return a special value, block, or block with a timeout;
- A reentrant lock and two conditions guarantee thread safety;
- The queue length is fixed and must be specified at construction, so the capacity should be chosen carefully before use;
- If consumers cannot keep up with producers, the queue fills up and producer threads calling put(e) stay blocked; the more threads pile up, the greater the risk.
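As a closing illustration of the blocking pair put(e) and take(), here is a minimal producer and consumer sharing a queue of capacity 2. The class name ProducerConsumerDemo and the helper sumThroughQueue() are hypothetical, and the sizes are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    // Hypothetical helper: a producer thread puts 1..n into a bounded queue
    // while the calling thread takes n elements and sums them.
    static int sumThroughQueue(int capacity, int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(2, 5)); // 15
    }
}
```

Here the consumer always drains the queue, so the producer is never blocked for long. A producer that cannot afford to wait indefinitely could use offer(e, timeout, unit) instead of put(e) and handle the false return value.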