JUC study notes

Shangsi Valley JUC source code teaching practical tutorial complete version (java juc thread intensive lecture)

1.volatile keyword and memory visibility

When the program runs, the JVM will allocate an independent cache for each thread executing tasks to improve efficiency.

The memory visibility problem is that when multiple threads operate to share data, they are not visible to each other.

volatile keyword: when multiple threads operate to share data, it can ensure that the data in memory is visible. Compared with synchronized, it is a relatively lightweight synchronization strategy.

volatile is not "mutually exclusive", nor can it guarantee the "atomicity" of variables

public class VolatileTest {

    public static void main(String[] args) {
        ThreadDemo threadDemo = new ThreadDemo();
        new Thread(threadDemo).start();

        while (true) {
//            synchronized (threadDemo) {
                if (threadDemo.isFlag()) {
                    System.out.println("---------");
                    break;
                }
//            }
        }
    }
}

class ThreadDemo implements Runnable {

//    private boolean flag = false;
    private volatile boolean flag = false;

    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (Exception e) {
        }
        flag = true;
        System.out.println("flag=" + isFlag());
    }

    public boolean isFlag() {
        return flag;
    }
}

2. Atomic variables and CAS algorithm

Atomic variable: jdk1 After 5, Java util. concurrent. Common atomic variables (including AtomicInteger) are provided under the atomic package
1. volatile ensures memory visibility
2. CAS (compare and swap) algorithm ensures the atomicity of data. CAS algorithm is the hardware support for concurrent operation and shared data
CAS contains three operands: memory value V, estimated value A and updated value B. If and only if V == A, V = B (assign B to A). Otherwise, nothing will be done

public class AtomicTest {

    public static void main(String[] args) {
        AtomicDemo atomicDemo = new AtomicDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(atomicDemo).start();
        }
    }

}

class AtomicDemo implements Runnable {

//    private int serialNum = 0;
    private AtomicInteger serialNum = new AtomicInteger();

    @Override
    public void run() {

        try {
            Thread.sleep(1000);
        } catch (Exception e) {
        }

        System.out.println(Thread.currentThread().getName() + ":" + getSerialNum());
    }

    public int getSerialNum() {
//        return serialNum++;
        return serialNum.getAndIncrement();
    }
}

3. Simulate CAS algorithm

public class CompareAndSwapTest {

    public static void main(String[] args) {
        final CompareAndSwap cas = new CompareAndSwap();

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int expectedValue = cas.get();
                    boolean flag = cas.compareAndSet(expectedValue, (int)(Math.random() * 100));
                    System.out.println(flag);
                }
            }).start();
        }
    }
}

class CompareAndSwap {
    private int value;

    // Get memory value
    public synchronized int get() {
        return value;
    }

    // compare
    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;

        if (oldValue == expectedValue) {
            this.value = newValue;
        }

        return oldValue;
    }

    // set up
    public synchronized boolean compareAndSet(int expectedValue, int newValue) {
        return expectedValue == compareAndSwap(expectedValue, newValue);
    }
}

4. Synchronization container class ConcurrentHashMap

  • Java 5.0 in Java util. The concurrent package provides a variety of concurrent container classes to improve the performance of the synchronization container.
  • The ConcurrentHashMap synchronization container class is a thread safe hash table added to Java 5.
    For multi-threaded operations, it is between HashMap and Hashtable.
    java1. Before 8, the internal "lock segmentation" mechanism was used to replace the exclusive lock of Hashtable. To improve performance, 1.8 is followed by CAS
  • This package also provides a collection implementation designed for use in a multithreaded context:
    ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, and CopyOnWriteArraySet.
    When many threads are expected to access a given collection, concurrent HashMap is usually better than synchronous HashMap,
    ConcurrentSkipListMap is usually better than synchronized TreeMap.
    CopyOnWriteArrayList is better than synchronized ArrayList when the expected reading and traversal are much larger than the number of updates to the list.
/**
 * CopyOnWriteArrayList/CopyOnWriteArraySet : Write and copy
 * Note: it is not suitable for scenes with many adding operations, because each addition will be copied, which is expensive and inefficient
 *      It is suitable for scenarios with many concurrent iterative operations
 */
public class CopyOnWriteArrayListTest {

    public static void main(String[] args) {
        HelloThread ht = new HelloThread();

        for (int i = 0; i < 10; i++) {
            new Thread(ht).start();
        }
    }

}

class HelloThread implements Runnable {

//    private static List<String> list = Collections.synchronizedList(new ArrayList<>());
    private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

