1. Quote
There are the following requirements to ensure account Thread safety of withdraw withdrawal method
interface Account { /** * Get balance * * @return Integer balance */ Integer getBalance(); /** * withdraw money * * @param amount Withdrawal amount */ void withdraw(Integer amount); /** * 1000 threads will be started in the method, and each thread will do the operation of - 10 yuan * If the initial balance is 10000, the correct result should be 0 * * @param account Implementation class */ static void demo(Account account) { List<Thread> ts = new ArrayList<>(); long start = System.nanoTime(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(10); })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms"); } }
Unsafe implementation. As for why, I don't elaborate. It's obvious
@Slf4j public class Test { public static void main(String[] args) { Account.demo(new AccountUnsafe(10000)); } } class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public Integer getBalance() { return balance; } @Override public void withdraw(Integer amount) { balance -= amount; } }
Implementation of locking
class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public synchronized Integer getBalance() { return balance; } @Override public synchronized void withdraw(Integer amount) { balance -= amount; } }
Lock free implementation
class AccountCas implements Account { private final AtomicInteger balance; public AccountCas(Integer balance) { this.balance = new AtomicInteger(balance); } @Override public synchronized Integer getBalance() { return balance.get(); } @Override public synchronized void withdraw(Integer amount) { while (true) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break; } } // It can be simplified to the following method // balance.addAndGet(-1 * amount); } }
2. CAS and volatile
The AtomicInteger solution seen earlier does not use locks internally to protect the thread safety of shared variables. So how is it realized?
- compareAndSet does this check. Before setting, compare prev with the current value. If it is inconsistent, next will be invalidated, and return false to indicate failure. For example, other threads have made subtraction, and the current value has been reduced to 990, then this 990 of this thread will be invalidated. Enter while and retry the next cycle. If it is consistent, set next to the new value, Returning true indicates success
In fact, both the underlying CPU (XCMG) and the underlying CPU (XCMG) can exchange instructions in a multi-core architecture.
Enter AtomicInteger source code
When obtaining a shared variable, volatile modification is needed to ensure the visibility of the variable. It can be used to modify member variables and static member variables. It can prevent threads from looking up the value of variables from their own work cache. They must get its value from main memory. Threads operate volatile variables directly in main memory. That is, the modification of volatile variable by one thread is visible to another thread. CAS must use volatile to read the latest value of shared variables to achieve the effect of [compare and exchange]
We can also see that the lock free implementation is more efficient than the first two. Why?
- In the case of no lock, even if the retry fails, the thread always runs at a high speed without stopping (blocking), and synchronized will cause the thread to switch the context and enter the blocking when it does not obtain the lock. However, if the competition is fierce, it can be expected that the retry will occur frequently, but the efficiency will be affected
- CAS is based on the idea of optimistic lock: the most optimistic estimate is that you are not afraid of other threads to modify shared variables. Even if you do, it doesn't matter. I'll try again at a loss.
- synchronized is based on the idea of pessimistic lock: the most pessimistic estimate is to prevent other threads from modifying shared variables. When I lock, you don't want to change it. Only after I change and know how to unlock, can you have a chance.
3. Atomic integer
JUC also provided:
- AtomicBoolean
- AtomicInteger
- AtomicLong
Take AtomicInteger as an example. The other two are similar
AtomicInteger i = new AtomicInteger(0); // Get and auto increment (i = 0, result i = 1, return 0), similar to i++ System.out.println(i.getAndIncrement()); // Auto increment and get (i = 1, result i = 2, return 2), similar to + + i System.out.println(i.incrementAndGet()); // Subtract and get (i = 2, result i = 1, return 1), similar to -- i System.out.println(i.decrementAndGet()); // Gets and subtracts itself (i = 1, result i = 0, returns 1), similar to i-- System.out.println(i.getAndDecrement()); // Get and add value (i = 0, result i = 5, return 0) System.out.println(i.getAndAdd(5)); // Add value and get (i = 5, result i = 0, return 0) System.out.println(i.addAndGet(-5)); // Get and update (i = 0, p is the current value of i, result i = -2, return 0) System.out.println(i.getAndUpdate(p -> p - 2)); // Update and get (i = -2, p is the current value of i, result i = 0, return 0) System.out.println(i.updateAndGet(p -> p + 2)); // Get and calculate (i = 0, p is the current value of i, x is parameter 1, result i = 10, return 0) // If getAndUpdate refers to an external local variable in lambda, ensure that the local variable is final // getAndAccumulate can refer to external local variables through parameter 1, but it does not have to be final because it is not in lambda System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // Calculate and obtain (i = 10, p is the current value of i, x is parameter 1, result i = 0, return 0) System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
- Observation method updateAndGet source code
public final int updateAndGet(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next; }
Because there is multiplication and division after addition and subtraction, this operation is abstracted as a functional interface IntUnaryOperator
In this way, we only need to put the original value and the changed value into the IntUnaryOperator, and then AtomicInteger calls the CAS operation to modify it
4. Atomic reference
The basic type of data that can be referenced by jgdec is not necessarily the type of atom. Why do you need to use the basic type of atom
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
4.1,AtomicReference
First, make an unsafe implementation that does not use AtomicReference to withdraw money
class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); // subtraction this.balance = balance.subtract(amount); } } interface DecimalAccount { /** * Get balance */ BigDecimal getBalance(); /** * withdraw money */ void withdraw(BigDecimal amount); /** * 1000 threads will be started in the method, and each thread will do the operation of - 10 yuan * If the initial balance is 10000, the correct result should be 0 */ static void demo(DecimalAccount account) { List<Thread> ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(BigDecimal.TEN); })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(account.getBalance()); } }
Using locks to implement
@Override public synchronized BigDecimal getBalance() { return balance; } @Override public synchronized void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); }
Implementation using AtomicReference
class DecimalAccountUnsafe implements DecimalAccount { AtomicReference<BigDecimal> balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return balance.get(); } @Override public void withdraw(BigDecimal amount) { while (true) { BigDecimal prev = balance.get(); BigDecimal next = prev.subtract(amount); if (balance.compareAndSet(prev, next)) { break; } } } }
4.2 ABA problem
Execute the following code
static AtomicReference<String> ref = new AtomicReference<>("A"); @SneakyThrows public static void main(String[] args) { log.info("main start..."); // Get value A // Has this shared variable been modified by its thread? String prev = ref.get(); other(); TimeUnit.SECONDS.sleep(1); // Try to change to C log.info("change A->C {}", ref.compareAndSet(prev, "C")); } @SneakyThrows private static void other() { new Thread(() -> { log.info("change A->B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); TimeUnit.MILLISECONDS.sleep(500); new Thread(() -> { log.info("change B->A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); }
It can be found that the main thread can only judge whether the value of the shared variable is the same as the initial value a, and can not perceive the situation of changing from a to B and back to A. for most scenarios, this has no impact on our business. However, if the main thread wants, as long as other threads [have moved] the shared variable, its CAS will fail, What should I do? At this time, you can't use AtomicReference, but AtomicStampedReference
4.3,AtomicStampedReference
AtomicStampedReference can add version number to atomic reference and track the whole change process of atomic reference, such as a - > b - > A - > C. through AtomicStampedReference, we can know that the reference variable has been changed several times in the middle.
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); @SneakyThrows public static void main(String[] args) { log.info("main start..."); // Get value A String prev = ref.getReference(); // Get version number int stamp = ref.getStamp(); log.info("edition {}", stamp); // If other threads interfere in the middle, ABA occurs other(); TimeUnit.SECONDS.sleep(1); // Try to change to C log.info("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1)); } @SneakyThrows private static void other() { new Thread(() -> { log.info("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1)); log.info("Updated version is {}", ref.getStamp()); }, "t1").start(); TimeUnit.MILLISECONDS.sleep(500); new Thread(() -> { log.info("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1)); log.info("Updated version is {}", ref.getStamp()); }, "t2").start(); }
However, sometimes, I don't care about how many times the reference variable has been changed, but just whether it has been changed. Therefore, there is AtomicMarkableReference
4.4,AtomicMarkableReference
@Slf4j public class Test { @SneakyThrows public static void main(String[] args) { GarbageBag bag = new GarbageBag("Full of garbage"); // Parameter 2 mark can be regarded as a mark, indicating that the garbage bag is full AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true); log.info("Main thread start..."); GarbageBag prev = ref.getReference(); log.info(prev.toString()); new Thread(() -> { log.info("Cleaning thread start..."); bag.setDesc("Empty garbage bag"); while (!ref.compareAndSet(bag, bag, true, false)) { } log.info(bag.toString()); }).start(); Thread.sleep(1000); log.info("The main thread wants to change a new garbage bag?"); boolean success = ref.compareAndSet(prev, new GarbageBag("Empty garbage bag"), true, false); log.info("Did you change it?" + success); log.info(ref.getReference().toString()); } } class GarbageBag { String desc; public GarbageBag(String desc) { this.desc = desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return super.toString() + " " + desc; } }
5. Atomic array
For the AtomicReference mentioned earlier, if an array is stored, we cannot thread safely access each element stored in the array, so there is an atomic array
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
Now there are the following methods
/** * supplier Provider makes something out of nothing () - > result * function Function one parameter, one result (parameter) - > result, BiFunction (parameter 1, parameter 2) - > result * consumer Consumer has no result for one parameter (parameter) - > void, biconsumer (parameter 1, parameter 2) - > void * * @param arraySupplier Provides an array, which can be a thread unsafe array or a thread safe array * @param lengthFun Method to get array length * @param putConsumer Auto increment method, return array, index * @param printConsumer Method of printing array */ private static <T> void demo( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) { List<Thread> ts = new ArrayList<>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0; i < length; i++) { ts.add(new Thread(() -> { // Each thread performs 10000 operations on the array for (int j = 0; j < 10000; j++) { // Allocate to each element 10000 / 10 = 1000 self increment, outer loop, 10 threads are self increment 1000 respectively putConsumer.accept(array, j % length); } })); } // Start all threads ts.forEach(Thread::start); // Wait until all threads end ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); }
5.1 realization of insecurity
If we don't use atomic arrays, use ordinary arrays:
demo( () -> new int[10], (array) -> array.length, (array, index) -> array[index]++, array -> System.out.println(Arrays.toString(array)) );
5.2 realization of safety
As can be seen from the above example, ordinary arrays are not safe under multithreading. Now we use AtomicIntegerArray
demo( () -> new AtomicIntegerArray(10), (AtomicIntegerArray::length), (AtomicIntegerArray::getAndIncrement), atomicIntegerArray -> System.out.println(atomicIntegerArray.toString()) );
6. Field Updater
Using the Field updater, you can perform atomic operations on a Field of the object. JUC also provides:
- AtomicReferenceFieldUpdater / / domain field
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
It can only be used with volatile modified fields, otherwise an exception will occur
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
Simple implementation
@Slf4j public class Test { @SneakyThrows public static void main(String[] args) { Student student = new Student(); AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name"); updater.compareAndSet(student, null, "People below two meters are mortals"); System.out.println(student.toString()); } } @Data class Student { volatile String name; }
7. Atomic accumulator
After JDK 8, several classes dedicated to accumulation have been added, and their performance is much higher than that of AtomicInteger
7.1 Accumulator Performance Comparison
@Slf4j public class Test { @SneakyThrows public static void main(String[] args) { for (int i = 0; i < 5; i++) { demo( AtomicLong::new, AtomicLong::incrementAndGet ); } for (int i = 0; i < 5; i++) { demo( LongAdder::new, LongAdder::increment ); } } private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); long start = System.nanoTime(); List<Thread> ts = new ArrayList<>(); // 4 threads, each accumulating 500000 for (int i = 0; i < 40; i++) { ts.add(new Thread(() -> { for (int j = 0; j < 500000; j++) { action.accept(adder); } })); } ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start) / 1000_000); } }
The reason for performance improvement is very simple. When there is competition, multiple accumulation units are set (reasonably allocated according to the intensity of competition, but they will not exceed the number of system cores in the end). Therad-0 accumulates Cell[0], while Thread-1 accumulates Cell[1]... Finally summarize the results. In this way, they operate on different Cell variables when accumulating, thus reducing CAS retry failures and improving performance.
7.2. LongAdder source code
LongAdder class has several key fields. See its parent class Striped64 for details
// Accumulation cell array, lazy initialization transient volatile Cell[] cells; // Base value. If there is no competition, use cas to accumulate this field transient volatile long base; // When cells are created or expanded, it is set to 1, indicating locking transient volatile int cellsBusy;
7.2.1 CAS implementation lock
Why does cellsBusy mean locking? Isn't CAS a lock free concept? In fact, CAS can implement a lock like mechanism
Only for testing and trial use, not for production environment
@Slf4j class CasLock { private final AtomicInteger state = new AtomicInteger(0); public void lock() { while (true) { if (state.compareAndSet(0, 1)) { break; } } } public void unlock() { log.info("unlock..."); state.set(0); } }
test
@SneakyThrows public static void main(String[] args) { CasLock lock = new CasLock(); new Thread(() -> { log.info("begin..."); lock.lock(); try { log.info("lock..."); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }).start(); new Thread(() -> { log.info("begin..."); lock.lock(); try { log.info("lock..."); } finally { lock.unlock(); } }).start(); }
In fact, cellsBusy is equivalent to state in the code in LongAdder
7.2.2,Cell
//Prevent cache rows and pseudo sharing @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanism private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
To understand this code, you need to start with caching
The comparison between cache and memory elements is as follows (cycle indicates clock cycle):
From cpu to | Approximately required clock cycles |
---|---|
register | 1 cycle (about 0.25ns for 4GHz CPU) |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
Memory | 120~240 cycle |
- Because the speed of pre reading to the memory is very different from that of the cache.
- The cache is in the unit of cache behavior, and each cache line corresponds to a piece of memory, generally 64 byte s (8 long)
- The addition of cache will result in the generation of data copies, that is, the same data will be cached in cache lines of different cores
- The CPU should ensure the consistency of data. If a CPU core changes data, the entire cache line corresponding to other CPU cores must be invalidated
For example, in the existing Cell[2] array, because a Cell occupies 24 bytes (object header 16 + long 8 bytes), the cells of two units occupy 48, which is less than one cache line 64. Therefore, for two cores, they will put the Cell[2] into their own cache line respectively. In this way, no matter which Core modifies one of the cells, The other Cell cache has to be invalidated, so you have to read the latest Cell[2] in memory again (because Cell[2] exists in one row)
@sun. misc. Contented is used to solve this problem. Its principle is to add 128 byte padding before and after the object or field using this annotation, so that the CPU can occupy different cache lines when pre reading the object to the cache. In this way, if one Cell is modified, it will not cause the invalidation of the opposite cache line
7.2.3 self increasing source code tracking
Select the increment method for the entry
public void increment() { add(1L); } public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ( (as = cells) != null || !casBase(b = base, b + x) ) { boolean uncontended = true; if ( as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)) ) longAccumulate(x, null, uncontended); } }
Look at the flow chart. Let's follow it
- Cells are loaded lazily, that is, when there is no competition at the beginning, cells are still null, so at this time, the basic value base is used for CAS auto increment operation. If the auto increment is successful, the corresponding casBase method will directly return. Otherwise, enter the if branch of the first layer and directly go to the longAccumulate method
- If there is competition, directly enter the if branch of the first layer, and then start to judge whether the corresponding Cell unit exists according to the current thread, as [getprobe() & M]. If it does not exist, directly enter the longAccumulate method. If it has been created, use the unit to perform CAS operation, corresponding to uncontended = a.cas(v = a.value, v + x). Of course, If the execution is successful, it will be returned directly. Otherwise, enter the longAccumulate method
The above steps involve the longAccumulate method. Now let's enter this method. Because this code is too long, we can take it apart
- First, let's look at the second if branch. When the cells do not exist, they will be transferred from the first branch to the second branch. However, it is also necessary that the cells are not locked and not created by other threads
graph subgraph cells establish cells --> lock("Lock") lock --success--> create("establish cells And initialize a cell") create --> return lock --fail--> base("cas base accumulation") base --success--> return base --fail--> a a("Circulation inlet") --> cells("cells Does not exist and is not locked and not created") end
- The locking operation is performed by the casCellsBusy method. If the locking fails, it will go to the last branch and directly self increment the base. If the self increment is successful, it will return directly. Otherwise, the loop will judge again. If the locking is successful, it will enter the second if branch
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table // Again, determine whether someone else initializes the cells if (cells == as) { // The initial value is 2 Cell[] rs = new Cell[2]; // The initial value of the accumulation unit corresponding to the current thread is 1, because this x is the self increment 1 passed in from the outer layer rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { // Unlock cellsBusy = 0; } if (init) break; }
- Then, when the accumulation unit cells already exist, after entering this method again, the cell slot corresponding to the current thread may not exist, so enter the first branch to see if it does not exist
if ((as = cells) != null && (n = as.length) > 0) { // Judge whether the slot corresponding to the current thread exists if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell // When creating a cell object, the initial value is also x Cell r = new Cell(x); // Optimistically create // Try locking if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // Confirm again whether the slot is empty if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // Save the created cells into cells rs[j] = r; created = true; } } finally { // Unlock cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if { ... } }
- If both cells and cells exist, enter the third branch of the first if branch and directly perform CAS operation on the accumulation unit
if ((as = cells) != null && (n = as.length) > 0) { // Judge whether the slot corresponding to the current thread exists if ((a = as[(n - 1) & h]) == null) { ... } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // Direct CAS accumulation else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // If the accumulation fails, judge whether the array length exceeds the number of CPU cores, set the collapse flag bit and control it to the next branch else if (n >= NCPU || cells != as) collide = false; // At max size or stale // If you take this branch, you won't take the lower expansion branch else if (!collide) collide = true; //Capacity expansion operation else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //Double expansion Cell[] rs = new Cell[n << 1]; //Transfer raw data for (int i = 0; i < n; ++i) rs[i] = as[i]; //cover cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // Try to change the accumulation unit of the current thread (old failure, change one for you) h = advanceProbe(h); }
7.2.4,sum
After all the accumulation units are completed, you need to add up all the data, so you use the sum method. This part of the code is easy to understand. Don't talk about it
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
8,Unsafe
8.1 acquisition
Unsafe objects provide very low-level methods for operating memory and threads. Unsafe objects cannot be called directly, but can only be obtained through reflection. Because part of their design is low-level, it is not recommended that our developers call them at will
static Unsafe unsafe; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); unsafe = (Unsafe) theUnsafe.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new Error(e); } } static Unsafe getUnsafe() { return unsafe; }
8.2 realization of CAS
@SneakyThrows public static void main(String[] args) { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); Unsafe unsafe = (Unsafe) theUnsafe.get(null); Field id = Student.class.getDeclaredField("id"); Field name = Student.class.getDeclaredField("name"); // Gets the offset of the member variable long idOffset = unsafe.objectFieldOffset(id); long nameOffset = unsafe.objectFieldOffset(name); Student student = new Student(); System.out.println(student); // Replace the value of the member variable with the cas method // Return true unsafe.compareAndSwapInt(student, idOffset, 0, 22); // Return true unsafe.compareAndSwapObject(student, nameOffset, null, "People below two meters are mortals"); System.out.println(student); } @Data class Student { volatile int id; volatile String name; }
8.3. Realize atomic integer
public class Test { @SneakyThrows public static void main(String[] args) { Account.demo(new MyAtomicInteger(10000)); } } interface Account { /** * Get balance */ Integer getBalance(); /** * withdraw money */ void withdraw(Integer amount); /** * 1000 threads will be started in the method, and each thread will do the operation of - 10 yuan * If the initial balance is 10000, the correct result should be 0 */ static void demo(Account account) { List<Thread> ts = new ArrayList<>(); for (int i = 0; i < 1000; i++) { ts.add(new Thread(() -> { account.withdraw(10); })); } long start = System.nanoTime(); ts.forEach(Thread::start); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end - start) / 1000_000 + " ms"); } } class MyAtomicInteger implements Account { private final int value; private static final long VALUE_OFFSET; private static final Unsafe UNSAFE; static { UNSAFE = UnsafeAccessor.getUnsafe(); try { VALUE_OFFSET = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value")); } catch (NoSuchFieldException e) { e.printStackTrace(); throw new RuntimeException(e); } } public int getValue() { return value; } public void decrement(int amount) { while (true) { int prev = this.value; int next = prev - amount; if (UNSAFE.compareAndSwapInt(this, VALUE_OFFSET, prev, next)) { break; } } } public MyAtomicInteger(int value) { this.value = value; } @Override public Integer getBalance() { return getValue(); } @Override public void withdraw(Integer amount) { decrement(amount); } } class UnsafeAccessor { private static final Unsafe UNSAFE; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); UNSAFE = (Unsafe) theUnsafe.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new Error(e); } } public static Unsafe getUnsafe() { return UNSAFE; } }