Blocking queue LinkedBlockingQueue
introduce
The previous article introduces the non blocking queue concurrent linkedqueue implemented by CAS algorithm, and this article introduces the blocking queue LinkedBlockingQueue implemented by exclusive lock.
The class diagram shows that the LinkedBlockingQueue is also implemented using a one-way linked list, including head Node and last Node, which are used to store head and tail nodes; There is also an atomic variable count with an initial value of 0, which is used to record the number of queue elements; In addition, it also contains two ReentrantLock instances, which are used to control the atomicity of elements entering and leaving the queue respectively. takeLock is used to control that only one thread can obtain elements from the queue head at the same time, and putLock is to control that only one thread can add elements from the tail of the queue at the same time.
notEmpty and notFull are conditional variables. They both have a conditional queue to store threads that are blocked when entering and leaving the queue.
The constructor of LinkedBlockingQueue is as follows:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
We can see that by default, the queue capacity of LinkedBlockingQueue is the maximum value of int. of course, we can also specify the capacity directly, so it can explain that LinkedBlockingQueue is a bounded blocking queue to a certain extent.
offer operation
Insert an element into the end of the queue. If the queue is free, the insertion succeeds and returns true. If the queue is full, the current element is discarded and returns false. Throw an exception if the inserted element is null. And the method is non blocking.
public boolean offer(E e) { //(1) An exception is thrown if the element is empty if (e == null) throw new NullPointerException(); //(2) If the queue is full, the element is discarded final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; //(3) Build a new node and obtain the putLock exclusive lock Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //(4) If the queue is not satisfied, enter the queue and increase the count if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } } finally { //(6) Release lock putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); //(8) return c >= 0; }
Code (1) first determines whether the queued element is empty, and throws an exception if it is empty.
Code (2) determines whether the queue is full. If it is full, it will be discarded and false will be returned.
Code (3) build a new Node node and obtain the putLock lock lock. After obtaining the lock, other threads calling the put or offer method will be blocked and put into the AQS blocking queue of putLock.
Code (4) determines whether the queue is full. Why do you want to judge it again? Since other threads may add new elements through put or offer between the execution of code (2) and the acquisition of lock, judge again whether the queue is full. If not, the new element will join the queue and increase the counter.
Code (5) judges that if there is still space left after the new element is queued, wake up a thread blocking in the notFull condition queue (wake up the thread calling the await operation of notFull, such as when the put method is executed and the queue is full).
Code (6) releases the obtained putLock lock lock. Note here that the release of the lock must be done in finally, because finally will be executed even if the try block throws an exception. In addition, after releasing the lock, another thread blocked by calling the put operation will acquire the lock.
Code (7) c == 0 indicates that there is at least one element in the queue when executing code (6) to release the lock. If there are elements in the queue, the signalNotEmpty method is executed.
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
The function of this method is to activate a thread in the condition queue of notEmpty that is blocked by calling the await method of notEmpty (for example, when calling the take method and the queue is empty), which also indicates that the corresponding lock should be obtained before calling the method of condition variable.
Summary: the offer method ensures the atomicity of the addition through the putLock lock lock. In addition, it should be noted that when calling the condition variable, the corresponding lock should be obtained first, and the queue only operates the tail node of the linked list.
put operation
This method inserts an element into the end of the queue. If the queue is idle, it will return directly after the insertion is successful. If the queue is full, it will block the current thread until the queue is idle. If the interrupt flag is set by another thread when blocking, the blocked thread will throw an exception and return. Throw an exception if the passed in element is null.
public void put(E e) throws InterruptedException { //(1) An exception is thrown when the inserted element is empty if (e == null) throw new NullPointerException(); int c = -1; //(2) Build a new node and obtain the exclusive lock putLock Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //(3) Wait if queue is full while (count.get() == capacity) { notFull.await(); } //(4) Insert queue and increment count enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } finally { //(6) putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); }
We can see that putlock is used in code (2) Lockinterruptibly() obtains exclusive lock, which can be interrupted compared with the method of obtaining exclusive lock in offer method. Specifically, if the interrupt flag is set by other threads during the process of obtaining the lock, the current thread will throw an InterruptedException exception, so the put operation can be interrupted during the process of obtaining the lock.
Code (3) determines that the current queue is full, then call notFull The await () method puts the current thread into the condition queue of notFull, and then the current queue will release the putLock lock lock. Because the putLock lock is released, other threads will have the opportunity to obtain the lock.
Using while instead of if is to prevent false wake-up problems
poll operation
Get and remove an element from the queue header, and return null if the queue is empty. This method is a non blocking method.
public E poll() { final AtomicInteger count = this.count; //(1) If the queue is empty, null is returned if (count.get() == 0) return null; //(2) Get exclusive lock E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //(3) If the queue is not empty, exit the queue and decrement the count if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); //(4) if (c > 1) notEmpty.signal(); } } finally { //(5) takeLock.unlock(); } //(6) if (c == capacity) signalNotFull(); //(7) return x; }
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
First, the code (1) is very simple. First, judge whether the current queue is empty. If it is empty, it will directly return null.
Code (2) acquires the exclusive lock takeLock. After the current thread acquires the lock, other threads will be blocked when calling poll or take methods.
Code (3) judges that if the current queue is not empty, the queue out operation is carried out, and then the counter is decremented.
Although judging whether the queue is empty and obtaining queue elements in code (3) are not atomic, the count value will be decremented only in the place of poll, take or remove operations. However, these three methods all need to obtain the takeLock lock lock to operate, and the current thread has obtained the takeLock lock lock, So other threads have no chance to decrement the count value in the current situation, so it doesn't seem atomic, but they are thread safe.
Code (4) judges that if c > 1, it means that the queue is not empty after the current thread removes an element in the queue (c is the number of queue elements before deleting the element), then a thread blocked in the condition queue of notEmpty due to calling the take method can be activated at this time.
Code (6) indicates that the current queue is full before the current thread removes the queue header element. After removing the queue header element, the current queue has at least one free position. At this time, you can call signalNotFull to activate a thread blocked in the condition queue of notFull due to the call of put method. The code of signalNotFull is as follows.
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
peek operation
Gets the queue header element but does not remove it from the queue. If the queue is empty, null is returned. This method is non blocking.
public E peek() { //(1) if (count.get() == 0) return null; //(2) final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; //(3) if (first == null) return null; else //(4) return first.item; } finally { takeLock.unlock(); } }
peek method is not very complicated. It should be noted that first == null should be judged here. First cannot be returned directly Item, because code (1) and code (2) are not atomic, it is likely that after code (1) determines that the queue is not empty, another thread performs a poll or take operation before obtaining the lock, resulting in the queue being empty, and then directly returns fist Item will be null pointer exception.
take operation
This method obtains the header element of the current queue and removes it from the queue. If the queue is empty, block the current thread until the queue is not empty, and then obtain and return the element. If the interrupt flag is set by other threads during blocking, the blocking thread will throw an exception.
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //(1) Acquire lock takeLock.lockInterruptibly(); try { //(2) If the queue is empty, blocking is suspended while (count.get() == 0) { notEmpty.await(); } //(3) Out of line and count down x = dequeue(); c = count.getAndDecrement(); //(4) if (c > 1) notEmpty.signal(); } finally { //(5) takeLock.unlock(); } //(6) if (c == capacity) signalNotFull(); return x; }
In code (1), the current thread obtains the exclusive lock, and other threads calling take or poll operations will be blocked and suspended.
Code (2) judges that if the queue is empty, the current thread will be blocked and suspended, and the current thread will be put into the condition queue of notEmpty.
Code (3) performs an out of line operation and decrements the count.
Code (4) judges that if C > 1, it indicates that the current queue is not empty, and then wakes up a thread in the conditional queue of notEmpty that is blocked due to calling the take operation.
Code (5) releases the lock.
Code (6) judges that if c == capacity, it means that there is at least one free position in the current queue, then activate a thread in the notFull condition queue that is blocked due to the call of put operation.
remove operation
Delete the specified elements in the queue, if any, delete and return true, if not, return false.
public boolean remove(Object o) { if (o == null) return false; //(1) Get putLock and takeLock fullyLock(); try { //(2) Traverse to find the element to delete for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //(3) if (o.equals(p.item)) { unlink(p, trail); return true; } } //(4) return false; } finally { //(5) fullyUnlock(); } }
void fullyLock() { putLock.lock(); takeLock.lock(); }
First, the code (1) obtains the double lock, and then the queue in and queue out operations of other threads will be suspended.
Code (2) traverses the queue to find the element to be deleted. If it cannot be found, it returns false. Find the method to execute unlink. Let's take a look at this method.
void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; //If the current queue is full, you should also wake up the waiting thread after deleting it if (count.getAndDecrement() == capacity) notFull.signal(); }
trail is the precursor node for deleting the element. After deleting the element, if it is found that there is free space in the current queue, it will wake up a thread in the conditional queue of notFull that is blocked by calling the put method.
Code (5) calls fullyUnlock to release the double lock.
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
Summary: because the remove method adds two locks before deleting the specified element, it is thread safe in the process of traversing the queue to find the specified element, and at this time, all other threads calling queue in and queue out operations will be blocked. Note that the order in which multiple resource locks are acquired is the opposite of the order in which they are released.
size operation
Gets the number of current queue elements.
public int size() { return count.get(); }
Since the count of outgoing and incoming operations are locked, the result is accurate compared with the size method of ConcurrentLinkedQueue.
summary
The internal of LinkedBlockingQueue is realized through one-way linked list. The head and tail nodes are used for queue in and out operations, that is, the queue in operation is to operate on the tail node, and the queue out operation is to operate on the head node.
Separate exclusive locks are used for the operations of the head and tail nodes to ensure atomicity, so the out of queue and in queue operations can be carried out at the same time. In addition, the exclusive locks of the head and tail nodes are equipped with a conditional queue to store blocked threads. Combined with queue in and queue out operations, a production and consumption model is realized.