Concurrency and multithreading -- source code analysis of ConcurrentHashMap (jdk1.8)


Among the JDK's concurrent tool classes and concurrent collections, the source of ConcurrentHashMap in JDK 1.8 is probably the most complex. Compared with JDK 1.7, it abandons the Segment lock implementation and instead uses synchronized, CAS and volatile to build a thread-safe container: the thread-safe counterpart of HashMap. Its data structure is very similar to HashMap's, but it has more internal classes, as we will see below.

If you want to know the relevant source code of HashMap, you can refer to: HashMap source code analysis (jdk1.8)

Familiarity with CAS, volatile and synchronized is assumed here, so those mechanisms will not be explained in depth. Likewise, code that is unimportant or identical to HashMap's will only be mentioned in passing, otherwise this article would be far too long. If you are not familiar with how HashMap is implemented, read the link above first.

Member variable

    //The current array of bins, lazily initialized on first insertion
    transient volatile Node<K,V>[] table;

    //The next Node array, used only while resizing
    private transient volatile Node<K,V>[] nextTable;

    //Base counter value, updated through CAS
    private transient volatile long baseCount;

    //Controls table initialization and resizing: -1 means initialization is in progress, -(1+N) means N active threads are resizing
    private transient volatile int sizeCtl;

    //The next table index (plus one) to split among resizing threads
    private transient volatile int transferIndex;

Constructor

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //Throw an exception directly for parameters that do not meet the conditions                     
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //Use at least as many bins as the estimated number of updating threads
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

concurrencyLevel is the estimated number of concurrently updating threads; it defaults to 1 and only influences the initial table size.
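To see what the constructor computes, here is a minimal standalone sketch. It re-implements tableSizeFor (which rounds up to the next power of two); the names follow the JDK, but this is illustrative code, not the JDK source, and the MAXIMUM_CAPACITY clamp is omitted.

```java
public class CapacityDemo {
    // Rounds c up to the next power of two (same idea as the JDK's tableSizeFor).
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : n + 1;
    }

    // Mirrors the constructor's sizing logic (MAXIMUM_CAPACITY check omitted).
    static int initialCap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (initialCapacity < concurrencyLevel)
            initialCapacity = concurrencyLevel;
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        return tableSizeFor((int) size);
    }

    public static void main(String[] args) {
        // 1.0 + 16 / 0.75 = 22.3, truncated to 22, rounded up to 32
        System.out.println(initialCap(16, 0.75f, 1)); // 32
    }
}
```

Note that new ConcurrentHashMap(16) therefore starts with a table of 32 slots, not 16, because the requested capacity is divided by the load factor before rounding.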

Node

The code is omitted to save space. Unlike HashMap's Node, this Node does not support setValue() (calling it throws an exception) and adds find(), the lookup routine behind map.get().

TreeNode

TreeNode implements the node structure of the red-black tree. However, what is stored in the array for a treeified bin is not a TreeNode but the TreeBin described next.

TreeBin

TreeNode implements the red-black tree's data structure, and TreeBin holds a reference to its root. When operating on the tree, TreeBin locks and unlocks the root through lockRoot() and unlockRoot(). The node actually placed in the Node array for a treeified bin is the TreeBin.

ForwardingNode

A ForwardingNode is placed at the head of a bin during resizing. It carries no key or value; it simply tells subsequent threads that the data in this bin has already been migrated, and forwards lookups to nextTable.

putVal (add)

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //Get the hash value according to the hashCode() of the key
        int hash = spread(key.hashCode());
        int binCount = 0; //Number of nodes in the current bin; decides whether to treeify later
        //Loop until one of the branches breaks out
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //If the current array is empty, initialize it directly, and then continue the loop
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //If the slot computed from the hash is null, CAS a new node in directly, then break
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //If capacity expansion is currently in progress
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {  //If the current position has a value
                V oldVal = null;
                //synchronized locks the head node of the bin
                synchronized (f) {
                    //Judge whether the value of the current position has changed, because it may be modified concurrently during this period
                    if (tabAt(tab, i) == f) {
                        //fh >= 0 means this is a normal (non-tree, non-forwarding) node
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //If the key matches an existing node, stash the old value in oldVal, overwrite it unless onlyIfAbsent, then break
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //Reached the tail of the linked list: append a new node and break
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //If it's a red black tree
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            //putTreeVal returns the existing node for this key, or null if a new one was inserted; if non-null, stash the old value and overwrite unless onlyIfAbsent
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //binCount != 0 means the locked section inserted or updated a node
                if (binCount != 0) {
                    //The bin may need treeifying; treeifyBin() decides. If the key already existed, return the old value
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

The overall logic is very similar to HashMap's, with extra logic to guarantee concurrency safety. A few points are worth noting:

  1. Neither the key nor the value of a ConcurrentHashMap may be null; otherwise a NullPointerException is thrown.
  2. tabAt() reads the slot with volatile semantics, so the value seen in a concurrent environment is up to date.
  3. casTabAt() adds the node to the array through CAS. The CAS may fail if another thread has already put a node there first; in that case the outer loop simply retries.
  4. Why does fh >= 0 mean a non-tree node? Because special nodes carry fixed negative hashes: static final int MOVED = -1 (ForwardingNode) and static final int TREEBIN = -2 (the TreeBin root holder).
  5. Resizing is triggered inside treeifyBin(), which also differs from HashMap: the decision logic is more complex, whereas HashMap simply calls resize() without worrying about concurrency.
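The role of the negative sentinel hashes in point 4 can be illustrated with a small sketch of spread(). The constants match the JDK source, but the class itself is illustrative, not the JDK code:

```java
public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff; // usable bits of a normal node hash
    static final int MOVED     = -1;         // hash of a ForwardingNode
    static final int TREEBIN   = -2;         // hash of a TreeBin (tree root holder)

    // Same idea as ConcurrentHashMap.spread(): XOR the high bits into the low
    // bits, then clear the sign bit so every normal node hash is >= 0,
    // leaving negative hashes free for the sentinels above.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        int h = spread("example".hashCode());
        System.out.println(h >= 0);        // always true for normal nodes
        System.out.println((16 - 1) & h);  // bucket index in a table of length 16
    }
}
```

Because spread() masks off the sign bit, a stored node with a negative hash can only be one of the sentinels, which is exactly what the fh >= 0 check exploits.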

To summarize the overall flow of putVal():

  1. onlyIfAbsent indicates whether an existing value should be kept; put() passes false (replace), putIfAbsent() passes true.
  2. Throw a NullPointerException if the key or value is null.
  3. If the current array is empty, initialize it through initTable().
  4. Compute the array index from the key's hash. If that slot is empty, add the node directly through CAS.
  5. If the head node's hash is MOVED, other threads are resizing; call helpTransfer() to help. Migrating bins with multiple threads reduces the performance cost of a resize.
  6. Otherwise, synchronized locks the head node. A hash >= 0 means a linked list: traverse it, replace the value if the key is found, or append a new node to the tail.
  7. If the head is a TreeBin (the root holder of a red-black tree), insert through putTreeVal().
  8. After adding, check whether binCount reached TREEIFY_THRESHOLD; if so, treeifyBin() decides whether to resize or to treeify.
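The onlyIfAbsent flag and the null checks from the summary can be observed directly from the public API (a plain usage example, not JDK internals):

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);           // putVal(..., onlyIfAbsent = false): inserts
        map.put("a", 2);           // replaces the existing value
        map.putIfAbsent("a", 3);   // onlyIfAbsent = true: existing value kept
        System.out.println(map.get("a")); // 2

        try {
            map.put(null, 1);      // null keys (and null values) are rejected
        } catch (NullPointerException e) {
            System.out.println("null key rejected");
        }
    }
}
```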

initTable initialization

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl < 0 means another thread is initializing or resizing; yield the CPU and retry
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //CAS sizeCtl to -1 to claim initialization
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //Double detection
                    if ((tab = table) == null || tab.length == 0) {
                        //sc > 0 means an initial capacity was set in the constructor; otherwise create an array of the default size 16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //sc = 0.75n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

ConcurrentHashMap initializes through initTable(), whereas HashMap both initializes and resizes through resize().

  1. If sizeCtl < 0, another thread is initializing or resizing; yield the CPU and retry.
  2. If sizeCtl > 0, use it as the initial capacity.
  3. Otherwise use the default capacity of 16.
  4. Afterwards sizeCtl holds the next resize threshold, 3/4 of the array length.
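The CAS-claim plus double-check pattern above can be sketched in isolation. This is a simplified model using AtomicInteger in place of the Unsafe CAS on sizeCtl, not the JDK code:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class InitSketch {
    // Plays the role of sizeCtl: 0 = use default, -1 = initializing,
    // positive = initial capacity (before) or next threshold (after).
    static final AtomicInteger sizeCtl = new AtomicInteger(0);
    static volatile Object[] table;

    static Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0) {
                Thread.yield();                    // another thread won; spin
            } else if (sizeCtl.compareAndSet(sc, -1)) {
                try {
                    if ((tab = table) == null) {   // double-check after winning
                        int n = (sc > 0) ? sc : 16;
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);        // n - n/4 = 0.75n, the next threshold
                    }
                } finally {
                    sizeCtl.set(sc);               // publish threshold (or restore)
                }
                break;
            }
        }
        return tab;
    }

    public static void main(String[] args) {
        System.out.println(initTable().length); // 16
        System.out.println(sizeCtl.get());      // 12, i.e. 0.75 * 16
    }
}
```

The finally block matters: even if array allocation failed, sizeCtl would be restored so other threads are not blocked forever by the -1 marker.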

treeifyBin

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //If the other treeify condition is unmet, i.e. the array length is below MIN_TREEIFY_CAPACITY (64), resize instead
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

  1. The overall logic: check whether the second treeify condition (array length >= 64) holds. If not, resize, doubling the array length.
  2. If it holds, convert the bin's nodes into TreeNodes, wrap them in a TreeBin, and set that into the array.
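The branch between resizing and treeifying can be condensed into a sketch. The threshold constants match the JDK; the decide() helper is hypothetical, for illustration only:

```java
public class TreeifyDecision {
    static final int TREEIFY_THRESHOLD    = 8;  // bin length that triggers treeifyBin
    static final int MIN_TREEIFY_CAPACITY = 64; // minimum table length for real treeifying

    // Hypothetical helper mirroring treeifyBin's first branch.
    static String decide(int tableLength) {
        if (tableLength < MIN_TREEIFY_CAPACITY)
            return "resize to " + (tableLength << 1); // tryPresize(n << 1)
        return "treeify bin";                         // wrap nodes in a TreeBin
    }

    public static void main(String[] args) {
        System.out.println(decide(16)); // resize to 32
        System.out.println(decide(64)); // treeify bin
    }
}
```

In other words, a long bin in a small table is treated as a symptom of an undersized table, and doubling the table is preferred over building a tree.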

tryPresize()

    private final void tryPresize(int size) {
        //If size >= MAXIMUM_CAPACITY / 2, c = MAXIMUM_CAPACITY; otherwise round 1.5 * size + 1 up to a power of two
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                //Note that this part of logic needs to be used in the capacity expansion of transfer()
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

transfer()

transfer() is called from helpTransfer() (as well as from addCount() and tryPresize()); helpTransfer(), as its name says, helps other threads that are already resizing. Resizing is also the most difficult part of ConcurrentHashMap, so I will try to explain it clearly.

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //n >>> 3 is n/8; dividing by the number of CPU cores gives each thread's share, floored at MIN_TRANSFER_STRIDE (16), so every thread migrates a similar-sized range.
        //Since one thread handles at least 16 slots, in ordinary code one or two threads usually complete the whole migration.
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //If nextTab has not been created yet, create it at twice the old size
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //Update member variables
            nextTable = nextTab;
            //The starting transfer index: migration proceeds from the end of the table backwards
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //ForwardingNode indicates a node being migrated. When the node with subscript i in the original array completes the migration, a fwd will be set to indicate that the data at the current location has been processed by other threads, and then it can be skipped
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //Whether the current slot is done and the loop should advance (decrement i)
        boolean advance = true;
        //Judge whether the expansion has been completed.
        boolean finishing = false; // to ensure sweep before committing nextTab
        //i is the slot being processed, walking from nextIndex down towards -1. As a running example, when the length grows from 16 to 32 with stride 16, the first thread's range is [0,15]
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                //nextIndex is the right edge of the unclaimed region; in the example nextIndex=16, nextBound=0
                int nextIndex, nextBound;
                //Still inside the claimed range (or doing the finishing recheck): process slot i next
                if (--i >= bound || finishing)
                    advance = false;
                //All ranges have been claimed; nothing left for this thread
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //CAS transferIndex to claim a range; on the first pass in the example, bound becomes 0 and i becomes 15
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //i out of range: the current thread has finished the ranges it claimed
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                
                //If the expansion is completed, it can exit the cycle
                if (finishing) {
                    nextTable = null;   //Clear the scratch table
                    table = nextTab;    //Publish the new table
                    sizeCtl = (n << 1) - (n >>> 1); //New threshold: 0.75 * 2n (e.g. 32 * 0.75 = 24)
                    return;
                }
                //Each thread that finishes helping decrements sizeCtl
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //① Explained below: only the last finishing thread fails this check and commits
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //If the node in the current location is null, put fwd directly into the current node, indicating that the data has been transferred
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //The slot is already a ForwardingNode: another thread migrated it, skip this iteration
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //Lock the current node
                synchronized (f) {
                    //Double judgment
                    if (tabAt(tab, i) == f) {
                        //ln and hn are the "low" and "high" heads: ln stays at index i, hn moves to index i + n
                        Node<K,V> ln, hn;
                        //If it is not a red-black tree
                        if (fh >= 0) {
                            //runBit is hash & n, which can only be 0 or n
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //lastRun is the last node at which hash & n changes; every node after it has the same result, hence the same destination, so the whole tail can be reused as-is
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //runBit == 0 means the reused tail belongs to ln, otherwise to hn
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //Clone the nodes before lastRun, prepending each onto ln or hn according to hash & n (the lastRun tail is reused, not copied)
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //As noted above, ln goes to index i, hn goes to index i + n, and the old slot i is replaced by fwd
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            //Tree split logic omitted; the idea mirrors the linked-list case
                        }
                    }
                }
            }
        }
    }
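The ln/hn split performed under the lock above can be illustrated without Node objects. The hash values are chosen so they all share bucket 5 of a length-16 table, and plain lists stand in for the linked chains:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    public static void main(String[] args) {
        int n = 16;                           // old table length
        int[] hashes = {5, 21, 37, 53};       // all map to index 5 when length is 16
        List<Integer> ln = new ArrayList<>(); // "low" list: stays at index i
        List<Integer> hn = new ArrayList<>(); // "high" list: moves to index i + n
        for (int h : hashes) {
            if ((h & n) == 0) ln.add(h);      // the newly significant bit is 0
            else hn.add(h);                   // the newly significant bit is 1
        }
        System.out.println(ln); // [5, 37]  -> index 5 in the new table
        System.out.println(hn); // [21, 53] -> index 21 in the new table
    }
}
```

Only the single bit hash & n decides a node's new home, which is why the old bin splits into exactly two new bins, i and i + n.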

Comments have been added to the resizing method above, but the method itself is hard to follow; a line-by-line understanding is not required, and grasping the overall idea is what matters.

  1. First, determine the range of slots each thread is responsible for, based on the number of CPU cores.
  2. If nextTab is null, initialize it. transfer() is reached from several call sites (for example via putAll()), so the new table may not exist yet; hence this logic.
  3. Each thread migrates the range it claimed, from the right end downwards. In the example above, thread 1 handles slots [0,15] and thread 2 handles [16,31]; a thread starts at 15, then 14, 13, ..., 0, and finally hits the exit check.
  4. At the point marked ①, the check is (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT. Before the first resizing thread enters transfer(), it CASes sizeCtl to (rs << RESIZE_STAMP_SHIFT) + 2, and every helper thread afterwards does sizeCtl + 1 (this code is in tryPresize() and helpTransfer()). Each thread that finishes its work does sizeCtl - 1, so only the last finishing thread satisfies the equality, performs the final recheck, and commits the new table.
  5. Then comes the migration itself, which distinguishes linked lists from red-black trees. The tree branch is skipped here, but the idea is the same. As in HashMap, elements that collided before the resize end up either at the same index or at index + n. If this is unclear, see the HashMap link at the beginning.
  6. Note that the nodes from lastRun onwards keep their original order (the tail is reused), while the nodes before lastRun are prepended one by one onto ln or hn, so that portion ends up reversed relative to the original list.
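The sizeCtl bookkeeping in point 4 can be checked numerically with a re-implementation of resizeStamp. The constants match the JDK source; the class is illustrative:

```java
public class ResizeStampDemo {
    static final int RESIZE_STAMP_BITS  = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // Same formula as ConcurrentHashMap.resizeStamp(n): a stamp derived from
    // the table length, with bit 15 set so the shifted value is negative.
    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int n = 16;
        int rs = resizeStamp(n);
        int sc = (rs << RESIZE_STAMP_SHIFT) + 2;   // first resizer's sizeCtl value
        System.out.println(sc < 0);                // true: negative signals "resizing"
        // Helpers do sc + 1 on entry and sc - 1 on exit, so the last finisher
        // brings sizeCtl back down to the bare shifted stamp plus 2, and after
        // its own decrement the check (sc - 2) == rs << RESIZE_STAMP_SHIFT holds:
        System.out.println((sc - 2) == (rs << RESIZE_STAMP_SHIFT)); // true
    }
}
```

Because the stamp encodes the table length n, a stale thread from a previous resize (with a different n) cannot accidentally join or commit the current one.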

At this point we have covered insertion, initialization and resizing, the hardest parts of ConcurrentHashMap. The remaining query path is comparatively simple.

get (query)

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        //The array is not empty, and the node corresponding to the subscript is not empty
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //Judge the hash value, and then judge the key. If yes, return val
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            
            //eh < 0 means a special node (ForwardingNode or TreeBin); delegate the lookup to its find()
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //Otherwise traverse the rest of the linked list
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

As the code shows, get() takes no lock at all: tabAt() performs a volatile read of the slot and Node.val is volatile, so ConcurrentHashMap supports fully concurrent reads.
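A brief usage check of the lock-free read path (plain API usage, nothing internal):

```java
import java.util.concurrent.ConcurrentHashMap;

public class GetDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("k", 1);
        // Readers never block: the volatile slot read and volatile Node.val
        // guarantee that a completed put is visible here.
        System.out.println(map.get("k"));       // 1
        System.out.println(map.get("missing")); // null
    }
}
```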

Added by valoukh on Sun, 09 Jan 2022 04:58:26 +0200