Java blocking queue

Queue

A queue is a special linear list that only allows deletion at the front and insertion at the back. Like a stack, a queue is a linear list with restricted operations. The end where insertion happens is called the tail of the queue, and the end where deletion happens is called the head of the queue.


A queue in this sense works just like a queue in everyday life: whoever lines up first buys first, and whoever lines up later buys later. The first person in line is the head of the queue, and the last person is the tail. This is the first-in, first-out (FIFO) property of a queue, and it is the biggest difference from a stack, which is last-in, first-out.
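To make the difference from a stack concrete, here is a minimal sketch. The class name FifoVsLifoDemo and its method names are made up for this illustration; java.util.ArrayDeque is used only because it can act as both a queue and a stack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Queue;

public class FifoVsLifoDemo {

    // Queue: insert at the tail, remove from the head (first in, first out)
    public static List<Integer> drainQueue() {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= 3; i++) {
            queue.offer(i);
        }
        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }

    // Stack: insert and remove at the same end (last in, first out)
    public static List<Integer> drainStack() {
        Deque<Integer> stack = new ArrayDeque<>();
        for (int i = 1; i <= 3; i++) {
            stack.push(i);
        }
        List<Integer> out = new ArrayList<>();
        while (!stack.isEmpty()) {
            out.add(stack.pop());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("queue: " + drainQueue()); // queue: [1, 2, 3]
        System.out.println("stack: " + drainStack()); // stack: [3, 2, 1]
    }
}
```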

Blocking queue

When the queue is empty, the consumer is suspended. When the queue is full, the producer is suspended. This is the producer-consumer model. Blocking simply means suspending the thread. Because the producer's production rate and the consumer's consumption rate rarely match, the blocking queue temporarily blocks whichever side is faster. For example, suppose the producer produces two items per second and the consumer consumes one item per second. Once the queue is full, the producer blocks (is suspended) and is woken up after the consumer has consumed an item.

By suspending threads, a blocking queue keeps producers and consumers in balance, which is the biggest difference from an ordinary queue.

How

The JDK already provides an implementation. Java 5 added the java.util.concurrent package, and the BlockingQueue in that package is a blocking queue. We do not need to care how BlockingQueue implements blocking, because everything is encapsulated for us. We can simply be emotionless API callers.
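As a rough sketch of the producer-consumer model described above, the following uses the JDK's ArrayBlockingQueue with a small capacity. The class name ProducerConsumerDemo, the method run, and the capacity and item counts are assumptions chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {

    // Runs one producer and one consumer over a bounded queue
    // and returns the items in the order the consumer received them
    public static List<Integer> run(int capacity, int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The queue holds at most two items here, so the producer is suspended whenever it gets ahead of the consumer, and neither thread needs any explicit locking code.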

How to use BlockingQueue

BlockingQueue itself is just an interface that specifies the methods of a blocking queue. The actual behavior depends on its implementation classes.

1. Insert data

  • (1) offer(E e): returns true if the queue is not full, and false (without blocking) if the queue is full
  • (2) offer(E e, long timeout, TimeUnit unit): you can set a waiting time. If the queue is full, wait. If the waiting time is exceeded, false is returned
  • (3) put(E e): no return value. If the queue is full, keep waiting (blocking) until space becomes available
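The three insertion methods can be compared with a small sketch on a queue of capacity 1. The class name InsertMethodsDemo and the 100 ms timeout are assumptions made for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertMethodsDemo {

    public static String tryInserts() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Queue has room: offer succeeds
            boolean first = queue.offer("a");
            // Queue is full: offer returns false immediately, without blocking
            boolean second = queue.offer("b");
            // Queue is still full: waits up to 100 ms, then gives up with false
            boolean timed = queue.offer("c", 100, TimeUnit.MILLISECONDS);
            // Free the single slot so that put does not block forever in this demo
            queue.poll();
            // put returns nothing; it would block here if the queue were full
            queue.put("d");
            return first + " " + second + " " + timed + " " + queue;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInserts()); // true false false [d]
    }
}
```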

2. Obtain data

  • (1) poll(): removes and returns the head if there is data; returns null if there is no data
  • (2) poll(long timeout, TimeUnit unit): you can set a waiting time. If there is no data, wait. If the waiting time is exceeded, null is returned
  • (3) take(): removes and returns the head if there is data. If there is no data, keep waiting (blocking)
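The retrieval methods can be sketched the same way. The class name RemoveMethodsDemo and the 100 ms timeout are assumptions for this example; take() is called only while data is available so that the demo does not block forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RemoveMethodsDemo {

    public static String tryRemovals() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");
        try {
            // Data available: poll returns the head
            String first = queue.poll();
            // Data available: take returns at once (it would block on an empty queue)
            String second = queue.take();
            // Queue is empty: poll returns null immediately
            String third = queue.poll();
            // Queue is still empty: waits up to 100 ms, then returns null
            String timed = queue.poll(100, TimeUnit.MILLISECONDS);
            return first + " " + second + " " + third + " " + timed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRemovals()); // a b null null
    }
}
```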

Main BlockingQueue implementation classes

Class name | Data structure | Characteristics
--- | --- | ---
ArrayBlockingQueue | Array-based; a bounded queue whose capacity is fixed by the array length set at initialization | Unlike LinkedBlockingQueue, ArrayBlockingQueue has only one lock object. With a single lock, either the producer or the consumer holds it at any moment; the two compete for the lock and cannot run in parallel.
LinkedBlockingQueue | Linked-list-based; the capacity can be set at initialization and defaults to Integer.MAX_VALUE if not set | LinkedBlockingQueue has two lock objects, so producers and consumers can run in parallel.
DelayQueue | An unbounded queue based on PriorityQueue | Elements must implement the Delayed interface to support delayed retrieval. Elements are sorted by expiry time, and consumers can take an element out of the queue only after it has expired.
PriorityBlockingQueue | An unbounded, priority-based queue; elements are stored in an array | Elements are ordered by priority, which is determined by the compareTo method of Comparable (natural ordering) or by a Comparator passed to the constructor. Unlike the other blocking queues, it only blocks consumers, never producers, and the array keeps growing. This is a hidden pitfall, so be careful when using it.
SynchronousQueue | A special queue with no internal container | After a producer hands over one item, it blocks, and it can produce again only after a consumer has taken that item. Calling it a queue is a bit of a stretch: in real life it takes several people to form a line, and one person alone is hardly a line.
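A few of the characteristics above can be observed directly. The following is a sketch only: the class ImplementationTraitsDemo, the nested DelayedTask element type, and the 200 ms delay are all hypothetical names and values chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class ImplementationTraitsDemo {

    // Hypothetical DelayQueue element that expires delayMillis after creation
    static class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // LinkedBlockingQueue without an explicit capacity is bounded by Integer.MAX_VALUE
    public static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // PriorityBlockingQueue hands out elements in natural order, not insertion order
    public static List<Integer> priorityOrder() {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    // SynchronousQueue stores nothing: offer fails when no consumer is waiting
    public static boolean synchronousOfferWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    // DelayQueue hides an element until its delay has expired
    public static String delayedPoll() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.offer(new DelayedTask("job", 200));
        DelayedTask early = queue.poll(); // not expired yet, so null
        try {
            DelayedTask late = queue.poll(5, TimeUnit.SECONDS); // waits for expiry
            return (early == null ? "null" : early.name) + " " + (late == null ? "null" : late.name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultLinkedCapacity() == Integer.MAX_VALUE); // true
        System.out.println(priorityOrder());                              // [1, 2, 3]
        System.out.println(synchronousOfferWithoutConsumer());            // false
        System.out.println(delayedPoll());                                // null job
    }
}
```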

Implementing it yourself

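import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class YzBlockingQuery {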

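    private Object[] tab; //Queue container (used as a circular array)

    private int takeIndex; //Index of the next element to dequeue

    private int putIndex; //Index of the next slot to enqueue into

    private int size;//Number of elements

    private ReentrantLock reentrantLock = new ReentrantLock();

    private Condition notEmpty;//Read condition: consumers wait on it while the queue is empty

    private Condition notFull;//Write condition: producers wait on it while the queue is full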

    public YzBlockingQuery(int tabCount) {
        //Reject a non-positive capacity
        if (tabCount <= 0) {
            throw new IllegalArgumentException("tabCount must be positive");
        }

        tab = new Object[tabCount];
        notEmpty = reentrantLock.newCondition();
        notFull = reentrantLock.newCondition();
    }

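    //Note: unlike BlockingQueue.offer, this method blocks while the queue is full (it behaves like put)
    public boolean offer(Object obj) {
        if (obj == null) { throw new NullPointerException(); }
        //Acquire the lock before try, so that finally only releases a lock that is actually held
        reentrantLock.lock();
        try {
            //The queue is full
            while (size==tab.length){
                System.out.println("The queue is full");
                //blocking
                notFull.await();
            }
            tab[putIndex]=obj;
            //If you get to the last, start from the beginning
            if(++putIndex==tab.length){
                putIndex=0;
            }
            size++;
            //Wake up read thread
            notEmpty.signal();
            return true;
        } catch (InterruptedException e) {
            //Interrupted while waiting: restore the interrupt flag and report failure
            Thread.currentThread().interrupt();
            return false;
        } finally {
            reentrantLock.unlock();
        }
    }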


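    public Object take(){
        //Acquire the lock before try, so that finally only releases a lock that is actually held
        reentrantLock.lock();
        try {
            while (size==0){
                System.out.println("The queue is empty");
                //blocking
                notEmpty.await();
            }
            Object obj= tab[takeIndex];
            //Clear the slot so the queue does not keep a reference to the removed element
            tab[takeIndex]=null;
            //If you get to the last, start from the beginning
            if(++takeIndex==tab.length){
                takeIndex=0;
            }
            size--;
            //Wake up write thread
            notFull.signal();
            return obj;
        } catch (InterruptedException e) {
            //Interrupted while waiting: restore the interrupt flag and return null
            Thread.currentThread().interrupt();
            return null;
        } finally {
            reentrantLock.unlock();
        }
    }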


    public static void main(String[] args) {
        Random random = new Random(100);
        YzBlockingQuery yzBlockingQuery=new YzBlockingQuery(5);
        Thread thread1 = new Thread(() -> {
            for (int i=0;i<100;i++) {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                yzBlockingQuery.offer(i);
                System.out.println("The producer produced:"+i);
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i=0;i<100;i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Object take = yzBlockingQuery.take();
                System.out.println("The consumer consumed:"+take);
            }
        });

        thread1.start();
        thread2.start();
    }
}

In practice, you rarely need to implement a blocking queue yourself. It is enough to understand how it works and to choose the appropriate implementation class for your situation.


Keywords: Java

Added by rubric on Mon, 17 Jan 2022 18:59:08 +0200