The source code in this article is based on JDK 13.
Overall class diagram:

BlockingQueue interface
Translation of the official Javadoc
What is a blocking queue?
A queue that additionally supports operations that:
- when retrieving an element, wait for the queue to become non-empty if it is currently empty
- when storing an element, wait for space to become available if the queue is currently full
That is a blocking queue.
BlockingQueue methods come in four forms, differing in how they handle operations that cannot be satisfied immediately but may be satisfied at some point in the future:
- Throw an exception
- Return a special value (null, false, etc.)
- Block until the operation can succeed
- Block for at most a given maximum waiting time
The following table is a summary:
Method type | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
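To make the table concrete, here is a small sketch using ArrayBlockingQueue (covered later in this article) with a capacity of 1; the class choice and values are only for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertionStrategies {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("first"); // succeeds, the queue is now full

        // Special value: returns false immediately instead of blocking
        System.out.println(q.offer("second"));
        // Timed wait: waits up to 100 ms, then returns false
        System.out.println(q.offer("second", 100, TimeUnit.MILLISECONDS));

        try {
            // Exception form: throws IllegalStateException("Queue full")
            q.add("second");
        } catch (IllegalStateException e) {
            System.out.println("add threw: " + e.getMessage());
        }

        // q.put("second") would block here until another thread removes an element
    }
}
```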
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add a null, because null is reserved as the sentinel value indicating that a poll operation failed.
A BlockingQueue may be capacity bounded: at any given time it has a remaining capacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity bound always reports a remaining capacity of Integer.MAX_VALUE.
BlockingQueue implementations are designed primarily for producer-consumer queues, but they additionally support the Collection interface, so it is possible, for example, to remove an arbitrary element from the queue. Such operations are generally not performed very efficiently and are intended only for occasional use, such as when a queued message is cancelled.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically, using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise by an implementation; it is therefore possible, for example, for addAll to fail, throwing an exception, after adding only some of its elements.
A BlockingQueue does not intrinsically support any kind of close or shutdown operation to indicate that no more elements will be added; such requirements are left to the implementations.
For example, a common tactic is for producers to insert a special end-of-stream or poison object, which consumers interpret as a signal to stop when they take it.
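A minimal sketch of that poison-pill tactic; the POISON sentinel, the class name and the single-consumer setup are illustrative assumptions, not part of the JDK:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillExample {
    // Illustrative sentinel that tells the consumer to stop
    private static final String POISON = "POISON";

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (POISON.equals(item)) { // end-of-stream marker: stop consuming
                        break;
                    }
                    System.out.println("consumed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("task-1");
        queue.put("task-2");
        queue.put(POISON); // producer signals "no more elements"
        consumer.join();
    }
}
```

With multiple consumers, the producer would typically insert one poison object per consumer.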
Usage example:
A typical producer-consumer scenario; note that a BlockingQueue can safely be shared by multiple producers and multiple consumers.
```java
class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ...handle... }
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ...handle... }
    }
    void consume(Object x) { ... }
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}
```
Interface method
Most methods are covered by the table in the Javadoc above, which defines which strategy each one follows; only a few are noted here.
- add
- offer
- put
- offer(time,unit)
- take
- poll
- remainingCapacity: the remaining capacity
- remove
- contains: whether the queue contains a given element
- drainTo
- drainTo(collection, maxElements): removes up to the given number of available elements from the queue and adds them to the given collection; a usage sketch follows
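A small usage sketch of the drainTo(collection, maxElements) overload; the queue contents and class name are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainToExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        for (int i = 1; i <= 5; i++) {
            queue.offer(i);
        }

        List<Integer> batch = new ArrayList<>();
        // Move at most 3 available elements from the queue into the list, without blocking
        int moved = queue.drainTo(batch, 3);

        System.out.println(moved); // 3
        System.out.println(batch); // [1, 2, 3]
        System.out.println(queue); // [4, 5]
    }
}
```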
ArrayBlockingQueue
Translation of the official Javadoc
A bounded blocking queue backed by an array, ordering its elements FIFO (first-in-first-out).
The head of the queue is the element that has been in the queue the longest; the tail is the element that has been in the queue the shortest. New elements are inserted at the tail, and retrieval operations obtain elements from the head.
This is a classic bounded buffer: a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed.
Attempting to put an element into a full queue blocks, and attempting to take an element from an empty queue also blocks.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed.
However, a queue constructed with the fair policy grants threads access in FIFO order.
Fairness generally decreases throughput but reduces variability and avoids starvation.
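The fairness policy is chosen at construction time; a minimal sketch (capacity and element type are arbitrary):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairnessExample {
    public static void main(String[] args) {
        // Capacity 100 with the fair policy: blocked producer and consumer
        // threads are granted access in FIFO order
        BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(100, true);

        // Default (non-fair) policy: usually higher throughput, no ordering guarantee
        BlockingQueue<String> defaultQueue = new ArrayBlockingQueue<>(100);

        fairQueue.offer("a");
        defaultQueue.offer("b");
    }
}
```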
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
This class is also part of the Java collection framework
Source code
Definition
```java
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
```
ArrayBlockingQueue extends AbstractQueue, so it has the common queue methods, and it implements the BlockingQueue interface, so it has the characteristics of a blocking queue.
Fields
```java
/** The queued items */
// The queue's elements, stored in an array
final Object[] items;

/** items index for next take, poll, peek or remove */
// Index of the next element to remove, i.e. the head of the queue
int takeIndex;

/** items index for next put, offer, or add */
// Index where the next element will be added, i.e. the tail of the queue
int putIndex;

/** Number of elements in the queue */
// Number of elements currently in the queue
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
// Consumers wait on this condition until the queue is not empty
private final Condition notEmpty;

/** Condition for waiting puts */
// Producers wait on this condition until the queue is not full
private final Condition notFull;

/**
 * Shared state for currently active iterators, or null if there
 * are known not to be any. Allows queue operations to update
 * iterator state.
 */
// Shared state for active iterators
transient Itrs itrs;
```
These are the core fields: the queue's elements are stored in an array, and two indices point to the head and the tail of the queue.
Constructors
```java
// Specify the capacity only
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// Specify the capacity and the fairness policy; the default policy is non-fair
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

// Initialize the blocking queue with a given collection: in addition to
// initializing the fields, all elements of the collection are put into the queue
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        try {
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
```
The capacity of the blocking queue and the fairness policy can be specified.
In addition, the queue can be initialized with all of the elements of a given collection.
Enqueue operations
- add
- put
- offer
- offer(time,unit)
These four methods correspond to the four strategies a blocking queue can apply when an element needs to be enqueued but the queue is full.
add throws an exception
```java
public boolean add(E e) {
    return super.add(e);
}
```
This calls the add method of the parent class AbstractQueue, which throws an exception if the queue is full.
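For reference, AbstractQueue.add is essentially a thin wrapper around offer; roughly (paraphrasing the JDK source):

```java
// AbstractQueue.add, roughly: delegate to offer and turn a
// "queue full" result into an exception
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
```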
offer returns a special value
Returns true if successful and false if failed.
```java
public boolean offer(E e) {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
```
Lock first, then check whether the queue is full. If it is full, return false; otherwise call enqueue to perform the enqueue operation.
```java
private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // Place the element at the tail of the queue
    items[putIndex] = e;
    // Wrap the index around to 0 when it reaches the end of the array
    if (++putIndex == items.length) putIndex = 0;
    // One more element
    count++;
    // The queue is no longer empty; wake up a waiting consumer
    notEmpty.signal();
}
```
This is the core enqueue method; it is called by all of the methods that add elements.
put blocks
```java
public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
```
If the queue is full, wait on the notFull condition; after being woken up, enqueue the element.
offer(e, time, unit) waits with a timeout
```java
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is full
        while (count == items.length) {
            // if the timeout has elapsed, return false
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
```
While the queue is full, the method checks the remaining time: if the timeout has elapsed, it returns false; otherwise it waits on the notFull condition for the remaining time. Once woken up with the queue no longer full, it enqueues the element.
Dequeue operations
poll returns a special value
```java
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
```
If the queue is empty, null is returned; this is why the blocking queue does not support null elements, because null is used to signal that the queue is empty. If the queue is not empty, dequeue is called.
```java
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // Take the element at the head of the queue
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    // Clear the head slot
    items[takeIndex] = null;
    // Advance the head index, wrapping around at the end of the array
    if (++takeIndex == items.length) takeIndex = 0;
    // One fewer element
    count--;
    // Notify the iterators
    if (itrs != null)
        itrs.elementDequeued();
    // Notify a waiting producer that there is now a free slot in the queue
    notFull.signal();
    return e;
}
```
This is the core dequeue operation; it updates the related fields step by step as described in the comments and is called by all of the dequeue methods.
take blocks
```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```
If the queue is empty, wait on the notEmpty condition and perform the dequeue operation after being awakened.
poll(time, unit) blocks with a timeout
```java
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```
If the queue is empty and the timeout has elapsed, null is returned; otherwise the thread waits on the notEmpty condition for the remaining time.
Inspection methods
- size simply returns the count field
- peek returns the head element without removing it; it can be used to inspect the current head of the queue
- remainingCapacity returns the remaining capacity
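For example, peek looks roughly like this (a sketch based on the JDK source; details may differ slightly between versions):

```java
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Return the element at the head index without removing it;
        // when the queue is empty this slot is null, so null is returned
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}

@SuppressWarnings("unchecked")
final E itemAt(int i) {
    return (E) items[i];
}
```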
Summary
ArrayBlockingQueue is a relatively simple blocking queue implementation.
An array holds the elements, and the head and tail indices control where elements are dequeued and enqueued.
Thread safety is guaranteed by a ReentrantLock; every internal read of and write to the array is performed while holding the lock.
Blocking is implemented with the lock's conditions: one condition blocks consumers while the queue is empty, the other blocks producers while the queue is full, and each wakes its waiters once the condition is satisfied.
LinkedBlockingQueue
Translation of the official Javadoc
An optionally bounded blocking queue based on a linked list, ordering its elements FIFO.
The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest.
New elements are inserted at the tail of the queue, and retrieval operations obtain elements from the head.
Linked queues typically have higher throughput than array-based queues, but less predictable performance under high concurrency.
The optional capacity bound prevents excessive queue expansion. If unspecified, the capacity defaults to Integer.MAX_VALUE. Linked nodes are created dynamically on each insertion unless that would bring the queue above its capacity.
This class and its Iterator implement all the optional methods of the Collection and Iterator interfaces. This class is also part of the Java Collection framework
Source code
Definition
```java
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
```
It extends AbstractQueue, so it has all the common queue methods, and it implements the BlockingQueue interface, so it has all the characteristics of a blocking queue.
Linked list node definition
```java
static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}
```
A fairly simple definition: each node holds its element and a pointer to the next node.
The next pointer has three possible states:
- the real successor node
- the node itself, meaning the successor is head.next
- null, meaning there is no successor and this node is the last node
Fields
```java
// The capacity set at construction time
private final int capacity;

// Current number of elements
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
// Head node; its item is always null
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
// Tail node; its next pointer is always null
private transient Node<E> last;

/** Lock held by take, poll, etc */
// Lock guarding dequeue operations
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// Condition that the queue is not empty
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// Lock guarding enqueue operations
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// Condition that the queue is not full
private final Condition notFull = putLock.newCondition();
```
- First, the maximum capacity and the current count are stored, which makes a bounded queue possible.
- Second, the head and tail nodes are kept; they anchor the linked list that stores the actual elements.
- Finally, two locks are held, one for the head and one for the tail of the queue, to guarantee thread safety.
- Each lock has a corresponding wait condition, used to park and wake up threads, i.e. to implement blocking; the helpers that signal across the two locks are sketched below.
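The helpers that signal across the two locks, referenced by the methods that follow, look roughly like this (a sketch based on the JDK source):

```java
/**
 * Signals a waiting take. Called only from put/offer, which do not
 * otherwise hold takeLock.
 */
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

/** Signals a waiting put. Called only from take/poll. */
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}
```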
Constructors
```java
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
```
Three constructors are provided; the capacity can be specified, and the placeholder head node is initialized.
Initializing the queue from a given collection is also supported.
Enqueue operations
add throws an exception
This calls add of AbstractQueue, which throws an exception directly if the queue is full (see the sketch in the ArrayBlockingQueue section above).
offer(e) returns true/false
```java
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // Current number of elements
    final AtomicInteger count = this.count;
    // If the queue is full, return false
    if (count.get() == capacity)
        return false;
    final int c;
    // Create the node for the new element
    final Node<E> node = new Node<E>(e);
    // The enqueue lock
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Re-check the capacity after locking
        if (count.get() == capacity)
            return false;
        // Enqueue
        enqueue(node);
        // Increment the count; c holds the value before the increment
        c = count.getAndIncrement();
        // If there is still free capacity, wake up another waiting producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the count was 0 before this write, i.e. this node is the first element
    // put into an empty queue, wake up waiting consumers
    if (c == 0)
        signalNotEmpty();
    return true;
}
```
The capacity is checked twice; if the queue is full, false is returned, otherwise the enqueue operation is called. In addition, if the current element is the first element put into an empty queue, waiting consumers are woken up and told that the queue is no longer empty.
```java
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    // Link the new node after the current last node and make it the new tail
    last = last.next = node;
}
```
This is the core enqueue operation, and it is quite simple: the new node is linked at the tail of the linked list and becomes the new tail.
put blocks
```java
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final int c;
    // Create the node for the new element
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Acquire the enqueue lock
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        // While the queue is full, wait
        while (count.get() == capacity) {
            notFull.await();
        }
        // After being woken up, the queue is no longer full; enqueue
        enqueue(node);
        c = count.getAndIncrement();
        // If the queue is still not full, help wake up another producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the current element is the first element, wake up waiting consumers
    if (c == 0)
        signalNotEmpty();
}
```
The overall flow is similar to offer, except that when the queue is found to be full, false is not returned; instead the thread waits on the notFull condition until another thread wakes it up, after which it continues with the enqueue.
offer(e,time,unit)
```java
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    final int c;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
```
It is similar to put, except that when the queue is full, the method returns false if the timeout has elapsed; otherwise it lets the current thread wait for the remaining time and then checks again whether the element can be enqueued.
Dequeue operations
```java
public E poll() {
    // Current number of elements
    final AtomicInteger count = this.count;
    // If the queue is currently empty, return null
    if (count.get() == 0)
        return null;
    final E x;
    final int c;
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the dequeue lock
    takeLock.lock();
    try {
        // Re-check whether the queue is empty after locking
        if (count.get() == 0)
            return null;
        // Dequeue
        x = dequeue();
        c = count.getAndDecrement();
        // If the removed element was not the only element, help wake up another consumer
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before this dequeue, it is no longer full;
    // wake up waiting producers
    if (c == capacity)
        signalNotFull();
    return x;
}
```
This is the dequeue method that returns null when the queue is empty.
First the count is checked; if the queue is empty, null is returned. Then the dequeue lock is acquired and the count is checked again.
If the queue is not empty, the core dequeue operation is performed. Afterwards, if there are still elements left in the queue, another waiting consumer is woken up.
If the queue was full before this dequeue, it no longer is, so waiting producers are woken up.
```java
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // The placeholder head node of the linked list
    Node<E> h = head;
    // The first real node of the linked list
    Node<E> first = h.next;
    h.next = h; // help GC
    // first becomes the new head node
    head = first;
    E x = first.item;
    first.item = null;
    // Return the element
    return x;
}
```
This core dequeue method is slightly more involved than the enqueue method.
The head pointer (an empty placeholder node) is moved to the first real node, whose element is returned; that node then becomes the new placeholder head.
take() blocks
```java
public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
```
Lock first, then check whether the queue is empty. If it is empty, block on the notEmpty condition and wait to be woken up.
After being woken up, the queue is no longer empty and the dequeue operation is executed.
If the queue is still not empty after the dequeue, another consumer is woken up; if the queue was full before the dequeue, a producer is woken up.
poll(time,unit)
```java
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final E x;
    final int c;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
```
It is very similar to take, except that when the queue is found to be empty, the timeout is checked: if it has elapsed, null is returned; otherwise the current thread blocks for the remaining time rather than indefinitely.
Inspection methods
- size returns the current count
- remainingCapacity returns the remaining capacity
- peek returns the head element without removing it; it is mainly used to inspect the current head of the queue
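A small usage sketch of these inspection methods; the capacity and values are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InspectionExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        queue.offer("a");
        queue.offer("b");

        System.out.println(queue.size());              // 2
        System.out.println(queue.peek());              // "a": the head element, not removed
        System.out.println(queue.remainingCapacity()); // 1
        System.out.println(queue.size());              // still 2, peek removed nothing
    }
}
```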
Summary
LinkedBlockingQueue is another straightforward blocking queue implementation; internally it stores its elements in a linked list.
- First, the maximum capacity and the current count are stored, which makes a bounded queue possible.
- Second, the head and tail nodes are kept; they anchor the linked list that stores the actual elements and make enqueueing and dequeueing convenient.
- Two locks are held, one for the head and one for the tail of the queue, to guarantee thread safety.
- Each lock has a corresponding wait Condition, used to park and wake up the enqueueing and dequeueing threads, i.e. the producer and consumer threads, implementing blocking and wake-up once the condition is satisfied.
Because both the head and the tail node are kept, enqueue and dequeue operations perform well, and using two separate locks for enqueueing and dequeueing minimizes lock contention and further improves performance.
Compared with the array-based ArrayBlockingQueue, throughput is generally higher, but under high concurrency the performance is somewhat less predictable.
End.
Contact me
Finally, you are welcome to follow my personal WeChat official account, Huyan ten, where I post many back-end engineering study notes. You can also contact me directly through the official account or by email.
The above are all personal thoughts. If there are any mistakes, please correct them in the comment area.
Reprinting is welcome; please credit the author and keep a link to the original article.
Contact email: huyanshi2580@gmail.com
For more study notes, see my personal blog or the WeChat official account Huyan ten.