Constructor for LinkedBlockingQueue
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // head and last point to the sentinel node last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) ...
1.2 ensure thread safety
The underlying layer of LinkedBlockingQueue uses ReetrantLock to ensure thread safety. In fact, it is a "consumption production" model. Through this article, we can also learn the actual use scenarios of ReetrantLock.
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
- Source code analysis
Before talking about LinkedBlockingQueue, let's take a look at the interface BlockingQueue that needs to be implemented
2.1 BlockingQueue
The class that implements BlockingQueue must additionally support the operation of waiting for the queue until it is non empty when looking for elements; When storing elements, wait until the space in the queue is available.
There are four forms of its methods, which deal with the operations that can not be satisfied immediately, but may be satisfied in the future.
Throw an exception directly
Returns a special value (null or false)
Wait until the operation succeeds
Timeout setting, give up when the time is exceeded
Summary of BlockingQueue methods
We know different methods and different processing methods that cannot be met immediately, so we can better understand the source code of LinkedBlockingQueue.
Let's start from
offer(e)
offer(e, time, unit)
put(e)
poll()
Go analyze the LinkedBlockingQueue
2.2 offer(e)
Return true if the addition is successful; If the insertion value is null, an error is reported; Or if the queue is full, return false directly and will not wait for the queue to be idle
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//When the queue is full, it returns directly
if (count.get() == capacity)
return false;
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
The code is relatively simple, so I won't go into detail.
2.3 offer(e, time, unit)
If the queue is full and has not exceeded the set time, it will wait. When waiting, it will respond to the interrupt; If the set time is exceeded, the operation will be the same as offer (E)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { // The queue is full. false will be returned after a specific time if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
2.4 put(e)
Wait until it succeeds or is interrupted
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//Prevent false Awakening
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
I didn't expect that there was a false wake-up scene above (if you know, please tell me, thank you).
it is recommended that applications programmers always assume that they can occur and so always wait in a loop. --Condition
Anyway, it's right to use Condition to wait in the loop
2.5 poll()
When AWT is null, the queue will not be returned directly; Non blocking method
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
The other methods are actually similar. Look directly at the four methods of BlockingQueue above.
Finally, let's talk about the method remove(Object o), which will mention a knowledge point.
2.6 remove
Delete o, if successful, return true, otherwise
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
//Algorithm problem, delete a node of a linked list, you can look at the source code and record two nodes, one in the front and one in the back.
for (Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
The above code is a simple operation on the linked list. We mainly look at the codes of fullyLock() and fullyUnlock()
/** * Locks to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlocks to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
We all know that the unlocking order should be opposite to the lock acquisition order, so why
In fact, I don't think that if the unlocking sequence of fullyUnlock above is the same as that of obtaining locks, there will be no problems and deadlocks (it's another matter if there are other operations between releasing locks and obtaining locks). So it's just for the beauty of the code?
Suppose we have the following code (there are other operations between unlocking and acquiring the lock)
A.lock();
B.lock();
Foo();
A.unlock();
Bar();
B.unlock();
USB Microphone https://www.soft-voice.com/
Wooden Speakers https://www.zeshuiplatform.com/
Amazon evaluation www.yisuping.com cn
Shenzhen website construction www.sz886.com com