LinkedBlockingQueue of concurrent queues

In the previous article, we looked at the queue concurrent linked queue, which is an unbounded non blocking list. This time, we looked at the LinkedBlockingQueue. The name of this queue shows that it is a blocking queue (that is, a one-way list). It is simple based on exclusive lock implementation;

 

1, LinkedBlockingQueue basic structure

There is also a Node class inside. As shown in the figure below, item stores the actual data, next points to the next Node, and a parameter constructor. There is nothing to say;

 

We can take a look at some attributes of this queue. In fact, we can guess that it is the producer consumer model:

//Actual capacity of queue
private final int capacity;
//This atomic variable records the number of nodes
private final AtomicInteger count = new AtomicInteger();
//Header node
transient Node<E> head;
//Tail node
private transient Node<E> last;
//This lock is used to control multiple threads to get elements from the queue header
private final ReentrantLock takeLock = new ReentrantLock();
//When the queue is empty, the thread executing the out of queue operation will be put into this condition variable
private final Condition notEmpty = takeLock.newCondition();
//Used to control multiple threads to add elements to the end of the queue
private final ReentrantLock putLock = new ReentrantLock();
//If the queue is full, the thread executing the queuing operation will be dropped here
private final Condition notFull = putLock.newCondition();

 

As can be seen from the constructor, the default maximum number is 65536. Although the size can also be specified, we can say that this is a bounded blocking queue to some extent;

//The maximum number of default queues is 65536
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
//You can also specify the queue size. By default, both the head node and the tail node point to the sentinel node
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
//You can also pass an implementation Collection Interface classes, such as List,And then traverse to encapsulate the elements into nodes and drop them into the queue
//Note here to acquire and release locks
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

 

We briefly introduced the basic structure of this queue, now we can take a look at some important methods

 

2, offer method

This method adds an element to the last queue, and returns true after inserting successfully. If the queue is full, discard the current element and return false;

