5. Lockless Shared Mode (top), CAS, Atomics

Contents of this Chapter

  • CAS and volatile
  • Atomic Integer
  • Atomic Reference
  • Atomic accumulator
  • Unsafe

1. Questions raised

There are requirements below to ensure accont. Thread security for withdraw withdraw method

public interface Account {

    // Get Balance
    Integer getBalance();

    // Withdraw money
    void withdraw(Integer amount);

    /**
     * If 1000 threads are started within the method, each thread will do -10
     * If the initial balance is 10,000, the correct result should be 0
     *
     * @param account
     */
    static void demo(Account account) {
        List<Thread> threads = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + " cost :" + (end - start) / 1000_000 + "ms");
    }

}

The original implementation is not thread safe

public class AccountUnsafe implements Account{

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return this.balance;
    }

    @Override
    public void withdraw(Integer amount) {
        balance -= amount;
    }
}

test

Account.demo(new AccountUnsafe(10000));

output

90 cost :185ms

Why is it unsafe

withdraw method

public void withdraw(Integer amount) {
 	balance -= amount;
}

Corresponding byte code

ALOAD 0 // <- this
ALOAD 0
GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance
INVOKEVIRTUAL java/lang/Integer.intValue ()I // Unpacking
ALOAD 1 // <- amount
INVOKEVIRTUAL java/lang/Integer.intValue ()I // Unpacking
ISUB // subtraction
INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // Result packing
PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance

Multithreaded Execution Process

ALOAD 0 // thread-0 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-0 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 Unboxing
ALOAD 1 // thread-0 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 Unboxing
ISUB // thread-0 subtraction
INVOKESTATIC java/lang/Integer.valueOf // thread-0 result boxing
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-0 -> this.balance 
 
 
ALOAD 0 // thread-1 <- this 
ALOAD 0 
GETFIELD cn/itcast/AccountUnsafe.balance // thread-1 <- this.balance 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 Unboxing
ALOAD 1 // thread-1 <- amount 
INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 Unboxing
ISUB // thread-1 subtraction
INVOKESTATIC java/lang/Integer.valueOf // thread-1 result boxing
PUTFIELD cn/itcast/AccountUnsafe.balance // thread-1 -> this.balance
  • Mononuclear interleaved instructions
  • Multi-core instruction staggering

Solution Idea-Lock

The first thought was to lock the Account object

public class AccountUnsafe implements Account{

    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public synchronized Integer getBalance() {
        return this.balance;
    }

    @Override
    public synchronized void withdraw(Integer amount) {
        balance -= amount;
    }
}

output

0 cost :231ms

Solution ideas - Unlock

public class AccountSafe implements Account{

    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next))
                break;
        }
        // Can be simplified to the following methods
//        balance.addAndGet(-1 * amount);
    }
}

test

Account.demo(new AccountSafe(10000));

output

0 cost :177ms

2. CAS and volatile

The AtomicInteger solution you saw earlier does not use locks internally to secure threads that share variables. So how does it work?

public void withdraw(Integer amount) {
        // Keep trying
        while (true) {
            // Get old value 1000
            int prev = balance.get();
            // On this basis 1000-10=990
            int next = prev - amount;
            /**
             * compareAndSet For this check, compare the prev with the current value before set ting
             * - Inconsistent, next invalidated, return false indicates failure
             *      For example, other threads have subtracted and the current value has been reduced to 990
             *      Then this 990 of this thread will be aborted, enter while next cycle and try again
             * - Consistently, with next as the new value, returning true indicates success
             */
            if (balance.compareAndSet(prev, next))
                break;
        }
        // Can be simplified to the following methods
//        balance.addAndGet(-1 * amount);
    }

The key is compareAndSet, which is referred to as CAS (also known as Compare And Swap), which must be an atomic operation.

