Learning Java concurrency tools -- talk about CopyOnWriteArrayList and BlockingQueue

preface

The previous blog briefly introduced ConcurrentHashMap, which is also one of the concurrent containers. This blog continues to introduce the other two concurrent containers, CopyOnWriteArrayList and BlockingQueue. Similarly, it will not go deep into the specific source code level, but simply talk about the common points in the source code.

CopyOnWriteArrayList

In the previous blog, we briefly mentioned that in order to replace synchronized map, concurrent HashMap is available because the granularity of synchronized map lock is too large. The granularity of concurrent HashMap lock is slightly smaller and the concurrency efficiency is higher. In the same situation, Vector and synchronized list also have the same problems. In order to solve the problems that the lock granularity of Vector and synchronized list is too large and cannot be edited during iteration, CopyOnWriteArrayList is created. CopyOnWriteArraySet is also of the same type.

In CopyOnWriteArrayList, the read is completely unlocked, and the write will not block the read operation. Only the write and write need to be synchronized and mutually exclusive

Related examples

Its core principle is actually relatively simple. CopyOnWrite essentially means the separation of reading and writing. CopyOnWriteArrayList reproduces the original List data during modification, opens up a new memory space, modifies the new data, and points the original data reference to the current new memory address after modification. During data modification, the original memory address is still read.

/**
 * autor:liman
 * createtime:2021/11/21
 * comment:CopyOnWriteArrayList Use example
 */
@Slf4j
public class CopyOnWriteArrayListUseDemo {

    public static void main(String[] args) {
        ordinalListDemo();
        copyOnWriteArrayListDemo();
    }

    public static void ordinalListDemo(){
        ArrayList<String> oridinalList = new ArrayList<>();
        oridinalList.add("1");
        oridinalList.add("2");
        oridinalList.add("3");
        oridinalList.add("4");
        oridinalList.add("5");
        Iterator<String> iterator = oridinalList.iterator();
        while (iterator.hasNext()) {
            System.out.println("oridinalList is " + oridinalList);
            String next = iterator.next();//After deleting "5", exceptions will be thrown in the next iteration
            System.out.println("current node is " + next);
            if (next.equals("2")) {
                //Ordinary ArrayList will throw exceptions if it modifies the collection during traversal
                oridinalList.remove("5");//Here, the set is modified during traversal, which will throw exceptions
            }
        }
    }

    /**
     * CopyOnWriteArrayList Examples of
     */
    public static void copyOnWriteArrayListDemo(){
        CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
        copyOnWriteArrayList.add("1");
        copyOnWriteArrayList.add("2");
        copyOnWriteArrayList.add("3");
        copyOnWriteArrayList.add("4");
        copyOnWriteArrayList.add("5");
        Iterator<String> iterator = copyOnWriteArrayList.iterator();
        while (iterator.hasNext()) {
            System.out.println("oridinalList is " + copyOnWriteArrayList);
            String next = iterator.next();
            System.out.println("current node is " + next);
            if (next.equals("2")) {
                //CopyOnWriteArrayList will not throw errors when traversing, and can still be modified normally
                copyOnWriteArrayList.remove("5");
            }

            if(next.equals("3")){
                copyOnWriteArrayList.add("new node");
            }
        }
    }
}

The second method running example in the above code is shown below

oridinalList is [1, 2, 3, 4, 5]
current node is 1
oridinalList is [1, 2, 3, 4, 5]
current node is 2
oridinalList is [1, 2, 3, 4]
current node is 3
oridinalList is [1, 2, 3, 4, new node]
current node is 4
oridinalList is [1, 2, 3, 4, new node]
current node is 5

Note that in the last row of output, after editing the CopyOnWriteArrayList, the current node data output is still 5, that is, the modified data does not take effect immediately. It seems that although the data has been modified, the original data is still read.

Second code example

/**
 * autor:liman
 * createtime:2021/11/21
 * comment:Data determinants of multiple iterators
 * The data that the iterator can traverse depends on the time when the iterator is initialized
 */
@Slf4j
public class CopyOnWriteArrayListMultiIterDemo {

    public static void main(String[] args) {
        CopyOnWriteArrayList<Integer> nums = new CopyOnWriteArrayList<Integer>(new Integer[]{1,2,3});
        Iterator<Integer> iteratorOne = nums.iterator();
        System.out.println(nums);//1,2,3
        nums.add(5);
        System.out.println(nums);//1,2,3,5
        Iterator<Integer> iteratorTwo = nums.iterator();
        iteratorOne.forEachRemaining(System.out::print);//1,2,3
        System.out.println();
        iteratorTwo.forEachRemaining(System.out::print);//1,2,3,5
    }
}

Related source code

The add method

/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    //ReentrantLock is used
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //Gets the data and length in the original collection
        Object[] elements = getArray();
        int len = elements.length;
        //Copy the original data to a new collection
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        //New data
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

