JAVA concurrency cornerstone - CAS Principle Practice

This article is based on Teacher Liu's video: Java concurrency cornerstone - CAS Principle Practice

Before learning CAS, start with a simple case, and then lead to the basic use of CAS.

CAS based site counters

Requirements: when we develop a website, we need to count the number of visits. Every time a user sends a request, the number of visits + 1. How to achieve this?

We simulate that 100 people visit our website at the same time, and each person initiates 10 requests for our website. The final total number of visits should be 1000

1.1. Website access statistics Demo

/**
 * @author wcc
 * @date 2022/1/6 13:45
 */
public class Demo {

  // Total visits
  static int count = 0;
    
  // Method of simulating access
  public static void request() throws Exception{
    //  The simulation takes 5 ms
    TimeUnit.MILLISECONDS.sleep(5);
    count ++;
  }

  public static void main(String[] args) throws InterruptedException {
    // Opening time
    long startTime = System.currentTimeMillis();
    int threadSize = 100; // Number of simulated users
    CountDownLatch countDownLatch = new CountDownLatch(threadSize);

    for (int i = 0; i < threadSize; i++) {
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            // Simulate user behavior, and each user visits the website 10 times
            for (int j = 0; j < 10; j++) {
              request();
            }
          }catch (Exception e){
            e.printStackTrace();
          }finally {
            countDownLatch.countDown();
          }
        }
      });
      thread.start();
    }

    // How to ensure that the following code is executed after 100 threads are executed?
    countDownLatch.await();
    long endTime = System.currentTimeMillis();

    System.out.println(Thread.currentThread().getName() + ": Time consuming:" + (endTime - startTime) + ",count:" + count);
  }
}

Here is a brief introduction to CountDownLatch, and then we will update its source code analysis:

  • CountDownLatch concept:
    • CountDownLatch is a synchronization tool class used to coordinate synchronization between multiple threads or to talk about communication between threads (rather than being used as mutual exclusion).
    • CountDownLatch enables a thread to wait for other threads to complete their work before continuing execution. Implemented using a counter. The initial value of the counter is the number of threads. When each thread completes its task, the value of the counter will be reduced by one. When the counter value is 0, it means that all threads have completed the task, and then the threads waiting on CountDownLatch can resume executing the task.
  • Usage of CountDownLatch:
    • Typical usage of CountDownLatch 1: wait for n threads to finish executing before a thread starts running again. Initialize the counter of CountDownLatch as N - > New CountDownLatch (n). Whenever a task thread finishes executing, decrease the counter by one - > CountDownLatch Countdown(), when the value of the counter changes to 0, in CountDownLatch The thread after await () will be awakened. A typical application scenario is that when starting a service, the main thread needs to wait for multiple components to load before continuing to execute.
    • CountDownLatch typical usage 2: realize the maximum parallelism of multiple threads starting to execute tasks. Note that parallelism, not concurrency, emphasizes that multiple threads start executing at the same time at a certain time. Similar to a race, multiple threads are placed at the starting point, waiting for the starting gun to sound, and then start running at the same time. The method is to initialize a shared CountDownLatch(1) and initialize its counter to 1. Before multiple threads start executing tasks, first CountDownLatch Await(). When the main thread calls countDown(), the counter changes to 0 and multiple threads are awakened at the same time.

The execution result of the above case code is shown in the figure below:

As shown in the figure, we theoretically enabled 100 threads to simulate users, and each thread simulated access 10 times. The final count should be 1000, but no matter how we test, the final count is almost less than 1000.

Cause analysis

/**
 * Q: Analyze the problem:
 * A: count++ The operation is actually completed by three steps
 *  1.Get the value of count and record it as A: A=count
 *  2.Take A value + 1 to get B, B=A+1
 *  3.Assign B value to count
 *
 * If two threads A and B execute count + + at the same time, they execute the first step of the above steps at the same time, and the count obtained is the same
 * After the three-step operation is completed, the count is only increased by 1, resulting in incorrect count results
 *
 * Q: How to solve the problem of incorrect results:
 * A: When operating count + +, we queue multiple threads for processing, and multiple threads arrive at the request() method at the same time
 * Only one thread can be allowed to operate. Other threads will wait outside. After the internal threads are processed, the external threads will start competing for the method
 * Execution right. The count + + of such operation is queued, and the result must be correct
 *
 * Q: How to achieve queuing effect?
 * A: JAVA The synchronized keyword (locking) and ReentrantLocak in can lock resources to ensure the correctness of concurrency
 * In the case of multithreading, the locked resources can be accessed serially
 */

