ConcurrentHashMap source-code analysis
1. Source code analysis
1.1 Interpretation of sizeCtl
-
sizeCtl is 0: the table has not yet been initialized, and the default initial capacity of 16 will be used
-
sizeCtl is a positive number:
- If the array is uninitialized, it records the initial capacity of the array
- If the array has been initialized, it records the threshold of array expansion (initial capacity of array * 0.75)
-
sizeCtl is - 1, indicating that the array is being initialized
-
sizeCtl is less than 0 and not -1, indicating that the array is being resized; the high 16 bits hold the resize stamp and the low 16 bits record the number of threads participating in the concurrent resize
1.2 array initialization (Solving concurrency problem - CAS)
/**
 * Initializes the table, using the capacity recorded in sizeCtl.
 * The concurrency problem is solved with CAS: only the thread that wins
 * the CAS on sizeCtl (setting it to -1) performs the allocation; losing
 * threads yield and retry until the table becomes visible.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // lost initialization race; just spin — another thread holds
            // the "lock" (sizeCtl == -1), so cede CPU execution rights.
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // The first incoming thread wins this CAS "lock" and
            // performs the initialization.
            try {
                if ((tab = table) == null || tab.length == 0) { // double check
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // default 16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // next resize threshold: 0.75 * n
                }
            } finally {
                // Publish the threshold (or restore sizeCtl on failure).
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
1.3 test method
/**
 * Maps the specified key to the specified value.
 * Neither the key nor the value may be null.
 */
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0; // number of nodes seen in the target bucket (0 = no lock taken)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // First insertion: lazily initialize the table; initTable
            // guarantees only one thread performs the allocation.
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // The bucket head node is empty: CAS the new node in. If the
            // head node in memory is still null, the node is installed.
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // hash == MOVED indicates the map is currently resizing:
            // help with the transfer to speed it up.
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // sync lock: lock the head node of the bucket we insert into.
            synchronized (f) {
                if (tabAt(tab, i) == f) { // double check head is unchanged
                    if (fh >= 0) { // non-negative hash: ordinary linked list
                        binCount = 1; // counts nodes under the current bucket
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                // Same key found: overwrite the old value
                                // unless onlyIfAbsent (putIfAbsent semantics).
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // Key absent: insert at the end of the list.
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // Negative head hash (-2): red-black tree bucket.
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // Treeify the bin once it grows past the threshold (8).
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal; // replaced existing mapping; count unchanged
                break;
            }
        }
    }
    // A new mapping was added: update the element count
    // (may trigger a resize check inside addCount).
    addCount(1L, binCount);
    return null;
}
1.4 Size related methods
-
size() method:
In the put and remove methods of ConcurrentHashMap, the addCount() method is called only after the synchronization lock is released, so the sum of key value pairs obtained through the size method may be outdated data.
/**
 * Returns the number of mappings, clamped to the non-negative int range.
 * The value is a snapshot produced by sumCount() and may be stale,
 * since counters are updated after bin locks are released.
 */
public int size() {
    long total = sumCount();
    if (total < 0L) {
        return 0;
    }
    if (total > (long) Integer.MAX_VALUE) {
        return Integer.MAX_VALUE;
    }
    return (int) total;
}
In the sumCount method, baseCount is taken as the initial value and the values of all cells in counterCells are accumulated onto it
Calculation formula: $baseCount + \sum_{i=0}^{n-1} counterCells[i].value$
/**
 * Sums baseCount plus the value of every non-null CounterCell.
 * Under concurrent updates the result is only an approximation.
 */
