Review Java Concurrency with Zhu Ye: Atomic

In this section, we explore the atomic types in the java.util.concurrent.atomic package.

AtomicXXX and XXXAdder and XXXAccumulator performance tests

We start with a performance test that compares AtomicLong (available since Java 1.5), LongAdder (since Java 1.8), and LongAccumulator (since Java 1.8) on a simple accumulation workload.

The logic of the program is simple: each counter is incremented 1 billion times with a maximum concurrency of 100 threads:

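// Imports assumed from usage: Lombok, JUnit 4, and Spring's StopWatch (start(name), prettyPrint())
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.util.StopWatch;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

@Slf4j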
public class AccumulatorBenchmark {

    private StopWatch stopWatch = new StopWatch();
    static final int threadCount = 100;
    static final int taskCount = 1000000000;
    static final AtomicLong atomicLong = new AtomicLong();
    static final LongAdder longAdder = new LongAdder();
    static final LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0L);

    @Test
    public void test() {
        Map<String, IntConsumer> tasks = new HashMap<>();
        tasks.put("atomicLong", i -> atomicLong.incrementAndGet());
        tasks.put("longAdder", i -> longAdder.increment());
        tasks.put("longAccumulator", i -> longAccumulator.accumulate(1L));
        tasks.entrySet().forEach(item -> benchmark(threadCount, taskCount, item.getValue(), item.getKey()));

        log.info(stopWatch.prettyPrint());
        Assert.assertEquals(taskCount, atomicLong.get());
        Assert.assertEquals(taskCount, longAdder.longValue());
        Assert.assertEquals(taskCount, longAccumulator.longValue());

    }

    private void benchmark(int threadCount, int taskCount, IntConsumer task, String name) {
        stopWatch.start(name);
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(task));
        forkJoinPool.shutdown();
        try {
            forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        stopWatch.stop();
    }
}

The results match what the official documentation says: under high contention, LongAdder performs much better than AtomicLong.
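For a quick sanity check that does not require a billion operations, the same three counters can be exercised at a much smaller scale. The sketch below is not a benchmark; it only confirms that AtomicLong, LongAdder, and LongAccumulator arrive at the same total when several threads increment them. The class name CounterSketch, the helper names, and the thread and iteration counts are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

class CounterSketch {

    // Runs `task` perThread times on each of `threads` threads and waits for all of them.
    static void runConcurrently(int threads, int perThread, Runnable task) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    task.run();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    static long countWithAtomicLong(int threads, int perThread) {
        AtomicLong counter = new AtomicLong();
        runConcurrently(threads, perThread, counter::incrementAndGet);
        return counter.get();
    }

    static long countWithLongAdder(int threads, int perThread) {
        LongAdder counter = new LongAdder();
        runConcurrently(threads, perThread, counter::increment);
        return counter.sum(); // exact here because all writers have finished
    }

    static long countWithLongAccumulator(int threads, int perThread) {
        LongAccumulator counter = new LongAccumulator(Long::sum, 0L);
        runConcurrently(threads, perThread, () -> counter.accumulate(1L));
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("atomicLong      = " + countWithAtomicLong(4, 10_000));
        System.out.println("longAdder       = " + countWithLongAdder(4, 10_000));
        System.out.println("longAccumulator = " + countWithLongAccumulator(4, 10_000));
    }
}
```

All three methods should return threads × perThread. The difference between the classes lies in how they behave under contention, not in the result they produce.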

AtomicReference Visibility Problem Test

We see AtomicReference in a lot of open source code. What is it for? Let's write a test program that uses a Switch type as an on/off flag and starts three threads that each spin in a loop for as long as their switch is on. After 2 seconds, all three switches are turned off:

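  • The first is a plain Switch
  • The second is a Switch declared volatile
  • The third is a Switch wrapped in an AtomicReference

// Imports assumed from usage: Lombok and JUnit 4
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j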
public class AtomicReferenceTest {

    private Switch rawValue = new Switch();
    private volatile Switch volatileValue = new Switch();
    private AtomicReference<Switch> atomicValue = new AtomicReference<>(new Switch());

    @Test
    public void test() throws InterruptedException {

        new Thread(() -> {
            log.info("Start:rawValue");
            while (rawValue.get()) {
            }
            log.info("Done:rawValue");
        }).start();

        new Thread(() -> {
            log.info("Start:volatileValue");
            while (volatileValue.get()) {
            }
            log.info("Done:volatileValue");
        }).start();

        new Thread(() -> {
            log.info("Start:atomicValue");
            while (atomicValue.get().get()) {
            }
            log.info("Done:atomicValue");
        }).start();

        Executors.newSingleThreadScheduledExecutor().schedule(rawValue::off, 2, TimeUnit.SECONDS);
        Executors.newSingleThreadScheduledExecutor().schedule(volatileValue::off, 2, TimeUnit.SECONDS);
        Executors.newSingleThreadScheduledExecutor().schedule(atomicValue.get()::off, 2, TimeUnit.SECONDS);

        TimeUnit.HOURS.sleep(1);
    }

    class Switch {
        private boolean enable = true;

        public boolean get() {
            return enable;
        }

        public void off() {
            enable = false;
        }
    }
}

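Run the program. After 2 seconds, one of the loops (the one reading the plain rawValue) is still spinning and its thread never exits. This is a visibility problem: the switch is turned off on a separate scheduler thread, and without volatile or AtomicReference nothing obliges the looping thread to ever observe that write, so the JIT is free to hoist the read of the plain field out of the loop. Reading the reference through a volatile field or AtomicReference.get() on every iteration prevents that in practice. Strictly speaking, the enable field inside Switch is still a plain field, so the Java Memory Model only guarantees visibility of the reference itself; making the flag itself volatile (or an AtomicBoolean) is the robust fix. Of course, AtomicReference offers more than visibility.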
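As a variation on the test above, the flag itself can carry the visibility guarantee instead of the reference that points to it. The following is a minimal sketch assuming an AtomicBoolean flag; the class name StopFlagSketch, the method name, and the timings are illustrative and not part of the original test.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StopFlagSketch {

    // Starts a spinning worker, switches the flag off from the calling thread,
    // and reports whether the worker exited within the timeout.
    static boolean workerStopsWithin(long timeoutMillis) {
        AtomicBoolean enabled = new AtomicBoolean(true);

        Thread worker = new Thread(() -> {
            // get() has volatile-read semantics, so the write below becomes visible here
            while (enabled.get()) {
                // busy-wait, as in the original test
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if the worker never stops
        worker.start();

        try {
            Thread.sleep(50);          // let the worker spin for a moment
            enabled.set(false);        // volatile write: switch off
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + workerStopsWithin(2_000));
    }
}
```

A volatile boolean field would work equally well here; AtomicBoolean is chosen only because it also offers compareAndSet() if the flag later needs conditional updates.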

AtomicInteger Test

Let's take a look at AtomicInteger's compareAndSet(). To be clear, this program has no practical use; it only exercises the feature. We start 10 threads in a shuffled order, and thread i is responsible for moving the counter from i to i + 1. compareAndSet() ensures that the increments happen in the order we want no matter how the threads are scheduled: a thread's CAS succeeds only once the counter has reached its expected value, and until then the thread sleeps for 50 ms and retries.

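// Imports assumed from usage: Lombok and JUnit 4
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j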
public class AtomicIntegerTest {
    @Test
    public void test() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        List<Thread> threadList = IntStream.range(0,10).mapToObj(i-> {
            Thread thread = new Thread(() -> {
                log.debug("Wait {}->{}", i, i+1);
                while (!atomicInteger.compareAndSet(i, i + 1)) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Done {}->{}", i, i+1);
            });
            thread.setName(UUID.randomUUID().toString());
            return thread;
        }).sorted(Comparator.comparing(Thread::getName)).collect(Collectors.toList());

        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
        log.info("result:{}", atomicInteger.get());
    }
}

The results are as follows:

11:46:30.611 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 4->5
11:46:30.611 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 5->6
11:46:30.612 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 9->10
11:46:30.612 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 7->8
11:46:30.613 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 6->7
11:46:30.611 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 3->4
11:46:30.611 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 1->2
11:46:30.611 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 8->9
11:46:30.611 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 0->1
11:46:30.611 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Wait 2->3
11:46:30.627 [94465214-27bf-4543-80e2-dbaeeb6ddc94] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 0->1
11:46:30.681 [4f15d9ce-044e-4657-b418-4874d03e5d22] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 1->2
11:46:30.681 [60f9cb50-21e6-45bc-9b4d-867783ab033b] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 2->3
11:46:30.734 [037e9595-73cb-4aa1-afee-4250347746c8] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 3->4
11:46:30.780 [2c80b367-d80e-46b5-94f5-b7b172e79dad] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 4->5
11:46:30.785 [7bccbb54-4573-4b77-979b-840613406428] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 5->6
11:46:30.785 [ccc05b0f-11da-41fa-b8fc-59a90dfc2250] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 6->7
11:46:30.787 [949b0c26-febb-4830-ad98-f43521ce4382] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 7->8
11:46:30.838 [3a96c35c-bc4e-45f4-aae4-9fd8611acaea] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 8->9
11:46:30.890 [c0792831-6201-4f6c-b702-79c1b798c3aa] DEBUG me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - Done 9->10
11:46:30.890 [main] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicIntegerTest - result:10

You can see that the Wait lines are out of order, while the Done lines appear strictly in order from 0->1 to 9->10.
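The more common use of compareAndSet() is a retry loop: read the current value, compute the new one, and retry if another thread got in first. The sketch below uses that pattern to maintain a running maximum. The class name CasLoopSketch and the method updateMax are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasLoopSketch {

    // Atomically raises `target` to `candidate` if candidate is larger.
    // Returns the value held by `target` once this call has taken effect.
    static int updateMax(AtomicInteger target, int candidate) {
        while (true) {
            int current = target.get();
            if (candidate <= current) {
                return current;               // nothing to do
            }
            if (target.compareAndSet(current, candidate)) {
                return candidate;             // our write won
            }
            // Another thread changed the value between get() and compareAndSet(); retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger max = new AtomicInteger(5);
        updateMax(max, 3);
        updateMax(max, 9);
        System.out.println(max.get());
    }
}
```

Since Java 8 the same effect is available through AtomicInteger.accumulateAndGet(candidate, Math::max); the explicit loop is shown only to make the compare-and-retry step visible.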

AtomicStampedReference test

AtomicStampedReference can be used to solve the ABA problem. What is the ABA problem? Consider this example:
Thread 1 reads the value 1, waits for one second, and then attempts to change 1 to 3.
Thread 2 starts, reads the value 1, changes it to 2, and shortly afterwards changes it back to 1.
Although AtomicInteger makes each operation atomic across threads, it cannot tell whether the 1 that thread 1 sees at update time is the same 1 it originally read or a value that was modified in between.
As another example, suppose our account holds 100 yuan and we want to change it to 200 yuan. Before our update, another operation moves the balance from 100 to 150 and then back to 100. The balance is 100 again, but it should not be treated as the original 100 yuan, because the version of the account has changed. With optimistic locking on a database row, the balance would still read 100 but the row version would no longer match. AtomicStampedReference applies the same idea as a row-level optimistic lock.

 @Test
    public void test() throws InterruptedException {

        AtomicInteger atomicInteger = new AtomicInteger(1);
        Thread thread1 = new Thread(() -> {
            int value = atomicInteger.get();
            log.info("thread 1 read value: " + value);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (atomicInteger.compareAndSet(value, 3)) {
                log.info("thread 1 update from " + value + " to 3");
            } else {
                log.info("thread 1 update fail!");
            }
        });
        thread1.start();

        Thread thread2 = new Thread(() -> {
            int value = atomicInteger.get();
            log.info("thread 2 read value: " + value);
            if (atomicInteger.compareAndSet(value, 2)) {
                log.info("thread 2 update from " + value + " to 2");

                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                value = atomicInteger.get();
                log.info("thread 2 read value: " + value);
                if (atomicInteger.compareAndSet(value, 1)) {
                    log.info("thread 2 update from " + value + " to 1");
                }
            }
        });
        thread2.start();

        thread1.join();
        thread2.join();
    }

Look at the results:

11:56:20.373 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1
11:56:20.381 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2
11:56:20.373 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1
11:56:20.483 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2
11:56:20.484 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1
11:56:21.386 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update from 1 to 3

Here we use AtomicStampedReference to fix this problem:

@Test
public void test2() throws InterruptedException {
    AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);

    Thread thread1 = new Thread(() -> {
        int[] stampHolder = new int[1];
        int value = atomicStampedReference.get(stampHolder);
        int stamp = stampHolder[0];
        log.info("thread 1 read value: " + value + ", stamp: " + stamp);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) {
            log.info("thread 1 update from " + value + " to 3");
        } else {
            log.info("thread 1 update fail!");
        }
    });
    thread1.start();

    Thread thread2 = new Thread(() -> {
        int[] stampHolder = new int[1];
        int value = atomicStampedReference.get(stampHolder);
        int stamp = stampHolder[0];
        log.info("thread 2 read value: " + value + ", stamp: " + stamp);
        if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) {
            log.info("thread 2 update from " + value + " to 2");

            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            value = atomicStampedReference.get(stampHolder);
            stamp = stampHolder[0];
            log.info("thread 2 read value: " + value + ", stamp: " + stamp);
            if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) {
                log.info("thread 2 update from " + value + " to 1");
            }
            value = atomicStampedReference.get(stampHolder);
            stamp = stampHolder[0];
            log.info("thread 2 read value: " + value + ", stamp: " + stamp);
        }
    });
    thread2.start();

    thread1.join();
    thread2.join();
}

The results are as follows:

11:59:11.946 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 1
11:59:11.951 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 1 to 2
11:59:11.946 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 read value: 1, stamp: 1
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 2, stamp: 2
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 update from 2 to 1
11:59:12.053 [Thread-1] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 2 read value: 1, stamp: 3
11:59:12.954 [Thread-0] INFO me.josephzhu.javaconcurrenttest.atomic.AtomicStampedReferenceTest - thread 1 update fail!

As you can see, a modification now supplies not only the expected value but also the expected version number (the stamp), which is returned together with the value when reading. Although the value ends up unchanged, thread 2's two modifications move the stamp from 1 to 3, so thread 1's attempt to update with stamp 1 fails.
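The account example from earlier (100 yuan to 150 and back to 100) can be replayed deterministically on a single thread to show the stamp doing its job. This is a minimal sketch; the class name StampedBalanceSketch and its methods are made up for illustration. One detail worth knowing: AtomicStampedReference.compareAndSet() compares the expected reference with ==, so the sketch reuses the same boxed Integer instances rather than relying on autoboxing to produce identical objects.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class StampedBalanceSketch {

    // Reused boxed instances: compareAndSet compares references with ==, not equals()
    static final Integer HUNDRED = 100;
    static final Integer ONE_FIFTY = 150;

    // Builds a balance that has gone 100 -> 150 -> 100, with the stamp moving 0 -> 1 -> 2.
    static AtomicStampedReference<Integer> balanceAfterAba() {
        AtomicStampedReference<Integer> balance = new AtomicStampedReference<>(HUNDRED, 0);
        balance.compareAndSet(HUNDRED, ONE_FIFTY, 0, 1);
        balance.compareAndSet(ONE_FIFTY, HUNDRED, 1, 2);
        return balance;
    }

    // A writer that read (100, stamp 0) before the history above tries to set 200.
    static boolean updateWithStaleStamp() {
        AtomicStampedReference<Integer> balance = balanceAfterAba();
        return balance.compareAndSet(HUNDRED, 200, 0, 1);
    }

    // A writer that re-reads value and stamp together and then sets 200.
    static boolean updateWithFreshStamp() {
        AtomicStampedReference<Integer> balance = balanceAfterAba();
        int[] stampHolder = new int[1];
        Integer current = balance.get(stampHolder);
        return balance.compareAndSet(current, 200, stampHolder[0], stampHolder[0] + 1);
    }

    public static void main(String[] args) {
        System.out.println("stale stamp update succeeded: " + updateWithStaleStamp());
        System.out.println("fresh stamp update succeeded: " + updateWithFreshStamp());
    }
}
```

The stale update is rejected even though the balance is back at 100, because the stamp has advanced to 2; the update that re-reads the current stamp goes through.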

An interesting question

This article is on the short side, so let's finish with an interesting question a reader asked earlier. The program is as follows:

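// Imports assumed from usage: Lombok and JUnit 4
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

@Slf4j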
public class InterestingProblem {
    int a = 1;
    int b = 1;

    void add() {
        a++;
        b++;
    }

    void compare() {
        if (a < b)
            log.info("a:{},b:{},{}", a, b, a>b);
    }

    @Test
    public void test() throws InterruptedException {

        new Thread(() -> {
            while (true)
                add();
        }).start();
        new Thread(() -> {
            while (true)
                compare();
        }).start();

        TimeUnit.MILLISECONDS.sleep(100);
    }
}

The reader found this spooky: not only does the log line get printed at all, but even though a < b has just been checked, the logged value of a > b is sometimes true. Could this be a JVM bug?

His reasoning was that a and b are not static, so why would there be any concurrency problem? He asked his colleagues:

  • Colleague A said it was clearly a multithreading problem and that adding volatile would solve it, but adding volatile to a and b did not help.
  • Colleague B said AtomicInteger solves concurrency problems, but changing both a and b to AtomicInteger did not help either.
  • Colleague C seemed to realize that a lock was needed and added the synchronized keyword to the add() method, but that did not help either.

In fact, the reader had not understood visibility and atomicity under multithreading, and his colleagues were also mixing up concurrency concepts.
Let's look at the code again: one thread keeps incrementing a and b, while another thread compares a and b and logs the result. The root cause is that a < b is a three-step operation: read a, read b, compare. These steps are not atomic as a whole, and the add thread can modify a and b anywhere in between. If compare() reads a first, then add() runs a++ and b++, and only then compare() reads b, it sees a < b. If add() has run a++, and compare() then reads both a and b before b++ runs, it sees a > b.
The bytecode of compare() makes this plain: the comparison of a and b alone compiles to four load instructions (two aload_0/getfield pairs) followed by if_icmpge. We cannot judge atomicity by the number of source lines; another thread may run between any two of these instructions:

 0 aload_0
 1 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
 4 aload_0
 5 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
 8 if_icmpge 67 (+59)
11 getstatic #4 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.log>
14 ldc #5 <a:{},b:{},{}>
16 iconst_3
17 anewarray #6 <java/lang/Object>
20 dup
21 iconst_0
22 aload_0
23 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
26 invokestatic #7 <java/lang/Integer.valueOf>
29 aastore
30 dup
31 iconst_1
32 aload_0
33 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
36 invokestatic #7 <java/lang/Integer.valueOf>
39 aastore
40 dup
41 iconst_2
42 aload_0
43 getfield #2 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.a>
46 aload_0
47 getfield #3 <me/josephzhu/javaconcurrenttest/atomic/InterestingProblem.b>
50 if_icmple 57 (+7)
53 iconst_1
54 goto 58 (+4)
57 iconst_0
58 invokestatic #8 <java/lang/Boolean.valueOf>
61 aastore
62 invokeinterface #9 <org/slf4j/Logger.info> count 3
67 return

So the reader had two misunderstandings:

  • Whether a field is static has nothing to do with whether multithreaded access is safe; a non-static field can still be accessed concurrently by multiple threads.
  • You cannot tell whether code is atomic from its line count or apparent simplicity, not at the Java source level and not even at the bytecode level.

Now let's revisit what his three colleagues said:

  • Colleague A may not have fully understood what volatile does. The interference between the two threads operating on a and b is not a visibility problem, and adding volatile actually makes the symptom show up more often (you can compare how frequently true is logged with and without volatile).
  • Colleague B may not have thought through what AtomicInteger is for: it makes individual operations on an integer atomic when multiple threads modify it. Here only one thread modifies a and b, so AtomicInteger does not solve the problem.
  • Colleague C seems to have spotted the problem, but did not realize that locking add() alone is useless when only one thread ever calls it. The real issue is the interleaving of add() and compare(); the two must execute serially with respect to each other so that a and b are always seen as a consistent pair.

So the simple fix is to mark both add() and compare() as synchronized. Is there a way to do this without locking? That is left for you to think about.
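The synchronized fix can be sketched as follows. To keep the example checkable, the comparing side returns whether a and b are equal instead of logging, and the loops are bounded. The class name ConsistentPairSketch and the methods inSync() and countViolations() are hypothetical names introduced for this illustration.

```java
class ConsistentPairSketch {
    private int a = 1;
    private int b = 1;

    // Both increments happen while holding the lock on `this`.
    synchronized void add() {
        a++;
        b++;
    }

    // Reads a and b under the same lock as add(), so it never sees a half-finished add().
    synchronized boolean inSync() {
        return a == b;
    }

    // Runs add() on one thread while the caller checks the pair; returns how often a != b was seen.
    static int countViolations(int iterations) {
        ConsistentPairSketch pair = new ConsistentPairSketch();
        Thread adder = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                pair.add();
            }
        });
        adder.start();

        int violations = 0;
        for (int i = 0; i < iterations; i++) {
            if (!pair.inSync()) {
                violations++;
            }
        }
        try {
            adder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return violations;
    }

    public static void main(String[] args) {
        System.out.println("violations: " + countViolations(1_000_000));
    }
}
```

Because both methods synchronize on the same object, the reader can only run before or after a complete add(), never in the middle of one, so the count stays at zero. Removing synchronized from either method brings the inconsistency back.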

Summary

This article briefly tested some commonly used atomic classes in the java.util.concurrent.atomic package and ended with a reader's question and the confusion around it. I hope it is useful to you.

As before, the code is on my GitHub. You are welcome to clone it, play with it, and give it a star.

You are welcome to follow my WeChat official account: Home Garden


Added by FFFF on Sun, 21 Jul 2019 04:45:04 +0300