Queue Brief
Queue: basically, a queue is a first in first out (FIFO) data structure.
The Queue interface is at the same level as List and Set, and both inherit the Collection interface. LinkedList implements the Deque interface.
On the concurrent Queue, JDK provides two sets of implementations, one is the high-performance Queue non blocking represented by ConcurrentLinkedQueue, and the other is the blocking Queue represented by BlockingQueue interface, which inherits from the Queue interface.
Blocking queue and non blocking queue
The difference between a blocking queue and an ordinary queue is that when the queue is empty, the operation of obtaining elements from the queue will be blocked, or when the queue is full, the operation of adding elements to the queue will be blocked. Threads trying to get elements from an empty blocking queue will be blocked until other threads insert new elements into the empty queue. Similarly, threads trying to add new elements to a full blocking queue will also be blocked until other threads make the queue idle again, such as removing one or more elements from the queue or completely emptying the queue.
data:image/s3,"s3://crabby-images/a07ba/a07ba55697ecd05f975116ef340fc5748abdbb59" alt=""
Non blocking queue
LinkedList of blocking interface not implemented: implemented Java util. Queue interface and Java util. Abstractqueue interface.
Built in non blocking queues: PriorityQueue and ConcurrentLinkedQueue.
PriorityQueue and ConcurrentLinkedQueue classes add two concrete collection implementations to the Collection Framework.
The PriorityQueue class essentially maintains a sequence table. The elements added to the Queue are sorted according to their natural order (through its java.util.Comparable Implementation) or according to the java.util.Comparable passed to the constructor util. Comparator implementation.
ConcurrentLinkedQueue is a thread safe queue based on linked nodes. Concurrent access does not require synchronization. Because it adds elements at the end of the queue and removes them from the head, concurrent linkedqueue's shared access to public collections works well as long as you don't need to know the size of the queue. Collecting information about queue size can be slow and requires traversing the queue.
ConcurrentLinkedQueue: it is a queue suitable for high concurrency scenarios. It is implemented in a lock free manner
It provides high performance under high concurrency. Generally, the performance of ConcurrentLinkedQueue is better than that of BlockingQueue. it
It is an unbounded thread safe queue based on linked nodes. The elements of the queue follow the principle of first in first out, and the head is the first
The tail of the queue is the latest one. null elements are not allowed in this queue.
Important methods of ConcurrentLinkedQueue:
Both add() and offer() are methods for adding elements (there is no difference between the two methods in ConcurrentLinkedQueue).
poll() and peek() are header element nodes. The difference is that the former will delete the element, while the latter will not.
Example of ConcurrentLinkedQueue:
@RequestMapping("test-clq") public void testConcurrentLinkedQueue() { ConcurrentLinkedDeque<String> q = new ConcurrentLinkedDeque<>(); q.offer("Java"); q.offer("C#"); q.offer("Javascript"); q.offer("Python"); // Get the element from the beginning and delete it System.out.println(q.poll()); // Get the element from the beginning without deleting it System.out.println(q.peek()); // Get total length System.out.println(q.size()); // ergodic for (String s : q) { System.out.println(s); } }
result: Java C# 3 C# Javascript Python
Blocking queue
Blocking queue, i.e. BlockingQueue, is a queue. Through a shared queue, data can be input from one end of the queue and output from the other end.
There are mainly two types of queues in common use: (of course, many different types of queues can be extended through different implementation methods, and DelayQueue is one of them)
- First in first out (FIFO): the elements of the queue inserted first are also out of the queue first, which is similar to the function of queuing. To some extent, this queue also reflects a kind of fairness.
- Last in first out (LIFO): the elements inserted into the queue first leave the queue. This queue gives priority to the recent events.
In multi-threaded environment, data sharing can be easily realized through queues. For example, in the classical "producer" and "consumer" models, data sharing between them can be easily realized through queues. Suppose we have several producer threads and several consumer threads. If the producer thread needs to share the prepared data with the consumer thread and use the queue to transfer the data, it can easily solve the problem of data sharing between them. But what if the producer and consumer do not match the data processing speed in a certain period of time? Ideally, if the producer produces data faster than the consumer consumes, and when the produced data accumulates to a certain extent, the producer must pause and wait (block the producer thread) in order to wait for the consumer thread to process the accumulated data, and vice versa. However, before the release of concurrent package, in a multithreaded environment, each programmer must control these details, especially considering efficiency and thread safety, which will bring great complexity to our program. Fortunately, at this time, the powerful concurrent package was born, and it also brought us a powerful BlockingQueue. (in the field of multithreading: the so-called blocking will suspend the thread (i.e. blocking) in some cases. Once the conditions are met, the suspended thread will be awakened automatically.)
A blocking queue is a queue that supports two additional operations. The two additional operations are:
- When the queue is empty, the thread that gets the element waits for the queue to become non empty.
- When the queue is full, the thread that stores the element waits for the queue to become available.
Blocking queues are often used in the scenario of producers and consumers. Producers are threads that add elements to the queue, and consumers are threads that take elements from the queue. A blocking queue is a container where producers store elements, and consumers only get elements from the container.
BlockingQueue queue. As can be seen from the word blocking, access to the blocking queue may cause blocking in some cases. There are two main types of blocking:
- When the queue is full, enter the queue.
- When the queue is empty, perform the out of queue operation.
Therefore, when a thread attempts to perform an in queue operation on a full queue, it will be blocked unless another thread performs an out of queue operation. Similarly, when a thread attempts to perform an out of queue operation on an empty queue, it will be blocked unless another thread performs an in queue operation.
In Java, the interface of BlockingQueue is located in Java util. In the concurrent package (provided in Java version 5), we can see from the features of the blocking queue described above that the blocking queue is thread safe.
In the new Java util. In concurrent package, BlockingQueue solves the problem of how to "transmit" data efficiently and safely in multithreading. These efficient and thread safe queue classes bring great convenience for us to quickly build high-quality multithreaded programs.
The following table shows the operations of blocking queues in jdk 1.5:
Method name | describe |
---|---|
add | Add a meta index. If the queue is full, an iiiegaislabeeplian exception will be thrown |
remove | Remove and return the element at the head of the queue. If the queue is empty, a NoSuchElementException is thrown |
element | Returns the element of the queue header. If the queue is empty, a NoSuchElementException is thrown |
offer | Add an element and return true. If the queue is full, return false |
poll | Remove and return the element of the queue header. If the queue is empty, null is returned |
peek | Returns the element of the queue header. If the queue is empty, null is returned |
put | Add an element and block if the queue is full |
take | Removes and returns the element at the head of the queue. If the queue is empty, it is blocked |
ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue, and its internal implementation is an array. A boundary means that its capacity is limited. We must specify its capacity during initialization. Once specified, the capacity cannot be changed.
ArrayBlockingQueue stores data in a first in first out manner. The latest inserted object is the tail, and the latest removed object is the head.
LinkedBlockingQueue
The configuration of the size of the LinkedBlockingQueue blocking queue is optional. If we specify a size during initialization, it is bounded. If we do not specify it, it is unbounded. It is said to be borderless. In fact, the default size is integer MAX_ The capacity of value, its internal implementation is a linked list.
Like ArrayBlockingQueue, LinkedBlockingQueue also stores data in a first in first out manner. The latest inserted object is the tail and the latest removed object is the head.
PriorityBlockingQueue
PriorityBlockingQueue is a queue without boundaries. Its sorting rules are the same as Java util. Like PriorityQueue. Note: it is allowed to insert null objects in PriorityBlockingQueue.
All objects inserted into PriorityBlockingQueue must implement Java Lang. comparable interface. The sorting rule of queue priority is defined according to our implementation of this interface.
In addition, we can get an Iterator from the PriorityBlockingQueue, but this Iterator does not guarantee to iterate in priority order.
SynchronousQueue
Only one element is allowed inside a SynchronousQueue queue. When a thread inserts an element, it will be blocked unless the element is consumed by another thread.
DelayQueue
DelayQueue (implemented based on PriorityQueue) is an unbounded blocking queue that stores Delayed elements. Elements can be extracted from it only when the delay expires. The header of the queue is the Delayed element with the longest retention time after the delay expires. If none of the delays have expired, the queue has no header and poll will return null. When the getDelay(TimeUnit.NANOSECONDS) method of an element returns a value less than or equal to zero, the expiration occurs, and the poll will remove the element. Null elements are not allowed in this queue.
As an example, BlockingQueue is used to simulate producers and consumers:
// 生产者 public class ProducerThread implements Runnable { private BlockingQueue<String> queue; private AtomicInteger count = new AtomicInteger(); private volatile boolean FLAG = true; public ProducerThread(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "生产者开始启动...."); while (FLAG) { String data = count.incrementAndGet() + ""; try { boolean offer = queue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功.."); } else { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败.."); } Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ",生产者线程停止..."); } public void stop() { this.FLAG = false; } }
// 消费者 public class ConsumerThread implements Runnable { private volatile boolean FLAG = true; private BlockingQueue<String> blockingQueue; public ConsumerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "消费者开始启动...."); while (FLAG) { try { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null || data == "") { FLAG = false; System.out.println("消费者超过2秒时间未获取到消息."); return; } System.out.println("消费者获取到队列信息成功,data:" + data); } catch (Exception e) { e.printStackTrace(); } } } }
// 测试 @RequestMapping("test-blockingQueue") public void testBlockingQueue() { LinkedBlockingDeque<String> blockingDeque = new LinkedBlockingDeque<>(1); ProducerThread producerThread = new ProducerThread(blockingDeque); ConsumerThread consumerThread = new ConsumerThread(blockingDeque); Thread t1 = new Thread(producerThread, "生产者"); Thread t2 = new Thread(consumerThread, "消费者"); t1.start(); t2.start(); // 10秒后停止线程 try { Thread.sleep(10 * 1000); producerThread.stop(); } catch (InterruptedException e) { e.printStackTrace(); } }
result:
Consumer starts
The producer starts
Producer, production queue 1 succeeded
Consumer successfully obtained queue information, data:1
Producer, production queue 2 succeeded
The consumer successfully obtained the queue information, data:2
Producer, production queue 3 succeeded
The consumer successfully obtained the queue information, data:3
Producer, production queue 4 succeeded
The consumer successfully obtained the queue information, data:4
Producer, production queue 5 succeeded
The consumer successfully obtained the queue information, data:5
Producer, production queue 6 succeeded
The consumer successfully obtained the queue information, data:6
Producer, production queue 7 succeeded
The consumer successfully obtained the queue information, data:7
Producer, production queue 8 succeeded
The consumer successfully obtained the queue information, data:8
Producer, production queue 9 succeeded
The consumer successfully obtained the queue information, data:9
Producer, production queue 10 succeeded
The consumer successfully obtained the queue information, data:10
Producer, producer thread stop
The consumer has not received the message for more than 2 seconds