Lock-free multithreading: the atomic classes

Preface

The java.util.concurrent.atomic package provides a set of lock-free concurrency utility classes, which fall into five categories:

  1. Update base types atomically
    1. AtomicInteger: integer atomic class
    2. AtomicLong: long integer atomic class
    3. AtomicBoolean: Boolean atomic class
  2. Atomic reference
  3. Atomic array
  4. Field Updater
  5. Atomic accumulator

1. Atomic integer

Take AtomicInteger as an example and look at its API. Reading the source code shows that AtomicInteger is implemented internally with CAS (compare-and-swap).

Related APIs:

public static void main(String[] args) {
    AtomicInteger i = new AtomicInteger(0);
    // Get, then increment (i = 0, result i = 1, returns 0), similar to i++
    System.out.println(i.getAndIncrement());
    // Increment, then get (i = 1, result i = 2, returns 2), similar to ++i
    System.out.println(i.incrementAndGet());
    // Decrement, then get (i = 2, result i = 1, returns 1), similar to --i
    System.out.println(i.decrementAndGet());
    // Get, then decrement (i = 1, result i = 0, returns 1), similar to i--
    System.out.println(i.getAndDecrement());
    // Get, then add (i = 0, result i = 5, returns 0)
    System.out.println(i.getAndAdd(5));
    // Add, then get (i = 5, result i = 0, returns 0)
    System.out.println(i.addAndGet(-5));
    // Get, then update (i = 0, p is the current value of i, result i = -2, returns 0)
    // The function is applied atomically, but it may be re-run under contention, so it must have no side effects
    System.out.println(i.getAndUpdate(p -> p - 2));
    // Update, then get (i = -2, p is the current value of i, result i = 0, returns 0)
    // The function is applied atomically, but it may be re-run under contention, so it must have no side effects
    System.out.println(i.updateAndGet(p -> p + 2));
    // Get, then accumulate (i = 0, p is the current value of i, x is the first argument, result i = 10, returns 0)
    // The function is applied atomically, but it may be re-run under contention, so it must have no side effects
    // A local variable captured by the getAndUpdate lambda must be final or effectively final
    // getAndAccumulate can receive such a value through its first argument instead; it need not be final because the lambda does not capture it
    System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
    // Accumulate, then get (i = 10, p is the current value of i, x is the first argument, result i = 0, returns 0)
    // The function is applied atomically, but it may be re-run under contention, so it must have no side effects
    System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}
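
The remark about final local variables can be illustrated with a small sketch. The class and method names (AccumulateDemo, sumTo, addFixed) are hypothetical and chosen only for this illustration. A loop variable changes on every iteration, so a lambda cannot capture it, but it can be passed as the first argument of accumulateAndGet:

```java
import java.util.concurrent.atomic.AtomicInteger;

class AccumulateDemo {
    // Sums 1..n into an AtomicInteger. The loop variable changes on every
    // iteration, so it is passed as an argument instead of being captured.
    static int sumTo(int n) {
        AtomicInteger total = new AtomicInteger(0);
        for (int step = 1; step <= n; step++) {
            // "p -> p + step" would not compile here: step is not effectively final.
            total.accumulateAndGet(step, (p, x) -> p + x);
        }
        return total.get();
    }

    // Capturing works when the local variable is effectively final.
    static int addFixed(int start, int delta) {
        AtomicInteger value = new AtomicInteger(start);
        return value.updateAndGet(p -> p + delta); // delta is never reassigned
    }

    public static void main(String[] args) {
        System.out.println(sumTo(4));       // 1 + 2 + 3 + 4
        System.out.println(addFixed(5, 3)); // 5 + 3
    }
}
```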

AtomicInteger source code:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    /*
     * This class intended to be implemented using VarHandles, but there
     * are unresolved cyclic startup dependencies.
     */
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

    private volatile int value;

Unsafe is the class that CAS is built on. Java code cannot access the underlying platform directly and has to go through native methods. The JVM leaves a back door for this: Unsafe, whose native methods expose hardware-level atomic operations.

VALUE is the memory offset of the value field within an AtomicInteger object. Unsafe uses this offset to read and update the field directly.

The value field holds the current value. It is declared volatile so that a write by one thread is visible to all other threads.
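
jdk.internal.misc.Unsafe is not available to application code. The source comment above mentions VarHandles, which are the public counterpart (Java 9 and later). The following is only a sketch of the same idea under that assumption, not how AtomicInteger itself is written; the class name VarHandleCounter is hypothetical:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleCounter {
    // Plays the role of the VALUE offset: a handle to the "value" field.
    private static final VarHandle VALUE;

    static {
        try {
            VALUE = MethodHandles.lookup()
                    .findVarHandle(VarHandleCounter.class, "value", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile int value;

    int get() {
        return value;
    }

    int addAndGet(int delta) {
        // getAndAdd returns the previous value, like Unsafe.getAndAddInt.
        return (int) VALUE.getAndAdd(this, delta) + delta;
    }

    boolean compareAndSet(int expected, int update) {
        return VALUE.compareAndSet(this, expected, update);
    }
}
```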

Let's take the addAndGet() method of AtomicInteger as an example to see the source code:

addAndGet source code:

public final int addAndGet(int delta) {
    return U.getAndAddInt(this, VALUE, delta) + delta;
}

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    // CAS retry loop (it spins, but no lock is held): repeat until the swap succeeds
    do {
        // Read the current value at the field offset
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}
// If the value stored at the offset still equals expected,
// replace it with x and return true; otherwise return false
public final boolean weakCompareAndSetInt(Object o, long offset,
                                          int expected,
                                          int x) {
    return compareAndSetInt(o, offset, expected, x);
}

// Four parameters: the object, the offset of the field within the object, the expected value, and the new value
public final native boolean compareAndSetInt(Object o, long offset,
                                             int expected,
                                             int x);
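
The same retry loop can be written against the public API. This is a minimal sketch, assuming hypothetical names (CasLoopDemo, addAndGet, runThreads); it mirrors the shape of getAndAddInt but is not the JDK code:

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopDemo {
    // Same shape as getAndAddInt: read, compute, CAS, retry on failure.
    static int addAndGet(AtomicInteger counter, int delta) {
        while (true) {
            int current = counter.get();          // volatile read
            int next = current + delta;
            if (counter.compareAndSet(current, next)) {
                return next;                      // no other thread interfered
            }
            // Another thread changed the value first: loop and try again.
        }
    }

    // Starts the given number of threads, each adding 1 perThread times.
    static int runThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    addAndGet(counter, 1);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runThreads(4, 10_000));
    }
}
```

Because every failed CAS is retried, no increment is lost and the final count equals threads * perThread.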

2. Atomic reference

Why are atomic reference types needed? They make a shared variable of reference type thread-safe: an update succeeds only if the reference has not been replaced by another thread in the meantime.

An atomic class for a primitive type can update only one variable. To update several variables together, wrap them in one object and swap that object through a reference-type atomic class.

  • AtomicReference: atomic class for reference types.
  • AtomicStampedReference: atomically updates a reference together with a version number. It associates an int stamp with the reference, so the data and its version are updated in one atomic step, which solves the ABA problem that can occur with CAS updates.
  • AtomicMarkableReference: atomically updates a reference together with a mark. It associates a boolean mark with the reference, which can also help detect the ABA problem in CAS updates.
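
Updating several variables together can be sketched as follows. The names (RangeHolder, Range, shift) are hypothetical. The two bounds live in one immutable object, and the whole object is swapped through an AtomicReference, so no thread can observe one bound changed without the other:

```java
import java.util.concurrent.atomic.AtomicReference;

class RangeHolder {
    // Immutable snapshot: both bounds always change together.
    static final class Range {
        final int lower;
        final int upper;

        Range(int lower, int upper) {
            this.lower = lower;
            this.upper = upper;
        }
    }

    private final AtomicReference<Range> range =
            new AtomicReference<>(new Range(0, 10));

    // Moves both bounds by the same amount in one atomic step.
    void shift(int by) {
        range.updateAndGet(r -> new Range(r.lower + by, r.upper + by));
    }

    Range get() {
        return range.get();
    }
}
```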

Using an atomic reference to make BigDecimal deposits and withdrawals thread-safe
The following implementation is not thread-safe:

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;
    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }
    @Override
    public BigDecimal getBalance() {
        return balance;
    }
    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

The fix is shown below. AtomicReference stores the reference to the BigDecimal object in its value field and replaces it with CAS.

class DecimalAccountCas implements DecimalAccount{

    //private BigDecimal balance;
    private AtomicReference<BigDecimal> balance ;

    public DecimalAccountCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while(true){
            BigDecimal pre = balance.get();
            // Note: subtract returns a new BigDecimal object, so pre != next
            BigDecimal next = pre.subtract(amount);
            if (balance.compareAndSet(pre,next)){
                break;
            }
        }
    }
}
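
The explicit while(true) loop can also be delegated to AtomicReference.updateAndGet, which runs the same read, compute and compareAndSet retry internally. A minimal sketch with hypothetical names (UpdateAndGetAccount, demo); it does not implement the DecimalAccount interface, whose definition is not shown in this article:

```java
import java.math.BigDecimal;
import java.util.concurrent.atomic.AtomicReference;

class UpdateAndGetAccount {
    private final AtomicReference<BigDecimal> balance;

    UpdateAndGetAccount(BigDecimal initial) {
        this.balance = new AtomicReference<>(initial);
    }

    BigDecimal getBalance() {
        return balance.get();
    }

    void withdraw(BigDecimal amount) {
        // The lambda may be re-run on contention, so it must be side-effect free.
        balance.updateAndGet(current -> current.subtract(amount));
    }

    // 100 threads each withdraw 10 ten times from a balance of 10000.
    static BigDecimal demo() {
        UpdateAndGetAccount account = new UpdateAndGetAccount(new BigDecimal("10000"));
        Thread[] workers = new Thread[100];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < 10; n++) {
                    account.withdraw(BigDecimal.TEN);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return account.getBalance();
    }
}
```

With 100 * 10 withdrawals of 10 each, the remaining balance is zero when no update is lost.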

ABA problem:

In the following program, two threads started in other() modify the shared variable, but the second one restores the original value, so the changes are invisible to the main thread and its CAS still succeeds. In this example that has no effect on the business logic:

static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
    log.debug("main start");
    // Get value A
    String prev = ref.get();
    other();
    Thread.sleep(1000);
    log.debug("A -> C {}", ref.compareAndSet(prev, "C"));
}

private static void other() throws InterruptedException {
    new Thread(() -> {
        log.debug("A -> B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();
    Thread.sleep(500);
    new Thread(() -> {
        // Note: if this line were log.debug("B -> A {}", ref.compareAndSet(ref.get(), new String("A")));
        // then log.debug("A -> C {}", ref.compareAndSet(prev, "C")); in main
        // would print false, because new String("A") returns a different object reference than the literal "A"!
        log.debug("B -> A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
}

result:

13:49:06.903 [main] DEBUG aba - main start
13:49:06.909 [t1] DEBUG aba - A -> B true
13:49:07.415 [t2] DEBUG aba - B -> A true
13:49:08.425 [main] DEBUG aba - A -> C true

The main thread can only check whether the shared variable still equals the initial value A; it cannot detect that the value went from A to B and back to A. If the main thread needs its CAS to fail whenever another thread has touched the shared variable, comparing the value alone is not enough; a version number must be added as well. AtomicStampedReference provides this.

ABA problem solving: AtomicStampedReference

Java provides AtomicStampedReference for this. It wraps the reference and an int stamp in an [E,Integer] pair, so every change can carry a new version stamp, which avoids the ABA problem.

The compareAndSet() method of AtomicStampedReference is defined as follows:

public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    return
        expectedReference == current.reference &&
        expectedStamp == current.stamp &&
        ((newReference == current.reference &&
          newStamp == current.stamp) ||
         casPair(current, Pair.of(newReference, newStamp)));
}

compareAndSet takes four parameters: the expected reference, the new reference, the expected stamp and the new stamp.
It first checks that the expected reference and stamp match the current pair; if they do not, it returns false. If the new reference and stamp are already equal to the current ones, it returns true without writing anything. Otherwise it creates a new Pair with Pair.of and swaps it in for the current pair with CAS. Pair is a nested class of AtomicStampedReference that records the reference and its version stamp. It is defined as follows:

private static class Pair<T> {
    final T reference;
    final int stamp;
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

private volatile Pair<V> pair;

Pair records the object reference and its version stamp. The stamp is an int; it does not increase by itself, the caller supplies the new stamp on every update (usually the old stamp plus one). Pair is immutable: all of its fields are final, and the static of method returns a new Pair object. The pair field is declared volatile to ensure visibility across threads. Most methods of AtomicStampedReference create a new Pair through Pair.of and assign it to pair. For example, the set method:

public void set(V newReference, int newStamp) {
    Pair<V> current = pair;
    if (newReference != current.reference || newStamp != current.stamp)
        this.pair = Pair.of(newReference, newStamp);
}

Use AtomicStampedReference to solve the ABA problem:

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // Get value A
    int stamp = ref.getStamp();
    log.info("main stamp:{}",stamp);
    String prev = ref.getReference();
    other();
    Thread.sleep(1000);
    // Try to change to C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1));
}

private static void other() throws InterruptedException {
    new Thread(() -> {
        int stamp = ref.getStamp();
        log.info("t1 stamp:{}",stamp);
        log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1));
    }, "t1").start();
    Thread.sleep(500);
    new Thread(() -> {
        int stamp = ref.getStamp();
        log.info("t2 stamp:{}",stamp);
        log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1));
    }, "t2").start();
}

result:

14:13:48.082 [main] DEBUG aba - main start...
14:13:48.086 [main] INFO aba - main stamp:0
14:13:48.089 [t1] INFO aba - t1 stamp:0
14:13:48.090 [t1] DEBUG aba - change A->B true
14:13:48.603 [t2] INFO aba - t2 stamp:1
14:13:48.603 [t2] DEBUG aba - change B->A true
14:13:49.617 [main] DEBUG aba - change A->C false
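
The demo above reads the stamp and the reference with two separate calls (getStamp and getReference), so another thread could in principle change the pair between them. AtomicStampedReference also offers get(int[] stampHolder), which returns the reference and writes the stamp from the same pair into the array. A single-threaded sketch with hypothetical names (StampedUpdateDemo, setWithNextStamp, staleUpdateSucceeds):

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedUpdateDemo {
    // Replaces the reference and bumps the stamp by one, retrying on contention.
    static <V> void setWithNextStamp(AtomicStampedReference<V> ref, V newValue) {
        int[] stampHolder = new int[1];
        while (true) {
            V current = ref.get(stampHolder);   // reference and stamp from one pair
            int stamp = stampHolder[0];
            if (ref.compareAndSet(current, newValue, stamp, stamp + 1)) {
                return;
            }
        }
    }

    // Replays A -> B -> A and then tries an update based on the stale stamp 0.
    static boolean staleUpdateSucceeds() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
        int[] holder = new int[1];
        String seen = ref.get(holder);
        int seenStamp = holder[0];

        setWithNextStamp(ref, "B");   // stamp 0 -> 1
        setWithNextStamp(ref, "A");   // stamp 1 -> 2, reference is "A" again

        // The reference matches, but the stamp is 2, not 0, so this fails.
        return ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1);
    }
}
```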

compareAndSet compares references:

The compareAndSet source above also shows that the comparison uses ==, so it compares references, not values. With Integer, comparing two separately boxed values of 1000 with == is normally false, because autoboxing is only guaranteed to cache values from -128 to 127. For other objects, two references are equal only if they point to the same instance.
Code example:

static AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(200,0);
public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // Get the current stamp and value (200)
    int stamp = ref.getStamp();
    log.info("main stamp:{}",stamp);
    Integer prev = ref.getReference();
    other();
    Thread.sleep(1000);
    // Try to change the value to 400
    log.debug("change A->C {}", ref.compareAndSet(prev, 400,stamp,stamp+1));
}

private static void other() throws InterruptedException {
    new Thread(() -> {
        int stamp = ref.getStamp();
        log.info("t1 stamp:{}",stamp);
        log.debug("change A->B {}", ref.compareAndSet(200, 300,stamp,stamp+1));
    }, "t1").start();
    Thread.sleep(500);
    new Thread(() -> {
        int stamp = ref.getStamp();
        log.info("t2 stamp:{}",stamp);
        log.debug("change B->A {}", ref.compareAndSet(300, 200,stamp,stamp+1));
    }, "t2").start();
}


Solution:

// Pass the reference obtained from getReference() as the expected value, or only use values inside the Integer cache range (-128 to 127)
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), 300,stamp,stamp+1));
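
The effect of reference comparison can be checked in isolation. This is a sketch with hypothetical names (BoxedCompareDemo and its methods). It uses new String("A") for the failing case because that is guaranteed to be a distinct object, whereas the behaviour of boxed Integers outside the cache range depends on the JVM configuration:

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class BoxedCompareDemo {
    // Expected value is the very object stored in the reference: succeeds.
    static boolean casWithStoredReference() {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(200, 0);
        Integer current = ref.getReference();
        return ref.compareAndSet(current, 300, 0, 1);
    }

    // 100 is inside the guaranteed Integer cache (-128..127), so both
    // autoboxed occurrences are the same object: succeeds.
    static boolean casWithCachedValue() {
        AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);
        return ref.compareAndSet(100, 120, 0, 1);
    }

    // Equal content but a different object: == is false, so the CAS fails.
    static boolean casWithEqualButDistinctObject() {
        AtomicStampedReference<String> ref =
                new AtomicStampedReference<>(new String("A"), 0);
        return ref.compareAndSet("A", "B", 0, 1);
    }
}
```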

AtomicMarkableReference

AtomicStampedReference adds a version number to an atomic reference and can track its whole history of changes, such as A -> B -> A -> C. Through the stamp we can tell how many times the reference has been changed along the way.

Sometimes, however, we do not care how many times the reference has been changed, only whether it has been changed at all. That is what AtomicMarkableReference is for.
Source code:

public boolean compareAndSet(V       expectedReference,
                                 V       newReference,
                                 boolean expectedMark,
                                 boolean newMark) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedMark == current.mark &&
            ((newReference == current.reference &&
              newMark == current.mark) ||
             casPair(current, Pair.of(newReference, newMark)));
    }

The source of AtomicMarkableReference shows that its mark is a boolean: we do not need to know how many times the reference has been updated, only whether it has been updated.
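
A small single-threaded sketch of this idea follows. The names (MarkableDemo, staleUpdateSucceeds) are hypothetical, and the convention that mark = true means "has been changed" is an assumption of this example, not something the class enforces:

```java
import java.util.concurrent.atomic.AtomicMarkableReference;

class MarkableDemo {
    // Replays A -> B -> A while setting the mark, then tries a stale update.
    static boolean staleUpdateSucceeds() {
        AtomicMarkableReference<String> ref =
                new AtomicMarkableReference<>("A", false);
        String seen = ref.getReference();   // observed with mark == false

        // Another party changes the value and flags it as changed.
        ref.compareAndSet("A", "B", false, true);
        ref.compareAndSet("B", "A", true, true);

        // The reference is "A" again, but the mark is now true, so this fails.
        return ref.compareAndSet(seen, "C", false, true);
    }
}
```

A boolean has only two states, so this only works if every modifier sets the mark and nobody resets it to false while a reader still holds the old view.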

3. Atomic array

These classes update individual array elements atomically. There are three of them:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

For functional programming, see: Four functional interfaces

Code demonstration:

public class TestAtomicArray {
    public static void main(String[] args) {
        TestAtomicArray.demo(
                ()->new AtomicIntegerArray(10),
                (array)-> array.length(),
                (array,index)-> array.getAndIncrement(index),
                (array)->System.out.println(array)
        );

        TestAtomicArray.demo(
                ()->new int[10],
                (array)-> array.length,
                (array,index)-> array[index]++,
                (array)->System.out.println(Arrays.toString(array))
        );
    }

    /**
     Parameter 1: supplies the array, either a plain (thread-unsafe) array or a thread-safe one
     Parameter 2: returns the length of the array
     Parameter 3: increments one element; it receives the array and the index of the element to increment
     Parameter 4: prints the array
     */
    // Supplier: no argument, one result: () -> result
    // Function: one argument, one result: (arg) -> result; BiFunction: (arg1, arg2) -> result
    // Consumer: one argument, no result: (arg) -> void; BiConsumer: (arg1, arg2) -> void
    public static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T,Integer> lengthFun,
            BiConsumer<T,Integer> putConsumer,
            Consumer<T> printConsumer){
        List<Thread> ts = new ArrayList<>();
        T array = arraySupplier.get();
        Integer length = lengthFun.apply(array);
        for (int i = 0;i<length;i++){
            ts.add(new Thread(()->{
                for (int j=0;j<10000;j++){
                    putConsumer.accept(array,j%length);
                }
            }));
        }
        ts.forEach(t->t.start());
        ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }
}

result:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
[6857, 6840, 6815, 6802, 6825, 6847, 6845, 6850, 6824, 6768]

The first line, produced with AtomicIntegerArray, shows the correct counts; the second, produced with the plain int[] array, has lost updates.
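
Without the functional-interface plumbing, the atomic array can also be used directly. A minimal sketch with hypothetical names (AtomicArrayDemo, countParity): several threads count even and odd numbers into a two-element AtomicIntegerArray.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

class AtomicArrayDemo {
    // Each thread walks 0..perThread-1 and increments slot 0 for even
    // numbers and slot 1 for odd numbers.
    static int[] countParity(int threads, int perThread) {
        AtomicIntegerArray counts = new AtomicIntegerArray(2);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    counts.incrementAndGet(n % 2);   // atomic per element
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { counts.get(0), counts.get(1) };
    }
}
```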

4. Field Updater

These classes update a field of an object atomically. There are three of them:

  • AtomicReferenceFieldUpdater
  • AtomicIntegerFieldUpdater // the field type is int
  • AtomicLongFieldUpdater // the field type is long

Note that the field must be declared volatile.

Using a field updater:

public class TestAtomicField {
    public static void main(String[] args) {
        Student student = new Student();
        AtomicReferenceFieldUpdater<Student, String> u = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
        System.out.println(u.compareAndSet(student, null, "Zhang San"));
    }
}

@Data
class Student {
    // Note: the updater requires the field to be volatile, which also ensures visibility
    volatile String name;
}

result:

true
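
The volatile requirement is enforced when the updater is created. The following sketch uses AtomicIntegerFieldUpdater with hypothetical names (FieldUpdaterDemo, Counter, hits, plain) and shows that newUpdater rejects a non-volatile field with an IllegalArgumentException:

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterDemo {
    static class Counter {
        volatile int hits;   // eligible: volatile int
        int plain;           // not volatile: the updater refuses it
    }

    // Increments the volatile field through the updater.
    static int incrementHits(int times) {
        AtomicIntegerFieldUpdater<Counter> updater =
                AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");
        Counter counter = new Counter();
        for (int n = 0; n < times; n++) {
            updater.incrementAndGet(counter);
        }
        return counter.hits;
    }

    // Returns true if creating an updater for the non-volatile field fails.
    static boolean plainFieldRejected() {
        try {
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "plain");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

One updater can serve many Counter instances, which avoids allocating a separate AtomicInteger per object.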

5. Atomic accumulator

As the name suggests, an atomic accumulator accumulates numbers. In JDK 8,

Keywords: Java Back-end Multithreading

Added by it2051229 on Wed, 16 Feb 2022 09:59:20 +0200