1.2. Use Synchronized keyword to improve Demo case

The improvement code is as follows:

/**
 * @author wcc
 * @date 2022/1/6 13:45
 */
public class Demo {

  // Total visits
  static int count = 0;
    
  // Method of simulating access
  public synchronized static void request() throws Exception{
    //  The simulation takes 5 ms
    TimeUnit.MILLISECONDS.sleep(5);
    count ++;
  }

  public static void main(String[] args) throws InterruptedException {
    // Opening time
    long startTime = System.currentTimeMillis();
    int threadSize = 100; // Number of simulated users
    CountDownLatch countDownLatch = new CountDownLatch(threadSize);

    for (int i = 0; i < threadSize; i++) {
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            // Simulate user behavior, and each user visits the website 10 times
            for (int j = 0; j < 10; j++) {
              request();
            }
          }catch (Exception e){
            e.printStackTrace();
          }finally {
            countDownLatch.countDown();
          }
        }
      });
      thread.start();
    }

    // How to ensure that the following code is executed after 100 threads are executed?
    countDownLatch.await();
    long endTime = System.currentTimeMillis();

    System.out.println(Thread.currentThread().getName() + ": Time consuming:" + (endTime - startTime) + ",count:" + count);
  }
}

The execution case code results are as follows:

It can be seen that the result is correct when we want to get 1000 total visits. However, when we add the synchronized keyword to the request() method, the thread execution efficiency is seriously reduced compared with that without locking because the method is locked!

Cause analysis

/**
* Q: What is the reason for taking too long?
* A: The request() method in the program is decorated with the synchronized keyword to ensure that the request method can be used at the same time in the case of concurrency
* Only one thread is allowed to enter. Locking the request method is equivalent to serial execution. The result of count is consistent with our expectation, but it takes too long
*
* Q: How to solve problems that take too long:
* A: count++ The operation is actually completed by three steps
*     1.Get the value of count and record it as A: A=count
*     2.Take A value + 1 to get B, B=A+1
*     3.Assign B value to count
*  Implementation of step 3 of upgrade:
*    1.Acquire lock
*    2.Get the latest value of count and record it as LV
*    3.Judge whether the value of LV is equal to A. if it is equal, assign the value of B to count and return true. Otherwise, return false
*    4.Release lock
*/

1.3. Reduce the locking scope and improve the Demo case again

package cas;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author wcc
 * @date 2022/1/6 13:45
 */
public class Demo01 {

  // Total visits
  // Adding the volatile keyword means that every time you get the count value, you have to get it from the main memory instead of getting the count value cached in the current thread
  volatile static int count = 0;

  // Method of simulating access
  public static void request() throws Exception{
    //  The simulation takes 5 ms
    TimeUnit.MILLISECONDS.sleep(5);

    int expectCount; // Expected value: indicates the value of count when the thread enters the method
    while (!compareAndSwap((expectCount = getCount()),expectCount + 1)){
    }
  }

  /**
   * @param expectedCount expected value
   * @param newCount The new value that needs to be assigned to count
   * @return Return true for success and false for failure
   */
  public static synchronized boolean compareAndSwap(int expectedCount, int newCount){
    // Judge whether the current value of count is consistent with the expected expected count. If so, assign newCount to count
    if(getCount() == expectedCount){
      count = newCount;
      return true;
    }
    return false;
  }

  public static int getCount(){
    return count;
  }