    static {
        list.add("AA");
        list.add("BB");
        list.add("CC");
    }

    @Override
    public void run() {
        Iterator<String> it = list.iterator();

        while (it.hasNext()) {
            System.out.println(it.next());
            // When list < string > is used, Java will be thrown here util. Concurrentmodificationexception exception
            list.add("AA");
        }
    }
}

5.CountDownLatch locking

  • Java 5.0 in Java util. The concurrent package provides a variety of concurrent container classes to improve the performance of the synchronization container.

  • CountDownLatch is a synchronization helper class that allows one or more threads to wait until a set of operations are completed in other threads.

  • Locking can delay the progress of a thread until it reaches the termination state. Locking can be used to ensure that some activities do not continue until other activities are completed:

    Ensure that a calculation continues to execute after all the resources it needs are initialized;
    Ensure that a service is started only after all other services it depends on have been started;
    Wait until all participants of an operation are ready before continuing.

/**
 * CountDownLatch Locking: when some operations are completed, the current operation will not continue until the operations of other threads are completed
 */
public class CountDownLatchTest {

    public static void main(String[] args) {
        // Five threads are opened later, and the count 5 is passed in here
        CountDownLatch countDownLatch = new CountDownLatch(5);
        LatchDemo latchDemo = new LatchDemo(countDownLatch);

        long start = System.currentTimeMillis();

        for (int i = 0; i < 5; i++) {
            new Thread(latchDemo).start();
        }


        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Execute when the count is reduced to 0
        long end = System.currentTimeMillis();
        System.out.println("Time consuming:" + (end - start));
    }

}

class LatchDemo implements Runnable {
    public CountDownLatch countDownLatch;

    public LatchDemo(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        synchronized (this) {
            try {
                for (int i = 0; i < 50000; i++) {
                    if (i % 2 == 0) {
                        System.out.println(i);
                    }
                }
            }finally {
                countDownLatch.countDown();
            }
        }
    }
}

6. Implement Callable interface

/**
 * Thread creation mode 3: implement Callable interface
 * Compared with the way of implementing the Runnable interface, the method can have a return value and throw an exception
 */
public class CallableTest {
    public static void main(String[] args) {
        CallableImpl callableImpl = new CallableImpl();

        // The implementation of Callable mode requires the support of FutureTask implementation class to receive operation results. FutureTask is the implementation class of the Future interface
        // Another construction method is futuretask (runnable, runnable, V result)
        FutureTask<Integer> result = new FutureTask<>(callableImpl);

        new Thread(result).start();

        // Receive the result of thread operation
        try {
            Integer sum = result.get(); // futureTask can also be used for locking
            System.out.println(sum);
            System.out.println("---------When the sub thread is executed get()Only when you get the result can you execute here---------");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

class CallableImpl implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += i;
        }
        return sum;
    }
}

7.Lock synchronization lock

I. ways to solve multithreading safety problems:
1. Synchronized code block (synchronized implicit lock)
2. Synchronization method (synchronized implicit lock)
3. Synchronous Lock (it appears after jdk 1.5. It is an explicit Lock. It needs to be locked through the lock() method. The Lock must be released through the unlock() method, so there are certain risks, such as the Lock is not successfully released)

public class LockTest {
    public static void main(String[] args) {
        Ticket ticket = new Ticket();

        new Thread(ticket, "1 Number window").start();
        new Thread(ticket, "2 Number window").start();
        new Thread(ticket, "3 Number window").start();
    }
}

class Ticket implements Runnable {

    private int ticketNum = 100;
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock();    // Lock
            try {
                if (ticketNum > 0) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "The remaining tickets are:" + --ticketNum);
                }
            } finally {
                lock.unlock();  // Release lock
            }
        }
    }
}

8. Producer consumer case - false Awakening

How to implement the wake-up waiting mechanism with Lock synchronization Lock, that is, the wake-up waiting mechanism like wait() and notify() of synchronized keyword

8.1 problems caused by not using the wake-up waiting mechanism

public class ProductorAndConsumerTest {

    public static void main(String[] args) {
        Clerk clerk = new Clerk();
        Productor productor = new Productor(clerk);
        Consumer consumer = new Consumer(clerk);

        new Thread(productor, "producer A").start();
        new Thread(consumer, "consumer B").start();
    }
}

// clerk
class Clerk {
    private int productNum = 0;

    // Purchase
    public synchronized void get() {
        if (productNum >= 5) {
            System.out.println("The commodity bin is full!");
//            try {
//                this.wait();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        } else {
            System.out.println(Thread.currentThread().getName() + "Purchase:" + ++productNum);
//            this.notifyAll();
        }
    }