final long sumCount() {
    long total = baseCount;
    CounterCell[] cells = counterCells;
    if (cells != null) {
        for (CounterCell cell : cells) {
            if (cell != null) {
                total += cell.value;
            }
        }
    }
    return total;
}
-
addCount method:
Parameter x: count value to be added
Parameter check: if check is less than 0, do not test whether to resize; if check is less than or equal to 1, only test when there is no contention
CounterCell class:
/**
 * A padded cell for distributing counts (adapted from LongAdder's Cell).
 * {@code @sun.misc.Contended} pads the field onto its own cache line to
 * avoid false sharing between cells updated by different threads.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value; // this cell's contribution to the total count
    CounterCell(long x) { value = x; }
}
/**
 * Adds to count, and if table is too small and not already
 * resizing, initiates transfer. If already resizing, helps
 * perform transfer if work is available. Rechecks occupancy
 * after a transfer to see if another resize is already needed
 * because resizings are lagging additions.
 *
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // Enter this branch when either:
    // 1. counterCells is already initialized (there was contention before), or
    // 2. the CAS update of baseCount fails (contention right now).
    // If the CAS succeeds, fall straight through to the resize check at <2>.
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // fullAddCount() is taken when any of the following holds:
        // 1. counterCells is still uninitialized,
        // 2. the cell at this thread's probe index is null, or
        // 3. the cell exists but the CAS on its value fails.
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // Complete fallback strategy: rehash the probe, initialize or
            // double counterCells, or update baseCount.
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // <2> Check whether a resize is needed; elements inserted through
    // putVal always reach this check (binCount >= 0).
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // NOTE(review): JDK 8 bug (JDK-8214427) — rs holds the stamp
            // in its LOW 16 bits, yet the comparisons below treat it as if
            // it were already shifted into the high bits. Fixed in JDK 12
            // by `int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;`.
            int rs = resizeStamp(n); //
            if (sc < 0) {
                // <1> Check whether this thread can join the ongoing resize.
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // Checks passed: sizeCtl -> sizeCtl + 1 records one more
                // thread participating in the resize.
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // sc >= 0: no concurrent resize running yet; this thread is
            // the first resizer. Low bits set to 2 mark "one resizing
            // thread"; low bits == 1 would mean the resize has ended.
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
1.5 capacity expansion and helpTransfer method
1. Capacity expansion principle
There are two steps to expand the capacity of ConcurrentHashMap:
① First, create a new array nextTable with twice the original capacity (this step is guaranteed to be completed by only one thread)
② Then perform the data migration: compute the new bucket position of every element in the old table and transfer it to the new array.
Principle of concurrent data migration 🚛:
① During migration, moving one bucket of the table does not affect any other bucket. Exploiting this independence, elements of different buckets can be migrated concurrently by multiple threads.
② Therefore, the buckets of the table array can be split into segments, each covering a contiguous index interval. Different segments are claimed by different threads, and each thread is responsible only for migrating the buckets in its own interval.
There are two reasons why the initial capacity of the internal table is a power of 2:
- Bit operation can efficiently obtain the index value of the bucket node to be put into, and reduce hash conflict
- The expansion migration is convenient and efficient, which can realize multi-threaded concurrent cooperation to complete data migration
2. Timing of capacity expansion
① When a node is inserted into a linked list and the bucket's node count reaches 8 or more, treeification is triggered and the treeifyBin method is called
In this method, if the table length is small (less than 64), the tryPresize method is preferred (resize instead of treeify) to avoid building an unnecessary red-black tree
② In the put method, after inserting the node, addCount(1L, binCount) is called to update the Map element count, and this call contains the resize check.
If the total number of Map elements exceeds the threshold (0.75* table capacity), capacity expansion is triggered.
3. Expansion source code analysis
/**
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;

/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
 */
private static int RESIZE_STAMP_BITS = 16;