  public static void main(String[] args) throws InterruptedException {
    // Opening time
    long startTime = System.currentTimeMillis();
    int threadSize = 100; // Number of simulated users
    CountDownLatch countDownLatch = new CountDownLatch(threadSize);

    for (int i = 0; i < threadSize; i++) {
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            // Simulate user behavior, and each user visits the website 10 times
            for (int j = 0; j < 10; j++) {
              request();
            }
          }catch (Exception e){
            e.printStackTrace();
          }finally {
            countDownLatch.countDown();
          }
        }
      });
      thread.start();
    }

    // How to ensure that the following code is executed after 100 threads are executed?
    countDownLatch.await();
    long endTime = System.currentTimeMillis();

    System.out.println(Thread.currentThread().getName() + ": Time consuming:" + (endTime - startTime) + ",count:" + count);
  }

}

The execution results of the case code are as follows:

You can see that in this way, you can not only achieve the desired results of website visits, but also have high efficiency!

This thread safe way of comparing and exchanging can be called CAS:

/**
 * @param expectedCount expected value
 * @param newCount The new value that needs to be assigned to count
 * @return Return true for success and false for failure
 */
public static synchronized boolean compareAndSwap(int expectedCount, int newCount){
  // Judge whether the current value of count is consistent with the expected expected count. If so, assign newCount to count
  if(getCount() == expectedCount){
    count = newCount;
    return true;
  }
  return false;
}

2. Introduction and implementation principle of CAS

  • The full name of CAS is "CompareAndSwap", which translates into "compare and replace" in Chinese.
  • definition:
    • The CAS operation contains three operands -- memory location (V), expected value (A), and new value (B).
    • If the value of the memory location matches the expected value, the processor automatically updates the value of the location to the new value. Otherwise, the processor does nothing.
    • In either case, it will return the value of this location before the CAS instruction (CAS only returns whether CAS is successful in some special cases, without extracting the current value)
    • CAS effectively states that "I think position V should contain value A; if it contains this value, put B in this position; otherwise, don't change the value of this position, just tell me the current value of this position."

2.1 CAS in JAVA

JAVA provides support for CAS operations, specifically in sun misc. In the unsafe class, the declaration is as follows:

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
  • Parameter var1: indicates the object to operate on
  • Parameter var2: indicates the offset of the attribute address in the object to be manipulated
  • Parameter var4: indicates the expected value of the data to be modified
  • Parameter var5: indicates the new value to be modified to

What is the implementation principle of CAS?

CAS in JAVA is realized by calling JNI code. JNI: JAVA NATIVE INTERFACE allows JAVA to call other languages. The method of compareAndSwapXXX series is realized by calling the bottom instruction of CPU with the help of C language.

For the commonly used Intel X86 platform, the instruction finally mapped to the CPU is cmpxchg, which is an atomic instruction. When the CPU executes this command, it realizes the operation of comparison and replacement!

Modern computers have hundreds of cores. cmpxchg how to ensure thread safety under multi-core?

When performing CAS operation at the bottom of the system, it will judge whether the current system is a multi-core system. If so, it will lock the bus. Only one thread will lock the bus successfully. After locking successfully, CAS operation will be executed, that is, the atomicity of CAS is platform level.

2.2. CAS may also have some problems

2.2.1 ABA problem (civet cat for Prince)

CAS needs to check whether the value has changed at the time of operation value. If it has not changed, it will be updated. However, if A value was originally A, which was changed to B by other threads before CAS method execution, and then changed back to A, CAS method will find whether its value has changed at the time of inspection, but it has actually changed. This is the ABA problem of CAS.

Use the program to simulate ABA problems:

/**
 * @author wcc
 * @date 2022/1/6 15:58
 */
public class CasABA {

  public static AtomicInteger a = new AtomicInteger(1);

  public static void main(String[] args) {
    Thread main = new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println("Operation thread:"+ Thread.currentThread().getName()+",Initial value:"+a.get());
        try {
          int expectNum = a.get(); // expected value
          int newNum = expectNum + 1; // New value
          Thread.sleep(1000); // The main thread sleeps for 1 second and gives up the CPU
          boolean isCASSuccess = a.compareAndSet(expectNum, newNum);
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",CAS Operation:"+isCASSuccess);
        }catch (Exception e){
          e.printStackTrace();
        }
      }
    },"Main thread");

    Thread other = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(20); // Ensure that the main thread takes precedence
          a.incrementAndGet(); // a+1 a=2
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",[increment],value="+a.get());
          a.decrementAndGet(); // a-1 a=1
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",[decrement],value="+a.get());
        }catch (Exception e){
          e.printStackTrace();
        }
      }
    },"Interference thread");

    main.start();
    other.start();
  }
}