    // Sell goods
    public synchronized void sale() {
        if (productNum <= 0) {
            System.out.println("The goods have been sold out!");
//            try {
//                this.wait();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
        } else {
            System.out.println(Thread.currentThread().getName() + "Sold:" + --productNum);
//            this.notifyAll();
        }
    }
}

// producer
class Productor implements Runnable {

    private Clerk clerk;

    public Productor(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.get();
        }
    }
}

// consumer
class Consumer implements Runnable {

    private Clerk clerk;

    public Consumer(Clerk clerk) {
        this.clerk = clerk;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            clerk.sale();
        }
    }
}

Even if the warehouse is full, the producer keeps buying; Even if the goods are sold out, consumers still want to spend

8.2 use the wake-up waiting mechanism

Release the code annotated in 8.1. At this time, the running results are valid data

8.3 problem correction 1

There are some problems with the waiting wake-up mechanism in 8.2
When changing the position to 1 and adding 0.2 second delay to the producer

// Purchase
public synchronized void get() {
    if (productNum >= 1) {		// Change the position to 1
        System.out.println("The commodity bin is full!");
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } else {
        System.out.println(Thread.currentThread().getName() + "Purchase:" + ++productNum);
        this.notifyAll();
    }
}

@Override
public void run() {
    for (int i = 0; i < 10; i++) {
        try {
            Thread.sleep(200);	// Give the producer a delay of 0.2 seconds
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        clerk.get();
    }
}

Execute the program, the program fails to stop

reason:
Producers have delays, so producers produce slowly and consumers consume quickly
Suppose that the consumer thread loop has the last time and the producer thread loop has the last two times,
The consumer enters sale(), productNum=0, and then waits for wake-up
The producer counts down the penultimate cycle, enters get(), enters else to make productNum=1, and then notififyall()
At this time, the consumer is waiting for the awakened thread to compete with the producer for resources for the last time,
Assuming that the consumer grabs it, the consumer continues to complete the execution from the wait and does not consume productNum, so the productNum is still 1
At this time, the last producer enters get(), meets the condition productnum > = 1 and waits (), and will not be awakened again (the last sentence of the execution result is "the commodity bin is full!")

solve:
Take out the wake-up statement and no longer put it in the else statement

// Purchase
public synchronized void get() {
    if (productNum >= 1) {
        System.out.println("The commodity bin is full!");
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //        else {
    System.out.println(Thread.currentThread().getName() + "Purchase:" + ++productNum);
    this.notifyAll();
    //        }
}

// Sell goods
public synchronized void sale() {
    if (productNum <= 0) {
        System.out.println("The goods have been sold out!");
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //        else {
    System.out.println(Thread.currentThread().getName() + "Sold:" + --productNum);
    this.notifyAll();
    //        }
}

results of enforcement

8.4 problem correction 2

8.3 after the problem is corrected, if multiple producers and consumers still have problems

In some versions, interrupts and false wakes are possible, so methods should always be used in loops.

solve:
Change the if in the purchase and sale methods to while, so that the awakened thread needs to re judge the value of productNum

// Purchase
public synchronized void get() {
//	if (productNum >= 1) {
	while (productNum >= 1) {   // To avoid false wake-up problems, it should always be used in the loop

// ------------------

// Sell goods
public synchronized void sale() {
//	if (productNum <= 0) {
	while (productNum <= 0) {

9.Condition control thread communication

  • The Condition interface describes Condition variables that may be associated with locks. These variables are similar in usage to object Wait access is similar to the implicit monitor, but provides more powerful functionality.
    Special note: a single Lock may be associated with multiple Condition objects. To avoid compatibility problems, the name of the Condition method is different from that in the corresponding Object version.
  • In the Condition object, await(), signal() and signalAll() correspond to wait(), notify() and notifyAll() methods, respectively.
  • The Condition instance is essentially bound to a Lock. To get a Condition instance for a particular Lock instance, use its newCondition() method.
// clerk
class ClerkForLock {
    private int productNum = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    // Purchase
    public void get() {
        lock.lock();

        try {
            while (productNum >= 1) {
                System.out.println("The commodity bin is full!");
                try {
//                    this.wait();
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "Purchase:" + ++productNum);
//            this.notifyAll();
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Sell goods
    public void sale() {
        lock.lock();

        try {
            while (productNum <= 0) {
                System.out.println("The goods have been sold out!");
                try {
//                    this.wait();
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "Sold:" + --productNum);
//            this.notifyAll();
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

10. Threads alternate in sequence

Write A program and start three threads. The IDs of these three threads are A, B and C respectively. Each thread prints its own ID 10 times. It is required that the output results must be displayed in order.
For example: abcabcabcabc... Recursion in sequence

public class ABCAlternateTest {

    public static void main(String[] args) {
        AlternateDemo alternateDemo = new AlternateDemo();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    alternateDemo.loopA(i);
                }
            }
        }, "A").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    alternateDemo.loopB(i);
                }
            }
        }, "B").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    alternateDemo.loopC(i);
                    System.out.println("---------");
                }
            }
        }, "C").start();
    }

}

class AlternateDemo {
    private int threadNum = 1;  // A token of the thread currently executing

    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void loopA(int loopIndex) {
        lock.lock();

        try {
            if (threadNum != 1) {
                condition1.await();
            }

            System.out.println(Thread.currentThread().getName() + "\t" + threadNum + "\t" + loopIndex);
            threadNum = 2;
            condition2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void loopB(int loopIndex) {
        lock.lock();

        try {
            if (threadNum != 2) {
                condition2.await();
            }

            System.out.println(Thread.currentThread().getName() + "\t" + threadNum + "\t" + loopIndex);
            threadNum = 3;
            condition3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void loopC(int loopIndex) {
        lock.lock();

        try {
            if (threadNum != 3) {
                condition3.await();
            }

            System.out.println(Thread.currentThread().getName() + "\t" + threadNum + "\t" + loopIndex);
            threadNum = 1;
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

11.ReadWriteLock

A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.

ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. As long as there is no writer, the read lock can be held by multiple reader threads at the same time. Write locks are exclusive.

// Writing, reading and writing are mutually exclusive; Reading does not need to be mutually exclusive
public class ReadWriteLockTest {
    public static void main(String[] args) {
        ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();

        new Thread(new Runnable() {
            @Override
            public void run() {
                readWriteLockDemo.set((int)(Math.random() * 101));
            }
        }, "Write thread").start();

        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    readWriteLockDemo.get();
                }
            }).start();
        }

    }
}

class ReadWriteLockDemo {
    private int num = 0;
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // Read operation
    public void get() {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " : " + num);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }

    // Write lock
    public void set(int num) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " write " + num);
            this.num = num;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}

12. Thread eight lock

  1. Two common synchronization methods, one object, two threads, standard print / / 12 or 21 randomly execute the one who grabs the lock first
  2. A new sleep is added in getOne(). Two threads of the same object print / / 12 or 21 random sleep does not release the lock
  3. Add a new common method getThree(), and print three threads of the same object (sleep in getOne()) / / 3. You can enter the method body to print regardless of who is holding the lock. 1 and 2 randomly look at 123 who grabs the CPU time slice first and enters the corresponding method body
  4. Two common synchronization methods, two objects, two threads (sleep in getOne()) print / / 21
  5. Getone () prints / / the difference between class 21 locks and object locks for static two threads of the same object (sleep in getOne())
  6. getOne() and getTwo() are static objects. Two threads (sleep in getOne() print / / the difference between class 12 locks and object locks
  7. Only getOne() is static, and two objects and two threads (sleep in getOne()) print / / 21
  8. getOne() and getTwo() are static objects. Two threads (sleep in getOne() print / / 12

The lock of non static method is this by default, and the lock of static method is the corresponding Class instance

public class Thread8MonitorTest {

    public static void main(String[] args) {
        Number number = new Number();
        Number number2 = new Number();

        new Thread(new Runnable() {
            @Override
            public void run() {
                number.getOne();
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
//                number.getTwo();
                number2.getTwo();
            }
        }).start();

//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                number.getThree();
//            }
//        }).start();
    }
}

class Number {

//    public synchronized void getOne() {
    public static synchronized void getOne() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("1");
    }

//    public synchronized void getTwo() {
    public static synchronized void getTwo() {
        System.out.println("2");
    }

    public void getThree() {
        System.out.println("3");
    }
}

13. Thread pool

ThreadPoolExecutor class API document:

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks.
However, programmers are urged to use the more convenient Executors factory methods
Executors.newCachedThreadPool() (unbounded thread pool, with automatic thread reclamation),
Executors.newFixedThreadPool(int) (fixed size thread pool)
Executors.newSingleThreadExecutor() (single background thread),
that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:

In order to work in all situations, this class provides many adjustable parameters and extensible hooks.
However, programmers are advised to use the more convenient Executors factory method
Executors.newCachedThreadPool() (unbounded thread pool, automatic thread recycling),
Executors.newFixedThreadPool(int) (fixed size thread pool),
Executors.newSingleThreadExecutor() (single background thread),
They have preconfigured settings for the most common usage scenarios. Otherwise, use the following guidelines when manually configuring and adjusting the class.

public class ThreadPoolTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Create thread pool
        ExecutorService pool = Executors.newFixedThreadPool(5);

        ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();

        // 2. Assign Callable tasks to threads in the thread pool
        List<Future<Integer>> resultList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                int sum = 0;

                @Override
                public Integer call() throws Exception {
                    int i = 0;
                    for (int i1 = 0; i1 <= 100; i1++) {
                        sum += i1;
                    }
                    return sum;
                }
            });
            resultList.add(future);
        }

        for (Future<Integer> future : resultList) {
            System.out.println(future.get());
        }

//        // 2. Assign a task Runnable to threads in the thread pool
//        for (int i = 0; i < 10; i++) {
//            pool.submit(threadPoolDemo);
//        }

        // 3. Close thread pool
        // Shut down waits for all tasks in the thread pool to be completed before shutting down, and no new tasks will be received; shutdown now is to end immediately, regardless of whether the task is completed or not
        pool.shutdown();
    }
}

