Blocking queue correlation

1. What is a blocking queue?

  • As the name suggests, first it is a queue:

  • When the blocking queue is empty, the operation of getting elements from the queue will be blocked.

  • When the blocking queue is full, the operation of adding elements to the queue will be blocked.

  • Similarly, trying to add a new thread to the full blocking queue will also be blocked until other threads remove one or more elements from the queue or completely empty the queue, making the queue idle again and adding new threads later.

2. What's the use?

  • In the field of multithreading: the so-called blocking will suspend the thread in some cases (i.e. thread blocking). Once the conditions are met, the suspended thread will be awakened automatically
  • In Java, the interface blocking queue is defined in the JUC package
  • Before the release of JUCt package, in a multithreaded environment, each programmer must control these details, especially considering efficiency and thread safety, which will bring great complexity to the program
  • Now, we don't need to care when we need to block threads or wake up threads. BlockingQueue does it all

3. BlockingQueue in JUC

  • Find it in the JUC package and right-click idea to open its class diagram:

  • BlockingQueue inherits from the Queue interface. In order to facilitate understanding, I also opened the class diagram of the List interface. Both the List interface and Queue interface inherit from the Collection interface.
  • Let's first look at some implementation classes in the List interface, such as ArrayList and LinkedList. CopyOnWriteArrayList talked about thread unsafe verification and resolution of ArrayList before. See the following for details: Thread unsafe verification and resolution of Java collection class ArrayList
  • With the List interface and its implementation as an analogy, it is relatively easy to look at the BlockingQueue interface.

3.1 first look at the core method of BlockingQueue:

  • add(): add elements, remove(): remove elements. The two methods throw exceptions when they fail.
  • offer(): add elements, poll(): remove elements. The two methods return true on success and false on failure.
  • put(): add element, take(): once element, the two methods will block until success or interrupt exit

3.2 several implementation classes of BlockingQueue:

  • ArrayBlockingQueue: a bounded blocking queue composed of arrays.
  • LinkedBlockingQueue: a bounded blocking queue composed of linked lists (Note: the default size of the comparison pit is Interger.MAX_VALUE)
  • SynchronousQueue: a blocking queue that does not store elements, but contains elements
  • PriorityBlockingQueue: an unbounded blocking queue that supports priority
  • DelayQueue: delay unbounded blocking queue supporting priority
  • LinkedTransferQueue: an unbounded blocking queue composed of a linked list structure
  • LinkedBlockingQeque: a bidirectional blocking queue composed of linked list structure.

4. Where is it used?

  • Producer consumer model
  • Thread pool
  • Message Oriented Middleware
    4.1 producer consumer model
  • Traditional producer consumer implementation using Lock
  • -Demo1:
/**
 * Producer consumer traditional edition
 *
 * @author wangjie
 * @version V1.0
 * @date 2019/12/24
 */
public class ProdConsumerTraditionDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                try {
                    shareData.deIncrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();
    }
}

/**
 * Shared resource class
 */
class ShareData {
    private int num = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception {
        lock.lock();
        try {
            //judge
            while (num != 0) {
                //Waiting for no production
                condition.await();
            }
            //work
            num++;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            //Notification wake up
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void deIncrement() throws Exception {
        lock.lock();
        try {
            //judge
            while (num == 0) {
                //Waiting for no production
                condition.await();
            }
            //work
            num--;
            System.out.println(Thread.currentThread().getName() + "\t" + num);
            //Notification wake up
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
  • Use the value of num to exchange between threads. num=0 means producer production and num=1 means consumer consumption
  • Operation results:
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0
AA	1
BB	0

Process finished with exit code 0

  • Blocked queue producer consumer:
  • Demo2:
/**
 * Blocking queue producer consumer
 *
 * @author wangjie
 * @version V1.0
 * @date 2019/12/24
 */
public class ProdConsumerBlockQueueDemo {

    public static void main(String[] args) throws Exception {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t Production thread start");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"Prod").start();

        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t Consumer thread start");
            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consumer").start();
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println("Time out,Stop activity");
        myResource.stop();
    }
}

/**
 * Resource class
 */
class MyResource {
    /**
     * The interaction of production and consumption is enabled by default
     */
    private volatile boolean flag = true;
    /**
     * The default value is 0
     */
    private AtomicInteger atomicInteger = new AtomicInteger();

    private BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception {
        String data = null;
        boolean returnValue;
        while (flag) {
            data = atomicInteger.incrementAndGet() + "";
            returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if (returnValue) {
                System.out.println(Thread.currentThread().getName() + "\t Insert queue data" + data + "success");
            } else {
                System.out.println(Thread.currentThread().getName() + "\t Insert queue data" + data + "fail");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "\t Stop representation flag" + flag);
    }

    public void myConsumer() throws Exception {
        String result = null;
        while (flag) {
            result = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(null==result||"".equalsIgnoreCase(result)){
                flag=false;
                System.out.println(Thread.currentThread().getName()+"\t"+"More than 2 m No consumption withdrawal");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "Consumption queue" + result + "success");

        }
    }
    public void stop() throws Exception{
        flag=false;
    }
}
  • Operation results:
java.util.concurrent.ArrayBlockingQueue
Prod	Production thread start
consumer	Consumer thread start
Prod	 Inserting queue data 1 succeeded
consumer Consumption queue 1 succeeded
Prod	 Inserting queue data 2 succeeded
consumer Consumption queue 2 succeeded
Prod	 Inserting queue data 3 succeeded
consumer Consumption queue 3 succeeded
Prod	 Inserting queue data 4 succeeded
consumer Consumption queue 4 succeeded
Prod	 Insert queue data 5 succeeded
consumer Consumption queue 5 succeeded



Time out,Stop activity
Prod	 Insert queue data 6 succeeded
consumer Consumption queue 6 succeeded
Prod	 Stop representation flagfalse

Process finished with exit code 0

  • If you skim the source code of ArrayBlockingQueue, you will find:
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

4.2 thread pool and message middleware will be discussed later...
[End]
Note: source code of all test cases in this article: https://gitee.com/wjie2018/test-case.git

Keywords: Java

Added by knelson on Mon, 29 Nov 2021 16:25:35 +0200