public boolean offer(E e) {
    //If so null,Just throw it wrong
    if (e == null) throw new NullPointerException();
    //count Represents the actual number in the queue
    final AtomicInteger count = this.count;
    //If the actual quantity is the same as the maximum capacity of the queue, then it can't be added any more. Return false
    if (count.get() == capacity)
        return false;
    int c = -1;
    //Encapsulate elements as Node node
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //Get lock
    putLock.lock();
    try {
        //If the queue is not full, throw in the new node and increase the counter. Why do you want to judge here? Isn't that a judgment?
        //Because of concurrency, if the current thread has not acquired the lock after judging the capacity from the above, a other thread program will acquire the lock first and then execute it offer Method and release the lock, then
        //It needs to be judged again
        if (count.get() < capacity) {
            enqueue(node);
        //Note the difference between getAndIncrement and incrementAndGet methods. The former is to return the value before auto increment, and the latter is to return the value after auto increment c
= count.getAndIncrement(); //Here, if the queue is not full, wake up before notFully Threads in the condition queue, as mentioned before notfull What thread is stored in if (c + 1 < capacity) notFull.signal(); } } finally { //Release lock putLock.unlock(); } //If c==0,Indicates that there is a node in the queue before the node is added. Wake up the condition variable notEmpty The threads in the, these threads will get the data from the queue if (c == 0) signalNotEmpty(); return c >= 0; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }

 

3, put method

This method inserts an element at the end of the queue. If the queue is idle, it will directly return after inserting. Otherwise, it will block the current thread until the queue is idle and then insert. In addition, if the current thread is blocked and other threads call the interrupt method, it will throw an exception;

public void put(E e) throws InterruptedException {
    //If the inserted element is null,Direct throw anomaly
    if (e == null) throw new NullPointerException();
    int c = -1;
    //Encapsulate as a node
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    //Number of nodes in atomic variable record queue
    final AtomicInteger count = this.count;
    //This method is followed by Interruptibly,It indicates that the current thread can be interrupted after obtaining the lock
    putLock.lockInterruptibly();
    try {
        //When the number of nodes reaches the maximum capacity, put the current thread into the condition variable notFull In the queue
        while (count.get() == capacity) {
            notFull.await();
        }
        //If the number of nodes does not reach the maximum, add nodes at the end of the list
        enqueue(node);
        //Counter plus one, note if count Four, then c It's still equal to 4. This method is atomic increment. It returns the original value. Note and incrementAndGet Differences in methods
        c = count.getAndIncrement();
        //Here if c+1<capacity If the queue is not full, wake up notFull Threads in can add elements to the queue
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //Release lock
        putLock.unlock();
    }
    //If c==0 Indicates that there is a node before the node is added to the queue. Wake up notEmpty Thread in to get data from the queue
    if (c == 0)
        signalNotEmpty();
}

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

 

4, poll method

This method removes an element from the header. If the queue is empty, null will be returned. This method does not block;

public E poll() {
    //Record the number of nodes in the queue
    final AtomicInteger count = this.count;
    //Return if the queue is empty null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    //Get lock
    takeLock.lock();
    try {
        //This is to prevent other threads from calling before acquiring the lock poll Method takes node
        //If the queue is not empty, the counter is decremented by one
        if (count.get() > 0) {
            //Delete the first node with data (since the first node is the sentinel node, it is equivalent to delete the second node). The method is implemented as follows
            x = dequeue();
            c = count.getAndDecrement();
            //If c>1 Description after removing the head node, the queue will wake up if it is not empty notEmpty The thread in the condition queue in the
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        //Release lock
        takeLock.unlock();
    }
    //Here's the judgment. Notice that in the atomic class AtomicInteger For example, if the initial value is 5, call decrementAndGet Method 4,
    //And call getAndDecrement Method returns 5. Here we are c The latter is called, so it indicates the number of queues before the node is deleted
    //So the meaning here is: if the number before deleting the queue is equal to the maximum capacity of the queue, there must be a free position in the queue after deleting, so it wakes up notFull Threads in the condition queue
    //Add a new node to the queue
    if (c == capacity)
        signalNotFull();
    return x;
}
//This method is very simple to mention a little. It is to reference the first sentinel node to itself, and better gc collect
//take head Point to the second node, take the value of the node, and then item Set as null,This node becomes a sentinel node
private E dequeue() {
    //Delete head node
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

 

 

V. peek method

This method gets the queue header element but does not remove the node;

public E peek() {
    //Queue is empty, return null
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //If the next node of the header node is empty, return null,Because the head node is the sentinel node
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
        //The next node of the header node is not empty, so the next node must have data. Just take it here
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

 

 

6, take method

The current method is basically the same as the peek method, except that this method is blocked: delete a node from the queue head, if the queue is empty, block the current thread until the queue is not empty, and then perform the operation. If other threads modify the interrupt flag when blocking, the thread will throw an error;

//This method is actually the same as poll The method is basically the same. There's nothing to say. Pay attention to throwing exceptions
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //Notice how the lock is acquired here
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

 

 

7, remove method

Delete a specified element in the queue. If the deletion succeeds, true will be returned. If the deletion fails, false will be returned;

public boolean remove(Object o) {
    //Incoming is null,Direct return false
    if (o == null) return false;
    //Obtain two locks. The method is as follows. When deleting a node, you cannot enter or leave the queue. Those threads will be blocked and lost AQS In the queue
    fullyLock();
    try {
        //Traverse to find the corresponding node, delete it, no return false
        for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
            if (o.equals(p.item)) {
                //The specific method to delete a node is as follows
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        //Release two locks. Note that the order of releasing locks is opposite to that of acquiring locks
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

//In fact, it is easy to delete. Just like the deletion of common linked list, it is to point the node in front of the node to be deleted to the node behind
void unlink(Node<E> p, Node<E> trail) {
    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    //Still pay attention to getAndDecrement Method returns the value before minus one. If the queue is not full after minus one, it wakes up notFull The thread in the condition queue in adds a node to the queue
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

 

  

Eight. Conclusion

We simply looked at some of the more important methods in LinkedBlockingQueue, which is much easier than the last article's ConcurrentLinkedQueue!

Among them, ConcurrentLinkedQueue is an unbounded non blocking queue, the bottom layer is a one-way linked queue, and the incoming and outgoing queues are realized by CAS; LinkedBlockingQueue is a bounded blocking queue, the bottom layer is a one-way linked queue, and the incoming and outgoing queues are respectively handled by exclusive locks, so the incoming and outgoing queues can be carried out at the same time, and two exclusive locks are configured Conditional queue is used to store the blocked line layer. Note that there are several queues involved here. One is the AQS queue with exclusive lock, the other is the conditional queue, and the other is the queue with data. Don't confuse!

Use the following chart to enhance memory:

Keywords: Java

Added by worldofcarp on Sat, 08 Feb 2020 18:22:51 +0200