Preface
The previous post briefly introduced ConcurrentHashMap, one of Java's concurrent containers. This post covers two more: CopyOnWriteArrayList and BlockingQueue. As before, it does not go deep into the source code; it only highlights the key points found there.
CopyOnWriteArrayList
The previous post mentioned that ConcurrentHashMap exists as a replacement for Collections.synchronizedMap, whose lock granularity is too coarse. ConcurrentHashMap locks at a finer granularity and therefore achieves higher concurrency. Vector and Collections.synchronizedList have the same problems: their lock granularity is too coarse, and the list cannot be modified while it is being iterated. CopyOnWriteArrayList was created to solve both issues. CopyOnWriteArraySet follows the same design.
In CopyOnWriteArrayList, reads take no lock at all, and a write does not block reads. Only writes need to be synchronized against other writes, so that they are mutually exclusive.
Related examples
The core principle is fairly simple: copy-on-write separates reads from writes. When CopyOnWriteArrayList is modified, it copies the existing data into a newly allocated array, applies the change to the copy, and then points the list's array reference at the new array. While the modification is in progress, readers still read the old array.
    import java.util.ArrayList;
    import java.util.ConcurrentModificationException;
    import java.util.Iterator;
    import java.util.concurrent.CopyOnWriteArrayList;

    /**
     * author:liman
     * createtime:2021/11/21
     * comment:CopyOnWriteArrayList usage example
     */
    public class CopyOnWriteArrayListUseDemo {

        public static void main(String[] args) {
            try {
                ordinalListDemo();
            } catch (ConcurrentModificationException e) {
                // Expected. Without this catch the exception would end main
                // before the second demo gets a chance to run.
                System.out.println("ArrayList threw " + e);
            }
            copyOnWriteArrayListDemo();
        }

        public static void ordinalListDemo() {
            ArrayList<String> oridinalList = new ArrayList<>();
            oridinalList.add("1");
            oridinalList.add("2");
            oridinalList.add("3");
            oridinalList.add("4");
            oridinalList.add("5");
            Iterator<String> iterator = oridinalList.iterator();
            while (iterator.hasNext()) {
                System.out.println("oridinalList is " + oridinalList);
                // After "5" has been removed, this call throws in the next iteration
                String next = iterator.next();
                System.out.println("current node is " + next);
                if (next.equals("2")) {
                    // An ordinary ArrayList throws ConcurrentModificationException
                    // if the collection is modified during traversal
                    oridinalList.remove("5");
                }
            }
        }

        /**
         * CopyOnWriteArrayList example
         */
        public static void copyOnWriteArrayListDemo() {
            CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
            copyOnWriteArrayList.add("1");
            copyOnWriteArrayList.add("2");
            copyOnWriteArrayList.add("3");
            copyOnWriteArrayList.add("4");
            copyOnWriteArrayList.add("5");
            Iterator<String> iterator = copyOnWriteArrayList.iterator();
            while (iterator.hasNext()) {
                System.out.println("oridinalList is " + copyOnWriteArrayList);
                String next = iterator.next();
                System.out.println("current node is " + next);
                if (next.equals("2")) {
                    // CopyOnWriteArrayList does not throw during traversal
                    // and can still be modified normally
                    copyOnWriteArrayList.remove("5");
                }
                if (next.equals("3")) {
                    copyOnWriteArrayList.add("new node");
                }
            }
        }
    }
The output of the second method in the code above is shown below.

    oridinalList is [1, 2, 3, 4, 5]
    current node is 1
    oridinalList is [1, 2, 3, 4, 5]
    current node is 2
    oridinalList is [1, 2, 3, 4]
    current node is 3
    oridinalList is [1, 2, 3, 4, new node]
    current node is 4
    oridinalList is [1, 2, 3, 4, new node]
    current node is 5

Note the last line of the output: although "5" was removed from the CopyOnWriteArrayList earlier, the iterator still returns 5 as the current node, and it never returns "new node". Modifications do not become visible to an iterator that already exists; the iterator keeps reading the array as it was when the iterator was created.
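The copy-then-swap principle behind this behavior can be illustrated with a minimal sketch. The class TinyCowList below is hypothetical and written only for this post; it is not the JDK implementation, and it assumes a plain synchronized method is good enough to keep writers mutually exclusive.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical teaching class, NOT the JDK implementation.
class TinyCowList<E> implements Iterable<E> {
    // volatile so that readers always see the most recently published array
    private volatile Object[] array = new Object[0];

    // Writers exclude each other; readers never take this lock.
    public synchronized void add(E e) {
        Object[] old = array;
        Object[] copy = Arrays.copyOf(old, old.length + 1); // copy the data
        copy[old.length] = e;                               // modify the copy only
        array = copy;                                       // publish by swapping the reference
    }

    @SuppressWarnings("unchecked")
    public E get(int index) {
        return (E) array[index]; // lock-free read
    }

    public int size() {
        return array.length;
    }

    // The iterator captures the array that is current at creation time.
    @Override
    public Iterator<E> iterator() {
        final Object[] snapshot = array;
        return new Iterator<E>() {
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < snapshot.length;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (cursor >= snapshot.length) {
                    throw new NoSuchElementException();
                }
                return (E) snapshot[cursor++];
            }
        };
    }

    public static void main(String[] args) {
        TinyCowList<String> list = new TinyCowList<>();
        list.add("1");
        list.add("2");
        Iterator<String> it = list.iterator(); // snapshot of ["1", "2"]
        list.add("3");                         // lands in a new array
        StringBuilder seen = new StringBuilder();
        while (it.hasNext()) {
            seen.append(it.next());
        }
        System.out.println(seen);        // prints 12
        System.out.println(list.size()); // prints 3
    }
}
```

Because a write never touches the array an iterator is holding, the iterator cannot observe a half-finished modification. The price is that every write copies the whole array.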
Second code example
    import java.util.Iterator;
    import java.util.concurrent.CopyOnWriteArrayList;

    /**
     * author:liman
     * createtime:2021/11/21
     * comment:What several iterators over the same list can see.
     * The data an iterator can traverse depends on when the iterator was created.
     */
    public class CopyOnWriteArrayListMultiIterDemo {
        public static void main(String[] args) {
            CopyOnWriteArrayList<Integer> nums = new CopyOnWriteArrayList<Integer>(new Integer[]{1, 2, 3});
            Iterator<Integer> iteratorOne = nums.iterator();
            System.out.println(nums); // [1, 2, 3]
            nums.add(5);
            System.out.println(nums); // [1, 2, 3, 5]
            Iterator<Integer> iteratorTwo = nums.iterator();
            iteratorOne.forEachRemaining(System.out::print); // 123
            System.out.println();
            iteratorTwo.forEachRemaining(System.out::print); // 1235
        }
    }
Related source code
The add method
    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        //ReentrantLock is used
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //Gets the data and length in the original collection
            Object[] elements = getArray();
            int len = elements.length;
            //Copy the original data to a new collection
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //New data
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
The get method
    public E get(int index) {
        return get(getArray(), index);
    }

    //Simple direct return, no locking logic
    private E get(Object[] a, int index) {
        return (E) a[index];
    }

get returns the element directly, without any locking logic.
BlockingQueue
BlockingQueue is a blocking queue. Unlike an ordinary queue, its operations for storing and retrieving data may block. Java provides several commonly used queue implementations.
Several methods of blocking queue
This figure only lists several commonly used queues.
[Figure: several commonly used queue implementations in Java (image not available)]
ConcurrentLinkedQueue is a non-blocking queue that is also thread safe.
A blocking queue is a thread-safe queue with blocking behavior. Typically producers put data in at one end and consumers take data out at the other (this seems to be how queues are used in most scenarios). Blocking means that if the queue is empty, a consumer that tries to take data is blocked until data arrives; if the queue is full, a producer that tries to put data is blocked until space becomes available.
The following methods are usually used in blocking queues
Methods | Behavior |
---|---|
put, take | Both block: put blocks while the queue is full, and take blocks while the queue is empty |
add, remove, element | add throws an exception (IllegalStateException) if the queue is full; remove throws an exception (NoSuchElementException) if the queue is empty; element returns the head element without removing it, and throws an exception (NoSuchElementException) if the queue is empty |
offer, poll, peek | offer adds an element and returns false if the queue is full; poll removes and returns the head element, or returns null if the queue is empty; peek returns the head element without removing it, or returns null if the queue is empty |
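To make the difference between the non-blocking method groups concrete, here is a small sketch that probes an ArrayBlockingQueue of capacity 1, first while it is empty and then while it is full. The class name QueueMethodGroupsDemo and the helper probe are made up for this illustration; the queue methods themselves are the standard java.util.concurrent ones.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueMethodGroupsDemo {

    // Records what each non-blocking method does on an empty and on a full queue.
    static String probe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();

        // The queue is empty here
        log.append("poll=").append(queue.poll());   // returns null
        log.append(" peek=").append(queue.peek());  // returns null
        try {
            queue.remove();
        } catch (NoSuchElementException e) {
            log.append(" remove=NoSuchElementException");
        }
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            log.append(" element=NoSuchElementException");
        }

        // Fill the single slot, then try to add more
        log.append(" offer=").append(queue.offer("a")); // true, there was room
        log.append(" offer=").append(queue.offer("b")); // false, the queue is full
        try {
            queue.add("c");
        } catch (IllegalStateException e) {
            log.append(" add=IllegalStateException");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```

put and take are left out on purpose: on a full or empty queue they would simply block the calling thread instead of returning or throwing.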
Typical blocking queue
The commonly used blocking queues are ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue and PriorityBlockingQueue. The first two are introduced in more detail below.
ArrayBlockingQueue
ArrayBlockingQueue is a bounded queue. The capacity must be specified at construction, and fairness can optionally be specified as well. With a fair queue, the thread that has been waiting the longest is served first.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    /**
     * author:liman
     * createtime:2021/11/24
     * comment:Bounded. Capacity must be specified; fair or unfair can be specified.
     * Example: 10 candidates, one interviewer, and a lounge with three seats
     */
    public class ArrayBlockingQueueDemo {
        public static void main(String[] args) {
            // The ArrayBlockingQueue here plays the role of the three chairs
            ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
            Interviewer interviewer = new Interviewer(arrayBlockingQueue);
            Consumer consumer = new Consumer(arrayBlockingQueue);
            new Thread(interviewer).start();
            new Thread(consumer).start();
        }
    }

    // Producer: despite its name, this class sends the candidates into the queue
    class Interviewer implements Runnable {
        private final BlockingQueue<String> queue;

        public Interviewer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("All 10 candidates are here");
            try {
                for (int i = 0; i < 10; i++) {
                    String candidate = "Candidate" + (i + 1);
                    // Enter the queue and wait for the interview;
                    // put blocks while all three seats are taken
                    queue.put(candidate);
                    System.out.println(candidate + " is waiting for the interview");
                }
                // End marker
                queue.put("allin");
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
            }
        }
    }

    // Consumer: simulates the interviewer
    class Consumer implements Runnable {
        private final BlockingQueue<String> queue;

        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // Simulate a time-consuming delay before the interviews start
                Thread.sleep(1000);
                String msg;
                while (!(msg = queue.take()).equals("allin")) {
                    System.out.println(msg + " is being interviewed");
                }
                System.out.println("All candidates have been interviewed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
The source code of its put method is as follows

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //If it's full, start waiting
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
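The wait-in-a-loop pattern in put can be tried out in isolation. The following is a minimal sketch of a bounded buffer built on one ReentrantLock and two Conditions. TinyBoundedBuffer and its helper roundTrip are hypothetical names invented for this post, and the sketch stores its elements in an ArrayDeque rather than the circular array the real class uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching class, NOT the JDK implementation.
class TinyBoundedBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            // Re-check the condition after every wake-up
            while (items.size() == capacity) {
                notFull.await();
            }
            items.addLast(e);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal(); // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // One producer puts 1..n, the calling thread takes n times.
    static String roundTrip(int capacity, int n) {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i); // blocks whenever the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder taken = new StringBuilder();
        try {
            for (int i = 0; i < n; i++) {
                taken.append(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return taken.toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(2, 5)); // prints 12345
    }
}
```

The condition is checked in a while loop rather than an if because a thread that wakes up from await must confirm that the buffer really has room (or data) before it proceeds.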
LinkedBlockingQueue
LinkedBlockingQueue is often described as an unbounded blocking queue. It is not truly unbounded: its default capacity is Integer.MAX_VALUE, which means that in practice the producer almost never blocks. The underlying data structure is a linked list, and the queue maintains two locks, a putLock and a takeLock.
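The capacity claim is easy to check. The sketch below, in which the class name LinkedBlockingQueueCapacityDemo and its two helpers are made up for illustration, compares a queue created with the no-argument constructor against one created with an explicit capacity of 2.

```java
import java.util.concurrent.LinkedBlockingQueue;

class LinkedBlockingQueueCapacityDemo {

    // With the no-argument constructor the capacity is Integer.MAX_VALUE.
    static boolean defaultCapacityIsMaxInt() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity() == Integer.MAX_VALUE;
    }

    // With an explicit capacity the queue behaves like any bounded queue.
    static boolean boundedQueueRejectsWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a");
        queue.offer("b");
        boolean thirdAccepted = queue.offer("c"); // no room left
        return !thirdAccepted && queue.remainingCapacity() == 0;
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacityIsMaxInt());     // prints true
        System.out.println(boundedQueueRejectsWhenFull()); // prints true
    }
}
```

If producers can outrun consumers for long periods, passing an explicit capacity is usually the safer choice, because the default lets the queue grow until memory runs out.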
put source code
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        //Get put lock
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //Queue full, waiting
            while (count.get() == capacity) {
                notFull.await();
            }
            //Join the queue and modify the queue length
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
take source code
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //Get takeLock
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //If empty, wait
            while (count.get() == 0) {
                notEmpty.await();
            }
            //Exit the queue and modify the queue length
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
Other blocking queues
PriorityBlockingQueue can be understood simply as the thread-safe version of PriorityQueue. It supports priorities: elements are ordered according to the result of their compareTo method. When its capacity is insufficient, the queue expands automatically.
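A short sketch of both properties follows: ordering by compareTo (or by a Comparator passed to the constructor) and growth beyond the initial capacity. The class PriorityBlockingQueueDemo and its helper drainInOrder are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityBlockingQueueDemo {

    // Polls until the queue is empty and returns the elements in poll order.
    static List<Integer> drainInOrder(PriorityBlockingQueue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        // Initial capacity 2, yet five elements fit: the queue grows as needed
        PriorityBlockingQueue<Integer> natural = new PriorityBlockingQueue<>(2);
        for (int n : new int[]{5, 1, 4, 2, 3}) {
            natural.put(n); // never blocks, the queue has no upper bound
        }
        System.out.println(drainInOrder(natural)); // prints [1, 2, 3, 4, 5]

        // The same data ordered by an explicit Comparator instead of compareTo
        PriorityBlockingQueue<Integer> reversed =
                new PriorityBlockingQueue<>(2, Comparator.<Integer>reverseOrder());
        for (int n : new int[]{5, 1, 4, 2, 3}) {
            reversed.put(n);
        }
        System.out.println(drainInOrder(reversed)); // prints [5, 4, 3, 2, 1]
    }
}
```

Since the queue is unbounded, only consumers can block on it: take waits while the queue is empty, whereas put always returns immediately.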
SynchronousQueue has a capacity of 0, so there is never a head element to look at. For that reason peek cannot return anything useful: the method exists, but it always returns null. The newCachedThreadPool thread pool uses this queue as its task queue.
Summary
This post briefly went through CopyOnWriteArrayList and BlockingQueue. CopyOnWriteArrayList suits scenarios with many reads and few writes. BlockingQueue is used not only for the producer-consumer pattern, but also as the task queue in thread pools.
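The zero-capacity behavior can be observed directly. In the sketch below the class SynchronousQueueDemo and its helper handOff are hypothetical names; the queue calls are the standard ones. It first probes the queue while no consumer is waiting, then hands one element to a consumer thread.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueDemo {

    // Probes an idle SynchronousQueue, then hands one element to a consumer thread.
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        StringBuilder log = new StringBuilder();

        log.append("peek=").append(queue.peek());        // null, nothing is ever stored
        log.append(" size=").append(queue.size());       // 0
        log.append(" offer=").append(queue.offer("x"));  // false, no consumer is waiting

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("task"); // blocks until the consumer takes the element
            consumer.join();   // after join, received[0] is safely visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        log.append(" received=").append(received[0]);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // prints peek=null size=0 offer=false received=task
    }
}
```

Each put is a direct hand-off to a take, which fits a cached thread pool well: a submitted task either goes straight to an idle worker or causes a new worker to be started.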