Be careful
In fact, the lower level of CAS is the lock cmpxchg directive (X86 architecture), which can guarantee the atomicity of Compare-Exchange under both single-core and multi-core CPU s.

  • In a multicore state, when a core executes an instruction with lock, the CPU locks the bus. When the core executes the instruction, the bus is opened again. This process will not be interrupted by the thread's scheduling mechanism, which ensures the accuracy of memory operations by multiple threads and is atomic.

Slow motion analysis

@Slf4j(topic = "c.SlowMotion")
public class SlowMotion {

    public static void main(String[] args) {
        AtomicInteger balance = new AtomicInteger(10000);
        int mainPrev = balance.get();
        log.debug("try get : {}", mainPrev);
        new Thread(() -> {
            sleep(1000);
            int prev = balance.get();
            balance.compareAndSet(prev, 9000);
            log.debug(balance.toString());
        }, "t1").start();
        sleep(2000);
        log.debug("try set 8000");
        boolean isSuccess = balance.compareAndSet(mainPrev, 8000);
        log.debug("is Success? {}" , isSuccess);
        if (!isSuccess) {
            mainPrev = balance.get();
            log.debug("try set 8000");
             isSuccess = balance.compareAndSet(mainPrev, 8000);
            log.debug("is Success? {}" , isSuccess);
        }
    }
    
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

output

2022/03/09-02:14:37.774 [main] c.SlowMotion - try get : 10000
2022/03/09-02:14:38.842 [t1] c.SlowMotion - 9000
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? false
2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000
2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? true

volatile

When you get a shared variable, you need to modify it with volatile to ensure its visibility.

It can be used to modify member variables and static member variables. It prevents threads from always finding the value of a variable from their working cache. It must get its value from main memory. Thread-operated volatile variables are all direct-operated main memory. That is, one thread's modifications to the volatile variable are visible to another thread.

Be careful
vloatile only guarantees the visibility of shared variables, allowing other threads to see the latest values, but it does not solve the interleaving of instructions (atomicity is not guaranteed)

CAS must use volatile to read the latest values of shared variables [the effect of comparison and exchange]

Why is unlock-free efficiency high

  • Without locks, even if the retry fails, the thread always runs at a high speed without pausing, and synchronized causes the thread to switch contexts and enter blocking when it does not get a lock. Make a figure of speech
  • Threads are like racing cars on a high-speed runway, running at high speeds and at super-fast speeds. Once context switching occurs, they are like trains slowing down and extinguishing. When awakened, they are fired again, started, accelerated, and resumed at high speeds at a higher cost.
  • But without locks, because threads need extra CPU support to keep running, the CPU is like a high-speed runway here, there is no extra runway, and it is impossible for a thread to run at a high speed. Although it will not enter the blockage, it will still enter the runnable state because there is no time slice, or it will cause context switching.

Characteristics of CAS

Combining CAS with volatile can achieve zero concurrency and is suitable for scenarios with fewer threads and multiple core CPU s.

  • CAS is based on the idea of optimistic locks: the most optimistic estimate, not afraid of other threads to modify shared variables, it's okay to change them in time, I suffer a little and try again.
  • Synnized is based on the idea of pessimistic locks: the most pessimistic estimate is that you have to guard against other threads modifying shared variables. You don't want to change if I lock you. When I do, you have a chance
  • CAS implies lock-free concurrency and block-free concurrency. Think carefully about what these two words mean.
    • Since synchronized is not used, threads do not get stuck, which is one factor in efficiency gains
    • However, if competition is fierce, it is conceivable that retries will occur frequently, but efficiency will be affected

3. Atomic Integer

JUC concurrent packages are available

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

Take AtomicInteger as an example

public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);

        // i++ like
        System.out.println(i.getAndIncrement());

        // Like++ i
        System.out.println(i.incrementAndGet());

        // Similar--i
        System.out.println(i.decrementAndGet());

        // Similar i--
        System.out.println(i.getAndDecrement());

        // Get results and add values
        System.out.println(i.getAndAdd(5));

        // Add value and get results
        System.out.println(i.addAndGet(-5));

        // Gets and updates the value, p being the primary value of i
        System.out.println(i.getAndUpdate(p -> p - 2));

        // Update and get
        System.out.println(i.updateAndGet(p -> p + 2));

        // Get and calculate
        System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

        // Calculate and get
        System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
        
    }

