Blocking Queue: A Simple Java Implementation Based on a Linked List

1. The Principle of Blocking Queue

The difference between a blocking queue and an ordinary queue is that when a blocking queue is empty, any attempt to take an element from it blocks, and when the queue is full, any attempt to add an element to it blocks.

A thread trying to take an element from an empty blocking queue is blocked until another thread inserts a new element into the queue. Similarly, a thread trying to add an element to a full blocking queue is blocked until another thread removes an element and frees up space.

2. Simple Implementation of Blocking Queue

import java.util.LinkedList;
import java.util.List;

/**
 * A blocking queue based on a linked list
 * 
 * @author jade
 *
 */
public class BlockingQueue {
    private int capacity; // capacity of the blocking queue
    private List<Object> queue = new LinkedList<>(); // the backing linked list

    public BlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Add an element to the tail of the queue, blocking while the queue is full
     * 
     * @param item
     * @throws InterruptedException
     */
    public synchronized void enqueue(Object item) throws InterruptedException {
        // Wait while the queue is full
        while (this.queue.size() == this.capacity) {
            wait();
        }
        // The queue was empty, so consumers may be blocked in dequeue(): wake them up
        if (this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }

    /**
     * Remove an element from the head of the queue, blocking while the queue is empty
     * 
     * @return
     * @throws InterruptedException
     */
    public synchronized Object dequeue() throws InterruptedException {
        // Wait while the queue is empty
        while (this.queue.size() == 0) {
            wait();
        }
        // The queue was full, so producers may be blocked in enqueue(): wake them up
        if (this.queue.size() == this.capacity) {
            notifyAll();
        }
        return this.queue.remove(0);
    }

}

Notes:

1) In enqueue and dequeue, notifyAll is called only when the current queue size equals the lower limit (0, in enqueue) or the upper limit (capacity, in dequeue), because only in those two states can other threads be blocked in wait().

If the queue size is at neither the upper limit nor the lower limit, no thread blocks when calling enqueue or dequeue, so there is nothing to wake up and elements can be added or removed normally.

2) Both enqueue and dequeue are declared synchronized, so a thread acquires the object's monitor when entering the method and releases it when leaving. Without synchronized there would be no monitor to associate with wait/notify, and calling them directly would throw IllegalMonitorStateException.
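
As a quick sanity check, here is a minimal sketch (not part of the original post; the demo class and thread setup are only illustrative) that exercises the queue above with one producer thread and one consumer thread:

public class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue queue = new BlockingQueue(2); // capacity of 2

        // Producer: enqueues 5 items, blocking whenever the queue is full
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.enqueue(i);
                    System.out.println("enqueued " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer: dequeues 5 items, blocking whenever the queue is empty
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    System.out.println("dequeued " + queue.dequeue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}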


3. LinkedBlockingQueue

Several blocking queues are provided in the java.util.concurrent package. LinkedBlockingQueue is a blocking queue based on a linked list; if no capacity is specified when a LinkedBlockingQueue object is created, the default capacity is Integer.MAX_VALUE.
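
For example (a small sketch, not from the original post; the class name and element values are arbitrary), a bounded and an unbounded LinkedBlockingQueue behave like this:

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // No capacity given: effectively unbounded (Integer.MAX_VALUE)
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        unbounded.put("x");
        System.out.println(unbounded.size());  // 1

        // Capacity of 2: offer() returns false once the queue is full, put() would block
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(2);
        bounded.put("a");
        bounded.put("b");
        System.out.println(bounded.offer("c")); // false
        System.out.println(bounded.take());     // "a"
    }
}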

First, look at several member variables in the LinkedBlockingQueue class:

  private static final long serialVersionUID = -6903933977591709194L;
  private final int capacity;
  private final AtomicInteger count = new AtomicInteger();
  transient Node<E> head;
  private transient Node<E> last;
  private final ReentrantLock takeLock = new ReentrantLock();
  private final Condition notEmpty = this.takeLock.newCondition();
  private final ReentrantLock putLock = new ReentrantLock();
  private final Condition notFull = this.putLock.newCondition();

As you can see, LinkedBlockingQueue actually stores its elements in a linked list. head and last point to the head node and the tail node of the list, respectively, and capacity represents the capacity of the queue.

takeLock and putLock are reentrant locks; notEmpty and notFull are their wait conditions (notEmpty is bound to takeLock, notFull to putLock).

Let's look at the constructor of LinkedBlockingQueue, which has three overloaded versions:

  public LinkedBlockingQueue()
  {
    this(Integer.MAX_VALUE);
  }
  
  public LinkedBlockingQueue(int paramInt)
  {
    if (paramInt <= 0) {
      throw new IllegalArgumentException();
    }
    this.capacity = paramInt;
    this.last = (this.head = new Node(null));
  }
  
  public LinkedBlockingQueue(Collection<? extends E> paramCollection)
  {
    this(Integer.MAX_VALUE);
    ReentrantLock localReentrantLock = this.putLock;
    localReentrantLock.lock();
    try
    {
      int i = 0;
      Iterator localIterator = paramCollection.iterator();
      while (localIterator.hasNext())
      {
        Object localObject1 = localIterator.next();
        if (localObject1 == null) {
          throw new NullPointerException();
        }
        if (i == this.capacity) {
          throw new IllegalStateException("Queue full");
        }
        enqueue(new Node(localObject1));
        i++;
      }
      this.count.set(i);
    }
    finally
    {
      localReentrantLock.unlock();
    }
  }

The first constructor uses the default capacity Integer.MAX_VALUE. The second takes a single parameter that specifies the capacity, and the third initializes the queue from a given collection.
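
As a small illustration of the third overload (a sketch, not from the original post), the queue can be pre-filled from a collection, and a non-positive capacity is rejected by the second overload:

import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class FromCollectionDemo {
    public static void main(String[] args) {
        // Third overload: initialize from a collection; the capacity defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(Arrays.asList(1, 2, 3));
        System.out.println(queue.size()); // 3
        System.out.println(queue.peek()); // 1 (elements keep the iteration order of the collection)

        // Second overload: a non-positive capacity is rejected
        // new LinkedBlockingQueue<Integer>(0); // would throw IllegalArgumentException
    }
}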


Then look at the implementation of its two key methods: put() and take():

  public void put(E paramE)
    throws InterruptedException
  {
    // Ensure that the element is not null
    if (paramE == null) {
      throw new NullPointerException();
    }
    int i = -1;
    Node localNode = new Node(paramE);
    ReentrantLock localReentrantLock = this.putLock;
    AtomicInteger localAtomicInteger = this.count;
    // Acquire the lock, responding to interruption
    localReentrantLock.lockInterruptibly();
    try
    {
      while (localAtomicInteger.get() == this.capacity) {
        this.notFull.await(); // block while the queue is full
      }
      enqueue(localNode);
      i = localAtomicInteger.getAndIncrement();
      if (i + 1 < this.capacity) {
        // signal() is used here for performance: wake up another producer blocked on notFull
        this.notFull.signal();
      }
    }
    finally
    {
      localReentrantLock.unlock();
    }
    if (i == 0) {
      // i == 0 means the queue was empty before this put, so wake up a consumer blocked
      // on notEmpty; this requires acquiring takeLock (done inside signalNotEmpty())
      signalNotEmpty();
    }
  }

  public E take()
    throws InterruptedException
  {
    int i = -1;
    AtomicInteger localAtomicInteger = this.count;
    ReentrantLock localReentrantLock = this.takeLock;
    localReentrantLock.lockInterruptibly();
    Object localObject1;
    try
    {
      while (localAtomicInteger.get() == 0) {
        this.notEmpty.await(); // block while the queue is empty
      }
      localObject1 = dequeue();
      i = localAtomicInteger.getAndDecrement();
      if (i > 1) {
        // elements remain, so wake up another consumer blocked on notEmpty
        this.notEmpty.signal();
      }
    }
    finally
    {
      localReentrantLock.unlock();
    }
    if (i == this.capacity) {
      // the queue was full before this take, so wake up a producer blocked on notFull
      signalNotFull();
    }
    return (E)localObject1;
  }

The take method mirrors put: put waits on the notFull condition, while take waits on the notEmpty condition.
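
Both methods also finish by signalling the opposite condition from outside their own lock. As a rough sketch of what signalNotEmpty() and signalNotFull() do (paraphrased, not copied verbatim from the decompiled source), each one briefly acquires the other lock before signalling:

  // Sketch: called by put() when the queue goes from empty to non-empty.
  // notEmpty belongs to takeLock, so takeLock must be held before signalling it.
  private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
      notEmpty.signal();
    } finally {
      takeLock.unlock();
    }
  }

  // Sketch: called by take() when the queue goes from full to not-full.
  // notFull belongs to putLock, so putLock must be held before signalling it.
  private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
      notFull.signal();
    } finally {
      putLock.unlock();
    }
  }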

LinkedBlockingQueue maintains a data buffer backed by a linked list. When a producer puts an element into the queue, the queue caches it internally and the producer returns immediately; only when the buffer reaches its maximum capacity (which can be set through the constructor) is the producer blocked, until a consumer takes an element from the queue and the producer thread is woken up. The consumer side works on the same principle.
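
To make this blocking behaviour concrete, here is a small producer/consumer sketch (the class name, loop counts, and sleep interval are made up for illustration): the producer fills a queue of capacity 2 and blocks on further put() calls until the slower consumer starts taking elements.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    queue.put(i); // blocks once the queue already holds 2 elements
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    TimeUnit.MILLISECONDS.sleep(200); // consume slowly so the producer blocks
                    System.out.println("consumed " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}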

LinkedBlockingQueue handles concurrent data efficiently because it uses separate locks to synchronize the producer side and the consumer side. This means that under high concurrency producers and consumers can operate on the queue in parallel, which improves the throughput of the whole queue.

LinkedBlockingQueue uses ReentrantLock internally to implement putLock and takeLock. The condition variable bound to putLock is notFull, which is used to wake up threads blocked on putLock; the condition variable bound to takeLock is notEmpty, which is used to wake up threads blocked on takeLock.

 

Refer to the following blogs:

http://www.cnblogs.com/moonandstar08/p/4893337.html

http://www.cnblogs.com/dolphin0520/p/3932906.html
