concept
Starting from JDK 1.7, Java provides the ForkJoin framework for parallel task execution. Its idea is to divide a large task into several small tasks, and finally summarize the results of each small task to get the results of this large task. As a concurrency framework, it was added to our Java and issued in jdk7 java.util.concurrent And act as the underlying framework in java 8's lambda parallel flow.
Mind mapping
Introduction to core classes
-
ForkJoinPool: as the manager in the fork/join framework, the most original tasks must be handed over to it for processing. It is responsible for controlling the number of workerthreads in the whole fork/join, and the creation and activation of workerthreads are controlled by it. It is also responsible for the creation and allocation of workQueue queues. Whenever a workerThread is created, it is responsible for the allocation of the corresponding workQueue. Then it hands all the received work to the worker thread, which can be said to be the container of the entire frok/join.
-
ForkJoinWorkerThread: the real "worker" in fork/join is a thread in essence. There's a ForkJoinPool.WorkQueue The queue of is used to store the work to be done. Before receiving the work, it needs to register with the forkjoinpool to get the corresponding workQueue. Then take the task out of the workQueue. It is dependent on the forkjoinpool and survives. If the forkjoinpool is destroyed, it will end.
-
ForkJoinPool.WorkQueue : This is the double ended queue, which is responsible for storing the received tasks.
-
ForkJoinTask: it represents the task type in the fork/join. We usually use its two subclasses RecursiveTask and RecursiveAction. The two differences are that the recursive task task has a return value and the recursive action has no return value. The processing logic of tasks, including task segmentation, is concentrated in the compute() method.
code implementation
Let's take printing each element of a List collection from 0 to 10000 as an example:
Implementation class:
package com.forkjoin.test.task; import java.util.List; import java.util.concurrent.RecursiveTask; /** * forkjoin Split task implementation */ public class ForkJoinTask extends RecursiveTask<List<Integer>> { private List<Integer> integerList; public ForkJoinTask(List<Integer> integerList) { this.integerList = integerList; } @Override protected List<Integer> compute() { System.out.println("Number of data:" + integerList.size()); // When the number of data is greater than 100, split the task if(integerList.size() > 100) { int minSize = integerList.size() / 2; List<Integer> leftList = integerList.subList(0, minSize); List<Integer> rightList = integerList.subList(minSize, integerList.size()); // Split two tasks and execute them in parallel ForkJoinTask leftTask = new ForkJoinTask(leftList); ForkJoinTask rightTask = new ForkJoinTask(rightList); this.invokeAll(leftTask,rightTask); }else { // Execute the task until the split data is no more than 100 integerList.stream().forEach(x -> { System.out.println(x.toString()); }); } return null; } }
Calling class:
package com.forkjoin.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import com.forkjoin.test.task.ForkJoinTask; public class ForkJoinTest { public static void main(String[] args) { // Build a 0-10000 Set of integers for List<Integer> list = new ArrayList<>(); for (int i = 0; i <= 10000; i++) { list.add(i); } // forkjoin Split task call ForkJoinTask task = new ForkJoinTask(list); ForkJoinPool pool=ForkJoinPool.commonPool(); pool.submit(task); pool.shutdown(); } }