4. Atomic Reference

Why use atomic reference types?

  • AtomicReference
  • AtomICMarkableReference
  • AtomicStampedReference

There are the following methods

public interface DecimalAccount {

    BigDecimal getBalance();

    // Withdraw money
    void withdraw(BigDecimal amount);

    /**
     * If 1000 threads are started within the method, each thread will do -10
     * If the initial balance is 10,000, the correct result should be 0
     *
     * @param account
     */
    static void demo(DecimalAccount account) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }

}

Try providing different DecimalAccount implementations for secure withdrawals

Unsafe implementation

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;

    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

Security implementation - using locks

class DecimalAccountSafeLock implements DecimalAccount {
    private final Object lock = new Object();
    BigDecimal balance;

    public DecimalAccountSafeLock(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        synchronized (lock) {
            BigDecimal balance = this.getBalance();
            this.balance = balance.subtract(amount);
        }
    }
}

Security implementation - using CAS

public class DecimalAccountSafeCas implements DecimalAccount {

    AtomicReference<BigDecimal> balance;

    public DecimalAccountSafeCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return this.balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = this.balance.get();
            BigDecimal next = prev.subtract(prev);
            if (balance.compareAndSet(prev, next))
                break;
        }
    }
}

test

DecimalAccount.demo(new DecimalAccountUnsafe(BigDecimal.valueOf(10000)));
        DecimalAccount.demo(new DecimalAccountSafeLock(BigDecimal.valueOf(10000)));
        DecimalAccount.demo(new DecimalAccountSafeCas(BigDecimal.valueOf(10000)));

output

340
0
0

ABA Problems and Solutions

	static AtomicReference<String> ref = new AtomicReference<>("A");

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // Get Value A
        // Has this shared variable been modified by its thread?
        String prev = ref.get();
        other();
        Thread.sleep(1000);
        // Try changing to C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
    }

    private static void other() throws InterruptedException {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
        }, "t1").start();
        Thread.sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
        }, "t2").start();
    }

output

2022/03/09-23:27:35.350 [main] c.Test4 - main start...
2022/03/09-23:27:35.384 [t1] c.Test4 - change A->B true
2022/03/09-23:27:35.889 [t2] c.Test4 - change B->A true
2022/03/09-23:27:36.903 [main] c.Test4 - change A->C true

The main thread can only judge that the value of the shared variable is the same as the initial value A, and cannot perceive that A changes to B and back to A if the main thread wishes:

As long as there are other threads sharing variables, even if their own cas fails, then comparing values is not enough, you need to add a version number

AtomicStampedReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // Get Value A
        // Has this shared variable been modified by its thread?
        String prev = ref.getReference();
        Integer stamp = ref.getStamp();
        other();
        Thread.sleep(1000);
        // Try changing to C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    }

    private static void other() throws InterruptedException {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
        }, "t1").start();
        Thread.sleep(500);
        new Thread(() -> {
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
        }, "t2").start();
    }

output

2022/03/09-23:32:21.802 [main] c.Test4 - main start...
2022/03/09-23:32:21.842 [t1] c.Test4 - change A->B true
2022/03/09-23:32:22.345 [t2] c.Test4 - change B->A true
2022/03/09-23:32:23.348 [main] c.Test4 - change A->C false

AtomicStampedReference allows you to add a version number to an atomic reference to track the entire process of changing the atomic reference, such as A -> B -> A -> C. From AtomicStampedReference, we know that the reference variable has been modified several times in the middle.

