BlockingQueue details

Note: This essay is quoted in full from http://wsmajunfeng.iteye.com/blog/1629354 , which is well written; many thanks to the author. I am copying it here as a personal record, in case I cannot find it again later.

I Preface

In the java.util.concurrent package, BlockingQueue solves the problem of how to transfer data between threads efficiently and safely. These efficient, thread-safe queue classes make it much easier to build high-quality multithreaded programs quickly. This article introduces the members of the BlockingQueue family in detail, covering their respective functions and common usage scenarios.

II Meet BlockingQueue

A blocking queue is, as its name suggests, first of all a queue. The role of a queue as a data structure is roughly shown in the following figure:

As the figure shows, through a shared queue, data can be put in at one end and taken out at the other.

Two kinds of queue are in common use (of course, many other kinds can be derived through different implementations; DelayQueue is one of them):

First in, first out (FIFO): the element inserted first is also removed first, similar to waiting in line. To some extent this kind of queue reflects a kind of fairness.

Last in, first out (LIFO): the element inserted last is removed first. This kind of queue gives priority to the most recent events.


In a multithreaded environment, data sharing can be realized easily through a queue. In the classic "producer/consumer" model, for example, suppose we have several producer threads and several consumer threads. If the producers need to hand prepared data over to the consumers, a queue conveniently solves the data-sharing problem between them.

But what if the producers and consumers process data at different speeds during some period of time? Ideally, if the producers generate data faster than the consumers can handle it, then once the backlog grows to a certain point, the producers should pause and wait (i.e., their threads should block) until the consumers have worked through the accumulated data, and vice versa. Before the concurrent package was released, every programmer had to manage these details by hand in a multithreaded environment, with efficiency and thread safety in mind, which added considerable complexity to a program. Fortunately, the powerful concurrent package arrived, bringing with it an equally powerful BlockingQueue. (In multithreading, "blocking" means that under certain conditions a thread is suspended; once the conditions are met, the suspended thread is woken up automatically.) The following two figures illustrate the two common blocking scenarios of BlockingQueue:
As the first figure shows: when there is no data in the queue, all threads on the consumer side are blocked (suspended) automatically until data is put into the queue.

As the second figure shows: when the queue is full of data, all threads on the producer side are blocked (suspended) automatically until a slot frees up in the queue, at which point the blocked threads are woken up automatically.

This is why we need BlockingQueue in a multithreaded environment. As users of BlockingQueue, we no longer need to worry about when to block a thread and when to wake it up, because BlockingQueue handles all of that for us. Since BlockingQueue is so capable, let us look at its core methods:

III The core methods of BlockingQueue

  1. Putting data

(1) offer(anObject): add anObject to the BlockingQueue if possible; that is, return true if the queue can accommodate it, otherwise return false immediately. (This method never blocks the calling thread.)

(2) offer(E o, long timeout, TimeUnit unit): a variant with a waiting time. If the element cannot be added to the queue within the specified time, false is returned.

(3) put(anObject): add anObject to the BlockingQueue. If the queue has no free space, the calling thread blocks until space becomes available.

  2. Getting data

(1) poll(time): take the object at the head of the BlockingQueue. If none can be taken immediately, wait up to the time specified by the time parameter; if there is still none, return null.

(2) poll(long timeout, TimeUnit unit): take the object at the head of the BlockingQueue. If data becomes available within the specified time, it is returned immediately; otherwise, once the timeout expires with no data available, null is returned.

(3) take(): take the object at the head of the BlockingQueue. If the queue is empty, block and wait until new data is added to the queue.

(4) drainTo(): transfer all available elements from the BlockingQueue in a single call (a maximum number of elements can also be specified). This improves retrieval efficiency, since the lock does not have to be acquired and released repeatedly for each element.
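The methods above can be sketched in a few lines. This is a minimal illustration, not from the original article; the class name, capacity, and values are chosen only for demonstration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CoreMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        queue.put("a");                          // blocks while the queue is full
        boolean accepted = queue.offer("b");     // true: there was room
        boolean rejected = queue.offer("c");     // false immediately: queue is full
        boolean timedOut = queue.offer("c", 100, TimeUnit.MILLISECONDS); // false after 100 ms

        String head = queue.take();              // "a"; would block if the queue were empty
        List<String> rest = new ArrayList<>();
        queue.drainTo(rest);                     // moves all remaining elements in one call

        System.out.println(accepted + " " + rejected + " " + timedOut);
        System.out.println(head + " " + rest);
    }
}
```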

IV Common BlockingQueue

Now that we understand the basic functions of BlockingQueue, let us meet the main members of the BlockingQueue family.

  1. ArrayBlockingQueue

An array-based blocking queue implementation. ArrayBlockingQueue maintains a fixed-length array internally to buffer the data objects in the queue. This is a commonly used blocking queue. Besides the fixed-length array, ArrayBlockingQueue also keeps two integer variables that mark the positions of the head and the tail of the queue within the array.

ArrayBlockingQueue uses the same lock object for producers putting data and consumers taking data, which means the two cannot truly run in parallel. This is notably different from LinkedBlockingQueue. Judging from the implementation, ArrayBlockingQueue could perfectly well adopt separate locks and thereby allow producer and consumer operations to run fully in parallel. The reason Doug Lea did not do so is probably that the insert and remove operations of ArrayBlockingQueue are already light enough that introducing an independent locking mechanism would add complexity to the code without gaining anything in performance.

Another obvious difference between ArrayBlockingQueue and LinkedBlockingQueue is that the former creates and destroys no extra object instances when inserting or removing elements, while the latter allocates an additional Node object for each element. In a system that must process large volumes of data efficiently and concurrently over a long time, this makes a measurable difference to GC pressure. Finally, when creating an ArrayBlockingQueue we can also control whether its internal lock is fair; a non-fair lock is used by default.
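Since the fairness of the internal lock is chosen at construction time, a minimal sketch (the capacity and variable names are illustrative only):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FairnessDemo {
    public static void main(String[] args) {
        // Default: a non-fair internal lock (higher throughput,
        // no FIFO guarantee for threads waiting on the queue).
        BlockingQueue<String> nonFair = new ArrayBlockingQueue<>(16);

        // Passing true requests a fair lock: blocked producers and consumers
        // are granted access in FIFO order, at some cost in throughput.
        BlockingQueue<String> fair = new ArrayBlockingQueue<>(16, true);

        System.out.println(nonFair.remainingCapacity() + " " + fair.remainingCapacity());
    }
}
```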

  2. LinkedBlockingQueue

A blocking queue based on a linked list. Similar to ArrayBlockingQueue, LinkedBlockingQueue maintains an internal data buffer (a queue backed by a linked list). When a producer puts a piece of data into the queue, the queue takes the data from the producer and stores it, and the producer returns immediately; only when the buffer reaches its maximum capacity (which LinkedBlockingQueue lets you specify through a constructor) is the producer blocked, until a consumer consumes a piece of data from the queue and the producer thread is woken up. The consumer side works on the same principle in reverse. LinkedBlockingQueue can handle concurrent data efficiently because it uses independent locks for the producer side and the consumer side to control data synchronization, which means that under high concurrency producers and consumers can operate on the queue in parallel, improving the concurrency performance of the whole queue.

As developers, we should note that if a LinkedBlockingQueue is constructed without specifying a capacity, it defaults to an effectively unbounded capacity (Integer.MAX_VALUE). In that case, as soon as producers run faster than consumers, the system may exhaust its memory long before the queue ever fills up and starts blocking.

ArrayBlockingQueue and LinkedBlockingQueue are the two most common and commonly used blocking queues. Generally, when dealing with producer consumer problems between multiple threads, using these two classes is enough.

The following code demonstrates how to use BlockingQueue:

(1) Test class

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        // Declare a buffer queue with a capacity of 10
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        // Create three producers and one consumer
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        // Use an Executors thread pool
        ExecutorService service = Executors.newCachedThreadPool();
        // Start the threads
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // Run for 10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        // Shut down the executor
        service.shutdown();
    }
}

(2) Producer

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer thread
 *
 * @author jackyuj
 */
public class Producer implements Runnable {

    private volatile boolean isRunning = true; // running flag
    private BlockingQueue<String> queue; // the blocking queue
    private static AtomicInteger count = new AtomicInteger(); // atomically updated counter
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    // Constructor
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println("Start the producer thread!");
        try {
            while (isRunning) {
                System.out.println("Producing data...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); // sleep a random 0 ~ DEFAULT_RANGE_FOR_SLEEP ms

                data = "data:" + count.incrementAndGet(); // atomically add 1 to count
                System.out.println("Putting data " + data + " into the queue...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) { // wait up to 2s; offer returns false on timeout
                    System.out.println("Failed to put data: " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Exit producer thread!");
        }
    }

    public void stop() {
        isRunning = false;
    }
}

(3) Consumer

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Consumer thread
 *
 * @author jackyuj
 */
public class Consumer implements Runnable {

    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    // Constructor
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("Start the consumer thread!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("Getting data from queue...");
                // Take directly from the head when data is present; otherwise wait
                // up to 2s, and if still no data, poll returns null
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("Got the data: " + data);
                    System.out.println("Consuming data: " + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // No data for more than 2s: assume all producer threads have
                    // exited, so the consumer thread exits as well.
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Exit consumer thread!");
        }
    }
}

  3. DelayQueue

An element in a DelayQueue can be taken from the queue only when its specified delay has expired. DelayQueue is a queue with no size limit, so operations that insert data (the producer side) never block; only operations that take data (the consumer side) can block.

Usage scenario:

DelayQueue has few usage scenarios, but they are quite ingenious. A common example is using a DelayQueue to manage a queue of connections that have timed out without responding.
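As a sketch of the idea, the following hypothetical DelayedTask implements the Delayed interface so that take() releases elements only after their delays expire; the names and delay values are illustrative, not from the original article:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    // A hypothetical element that becomes available only after its delay expires.
    static class DelayedTask implements Delayed {
        final String name;
        final long triggerTimeNanos;

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.triggerTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.offer(new DelayedTask("late", 200));  // never blocks: the queue is unbounded
        queue.offer(new DelayedTask("early", 50));

        // take() blocks until the head's delay has expired, so "early" comes out first.
        System.out.println(queue.take().name);
        System.out.println(queue.take().name);
    }
}
```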

  4. PriorityBlockingQueue

A priority-based blocking queue (the priority is determined by the Comparator object passed to the constructor). Note, however, that PriorityBlockingQueue never blocks the data producer; it blocks only the data consumer, when there is no data to consume. Special care is therefore needed: producers must not generate data faster than consumers can consume it, or over time all available heap memory will be exhausted. For internal thread synchronization, PriorityBlockingQueue uses a fair lock.
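A minimal sketch of the behavior described above; the comparator (longest string first) and the values are illustrative only:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) throws InterruptedException {
        // The comparator passed to the constructor decides priority;
        // here, longer strings come out first.
        PriorityBlockingQueue<String> queue =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt(String::length).reversed());

        // put() never blocks: the queue grows as needed, so producers are never suspended.
        queue.put("bb");
        queue.put("a");
        queue.put("cccc");

        // take() returns the highest-priority element; it blocks only when the queue is empty.
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }
}
```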

  5. SynchronousQueue

An unbuffered waiting queue, similar to trading directly without a middleman. It is a bit like producers and consumers in a primitive society: the producer takes the product to market and sells it directly to the product's final consumer, and the consumer must go to market to find the direct producer of the goods they want. If either party fails to find a suitable counterpart, then, sorry, everyone waits at the market. Compared with a buffered BlockingQueue, there is no intermediate dealer (buffer). With a dealer, the producer wholesales the product to the dealer without worrying about whom the dealer eventually sells it to; since the dealer can keep stock, the dealer model generally offers higher throughput (goods can be sold in batches). On the other hand, introducing a dealer adds an extra step between producer and consumer, so the response time for any single product may suffer.

A SynchronousQueue can be declared in two ways, which behave differently: fair mode versus non-fair mode.

In fair mode, the SynchronousQueue uses a fair lock together with a FIFO queue to block surplus producers and consumers, giving the system an overall fairness strategy.

In non-fair mode (the SynchronousQueue default), it uses a non-fair lock together with a LIFO stack to manage surplus producers and consumers. In this mode, if the processing speeds of producers and consumers differ, starvation can easily occur: some producers' data may never be handed off, or some waiting consumers may never be served.
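A minimal handoff sketch: put() on a SynchronousQueue blocks until a consumer arrives to take the element, with no buffering in between (the fairness flag and values here are illustrative only):

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // false (the default) selects non-fair mode; true would give FIFO ordering of waiters.
        SynchronousQueue<String> queue = new SynchronousQueue<>(false);

        Thread producer = new Thread(() -> {
            try {
                // put() blocks until a consumer is ready to take the element:
                // a direct handoff with no buffer.
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        String received = queue.take(); // rendezvous with the producer
        producer.join();
        System.out.println(received);
    }
}
```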

V Summary

BlockingQueue not only implements the full functionality of a queue but also automatically manages the blocking and waking of multiple threads in a multithreaded environment, allowing programmers to ignore these details and focus on higher-level functionality.

Added by noimad1 on Mon, 07 Mar 2022 10:39:24 +0200