class ThreadPoolDemo implements Runnable {
    private int i = 0;

    @Override
    public void run() {
        while (i < 20) {
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }
}

14. Thread scheduling

public class ScheduledThreadPoolTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

        for (int i = 0; i < 5; i++) {
            Future<Integer> future = pool.schedule(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int num = new Random().nextInt(100); // Random number within 100
                    System.out.println(Thread.currentThread().getName() + " : " + num);
                    return num;
                }
            }, 3, TimeUnit.SECONDS);
            System.out.println(future.get());
        }

        pool.shutdown();
    }
}

15.ForkJoinPool branch / merge framework work theft

jdk1. After 7. Java 8 has improved it

The difference between Fork/Join framework and thread pool

Adopt the "work stealing" mode: when executing a new task, it can split it into smaller tasks for execution, add the small task to the thread queue, and then steal one from the queue of a random thread and put it in its own queue.

Compared with the general implementation of thread pool, the advantage of fork/join framework is reflected in the processing mode of the tasks contained in it.
In a general thread pool, if the task being executed by a thread cannot continue to run for some reason, the thread will be in a waiting state.
In the implementation of fork/join framework, if a subproblem cannot continue to run because it waits for the completion of another subproblem, the thread dealing with the subproblem will actively look for other subproblems that have not yet run to execute. This method reduces the waiting time of threads and improves the performance.

