Java Concurrency: Analysis of the CAS Principle

We know that when multiple threads operate on shared resources, three problems arise: visibility, ordering, and atomicity.

Generally, we use a synchronized lock (an exclusive lock, or mutex), so that only one thread at a time can modify the shared variables.
Other threads must wait. But this effectively serializes execution, which loses the advantages of multithreading.

So do we have another way to solve these three problems?

Java has the volatile keyword, which solves the visibility and ordering problems. And if the shared variable is of a primitive type,
and each thread only reads it or only writes it in a single step, then atomicity is already guaranteed as well, and there is no multithreading problem.
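
To illustrate the limit of volatile described above, here is a minimal sketch (the class and field names are our own, not from any library). A single volatile read or write is atomic, but count++ is a read followed by a write, so concurrent increments can overwrite each other. The AtomicInteger counter is included only for comparison.

```java
import java.util.concurrent.atomic.AtomicInteger;

class VolatileVsAtomic {
    // volatile gives visibility, but volatileCount++ is still read, add, write
    static volatile int volatileCount = 0;
    static final AtomicInteger atomicCount = new AtomicInteger(0);

    /** Starts `threads` threads that each increment both counters `perThread` times. */
    static void run(int threads, int perThread) {
        volatileCount = 0;
        atomicCount.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    volatileCount++;               // not atomic: updates can be lost
                    atomicCount.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        run(4, 100_000);
        // The volatile counter may end up below 400000; the atomic one should not.
        System.out.println("volatile counter: " + volatileCount);
        System.out.println("atomic counter:   " + atomicCount.get());
    }
}
```

How many updates the volatile counter loses varies from run to run, and on some runs it may lose none.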

But usually we have to read the shared variable first, then compute on it, and finally write the result back. How can we guarantee the atomicity of the whole operation in that case? One solution is CAS.
CAS stands for Compare and Swap. Before explaining it, we should first understand two important concepts: the pessimistic lock and the optimistic lock.

I. Pessimistic Lock and Optimistic Lock

  1. Pessimistic lock: assumes that concurrent conflicts will occur, that is, that the shared resource will be changed by another thread. So when one thread acquires the shared resource, it prevents other threads from acquiring it. This is also known as an exclusive lock or mutex, such as the synchronized lock in Java.
  2. Optimistic lock: assumes that no concurrent conflict will occur, and only checks at the moment of updating the shared resource whether another thread has modified it in the meantime. If a conflict is detected, it retries until there is no conflict and the update succeeds. CAS is one implementation of optimistic locking.
Pessimistic locks block other threads. Optimistic locks do not block other threads; if a conflict occurs, they retry in a loop (spin) until the update succeeds.
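
The two strategies can be sketched side by side. The classes below are hypothetical illustrations written for this article: SynchronizedCounter takes the lock before every update, while CasCounter reads, computes, and only publishes the result if no other thread changed the value in the meantime.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LockStyles {
    /** Pessimistic: every caller takes the monitor lock before touching the value. */
    static class SynchronizedCounter {
        private int value;
        synchronized int increment() { return ++value; }
        synchronized int get() { return value; }
    }

    /** Optimistic: no lock; detect a conflict at update time and retry. */
    static class CasCounter {
        private final AtomicInteger value = new AtomicInteger();
        int increment() {
            while (true) {
                int current = value.get();
                int next = current + 1;
                if (value.compareAndSet(current, next)) {
                    return next;
                }
                // Another thread updated the value first; loop and try again.
            }
        }
        int get() { return value.get(); }
    }

    /** Increments both counters from several threads; returns {synchronized total, CAS total}. */
    static int[] hammer(int threads, int perThread) {
        SynchronizedCounter sync = new SynchronizedCounter();
        CasCounter cas = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    sync.increment();
                    cas.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {sync.get(), cas.get()};
    }

    public static void main(String[] args) {
        int[] totals = hammer(4, 50_000);
        System.out.println(totals[0] + " " + totals[1]); // 200000 200000
    }
}
```

Both counters reach the same total; the difference is that a thread losing the race in CasCounter retries immediately instead of being blocked.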

II. Implementation Principle of CAS

The principle of CAS is simple. It involves three values: the current value at a memory location (V), the expected original value (A), and the new value to write (B).

If the value at memory location V matches the expected original value A, the processor atomically updates the location to the new value B and returns true. Otherwise, the processor does nothing and returns false.

The most important requirement in implementing CAS is that the comparison and the swap happen as a single atomic step; otherwise the result would be ambiguous.

For example, if the current thread's comparison has succeeded and it is about to update the shared variable, but another thread changes the shared variable at that moment, the CAS function must return false.
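
The success and failure cases can be observed through the public AtomicInteger.compareAndSet method. This is a small single-threaded sketch (the class name CasSemantics is our own) in which a stale expected value stands in for "another thread changed the variable first".

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSemantics {
    static String demo() {
        AtomicInteger v = new AtomicInteger(5);  // memory value V = 5

        // Expected value A = 5 matches V, so V becomes B = 6 and the call returns true.
        boolean first = v.compareAndSet(5, 6);

        // The expected value 5 is now stale (V is 6): nothing is written, false is returned.
        boolean second = v.compareAndSet(5, 7);

        return first + " " + second + " " + v.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false 6
    }
}
```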

To meet this requirement, Java provides the Unsafe class (sun.misc.Unsafe), which offers three functions for operating on the primitive types int and long and on the reference type Object.

    public final native boolean compareAndSwapObject(Object obj, long valueOffset, Object expect, Object update);

    public final native boolean compareAndSwapInt(Object obj, long valueOffset, int expect, int update);

    public final native boolean compareAndSwapLong(Object obj, long valueOffset, long expect, long update);

Meaning of the parameters:

  • obj and valueOffset: together identify the memory address of the shared variable. The shared variable is a field of the obj object, and valueOffset is the memory offset of that field within the object. With these two parameters, the shared variable's value can be read and modified directly in memory.
  • expect: the expected original value.
  • update: the new value to write.
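
Unsafe is an internal class that application code normally cannot obtain through Unsafe.getUnsafe(). As a rough analogue in the public API, the sketch below uses AtomicIntegerFieldUpdater, a different class swapped in here for illustration: the (object, field) pair plays the role of obj and valueOffset, and expect and update have the same meaning as above. The Account class and its balance field are hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class Account {
    // AtomicIntegerFieldUpdater requires the target field to be a volatile int.
    volatile int balance;

    // Identifies the "balance" field of Account, much as valueOffset identifies a field by offset.
    static final AtomicIntegerFieldUpdater<Account> BALANCE =
            AtomicIntegerFieldUpdater.newUpdater(Account.class, "balance");

    Account(int initial) {
        this.balance = initial;
    }

    /** Sets balance to `update` only if it currently equals `expect`. */
    boolean casBalance(int expect, int update) {
        return BALANCE.compareAndSet(this, expect, update);
    }
}

class AccountDemo {
    public static void main(String[] args) {
        Account account = new Account(100);
        System.out.println(account.casBalance(100, 80)); // true
        System.out.println(account.casBalance(100, 50)); // false, balance is already 80
        System.out.println(account.balance);             // 80
    }
}
```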

Next, let's look at how the atomic classes in the Java concurrency framework use CAS.

III. Atomic Classes in the JUC Concurrency Framework

When calling the methods of the atomic classes in the JUC (java.util.concurrent) framework, callers do not need to worry about thread safety. Let's analyze how these classes solve the multithreading problem, taking AtomicInteger as an example.

3.1 Member Variables

    // Used to perform CAS operations. Because value is an int, its compareAndSwapInt method is called.
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // Memory offset of the shared variable value within an AtomicInteger object.
    // compareAndSwapInt needs this offset to modify value directly in memory.
    private static final long valueOffset;

    // Static initializer, run when the AtomicInteger class is loaded
    static {
        try {
            // Use Unsafe to get the memory offset of the value field within an AtomicInteger object
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // The shared variable. AtomicInteger exists to make multithreaded operations on it safe.
    // Declaring it volatile solves the visibility and ordering problems.
    private volatile int value;

There are three important fields:

  • unsafe: performs the CAS operations. Because the shared variable is an int, the compareAndSwapInt method is called.
  • valueOffset: the memory offset of the shared variable value within an AtomicInteger object.
  • value: the shared variable. Declaring it volatile solves the visibility and ordering problems.

3.2 Important Methods

3.2.1 get and set methods

    // Read directly. Because value is declared volatile, any thread always sees the latest write to it.
    public final int get() {
        return value;
    }

    // Write directly. Because value is declared volatile, the new value is immediately visible to other threads.
    public final void set(int newValue) {
        value = newValue;
    }

Because value is declared volatile, a read on any thread always sees the latest write to it, and a write to it is immediately visible to other threads.
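
A small sketch of this visibility guarantee, using only get and set (the class and method names are our own): a reader thread polls get() until it observes the value published by set() on another thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

class VisibilityDemo {
    /** Publishes a non-zero value with set() and returns what the reader thread saw. */
    static int handOff(int published) {
        AtomicInteger shared = new AtomicInteger(0);
        int[] seen = new int[1];

        Thread reader = new Thread(() -> {
            int v;
            // get() is a volatile read, so the reader cannot keep using a stale copy of the value.
            while ((v = shared.get()) == 0) {
                Thread.yield();
            }
            seen[0] = v;
        });
        reader.start();

        shared.set(published); // volatile write; must be non-zero or the reader never stops

        try {
            reader.join();     // after join(), the reader's write to seen[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff(42)); // 42
    }
}
```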

3.2.2 compareAndSet method

    // If the current (in-memory) value of value equals expect, assign update to value and return true.
    // If the current (in-memory) value of value does not equal expect, do nothing and return false.
    // This is the CAS operation; unsafe.compareAndSwapInt guarantees that the whole step is atomic.
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

The CAS function is implemented by calling the unsafe.compareAndSwapInt method. However, CAS only guarantees that the compare-and-swap step itself is atomic; the update is not guaranteed to take place. Suppose, for example, that we want to increment the shared variable.
Incrementing the shared variable value consists of three operations: 1. read value, 2. compute value+1, and 3. assign value+1 to value. Let's analyze each of them:

  • Reading value: because value is declared volatile, the read sees the latest write by any thread, so there is no problem here.
  • Computing value+1: this is where the problem lies. There is no lock, so other threads may change value while this computation is in progress.
  • Assigning value+1 to value: this uses the CAS function. If it returns false, the shared variable was modified by another thread between the current thread's read of value and its call to CAS. The computed value+1 is then no longer the result we want, so it must be recalculated.

3.2.3 getAndAddInt method

    public final int getAndAddInt(Object obj, long valueOffset, int var) {
        int expect;
        // Loop until the update succeeds.
        do {
            // Read the latest value
            expect = this.getIntVolatile(obj, valueOffset);
            // expect + var is the new value to write. If compareAndSwapInt returns false, value was changed by another thread.
            // In that case, read the latest value into expect again, recompute expect + var, and retry until the update succeeds.
        } while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var));

        // Return the value that value held just before this thread's successful update, not the updated value
        return expect;
    }

In the Unsafe class, this method uses a do-while loop: it computes the new value from the current value and then tries to set the value variable with compareAndSwapInt. If compareAndSwapInt returns false, the value variable has been changed by another thread, so the loop reads the latest value again and retries compareAndSwapInt. This continues until the update succeeds, at which point the loop exits and the method returns the value from before the update.
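
The same read, compute, CAS, retry pattern can also be written against AtomicInteger's public API for updates other than addition. The getAndMultiply helper below is hypothetical and only mirrors the shape of getAndAddInt; it is not a JDK method.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoop {
    /** Hypothetical helper: atomically multiplies the value and returns the previous value. */
    static int getAndMultiply(AtomicInteger target, int factor) {
        int expect;
        do {
            // Read the latest value
            expect = target.get();
            // Try to publish expect * factor; retry if another thread changed the value in between
        } while (!target.compareAndSet(expect, expect * factor));
        return expect; // the value from before the update
    }

    public static void main(String[] args) {
        AtomicInteger n = new AtomicInteger(3);
        int before = getAndMultiply(n, 4);
        System.out.println(before + " -> " + n.get()); // 3 -> 12
    }
}
```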

    // Add 1 to value and return the value from before the update
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    // Add -1 to value and return the value from before the update
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }

    // Add delta to value and return the value from before the update
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    // Add 1 to value and return the updated value (the previous value plus 1)
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

    // Add -1 to value and return the updated value (the previous value minus 1)
    public final int decrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
    }

    // Add delta to value and return the updated value (the previous value plus delta)
    public final int addAndGet(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
    }

They are all implemented using the unsafe.getAndAddInt method.
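
The only difference between the getAndXxx and xxxAndGet variants is which value they return. A short single-threaded sketch (the class name ReturnValues is our own) makes this concrete:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class ReturnValues {
    static int[] demo() {
        AtomicInteger n = new AtomicInteger(10);
        int a = n.getAndIncrement(); // returns 10, value is now 11
        int b = n.incrementAndGet(); // value is now 12, returns 12
        int c = n.getAndAdd(5);      // returns 12, value is now 17
        int d = n.addAndGet(-7);     // value is now 10, returns 10
        return new int[] {a, b, c, d, n.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [10, 12, 12, 10, 10]
    }
}
```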

Important examples

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class Data {
    AtomicInteger num;

    public Data(int num) {
        this.num = new AtomicInteger(num);
    }

    public int getAndDecrement() {
        return num.getAndDecrement();
    }
}

class MyRun implements Runnable {

    private Data data;
    /**
     * Records the number of every ticket sold
     */
    private List<Integer> list;
    private CountDownLatch latch;

    public MyRun(Data data, List<Integer> list, CountDownLatch latch) {
        this.data = data;
        this.list = list;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            action();
        }  finally {
            // Count down the latch to signal that this thread has finished
            latch.countDown();
        }
    }

    /**
     * Sell tickets. Note that data.num > 0 is deliberately not used as a loop condition for selling until the tickets run out.
     * Doing so would use the shared variable data.num in two places, and more conditions would have to be considered to keep the threads synchronized.
     * Here the for loop runs only five times, so each thread sells exactly five tickets, and every ticket number sold is stored in the list.
     */
    public void action() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int newNum = data.getAndDecrement();

            System.out.println("thread"+Thread.currentThread().getName()+"  num=="+newNum);
            list.add(newNum);
        }
    }
}

public class ThreadTest {
    public static void startThread(Data data, String name, List<Integer> list,CountDownLatch latch) {
        Thread t = new Thread(new MyRun(data, list, latch), name);
        t.start();
    }

    public static void main(String[] args) {
        // Use CountDownLatch to let the main thread wait until all the sub-threads have finished executing
        CountDownLatch latch = new CountDownLatch(6);

        long start = System.currentTimeMillis();
        // Here we use concurrent list collections
        List<Integer> list = new CopyOnWriteArrayList<>();
        Data data = new Data(30);
        startThread(data, "t1", list, latch);
        startThread(data, "t2", list, latch);
        startThread(data, "t3", list, latch);
        startThread(data, "t4", list, latch);
        startThread(data, "t5", list, latch);
        startThread(data, "t6", list, latch);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Sort the list, then reverse it into descending order
        Collections.sort(list);
        Collections.reverse(list);
        System.out.println(list);

        long time = System.currentTimeMillis() - start;
        // Output total time spent
        System.out.println("\n Main thread termination time=="+time);
    }
}

Result output

Thread T2 num==30
Thread T1 num==25
Thread T5 num==29
Thread T6 num==26
Thread T4 num==28
Thread T3 num==27
Thread T4 num==24
Thread T2 num==19
Thread T1 num==20
Thread T3 num==22
Thread T5 num==21
Thread T6 num==23
Thread T5 num==17
Thread T1 num==14
Thread T6 num==13
Thread T3 num==15
Thread T2 num==18
Thread T4 num==16
Thread T4 num==10
Thread T1 num==7
Thread T6 num==12
Thread T3 num==8
Thread T2 num==9
Thread T5 num==11
Thread T5 num==6
Thread T1 num==1
Thread T6 num==2
Thread T2 num==3
Thread T4 num==4
Thread T3 num==5
[30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]

Main thread termination time==57

Here we used AtomicInteger instead of a synchronized lock to achieve thread safety.
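
The comment on action() notes that checking data.num > 0 before decrementing would touch the shared variable twice. One possible way to make such a check-then-decrement atomic is a CAS retry loop. The TicketOffice class and its sellOne method below are hypothetical, a sketch under that assumption rather than part of the example above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TicketOffice {
    private final AtomicInteger remaining;

    TicketOffice(int tickets) {
        remaining = new AtomicInteger(tickets);
    }

    /** Sells one ticket and returns its number, or returns -1 if none are left. */
    int sellOne() {
        while (true) {
            int current = remaining.get();
            if (current <= 0) {
                return -1; // sold out; the count never goes below zero
            }
            // The check and the decrement take effect together only if the CAS succeeds
            if (remaining.compareAndSet(current, current - 1)) {
                return current;
            }
        }
    }

    int remaining() {
        return remaining.get();
    }

    /** Lets `threads` threads each attempt `attempts` sales; returns how many tickets were sold. */
    static int simulate(int tickets, int threads, int attempts) {
        TicketOffice office = new TicketOffice(tickets);
        AtomicInteger sold = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    if (office.sellOne() != -1) {
                        sold.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sold.get();
    }

    public static void main(String[] args) {
        // 6 threads make 10 attempts each (60 in total) on 30 tickets
        System.out.println(simulate(30, 6, 10)); // 30
    }
}
```

With more attempts than tickets, exactly as many sales succeed as there are tickets, and the remaining count stops at zero.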


Added by Smruthi on Wed, 29 May 2019 12:23:09 +0300