Concurrency and multithreading -- source code analysis of LinkedBlockingQueue


Blocking queue is rarely used directly in daily development, but it has many applications in many tool classes or frameworks, such as thread pool, message queue and so on. Therefore, it is also necessary to deeply understand the blocking queue. So let's learn about the related source code of LinkedBlockingQueue. From the naming, we can see that it is a data structure implemented by a linked list.

Class definition

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {

    }

We can see from the above that we inherit Queue and implement BlockingQueue. Let's introduce the functions of the methods in the two classes. In order to facilitate memory and comparison, they are displayed in the table.

Queue
effect Method 1 Method 2 difference
newly added add() offer() An exception is thrown when the add() queue is full, and false is returned when the offer() queue is full
View and delete remove() poll() An exception is thrown when the remove() queue is empty, and null is returned when the poll() queue is empty
View without deleting element() peek() An exception is thrown when the element() queue is empty, and null is returned when the peek() queue is empty
BlockingQueue

BlockingQueue, as its name implies, has a blocked Queue. The methods are different. The following methods include Queue. Because they belong to inheritance relationship, the method name in the following table is replaced by sequence number.

effect Method 1 Method 2 Method 3 Method 4 difference
newly added add() offer() put() offer(E e, long timeout, TimeUnit unit) When the queue is full, 1 and 2 have the same function as queue. 3 will be blocked all the time, and 4 will be blocked for a period of time. false is returned
View and delete remove() poll() take() poll(long timeout, TimeUnit unit) The queue is empty, 1 and 2 have no change, 3 will be blocked all the time, 4 will be blocked for a period of time, and null will be returned
View without deleting element() peek() nothing nothing Queue is empty, 1 and 2 have no change

Member variable

    //The capacity of the linked list. The default is integer MAX_ VALUE
    private final int capacity;

    //Number of elements currently present
    private final AtomicInteger count = new AtomicInteger();

    //head node of linked list
    transient Node<E> head;

    //tail node of linked list
    private transient Node<E> last;

    //It is mainly used to lock take, poll and other methods
    private final ReentrantLock takeLock = new ReentrantLock();

    //It is mainly used in blocking scenarios
    private final Condition notEmpty = takeLock.newCondition();

    //It is mainly used for locking put, offer and other methods
    private final ReentrantLock putLock = new ReentrantLock();

    //It is mainly used in new blocking scenarios
    private final Condition notFull = putLock.newCondition();
    
    //Node is relatively simple. It has an item and points to the next node, that is, a one-way linked list
    static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
    }

Constructor

    //The maximum number of elements stored in the default queue is integer MAX_ VALUE
    	public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    	
    	//Customize capacity and initialize head and tail
    	public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    	
    	public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            //Lock, because it is added, it must be putLock
            putLock.lock();
            try {
                int n = 0;
                //Traverse the collection, generate one node at a time, and add it to the tail of the linked list
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    //Judge whether the newly added node exceeds capacity each time. If so, throw an exception
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    //Add node to queue tail
                    enqueue(new Node<E>(e));
                    ++n;
                }
                //Set the number of current elements count
                count.set(n);
                //finally unlock
            } finally {
                putLock.unlock();
            }
        }

offer()

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        //The initial setting is - 1, C < 0, indicating that the addition fails
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

technological process:

  1. If e is empty, an exception is thrown.
  2. Returns false if the current queue is full.
  3. Encapsulate e into a new node, lock, putLock.
  4. Judge the number of queue elements < capacity again, and then add node to the linked list tail.
  5. CAS will count+1. Note that getAndIncrement is called here and returns the value before + 1. If the queue is not full, wake up a thread blocked by addition.
  6. finally, unlock. If c == 0, lock takeLock and wake up to continue adding.
  7. Return C > = 0.

put()

Compared with offer(), the put code will judge whether the current queue is full. If it is full, it will be blocked through Condition, which is no different from others.

take()

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //Note that here c is obtained first and then - 1
        if (c == capacity)
            signalNotFull();
        return x;
    }

technological process:

  1. Take lock.
  2. If the current queue is empty, block it directly through notEmpty and wait for it to wake up.
  3. Take out the first element and delete the element.
  4. If C > 1, it indicates that there are still elements in the queue. Wake up other threads to get them.
  5. finally, unlock. If c == capacity, it means that the queue is not full. Lock takeLock and wake up to continue adding.
  6. Returns x.

enqueue and dequeue

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

Let's talk about the process of adding and deleting data in the linked list, especially dequeue(). I was a little deceived when I first saw it. Let's take a chestnut.

        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        Integer take = queue.take();
        System.out.println(take);

Each step is drawn here, which is easy to understand. The logic of other methods is similar. Let's talk about it briefly.

peek()

The code of peek() is similar to that of take(), except that it does not delete elements. take() passes dequeue(), while peek() passes a sentence of code node first = head next; Get the data of the node and return.

Added by papacostas on Sun, 09 Jan 2022 05:06:14 +0200