Java fork/join -- parallel execution of split tasks

Concept

Starting with JDK 1.7, Java provides the fork/join framework for parallel task execution. The idea is to split a large task into several smaller tasks and then combine the results of the small tasks to obtain the result of the large one. The framework was added to the java.util.concurrent package in JDK 7, and it serves as the underlying framework for parallel streams in Java 8.
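
The split-and-combine idea can be sketched with a small task that sums a range of integers. This is only an illustrative sketch: the class name RangeSum, the helper sum(), and the threshold of 1,000 are assumptions made for this example and are not part of the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Hypothetical example: sums the integers in [from, to] by recursive splitting. */
class RangeSum extends RecursiveTask<Long> {

    // Assumed cutoff: ranges shorter than this are summed directly.
    private static final int THRESHOLD = 1_000;

    private final int from;
    private final int to; // inclusive

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Small enough: do the work in the current thread.
        if (to - from < THRESHOLD) {
            long sum = 0;
            for (int i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        // Otherwise split the range into two halves.
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid + 1, to);
        left.fork();                      // schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in this thread
        return left.join() + rightSum;    // combine the two partial results
    }

    /** Hypothetical helper: runs the task on the common pool and waits for the result. */
    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 10_000));
    }
}
```

Each level of the recursion forks one half and computes the other half itself, so the current thread always has work to do while it waits for the forked half.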

Introduction to core classes

  • ForkJoinPool: the manager of the fork/join framework. The original (top-level) tasks must be handed to it for processing. It controls the number of worker threads in the framework, including creating and activating them. It also creates and allocates the WorkQueue queues: whenever a worker thread is created, the pool assigns it a corresponding WorkQueue. It then hands the work it receives to the worker threads, so it can be regarded as the container of the whole fork/join framework.

  • ForkJoinWorkerThread: the real "worker" in fork/join; in essence it is a thread. Each one has a ForkJoinPool.WorkQueue that stores the work it has to do. Before it can receive work, it must register with the ForkJoinPool to obtain its WorkQueue; it then takes tasks out of that queue. Its lifetime depends on the ForkJoinPool: when the pool is shut down, the worker thread ends as well.

  • ForkJoinPool.WorkQueue: a double-ended queue (deque) that stores the tasks that have been received.

  • ForkJoinTask: represents a task in fork/join. We usually use its two subclasses, RecursiveTask and RecursiveAction. The difference between them is that a RecursiveTask returns a value and a RecursiveAction does not. The processing logic of a task, including how it is split, is concentrated in the compute() method.
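
To make the roles of these classes more concrete, here is a minimal sketch of a RecursiveAction (no return value) executed by a dedicated ForkJoinPool. The class name DoubleInPlace, the helper run(), the threshold of 100, and the parallelism of 4 are all assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/** Hypothetical example: doubles every element of an array in place. */
class DoubleInPlace extends RecursiveAction {

    // Assumed cutoff: slices of at most this size are processed directly.
    private static final int THRESHOLD = 100;

    private final int[] data;
    private final int lo; // inclusive
    private final int hi; // exclusive

    DoubleInPlace(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            // Leaf task: modify this slice of the array directly.
            for (int i = lo; i < hi; i++) {
                data[i] *= 2;
            }
            return;
        }
        // Split into two non-overlapping halves and run both to completion.
        int mid = (lo + hi) >>> 1;
        invokeAll(new DoubleInPlace(data, lo, mid),
                  new DoubleInPlace(data, mid, hi));
    }

    /** Hypothetical helper: fills an array with 0..n-1 and doubles it in a dedicated pool. */
    static int[] run(int n) {
        int[] data = new int[n];
        for (int i = 0; i < n; i++) {
            data[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool(4); // the pool owns its worker threads
        try {
            pool.invoke(new DoubleInPlace(data, 0, n)); // blocks until the task is done
        } finally {
            pool.shutdown(); // a dedicated pool should be shut down when no longer needed
        }
        return data;
    }

    public static void main(String[] args) {
        int[] result = run(1_000);
        System.out.println(result[999]);
    }
}
```

Because the task has nothing to return, RecursiveAction fits better than RecursiveTask here; the result is visible through the shared array once invoke() has returned.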

Code implementation

Let's take printing each element of a List that contains the integers from 0 to 10000 as an example:

Implementation class:

package com.forkjoin.test.task;

import java.util.List;
import java.util.concurrent.RecursiveTask;

/**
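 * Fork/join task that recursively splits a list and prints its elements.
 * Note: the simple name shadows java.util.concurrent.ForkJoinTask.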
 */
public class ForkJoinTask extends RecursiveTask<List<Integer>> {
    
    private List<Integer> integerList;
    
    public ForkJoinTask(List<Integer> integerList) {
        this.integerList = integerList;
    }

    @Override
    protected List<Integer> compute() {
        System.out.println("Number of data:" + integerList.size());
        // When there are more than 100 elements, split the task
        if (integerList.size() > 100) {
            int mid = integerList.size() / 2;
            List<Integer> leftList = integerList.subList(0, mid);
            List<Integer> rightList = integerList.subList(mid, integerList.size());
            // Create two subtasks and execute them in parallel
            ForkJoinTask leftTask = new ForkJoinTask(leftList);
            ForkJoinTask rightTask = new ForkJoinTask(rightList);
            // invokeAll is a static method; it returns once both subtasks are done
            invokeAll(leftTask, rightTask);
        } else {
            // The list now has at most 100 elements: do the actual work
            integerList.forEach(x -> System.out.println(x));
        }
        // This task only prints, so there is no result to return
        return null;
    }

}

Calling class:

package com.forkjoin.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import com.forkjoin.test.task.ForkJoinTask;

public class ForkJoinTest {

    public static void main(String[] args) {
        // Build a list of the integers from 0 to 10000
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i <= 10000; i++) {
            list.add(i);
        }
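        // Run the fork/join task on the common pool
        ForkJoinTask task = new ForkJoinTask(list);
        ForkJoinPool pool = ForkJoinPool.commonPool();
        // invoke() blocks until the task has completed. With submit(), main could
        // return first, and the common pool's daemon worker threads would not keep
        // the JVM alive. shutdown() has no effect on the common pool, so it is omitted.
        pool.invoke(task);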
    }
    
}
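
The task above returns null from compute(), so the return value of RecursiveTask goes unused. As a hedged variant, the following sketch has each subtask return a list and merges the partial lists in order. The class name SquareTask, the helper squares(), and the choice of squaring each element are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/** Hypothetical variant: returns the squares of the input elements, in order. */
class SquareTask extends RecursiveTask<List<Integer>> {

    // Same cutoff as in the example above: split while more than 100 elements remain.
    private static final int THRESHOLD = 100;

    private final List<Integer> input;

    SquareTask(List<Integer> input) {
        this.input = input;
    }

    @Override
    protected List<Integer> compute() {
        if (input.size() <= THRESHOLD) {
            // Leaf task: compute the result for this slice directly.
            List<Integer> out = new ArrayList<>(input.size());
            for (Integer x : input) {
                out.add(x * x);
            }
            return out;
        }
        int mid = input.size() / 2;
        SquareTask left = new SquareTask(input.subList(0, mid));
        SquareTask right = new SquareTask(input.subList(mid, input.size()));
        invokeAll(left, right); // returns after both subtasks have completed
        // Merge the partial results, left before right, to preserve the order.
        List<Integer> merged = new ArrayList<>(input.size());
        merged.addAll(left.join());
        merged.addAll(right.join());
        return merged;
    }

    /** Hypothetical helper: squares the integers 0..maxInclusive on the common pool. */
    static List<Integer> squares(int maxInclusive) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i <= maxInclusive; i++) {
            list.add(i);
        }
        return ForkJoinPool.commonPool().invoke(new SquareTask(list));
    }

    public static void main(String[] args) {
        List<Integer> result = squares(10_000);
        System.out.println(result.size());
    }
}
```

Since invoke() returns the value produced by compute(), the caller receives the merged list directly and does not need a separate call to join().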

Keywords: Java JDK Lambda

Added by goldenei on Mon, 22 Jun 2020 08:13:24 +0300