Introduction to and use of thread-safe collection classes

Original 1: https://blog.csdn.net/lkp1603645756/article/details/85016035

Original 2: https://www.cnblogs.com/shamao/p/11065885.html

1 Introduction

When multiple threads share a collection, reading and writing it at the same time can fail with a ConcurrentModificationException. The reason is that ordinary collections such as ArrayList and HashMap do not allow the collection to be structurally modified while it is being iterated.
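
The failure can be reproduced deterministically even in a single thread, because what triggers it is a structural modification while an iterator is active. The sketch below is illustrative only: the class name CmeDemo and its helper method are made up for this example. It appends to a list inside a for-each loop, once with ArrayList and once with CopyOnWriteArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the names are illustrative only.
class CmeDemo {

    // Appends to the list while a for-each loop is iterating over it.
    // Returns true if the iteration failed with ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(List<String> list) {
        try {
            for (String s : list) {
                list.add(s + "-copy"); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));

        System.out.println("ArrayList failed: " + failsWhenModifiedDuringIteration(plain));
        System.out.println("CopyOnWriteArrayList failed: " + failsWhenModifiedDuringIteration(cow));
        // The copy-on-write loop iterated over its 3-element snapshot and added 3 copies.
        System.out.println("CopyOnWriteArrayList size afterwards: " + cow.size());
    }
}
```

With several threads the same exception appears less predictably, which is why the thread-safe collections described below are usually the better choice.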

2 Collection classes in the java.util.concurrent package

[Figure: class diagram of the collection classes in the java.util.concurrent package]

3 Characteristics of thread-safe collections

All of the collections introduced below share two features: they are thread safe and they support concurrent operations.

Non-blocking queues (when the queue has no data, an operation either throws an exception or returns null; it never waits or blocks)

  • ConcurrentHashMap: compared with the thread-safe Hashtable, its advantage is that its iterators are weakly consistent: the map may be structurally modified (elements added or removed) during iteration without a ConcurrentModificationException, whereas the iterators of Hashtable's collection views fail fast. Before JDK 8 it used segment locking: the map was divided into N segments, the hashCode of the key determined the segment an entry was assigned to, and threads competed only within the same segment. Each segment behaved like a small Hashtable, so up to N writers could proceed in parallel. Since JDK 8 the segments have been replaced by CAS operations plus locking of individual bins.

  • ConcurrentSkipListMap: keeps its keys sorted.

  • ConcurrentSkipListSet: keeps its elements sorted and does not allow duplicates.

Note: for the two classes above, sorting requires that the keys or elements implement the Comparable interface, or that a Comparator is passed to the constructor. Duplicates are detected with that same comparison (compareTo() or compare() returning 0), not with hashCode() and equals(), so the ordering should be consistent with equals().

  • ConcurrentLinkedQueue: a queue that is read only at the head: poll() / peek() / element()

  • ConcurrentLinkedDeque: a double-ended queue that supports operations at both head and tail: pollFirst() / pollLast()

  • CopyOnWriteArrayList: suited to workloads where reads far outnumber writes. Copy-on-write means that to change the contents of a CopyOnWriteArrayList, the list first copies its backing array, applies the change to the copy, and finally points its internal reference at the new array. Traversal is thread safe, because if another thread modifies the list during traversal, the modification is made on a new copy and does not affect the array being traversed. It is the thread-safe counterpart of ArrayList, which is not thread safe.

  • CopyOnWriteArraySet: the thread-safe counterpart of HashSet, which is not thread safe.
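
Returning to the sorted collections in the list above, the following sketch shows how ConcurrentSkipListSet orders its elements and decides what counts as a duplicate. The class name SkipListSetDemo and its methods are made up for illustration. The second method relies on the fact that the set treats two elements as duplicates when the comparator returns 0 for them, regardless of equals() and hashCode().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo class; the names are illustrative only.
class SkipListSetDemo {

    // Natural ordering: String implements Comparable, so iteration is sorted.
    static List<String> sortedNaturally() {
        ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();
        set.add("pear");
        set.add("apple");
        set.add("fig");
        return new ArrayList<>(set);
    }

    // Custom ordering: "apple" and "APPLE" compare as equal under
    // String.CASE_INSENSITIVE_ORDER, so the second add is rejected
    // even though equals() and hashCode() differ for the two strings.
    static int sizeWithCaseInsensitiveOrder() {
        ConcurrentSkipListSet<String> set =
                new ConcurrentSkipListSet<>(String.CASE_INSENSITIVE_ORDER);
        set.add("apple");
        set.add("APPLE");
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sortedNaturally());
        System.out.println(sizeWithCaseInsensitiveOrder());
    }
}
```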

Blocking queues (taking from an empty queue waits until an element arrives, and adding to a full queue waits until space is available)

  • ArrayBlockingQueue: bounded blocking queue

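  • LinkedBlockingQueue: optionally bounded blocking queue based on a one-way linked list; if no capacity is given it defaults to Integer.MAX_VALUE, which makes it effectively unbounded

  • PriorityBlockingQueue: unbounded blocking queue ordered by priority, based on an array (a binary heap)

  • SynchronousQueue: a hand-off queue with no capacity. Each insert must wait for a matching remove, and each remove must wait for a matching insert

  • DelayQueue: a queue whose elements can be taken only after their delay has expired, typically used to delay the execution of tasks. The elements must implement the java.util.concurrent.Delayed interface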

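  • LinkedTransferQueue: an unbounded blocking queue based on linked nodes. Its transfer() method hands an element directly to a waiting consumer, which is similar in function to SynchronousQueue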
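
The difference between the two families can be seen on an empty queue. The sketch below is only an illustration: QueueKindsDemo and its method names are invented here. It shows a non-blocking queue answering immediately, and a blocking queue waiting for up to a timeout before giving up.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class QueueKindsDemo {

    // Non-blocking queue: an empty queue answers immediately.
    static String pollEmptyNonBlocking() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        String polled = queue.poll(); // returns null without waiting
        try {
            queue.element();          // throws on an empty queue
            return "no exception";
        } catch (NoSuchElementException e) {
            return "poll=" + polled + ", element() threw NoSuchElementException";
        }
    }

    // Blocking queue: a timed poll waits up to the timeout for an element.
    // Returns the elapsed time in milliseconds.
    static long timedPollOnEmptyBlockingQueue(long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        long start = System.nanoTime();
        try {
            queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println(pollEmptyNonBlocking());
        System.out.println("waited about " + timedPollOnEmptyBlockingQueue(100) + " ms");
    }
}
```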

What is a CopyOnWrite container

A CopyOnWrite container copies its contents when it is written to. Put simply, when we add an element, we do not add it to the current container directly. Instead we first copy the current container into a new one, add the element to the new container, and then point the reference of the original container at the new container. The advantage is that the container can be read concurrently without locking, because the container currently being read is never modified. The CopyOnWrite container therefore also embodies the idea of separating reads from writes: reads and writes operate on different containers.
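
To make the idea concrete, here is a deliberately small copy-on-write container. It is a sketch under simplifying assumptions, not the JDK implementation: the class TinyCowList is hypothetical, supports only add, get, and size, and uses synchronized instead of an explicit lock.

```java
import java.util.Arrays;

// Hypothetical, simplified copy-on-write container for illustration only.
class TinyCowList<E> {
    // Readers always see the most recently published array.
    private volatile Object[] array = new Object[0];

    // Writers are serialized; each write publishes a fresh copy.
    public synchronized void add(E element) {
        Object[] current = array;
        Object[] copy = Arrays.copyOf(current, current.length + 1);
        copy[current.length] = element;
        array = copy; // swap the reference; the old array is never mutated
    }

    @SuppressWarnings("unchecked")
    public E get(int index) {
        return (E) array[index]; // lock-free read
    }

    public int size() {
        return array.length;
    }

    // Returns the array that is current at the time of the call.
    Object[] snapshot() {
        return array;
    }

    public static void main(String[] args) {
        TinyCowList<String> list = new TinyCowList<>();
        list.add("a");
        Object[] before = list.snapshot();
        list.add("b");
        // The earlier snapshot is untouched by the later write.
        System.out.println(before.length + " element in the old snapshot, "
                + list.size() + " in the list");
    }
}
```

A reader that obtained the old array keeps reading a consistent view, while later readers see the new array. That is the separation of reads and writes described above.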

4 Detailed introduction to and use of thread-safe classes

  • CopyOnWriteArrayList: implements the List interface and is the thread-safe equivalent of ArrayList.

  • CopyOnWriteArraySet: extends AbstractSet and is the thread-safe equivalent of HashSet. It contains a CopyOnWriteArrayList object and is implemented on top of it.

  • ConcurrentSkipListSet: extends AbstractSet and is the thread-safe equivalent of TreeSet. It is implemented on top of ConcurrentSkipListMap.

  • ArrayBlockingQueue: extends AbstractQueue; a thread-safe, bounded blocking queue backed by an array.

  • LinkedBlockingQueue: extends AbstractQueue; a blocking queue backed by a one-way linked list, with an optional capacity. The queue orders elements FIFO (first in, first out).

  • LinkedBlockingDeque: extends AbstractQueue; a blocking double-ended queue backed by a two-way linked list, with an optional capacity. It supports both FIFO and FILO operation.

  • ConcurrentLinkedQueue: extends AbstractQueue; an unbounded queue backed by a one-way linked list. The queue orders elements FIFO (first in, first out).

  • ConcurrentLinkedDeque: extends AbstractCollection and implements Deque; an unbounded double-ended queue backed by a two-way linked list. It supports both FIFO and FILO operation.

CopyOnWriteArrayList

Explanation

  • CopyOnWriteArrayList holds its data in an internal "volatile array". Every add, modify, or delete operation creates a new array, copies the updated data into it, and finally assigns the new array to the volatile field, which is where the name CopyOnWriteArrayList comes from. Because each modification allocates and copies a whole array, operations that modify data are expensive; operations that only traverse or search are efficient.

  • Because the data lives in a volatile array, a thread that reads the array reference always sees the most recent write to it by any other thread. In this way volatile guarantees that the data read is always the latest.

  • CopyOnWriteArrayList protects writes with a mutex. An add, modify, or delete operation first acquires the mutex, makes its change on a copy, stores the copy into the volatile array, and then releases the mutex. This keeps concurrent writers from interfering with each other.

  • Traversal with iterators is fast and never conflicts with other threads. An iterator works on the immutable snapshot of the array taken when it was constructed. Iterators support read-only operations such as hasNext() and next(), but not mutating operations such as add() and remove().
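
The snapshot behavior of the iterator is easy to observe. In the following sketch (CowIteratorDemo is an invented name), an iterator is created first and the list is modified afterwards. The iterator still walks the old contents, and calling remove() on it is rejected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the names are illustrative only.
class CowIteratorDemo {

    // Returns the elements seen by an iterator created before the list changed.
    static List<String> seenBySnapshotIterator() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> it = list.iterator(); // snapshot of [a, b]
        list.add("c");
        list.remove("a");                      // the list is now [b, c]
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    // Returns true if the iterator refuses to remove an element.
    static boolean iteratorRemoveIsUnsupported() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a"));
        Iterator<String> it = list.iterator();
        it.next();
        try {
            it.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(seenBySnapshotIterator());
        System.out.println(iteratorRemoveIsUnsupported());
    }
}
```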

Constructors

public CopyOnWriteArrayList() {
    setArray(new Object[0]);
}

public CopyOnWriteArrayList(E[] toCopyIn) {
    setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

public CopyOnWriteArrayList(Collection<? extends E> c) {
    Object[] elements;
    if (c.getClass() == CopyOnWriteArrayList.class)
        elements = ((CopyOnWriteArrayList<?>)c).getArray();
    else {
        elements = c.toArray();
        // c.toArray might (incorrectly) not return Object[] (see 6260652)
        if (elements.getClass() != Object[].class)
            elements = Arrays.copyOf(elements, elements.length, Object[].class);
    }
    setArray(elements);
}

Getting and setting the array

The array field is declared both volatile and transient.
The volatile keyword makes a variable visible across threads: a read of a volatile variable always sees the last write to it by any thread. Because of this, every time the volatile array is replaced, other threads see the update.
The transient keyword matters only for serialization: a transient field is not serialized automatically.

private transient volatile Object[] array;

final Object[] getArray() {
    return array;
}

final void setArray(Object[] a) {
    array = a;
}
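
The visibility guarantee of a volatile array reference can be exercised with a small experiment. This is only a sketch: the class VolatilePublishDemo is hypothetical, and the sleep and timeout values are arbitrary. A reader thread spins until it sees the new array published by the main thread.

```java
// Hypothetical demo class; the names are illustrative only.
class VolatilePublishDemo {
    // Same shape as the field in CopyOnWriteArrayList: a volatile array reference.
    private volatile Object[] array = new Object[0];

    // Returns true if the reader thread observed the newly published array in time.
    static boolean readerSeesPublishedArray() {
        VolatilePublishDemo holder = new VolatilePublishDemo();
        Thread reader = new Thread(() -> {
            while (holder.array.length == 0) {
                // spin: every loop iteration performs a volatile read
            }
        });
        reader.setDaemon(true);
        reader.start();
        try {
            Thread.sleep(50);                           // let the reader start spinning
            holder.array = new Object[] {"published"};  // volatile write
            reader.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !reader.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("reader saw the update: " + readerSeesPublishedArray());
    }
}
```

Without volatile the reader would not be guaranteed to notice the new reference, although on many JVMs it might still happen to.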

Add element

Declaring the array volatile guarantees visibility but not atomicity of the read-copy-write sequence, so a lock is used to keep adds thread safe.

Because the array is volatile, other threads are guaranteed to see the newly added element once setArray() has been called.

public void add(int index, E element) {
    // Use locks to ensure thread safety.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Get the reference address pointed to by array.
        Object[] elements = getArray();
        int len = elements.length;
        // If the specified position is out of bounds, an exception is thrown.
        if (index > len || index < 0)
            throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
        Object[] newElements;
        // If the insertion position is the end.
        int numMoved = len - index;
        if (numMoved == 0)
            // Copy the original array and expand a capacity.
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            // If not inserted to the end, create an array that expands the capacity.
            newElements = new Object[len + 1];
            // Copy the original array in segments and leave the specified location empty.
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1, numMoved);
        }
        // Sets the specified element at the specified location.
        newElements[index] = element;
        // Point the address referenced by array to the new array.
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}
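
The effect of the lock can be checked from the outside: when several threads add at the same time, no addition is lost. The sketch below uses an invented helper class, ConcurrentAddDemo, and arbitrary thread and element counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the names are illustrative only.
class ConcurrentAddDemo {

    // Starts the given number of threads, each adding perThread elements,
    // waits for all of them, and returns the final size of the list.
    static int sizeAfterConcurrentAdds(List<Integer> target, int threads, int perThread) {
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    target.add(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return target.size();
    }

    public static void main(String[] args) {
        int size = sizeAfterConcurrentAdds(new CopyOnWriteArrayList<>(), 4, 1000);
        System.out.println("size after concurrent adds: " + size);
    }
}
```

Passing a plain ArrayList to the same method is not safe: additions may be lost or the worker threads may fail, so that variant is intentionally left out.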

Delete element

Deleting an element removes the element at the specified position in the array.

If the last element is removed, Arrays.copyOf() directly produces a copy that is one element shorter. Otherwise a new array is created and every element except the removed one is copied into it. In both cases the new array is finally assigned to the array field.

public E remove(int index) {
    // Use locks to ensure thread safety.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Get the reference address pointed to by array.
        Object[] elements = getArray();
        int len = elements.length;
        // Gets the element based on the specified location.
        E oldValue = get(elements, index);
        // If the specified element is the last element.
        int numMoved = len - index - 1;
        if (numMoved == 0)
            // Copy and intercept the original array and point the reference address of array to the new array.
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            // If it is not the last element, create an array that reduces its capacity by one.
            Object[] newElements = new Object[len - 1];
            // Copy the original array in two segments, skipping the removed element.
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index, numMoved);
            // Point the reference address of array to the new array.
            setArray(newElements);
        }
        // Returns the element at that location.
        return oldValue;
    } finally {
        lock.unlock();
    }
}

Get element

Getting an element is simple: return the element at the specified position of the array. No lock is needed.

public E get(int index) {
    return get(getArray(), index);
}

private E get(Object[] a, int index) {
    return (E) a[index];
}

Set element

Before setting the element, the method checks whether the old element at the specified position is the same object as the new element (a reference comparison). If it is, nothing is replaced, but setArray() is still called.

public E set(int index, E element) {
    // Use locks to ensure thread safety.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // Get the reference address pointed to by array.
        Object[] elements = getArray();
        // Gets the old element at the specified location.
        E oldValue = get(elements, index);
        // If the old element is different from the new element.
        if (oldValue != element) {
            // Create a new array and copy the value of array array to replace the element at the specified position of the new array.
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            // Point the reference address of array to the new array
            setArray(newElements);
        } else {
            // Not quite a no-op: writing the same array back keeps volatile write semantics, so set() always has the memory effects of a write even when the data is unchanged.
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

Traversal

The iterator() method of CopyOnWriteArrayList returns a COWIterator object.

public Iterator<E> iterator() {
     return new COWIterator<E>(getArray(), 0);
}

CopyOnWriteArrayList defines a nested COWIterator class for traversal, which implements the ListIterator interface.

static final class COWIterator<E> implements ListIterator<E> {
    // Snapshot of the array.
    private final Object[] snapshot;
    // Index of the element that a subsequent call to next() will return.
    private int cursor;

    // Construction method.
    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }

    // Determines whether the next element exists.
    public boolean hasNext() {
        return cursor < snapshot.length;
    }

    // Determines whether a previous element exists.
    public boolean hasPrevious() {
        return cursor > 0;
    }

    // Gets the next element.
    @SuppressWarnings("unchecked")
    public E next() {
        if (! hasNext())
            throw new NoSuchElementException();
        return (E) snapshot[cursor++];
    }

    // Gets the previous element.
    @SuppressWarnings("unchecked")
    public E previous() {
        if (! hasPrevious())
            throw new NoSuchElementException();
        return (E) snapshot[--cursor];
    }

    // Gets the location of the next element.
    public int nextIndex() {
        return cursor;
    }

    // Gets the location of the previous element.
    public int previousIndex() {
        return cursor-1;
    }

    // Deleting elements is not supported.
    public void remove() {
        throw new UnsupportedOperationException();
    }

    // Modifying elements is not supported.
    public void set(E e) {
        throw new UnsupportedOperationException();
    }

    // Adding elements is not supported.
    public void add(E e) {
        throw new UnsupportedOperationException();
    }

    // Added in JDK 1.8. Consumes all remaining elements of the iterator; a second call does nothing.
    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        Object[] elements = snapshot;
        final int size = elements.length;
        for (int i = cursor; i < size; i++) {
            @SuppressWarnings("unchecked") E e = (E) elements[i];
            action.accept(e);
        }
        cursor = size;
    }
}
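
The behavior of forEachRemaining() can be checked through the public API. The following sketch (ForEachRemainingDemo is an invented name) advances the iterator once, consumes the rest, and then calls forEachRemaining() a second time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the names are illustrative only.
class ForEachRemainingDemo {

    // Records which elements each call gets to see.
    static List<String> run() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = list.iterator();
        List<String> log = new ArrayList<>();
        log.add("next:" + it.next());                    // moves the cursor past "a"
        it.forEachRemaining(e -> log.add("rest:" + e));  // consumes "b" and "c"
        it.forEachRemaining(e -> log.add("again:" + e)); // cursor is at the end: no-op
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```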

ArrayBlockingQueue

Explanation

  • ArrayBlockingQueue stores its data in an Object[] array, so it is essentially array based. Its size, that is, the capacity of the array, is specified when the ArrayBlockingQueue is created and cannot change afterwards.
  • ArrayBlockingQueue works together with ReentrantLock. It contains one ReentrantLock object, a reentrant mutex, and uses it to give threads mutually exclusive access to the shared queue. ReentrantLock can be fair or unfair, and you can choose which when creating the ArrayBlockingQueue. By default an unfair lock is used.
  • ArrayBlockingQueue works together with Condition. It contains two Condition objects, notEmpty and notFull, both created from the queue's lock. They allow finer-grained coordination: a consumer waits on notEmpty and a producer waits on notFull, so each is woken only when the state it is waiting for may have changed.

Constructors

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    // Lock only for visibility, not mutual exclusion: it ensures the initial contents are visible to threads that use the queue later.
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

Add element

ArrayBlockingQueue provides two main ways to add elements: the offer() method and the put() method.

If the queue is full, offer() returns false immediately; it does not respond to interruption while adding.

If the queue is full, put() waits until space is available; the waiting thread can be interrupted by another thread, in which case put() throws an InterruptedException.

// Does not respond to interruption. Returns false immediately if the queue is full.
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

// Responds to interruption by throwing InterruptedException. Waits while the queue is full.
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// The actual insertion. After a successful add it wakes up one thread waiting to take an element.
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
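
The difference between offer() and put() on a full queue can be demonstrated as follows. This is a sketch with invented names (OfferVsPutDemo) and arbitrary sleep and timeout values. offer() gives up at once, while put() blocks until the producing thread is interrupted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the names are illustrative only.
class OfferVsPutDemo {

    // offer() on a full queue returns false without waiting.
    static boolean offerOnFullQueue() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        return queue.offer("second");
    }

    // put() on a full queue blocks; interrupting the thread ends the wait
    // with an InterruptedException. Returns true if that exception was seen.
    static boolean putIsInterruptible() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // blocks because the queue is full
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        producer.start();
        try {
            Thread.sleep(100);       // give the producer time to block
            producer.interrupt();
            producer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue: " + offerOnFullQueue());
        System.out.println("put was interrupted: " + putIsInterruptible());
    }
}
```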

Delete element

ArrayBlockingQueue provides two main ways to remove elements: the poll() method and the take() method.

If the queue is empty, poll() returns null immediately; it does not respond to interruption while removing.

If the queue is empty, take() waits until an element is available; the waiting thread can be interrupted by another thread, in which case take() throws an InterruptedException.

// Does not respond to interruption. Returns null immediately if the queue is empty.
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

// Responds to interruption by throwing InterruptedException. Waits while the queue is empty.
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

// The actual removal. After a successful remove it wakes up one thread waiting to add an element.
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
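
Putting the pieces together, the lock and the two conditions can be tried out in a stripped-down bounded buffer. The class TinyBoundedBuffer below is hypothetical and much simpler than ArrayBlockingQueue (no iterators, no fairness option, no timed operations), but it follows the same put/take pattern shown in the listings above.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified bounded buffer for illustration only.
class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int putIndex;
    private int takeIndex;
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();       // wait for space
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();         // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();      // wait for an element
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();          // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Moves the numbers 1..n through a buffer of capacity 2 and returns their sum.
    static long transferSum(int n) {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum received: " + transferSum(100));
    }
}
```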

LinkedBlockingQueue

Explanation

  • LinkedBlockingQueue is a blocking queue backed by a one-way linked list. It orders elements FIFO (first in, first out): new elements are inserted at the tail, and retrieval operations take the element at the head. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.
  • The capacity of a LinkedBlockingQueue is optional and can be specified to prevent excessive growth. If it is not specified, the capacity defaults to Integer.MAX_VALUE.
  • LinkedBlockingQueue implements the BlockingQueue interface and supports concurrent access by multiple threads. When several threads compete for the same resource, one thread obtains it and the others block and wait.
  • To give multiple threads mutually exclusive access, LinkedBlockingQueue uses separate locks for insertion and removal. The insertion lock putLock is associated with the not-full condition notFull, and the removal lock takeLock is associated with the not-empty condition notEmpty. notFull and notEmpty allow finer-grained control over waiting.

Fields

head is the head of the linked list. Elements are taken out at head.
last is the tail of the linked list. New elements are inserted at last.
count is the actual size of the linked list, that is, the number of nodes it currently contains.
capacity is the capacity of the list, which is specified when the queue is created.
putLock is the lock for insertion.
takeLock is the lock for removal.
notEmpty is the "not empty" condition.
notFull is the "not full" condition.

Constructors

// Create a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
LinkedBlockingQueue()
// Create a LinkedBlockingQueue with the specified capacity.
LinkedBlockingQueue(int capacity)
// Create a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE that initially contains the elements of the given collection, added in the traversal order of the collection's iterator.
LinkedBlockingQueue(Collection<? extends E> c)

Other methods

// Inserts the specified element at the tail of this queue, waiting if the queue is full.
void put(E e)
// Inserts the specified element at the tail of this queue, returning false immediately if the queue is full.
boolean offer(E e)
// Inserts the specified element at the tail of this queue, waiting up to the specified time if the queue is full.
boolean offer(E e, long timeout, TimeUnit unit)
// Retrieves and removes the head of this queue, waiting if the queue is empty.
E take()
// Retrieves and removes the head of this queue, returning null immediately if the queue is empty.
E poll()
// Retrieves and removes the head of this queue, waiting up to the specified time if the queue is empty.
E poll(long timeout, TimeUnit unit)
// Retrieves but does not remove the head of this queue, returning null if the queue is empty.
E peek()
// Returns an iterator over the elements of this queue in proper sequence (head to tail).
Iterator<E> iterator()
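
A short usage sketch of these methods follows. The class name LinkedBlockingQueueDemo, the capacity of 2, and the 50 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class LinkedBlockingQueueDemo {

    // With the no-argument constructor the capacity is Integer.MAX_VALUE.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Exercises offer, timed offer, peek, and take on a queue of capacity 2.
    static String boundedBehaviour() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        try {
            boolean first = queue.offer("a");
            boolean second = queue.offer("b");
            // The queue is full, so the timed offer gives up after 50 ms.
            boolean third = queue.offer("c", 50, TimeUnit.MILLISECONDS);
            String head = queue.peek();  // looks at the head without removing it
            String taken = queue.take(); // removes the head
            return first + "," + second + "," + third + ","
                    + head + "," + taken + "," + queue.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedBehaviour());
    }
}
```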

ConcurrentLinkedQueue

Explanation

  • ConcurrentLinkedQueue is a thread-safe queue suited to high-concurrency scenarios. It uses CAS to make updates thread safe and is a non-blocking queue.
  • ConcurrentLinkedQueue is an unbounded, thread-safe queue based on a linked list that orders elements FIFO (first in, first out). null elements cannot be added to the queue (null is used only by special nodes inside the implementation).

Constructors

// Create a ConcurrentLinkedQueue that is initially empty.
ConcurrentLinkedQueue()
// Create a ConcurrentLinkedQueue that initially contains the elements of the given collection, added in the traversal order of the collection's iterator.
ConcurrentLinkedQueue(Collection<? extends E> c)

Other methods

// Inserts the specified element at the end of this queue.
boolean offer(E e)
// Retrieves and removes the head of this queue, returning null if the queue is empty.
E poll()
// Retrieves but does not remove the head of this queue, returning null if the queue is empty.
E peek()
// Returns an iterator over the elements of this queue in proper sequence (head to tail).
Iterator<E> iterator()
// Returns the number of elements in this queue.
int size()
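
As a usage sketch, several threads can offer to the same ConcurrentLinkedQueue without any external locking, and the queue can then be drained with poll() until it returns null. The class name ConcurrentLinkedQueueDemo and the thread counts are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; the names are illustrative only.
class ConcurrentLinkedQueueDemo {

    // Lets several threads offer elements concurrently, then drains the queue
    // and returns how many elements came out.
    static int offerConcurrentlyThenDrain(int threads, int perThread) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    queue.offer(i); // never blocks: the queue is unbounded
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        int drained = 0;
        while (queue.poll() != null) { // null means the queue is empty
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println("drained: " + offerConcurrentlyThenDrain(4, 1000));
    }
}
```

Note that size() has to traverse the whole list, so isEmpty() or a null result from poll() is usually the cheaper way to check for emptiness.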

Keywords: Java Multithreading

Added by fiddlehead_cons on Wed, 09 Feb 2022 03:13:53 +0200