Analysis of the LongAdder source code (Java 8 atomic classes)

Starting with JDK 8, Java provides LongAdder and LongAccumulator for atomic operations on long values; for double values, it provides DoubleAdder and DoubleAccumulator.

Striped64 UML

Internally, AtomicLong wraps a single volatile long. When many threads perform CAS on that one variable at the same time, most attempts fail and must be retried, so it is still not fast enough in high-concurrency scenarios. How can performance be improved?
The idea is to split one variable into several, similar to the segment locks of the JDK 7 ConcurrentHashMap. As shown in the figure below, a long value is split into a base variable plus multiple cells, each Cell wrapping its own long value. When multiple threads accumulate concurrently:

  • If contention is low, the value is added directly to base.
  • If contention is high and CAS on base keeps failing, the updates are spread across the cells.

To obtain the final value, base and all the cells are summed.
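The base-plus-cells idea can be sketched with the public java.util.concurrent.atomic classes. SimpleStripedCounter below is a hypothetical, simplified illustration, not the JDK implementation: the cell count is fixed, cells are created eagerly, the thread-to-cell mapping uses System.identityHashCode instead of the thread probe, and there is no resizing.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified illustration of the base + cells idea (not java.util.concurrent code).
class SimpleStripedCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;

    SimpleStripedCounter(int cellCount) {
        // A power of two lets (cellCount - 1) act as a bit mask, as in Striped64
        if (Integer.bitCount(cellCount) != 1) {
            throw new IllegalArgumentException("cellCount must be a power of two");
        }
        cells = new AtomicLongArray(cellCount);
    }

    void add(long x) {
        long b = base.get();
        // Fast path: one CAS on base, which usually succeeds under low contention
        if (base.compareAndSet(b, b + x)) {
            return;
        }
        // The CAS lost a race: spread the update to a cell chosen by the current thread
        int h = System.identityHashCode(Thread.currentThread());
        cells.addAndGet(h & (cells.length() - 1), x);
    }

    long sum() {
        // Final value = base + all cells (not an atomic snapshot)
        long s = base.get();
        for (int i = 0; i < cells.length(); i++) {
            s += cells.get(i);
        }
        return s;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleStripedCounter counter = new SimpleStripedCounter(8);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 100_000; i++) counter.add(1);
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter.sum()); // 400000
    }
}
```

Every update lands exactly once, either in base or in a cell, so the total is exact once all writers have finished.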

Core API

add

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // If cells is still null (not initialized), try to CAS the new value directly into base
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // Reached when cells is already initialized or the CAS on base failed
        boolean uncontended = true;
        // Call the core method longAccumulate if any of these conditions holds:
        // 1. the cells array is not initialized
        // 2. the cells array is initialized but its length is 0
        // 3. the cell mapped to this thread's probe is null
        // 4. the CAS on this thread's cell fails (contention)
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

LongAdder holds an array of Cell, a static nested class of Striped64. A Cell is a padded variant of AtomicLong that supports only raw access and CAS: it has a single volatile long value field and a cas method for updating it.
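A Cell-like class can be approximated with public API. CellSketch below is hypothetical: JDK 8's Striped64.Cell uses sun.misc.Unsafe for its CAS and carries @sun.misc.Contended padding, whereas this sketch uses AtomicLongFieldUpdater and has no padding.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical sketch of a Cell: one volatile long plus a CAS method, nothing else.
class CellSketch {
    volatile long value;

    // Field updater performing CAS on the volatile "value" field
    private static final AtomicLongFieldUpdater<CellSketch> VALUE =
            AtomicLongFieldUpdater.newUpdater(CellSketch.class, "value");

    CellSketch(long x) {
        value = x;
    }

    // Succeeds only if value still equals cmp
    boolean cas(long cmp, long val) {
        return VALUE.compareAndSet(this, cmp, val);
    }

    public static void main(String[] args) {
        CellSketch c = new CellSketch(10);
        long v = c.value;
        System.out.println(c.cas(v, v + 5)); // true: value was still 10
        System.out.println(c.cas(v, v + 5)); // false: value is now 15, not 10
        System.out.println(c.value);         // 15
    }
}
```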

/**
  * Handles updates that involve initialization, resizing, creating new cells, and/or contention.
  *
  * @param x the value
  * @param fn the update function, or null for add
  * @param wasUncontended false if the CAS failed before the call
  */
 final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
     int h;
     // Get the thread's probe (hash) value
     if ((h = getProbe()) == 0) {
         // A probe of 0 means it has not been initialized yet: force initialization
         ThreadLocalRandom.current();
         h = getProbe();
         wasUncontended = true;
     }
     boolean collide = false; // True if last slot nonempty
     for (;;) {
         Cell[] as; Cell a; int n; long v;
         // This branch handles the last two of the four conditions in add():
         // the cells array is already initialized and its length is greater than 0
         if ((as = cells) != null && (n = as.length) > 0) {
             // The cell corresponding to the thread is null
             if ((a = as[(n - 1) & h]) == null) {
                 // If the cellsBusy spin lock is free
                 if (cellsBusy == 0) {       // Try to attach new Cell
                     // Create a new cell
                     Cell r = new Cell(x);   // Optimistically create
                     // Re-check cellsBusy and try to acquire it via CAS
                     if (cellsBusy == 0 && casCellsBusy()) {
                         boolean created = false;
                         try { // Recheck under lock
                             Cell[] rs; int m, j;
                             // Under the lock, confirm the thread's slot is still null, then install the new cell
                             if ((rs = cells) != null &&
                                 (m = rs.length) > 0 &&
                                 rs[j = (m - 1) & h] == null) {
                                 rs[j] = r;
                                 created = true;
                             }
                         } finally {
                             // Release the lock
                             cellsBusy = 0;
                         }
                         if (created)
                             break;
                         // Another thread filled the slot first: retry
                         continue;           // Slot is now non-empty
                     }
                 }
                 collide = false;
             }
             // The CAS in add() already failed: clear the flag, rehash the probe, and retry
             else if (!wasUncontended)       // CAS already known to fail
                 wasUncontended = true;      // Continue after rehash
             // Try to CAS-update this thread's cell (v + x when fn is null, otherwise fn)
             else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                          fn.applyAsLong(v, x))))
                 break;
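             // The table already has at least NCPU cells, or another thread replaced it: do not expand
             else if (n >= NCPU || cells != as)
                 collide = false;            // At max size or stale
             // First collision: record it and rehash before considering expansion
             else if (!collide)
                 collide = true;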
             // Collisions persist: try to double the cells array
             else if (cellsBusy == 0 && casCellsBusy()) {
                 try {
                     if (cells == as) {      // Expand table unless stale
                         Cell[] rs = new Cell[n << 1];
                         for (int i = 0; i < n; ++i)
                             rs[i] = as[i];
                         cells = rs;
                     }
                 } finally {
                     cellsBusy = 0;
                 }
                 collide = false;
                 continue;                   // Retry with expanded table
             }
             // Rehash the probe so the next attempt likely maps to a different cell
             h = advanceProbe(h);
         }
         // cells has not been initialized yet: acquire cellsBusy and create a table of size 2
         else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
             boolean init = false;
             try {                           // Initialize table
                 if (cells == as) {
                     Cell[] rs = new Cell[2];
                     rs[h & 1] = new Cell(x);
                     cells = rs;
                     init = true;
                 }
             } finally {
                 cellsBusy = 0;
             }
             if (init)
                 break;
         }
         // Another thread holds cellsBusy (e.g., it is initializing cells): fall back to CAS on base
         else if (casBase(v = base, ((fn == null) ? v + x :
                                     fn.applyAsLong(v, x))))
             break;                          // Fall back on using base
     }
 }
  /**
   * Spinlock (locked via CAS) used when resizing and/or creating Cells.
   */
  transient volatile int cellsBusy;

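From the analysis above, the goal of longAccumulate is to keep as few threads as possible contending on the same value. When a thread keeps colliding on its cell, longAccumulate rehashes the thread's probe and, if collisions persist, doubles the cells array; expansion stops once the array length reaches NCPU, the number of available processors.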
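The fn parameter is what lets Striped64 serve both classes: LongAdder.add passes null (plain addition), while LongAccumulator passes its user-supplied LongBinaryOperator. The hypothetical AccumulatorDemo below uses only the public JDK 8 API to show both, with Long::max as the accumulator function.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class AccumulatorDemo {
    // Each worker adds 1 to the adder and feeds id * 1000 + i into a running maximum, 1000 times.
    static long[] run(int threadCount) throws InterruptedException {
        // LongAdder.add calls longAccumulate with fn == null, i.e. v + x
        LongAdder adder = new LongAdder();
        // LongAccumulator calls it with its own function; identity Long.MIN_VALUE for max
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);

        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1_000; i++) {
                    adder.add(1);
                    max.accumulate(id * 1_000L + i);
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        return new long[] {adder.sum(), max.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(4);
        System.out.println(r[0]); // 4000
        System.out.println(r[1]); // 3999
    }
}
```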

Sum (eventual consistency issue)

sum() takes no lock on the cells array, so its result is only eventually consistent, not strongly consistent. This is similar to ConcurrentHashMap#clear: if other threads put entries while clear() is running, the map may not be empty when it is read after clear() returns. LongAdder is therefore well suited to high-concurrency statistics, but not to scenarios where a long variable must be strictly synchronized.

/**
 * Returns the current sum. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the sum is being calculated might not be incorporated.
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
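To see that sum() is only a best-effort value while writers are active, the hypothetical SumConsistencyDemo below samples the total once while writer threads are still incrementing and once after all of them have been joined. Only the second value is guaranteed to be exact; the first depends on scheduling.

```java
import java.util.concurrent.atomic.LongAdder;

class SumConsistencyDemo {
    // Returns {value sampled while writers run, value after all writers have finished}
    static long[] run(int writerCount, int incrementsPerWriter) throws InterruptedException {
        LongAdder counter = new LongAdder();
        Thread[] writers = new Thread[writerCount];
        for (int t = 0; t < writerCount; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerWriter; i++) counter.increment();
            });
            writers[t].start();
        }
        // Concurrent updates may or may not be reflected here
        long inFlight = counter.sum();
        for (Thread w : writers) w.join();
        // No concurrent updates remain, so this sum is exact
        long settled = counter.sum();
        return new long[] {inFlight, settled};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(4, 250_000);
        System.out.println("in flight: " + r[0] + ", settled: " + r[1]); // settled is always 1000000
    }
}
```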

LongAdder reduces contention and is therefore more efficient than AtomicLong for summation under high concurrency.
Instead of having every thread update a single number, it spreads updates across multiple cells, which reduces CAS conflicts between threads.

Both long and double are 64 bits wide. Since there is no CAS operation for double, DoubleAdder and DoubleAccumulator convert each double to its raw long bit pattern (Double.doubleToRawLongBits) and back (Double.longBitsToDouble). That is why the base and Cell[] fields live in the shared base class Striped64. "Striped" means divided into stripes, that is, sliced.
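The double-to-long conversion can be sketched on a single AtomicLong. DoubleBitsCounter below is hypothetical and uses no cells; it only shows the CAS-on-bits step that DoubleAdder applies to base and Cell.value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a double accumulated via CAS on its raw long bit pattern.
class DoubleBitsCounter {
    private final AtomicLong bits = new AtomicLong(Double.doubleToRawLongBits(0.0));

    void add(double x) {
        for (;;) {
            long cur = bits.get();
            // Decode, add, re-encode, then publish with a CAS on the long bits
            double next = Double.longBitsToDouble(cur) + x;
            if (bits.compareAndSet(cur, Double.doubleToRawLongBits(next))) {
                return;
            }
        }
    }

    double get() {
        return Double.longBitsToDouble(bits.get());
    }

    public static void main(String[] args) throws InterruptedException {
        DoubleBitsCounter c = new DoubleBitsCounter();
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) c.add(0.5); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) c.add(0.25); });
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(c.get()); // 750.0
    }
}
```

Comparing bit patterns rather than double values keeps the CAS well defined even for NaN and -0.0. The example uses 0.5 and 0.25 so every partial sum is exactly representable; with arbitrary inputs, floating-point rounding can make the result depend on the order in which threads' additions happen.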

Usage scenario

LongAdder fits statistical counting and summing scenarios, since its core operations are add and sum.
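A common counting idiom, also suggested in the LongAdder Javadoc, is a ConcurrentHashMap&lt;String, LongAdder&gt; used as a frequency map. WordCountDemo and its count method are hypothetical names chosen for this sketch.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class WordCountDemo {
    // Counts every word in `words`, repeated `rounds` times, from a pool of 4 threads.
    static Map<String, Long> count(String[] words, int rounds) throws InterruptedException {
        Map<String, LongAdder> freqs = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int round = 0; round < rounds; round++) {
            for (String w : words) {
                // computeIfAbsent creates one adder per key; increment() then scales under contention
                pool.execute(() -> freqs.computeIfAbsent(w, k -> new LongAdder()).increment());
            }
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        // Copy into a sorted map of plain totals for stable printing
        Map<String, Long> result = new TreeMap<>();
        freqs.forEach((k, v) -> result.put(k, v.sum()));
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] words = {"get", "put", "get", "remove", "get", "put"};
        System.out.println(count(words, 100)); // {get=300, put=200, remove=100}
    }
}
```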

Can LongAdder replace AtomicLong?

No. AtomicLong offers read-modify-write methods such as compareAndSet, getAndIncrement, and getAndDecrement, which return or condition on the current value and are therefore very flexible. LongAdder is limited to accumulating (add, increment, decrement) and reading the total with sum.
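The difference shows up as soon as a caller needs the value produced by its own update. In the hypothetical IdGeneratorDemo below, AtomicLong.getAndIncrement hands every thread a distinct id, whereas LongAdder.increment returns void and can only count how many ids were issued.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

class IdGeneratorDemo {
    // Returns {number of distinct ids handed out, number counted by the LongAdder}
    static long[] run(int threadCount, int idsPerThread) throws InterruptedException {
        AtomicLong nextId = new AtomicLong();
        LongAdder issued = new LongAdder();
        Set<Long> ids = ConcurrentHashMap.newKeySet();

        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < idsPerThread; i++) {
                    // getAndIncrement returns the old value, so every caller gets a unique id
                    ids.add(nextId.getAndIncrement());
                    // increment() returns void: fine for counting, unusable for handing out ids
                    issued.increment();
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();
        return new long[] {ids.size(), issued.sum()};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(4, 10_000);
        System.out.println(r[0] + " unique ids, " + r[1] + " issued"); // 40000 unique ids, 40000 issued

        // Conditional update: LongAdder has no equivalent of compareAndSet
        AtomicLong seq = new AtomicLong(5);
        System.out.println(seq.compareAndSet(5, 0)); // true
        System.out.println(seq.compareAndSet(5, 1)); // false: value is now 0
    }
}
```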

Advantages: under low contention, LongAdder performs like AtomicLong, because each update is a single CAS on base. Under high contention, hot updates are hashed across a bounded number of cells (limited by the number of CPUs), which raises parallelism through dispersion.
Disadvantages: if values are updated while sum() is running, the returned total may miss those updates. LongAdder is therefore best reserved for high-concurrency statistics; in low-concurrency scenarios, AtomicLong remains a fine choice.

False sharing and cache line padding

In JDK 8, the Cell class is annotated with @sun.misc.Contended, an annotation introduced in JDK 8 that addresses false sharing through cache line padding.

Each CPU has its own cache. The basic unit of data exchange between the cache and main memory is the cache line. On 64-bit x86 architectures, a cache line is 64 bytes, the size of 8 long values. This also means that whenever a cache line is invalidated or written back to main memory, all 64 bytes are transferred, not just the modified variable.

As illustrated below, suppose main memory holds variables X, Y, and Z (each a long) that lie in the same cache line, and CPU1 and CPU2 have each read that line into their own caches. When CPU1 modifies X, it invalidates the whole cache line: it sends a message on the bus telling CPU2 to invalidate its copy of that line. Because the cache line is the basic unit of data exchange, X cannot be invalidated on its own; the entire line is invalidated, which also invalidates CPU2's cached copies of Y and Z even though nobody changed them. This is false sharing.
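The effect can be observed with a rough experiment. FalseSharingDemo below is a hypothetical sketch: two threads each increment their own slot of an AtomicLongArray (backed by a contiguous long[]), once with the slots adjacent and once 16 slots (128 bytes) apart. It assumes 64-byte cache lines; timings vary with hardware and JIT warm-up, so treat the numbers as indicative only (a harness such as JMH is better for real measurements).

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Illustrative false-sharing experiment; no particular timing is guaranteed.
class FalseSharingDemo {
    // Two threads increment two different slots; only the distance between the slots changes.
    static long run(int distance, int iterations) throws InterruptedException {
        AtomicLongArray slots = new AtomicLongArray(distance + 1);
        Thread a = new Thread(() -> {
            for (int i = 0; i < iterations; i++) slots.incrementAndGet(0);
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < iterations; i++) slots.incrementAndGet(distance);
        });
        long start = System.nanoTime();
        a.start();
        b.start();
        a.join();
        b.join();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // The threads never write each other's slot, so both counts must be exact
        if (slots.get(0) != iterations || slots.get(distance) != iterations) {
            throw new AssertionError("lost update");
        }
        return elapsedMs;
    }

    public static void main(String[] args) throws InterruptedException {
        int iterations = 10_000_000;
        // distance 1: the slots are 8 bytes apart and very likely share one 64-byte line
        System.out.println("adjacent slots: " + run(1, iterations) + " ms");
        // distance 16: the slots are 128 bytes apart, so they cannot share a 64-byte line
        System.out.println("spaced slots:   " + run(16, iterations) + " ms");
    }
}
```

On many multi-core machines the adjacent case runs noticeably slower, because each write invalidates the other core's copy of the shared line; spacing the slots apart is the same idea as the padding that @Contended adds around each Cell.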

The @Contended annotation implements this cache line padding: it pads each Cell so that the Cells referenced by neighboring slots of the cells array do not fall into the same cache line.


Added by maxf on Mon, 24 Jan 2022 18:42:55 +0200