(juc Series) blocking queue and its implementation

The source code of this article is based on JDK13

Overall class diagram:

BlockingQueue interface

Translation of the official Javadoc

What is a blocking queue?

A blocking queue is a queue that additionally supports two kinds of waiting:

  • When taking an element from an empty queue, the caller can wait until the queue becomes non-empty
  • When storing an element into a full queue, the caller can wait until space becomes available

BlockingQueue methods come in four forms, which differ in how they handle an operation that cannot be satisfied immediately but may be satisfied at some point in the future:

  • Throw an exception
  • Return a special value (null or false, depending on the operation)
  • Block the current thread indefinitely until the operation can succeed
  • Block for at most a given maximum time before giving up

The following table is a summary:

Operation   Throws exception   Special value   Blocks           Times out
Insert      add(e)             offer(e)        put(e)           offer(e, time, unit)
Remove      remove()           poll()          take()           poll(time, unit)
Examine     element()          peek()          not applicable   not applicable
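
The insert row of the table can be tried out on a full queue. The following is a minimal sketch, assuming an ArrayBlockingQueue of capacity 1; the class and method names (InsertStrategiesDemo, describeFullQueueInserts) are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertStrategiesDemo {
    // Reports how each non-blocking insert form reacts to a full queue.
    static String describeFullQueueInserts() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first"); // capacity is 1, so the queue is now full

        StringBuilder result = new StringBuilder();

        // add(e) throws IllegalStateException when there is no space.
        try {
            queue.add("second");
            result.append("add=ok");
        } catch (IllegalStateException ex) {
            result.append("add=exception");
        }

        // offer(e) returns false immediately.
        result.append(",offer=").append(queue.offer("second"));

        // offer(e, time, unit) waits up to the timeout, then returns false.
        try {
            boolean accepted = queue.offer("second", 50, TimeUnit.MILLISECONDS);
            result.append(",timedOffer=").append(accepted);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            result.append(",timedOffer=interrupted");
        }

        // put(e) would block here until a consumer frees a slot,
        // so this single-threaded sketch does not call it.
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFullQueueInserts());
    }
}
```

The remove and examine rows behave symmetrically on an empty queue.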

A blocking queue does not accept null elements. Implementations throw a NullPointerException on any attempt to add, put or offer a null, because null is used as a sentinel value to indicate that a poll operation failed.

A blocking queue may be capacity bounded. At any given time it has a remaining capacity (reported by remainingCapacity) beyond which no additional elements can be put without blocking. A queue without any intrinsic capacity constraint always reports a remaining capacity of Integer.MAX_VALUE.

Blocking queue implementations are designed primarily for producer-consumer queues, but they also support the Collection interface. It is therefore possible, for example, to remove an arbitrary element from the queue with remove(x). Such operations are generally not very efficient and are intended for occasional use only, for example when a queued message is cancelled.

Blocking queue implementations are thread safe. All queuing methods achieve their effects atomically, using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless an implementation specifies otherwise. It is therefore possible for addAll(c) to fail, throwing an exception, after adding only some of the elements in c.
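
The partial failure of addAll can be observed directly. This sketch assumes an ArrayBlockingQueue of capacity 2, which inherits addAll from AbstractQueue and therefore inserts element by element through add; the class name AddAllPartialFailureDemo is hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AddAllPartialFailureDemo {
    // Returns the queue size after an addAll call that overflows the capacity.
    static int sizeAfterOverflowingAddAll() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        try {
            // The first two elements fit; adding the third one fails.
            queue.addAll(List.of(1, 2, 3));
        } catch (IllegalStateException ex) {
            // The elements inserted before the failure stay in the queue.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterOverflowingAddAll());
    }
}
```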

A blocking queue does not intrinsically support any kind of close or shutdown operation to indicate that no more elements will be added. Whether and how such a feature is needed tends to depend on the implementation and the application.

For example, a common tactic is for producers to insert a special end-of-stream or "poison" object, which consumers interpret as a signal to stop when they take it from the queue.
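
One way to sketch the poison object tactic is shown here. The sentinel POISON, the class PoisonPillDemo and its methods are assumptions made for this illustration, not part of the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel; it is compared by identity, so it cannot collide with real data.
    private static final String POISON = new String("POISON");

    // Takes elements until the sentinel arrives and returns how many real elements were seen.
    static int consumeUntilPoison(BlockingQueue<String> queue) throws InterruptedException {
        int consumed = 0;
        while (true) {
            String item = queue.take();
            if (item == POISON) {
                return consumed;
            }
            consumed++;
        }
    }

    static int runDemo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int[] consumed = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                consumed[0] = consumeUntilPoison(queue);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("a");
            queue.put("b");
            queue.put("c");
            queue.put(POISON); // tells the consumer that nothing more will arrive
            consumer.join();   // join makes the consumer's result visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With several consumers, the producer would need to insert one sentinel per consumer, because each sentinel is taken by exactly one thread.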

Usage example:

A typical producer-consumer scenario follows. Note that a blocking queue can safely be shared by multiple producers and multiple consumers.

 class Producer implements Runnable {
    private final BlockingQueue queue;

    Producer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                queue.put(produce());
            }
        } catch (InterruptedException ex) { ...handle ...}
    }

    Object produce() { ...}
}

class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) { ...handle ...}
    }

    void consume(Object x) { ...}
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

Interface methods

The table in the Javadoc above already states which strategy most of these methods follow, so only a few of them are annotated here.

  • add
  • offer
  • put
  • offer(time,unit)
  • take
  • poll
  • remainingCapacity returns the number of additional elements the queue can accept without blocking
  • remove
  • contains reports whether the queue contains the given element
  • drainTo(collection) removes all available elements from the queue and adds them to the given collection
  • drainTo(collection, int number) does the same, but removes at most the given number of elements
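
The less familiar methods in this list, remainingCapacity and the two drainTo variants, can be illustrated with a short sketch. The class name DrainToDemo and the chosen capacity are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DrainToDemo {
    static String drainInBatches() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        for (int i = 1; i <= 4; i++) {
            queue.offer(i);
        }
        int free = queue.remainingCapacity(); // 5 slots, 4 used

        // Move at most three elements, in FIFO order, into a batch.
        List<Integer> batch = new ArrayList<>();
        int moved = queue.drainTo(batch, 3);

        // Move whatever is left.
        List<Integer> rest = new ArrayList<>();
        queue.drainTo(rest);

        return free + " " + moved + " " + batch + " " + rest + " " + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(drainInBatches());
    }
}
```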

ArrayBlockingQueue

Translation of the official Javadoc

A bounded blocking queue backed by an array. The queue orders elements FIFO (first in, first out).

The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements from the head.

This is a classic bounded buffer: a fixed-size array holds the elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed.

Attempts to put an element into a full queue will block, and attempts to take an element from an empty queue will block as well.

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed.

However, a queue constructed with fairness set to true grants threads access in FIFO order.

Fairness generally decreases throughput, but it reduces variability and avoids starvation.

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.

This class is a member of the Java Collections Framework.

Source code

Definition

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

ArrayBlockingQueue extends AbstractQueue, so it inherits the common queue methods. It also implements the BlockingQueue interface, which gives it the blocking behavior of a blocking queue.

Attributes

/** The queued items */
// The array that stores the queue elements
final Object[] items;

/** items index for next take, poll, peek or remove */
// Index of the next element to remove, i.e. the head of the queue
int takeIndex;

/** items index for next put, offer, or add */
// Index of the slot for the next element to add, i.e. the tail of the queue
int putIndex;

/** Number of elements in the queue */
// Number of elements currently in the queue
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
// A single lock guards every read and write of the queue state
final ReentrantLock lock;

/** Condition for waiting takes */
// Consumers wait on this condition while the queue is empty
private final Condition notEmpty;

/** Condition for waiting puts */
// Producers wait on this condition while the queue is full
private final Condition notFull;

/**
 * Shared state for currently active iterators, or null if there
 * are known not to be any.  Allows queue operations to update
 * iterator state.
 */
// State shared with active iterators, so that queue operations can keep them up to date
transient Itrs itrs;

These are the core attributes: the elements are stored in an array, and two indices point to the head (takeIndex) and the tail (putIndex) of the queue.

Constructors

// Creates a queue with the given capacity and the default (non-fair) policy
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

// Creates a queue with the given capacity and fairness policy
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

// Creates a queue as above and then adds every element of the given collection,
// in the collection's iteration order
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        try {
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            // The collection holds more elements than the capacity allows
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

You can specify the capacity of the blocking queue and the fairness policy

In addition, it supports putting all elements in a given collection into a queue
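
The three constructors can be exercised as follows. This is only a usage sketch; the class name ArrayQueueConstructionDemo and the sample values are made up.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class ArrayQueueConstructionDemo {
    static String construct() {
        // Capacity 3, fair ordering, pre-filled in the iteration order of the list.
        ArrayBlockingQueue<String> queue =
                new ArrayBlockingQueue<>(3, true, List.of("a", "b"));
        String state = queue.peek() + ":" + queue.size() + ":" + queue.remainingCapacity();

        // A collection that holds more elements than the capacity is rejected.
        try {
            ArrayBlockingQueue<String> tooSmall =
                    new ArrayBlockingQueue<>(1, false, List.of("a", "b"));
            return state + ":accepted:" + tooSmall.size();
        } catch (IllegalArgumentException ex) {
            return state + ":rejected";
        }
    }

    public static void main(String[] args) {
        System.out.println(construct());
    }
}
```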

Enqueue operations

  • add
  • put
  • offer
  • offer(time,unit)

These four methods correspond to the four strategies a blocking queue offers for handling an insertion when the queue is full.

add throws an exception

public boolean add(E e) {
    return super.add(e);
}

This delegates to the add method of the parent class AbstractQueue, which calls offer and throws an IllegalStateException if the queue is full.

offer returns a special value

Returns true if the element was added and false if the queue is full.

public boolean offer(E e) {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

The method acquires the lock and then checks whether the queue is full. If it is, offer returns false; otherwise it calls enqueue to insert the element.

private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // Store the element at the tail of the queue
    items[putIndex] = e;
    // Advance the tail index, wrapping around to 0 at the end of the array
    if (++putIndex == items.length) putIndex = 0;
    // One more element in the queue
    count++;
    // The queue is now non-empty, so wake up one waiting consumer
    notEmpty.signal();
}

This is the core insertion method. All of the public insertion methods call it to add an element.

put blocks

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Re-check the condition after every wake-up
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

If the queue is full, the thread waits on the notFull condition. Once it has been woken up and the queue has room, the element is enqueued.

offer(e,time,unit) blocks with a timeout

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is full
        while (count == items.length) {
            // If the timeout has elapsed, give up and return false
            if (nanos <= 0L)
                return false;
            // Otherwise wait; awaitNanos returns the time still remaining
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

If the queue is full, the method loops: when the remaining time has run out it returns false, otherwise it waits on the notFull condition for at most the remaining time. Once the thread is woken up and the queue has room, the element is enqueued.

Dequeue operations

poll returns a special value

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

If the queue is empty, null is returned. This is why a blocking queue does not support null elements: null is reserved to signal that the queue was empty. If the queue is not empty, the head element is dequeued.
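
The two roles of null can be checked with a few lines. This is a small sketch with a made-up class name, NullHandlingDemo.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class NullHandlingDemo {
    static String check() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        // Inserting null is rejected with a NullPointerException.
        boolean rejected;
        try {
            queue.offer(null);
            rejected = false;
        } catch (NullPointerException ex) {
            rejected = true;
        }

        // poll on an empty queue returns null instead of blocking or throwing.
        String fromEmptyQueue = queue.poll();
        return rejected + ":" + (fromEmptyQueue == null);
    }

    public static void main(String[] args) {
        System.out.println(check());
    }
}
```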

private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // Read the element at the head of the queue
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex];
    // Clear the slot so the element can be garbage collected
    items[takeIndex] = null;
    // Advance the head index, wrapping around to 0 at the end of the array
    if (++takeIndex == items.length) takeIndex = 0;
    // One element fewer in the queue
    count--;
    // Keep active iterators up to date
    if (itrs != null)
        itrs.elementDequeued();
    // A slot is now free, so wake up one waiting producer
    notFull.signal();
    return e;
}

This is the core removal method. It updates the related attributes step by step, as described in the comments, and all of the public removal methods call it.

take blocks

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Re-check the condition after every wake-up
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

If the queue is empty, the thread waits on the notEmpty condition. Once it has been woken up and the queue holds an element, it performs the dequeue operation.

poll(time,unit) blocks with a timeout

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            // If the timeout has elapsed, give up and return null
            if (nanos <= 0L)
                return null;
            // Otherwise wait; awaitNanos returns the time still remaining
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

If the queue is empty and the timeout has elapsed, null is returned. Otherwise the thread waits on the notEmpty condition for at most the remaining time, which is tracked in nanoseconds.
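
Both outcomes of the timed poll can be seen in a small sketch: one call that times out and one that is satisfied by a producer that arrives late. The class name TimedPollDemo and the chosen delays are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedPollDemo {
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Nothing is produced yet, so this call gives up after the timeout.
            String timedOut = queue.poll(50, TimeUnit.MILLISECONDS);

            // A producer inserts an element after a short delay.
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(20);
                    queue.put("late");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // The generous timeout leaves plenty of room for the producer.
            String received = queue.poll(5, TimeUnit.SECONDS);
            producer.join();
            return timedOut + ":" + received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```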

Inspection methods

  • size returns the value of count
  • peek returns the head element without removing it, so it can be used to look at the current head of the queue
  • remainingCapacity returns the remaining capacity, that is, the array length minus count
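
A short sketch of the inspection methods, with a made-up class name (InspectionDemo). It also contrasts peek with element() from the Queue interface, which throws on an empty queue.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

class InspectionDemo {
    static String inspect() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.offer("a");
        queue.offer("b");

        // peek does not remove the head, so repeated calls see the same element.
        String first = queue.peek();
        String again = queue.peek();
        String counts = queue.size() + "/" + queue.remainingCapacity();

        queue.clear();
        // On an empty queue peek returns null, while element() throws.
        String emptyPeek = String.valueOf(queue.peek());
        String emptyElement;
        try {
            emptyElement = queue.element();
        } catch (NoSuchElementException ex) {
            emptyElement = "exception";
        }
        return first + again + " " + counts + " " + emptyPeek + " " + emptyElement;
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```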

Summary

ArrayBlockingQueue is a relatively simple implementation of a blocking queue.

An array holds the elements, and two indices, one for the head and one for the tail, control where elements leave and enter the queue.

Thread safety is guaranteed by a single ReentrantLock: every internal read or modification of the array is performed while holding the lock.

Blocking is implemented with two conditions created from that lock. Consumers wait on notEmpty while the queue is empty, producers wait on notFull while the queue is full, and each side signals the other once the condition it waits for has been met.
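
The design summarized above can be condensed into a small class. The following is a simplified sketch, not the JDK source: BoundedBuffer and transferSum are hypothetical names, and iterators, fairness, timeouts and null checks are left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex;
    private int putIndex;
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    BoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // producers wait while the buffer is full
            }
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0; // wrap around
            count++;
            notEmpty.signal(); // wake up one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await(); // consumers wait while the buffer is empty
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0; // wrap around
            count--;
            notFull.signal(); // wake up one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Sends 1..n through a buffer of capacity 2 and returns the sum seen by the consumer.
    static int transferSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(100));
    }
}
```

Because the buffer holds only two elements, the producer blocks repeatedly on notFull and the consumer on notEmpty, yet every element is delivered exactly once.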

LinkedBlockingQueue

Translation of the official Javadoc

An optionally bounded blocking queue based on linked nodes. The queue orders elements FIFO (first in, first out).

The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time.

New elements are inserted at the tail of the queue, and retrieval operations obtain elements from the head.

Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.

The optional capacity bound passed to the constructor serves as a way to prevent excessive queue expansion. If no capacity is specified, it defaults to Integer.MAX_VALUE. Linked nodes are created dynamically on each insertion, unless the insertion would bring the queue above capacity.

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces. This class is a member of the Java Collections Framework.

Source code

Definition

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

LinkedBlockingQueue extends the abstract class AbstractQueue, so it inherits the common queue methods. It also implements the BlockingQueue interface, which gives it the blocking behavior of a blocking queue.

Linked list node definition

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

The definition is simple: a node holds the current element and a pointer to the next node.

The next pointer can be in one of three states:

  • It points to the real successor node
  • It points to the node itself, which means the node has been dequeued and the real successor is head.next
  • It is null, which means there is no successor and this node is the last one

Attributes

// Capacity bound, fixed at construction time
private final int capacity;

// Current number of elements
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
// Head node. It is a placeholder whose item is always null
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
// Tail node. Its next pointer is always null
private transient Node<E> last;

/** Lock held by take, poll, etc */
// Lock acquired when removing elements
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
// Consumers wait on this condition while the queue is empty
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
// Lock acquired when inserting elements
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
// Producers wait on this condition while the queue is full
private final Condition notFull = putLock.newCondition();

  • First, the capacity and the current element count are stored, which is what makes the queue bounded
  • Second, the head node and the tail node are stored; the linked list between them holds the actual elements
  • Finally, two locks are held, one for the head and one for the tail of the queue, to ensure thread safety
  • Each lock has a corresponding condition, which is used to put threads to sleep and wake them up, and so implements blocking

Constructors

// Creates a queue with a capacity of Integer.MAX_VALUE
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// Creates a queue with the given capacity and initializes the placeholder head node
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

// Creates a queue with a capacity of Integer.MAX_VALUE and adds every element of the collection
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

Three constructors are provided. The first two set the capacity, either to the default or to a given value, and initialize the placeholder head node.

The third one additionally initializes the queue with the elements of a given collection.
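
A brief usage sketch of the three constructors follows; the class name LinkedQueueConstructionDemo and the sample values are made up.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class LinkedQueueConstructionDemo {
    static String construct() {
        // No capacity given: the bound is Integer.MAX_VALUE.
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        boolean defaultIsMax = unbounded.remainingCapacity() == Integer.MAX_VALUE;

        // Explicit capacity: the third offer is refused.
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        bounded.offer("x");
        bounded.offer("y");
        boolean thirdAccepted = bounded.offer("z");

        // Pre-filled from a collection, in its iteration order.
        LinkedBlockingQueue<String> copied = new LinkedBlockingQueue<>(List.of("a", "b", "c"));

        return defaultIsMax + ":" + thirdAccepted + ":" + copied.peek() + ":" + copied.size();
    }

    public static void main(String[] args) {
        System.out.println(construct());
    }
}
```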

Enqueue operations

add throws an exception

LinkedBlockingQueue does not override add, so the AbstractQueue implementation is used. If the queue is full, it throws an IllegalStateException.

offer(e) returns true/false

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // Current element count
    final AtomicInteger count = this.count;
    // Fast path without locking: if the queue is full, return false
    if (count.get() == capacity)
        return false;
    final int c;
    // Create the node for the new element
    final Node<E> node = new Node<E>(e);
    // Acquire the put lock
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Check the count again now that the lock is held
        if (count.get() == capacity)
            return false;
        // Link the node at the tail
        enqueue(node);
        // Increment the count; c is the count before the insertion
        c = count.getAndIncrement();
        // If there is still room, wake up another waiting producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the count was 0 before this insertion, the queue has just become
    // non-empty, so wake up a waiting consumer
    if (c == 0)
        signalNotEmpty();
    return true;
}

The element count is checked twice, once before and once after acquiring the lock. If the queue is full, false is returned; otherwise enqueue is called. In addition, if the new element is the first one placed into an empty queue, a waiting consumer is woken up to tell it that the queue is no longer empty.

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    // Link the new node after the current tail and make it the new tail
    last = last.next = node;
}

This is the core insertion method. It is simple: it links the new node to the end of the linked list.

put blocks

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final int c;
    // Create the node for the new element
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // Acquire the put lock
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        // While the queue is full, block on notFull
        while (count.get() == capacity) {
            notFull.await();
        }
        // The queue is no longer full, so the element can be enqueued
        enqueue(node);
        c = count.getAndIncrement();
        // If there is still room, wake up another waiting producer
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // If the queue was empty before this insertion, wake up a waiting consumer
    if (c == 0)
        signalNotEmpty();
}

The overall flow is similar to offer, except that put does not return false when the queue is full. Instead it waits on the notFull condition until another thread wakes it up, and then continues with the insertion.

offer(e,time,unit)

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    final int c;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

This is similar to put, except for what happens when the queue is full. If the timeout has already elapsed, the method returns false. Otherwise the current thread waits on notFull for at most the remaining time, tracked in nanoseconds, and then checks again whether the element can be enqueued.

Dequeue operations

poll() returns a special value

public E poll() {
    // Current element count
    final AtomicInteger count = this.count;
    // Fast path without locking: if the queue is empty, return null
    if (count.get() == 0)
        return null;
    final E x;
    final int c;
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the take lock
    takeLock.lock();
    try {
        // Check again whether the queue is empty now that the lock is held
        if (count.get() == 0)
            return null;
        // Remove the head element
        x = dequeue();
        // Decrement the count; c is the count before the removal
        c = count.getAndDecrement();
        // If elements remain, wake up another waiting consumer
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // If the queue was full before this removal, a slot has just become free,
    // so wake up a waiting producer
    if (c == capacity)
        signalNotFull();
    return x;
}

poll is the dequeue method that returns null when the queue is empty.

It first checks the element count without locking and returns null if the queue is empty. It then acquires takeLock and checks the count again.

If the queue is not empty, it performs the core dequeue operation. If elements remain after the removal, it wakes up another waiting consumer.

If the queue was full before the removal, a slot has just become free, so it wakes up a waiting producer.

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    // Current head node (the empty placeholder)
    Node<E> h = head;
    // First real node of the linked list
    Node<E> first = h.next;

    h.next = h; // help GC
    // The first real node becomes the new placeholder head
    head = first;
    E x = first.item;
    first.item = null;
    // Return the element that was stored in the first real node
    return x;
}

This is the core removal method of the linked list, and it is slightly more complex than the insertion.

The first real node becomes the new head (the empty placeholder), the old placeholder is unlinked, and the element that the first real node held is returned.
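
The placeholder head technique can be isolated in a tiny single-threaded class. This is only a sketch under simplifying assumptions: SentinelQueue is a hypothetical name, there is no locking, and dequeue returns null on an empty list, whereas the JDK method is only called when the queue is known to be non-empty.

```java
class SentinelQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    // head is always a placeholder whose item is null.
    private Node<E> head = new Node<>(null);
    private Node<E> last = head;

    void enqueue(E e) {
        // Link the new node after the tail and make it the new tail.
        last = last.next = new Node<>(e);
    }

    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) {
            return null; // empty: only the placeholder exists
        }
        h.next = h;      // self-link the old placeholder, as the JDK code does
        head = first;    // the first real node becomes the new placeholder
        E x = first.item;
        first.item = null;
        return x;
    }

    static String demo() {
        SentinelQueue<String> queue = new SentinelQueue<>();
        queue.enqueue("a");
        queue.enqueue("b");
        return queue.dequeue() + "," + queue.dequeue() + "," + queue.dequeue();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping a placeholder means that enqueue only touches last and dequeue only touches head, which is what allows the two ends to be guarded by different locks.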

take() blocks

public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

The method acquires the lock and then checks whether the queue is empty. If it is, the thread blocks on the notEmpty condition and waits to be woken up.

Once the thread has been woken up and the queue is no longer empty, it performs the dequeue operation.

If elements remain after the removal, it wakes up another consumer. If the queue was full before the removal, it wakes up a producer.

poll(time,unit)

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    final E x;
    final int c;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

This is very similar to take, except that when the queue is empty the method checks whether the timeout has elapsed. If it has, null is returned. If not, the current thread blocks for at most the remaining time instead of blocking indefinitely.

Inspection methods

  • size returns the current number of elements
  • remainingCapacity returns the remaining capacity
  • peek returns the head element without removing it, so it can be used to look at the current head of the queue

Summary

LinkedBlockingQueue is another simple implementation of a blocking queue. It uses a linked list internally to store the elements.

  • First, the capacity and the current element count are stored, which is what makes the queue bounded
  • Second, the head node and the tail node are stored; the linked list between them holds the actual elements, and having both ends at hand makes insertion and removal cheap
  • Two locks are held, one for the head and one for the tail of the queue, to ensure thread safety
  • Each lock has a corresponding Condition, which is used to put producer and consumer threads to sleep and to wake them up once the condition they wait for has been met

Because both the head and the tail node are stored, insertion and removal are cheap. Because insertion and removal are synchronized by two separate locks, producers and consumers do not contend with each other for the same lock, which reduces lock contention and improves performance.
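
The two-lock design can be sketched in a reduced form. This is a simplified illustration rather than the JDK source: TwoLockQueue, its signal helper and transferSum are hypothetical names, and only put and take are implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // placeholder, item is always null
    private Node<E> last = head;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        final int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = new Node<>(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal(); // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty); // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        final E x;
        final int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // self-link the old placeholder
            head = first;
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal(); // elements remain: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull); // queue was full: wake a producer
        return x;
    }

    // A condition may only be signalled while its own lock is held.
    private static void signal(ReentrantLock lock, Condition condition) {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    // Sends 1..n through a queue of capacity 4 and returns the sum seen by the consumer.
    static int transferSum(int n) {
        TwoLockQueue<Integer> queue = new TwoLockQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) queue.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += queue.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(1000));
    }
}
```

The atomic count is the only state shared between the two sides, so a producer and a consumer can work at the same time without waiting for the same lock.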

Compared with the array-based ArrayBlockingQueue, throughput is typically higher, but performance under high concurrency is less predictable.

End.

The above are all personal thoughts. If there are any mistakes, please correct them in the comment area.

Welcome to reprint, please sign and keep the original link.

Contact email: huyanshi2580@gmail.com

Added by mattcairns on Wed, 10 Nov 2021 18:15:13 +0200