Lock-free concurrency in JUC (optimistic locking)

1. Introduction

The requirement: make the withdraw method of the following Account interface thread-safe.

interface Account {
    /**
     * Get balance
     *
     * @return Integer balance
     */
    Integer getBalance();

    /**
     * withdraw money
     *
     * @param amount Withdrawal amount
     */
    void withdraw(Integer amount);

    /**
     * Starts 1000 threads, each of which withdraws 10 yuan.
     * If the initial balance is 10000, the correct final balance is 0.
     *
     * @param account Implementation class
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end - start) / 1000_000 + " ms");
    }
}

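Unsafe implementation. The statement balance -= amount is a read-modify-write sequence rather than a single atomic step, so concurrent withdrawals can overwrite each other's updates.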

@Slf4j
public class Test {

    public static void main(String[] args) {
        Account.demo(new AccountUnsafe(10000));
    }
}

class AccountUnsafe implements Account {
    private Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        return balance;
    }

    @Override
    public void withdraw(Integer amount) {
        balance -= amount;
    }
}

Implementation using a lock

class AccountSync implements Account {
    private Integer balance;

    public AccountSync(Integer balance) {
        this.balance = balance;
    }

    @Override
    public synchronized Integer getBalance() {
        return balance;
    }

    @Override
    public synchronized void withdraw(Integer amount) {
        balance -= amount;
    }
}

Lock-free implementation

class AccountCas implements Account {
    private final AtomicInteger balance;

    public AccountCas(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
        // It can be simplified to the following method
        // balance.addAndGet(-1 * amount);
    }
}

2. CAS and volatile

The AtomicInteger solution seen earlier does not use locks internally to protect the thread safety of shared variables. So how is it realized?

  • The key is compareAndSet, which performs the check. Before setting the new value, it compares prev with the current value. If they differ, next is discarded and false is returned to signal failure. For example, if another thread has already made a withdrawal and the current value has dropped to 990, the next value that this thread computed from its stale prev is discarded, and the while loop retries. If they match, next is stored as the new value and true is returned to signal success.

Under the hood, CAS relies on the lock cmpxchg instruction (on the x86 architecture), which guarantees the atomicity of [compare and swap] on both single-core and multi-core CPUs.
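The failure-and-retry behaviour described above can be reproduced deterministically in a single thread. The following is a minimal sketch, not part of the original example: the class name CasRetrySketch and the method withdrawWithInterference are made up for illustration, and the "other thread" is simulated by an ordinary call between the read and the CAS.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasRetrySketch {

    /** Simulates one failed CAS followed by a successful retry; returns the final balance. */
    static int withdrawWithInterference() {
        AtomicInteger balance = new AtomicInteger(1000);

        // Step 1: this "thread" reads the current value and computes the new one.
        int prev = balance.get();   // 1000
        int next = prev - 10;       // 990

        // Step 2: simulate another thread withdrawing 10 in between.
        balance.addAndGet(-10);     // balance is now 990

        // Step 3: the CAS fails because the current value (990) no longer equals prev (1000).
        boolean first = balance.compareAndSet(prev, next);

        // Step 4: retry with a fresh read; nothing interferes this time.
        prev = balance.get();       // 990
        next = prev - 10;           // 980
        boolean second = balance.compareAndSet(prev, next);

        if (first || !second) {
            throw new IllegalStateException("unexpected CAS outcome");
        }
        return balance.get();
    }

    public static void main(String[] args) {
        System.out.println(withdrawWithInterference()); // prints 980
    }
}
```

Both withdrawals are applied exactly once: the stale attempt is rejected rather than overwriting the other update.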

In the AtomicInteger source code, the field that holds the value is declared volatile.

When reading a shared variable, the volatile modifier is needed to guarantee visibility. It can be applied to instance fields and static fields. It prevents a thread from reading the variable from its own working cache: the value must be fetched from main memory, and writes go to main memory as well. In other words, a change made to a volatile variable by one thread is visible to other threads. Note that volatile only guarantees visibility; it does not make compound operations atomic. CAS relies on volatile to read the latest value of the shared variable, which is what makes [compare and swap] work.
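A small sketch of the visibility guarantee, assuming nothing beyond the JDK: a worker thread spins on a volatile flag and stops once the main thread sets it. The names VolatileFlagSketch, stop and workerStopsAfterFlagIsSet are hypothetical.

```java
class VolatileFlagSketch {
    // Without volatile, the worker is not guaranteed to ever observe the write below.
    static volatile boolean stop;

    /** Returns true if the worker thread noticed the flag and terminated. */
    static boolean workerStopsAfterFlagIsSet() {
        stop = false;
        Thread worker = new Thread(() -> {
            while (!stop) {
                // busy-wait; every iteration re-reads the volatile field
            }
        });
        worker.setDaemon(true); // never keep the JVM alive because of this demo
        worker.start();
        try {
            Thread.sleep(100);
            stop = true;        // volatile write, visible to the worker
            worker.join(5000);
        } catch (InterruptedException e) {
            stop = true;
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + workerStopsAfterFlagIsSet());
    }
}
```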

In the timings printed by demo, the lock-free implementation tends to be faster than the synchronized one. Why?

  • Without a lock, a thread keeps running at full speed even when it has to retry; it never stops. With synchronized, a thread that fails to acquire the lock goes through a context switch and blocks. Under heavy contention, however, retries happen frequently and efficiency suffers instead.
  • CAS follows the optimistic-lock idea: assume the best case and do not worry about other threads modifying the shared variable. If one does, no matter; the only cost is one more retry.
  • synchronized follows the pessimistic-lock idea: assume the worst case and keep other threads from modifying the shared variable. Once I hold the lock, nobody else can change it; only after I have finished and released the lock do the others get a chance.
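The two strategies can be put side by side in one class. This is a sketch under the assumption that a plain counter is a fair stand-in for the account balance; the class and method names are invented for illustration. Both counters reach the same correct total, they only differ in how conflicts are handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class OptimisticVsPessimisticSketch {
    private int lockedCounter; // guarded by "this"
    private final AtomicInteger casCounter = new AtomicInteger();

    // Pessimistic: take the lock first, then modify.
    synchronized void incrementWithLock() {
        lockedCounter++;
    }

    synchronized int lockedValue() {
        return lockedCounter;
    }

    // Optimistic: compute from a snapshot, publish with CAS, retry on conflict.
    void incrementWithCas() {
        int prev;
        do {
            prev = casCounter.get();
        } while (!casCounter.compareAndSet(prev, prev + 1));
    }

    /** Runs both strategies under the same load; returns {lockedResult, casResult}. */
    static int[] run(int threads, int perThread) {
        OptimisticVsPessimisticSketch s = new OptimisticVsPessimisticSketch();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    s.incrementWithLock();
                    s.incrementWithCas();
                }
            }));
        }
        ts.forEach(Thread::start);
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {s.lockedValue(), s.casCounter.get()};
    }

    public static void main(String[] args) {
        int[] r = run(4, 10_000);
        System.out.println("locked=" + r[0] + " cas=" + r[1]);
    }
}
```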

3. Atomic integers

JUC provides:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

Take AtomicInteger as an example; the other two are similar.

AtomicInteger i = new AtomicInteger(0);

// Get and increment (i = 0, result i = 1, return 0), similar to i++
System.out.println(i.getAndIncrement());

// Increment and get (i = 1, result i = 2, return 2), similar to ++i
System.out.println(i.incrementAndGet());

// Decrement and get (i = 2, result i = 1, return 1), similar to --i
System.out.println(i.decrementAndGet());

// Get and decrement (i = 1, result i = 0, return 1), similar to i--
System.out.println(i.getAndDecrement());

// Get and add value (i = 0, result i = 5, return 0)
System.out.println(i.getAndAdd(5));

// Add value and get (i = 5, result i = 0, return 0)
System.out.println(i.addAndGet(-5));

// Get and update (i = 0, p is the current value of i, result i = -2, return 0)
System.out.println(i.getAndUpdate(p -> p - 2));

// Update and get (i = -2, p is the current value of i, result i = 0, return 0)
System.out.println(i.updateAndGet(p -> p + 2));

// Get and accumulate (i = 0, p is the current value of i, x is argument 1, result i = 10, return 0)
// If the lambda passed to getAndUpdate refers to an outer local variable, that variable must be effectively final
// getAndAccumulate can take an outer local variable as argument 1 instead; it need not be final because it is not captured by the lambda
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

// Calculate and obtain (i = 10, p is the current value of i, x is parameter 1, result i = 0, return 0)
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
Now look at the source code of the updateAndGet method:
public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        prev = get();
        next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));
    return next;
}

Since an update is not limited to addition and subtraction (it may just as well be multiplication, division, or anything else), the operation is abstracted as the functional interface IntUnaryOperator.

This way we only need to supply an IntUnaryOperator that maps the old value to the new value; AtomicInteger then applies it with a CAS loop.
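To make the abstraction concrete, here is a minimal sketch of the same read, compute, CAS, retry loop written by hand, next to the built-in updateAndGet. The class UpdateLoopSketch and its update helper are illustrative names, not JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

class UpdateLoopSketch {

    /** Hand-written equivalent of updateAndGet: read, compute, CAS, retry. */
    static int update(AtomicInteger target, IntUnaryOperator op) {
        while (true) {
            int prev = target.get();
            int next = op.applyAsInt(prev);
            if (target.compareAndSet(prev, next)) {
                return next;
            }
        }
    }

    static int demo() {
        AtomicInteger balance = new AtomicInteger(10);
        update(balance, p -> p * 3);             // multiplication: 10 -> 30
        return balance.updateAndGet(p -> p / 2); // division:       30 -> 15
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 15
    }
}
```

Because the operator may be applied more than once when a CAS fails, it should be free of side effects.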

4. Atomic reference

Why do we need atomic reference types? The shared data that has to be protected is not always a primitive type; it may be an object such as BigDecimal. For these cases JUC provides:

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

4.1 AtomicReference

First, make an unsafe implementation that does not use AtomicReference to withdraw money

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;

    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }

    @Override
    public BigDecimal getBalance() {
        return balance;
    }

    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        // subtraction
        this.balance = balance.subtract(amount);
    }
}

interface DecimalAccount {
    /**
     * Get balance
     */
    BigDecimal getBalance();

    /**
     * withdraw money
     */
    void withdraw(BigDecimal amount);

    /**
     * Starts 1000 threads, each of which withdraws 10 yuan.
     * If the initial balance is 10000, the correct final balance is 0.
     */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}

Implementation using a lock (only the changed methods are shown)

@Override
public synchronized BigDecimal getBalance() {
    return balance;
}

@Override
public synchronized void withdraw(BigDecimal amount) {
    BigDecimal balance = this.getBalance();
    this.balance = balance.subtract(amount);
}

Implementation using AtomicReference

class DecimalAccountCas implements DecimalAccount {
    AtomicReference<BigDecimal> balance;

    public DecimalAccountCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = balance.get();
            BigDecimal next = prev.subtract(amount);
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

4.2 ABA problem

Execute the following code

static AtomicReference<String> ref = new AtomicReference<>("A");

@SneakyThrows
public static void main(String[] args) {
    log.info("main start...");
    // Get value A
    // Has this shared variable been modified by another thread in the meantime?
    String prev = ref.get();
    other();
    TimeUnit.SECONDS.sleep(1);
    // Try to change to C
    log.info("change A->C {}", ref.compareAndSet(prev, "C"));
}

@SneakyThrows
private static void other() {
    new Thread(() -> {
        log.info("change A->B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();
    TimeUnit.MILLISECONDS.sleep(500);
    new Thread(() -> {
        log.info("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
}

The output shows that the main thread can only check whether the value of the shared variable equals the initial value A; it cannot detect that the value went from A to B and back to A. In most scenarios this has no impact on the business logic. But what if the main thread wants its CAS to fail whenever another thread has touched the shared variable at all? In that case AtomicReference is not enough; use AtomicStampedReference instead.
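The same effect can be shown without threads or sleeps. The sketch below (the class AbaSketch and its method are hypothetical names) performs the A -> B -> A changes inline and then runs the final CAS with the value read at the start. It relies on string literals being interned, so every "A" is the same object, which matters because compareAndSet compares references with ==.

```java
import java.util.concurrent.atomic.AtomicReference;

class AbaSketch {

    /** Returns true if the final CAS succeeds although the value changed twice in between. */
    static boolean abaGoesUnnoticed() {
        AtomicReference<String> ref = new AtomicReference<>("A");
        String prev = ref.get();            // snapshot taken by the "main thread"

        // Changes made by "other threads": A -> B, then B -> A.
        ref.compareAndSet("A", "B");
        ref.compareAndSet("B", "A");

        // The reference is "A" again, so this CAS cannot tell that anything happened.
        return ref.compareAndSet(prev, "C");
    }

    public static void main(String[] args) {
        System.out.println("change A->C " + abaGoesUnnoticed()); // prints: change A->C true
    }
}
```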

4.3 AtomicStampedReference

AtomicStampedReference attaches a version number (stamp) to the atomic reference, so the whole history of changes can be tracked, for example A -> B -> A -> C. Through the stamp we can tell how many times the reference has been changed along the way.

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

@SneakyThrows
public static void main(String[] args) {
    log.info("main start...");
    // Get value A
    String prev = ref.getReference();
    // Get version number
    int stamp = ref.getStamp();
    log.info("version {}", stamp);
    // If other threads interfere in the middle, ABA occurs
    other();
    TimeUnit.SECONDS.sleep(1);
    // Try to change to C
    log.info("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}

@SneakyThrows
private static void other() {
    new Thread(() -> {
        log.info("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
                ref.getStamp(), ref.getStamp() + 1));
        log.info("Updated version is {}", ref.getStamp());
    }, "t1").start();
    TimeUnit.MILLISECONDS.sleep(500);
    new Thread(() -> {
        log.info("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
                ref.getStamp(), ref.getStamp() + 1));
        log.info("Updated version is {}", ref.getStamp());
    }, "t2").start();
}
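A single-threaded sketch of the same idea, using hypothetical names (StampedSketch, abaIsDetected). It also uses get(int[] stampHolder), which reads the reference and the stamp together, so the pair cannot be torn by an update that happens between two separate getter calls.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedSketch {

    /** Returns true if the stale CAS is rejected after an A -> B -> A sequence. */
    static boolean abaIsDetected() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

        // Read reference and stamp in one step.
        int[] stampHolder = new int[1];
        String prev = ref.get(stampHolder);
        int stamp = stampHolder[0];         // 0

        // Changes made by "other threads"; each one bumps the stamp.
        ref.compareAndSet("A", "B", ref.getStamp(), ref.getStamp() + 1); // stamp 0 -> 1
        ref.compareAndSet("B", "A", ref.getStamp(), ref.getStamp() + 1); // stamp 1 -> 2

        // The reference matches, but the expected stamp (0) does not (it is 2 now).
        boolean changed = ref.compareAndSet(prev, "C", stamp, stamp + 1);

        return !changed && ref.getStamp() == 2 && "A".equals(ref.getReference());
    }

    public static void main(String[] args) {
        System.out.println("ABA detected: " + abaIsDetected()); // prints: ABA detected: true
    }
}
```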

Sometimes, however, we do not care how many times the reference has been changed, only whether it has been changed at all. That is what AtomicMarkableReference is for.

4.4 AtomicMarkableReference

@Slf4j
public class Test {

    @SneakyThrows
    public static void main(String[] args) {
        GarbageBag bag = new GarbageBag("Full of garbage");
        // Parameter 2 mark can be regarded as a mark, indicating that the garbage bag is full
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
        log.info("Main thread start...");
        GarbageBag prev = ref.getReference();
        log.info(prev.toString());
        new Thread(() -> {
            log.info("Cleaning thread start...");
            bag.setDesc("Empty garbage bag");
            while (!ref.compareAndSet(bag, bag, true, false)) {
            }
            log.info(bag.toString());
        }).start();
        Thread.sleep(1000);
        log.info("The main thread wants to change a new garbage bag?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("Empty garbage bag"), true, false);
        log.info("Did you change it?" + success);
        log.info(ref.getReference().toString());

    }
}

class GarbageBag {
    String desc;

    public GarbageBag(String desc) {
        this.desc = desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return super.toString() + " " + desc;
    }
}

5. Atomic array

With the AtomicReference discussed earlier, if the referenced object is an array, access to the individual elements of that array is still not thread-safe. That is why atomic arrays exist:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

Consider the following helper method:

/**
 * Supplier: produces a result from nothing, () -> result
 * Function: one argument, one result, (arg) -> result; BiFunction: (arg1, arg2) -> result
 * Consumer: one argument, no result, (arg) -> void; BiConsumer: (arg1, arg2) -> void
 *
 * @param arraySupplier Supplies the array, which may be a thread-unsafe or a thread-safe array
 * @param lengthFun     Returns the length of the array
 * @param putConsumer   Increments one element; receives the array and the index
 * @param printConsumer Prints the array
 */
private static <T> void demo(
    Supplier<T> arraySupplier,
    Function<T, Integer> lengthFun,
    BiConsumer<T, Integer> putConsumer,
    Consumer<T> printConsumer) {
    List<Thread> ts = new ArrayList<>();
    T array = arraySupplier.get();
    int length = lengthFun.apply(array);
    for (int i = 0; i < length; i++) {
        ts.add(new Thread(() -> {
            // Each thread performs 10000 operations on the array
            for (int j = 0; j < 10000; j++) {
                // j % length spreads the 10000 increments evenly: with length 10, each thread adds 10000 / 10 = 1000 to every element
                putConsumer.accept(array, j % length);
            }
        }));
    }
    // Start all threads
    ts.forEach(Thread::start);
    // Wait until all threads end
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    printConsumer.accept(array);
}

5.1 Unsafe implementation

Using an ordinary array instead of an atomic array:

demo(
        () -> new int[10],
        (array) -> array.length,
        (array, index) -> array[index]++,
        array -> System.out.println(Arrays.toString(array))
);

5.2 Safe implementation

As the example above shows, an ordinary array is not safe under multithreading. Now use AtomicIntegerArray:

demo(
        () -> new AtomicIntegerArray(10),
        (AtomicIntegerArray::length),
        (AtomicIntegerArray::getAndIncrement),
        atomicIntegerArray -> System.out.println(atomicIntegerArray.toString())
);

6. Field Updater

With a field updater, you can perform atomic operations on a field of an object. JUC provides:

  • AtomicReferenceFieldUpdater
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

They can only be used with fields declared volatile; otherwise an exception is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

A simple example:

@Slf4j
public class Test {

    @SneakyThrows
    public static void main(String[] args) {
        Student student = new Student();
        AtomicReferenceFieldUpdater<Student, String> updater =
                AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
        updater.compareAndSet(student, null, "People below two meters are mortals");
        System.out.println(student.toString());
    }

}

@Data
class Student {
    volatile String name;
}
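The volatile requirement mentioned above can be checked with a short sketch. Everything here apart from AtomicIntegerFieldUpdater itself is made up for illustration: the Counter class, its two fields, and the method names.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterSketch {

    static class Counter {
        volatile int hits; // eligible for a field updater
        int plain;         // not volatile, so newUpdater rejects it
    }

    /** Updates the volatile field atomically through the updater; returns its final value. */
    static int updateVolatileField() {
        AtomicIntegerFieldUpdater<Counter> updater =
                AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");
        Counter c = new Counter();
        updater.incrementAndGet(c);     // 0 -> 1
        updater.compareAndSet(c, 1, 5); // 1 -> 5
        return c.hits;
    }

    /** Returns true if creating an updater for the non-volatile field fails. */
    static boolean nonVolatileFieldIsRejected() {
        try {
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "plain");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(updateVolatileField());        // prints 5
        System.out.println(nonVolatileFieldIsRejected()); // prints true
    }
}
```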

7. Atomic accumulator

Since JDK 8, several classes dedicated to accumulation, such as LongAdder, have been added. Under contention they tend to perform considerably better than AtomicInteger and AtomicLong.

7.1 Accumulator Performance Comparison

@Slf4j
public class Test {

    @SneakyThrows
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(
                    AtomicLong::new,
                    AtomicLong::incrementAndGet
            );
        }
        for (int i = 0; i < 5; i++) {
            demo(
                    LongAdder::new,
                    LongAdder::increment
            );
        }
    }

    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 40 threads, each accumulating 500000
        for (int i = 0; i < 40; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start) / 1000_000);
    }
}

The reason for the performance gain is simple. Under contention, multiple accumulation cells are set up (allocated according to the intensity of contention; the array stops growing once it reaches the number of CPU cores). Thread-0 accumulates into Cell[0], Thread-1 into Cell[1], and so on; the results are summed at the end. Because the threads operate on different Cell variables while accumulating, fewer CAS attempts fail and have to be retried, which improves performance.
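The striping idea can be sketched with an AtomicLongArray. This is a deliberately simplified model and not how LongAdder is implemented: the number of stripes is fixed, there is no base field, no lazy creation or expansion, no padding against false sharing, and the slot is picked from the thread's hash code rather than from a per-thread probe. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        this.cells = new AtomicLongArray(stripes);
    }

    // Each thread maps to one slot, so threads on different slots do not contend.
    void increment() {
        int slot = Math.floorMod(Thread.currentThread().hashCode(), cells.length());
        cells.incrementAndGet(slot);
    }

    // The total is only assembled when somebody asks for it.
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    static long run(int threads, int perThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            }));
        }
        ts.forEach(Thread::start);
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 100_000)); // prints 400000
    }
}
```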

7.2 LongAdder source code

LongAdder has several key fields, which are declared in its parent class Striped64:

// Accumulation cell array, lazy initialization
transient volatile Cell[] cells;

// Base value. If there is no competition, use cas to accumulate this field
transient volatile long base;

// When cells are created or expanded, it is set to 1, indicating locking
transient volatile int cellsBusy;

7.2.1 Implementing a lock with CAS

Why does cellsBusy represent a lock? Isn't CAS supposed to be lock-free? In fact, CAS can be used to build a lock-like mechanism.

The following is for experimentation only; do not use it in production.

@Slf4j
class CasLock {
    private final AtomicInteger state = new AtomicInteger(0);

    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }

    public void unlock() {
        log.info("unlock...");
        state.set(0);
    }
}

Test:

@SneakyThrows
public static void main(String[] args) {
    CasLock lock = new CasLock();
    new Thread(() -> {
        log.info("begin...");
        lock.lock();
        try {
            log.info("lock...");
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }).start();
    new Thread(() -> {
        log.info("begin...");
        lock.lock();
        try {
            log.info("lock...");
        } finally {
            lock.unlock();
        }
    }).start();
}

In LongAdder, cellsBusy plays the same role as state in the code above.

7.2.2 Cell

// Prevents false sharing of cache lines
@sun.misc.Contended 
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanism
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

To understand this code, you need to start with caching

The comparison between the caches and memory is as follows (a cycle is one clock cycle):

From CPU to    Approximate clock cycles required
register       1 cycle (about 0.25 ns on a 4 GHz CPU)
L1             3~4 cycles
L2             10~20 cycles
L3             40~45 cycles
memory         120~240 cycles

  • Because the CPU is much faster than memory, data is prefetched into the cache to improve efficiency.
  • The cache works in units of cache lines; each cache line corresponds to a block of memory, typically 64 bytes (8 longs).
  • Caching creates copies of data: the same data may be cached in the cache lines of different cores.
  • The CPU must keep the data consistent. If one core changes the data, the whole corresponding cache line in the other cores must be invalidated.

For example, take an array of two cells. Because a Cell occupies 24 bytes (16-byte object header + 8-byte long), two cells take 48 bytes, which fits within a single 64-byte cache line. Each of two cores therefore loads both cells into its own cache line. Whichever core modifies one of the cells, the other core's cached line is invalidated, and that core has to re-read both cells from memory even though only one changed (because both cells sit in the same line).

@sun.misc.Contended solves this problem. It works by adding 128 bytes of padding before and after the annotated object or field, so that the objects land in different cache lines when the CPU prefetches them. This way, modifying one Cell does not invalidate the other core's cache line.

7.2.3 Tracing the increment source code

Start from the increment method:

public void increment() {
    add(1L);
}
public void add(long x) {
    Cell[] as;
    long b, v;
    int m;
    Cell a;
    if (
        (as = cells) != null || 
        !casBase(b = base, b + x)
    ) {
        boolean uncontended = true;
        if (
            as == null || 
            (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x))
        )
            longAccumulate(x, null, uncontended);
    }
}

Let's walk through the flow:

  • cells is created lazily: at the beginning, when there is no contention, cells is still null, so the base value is incremented with CAS via casBase. If that succeeds, the method returns directly. Otherwise, it enters the first-level if branch and goes straight to longAccumulate.
  • If there has been contention (cells is not null), the first-level if branch is entered directly, and the method checks whether the Cell for the current thread, as[getProbe() & m], exists. If it does not, it goes straight to longAccumulate. If it does, a CAS is performed on that cell, which corresponds to uncontended = a.cas(v = a.value, v + x). If this succeeds, the method returns; otherwise, it enters longAccumulate.

The steps above all lead to the longAccumulate method, so let's look at it next. The code is long, so we take it apart piece by piece.

  • First, look at the second if branch. When cells does not exist, control falls from the first branch to the second. This branch additionally requires that cells is not locked and has not been created by another thread in the meantime.
Flow for creating cells:

    loop entry -> cells does not exist, is not locked, and has not been created
               -> try to lock
                    lock succeeds -> create cells and initialize one cell -> return
                    lock fails    -> CAS-accumulate on base
                                       success -> return
                                       failure -> back to loop entry
  • Locking is done by the casCellsBusy method. If locking fails, control goes to the last branch, which CAS-increments base directly; if that succeeds the method returns, otherwise the loop runs again. If locking succeeds, the body of the second if branch is executed:
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try {                           // Initialize table
        // Again, determine whether someone else initializes the cells
        if (cells == as) {
            // The initial length is 2
            Cell[] rs = new Cell[2];
            // The cell for the current thread starts at x, which is the increment of 1 passed in from add
            rs[h & 1] = new Cell(x);
            cells = rs;
            init = true;
        }
    } finally {
        // Unlock
        cellsBusy = 0;
    }
    if (init)
        break;
}
  • Next, once the cells array exists, the cell slot for the current thread may still be empty when this method is entered again, so look at the first branch for the case where the slot does not exist:
if ((as = cells) != null && (n = as.length) > 0) {
    // Judge whether the slot corresponding to the current thread exists
    if ((a = as[(n - 1) & h]) == null) {
        if (cellsBusy == 0) {       // Try to attach new Cell
            // When creating a cell object, the initial value is also x
            Cell r = new Cell(x);   // Optimistically create
            // Try locking
            if (cellsBusy == 0 && casCellsBusy()) {
                boolean created = false;
                try {               // Recheck under lock
                    Cell[] rs; int m, j;
                    // Confirm again whether the slot is empty
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                        // Store the newly created cell in cells
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    // Unlock
                    cellsBusy = 0;
                }
                if (created)
                    break;
                continue;           // Slot is now non-empty
            }
        }
        collide = false;
    } else if {
        ...
    }
}
  • If both cells and the current thread's cell exist, control reaches the CAS branch inside the first if and performs CAS directly on that cell:
if ((as = cells) != null && (n = as.length) > 0) {
    // Judge whether the slot corresponding to the current thread exists
    if ((a = as[(n - 1) & h]) == null) {
        ...
    }
    else if (!wasUncontended)       // CAS already known to fail
        wasUncontended = true;      // Continue after rehash
    // Direct CAS accumulation
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x))))
        break;
    // If the CAS failed, check whether the array length has reached the number of CPU cores (or cells is stale); if so, clear collide so that the expansion branch below is not taken
    else if (n >= NCPU || cells != as)
        collide = false;            // At max size or stale
    // If this branch is taken, the expansion branch below is skipped in this iteration
    else if (!collide)
        collide = true;
    // Expansion
    else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            if (cells == as) {      // Expand table unless stale
                // Double the capacity
                Cell[] rs = new Cell[n << 1];
                // Copy the existing cells
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                // Replace the old array
                cells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }
    // Switch the current thread to a different cell (the old one kept failing, so try another)
    h = advanceProbe(h);
}

7.2.4 sum

After all threads have finished accumulating, the values of all cells have to be added up, which is what the sum method does. The code is straightforward, so it is not discussed further.

public long sum() {
    Cell[] as = cells;
    Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
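One consequence of this loop is worth spelling out: sum reads base and each cell one after another without locking, so a call that overlaps with concurrent updates is not an atomic snapshot. A call made after all updating threads have finished returns the exact total. A minimal sketch, with the class name LongAdderSumSketch and the run helper being illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

class LongAdderSumSketch {

    /** Increments a LongAdder from several threads and reads the total after they finish. */
    static long run(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment();
                }
            }));
        }
        ts.forEach(Thread::start);
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // All writers are done, so base plus all cells is the exact total.
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 50_000)); // prints 200000
    }
}
```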

8. Unsafe

8.1 Obtaining Unsafe

The Unsafe object provides very low-level methods for manipulating memory and threads. Application code cannot obtain an Unsafe instance by normal means; it can only be obtained through reflection. Because it operates at such a low level, application developers are advised not to call it casually.

static Unsafe unsafe;

static {
    try {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        unsafe = (Unsafe) theUnsafe.get(null);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        throw new Error(e);
    }
}

static Unsafe getUnsafe() {
    return unsafe;
}

8.2 CAS with Unsafe

@SneakyThrows
public static void main(String[] args) {
    Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
    theUnsafe.setAccessible(true);
    Unsafe unsafe = (Unsafe) theUnsafe.get(null);
    Field id = Student.class.getDeclaredField("id");
    Field name = Student.class.getDeclaredField("name");
    // Gets the offset of the member variable
    long idOffset = unsafe.objectFieldOffset(id);
    long nameOffset = unsafe.objectFieldOffset(name);
    Student student = new Student();
    System.out.println(student);
    // Replace the value of the member variable with the cas method
    // Return true
    unsafe.compareAndSwapInt(student, idOffset, 0, 22);
    // Return true
    unsafe.compareAndSwapObject(student, nameOffset, null, "People below two meters are mortals");
    System.out.println(student);
}

@Data
class Student {
    volatile int id;
    volatile String name;
}

8.3 Implementing an atomic integer

public class Test {

    @SneakyThrows
    public static void main(String[] args) {
        Account.demo(new MyAtomicInteger(10000));
    }
}

interface Account {
    /**
     * Get balance
     */
    Integer getBalance();

    /**
     * withdraw money
     */
    void withdraw(Integer amount);

    /**
     * Starts 1000 threads, each of which withdraws 10 yuan.
     * If the initial balance is 10000, the correct final balance is 0.
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end - start) / 1000_000 + " ms");
    }
}

class MyAtomicInteger implements Account {
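    // volatile rather than final: the field is updated through CAS and must be visible to other threads
    private volatile int value;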
    private static final long VALUE_OFFSET;
    private static final Unsafe UNSAFE;

    static {
        UNSAFE = UnsafeAccessor.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public int getValue() {
        return value;
    }

    public void decrement(int amount) {
        while (true) {
            int prev = this.value;
            int next = prev - amount;
            if (UNSAFE.compareAndSwapInt(this, VALUE_OFFSET, prev, next)) {
                break;
            }
        }
    }

    public MyAtomicInteger(int value) {
        this.value = value;
    }

    @Override
    public Integer getBalance() {
        return getValue();
    }

    @Override
    public void withdraw(Integer amount) {
        decrement(amount);
    }
}

class UnsafeAccessor {
    private static final Unsafe UNSAFE;

    static {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            UNSAFE = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }

    public static Unsafe getUnsafe() {
        return UNSAFE;
    }
}

Keywords: Java Back-end JUC

Added by volomike on Sun, 13 Feb 2022 14:10:52 +0200