The implementation classes of BlockingQueue explained

The JDK provides several implementation classes of the BlockingQueue interface. This post summarizes their differences and uses.

Test code

package com.example.demo2.cusdemo.queuedemo;

import java.util.Random;
import java.util.concurrent.*;


public class TestBlockQueue {
    public static void main(String[] args) {
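        // Uncomment exactly one line to choose the implementation under test.
        BlockingQueue q = new ArrayBlockingQueue(3);
//        BlockingQueue q = new LinkedBlockingQueue(3);
//        BlockingQueue q = new PriorityBlockingQueue(3);
//        BlockingQueue q = new SynchronousQueue();
        // DelayQueue only accepts elements that implement Delayed, so the Integer
        // values produced below fail with a ClassCastException.
//        BlockingQueue q = new DelayQueue();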
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }

}

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
//            while (true) { queue.add(produce()); }
//            while (true) { queue.offer(produce(),5, TimeUnit.MILLISECONDS); }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    Object produce() {
        return new Random().nextInt(50);
    }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
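        } catch (InterruptedException ex) {
            // Restore the interrupt flag instead of silently swallowing the exception.
            Thread.currentThread().interrupt();
        }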
    }
    void consume(Object x) {
        System.out.println(Thread.currentThread().getName()+"---consumer---"+x);
    }
}

Class hierarchy: each of the five classes discussed below extends AbstractQueue and implements BlockingQueue.

First: ArrayBlockingQueue

Enqueue methods

public boolean add(E e) --- calls offer(e) internally. If offer returns false (the queue is full), add throws an IllegalStateException. You need to catch it yourself, otherwise the thread terminates when it hits this exception.

public boolean offer(E e) --- adds the element to the queue without blocking. Returns true if the element was added and false if the queue is full.
public void put(E e) throws InterruptedException --- the most frequently used method, but pay attention to the usage scenario. If the queue is full, the calling thread blocks; it is woken up to add the element once a consumer has removed one. In practice, a producer that stays blocked for too long can cause the caller to time out and the task to fail to be added, so weigh this when choosing put.
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException --- if the queue is full, the thread blocks for at most the given timeout while waiting for a free slot. If a slot becomes free in time, the element is enqueued and true is returned; if the queue is still full (count == items.length) when the timeout expires, false is returned.
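
The difference between the non-blocking enqueue methods is easiest to see on a queue that is already full. The following is a minimal sketch; the class name EnqueueDemo and the capacity of 1 are chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class EnqueueDemo {
    /** Fills a queue of capacity 1, then tries each non-blocking insert on the full queue. */
    static String run() {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();

        out.append("offer on empty queue: ").append(q.offer(1)).append('\n');
        out.append("offer on full queue: ").append(q.offer(2)).append('\n');

        try {
            q.add(3); // full queue: add throws instead of returning false
        } catch (IllegalStateException ex) {
            out.append("add on full queue: IllegalStateException").append('\n');
        }

        try {
            // Waits up to 50 ms for a free slot; nobody consumes, so it gives up.
            boolean ok = q.offer(4, 50, TimeUnit.MILLISECONDS);
            out.append("timed offer on full queue: ").append(ok).append('\n');
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

put(e) is left out on purpose: on this full queue, with no consumer running, it would block forever.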

Dequeue methods

public E poll() --- if the queue is empty (count == 0), null is returned; otherwise the head element is removed and returned.
public E take() throws InterruptedException --- removes and returns the head element. If the queue is empty, the thread blocks until an element becomes available. It is the counterpart of put().
public E poll(long timeout, TimeUnit unit) throws InterruptedException --- if the queue has an element, it is removed and returned. Otherwise the thread blocks for at most the given timeout while waiting for one; if the queue is still empty when the timeout expires, null is returned.
public E peek() --- returns the head element without removing it, or null if the queue is empty.
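
As a rough illustration of the dequeue methods, the sketch below calls them on an empty and then on a filled queue. The class name DequeueDemo and the element values are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class DequeueDemo {
    /** Exercises poll, peek, take and timed poll on a small queue. */
    static String run() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        StringBuilder out = new StringBuilder();

        // Empty queue: both non-blocking reads return null.
        out.append("poll on empty queue: ").append(q.poll()).append('\n');
        out.append("peek on empty queue: ").append(q.peek()).append('\n');

        q.offer("a");
        q.offer("b");

        // peek only looks at the head; poll removes it.
        out.append("peek: ").append(q.peek())
           .append(", size after peek: ").append(q.size()).append('\n');
        out.append("poll: ").append(q.poll())
           .append(", size after poll: ").append(q.size()).append('\n');

        try {
            // One element is left, so take returns immediately.
            out.append("take: ").append(q.take()).append('\n');
            // The queue is empty again: wait up to 50 ms, then give up with null.
            out.append("timed poll on empty queue: ")
               .append(q.poll(50, TimeUnit.MILLISECONDS)).append('\n');
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```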

Advantages and disadvantages

ArrayBlockingQueue is backed by a circular (ring) array, so reading and writing a slot is fast. However, insertion and removal share the same lock, so producers and consumers exclude each other. Under contention its throughput is therefore relatively low.

Second: LinkedBlockingQueue

Two separate locks are used, one for taking and one for putting, so a producer and a consumer can work at the same time:

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

The elements are stored in a singly linked list:

    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

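Because putting and taking do not compete for the same lock, throughput under concurrent access is usually higher than with ArrayBlockingQueue.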
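
To show how the two locks and the linked nodes cooperate, here is a stripped-down sketch of a two-lock bounded queue. It is modeled on the fields shown above but is not the JDK source: the class name TwoLockQueueSketch, the helper signal, and the demo method are made up for this example, and only put and take are implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified two-lock bounded queue; a teaching sketch, not the JDK class. */
class TwoLockQueueSketch<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(); // read by both sides
    private Node<E> head = new Node<>(null);                 // dummy node, item is always null
    private Node<E> last = head;

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueueSketch(int capacity) { this.capacity = capacity; }

    void put(E e) throws InterruptedException {
        final int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await(); // full: producer waits
            Node<E> node = new Node<>(e);
            last.next = node;                                // append at the tail
            last = node;
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();          // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty);              // queue was empty: wake a consumer
    }

    E take() throws InterruptedException {
        final E x;
        final int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();       // empty: consumer waits
            Node<E> first = head.next;
            head = first;                                    // first becomes the new dummy node
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();                    // more elements: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull);         // queue was full: wake a producer
        return x;
    }

    private static void signal(ReentrantLock lock, Condition cond) {
        lock.lock();
        try { cond.signal(); } finally { lock.unlock(); }
    }

    /** One producer puts 1..100 while the calling thread takes and sums them. */
    static long demo() {
        TwoLockQueueSketch<Integer> q = new TwoLockQueueSketch<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 100; i++) q.put(i);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < 100; i++) sum += q.take();
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 5050
    }
}
```

The producer only touches last under putLock and the consumer only touches head under takeLock. The atomic count is the single piece of state both sides share, which is why an insert and a removal can run in parallel.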

Third: PriorityBlockingQueue

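Priority queue: it maintains a min-heap (small top heap), so the smallest element according to the ordering rule (natural ordering or a supplied Comparator) is kept at the top. If the ordering is reversed, it behaves as a max-heap. Every take removes the top element. The queue is unbounded: the constructor argument is only an initial capacity, so put never blocks.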

Usage scenario: work where higher-priority tasks must be handled first, for example letting members buy tickets ahead of other customers.
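
The effect of the ordering rule can be sketched as follows. The class name PriorityDemo and the sample values are assumptions made for the example; the initial capacity of 11 is an arbitrary choice, since the queue grows as needed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityDemo {
    /** Removes all elements in priority order and collects them into a list. */
    static List<Integer> drain(PriorityBlockingQueue<Integer> q) {
        List<Integer> out = new ArrayList<>();
        Integer x;
        while ((x = q.poll()) != null) {
            out.add(x);
        }
        return out;
    }

    /** Natural ordering: the smallest element is taken first. */
    static List<Integer> naturalOrder() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.put(30);
        q.put(10);
        q.put(20);
        return drain(q);
    }

    /** Reversed comparator: the largest element is taken first. */
    static List<Integer> reversedOrder() {
        PriorityBlockingQueue<Integer> q =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        q.put(30);
        q.put(10);
        q.put(20);
        return drain(q);
    }

    public static void main(String[] args) {
        System.out.println(naturalOrder());  // [10, 20, 30]
        System.out.println(reversedOrder()); // [30, 20, 10]
    }
}
```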

Fourth: SynchronousQueue

Synchronous queue: the constructor lets you choose between a fair and a non-fair policy. The queue has no internal capacity. Each put blocks until a consumer takes the element; only after the hand-off does the producer thread wake up and continue with the next task.
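
A small sketch of this hand-off behavior, assuming a single consumer thread; the class name HandoffDemo and the string values are invented for the example.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicReference;

class HandoffDemo {
    /** Shows that an element only passes through when a consumer is there to receive it. */
    static String run() {
        SynchronousQueue<String> q = new SynchronousQueue<>(true); // true selects the fair policy
        StringBuilder out = new StringBuilder();

        // No consumer is waiting yet, so a non-blocking offer fails and nothing is stored.
        out.append("offer without consumer: ").append(q.offer("lost"))
           .append(", size: ").append(q.size()).append('\n');

        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(q.take()); // waits for a producer
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            q.put("task");   // blocks until the consumer has taken the element
            consumer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        out.append("consumer received: ").append(received.get()).append('\n');
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```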

Fifth: DelayQueue

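Delay queue: its elements must implement the Delayed interface, which reports how much delay remains. take blocks until the delay of the head element (the one that expires first) has elapsed, and only then does the thread continue to execute the task.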
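
Since DelayQueue only works with elements that implement Delayed, a usage sketch needs its own element type. DelayedTask below is a hypothetical class written for this example, as are the class name DelayDemo and the delays of 100 ms and 300 ms.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayDemo {
    /** Hypothetical task type: becomes available delayMillis after it is created. */
    static final class DelayedTask implements Delayed {
        final String name;
        final long readyAtNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the task is ready.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    static String run() {
        DelayQueue<DelayedTask> q = new DelayQueue<>();
        q.put(new DelayedTask("slow", 300));
        q.put(new DelayedTask("fast", 100));

        StringBuilder out = new StringBuilder();
        // Nothing has expired yet, so the non-blocking poll finds no element.
        out.append("poll before expiry: ").append(q.poll()).append('\n');
        try {
            // take blocks until the earliest delay has elapsed.
            out.append("first take: ").append(q.take().name).append('\n');
            out.append("second take: ").append(q.take().name).append('\n');
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

The tasks come out in order of expiry rather than insertion order: "fast" is returned first even though "slow" was put first.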

Keywords: Java

Added by jim35802 on Mon, 07 Feb 2022 12:26:57 +0200