Sometimes, however, you don't care how many times the variable has changed, just whether or not it has changed, so there is AtomicMarkableReference


AtomicMarkableReference

public class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }


    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "GarbageBag{" +
                "desc='" + desc + '\'' +
                '}';
    }
}
public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("Fill with garbage");
        // Parameter 2 mark can be used as a marker to indicate that the garbage bag is full
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);

        log.debug("Main Thread start");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());

        new Thread(() -> {
            log.debug("Cleaning the hygiene thread...");
            bag.setDesc("Empty the trash bag");
            while (!ref.compareAndSet(bag, bag, true, false)) {
            }
            log.debug(bag.toString());
        }).start();

        Thread.sleep(1000);
        log.debug("The main thread wants a new trash bag");
        boolean isSuccess = ref.compareAndSet(prev, new GarbageBag("Empty dustbag"), true, false);
        log.debug("Have you changed it?{}", isSuccess);

        log.debug(ref.getReference().toString());

    }

output

2022/03/09-23:47:54.873 [main] c.Test5 - Main Thread start
2022/03/09-23:47:54.874 [main] c.Test5 - GarbageBag{desc='Fill with garbage'}
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - Cleaning the hygiene thread...
2022/03/09-23:47:54.916 [Thread-0] c.Test5 - GarbageBag{desc='Empty the trash bag'}
2022/03/09-23:47:55.922 [main] c.Test5 - The main thread wants a new trash bag
2022/03/09-23:47:55.922 [main] c.Test5 - Have you changed it? false
2022/03/09-23:47:55.923 [main] c.Test5 - GarbageBag{desc='Empty the trash bag'}

Process finished with exit code 0

5. Atomic Array

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

There are the following methods

/**
     * @param arraySupplier Provides an array, which can be a thread-insecure array or a thread-safe array
     * @param lengthFun     How to get the length of the array
     * @param putConsumer   Self-incrementing method, returning array index
     * @param printConsumer Print Array Method
     * @param <T>
     */
    private static <T> void demo(Supplier<T> arraySupplier,
                                 Function<T, Integer> lengthFun,
                                 BiConsumer<T, Integer> putConsumer,
                                 Consumer<T> printConsumer) {
        List<Thread> threads = new ArrayList<>();
        T array = arraySupplier.get();
        int length = lengthFun.apply(array);
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j % length);
                }
            }));
        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }

Unsafe Array

demo(() -> new int[10],
                array -> array.length,
                (array, index) -> array[index]++,
                array -> System.out.println(Arrays.toString(array)));

output

[979287, 979029, 978970, 978921, 978919, 978832, 979217, 979376, 979381, 979636]

Secure Array

demo(() -> new AtomicIntegerArray(10),
                array -> array.length(),
                (array, index) -> array.getAndIncrement(index),
                array -> System.out.println(array));

output

[1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000]

6. Field Updater

  • AtomicReferenceFieldUpdater //Domain, Field
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

The Field updater allows atomic manipulation of a Field of an object and can only be used with volatile-modified keywords, otherwise an exception will occur

Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
	at java.util.concurrent.atomic.AtomicIntegerFieldUpdater$AtomicIntegerFieldUpdaterImpl.<init>(AtomicIntegerFieldUpdater.java:412)
	at java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdater.java:88)
	at nolock.Test7.main(Test7.java:16)
private volatile int field;

    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test7.class, "field");
        Test7 test7 = new Test7();
        fieldUpdater.compareAndSet(test7, 0, 10);
        System.out.println(test7.field);
        fieldUpdater.compareAndSet(test7, 10, 20);
        System.out.println(test7.field);
        fieldUpdater.compareAndSet(test7, 10, 30);
        System.out.println(test7.field);
    }

output

10
20
20

Keywords: Java Multithreading Concurrent Programming cas

Added by Azazeal on Wed, 09 Mar 2022 19:34:20 +0200