/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 * With the default of 16 stamp bits this is (1 << 16) - 1 = 65535.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
 * The bit shift for recording size stamp in sizeCtl.
 * The stamp lives in the high 16 bits; the low 16 bits count resizers.
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
Capacity expansion stamp
Before each capacity expansion, the resizeStamp function will be called to generate a unique capacity expansion stamp with the table capacity as the seed.
static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); // Integer.numberOfLeadingZeros represents the binary representation of parameter n, which is the number of consecutive zeros from the left }
During capacity expansion, the sizeCtl variable will be used to indicate the capacity expansion status. The upper 16 bits store the capacity expansion stamp, and the lower 16 bits store the number of concurrent capacity expansion threads
-
Capacity expansion function:
Capacity expansion in addCount():
Code analysis at < 1 >: check whether the current thread can participate in capacity expansion
-
(sc >>> RESIZE_STAMP_SHIFT) != rs
If they are not equal, it indicates that the capacity of the table has changed and the expansion is completed. At this time, sizeCtl is the expansion threshold
-
sc == rs + 1 (here is the bug in jdk1.8, which has been modified after jdk12)
It is meant to detect the end of the resize: sizeCtl == (rs << 16) + 1 indicates that the resize has finished, while (rs << 16) + 2 indicates that the first thread has started resizing. When the last resizing thread finishes it decrements sizeCtl, so at that moment sc == (rs << 16) + 1
-
sc == rs + MAX_RESIZERS (here is a bug in jdk1.8, which has been modified after jdk12)
It should actually be sc == (rs << 16) + MAX_RESIZERS, where MAX_RESIZERS defaults to (1 << 16) - 1
The number of capacity expansion threads has reached the maximum
-
(nt = nextTable) == null
This indicates that the initialization of nextTable has not been completed (only one thread can complete it)
-
transferIndex <= 0
Indicates that the bucket to be migrated has been partitioned by concurrent capacity expansion threads and cannot continue to help with capacity expansion
Here the assignment int rs = resizeStamp(n) in addCount and helpTransfer is flawed (in JDK 1.8): the stamp returned by resizeStamp occupies the low 16 bits, so it must be shifted left by 16 bits before being compared with sc, otherwise the subsequent checks sc == rs + 1 and sc == rs + MAX_RESIZERS cannot be judged correctly
The logical Bug has been fixed in JDK12[ Bug ID: JDK-8214427 probable bug in logic of ConcurrentHashMap.addCount() (java.com)]
This logic bug is hard to reproduce and has little practical impact: triggering it requires the number of concurrent resize threads to reach its maximum, or the exact end-of-resize state to be observed, which is difficult to simulate, and the subsequent logic tolerates the misjudgment in those cases
//Check whether to expand the capacity. The inserted element is always checked in the putVal method if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //sizeCtl here represents the capacity expansion threshold, and the value of s is calculated by sumCount(), indicating the current total number of KV pairs while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //There is a bug in the assignment of rs here, which was previously //int rs = resizeStamp(n); int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT; //Generate the capacity expansion stamp corresponding to the current capacity n if (sc < 0) { // Concurrent capacity expansion has started // <1> flag if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // Check whether the current thread can participate in capacity expansion break; // Through verification, update sizectl - > sizectl + 1 indicates that one more thread participates in capacity expansion if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // SC > = 0 indicates that the current container has not been concurrently expanded, and the thread running here is the first expanded thread else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // RS < < 16 + 1 indicates the end of capacity expansion // Here + 2 indicates that the first thread performs capacity expansion and updates the sizeCtl variable to modify the capacity expansion status transfer(tab, null); s = sumCount(); } }
transfer method (create a table with a capacity twice that of the previous one, and migrate data concurrently through multiple threads)
/**
 * Moves and/or copies the nodes in each bin to new table. See
 * above for explanation.
 *
 * The table is carved into segments of `stride` buckets; each resizing
 * thread claims a segment via CAS on transferIndex and migrates only
 * the buckets in its own interval, so multiple threads can transfer
 * concurrently without interfering with one another.
 */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // Compute the stride, i.e. how many buckets of the table each thread
    // claims per round: n >>> 3 is n/8, split across NCPU processors.
    // On a single CPU one thread migrates the whole table; with multiple
    // CPUs roughly 8*NCPU threads can participate at most.
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range (minimum is 16)
    if (nextTab == null) {            // initiating thread builds the new table
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 2x capacity
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // transferIndex starts at n (set by the first thread); claiming
        // threads walk it down by `stride` to take the next segment.
        transferIndex = n;
    }
    int nextn = nextTab.length; // capacity after the resize
    // A ForwardingNode placed in a bucket marks it as fully migrated.
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;    // true when bucket i is done and we may move on
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // Claiming/positioning phase: locate this thread's next bucket
        // (or claim a fresh segment of the table).
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                // All segments have already been claimed by some thread.
                i = -1;
                advance = false;
            }
            // Claim this round's segment and move transferIndex down;
            // execution result: transferIndex = transferIndex - stride.
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;  // lower bound of the claimed interval
                i = nextIndex - 1;  // start from the top of the interval
                advance = false;    // leave the claiming loop
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            // No more buckets for this thread; the last resizing thread
            // to reach here performs the finishing work.
            int sc;
            if (finishing) { // true means the whole table has been migrated
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1); // new threshold: 0.75 * 2n
                return;
            }
            // This thread finished its assigned share: CAS the resize
            // thread count down by one.
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // Not the last resizing thread? Then simply exit.
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // Last thread: rescan the whole table once to make sure
                // every bucket has been migrated before committing.
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // Empty bucket: nothing to migrate; just install the marker.
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // Already a ForwardingNode: bucket was processed; skip it.
            advance = true; // already processed
        else {
            // The bucket still holds live nodes: lock its head so only
            // one thread migrates this bin.
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn; // low (index i) / high (index i+n) lists
                    if (fh >= 0) { // non-negative hash: plain linked list
                        // runBit selects the target bucket: 0 keeps index
                        // i, non-zero moves the node to index i + n.
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                // After the loop lastRun heads the longest
                                // trailing run whose nodes all share the
                                // same runBit; that tail is reused as-is.
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) { // trailing run stays at index i
                            ln = lastRun;
                            hn = null;
                        }
                        else {             // trailing run goes to index i + n
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                // Head-insert into the ln list.
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                // Head-insert into the hn list.
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // Publish both lists, then mark the old bucket.
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd); // fwd marker
                        advance = true;        // bucket i migration done
                    }
                    // The nodes under this bucket form a red-black tree.
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                // lo tree keeps the original bucket index.
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                // hi tree migrates to bucket index i + n.
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // Untreeify small results; reuse the original bin
                        // when every node landed on a single side.
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
-
-
Help with capacity expansion:
/** * Helps transfer if a resize is in progress. */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT; // java bug fixed while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }