How does BlockingQueue play threads like this?

Preface

BlockingQueue is blocking queues. It's a wonderful way to use ReentrantLock. Based on its basic principles, we can achieve long-connection chat on the Web. Of course, the most common way to do this is to implement producer-consumer mode, as shown in the following figure:

In Java, BlockingQueue is an interface whose implementation classes are ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue, etc. The differences are mainly reflected in the storage structure or the operation on elements, but the principles of take and put operations are similar.The following source code takes ArrayBlockingQueue as an example.

Analysis

There is a ReentrantLock inside the BlockingQueue that generates two Condition s, which can be seen in the Array BlockingQueue property declaration:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

...

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

If you can personify notEmpty, notFull, put thread, and take thread, then I think put and take operation may be the following process:

put(e)

take()

The source code for ArrayBlockingQueue.put (E) is as follows (where the Chinese comment is a custom comment, the same below):

/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
      while (count == items.length)
          notFull.await(); // Wait if the queue is full
      insert(e);
  } finally {
      lock.unlock();
  }
}

/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
		items[putIndex] = x;
	 	putIndex = inc(putIndex);
		++count;
		notEmpty.signal(); // A new element is inserted, notifying the removal element thread while waiting
}

The source code for ArrayBlockingQueue.take() is as follows:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // If the queue is empty, wait
        return extract();
    } finally {
        lock.unlock();
    }
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal(); // A new element is removed, notifying the inserting element thread waiting
    return x;
}

You can see that put(E) and take() are synchronized, and in a put operation, when the queue is full, the put operation is blocked until there is a free place in the queue.In a take operation, when the queue is empty, the take operation is blocked until there are new elements in the queue.

Using two Condition s here prevents the same put or take operation from waking up when signal() is called.

Reference Address

If you like my article, you can focus on your personal subscription number.Welcome to leave a message and exchange at any time.If you want to join the WeChat group and discuss it together, add lastpass4u, the administrator's culture assistant, who will pull you into the group.

Keywords: Programming Java

Added by spectsteve7 on Sat, 04 Apr 2020 19:56:37 +0300