The output results are as follows:

In the result, before the main thread "compares and exchanges", the interfering thread first changes the value of a to 2, then changes it back to 1, and then executes the CAS operation of the main thread!

2.2.2 how to solve ABA problem?

The simplest solution to ABA problem is to add a version number to the value. Each time the value changes, its version number will be modified. CAS will compare this version number during operation.

The solution of ABA in JAVA (AtomicStampedReference) is similar to optimistic locking, that is: * * CAS exchange is controlled by the current version number. If the current version number is equal to the expected version number, the exchange can be carried out. Otherwise, the exchange cannot be carried out. Each exchange of the current version number is + 1 * *

AtomicStampedReference mainly contains an object reference and a Pair object with an integer stamped version number that can be automatically updated to solve the ABA problem.

private static class Pair<T> {
    final T reference; // Reference to the current object
    final int stamp;   // Version number
    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

private volatile Pair<V> pair;

Compareandset() method in AtomicStampedReference:

/**
 * Atomically sets the value of both the reference and stamp
 * to the given update values if the
 * current reference is {@code ==} to the expected reference
 * and the current stamp is equal to the expected stamp.
 *
 * @param expectedReference the expected value of the reference  Expected reference
 * @param newReference the new value for the reference           New value reference
 * @param expectedStamp the expected value of the stamp          Expected reference version number
 * @param newStamp the new value for the stamp                   The version number referenced by the new value
 * @return {@code true} if successful
 */
public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair; // The referenced object and version number are encapsulated in the Pair class
    return
        expectedReference == current.reference && // The expected reference is consistent with the current reference
        expectedStamp == current.stamp &&         // The expected version is consistent with the current version

        ((newReference == current.reference &&    //
          newStamp == current.stamp)
            ||
         casPair(current, Pair.of(newReference, newStamp)));
}

private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

Use AtomicStampedReference to solve ABA problems:

/**
 * @author wcc
 * @date 2022/1/6 15:58
 */
public class CasABADemo02 {

  public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1),1);

  public static void main(String[] args) {
    Thread main = new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println("Operation thread:"+Thread.currentThread().getName()+",[increment],Reference value="+a.getReference()
            +",Version number="+a.getStamp());
        try {
          Integer expectReference = a.getReference(); // Expected reference value
          Integer newReference = expectReference + 1; // New reference value
          Integer expectStamp = a.getStamp(); // Expected version number
          Integer newStamp = a.getStamp(); // New version number
          Thread.sleep(1000); // The main thread sleeps for 1 second and gives up the CPU
          boolean isCASSuccess = a.compareAndSet(expectReference, newReference, expectStamp, newStamp);
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",CAS Operation:"+isCASSuccess);
          System.out.println("End of operation:"+Thread.currentThread().getName()+",Reference value="+a.getReference()
              +",Version number="+a.getStamp());
        }catch (Exception e){
          e.printStackTrace();
        }
      }
    },"Main thread");

    Thread other = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(20); // Ensure that the main thread takes precedence
          a.compareAndSet(a.getReference(), (a.getReference() + 1), a.getStamp(), (a.getStamp() + 1));
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",[increment],Reference value="+a.getReference()
          +",Version number="+a.getStamp());
          a.compareAndSet(a.getReference(), (a.getReference() - 1), a.getStamp(), (a.getStamp() + 1)); // a-1 a=1
          System.out.println("Operation thread:"+Thread.currentThread().getName()+",[increment],Reference value="+a.getReference()
              +",Version number="+a.getStamp());
        }catch (Exception e){
          e.printStackTrace();
        }
      }
    },"Interference thread");

    main.start();
    other.start();
  }
}

The operation results are shown in the figure below:


At this time, the ABA problem is solved. If the beaver cat ABA occurs before the main thread performs CAS operation, the comparison and exchange cannot be performed at this time.

Keywords: Java Back-end

Added by xcmir on Thu, 06 Jan 2022 14:30:56 +0200