Brief introduction
ConcurrentHashMap is the thread-safe version of HashMap. Like HashMap, it stores elements in an array + linked list + red-black tree structure.
Compared with Hashtable, which is also thread-safe, its efficiency under concurrency is greatly improved.
A Brief Introduction to the Use of Locks
Here is a brief introduction to the various kinds of locks, so that the related concepts are already familiar when they come up in the analysis below.
synchronized
A Java keyword, implemented internally as a monitor lock; the lock state is recorded mainly in the object header, which references the object's monitor.
synchronized has been heavily optimized since older versions of Java. It now operates in three modes: biased lock, lightweight lock, and heavyweight lock.
A biased lock means that when a synchronized block has only ever been accessed by one thread, that thread acquires the lock automatically, reducing the cost of acquiring it.
A lightweight lock: when a biased lock is accessed by a second thread, the biased lock is upgraded to a lightweight lock. The contending thread tries to acquire the lock by spinning instead of blocking, which improves performance.
A heavyweight lock: when a lightweight lock is held and a spinning thread has spun a certain number of times without acquiring it, the thread blocks and the lock is upgraded to a heavyweight lock. A heavyweight lock blocks other threads, which lowers performance.
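For example, a minimal sketch of synchronized protecting a shared counter (SyncCounter is a made-up class for illustration):

public class SyncCounter {
    private int count = 0;

    // The intrinsic (monitor) lock of "this" guards the increment,
    // so concurrent callers cannot interleave the read-modify-write.
    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }
}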
CAS
CAS, Compare And Swap, is an optimistic locking technique. It assumes that concurrent operations on the same data will not necessarily conflict: when updating, it tries to swap in the new value, and if the swap fails because another thread modified the data first, it simply retries.
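A minimal sketch of the CAS retry pattern using AtomicInteger (CasCounter is a made-up class; the same idea appears later in the source as U.compareAndSwapInt/Long calls):

import java.util.concurrent.atomic.AtomicInteger;

public class CasCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Optimistic update: read the current value, try to swap in the new one,
    // and retry if another thread changed it in between.
    public int increment() {
        for (;;) {
            int current = count.get();
            int next = current + 1;
            if (count.compareAndSet(current, next)) {
                return next;
            }
            // CAS failed: another thread won the race, so loop and try again
        }
    }
}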
volatile (not a lock)
A Java keyword. When multiple threads access the same variable and one thread modifies its value, the other threads can immediately see the modified value.
volatile only guarantees visibility, not atomicity. For example, for a variable i declared volatile, the operation i++ is still not safe, because i++ is really two steps (read i, then write i + 1), and volatile cannot make that compound operation atomic.
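A small sketch of both points, the visibility guarantee and the missing atomicity (VolatileExample is a made-up class):

public class VolatileExample {
    // Writes to "running" by one thread are immediately visible to others.
    private volatile boolean running = true;

    public void stop() {
        running = false;           // visible to the worker thread right away
    }

    public void work() {
        while (running) {
            // ... do work; without volatile this loop might never observe stop()
        }
    }

    // Counter-example: volatile does NOT make i++ atomic.
    // Two threads can both read the same value of i and write back i + 1,
    // losing one increment. Use AtomicInteger or synchronized instead.
    private volatile int i = 0;

    public void unsafeIncrement() {
        i++;                       // read + add + write: not atomic
    }
}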
Spin lock
A spin lock means that a thread trying to acquire the lock is not blocked; instead it keeps trying in a loop. The advantage is that it avoids the thread context switches caused by blocking and waking, which improves performance. The disadvantage is that the looping consumes CPU.
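A minimal, illustrative spin lock built on AtomicBoolean (not a production lock: it is neither reentrant nor fair):

import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    public void lock() {
        // Spin until the CAS from false -> true succeeds;
        // no blocking, but the waiting thread burns CPU.
        while (!locked.compareAndSet(false, true)) {
            // busy-wait
        }
    }

    public void unlock() {
        locked.set(false);
    }
}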
Segmented lock
A segmented lock is a lock design idea rather than a concrete lock: it refines the granularity of locking. It is the key to ConcurrentHashMap's efficient concurrency: when an operation does not need to update the whole array, only the single array slot (bucket) involved is locked.
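A toy sketch of the idea, not ConcurrentHashMap's actual code: the data is split into stripes and only the stripe being updated is locked, so threads touching different stripes do not block each other (StripedCounter and its fields are made up for illustration):

public class StripedCounter {
    private static final int STRIPES = 16;
    private final long[] counts = new long[STRIPES];
    private final Object[] locks = new Object[STRIPES];

    public StripedCounter() {
        for (int i = 0; i < STRIPES; i++) {
            locks[i] = new Object();
        }
    }

    public void add(int key, long delta) {
        int stripe = (key & 0x7fffffff) % STRIPES;
        synchronized (locks[stripe]) {   // lock only this stripe, not the whole array
            counts[stripe] += delta;
        }
    }
}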
ReentrantLock
A reentrant lock means that after a thread has acquired a lock, it can acquire the same lock again without blocking itself. The benefit of reentrancy is that it avoids this kind of self-deadlock; synchronized is also reentrant.
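A small sketch of reentrancy with ReentrantLock (ReentrantExample is a made-up class): the same thread can acquire a lock it already holds, as long as every lock() is paired with an unlock():

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void outer() {
        lock.lock();
        try {
            inner();            // acquiring the same lock again does not deadlock
        } finally {
            lock.unlock();
        }
    }

    public void inner() {
        lock.lock();            // hold count goes from 1 to 2
        try {
            // ...
        } finally {
            lock.unlock();      // hold count back to 1; outer() releases it fully
        }
    }
}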
Source code analysis
Construction method
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
Comparing with HashMap, we can see that there are no threshold and loadFactor fields; a single sizeCtl field controls everything, and the constructors only store the (adjusted) capacity in it. The official explanation of sizeCtl is roughly as follows:
- -1 means a thread is initializing the table
- -(1 + nThreads) means n threads are expanding the table together
- 0 is the default value; the default capacity is used when the table is actually initialized
- > 0 is the next expansion threshold, after initialization or after an expansion completes
Adding Elements
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // Neither key nor value can be null
    if (key == null || value == null) throw new NullPointerException();
    // Calculate the hash value
    int hash = spread(key.hashCode());
    // Number of elements in the bucket where the new element will go
    int binCount = 0;
    // Infinite loop, used together with CAS (if a CAS fails, the bucket is re-read and the steps below retried)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // If the bucket array is not initialized or its length is 0, initialize it
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // If the bucket where the element should go has no element yet, try to insert it directly
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                // If the CAS insert fails because an element has appeared, go to the next iteration and retry
                // If the CAS insert succeeds, break out of the loop and the process ends
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // If the hash of the first element in the target bucket is MOVED, the current thread helps migrate elements
            tab = helpTransfer(tab, f);
        else {
            // The bucket is not empty and is not being migrated, so lock this bucket (segmented locking)
            // Then look for the element to insert in this bucket
            // If it exists, replace the value (when onlyIfAbsent=false)
            // If it does not exist, insert it at the end of the linked list or into the tree
            V oldVal = null;
            synchronized (f) {
                // Check again whether the first element has changed; if it has, go to the next iteration and start over
                if (tabAt(tab, i) == f) {
                    // If the hash value of the first element is greater than or equal to 0
                    // (meaning the bucket is neither migrating nor a tree),
                    // the elements in this bucket are stored as a linked list
                    if (fh >= 0) {
                        // The number of elements in the bucket starts at 1
                        binCount = 1;
                        // Traverse the whole bucket, incrementing binCount at each step
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // The element was found; assign the new value (when onlyIfAbsent=false)
                                // and exit the loop
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // The end of the list was reached without finding the element,
                                // so insert it at the end of the list and exit the loop
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        // The first element is a tree node
                        Node<K,V> p;
                        // The number of elements in the bucket is set to 2
                        binCount = 2;
                        // Call the red-black tree's insert method to insert the element
                        // If the insert succeeds it returns null
                        // Otherwise it returns the node that was found
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            // The element was found; assign the new value (when onlyIfAbsent=false)
                            // and exit the loop
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // If binCount is not zero, the element was successfully inserted or found
            if (binCount != 0) {
                // If the linked list reached 8 elements, try to convert it to a tree
                // Because binCount is only set to 2 when inserting into a tree (it does not count
                // the elements of the whole tree), treeification is not attempted repeatedly
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // If the element to insert already existed, return the old value
                if (oldVal != null)
                    return oldVal;
                // Exit the outer loop; the process ends
                break;
            }
        }
    }
    // The element was inserted successfully; add 1 to the element count (this may trigger expansion)
    addCount(1L, binCount);
    // The element was newly inserted, so return null
    return null;
}
The overall flow is similar to HashMap. The steps are roughly:
- If the bucket array has not been initialized, initialize it.
- If the bucket where the new element belongs is empty, try to insert the element directly as the first node of that bucket (using CAS).
- If an expansion is in progress, the current thread joins in and helps migrate elements.
- If the bucket where the element belongs is not empty and is not being migrated, lock that bucket (segmented lock).
- If the elements in the current bucket are stored as a linked list, search the list for the key or insert the element into it.
- If the elements in the current bucket are stored as a red-black tree, search the tree for the key or insert the element into it.
- If the element already exists, return the old value.
- If the element does not exist, add 1 to the element count of the whole map and check whether expansion is needed.
The locks used when adding an element are: spin lock + CAS + synchronized + segmented lock.
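For reference, a short usage sketch showing how put and putIfAbsent (which calls putVal with onlyIfAbsent = true) behave from the caller's side:

import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        map.put("a", 1);                 // insert; returns null (no previous value)
        map.put("a", 2);                 // replace; returns the old value 1

        map.putIfAbsent("a", 3);         // key exists, value stays 2, returns 2
        map.putIfAbsent("b", 4);         // key absent, inserts 4, returns null

        // Unlike HashMap, neither key nor value may be null:
        // map.put(null, 1);             // would throw NullPointerException

        System.out.println(map);         // e.g. {a=2, b=4}
    }
}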
Why use synchronized instead of ReentrantLock?
Because synchronized has already been heavily optimized (biased, lightweight, and heavyweight locking, as described above), in this scenario it is not worse than ReentrantLock.
Initializing the bucket array
The bucket array is initialized when the first element is put into the map.
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // sizeCtl < 0 means initialization or expansion is in progress, so yield the CPU
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // If sizeCtl is atomically updated to -1, the current thread performs the initialization
            // If the atomic update fails, another thread entered initialization first; go to the next iteration
            // If the next iteration finds initialization still unfinished, sizeCtl < 0 and the branch above yields the CPU
            // If the next iteration finds the table created, table.length != 0 and the loop exits
            try {
                // Check again whether the table is empty, to guard against the ABA problem
                if ((tab = table) == null || tab.length == 0) {
                    // If sc is 0, the default capacity 16 is used
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // Create the new array
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // Assign it to the table bucket array
                    table = tab = nt;
                    // Set sc to 0.75 times the array length
                    // n - (n >>> 2) = n - n/4 = 0.75n
                    // The load factor and expansion threshold are effectively hard-coded here,
                    // which is why there are no threshold and loadFactor fields
                    sc = n - (n >>> 2);
                }
            } finally {
                // Assign sc to sizeCtl, which now stores the expansion threshold
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
- A CAS on sizeCtl ensures that only one thread initializes the bucket array;
- After initialization, sizeCtl stores the expansion threshold;
- The expansion threshold is 0.75 times the bucket array size; the bucket array size is the capacity of the map, that is, the maximum number of elements it can hold (see the small check below).
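As a quick check on the bit trick n - (n >>> 2) used for the threshold, assuming the default capacity of 16:

public class ThresholdDemo {
    public static void main(String[] args) {
        int n = 16;                       // default table capacity
        int threshold = n - (n >>> 2);    // n - n/4 = 0.75n
        System.out.println(threshold);    // prints 12
    }
}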
Checking whether expansion is needed
Each time an element is added, the element count is increased by 1 and compared against the expansion threshold; if the threshold is reached, the thread either starts an expansion or helps with one already in progress.
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // The idea here is exactly the same as in the LongAdder class (I'll cover that class separately)
    // The count is spread across different segments depending on the thread (the segmented-lock idea again)
    // There is also a baseCount: baseCount is updated first, and if that fails, the segment for the current thread is updated
    // This keeps contention to a minimum
    // First try adding the count to baseCount; if that fails, add it to a CounterCell segment
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // If as is null
        // or its length is 0
        // or the segment for the current thread is null
        // or adding the count to the current thread's segment fails
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // Force the count to be added (it must be added one way or another, not just by spinning)
            // If different threads keep failing to update their segments,
            // the counterCells array is expanded
            // to reduce the probability of multiple threads hashing to the same segment
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // Calculate the number of elements
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // If the number of elements has reached the expansion threshold, expand
        // Note that normally sizeCtl stores the threshold, i.e. 0.75 times the capacity
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs is the stamp that identifies this expansion
            int rs = resizeStamp(n);
            if (sc < 0) {
                // sc < 0 means an expansion is in progress
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    // The expansion has finished, so exit the loop
                    // Normally only the nextTable == null condition triggers; it is unclear when the others trigger
                    break;
                // If the expansion is not finished, the current thread joins in migrating elements
                // and the number of expanding threads is increased by 1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // This is where the thread that triggers the expansion enters
                // The high 16 bits of sizeCtl store the expansion stamp rs
                // The low 16 bits of sizeCtl store the number of expanding threads plus 1, i.e. (1 + nThreads)
                // So the official description of sizeCtl during expansion, -(1 + nThreads), is not accurate
                // Start migrating elements
                transfer(tab, null);
            // Recalculate the number of elements
            s = sumCount();
        }
    }
}
- The element count is stored in segments, similar to the LongAdder class, which reduces contention when different threads update the size at the same time (see the LongAdder sketch after this list).
- When the total is needed, the values of these segments and baseCount are added together.
- Normally, sizeCtl stores the expansion threshold, which is 0.75 times the capacity.
- During expansion, the high bits of sizeCtl store the expansion stamp (resizeStamp) and the low bits store the number of expanding threads plus 1, i.e. (1 + nThreads).
- When other threads add elements and find an expansion in progress, they join in and help.
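The counting scheme in addCount mirrors the LongAdder class. A rough sketch of the same idea using LongAdder directly (this is an analogy, not ConcurrentHashMap's internal code; SizeCounterDemo is made up):

import java.util.concurrent.atomic.LongAdder;

public class SizeCounterDemo {
    // LongAdder keeps a base value plus an array of cells; threads that
    // contend on the base spread their updates across different cells,
    // and sum() adds base + all cells, just like sumCount() in the map.
    private final LongAdder size = new LongAdder();

    public void onInsert() { size.increment(); }
    public void onRemove() { size.decrement(); }
    public long current()  { return size.sum(); }   // may be slightly stale under contention
}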
Assisting with expansion (migrating elements)
When a thread adds an element and finds that an expansion is in progress and that the bucket the element belongs to has already been migrated, it helps migrate the elements of other buckets.
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // If the bucket array is not empty, the first element of the current bucket is a ForwardingNode, and its nextTab is not empty,
    // the current bucket has already been migrated, so the thread can help migrate the elements of other buckets
    // (when a bucket finishes migrating, its first element is set to a ForwardingNode whose nextTable points to the new bucket array)
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        // sizeCtl < 0 means an expansion is in progress
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // Increase the number of expanding threads by 1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // The current thread helps migrate elements
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
In short, when the bucket the current element belongs to has already been migrated, the thread helps migrate the elements of other buckets.
Migrating elements
Expansion doubles the capacity and moves part of the elements to other buckets.
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        // If nextTab is null, the migration has not started yet,
        // so create the new bucket array
        try {
            // The new bucket array is twice the size of the old one
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    // Size of the new bucket array
    int nextn = nextTab.length;
    // Create a ForwardingNode node that holds the new bucket array
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // The whole while loop just computes the value of i; the details are not important here
        // i decreases from n - 1, so buckets are migrated from the back of the array forward
        // where n is the size of the old bucket array, e.g. i goes from 15 down towards 0
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            // A full traversal has completed,
            // i.e. all elements in all buckets of the map have been migrated
            int sc;
            if (finishing) {
                // The migration is finished; replace the old bucket array
                // and set the next expansion threshold to 0.75 times the new capacity
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // The current thread has finished its part of the expansion; decrease the expanding-thread count by 1
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    // If the two sides are not equal, other threads are still migrating, so just return
                    return;
                // Otherwise this was the last thread; set finishing to true
                // so that the finishing branch above is taken next
                finishing = advance = true;
                // i is reset to n
                // so the bucket array is traversed once more to confirm the migration really is complete,
                // i.e. the second traversal should hit the (fh = f.hash) == MOVED condition everywhere
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            // If the bucket contains no data, just place the ForwardingNode in it to mark it as migrated
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            // If the hash of the first element in the bucket is MOVED,
            // the first element is a ForwardingNode,
            // which means the bucket has already been migrated
            advance = true; // already processed
        else {
            // Lock the bucket and migrate its elements
            synchronized (f) {
                // Check again whether the first element of the current bucket has changed,
                // i.e. whether another thread migrated this bucket first
                if (tabAt(tab, i) == f) {
                    // Split one linked list into two
                    // The rule: AND each element's hash with the old bucket array size n
                    // Elements whose result is 0 go into the low list, the rest into the high list
                    // The low list migrates to the same index in the new bucket array
                    // The high list migrates to its old index plus n in the new bucket array
                    // This is exactly why the capacity is doubled on every expansion
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        // The hash of the first element is greater than or equal to 0,
                        // so the elements in this bucket are stored as a linked list
                        // This is basically the same as HashMap's migration algorithm
                        // The only difference is the extra step of finding lastRun
                        // lastRun is the tail sub-list that needs no special handling when the list is split
                        // For example, if the values of (hash & n) for the elements are, say, 0, 0, 4, 4, 0, 0, 0 in order,
                        // then the last three elements (all 0) are certain to stay together in the same bucket,
                        // and lastRun points to the third-from-last node
                        // I don't quite understand why this extra step is worthwhile
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        // Check whether the trailing elements belong to the low list or the high list
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // Traverse the list, putting elements with hash & n == 0 into the low list
                        // and the rest into the high list
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // The low list keeps the same index
                        setTabAt(nextTab, i, ln);
                        // The high list goes to the old index plus n
                        setTabAt(nextTab, i + n, hn);
                        // Mark the current bucket as migrated
                        setTabAt(tab, i, fwd);
                        // advance is true, so control returns to the --i logic above
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        // The first element is a tree node
                        // The idea is the same: split into two trees
                        // Elements with hash & n == 0 go into the low tree
                        // and the rest into the high tree
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // Traverse the whole tree and split it into two trees depending on whether hash & n is 0
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // If a split tree has 6 or fewer elements, it degenerates back into a linked list
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // The low tree keeps the same index
                        setTabAt(nextTab, i, ln);
                        // The high tree goes to the old index plus n
                        setTabAt(nextTab, i + n, hn);
                        // Mark the bucket as migrated
                        setTabAt(tab, i, fwd);
                        // advance is true, so control returns to the --i logic above
                        advance = true;
                    }
                }
            }
        }
    }
}
- The new bucket array is twice the size of the old one.
- Elements are migrated starting from the buckets at the end of the array, working backwards.
- A ForwardingNode is placed in each bucket whose migration has completed, marking that bucket as migrated.
- During migration, the elements in a bucket are split into a low and a high linked list (or tree) according to whether hash & n is 0 (see the sketch after this list).
- The low list (or tree) stays at its original index in the new array.
- The high list (or tree) goes to its original index plus n in the new array.
- The bucket being migrated is locked, which is again the segmented-lock idea.
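A small sketch of the split rule (illustrative hash values only, ignoring spread()): for old capacity n, an element at index hash & (n - 1) stays at that index in the new table when (hash & n) == 0, and moves to that index plus n otherwise.

public class TransferIndexDemo {
    public static void main(String[] args) {
        int n = 16;                      // old table size
        int[] hashes = {5, 21, 37, 53};  // all map to bucket 5 in the old table

        for (int h : hashes) {
            int oldIndex = h & (n - 1);            // bucket in the old table
            int newIndex = ((h & n) == 0)          // the extra bit decides low/high
                    ? oldIndex                     // low list: same index
                    : oldIndex + n;                // high list: index + n
            System.out.println("hash=" + h + " old=" + oldIndex + " new=" + newIndex);
        }
        // Prints: 5 -> 5, 21 -> 21, 37 -> 5, 53 -> 21,
        // i.e. exactly the indices (hash & (2n - 1)) would give in the doubled table.
    }
}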