Java multithreading -- BlockingQueue -- usage / tutorial / examples

brief introduction

explain

This article introduces the usage of blocking queues in Java with examples.

Queue type

There are several implementations of BlockingQueue: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue, and DelayedWorkQueue.

Queue type | Explanation
ArrayBlockingQueue

Array-based FIFO queue; bounded; the size must be specified at creation.

Uses a single reentrant lock (ReentrantLock), non-fair by default; enqueue and dequeue share the same lock and are mutually exclusive.

LinkedBlockingQueue

Linked-list-based FIFO queue; bounded or unbounded; the default capacity is Integer.MAX_VALUE (effectively unbounded), and a custom capacity can be specified (bounded).

Two reentrant locks control enqueue and dequeue separately, and Condition objects are used to wake up and wait between threads.

Throughput is usually higher than that of ArrayBlockingQueue.

When used in a ThreadPoolExecutor, a default-size (unbounded) LinkedBlockingQueue makes new tasks wait in the queue whenever all corePoolSize threads are busy, so no more than corePoolSize threads are ever created (the value of maximumPoolSize therefore has no effect). An unbounded queue is suitable when tasks are completely independent of one another, for example in a web page server, where queueing smooths out transient bursts of requests. Note, however, that this also allows the work queue to grow without bound when commands keep arriving faster, on average, than they can be processed. (See the LinkedBlockingQueue example below.)

SynchronousQueue

Waiting queue with no internal storage (direct handoff); its size can be considered to be 0.

When used as a thread pool's work queue, it does not hold submitted tasks but hands them off directly: once the corePoolSize threads are busy, a new thread is created for each incoming task until the total number of threads reaches maximumPoolSize.

This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoff usually requires an unbounded maximumPoolSize to avoid rejecting newly submitted tasks, which in turn admits the possibility of unbounded thread growth when commands keep arriving faster, on average, than they can be processed.

Throughput is usually higher than that of LinkedBlockingQueue.

In other words, SynchronousQueue is a blocking queue that stores no elements: each insert operation must wait until another thread performs a corresponding remove operation, otherwise the insert remains blocked.

For details, see: the execute process of CachedThreadPool.
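
To illustrate the zero-capacity handoff behaviour described above, here is a minimal sketch (the class and variable names are mine, purely for illustration): put blocks until another thread's take accepts the element, and size() is always 0.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueHandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Consumer thread: take() blocks until a producer hands an element over.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumer waiting...");
                System.out.println("consumer got: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("size = " + queue.size());  // always 0: the queue stores nothing
        queue.put("hello");                            // blocks until the consumer's take() accepts it
        consumer.join();
    }
}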

PriorityBlockingQueue

Priority queue backed by an array-based binary heap; unbounded (an initial capacity can be specified, but the queue grows as needed).

Usage is similar to LinkedBlockingQueue, but the ordering is not FIFO: elements are ordered by their natural ordering or by the Comparator supplied to the constructor.

DelayedWorkQueue

The delay queue used internally by ScheduledThreadPoolExecutor; elements (scheduled tasks) can be taken only after their delay has expired. See the DelayedWorkQueue section below.

common method

Put data

Method | Explanation
offer(E e)

Inserts an element at the tail of the queue.

If the queue has free space, the element is inserted and true is returned.

If the queue is full, the element is discarded and false is returned.

If e is null, a NullPointerException is thrown.

This method is non-blocking.

offer(E e, long timeout, TimeUnit unit)

Like offer(E e), but with a waiting time: if the element cannot be inserted into the queue within the specified time, false is returned.
add(E e)

Internally calls the offer method.

The difference from calling offer directly:

add: throws an exception (IllegalStateException) on failure

offer: returns false on failure

put(E e)

Inserts an element at the tail of the queue.

If the queue has free space, the element is inserted and the method returns immediately.

If the queue is full, the current thread blocks until space becomes available, and returns after the insertion succeeds.

If another thread interrupts the blocked thread while it is waiting, it throws an InterruptedException and returns.

If e is null, a NullPointerException is thrown.
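
As a quick illustration of the differences between offer, add and put, here is a minimal sketch against a bounded queue of capacity 1 (class name and values are illustrative only):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        System.out.println(queue.offer("a"));                      // true, the queue is now full
        System.out.println(queue.offer("b"));                      // false immediately, element discarded
        System.out.println(queue.offer("c", 1, TimeUnit.SECONDS)); // false after waiting up to 1 s

        try {
            queue.add("d");                                        // throws because the queue is full
        } catch (IllegalStateException e) {
            System.out.println("add failed: " + e.getMessage());
        }

        // put would block here until another thread removes an element,
        // so free one slot first to let the call return immediately.
        queue.poll();
        queue.put("e");
        System.out.println(queue);                                 // [e]
    }
}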

get data

Method | Explanation
poll()

Gets the element at the head of the queue and removes it from the queue.

Returns null if the queue is empty.

poll(long timeout, TimeUnit unit)

Takes (removes) the head element from the BlockingQueue.

If data becomes available within the specified time, it is returned immediately.

If no data is available before the timeout expires, null is returned.

take()

Gets the element at the head of the queue and removes it from the queue.

If the queue is empty, the current thread blocks until the queue becomes non-empty, then the element is returned.

If another thread interrupts the blocked thread while it is waiting, it throws an InterruptedException and returns.

drainTo()

Takes (removes) all available elements from the BlockingQueue in one call (a maximum number can also be specified).

This improves the efficiency of fetching data, because the lock does not have to be acquired and released repeatedly for each element.
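
A similar sketch for the retrieval methods (again, the names and values are illustrative only):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        for (int i = 1; i <= 5; i++) {
            queue.put(i);
        }

        System.out.println(queue.poll());                          // 1 (head removed)
        System.out.println(queue.take());                          // 2 (would block if the queue were empty)

        // Drain the remaining elements in one locked batch.
        List<Integer> batch = new ArrayList<>();
        int drained = queue.drainTo(batch);
        System.out.println("drained " + drained + ": " + batch);   // drained 3: [3, 4, 5]

        // poll with a timeout: the queue is now empty, so this waits
        // up to 500 ms and then returns null.
        System.out.println(queue.poll(500, TimeUnit.MILLISECONDS));
    }
}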

Other methods

Method | Explanation

remainingCapacity()

Gets the remaining free space in the queue.

contains(Object o)

Checks whether the given value is in the queue.

remove(Object o)

Removes the specified value from the queue.

size()

Gets how many elements are in the queue.

(For LinkedBlockingQueue this is the value of an internal atomic counter.)
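
And a tiny sketch exercising these helper methods (illustrative only):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OtherMethodsDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.offer("a");
        queue.offer("b");

        System.out.println(queue.size());               // 2
        System.out.println(queue.remainingCapacity());  // 1 (capacity 3 minus 2 elements)
        System.out.println(queue.contains("a"));        // true
        System.out.println(queue.remove("a"));          // true, "a" has been removed
        System.out.println(queue.size());               // 1
    }
}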

ArrayBlockingQueue

brief introduction

ArrayBlockingQueue is a bounded FIFO blocking queue backed by an array. Its capacity is fixed when the instance is created and cannot be changed afterwards.

This class supports an optional fairness policy for ordering the blocked threads waiting for access. Because every internal operation of ArrayBlockingQueue must first acquire a ReentrantLock, which itself supports a fairness policy, the fairness setting of ArrayBlockingQueue is applied directly to that ReentrantLock and determines whether threads acquire the lock fairly. It is non-fair by default. In fair mode, the queue grants access to waiting threads in FIFO order. Fairness generally reduces throughput, but it also reduces variability and avoids starvation.
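
A minimal sketch of the fairness option: the optional second constructor argument is passed through to the internal ReentrantLock.

import java.util.concurrent.ArrayBlockingQueue;

public class FairnessDemo {
    public static void main(String[] args) {
        // Non-fair (default): waiting threads may acquire the internal lock out of order.
        ArrayBlockingQueue<String> nonFair = new ArrayBlockingQueue<>(10);

        // Fair: blocked producers and consumers are granted the lock in FIFO order,
        // at the cost of some throughput.
        ArrayBlockingQueue<String> fair = new ArrayBlockingQueue<>(10, true);

        System.out.println(nonFair.remainingCapacity() + " " + fair.remainingCapacity());
    }
}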

Defects in ArrayBlockingQueue

The source code shows that almost every operation of ArrayBlockingQueue must first acquire the same exclusive ReentrantLock before it can proceed, which greatly reduces throughput: almost every operation blocks the others, and, most importantly, insertion and removal are mutually exclusive. ArrayBlockingQueue is therefore not well suited to high-throughput producer/consumer scenarios; LinkedBlockingQueue makes up for this low throughput.

example

Create a thread pool with corePoolSize 2 and maximumPoolSize 3, using an ArrayBlockingQueue of capacity 2 as the work queue, and submit 6 tasks. ArrayBlockingQueue is a bounded queue:

  1. Tasks 1 and 2 are executed by the core threads;
  2. When tasks 3 and 4 come in, they are placed in the ArrayBlockingQueue work queue, which can hold only two tasks (its capacity is set to 2);
  3. When tasks 5 and 6 come in, a new thread is created for task 5; the maximum number of threads (3) has then been reached, so task 6 is rejected;
  4. When a thread finishes its task, tasks 3 and 4 are taken from the queue and executed.

The code for creating thread pool is as follows:

    /**
     * ArrayBlockingQueue
     */
    private static void arrayQueue() {
        System.out.println("\n\n =======ArrayBlockingQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new RejectHandler());
        execute(executors);
    }
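
The execute(Executor) helper and the RejectHandler used above are not shown in the original article. A plausible reconstruction that matches the printed output might look like the following (it assumes the usual java.util.concurrent imports and a task duration of roughly 10 seconds, inferred from the comments on the LinkedBlockingQueue results below):

    /**
     * Submits 6 numbered tasks; each prints when it starts and ends and sleeps in between.
     * (Reconstructed for illustration; not the article's original code.)
     */
    private static void execute(Executor executor) {
        for (int i = 1; i <= 6; i++) {
            executor.execute(new NumberedTask(i));
        }
    }

    private static class NumberedTask implements Runnable {
        private final int id;
        NumberedTask(int id) { this.id = id; }
        @Override
        public void run() {
            System.out.println(id + " is running... ");
            try {
                TimeUnit.SECONDS.sleep(10);        // assumed task duration
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(id + " is end !!! ");
        }
        @Override
        public String toString() { return String.valueOf(id); }
    }

    /**
     * Rejection handler that simply reports which task was rejected.
     */
    private static class RejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(r + " is rejected ^^");
        }
    }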

The execution results are as follows:

1 is running... 
2 is running... 
6 is rejected ^^ //6 rejected
5 is running...  //5 runs on a newly created thread
1 is end !!! 
2 is end !!! 
3 is running... //3 and 4 run only after 1 and 2 have finished
4 is running... 
5 is end !!! 

LinkedBlockingQueue

brief introduction

The similarities between LinkedBlockingQueue and ArrayBlockingQueue:

  1. Both are FIFO queues and do not allow null elements to be inserted.
  2. The capacity cannot be changed after the instance is constructed.

difference

Size assignment

LinkedBlockingQueue: when instantiating, you may specify the queue size or omit it (in which case the default is Integer.MAX_VALUE). ArrayBlockingQueue: the size must be specified when instantiating.

Throughput

LinkedBlockingQueue: higher. It uses the "two-lock queue" algorithm; enqueue and dequeue are handled by separate locks, putLock and takeLock. ArrayBlockingQueue: lower. Almost every method must first acquire the same exclusive ReentrantLock.

example

Create a thread pool with corePoolSize 2 and maximumPoolSize 3, using an unbounded LinkedBlockingQueue, and again submit six tasks:

  1. The core threads execute tasks 1 and 2; tasks 3 to 6 are put in the queue;
  2. After 1 and 2 finish, tasks 3 and 4 are taken from the queue and executed;
  3. After 3 and 4 finish, tasks 5 and 6 are taken from the queue and executed.

The code for creating thread pool is as follows:

 
    /**
     * LinkedBlockingQueue
     */
    private static void linkedQueue() {
        System.out.println("\n\n =======LinkedBlockingQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new RejectHandler());
        execute(executors);
    }

The operation results are as follows:

1 is running... 
2 is running... //the tasks sleep here 
2 is end !!!   //and finish about 10 s later
1 is end !!! 
3 is running...  //tasks 3 and 4 are taken from the queue
4 is running... 
4 is end !!! 
3 is end !!! 
6 is running... 
5 is running... 
5 is end !!! 
6 is end !!! 

SynchronousQueue

explain

Create a thread pool with corePoolSize 2 and maximumPoolSize 3 and submit 6 tasks. As I understand it, SynchronousQueue is a queue with no internal storage (the SynchronousQueue source code confirms this: isEmpty() always returns true and size() always returns 0).

According to the parameter settings, only three tasks can be executed:

  1. The two core threads execute two tasks;
  2. For the third task, a new thread is created to execute task 3;
  3. When tasks 4, 5 and 6 arrive, the pool has already reached maximumPoolSize, so they are rejected.

code

    /**
     * SynchronousQueue
     */
    private static void syncQueue() {
        System.out.println("\n\n =======SynchronousQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new RejectHandler());
        execute(executors);
    }

Execution results

1 is running... 
4 is rejected ^^ //4 rejected
2 is running... 
3 is running... 
5 is rejected ^^  //5 rejected
6 is rejected ^^  //6 rejected
3 is end !!! 
1 is end !!! 
2 is end !!! 

PriorityBlockingQueue

brief introduction

PriorityBlockingQueue is an unbounded blocking queue. Because its capacity is unbounded, enqueue operations such as put never actually block: as long as there is enough memory, an element can be enqueued immediately (although concurrent enqueuing threads still compete for the single internal lock). Although the queue is logically unbounded, adding elements can still throw an OutOfMemoryError when resources are exhausted.

The queue does not allow null elements. It uses the same ordering rules as java.util.PriorityQueue, and it does not allow non-comparable objects, which would lead to a ClassCastException.

It is worth noting that although PriorityBlockingQueue is called a priority queue, its elements are not kept fully sorted from the moment they enter the queue; only the order in which elements are removed, via take, poll or drainTo, follows the priority ordering. Consequently, the iteration order of the iterators returned by iterator() and spliterator() is not sorted. If you need to traverse in order, you can use Arrays.sort(pq.toArray()). Also note that peek always returns the head element without removing it, so repeated calls to peek return the same value.

PriorityBlockingQueue orders its elements either by their natural ordering, when the queued elements implement the Comparable interface, or by a Comparator passed to the PriorityBlockingQueue constructor. If both are available, the Comparator takes precedence.

PriorityBlockingQueue does not guarantee any particular order among elements with the same priority, but you can break ties in a user-defined class or comparator using an auxiliary attribute (such as a sequence number), as sketched below.
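
A minimal sketch of this idea, using a hypothetical Task class: the Comparator orders by priority first, and a monotonically increasing sequence number breaks ties so that elements of equal priority are dequeued in FIFO order.

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class PriorityQueueDemo {
    // Hypothetical task: higher priority is served first; seq breaks ties FIFO.
    static class Task {
        final int priority;
        final long seq;
        final String name;
        Task(int priority, long seq, String name) {
            this.priority = priority;
            this.seq = seq;
            this.name = name;
        }
        @Override
        public String toString() { return name + "(p=" + priority + ")"; }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong seq = new AtomicLong();
        Comparator<Task> byPriorityThenSeq =
                Comparator.<Task>comparingInt(t -> -t.priority)    // higher priority first
                          .thenComparingLong(t -> t.seq);          // FIFO among equal priorities
        PriorityBlockingQueue<Task> queue =
                new PriorityBlockingQueue<>(11, byPriorityThenSeq);

        queue.put(new Task(1, seq.getAndIncrement(), "low-1"));
        queue.put(new Task(5, seq.getAndIncrement(), "high-1"));
        queue.put(new Task(5, seq.getAndIncrement(), "high-2"));
        queue.put(new Task(1, seq.getAndIncrement(), "low-2"));

        // take() follows priority order: high-1, high-2, low-1, low-2.
        while (!queue.isEmpty()) {
            System.out.println(queue.take());
        }
    }
}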

DelayedWorkQueue

brief introduction

Why re-implement a DelayedWorkQueue instead of using DelayQueue directly? DelayedWorkQueue is the delay queue used internally by ScheduledThreadPoolExecutor, and having its own implementation makes it convenient to add scheduling-specific extensions (for example, each scheduled task records its heap index so that cancellation can remove it quickly).

Usage scenario

  • Implementing a retry mechanism. For example, when an interface call fails, the call information is wrapped in an element with delay = 10 s and the element is put into the queue, which then acts as a retry queue. A worker thread obtains the calls that need retrying via take(); when take returns, the call is retried, and on failure the element is put back into the queue. A retry counter can also be stored in the element. (See the sketch after this list.)

  • Internal implementation of TimerQueue
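
A minimal sketch of the retry-queue idea using the public DelayQueue class (DelayedWorkQueue itself is package-private inside ScheduledThreadPoolExecutor); the RetryTask class, the 2-second delay and the retry limit are assumptions for illustration only.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class RetryQueueDemo {
    // Hypothetical element carrying the failed call and its retry count.
    static class RetryTask implements Delayed {
        final String callInfo;
        final int attempt;
        final long readyAt;                        // when the retry becomes due (nanoseconds)
        RetryTask(String callInfo, int attempt, long delayMillis) {
            this.callInfo = callInfo;
            this.attempt = attempt;
            this.readyAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAt - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<RetryTask> retryQueue = new DelayQueue<>();
        retryQueue.put(new RetryTask("callFoo", 1, 2000));   // retry becomes available in 2 s

        // Worker: take() blocks until a task's delay has expired.
        RetryTask task = retryQueue.take();
        boolean success = false;                             // pretend the retry failed
        if (!success && task.attempt < 3) {
            retryQueue.put(new RetryTask(task.callInfo, task.attempt + 1, 2000));
            System.out.println("retry " + task.attempt + " of " + task.callInfo + " failed, re-queued");
        }
    }
}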

Sequential consumption example

Scenario

The machine processes each mobile phone through the following steps, in order: production, packaging and delivery. A consumer waits to receive the finished phones.

code

package org.example.a;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Phone {
    /**
     * Phone status:
     * PRODUCED: Produced
     * PACKED: Packaged
     * DELIVERED: Shipped
     * <p>A phone's status can only change in the order PRODUCED -> PACKED -> DELIVERED
     */
    public enum Status {
        PRODUCED, PACKED, DELIVERED
    }
    private Status status = Status.PRODUCED;//The default state is PRODUCED
    private final int id;
    public Phone(int id) { this.id =  id;}
    public void pack() {status = Status.PACKED;}
    public void deliver() {status = Status.DELIVERED;}
    public Status getStatus() {return status;}
    public int getId() {return id;}
    public String toString() {
        return "Phone id: " + id + ", status: " + status;
    }
}

class PhoneQueue extends LinkedBlockingQueue<Phone> {}

/**
 * The task of producing mobile phones.
 */
class Producer implements Runnable {
    private PhoneQueue phoneQueue;
    private int count = 0;
    private Random random = new Random(47);
    public Producer(PhoneQueue queue) {
        this.phoneQueue = queue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
                //Produce a phone; phones are created in order of increasing id
                Phone phone = new Phone(count++);
                System.out.println(phone);
                //Put it into phoneQueue
                phoneQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer interrupted.");
        }
        System.out.println("Producer off.");
    }
}

/**
 * Packaged tasks.
 */
class Packer implements Runnable {
    private PhoneQueue producedQueue;
    private PhoneQueue packedQueue;
    public Packer(PhoneQueue producedQueue, PhoneQueue packedQueue) {
        this.producedQueue = producedQueue;
        this.packedQueue = packedQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //Blocks until the next produced phone is available
                Phone phone = producedQueue.take();
                phone.pack();
                System.out.println(phone);
                packedQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Packer interrupted.");
        }
        System.out.println("Packer off.");
    }
}

/**
 * The task of shipping.
 */
class Deliverer implements Runnable {
    private PhoneQueue packedQueue;
    private PhoneQueue finishedQueue;
    public Deliverer(PhoneQueue packedQueue, PhoneQueue finishedQueue) {
        this.packedQueue = packedQueue;
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //Blocks until the next packed phone is available
                Phone phone = packedQueue.take();
                phone.deliver();
                System.out.println(phone);
                finishedQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Deliverer interrupted.");
        }
        System.out.println("Deliverer off.");
    }
}

/**
 * People who buy mobile phones, consumers.
 */
class Consumer implements Runnable {
    private PhoneQueue finishedQueue;
    private int count = 0;
    public Consumer(PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //Blocks until the next delivered phone is available
                Phone phone = finishedQueue.take();
                //Verify that the obtained mobile phones are in order and the status is DELIVERED
                if (phone.getId() != count++ ||
                        phone.getStatus() != Phone.Status.DELIVERED) {
                    System.out.println("Error -> " + phone);
                    System.exit(-1);
                } else {
                    //Using mobile phones
                    System.out.println(phone + "->Use");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted.");
        }
        System.out.println("Consumer off.");
    }
}
public class Demo {
    public static void main(String[] args) {
        PhoneQueue producedQueue = new PhoneQueue();
        PhoneQueue packedQueue = new PhoneQueue();
        PhoneQueue deliveredQueue = new PhoneQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(producedQueue));
        exec.execute(new Packer(producedQueue, packedQueue));
        exec.execute(new Deliverer(packedQueue, deliveredQueue));
        exec.execute(new Consumer(deliveredQueue));
        try {
            TimeUnit.SECONDS.sleep(5);
        }catch (Exception e){
            e.printStackTrace();
        }
        exec.shutdownNow();
    }
}

Execution results

Phone id: 0, status: PRODUCED
Phone id: 0, status: PACKED
Phone id: 0, status: DELIVERED
Phone id: 0, status: DELIVERED->Use
Phone id: 1, status: PRODUCED
Phone id: 1, status: PACKED
Phone id: 1, status: DELIVERED
Phone id: 1, status: DELIVERED->Use
Phone id: 2, status: PRODUCED
Phone id: 2, status: PACKED
Phone id: 2, status: DELIVERED
Phone id: 2, status: DELIVERED->Use
Phone id: 3, status: PRODUCED
Phone id: 3, status: PACKED
Phone id: 3, status: DELIVERED
Phone id: 3, status: DELIVERED->Use
Phone id: 4, status: PRODUCED
Phone id: 4, status: PACKED
Phone id: 4, status: DELIVERED
Phone id: 4, status: DELIVERED->Use
Phone id: 5, status: PRODUCED
Phone id: 5, status: PACKED
Phone id: 5, status: DELIVERED
Phone id: 5, status: DELIVERED->Use
Phone id: 6, status: PRODUCED
Phone id: 6, status: PACKED
Phone id: 6, status: DELIVERED
Phone id: 6, status: DELIVERED->Use
Phone id: 7, status: PRODUCED
Phone id: 7, status: PACKED
Phone id: 7, status: DELIVERED
Phone id: 7, status: DELIVERED->Use
Consumer interrupted.
Packer interrupted.
Packer off.
Deliverer interrupted.
Deliverer off.
Producer interrupted.
Producer off.
Consumer off.

