[Learning Notes - Java Collections - 7] Map - ConcurrentHashMap Source Analysis

Brief introduction

ConcurrentHashMap is the thread-safe version of HashMap. It uses the same array + linked list + red-black tree structure to store elements.

Compared with Hashtable, which is also thread-safe, its efficiency is greatly improved.

A Brief Introduction to the Use of Locks

Here is a brief introduction to the various kinds of locks, so that the concepts are familiar when they come up in the analysis below.

synchronized
A Java keyword, implemented internally as a monitor lock, which is recorded in the mark word of the object header.
synchronized has been heavily optimized in modern JVMs. It can run in three states: biased lock, lightweight lock, and heavyweight lock.
A biased lock means that once a synchronized block has been entered by a thread, that thread automatically acquires the lock on subsequent entries, reducing the cost of acquisition.
A lightweight lock is what a biased lock is upgraded to when a second thread contends for it. The contending thread tries to acquire the lock by spinning instead of blocking, which improves performance.
A heavyweight lock is what a lightweight lock is upgraded to when a spinning thread fails to acquire the lock after a certain number of spins and must block. A heavyweight lock blocks other threads, which reduces performance.

CAS
CAS (Compare And Swap) is an optimistic locking technique: it assumes that concurrent operations on the same data will usually not conflict, so it simply attempts the update and, if that fails, keeps retrying.
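As a minimal sketch of this optimistic pattern, here is a hypothetical CAS retry loop built on `java.util.concurrent.atomic.AtomicInteger` (the `AtomicInteger` API is real; the `addAndGetViaCas` helper and class name are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CasDemo {
    // A classic CAS retry loop: read, compute, try to swap, retry on failure.
    static int addAndGetViaCas(AtomicInteger counter, int delta) {
        for (;;) {
            int current = counter.get();       // read the current value
            int next = current + delta;        // compute the new value
            // Try to swap; if another thread changed the value first, retry
            if (counter.compareAndSet(current, next))
                return next;
        }
    }

    public static void main(String[] args) {
        AtomicInteger c = new AtomicInteger(0);
        System.out.println(addAndGetViaCas(c, 5)); // prints 5
    }
}
```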

volatile (not a lock)
A Java keyword. When multiple threads access the same variable and one thread modifies its value, the other threads immediately see the new value.
volatile only guarantees visibility, not atomicity. For example, if a variable i is declared volatile, i++ is still not guaranteed to be correct, because i++ is really two steps (read i, then write i + 1), and volatile cannot make that compound operation atomic.
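A small illustrative experiment (the class and field names are made up) contrasting a volatile counter with an AtomicInteger under two threads:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class VolatileVsAtomic {
    static volatile int volatileCount = 0;        // visibility only, ++ is not atomic
    static final AtomicInteger atomicCount = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                volatileCount++;                  // read-modify-write: updates can be lost
                atomicCount.incrementAndGet();    // atomic CAS-based increment
            }
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // atomicCount is always 200000; volatileCount is usually less,
        // because concurrent ++ operations can overwrite each other
        System.out.println("atomic = " + atomicCount.get());
        System.out.println("volatile = " + volatileCount);
    }
}
```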

Spin lock
A spin lock means that a thread trying to acquire the lock does not block, but keeps retrying in a loop. The advantage is that it avoids the context switches caused by blocking and waking threads, which improves performance; the disadvantage is that the loop consumes CPU.
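A minimal spin-lock sketch built on AtomicBoolean; this is illustrative only, not the JVM's internal implementation:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        // Busy-wait (spin) until the CAS from false -> true succeeds
        while (!locked.compareAndSet(false, true)) {
            // keep looping; this burns CPU instead of blocking the thread
        }
    }

    public void unlock() {
        locked.set(false);
    }

    // Demo: two threads each add 100,000 to a shared counter under the lock
    static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        SpinLock lock = new SpinLock();
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                lock.lock();
                try { counter++; } finally { lock.unlock(); }
            }
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(counter); // 200000: no updates lost
    }
}
```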

Segmented lock
A segmented lock is a lock design idea: it refines the granularity of locking. It is the key to ConcurrentHashMap's efficient concurrency. When an operation does not need to update the whole array, locking just the one array slot involved is enough.

ReentrantLock
A reentrant lock means that once a thread holds a lock, it can acquire the same lock again without blocking. Reentrancy avoids a thread deadlocking against itself; synchronized is also reentrant.
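A small sketch of reentrancy using the real java.util.concurrent.locks.ReentrantLock API (the `outer`/`inner` method names are made up):

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrancyDemo {
    static final ReentrantLock lock = new ReentrantLock();

    static int outer() {
        lock.lock();                  // first acquisition
        try {
            return inner();           // re-enters the same lock without deadlocking
        } finally {
            lock.unlock();
        }
    }

    static int inner() {
        lock.lock();                  // hold count rises to 2 for this thread
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count inside inner() = " + outer()); // prints 2
    }
}
```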

Source code analysis

Constructors

public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

Compared with HashMap, the threshold and loadFactor fields are gone; a single sizeCtl field controls everything, and after construction it holds only the capacity. The official documentation explains its values as follows:

  1. -1: a thread is initializing the table
  2. -(1 + nThreads): n threads are resizing together (but see the note in addCount below; this description is not entirely accurate)
  3. 0: the default value, meaning the default capacity will be used when the table is actually initialized
  4. > 0: after initialization or a resize completes, the threshold for the next resize
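To see the effect of the `tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)` line in the constructor above, here is a sketch reproducing the JDK 8 next-power-of-two routine (the bit-smearing logic matches the JDK source; the demo class itself is hypothetical):

```java
public class TableSizeDemo {
    // Assumed to be 1 << 30, as in the JDK
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // The classic JDK 8 "round up to the next power of two" routine (bit smearing)
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    public static void main(String[] args) {
        int initialCapacity = 16;
        // ConcurrentHashMap(int) sizes for roughly initialCapacity * 1.5 + 1
        int cap = tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1);
        // Prints 32: larger than HashMap's 16 for the same constructor argument
        System.out.println(cap);
    }
}
```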

Adding Elements


public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Neither key nor value can be null
    if (key == null || value == null) throw new NullPointerException();
    // Calculate hash value
    int hash = spread(key.hashCode());
    // Number of elements in the bucket that the element to insert hashes to
    int binCount = 0;
    // Spin loop, used together with CAS (if a CAS fails, re-read the bucket and retry the flow below)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // If the bucket is not initialized or the number of buckets is 0, the bucket is initialized
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // If the bucket for this element contains no elements yet, try to insert the element with CAS
            if (casTabAt(tab, i, null,
                    new Node<K,V>(hash, key, value, null)))
                // If the CAS fails because another thread inserted an element first, go around the loop and retry
                // If the CAS succeeds, break out of the loop; the insertion is complete
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // If the hash of the first element of the bucket in which the element to be inserted is MOVED, the current thread helps migrate the element together
            tab = helpTransfer(tab, f);
        else {
            // If the bucket is not empty and is not being migrated, lock the bucket (segmented lock)
            // and look for the element to insert in the bucket:
            // if it exists, replace the value (when onlyIfAbsent is false);
            // if it does not exist, insert it at the end of the list or into the tree
            V oldVal = null;
            synchronized (f) {
                // Check again that the first element has not changed; if it has, loop again and start over
                if (tabAt(tab, i) == f) {
                    // If the hash of the first element is >= 0 (meaning the bucket is neither migrating nor a tree),
                    // the elements in this bucket are stored as a linked list
                    if (fh >= 0) {
                        // Start the bucket's element count at 1
                        binCount = 1;
                        // Walk the whole list, incrementing binCount at each node
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                // The element was found: assign the new value (when onlyIfAbsent is false)
                                // and break out of the loop
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // If no element is found at the end of the list
                                // Insert it at the end of the list and exit the loop
                                pred.next = new Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // If the first element is a tree node
                        Node<K,V> p;
                        // Set the bucket's element count to 2
                        binCount = 2;
                        // Call the red-black tree's insertion method:
                        // it returns null if the insert succeeded,
                        // or the existing node if the key was found
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            // The element was found: assign the new value (when onlyIfAbsent is false)
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // A non-zero binCount means an element was inserted or found
            if (binCount != 0) {
                // If the list length has reached 8, try converting it to a tree
                // (when inserting into a tree above, binCount is only ever set to 2, not the tree's size,
                // so an existing tree is never treeified again)
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // If the element already existed, return the old value
                if (oldVal != null)
                    return oldVal;
                // Exit the outer loop; the insertion is done
                break;
            }
        }
    }
    // Element inserted successfully: add 1 to the element count (this may trigger a resize)
    addCount(1L, binCount);
    // A new element was inserted, so return null
    return null;
}


The overall flow is similar to HashMap's:

  1. If the bucket array has not been initialized, initialize it.
  2. If the bucket for the element to insert is empty, try to insert the element directly into the first slot of the bucket.
  3. If a resize is in progress, the current thread joins in and helps migrate elements.
  4. If the target bucket is not empty and is not being migrated, lock the bucket (segmented lock).
  5. If the elements in the bucket are stored as a linked list, search the list and insert or replace the element.
  6. If the elements in the bucket are stored as a red-black tree, search the tree and insert or replace the element.
  7. If the element already existed, return the old value.
  8. If the element did not exist, add 1 to the element count of the whole map and check whether a resize is needed.
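The key return values of this flow can be seen directly from the public API; a small illustrative demo (the class name is made up):

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("a", 1));         // null: key was absent
        System.out.println(map.put("a", 2));         // 1: the old value is returned
        System.out.println(map.putIfAbsent("a", 3)); // 2: onlyIfAbsent, no overwrite
        System.out.println(map.get("a"));            // 2
        try {
            map.put(null, 1);                        // neither key nor value may be null
        } catch (NullPointerException e) {
            System.out.println("null key rejected");
        }
    }
}
```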

Adding an element uses a combination of a spin loop, CAS, synchronized, and segmented locking.

Why use synchronized instead of ReentrantLock?

Because synchronized has already been heavily optimized, and in the scenarios above it performs no worse than ReentrantLock.

Initializing the bucket array

The bucket array is initialized lazily, when the first element is put into the map.


private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // sizeCtl < 0 means initialization or a resize is in progress, so yield the CPU
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // If sizeCtl is atomically updated to -1, the current thread gets to do the initialization
            // If the CAS fails, another thread got in first; go around the loop again
            // If initialization is still in progress on the next iteration, sizeCtl < 0 hits the branch above and yields the CPU
            // If initialization has finished, table.length != 0 and the loop exits
            try {
                // Check the table again in case another thread already initialized it
                if ((tab = table) == null || tab.length == 0) {
                    // If sc is 0, use the default capacity 16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // New Array
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // Assign it to the table field
                    table = tab = nt;
                    // Set sc to 0.75 times the array length:
                    // n - (n >>> 2) = n - n/4 = 0.75n
                    // So the load factor and resize threshold are hard-coded here,
                    // which is why there are no threshold and loadFactor fields.
                    sc = n - (n >>> 2);
                }
            } finally {
                // Assigning sc to sizeCtl stores the expansion threshold
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
  1. A CAS on sizeCtl ensures that only one thread initializes the bucket array;
  2. After initialization, sizeCtl holds the resize threshold;
  3. The resize threshold is 0.75 times the bucket array size, and the bucket array size is the capacity of the map, i.e. the maximum number of elements it holds before resizing.
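The CAS-guarded lazy initialization pattern of initTable can be sketched in plain Java, using AtomicInteger in place of the JDK's Unsafe-based field update (a simplified illustration, not the real implementation):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyInit {
    volatile Object[] table;
    final AtomicInteger sizeCtl = new AtomicInteger(16); // desired capacity

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();             // another thread is initializing; yield
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    // Double-check: another thread may have initialized already
                    if ((tab = table) == null) {
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2); // threshold = 0.75 * n
                    }
                } finally {
                    sizeCtl.set(sc);        // publish the resize threshold
                }
                break;
            }
        }
        return tab;
    }

    public static void main(String[] args) {
        LazyInit li = new LazyInit();
        System.out.println(li.initTable().length); // 16
        System.out.println(li.sizeCtl.get());      // 12, i.e. 0.75 * 16
    }
}
```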

Checking whether a resize is needed

After each element is added, the element count is incremented by 1 and compared with the resize threshold; if the threshold has been reached, the thread either starts a resize or helps with one already in progress.


private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // The idea here is identical to the LongAdder class (covered separately later).
    // The count is striped across cells, with different threads hitting different cells (segmented-lock thinking again).
    // There is also a baseCount: baseCount is updated first, and only if that CAS fails is the thread's cell updated.
    // This keeps contention to a minimum.

    // First try adding the number to the baseCount and then to the fragmented CounterCell if it fails
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // If the cells array is null,
        // or its length is 0,
        // or the current thread's cell is null,
        // or the CAS add to the current thread's cell fails
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // Force the increment through (the count must be added no matter what, not just by spinning here)
            // Different threads failed to update their cells,
            // so counterCells is also grown
            // to reduce the chance of multiple threads hashing to the same cell
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // Calculate the number of elements
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // If the number of elements reaches the threshold of expansion, expansion is carried out.
        // Note that under normal circumstances, sizeCtl stores 0.75 times the capacity threshold.
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs is the resize stamp for this table size
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc < 0 means a resize is in progress
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    // The resize has finished; break out of the loop
                    // (normally only the nextTable == null condition triggers; it is not obvious when the others fire)
                    break;

                // If the expansion is not completed, the current thread is added to the migration element
                // And add 1 to the number of expanding threads
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                // This is where the thread that triggers the resize enters
                // The high 16 bits of sizeCtl hold the resize stamp rs
                // The low 16 bits hold the number of resizing threads plus 1, i.e. (1 + nThreads)
                // So the official description of sizeCtl as -(1 + nThreads) during a resize is not accurate

                // Enter migration elements
                transfer(tab, null);
            // Recalculate the number of elements
            s = sumCount();
        }
    }
}
  1. The element count is stored LongAdder-style, striped across cells, which reduces contention when multiple threads update the size at the same time.
  2. The total element count is obtained by summing all the cells plus baseCount.
  3. Normally sizeCtl holds the resize threshold, which is 0.75 times the capacity.
  4. During a resize, the high bits of sizeCtl hold the resize stamp (resizeStamp) and the low bits hold the number of resizing threads plus 1 (1 + nThreads).
  5. When other threads add elements and find a resize in progress, they join in and help.
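Since addCount's striping is the same idea as LongAdder, a small demo of LongAdder itself (the helper method name is made up) shows why no increments are lost under contention:

```java
import java.util.concurrent.atomic.LongAdder;

public class StripedCountDemo {
    // Count 2 * 100,000 increments from two threads with a LongAdder,
    // the same striped-cell scheme addCount uses (baseCount + CounterCells)
    static long countWithTwoThreads() throws InterruptedException {
        LongAdder size = new LongAdder();
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) size.increment();
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        return size.sum();              // like sumCount(): baseCount + all cells
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithTwoThreads()); // always 200000, no lost updates
    }
}
```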

Helping with a resize (migrating elements)

When a thread adding an element finds that a resize is in progress and that the bucket for its element has already been migrated, it helps migrate the elements of the other buckets.


final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // If the bucket array is non-empty, the first element of the current bucket is a ForwardingNode, and its nextTable is non-empty,
    // then the current bucket has already been migrated, so help migrate the elements of the other buckets
    // (when a bucket is migrated, its first element is set to a ForwardingNode whose nextTable points to the new array)
    if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // sizeCtl < 0 means the resize is still in progress
        while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // Add 1 to the number of resizing threads
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // Current threads help migrate elements
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

Once the migration of its own bucket is complete, the thread helps migrate the elements of the other buckets.

Migrating elements

A resize doubles the capacity and migrates part of the elements to other buckets.


private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        // If nextTab is empty, the migration has not started yet.
        // Create a new bucket array
        try {
            // The new bucket array is twice as large as the original bucket.
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    // New bucket array size
    int nextn = nextTab.length;
    // Create a new ForwardingNode type node and store the new bucket array in it
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // The whole while loop computes i, the index of the next bucket to migrate
        // i works downward from n - 1, where n is the old bucket array size,
        // e.g. for n = 16 the buckets are migrated from index 15 down to 0
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            // A full pass is complete,
            // i.e. every bucket of the whole map has been migrated
            int sc;
            if (finishing) {
                // If the migration has finished, replace the old bucket array
                // and set the next resize threshold to 0.75 times the new capacity:
                // (n << 1) - (n >>> 1) = 2n - n/2 = 1.5n = 0.75 * (2n)
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // The current thread has finished its share; decrement the number of resizing threads by 1
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    // If the two sides differ, other resizing threads are still running, so just return
                    return;
                // This is the last resizing thread: set finishing to true
                // so that the if (finishing) branch above runs
                finishing = advance = true;
                // i is reset to n,
                // which makes one more pass over the bucket array to confirm the migration is complete;
                // on that pass every bucket hits the (fh = f.hash) == MOVED branch below
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // If the bucket is empty, just CAS the ForwardingNode in to mark the bucket as migrated
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // If the hash of the first element in the bucket is MOVED,
            // it is a ForwardingNode,
            // i.e. the bucket has already been migrated
            advance = true; // already processed
        else {
            // Lock the bucket and migrate elements
            synchronized (f) {
                // Check again that the first element of the bucket has not changed,
                // i.e. that no other thread migrated it first
                if (tabAt(tab, i) == f) {
                    // Split the linked list into two lists.
                    // The rule: AND each element's hash with the old bucket array size n.
                    // Elements with hash & n == 0 go into the low list, the rest into the high list.
                    // The low list migrates to the same index in the new array as in the old one;
                    // the high list migrates to the old index plus n.
                    // This is why the capacity always exactly doubles on a resize.
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        // The hash of the first element is >= 0,
                        // so the elements in this bucket are stored as a linked list.
                        // This is basically the same as HashMap's migration algorithm;
                        // the only difference is the extra step of finding lastRun.
                        // lastRun marks the tail sub-list that needs no extra work:
                        // every node from lastRun onward lands in the same new bucket,
                        // so that whole tail can be reused as-is instead of being copied node by node.
                        // For example, if hash & n for the elements in the bucket is 0, 0, 4, 4, 0, 0, 0,
                        // the last three elements are certain to stay in the same bucket,
                        // and lastRun points to the first of those last three nodes.
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        // Check whether the tail run belongs to the low list or the high list
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // Traverse the rest of the list, putting nodes with hash & n == 0 into the low list
                        // and the others into the high list
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // The low list stays at the same index in the new array
                        setTabAt(nextTab, i, ln);
                        // The high list goes to the old index plus n
                        setTabAt(nextTab, i + n, hn);
                        // Mark the old bucket as migrated
                        setTabAt(tab, i, fwd);
                        // advance is true: go back to the --i loop above
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // If the first element is a tree node,
                        // do the same thing: split it into two trees,
                        // again by whether hash & n is 0 (low tree)
                        // or not (high tree)
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // Traverse the whole tree and divide it into two trees according to whether hash&n is zero or not.
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // If either split tree has no more than 6 elements, it degenerates back into a linked list
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // The low tree stays at the same index
                        setTabAt(nextTab, i, ln);
                        // The high tree goes to the old index plus n
                        setTabAt(nextTab, i + n, hn);
                        // Mark the old bucket as migrated
                        setTabAt(tab, i, fwd);
                        // advance is true: go back to the --i loop above
                        advance = true;
                    }
                }
            }
        }
    }
}

  1. The new bucket array is twice the size of the old one.
  2. Elements are migrated starting from the buckets at the end of the array.
  3. A ForwardingNode is placed in each migrated bucket to mark it as done.
  4. During migration, the elements in each bucket are split into two lists (or trees) according to whether hash & n is 0.
  5. The low list (or tree) stays at its original index.
  6. The high list (or tree) moves to its original index plus n.
  7. While a bucket's elements are being migrated, the bucket is locked, which is the segmented-locking idea again.
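The low/high split in points 4 to 6 can be checked with a few lines of arithmetic (the demo class and helper names are made up):

```java
public class SplitDemo {
    // Index of a hash in a table of size n (what (n - 1) & hash computes)
    static int oldIndex(int hash, int n) { return (n - 1) & hash; }

    // Index of the same hash after the table doubles to 2n
    static int newIndex(int hash, int n) { return (2 * n - 1) & hash; }

    public static void main(String[] args) {
        int n = 16;                                // old table size
        for (int h : new int[] {5, 21, 37, 53}) {  // all map to bucket 5 when n == 16
            // hash & n == 0 -> the element stays at its old index (low list)
            // hash & n != 0 -> it moves to old index + n (high list)
            System.out.println(h + ": " + oldIndex(h, n) + " -> " + newIndex(h, n)
                    + " (hash & n = " + (h & n) + ")");
        }
    }
}
```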


Added by pagie on Sat, 17 Aug 2019 12:17:45 +0300