Contents of this Chapter
- CAS and volatile
- Atomic Integer
- Atomic Reference
- Atomic accumulator
- Unsafe
1. Questions raised
There are requirements below to ensure accont. Thread security for withdraw withdraw method
public interface Account { // Get Balance Integer getBalance(); // Withdraw money void withdraw(Integer amount); /** * If 1000 threads are started within the method, each thread will do -10 * If the initial balance is 10,000, the correct result should be 0 * * @param account */ static void demo(Account account) { List<Thread> threads = new ArrayList<>(); long start = System.nanoTime(); for (int i = 0; i < 1000; i++) { threads.add(new Thread(() -> { account.withdraw(10); })); } threads.forEach(Thread::start); threads.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost :" + (end - start) / 1000_000 + "ms"); } }
The original implementation is not thread safe
public class AccountUnsafe implements Account{ private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public Integer getBalance() { return this.balance; } @Override public void withdraw(Integer amount) { balance -= amount; } }
test
Account.demo(new AccountUnsafe(10000));
output
90 cost :185ms
Why is it unsafe
withdraw method
public void withdraw(Integer amount) { balance -= amount; }
Corresponding byte code
ALOAD 0 // <- this ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // <- this.balance INVOKEVIRTUAL java/lang/Integer.intValue ()I // Unpacking ALOAD 1 // <- amount INVOKEVIRTUAL java/lang/Integer.intValue ()I // Unpacking ISUB // subtraction INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer; // Result packing PUTFIELD cn/itcast/AccountUnsafe.balance : Ljava/lang/Integer; // -> this.balance
Multithreaded Execution Process
ALOAD 0 // thread-0 <- this ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance // thread-0 <- this.balance INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 Unboxing ALOAD 1 // thread-0 <- amount INVOKEVIRTUAL java/lang/Integer.intValue // thread-0 Unboxing ISUB // thread-0 subtraction INVOKESTATIC java/lang/Integer.valueOf // thread-0 result boxing PUTFIELD cn/itcast/AccountUnsafe.balance // thread-0 -> this.balance ALOAD 0 // thread-1 <- this ALOAD 0 GETFIELD cn/itcast/AccountUnsafe.balance // thread-1 <- this.balance INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 Unboxing ALOAD 1 // thread-1 <- amount INVOKEVIRTUAL java/lang/Integer.intValue // thread-1 Unboxing ISUB // thread-1 subtraction INVOKESTATIC java/lang/Integer.valueOf // thread-1 result boxing PUTFIELD cn/itcast/AccountUnsafe.balance // thread-1 -> this.balance
- Mononuclear interleaved instructions
- Multi-core instruction staggering
Solution Idea-Lock
The first thought was to lock the Account object
public class AccountUnsafe implements Account{ private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public synchronized Integer getBalance() { return this.balance; } @Override public synchronized void withdraw(Integer amount) { balance -= amount; } }
output
0 cost :231ms
Solution ideas - Unlock
public class AccountSafe implements Account{ private AtomicInteger balance; public AccountSafe(Integer balance) { this.balance = new AtomicInteger(balance); } @Override public Integer getBalance() { return balance.get(); } @Override public void withdraw(Integer amount) { while (true) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) break; } // Can be simplified to the following methods // balance.addAndGet(-1 * amount); } }
test
Account.demo(new AccountSafe(10000));
output
0 cost :177ms
2. CAS and volatile
The AtomicInteger solution you saw earlier does not use locks internally to secure threads that share variables. So how does it work?
public void withdraw(Integer amount) { // Keep trying while (true) { // Get old value 1000 int prev = balance.get(); // On this basis 1000-10=990 int next = prev - amount; /** * compareAndSet For this check, compare the prev with the current value before set ting * - Inconsistent, next invalidated, return false indicates failure * For example, other threads have subtracted and the current value has been reduced to 990 * Then this 990 of this thread will be aborted, enter while next cycle and try again * - Consistently, with next as the new value, returning true indicates success */ if (balance.compareAndSet(prev, next)) break; } // Can be simplified to the following methods // balance.addAndGet(-1 * amount); }
The key is compareAndSet, which is referred to as CAS (also known as Compare And Swap), which must be an atomic operation.
Be careful
In fact, the lower level of CAS is the lock cmpxchg directive (X86 architecture), which can guarantee the atomicity of Compare-Exchange under both single-core and multi-core CPU s.
- In a multicore state, when a core executes an instruction with lock, the CPU locks the bus. When the core executes the instruction, the bus is opened again. This process will not be interrupted by the thread's scheduling mechanism, which ensures the accuracy of memory operations by multiple threads and is atomic.
Slow motion analysis
@Slf4j(topic = "c.SlowMotion") public class SlowMotion { public static void main(String[] args) { AtomicInteger balance = new AtomicInteger(10000); int mainPrev = balance.get(); log.debug("try get : {}", mainPrev); new Thread(() -> { sleep(1000); int prev = balance.get(); balance.compareAndSet(prev, 9000); log.debug(balance.toString()); }, "t1").start(); sleep(2000); log.debug("try set 8000"); boolean isSuccess = balance.compareAndSet(mainPrev, 8000); log.debug("is Success? {}" , isSuccess); if (!isSuccess) { mainPrev = balance.get(); log.debug("try set 8000"); isSuccess = balance.compareAndSet(mainPrev, 8000); log.debug("is Success? {}" , isSuccess); } } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
output
2022/03/09-02:14:37.774 [main] c.SlowMotion - try get : 10000 2022/03/09-02:14:38.842 [t1] c.SlowMotion - 9000 2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000 2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? false 2022/03/09-02:14:39.843 [main] c.SlowMotion - try set 8000 2022/03/09-02:14:39.843 [main] c.SlowMotion - is Success? true
volatile
When you get a shared variable, you need to modify it with volatile to ensure its visibility.
It can be used to modify member variables and static member variables. It prevents threads from always finding the value of a variable from their working cache. It must get its value from main memory. Thread-operated volatile variables are all direct-operated main memory. That is, one thread's modifications to the volatile variable are visible to another thread.
Be careful
vloatile only guarantees the visibility of shared variables, allowing other threads to see the latest values, but it does not solve the interleaving of instructions (atomicity is not guaranteed)
CAS must use volatile to read the latest values of shared variables [the effect of comparison and exchange]
Why is unlock-free efficiency high
- Without locks, even if the retry fails, the thread always runs at a high speed without pausing, and synchronized causes the thread to switch contexts and enter blocking when it does not get a lock. Make a figure of speech
- Threads are like racing cars on a high-speed runway, running at high speeds and at super-fast speeds. Once context switching occurs, they are like trains slowing down and extinguishing. When awakened, they are fired again, started, accelerated, and resumed at high speeds at a higher cost.
- But without locks, because threads need extra CPU support to keep running, the CPU is like a high-speed runway here, there is no extra runway, and it is impossible for a thread to run at a high speed. Although it will not enter the blockage, it will still enter the runnable state because there is no time slice, or it will cause context switching.
Characteristics of CAS
Combining CAS with volatile can achieve zero concurrency and is suitable for scenarios with fewer threads and multiple core CPU s.
- CAS is based on the idea of optimistic locks: the most optimistic estimate, not afraid of other threads to modify shared variables, it's okay to change them in time, I suffer a little and try again.
- Synnized is based on the idea of pessimistic locks: the most pessimistic estimate is that you have to guard against other threads modifying shared variables. You don't want to change if I lock you. When I do, you have a chance
- CAS implies lock-free concurrency and block-free concurrency. Think carefully about what these two words mean.
- Since synchronized is not used, threads do not get stuck, which is one factor in efficiency gains
- However, if competition is fierce, it is conceivable that retries will occur frequently, but efficiency will be affected
3. Atomic Integer
JUC concurrent packages are available
- AtomicBoolean
- AtomicInteger
- AtomicLong
Take AtomicInteger as an example
public static void main(String[] args) { AtomicInteger i = new AtomicInteger(0); // i++ like System.out.println(i.getAndIncrement()); // Like++ i System.out.println(i.incrementAndGet()); // Similar--i System.out.println(i.decrementAndGet()); // Similar i-- System.out.println(i.getAndDecrement()); // Get results and add values System.out.println(i.getAndAdd(5)); // Add value and get results System.out.println(i.addAndGet(-5)); // Gets and updates the value, p being the primary value of i System.out.println(i.getAndUpdate(p -> p - 2)); // Update and get System.out.println(i.updateAndGet(p -> p + 2)); // Get and calculate System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // Calculate and get System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x)); }
4. Atomic Reference
Why use atomic reference types?
- AtomicReference
- AtomICMarkableReference
- AtomicStampedReference
There are the following methods
public interface DecimalAccount { BigDecimal getBalance(); // Withdraw money void withdraw(BigDecimal amount); /** * If 1000 threads are started within the method, each thread will do -10 * If the initial balance is 10,000, the correct result should be 0 * * @param account */ static void demo(DecimalAccount account) { List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 1000; i++) { threads.add(new Thread(() -> { account.withdraw(BigDecimal.TEN); })); } threads.forEach(Thread::start); threads.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(account.getBalance()); } }
Try providing different DecimalAccount implementations for secure withdrawals
Unsafe implementation
class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } }
Security implementation - using locks
class DecimalAccountSafeLock implements DecimalAccount { private final Object lock = new Object(); BigDecimal balance; public DecimalAccountSafeLock(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { synchronized (lock) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } } }
Security implementation - using CAS
public class DecimalAccountSafeCas implements DecimalAccount { AtomicReference<BigDecimal> balance; public DecimalAccountSafeCas(BigDecimal balance) { this.balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return this.balance.get(); } @Override public void withdraw(BigDecimal amount) { while (true) { BigDecimal prev = this.balance.get(); BigDecimal next = prev.subtract(prev); if (balance.compareAndSet(prev, next)) break; } } }
test
DecimalAccount.demo(new DecimalAccountUnsafe(BigDecimal.valueOf(10000))); DecimalAccount.demo(new DecimalAccountSafeLock(BigDecimal.valueOf(10000))); DecimalAccount.demo(new DecimalAccountSafeCas(BigDecimal.valueOf(10000)));
output
340 0 0
ABA Problems and Solutions
static AtomicReference<String> ref = new AtomicReference<>("A"); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // Get Value A // Has this shared variable been modified by its thread? String prev = ref.get(); other(); Thread.sleep(1000); // Try changing to C log.debug("change A->C {}", ref.compareAndSet(prev, "C")); } private static void other() throws InterruptedException { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); Thread.sleep(500); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); }
output
2022/03/09-23:27:35.350 [main] c.Test4 - main start... 2022/03/09-23:27:35.384 [t1] c.Test4 - change A->B true 2022/03/09-23:27:35.889 [t2] c.Test4 - change B->A true 2022/03/09-23:27:36.903 [main] c.Test4 - change A->C true
The main thread can only judge that the value of the shared variable is the same as the initial value A, and cannot perceive that A changes to B and back to A if the main thread wishes:
As long as there are other threads sharing variables, even if their own cas fails, then comparing values is not enough, you need to add a version number
AtomicStampedReference
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // Get Value A // Has this shared variable been modified by its thread? String prev = ref.getReference(); Integer stamp = ref.getStamp(); other(); Thread.sleep(1000); // Try changing to C log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1)); } private static void other() throws InterruptedException { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1)); }, "t1").start(); Thread.sleep(500); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1)); }, "t2").start(); }
output
2022/03/09-23:32:21.802 [main] c.Test4 - main start... 2022/03/09-23:32:21.842 [t1] c.Test4 - change A->B true 2022/03/09-23:32:22.345 [t2] c.Test4 - change B->A true 2022/03/09-23:32:23.348 [main] c.Test4 - change A->C false
AtomicStampedReference allows you to add a version number to an atomic reference to track the entire process of changing the atomic reference, such as A -> B -> A -> C. From AtomicStampedReference, we know that the reference variable has been modified several times in the middle.
Sometimes, however, you don't care how many times the variable has changed, just whether or not it has changed, so there is AtomicMarkableReference
AtomicMarkableReference
public class GarbageBag { String desc; public GarbageBag(String desc) { this.desc = desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "GarbageBag{" + "desc='" + desc + '\'' + '}'; } }
public static void main(String[] args) throws InterruptedException { GarbageBag bag = new GarbageBag("Fill with garbage"); // Parameter 2 mark can be used as a marker to indicate that the garbage bag is full AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); log.debug("Main Thread start"); GarbageBag prev = ref.getReference(); log.debug(prev.toString()); new Thread(() -> { log.debug("Cleaning the hygiene thread..."); bag.setDesc("Empty the trash bag"); while (!ref.compareAndSet(bag, bag, true, false)) { } log.debug(bag.toString()); }).start(); Thread.sleep(1000); log.debug("The main thread wants a new trash bag"); boolean isSuccess = ref.compareAndSet(prev, new GarbageBag("Empty dustbag"), true, false); log.debug("Have you changed it?{}", isSuccess); log.debug(ref.getReference().toString()); }
output
2022/03/09-23:47:54.873 [main] c.Test5 - Main Thread start 2022/03/09-23:47:54.874 [main] c.Test5 - GarbageBag{desc='Fill with garbage'} 2022/03/09-23:47:54.916 [Thread-0] c.Test5 - Cleaning the hygiene thread... 2022/03/09-23:47:54.916 [Thread-0] c.Test5 - GarbageBag{desc='Empty the trash bag'} 2022/03/09-23:47:55.922 [main] c.Test5 - The main thread wants a new trash bag 2022/03/09-23:47:55.922 [main] c.Test5 - Have you changed it? false 2022/03/09-23:47:55.923 [main] c.Test5 - GarbageBag{desc='Empty the trash bag'} Process finished with exit code 0
5. Atomic Array
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
There are the following methods
/** * @param arraySupplier Provides an array, which can be a thread-insecure array or a thread-safe array * @param lengthFun How to get the length of the array * @param putConsumer Self-incrementing method, returning array index * @param printConsumer Print Array Method * @param <T> */ private static <T> void demo(Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) { List<Thread> threads = new ArrayList<>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0; i < 1000; i++) { threads.add(new Thread(() -> { for (int j = 0; j < 10000; j++) { putConsumer.accept(array, j % length); } })); } threads.forEach(Thread::start); threads.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); }
Unsafe Array
demo(() -> new int[10], array -> array.length, (array, index) -> array[index]++, array -> System.out.println(Arrays.toString(array)));
output
[979287, 979029, 978970, 978921, 978919, 978832, 979217, 979376, 979381, 979636]
Secure Array
demo(() -> new AtomicIntegerArray(10), array -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array));
output
[1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000]
6. Field Updater
- AtomicReferenceFieldUpdater //Domain, Field
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
The Field updater allows atomic manipulation of a Field of an object and can only be used with volatile-modified keywords, otherwise an exception will occur
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type at java.util.concurrent.atomic.AtomicIntegerFieldUpdater$AtomicIntegerFieldUpdaterImpl.<init>(AtomicIntegerFieldUpdater.java:412) at java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdater.java:88) at nolock.Test7.main(Test7.java:16)
private volatile int field; public static void main(String[] args) { AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test7.class, "field"); Test7 test7 = new Test7(); fieldUpdater.compareAndSet(test7, 0, 10); System.out.println(test7.field); fieldUpdater.compareAndSet(test7, 10, 20); System.out.println(test7.field); fieldUpdater.compareAndSet(test7, 10, 30); System.out.println(test7.field); }
output
10 20 20