public class ForkJoinPoolTest {
    public static void main(String[] args) {
        Instant start = Instant.now();

        ForkJoinPool pool = new ForkJoinPool();

        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 10000000000L);

        Long sum = pool.invoke(task);
        System.out.println(sum);

        Instant end = Instant.now();
        System.out.println("Time consuming:" + Duration.between(start, end).toMillis());
    }

    /**
     * Normal for loop mode
     */
    @Test
    public void Test() {
        Instant start = Instant.now();

        long sum = 0L;
        for (long i = 0; i <= 10000000000L; i++) {
            sum += i;
        }
        System.out.println(sum);

        Instant end = Instant.now();
        System.out.println("Time consuming:" + Duration.between(start, end).toMillis());
    }

    /**
     * java8 New features
     */
    @Test
    public void test1() {
        Instant start = Instant.now();

        Long sum = LongStream.rangeClosed(0L, 10000000000L)
                                .parallel()
                                .reduce(0L, Long::sum);
        System.out.println(sum);

        Instant end = Instant.now();
        System.out.println("Time consuming:" + Duration.between(start, end).toMillis());
    }

}

class ForkJoinSumCalculate extends RecursiveTask<Long> {    // The Task has a return value, but the Action does not

    private Long start;
    private Long end;

    private static final long THURSHOLD = 10000L;   // critical value

    public ForkJoinSumCalculate(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        Long length = end - start;
        if (length <= THURSHOLD) {
            Long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            Long middle = (start + end) / 2;

            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
            left.fork();    // Split and push into the thread queue at the same time

            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
            right.fork();    // Split and push into the thread queue at the same time

            return left.join() + right.join();
        }
    }
}

Keywords: Multithreading Concurrent Programming JUC

Added by Minor Threat on Thu, 03 Feb 2022 00:08:56 +0200