The get method

public E get(int index) {
    return get(getArray(), index);
}
//Simple direct return, no locking logic
private E get(Object[] a, int index) {
    return (E) a[index];
}

get returns directly with theout any locking logic

BlockingQueue

BlockingQueue is a blocking queue. Compared with ordinary queues, there is a possible blocking difference between reading data and storing data. There are several common queues in Java.

Several methods of blocking queue

This figure only lists several commonly used queues.

[the external chain picture transfer fails. The source station may have an anti-theft chain mechanism. It is recommended to save the picture and upload it directly (img-xyydxbap-163809136590) (F: \ blog_doc \ blogpic \ sync & JUC learn \ image-202111242008 34327. PNG)]

Concurrent linkedqueue is a non blocking queue, which can also ensure thread safety.

The so-called blocking queue is actually a queue with blocking function. It is thread safe. It is usually used for producers to store data from one end and consumers to read data from the other end (it seems that this is the usage in most scenarios of the queue). The so-called blocking queue means that if the queue is empty, consumers will be blocked from reading data from the queue; If the queue data is full, the producer will be blocked from storing data in the queue.

The following methods are usually used in blocking queues

put,takeThese two will block,
put blocks when the queue is full and take blocks when the queue is empty
add,remove,elementadd, if the queue is full, throw the exception directly
remove. If the queue is empty, throw an exception directly
Element, returns the queue header element. If it is empty, throw an exception
offer,poll,peekoffer, add, return false when the queue is full
poll, take out the header element, delete the header element in the queue, and return null if the queue is empty
peek, returns the header element, but does not delete the extracted element

Typical blocking queue

The commonly used blocking queues are ArrayBlockingQueue, LinkedBlockingQueue, synchronous queue and PriorityBlockingQueue. Here is a brief introduction to the first two

ArrayBlockingQueue

ArrayBlockingQueue is a bounded queue. During initialization, you need to specify the capacity and whether it is fair. If it is specified as fair, the data waiting in the queue for the longest time will be processed first.

/**
 * autor:liman
 * createtime:2021/11/24
 * comment:Bounded. Capacity needs to be specified. Fair or unfair can be specified
 * Example: 10 interviewers, one interviewer and one lounge have three positions
 */
@Slf4j
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        //The ArrayBlockingQueue here is similar to three chairs
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        Interviewer interviewer = new Interviewer(arrayBlockingQueue);
        Consumer consumer = new Consumer(arrayBlockingQueue);
        new Thread(interviewer).start();
        new Thread(consumer).start();
    }
}

//The simulation is the interviewer
class Interviewer implements Runnable {
    private BlockingQueue queue;

    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("10 All the candidates are here");
        for (int i = 0; i < 10; i++) {
            String candidate = "Candidate" + (i+1);
            try {
                //Enter the queue and wait for the interview
                queue.put(candidate);
                System.out.println("candidate" + (i + 1) + "Waiting for interview");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            //End flag bit
            queue.put("allin");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//Consumers simulate interviewers
class Consumer implements Runnable {

    BlockingQueue<String> queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        //Simulated interview time-consuming
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg = "";
        try {
            while (!((msg = queue.take()).equals("allin"))) {
                System.out.println(msg + "Being interviewed");
            }
            System.out.println("All candidates have been interviewed");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Its source code is as follows

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    	//If it's full, start waiting
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

LinkedBlockingQueue

This is an unbounded blocking queue. It is not really unbounded, but has a capacity of Integer.MAX_VALUE means that the producer will not block to a large extent. The underlying data structure is a linked list. At the same time, it maintains two locks, a putLock and a takeLock.

put source code

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
	//Get put lock
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
		 //Queue full, waiting
        while (count.get() == capacity) {
            notFull.await();
        }
		//Join the queue and modify the queue length
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

take source code

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    //Get takeLock
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //If empty, wait
        while (count.get() == 0) {
            notEmpty.await();
        }
        //Exit the queue and modify the queue length
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

Other blocking queues

PriorityBlockingQueue can be simply understood as the thread safe version of PriorityQueue. It supports priority. When the capacity is insufficient, it will be expanded and notified to support sorting according to the compareTo results of elements.

There are no peek and other functions in the SynchronousQueue, because peek is the head node, but the capacity of SynchronousQueue is 0, so there is no peek method. The newCachedThreadPool thread pool uses this queue as the task queue.

summary

After briefly combing the contents of CopyOnWriteArrayList and BlockingQueue, CopyOnWriteArrayList is suitable for scenarios with more reading and less writing. BlockingQueue is actually used not only for producer consumer mode, but also for task queues in thread pool.

Keywords: Java Back-end

Added by tbales on Tue, 30 Nov 2021 12:53:21 +0200