queue
Queue is a special linear table, which only allows deletion at the front of the table and insertion at the back of the table. Like stack, queue is a linear table with limited operation. The end that performs the insertion operation is called the tail of the queue, and the end that performs the deletion operation is called the head of the queue
In fact, the queue is the same as the usual queue. In order, those who queue first buy things first, and those who queue later buy things. The first one in the queue is called the head of the queue, and the last one is called the tail of the queue. This is the first in first out of the queue, which is the biggest difference from the stack.
Blocking queue
When the queue is empty, the consumer hangs. When the queue is full, the producer hangs. This is the production consumer model. Blocking is actually suspending the thread. Because of the mismatch between the producer's production speed and the consumer's consumption speed, the fast queue can be temporarily blocked by blocking the queue. For example, the producer produces two data per second and the consumer consumes one data per second. When the queue is full, the producer will block (suspend) and wake up after waiting for the consumer to consume.
Blocked queues can achieve a balance between producers and consumers by suspending, which is the biggest difference from ordinary queues.
How
jdk has actually provided us with an implementation scheme. java5 has added the concurrent package. The BlockingQueue in the concurrent package is a blocked queue. We don't need to care about how BlockingQueue implements blocking. Everything is encapsulated for us. We just need to be an API caller without feelings.
How to use BlockingQueue
BlockingQueue itself is just an interface that specifies the method of blocking the queue, which mainly depends on several implementation classes
1. Insert data
- (1) Offer (E): returns true if the queue is not full, and false (not blocked) if the queue is full
- (2) Offer (E, long timeout, timeunit unit): you can set the waiting time. If the queue is full, wait. If the waiting time is exceeded, false is returned
- (3) Put (E): no return value. Wait until the queue is empty
2. Obtain data
- (1) poll(): out of the queue if there is data; null if there is no data
- (2) poll(long timeout, TimeUnit unit): you can set the waiting time. If there is no data, wait. If the waiting time is exceeded, null is returned
- (3) take(): if there is data, leave the queue. If there is no data, keep waiting (blocking)
BlockingQueue mainly implements classes
Class name | data structure | characteristic |
---|---|---|
ArrayBlockingQueue | Based on the array implementation, it is a bounded queue by setting the array length during initialization | The difference between ArrayBlockingQueue and LinkedBlockingQueue is that ArrayBlockingQueue has only one lock object. One lock object will cause either the producer to obtain the lock or the consumer to obtain the lock. The two compete for the lock and cannot be in parallel. |
LinkedBlockingQueue | Based on the linked list, the size can be initialized. If it is not set, the default setting size is integer MAX_ VALUE | LinkedBlockingQueue has two lock objects that can be processed in parallel. |
DelayQueue | An unbounded queue based on priority | Queue elements must implement the Delayed interface to support Delayed acquisition. Elements are sorted by time. Consumers can take them out of the queue only after the elements expire |
PriorityBlockingQueue | An unbounded queue based on priority. The bottom layer is based on array storage elements | The elements are stored in the order of preference level. The priority is realized through the compareTo method of Comparable (NATURAL sorting). Unlike other blocked queues, it will only block consumers, not producers, and the array will continue to expand. This is a colored egg. Be careful when using it. |
SynchronousQueue | There is no container inside a special queue | When a producer produces a data, it will be blocked. The producer can only produce again after consumers consume it. It is a little inappropriate to call it a queue. In real life, it can be called a team only by multiple people, and it is unreasonable for one person to be called a team. |
Own implementation code
public class BlockingQuery { private Object[] tab; //Queue container private int takeIndex; //Out of team subscript private int putIndex; //Team subscript private int size;//Number of elements private ReentrantLock reentrantLock = new ReentrantLock(); private Condition notEmpty;//Read condition private Condition notFull;//Write condition public YzBlockingQuery(int tabCount) { if (tabCount <= 0) { new NullPointerException(); } tab = new Object[tabCount]; notEmpty = reentrantLock.newCondition(); notFull = reentrantLock.newCondition(); } public boolean offer(Object obj) { if (obj == null) { throw new NullPointerException(); } try { //Acquire lock reentrantLock.lock(); //The queue is full while (size==tab.length){ System.out.println("The queue is full"); //blocking notFull.await(); } tab[putIndex]=obj; if(++putIndex==tab.length){ putIndex=0; } size++; //Wake up read thread notEmpty.signal(); return true; } catch (Exception e) { //Wake up read thread notEmpty.signal(); } finally { reentrantLock.unlock(); } return false; } public Object take(){ try { reentrantLock.lock(); while (size==0){ System.out.println("The queue is empty"); //blocking notEmpty.await(); } Object obj= tab[takeIndex]; //If you get to the last, start from the beginning if(++takeIndex==tab.length){ takeIndex=0; } size--; //Wake up write thread notFull.signal(); return obj; }catch (Exception e){ //Wake up write thread notFull.signal(); }finally { reentrantLock.unlock(); } return null; } public static void main(String[] args) { Random random = new Random(100); YzBlockingQuery yzBlockingQuery=new YzBlockingQuery(5); Thread thread1 = new Thread(() -> { for (int i=0;i<100;i++) { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } yzBlockingQuery.offer(i); System.out.println("The producer produced:"+i); } }); Thread thread2 = new Thread(() -> { for (int i=0;i<100;i++) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Object take = yzBlockingQuery.take(); System.out.println("Consumer consumption:"+take); } }); thread1.start(); thread2.start(); } }
In practice, most situations do not need to be implemented by yourself. In actual use, you can master it and select the corresponding implementation class according to the actual situation.
LinkedBlockingQueue