Learn Java 8 together -- ForkJoin

In Learn Java 8 together (7) we looked at the use of reduce on Stream. Underneath, parallel streams run on the fork/join framework.

The core idea of the fork/join framework is to split a large task into multiple subtasks, run them, and then combine the results of the subtasks into one final result.

ForkJoinPool

The core class of the fork/join framework is java.util.concurrent.ForkJoinPool. As the name suggests, it is a thread pool. A pool created with new ForkJoinPool() uses as many threads as there are available processors; the shared common pool, which parallel streams use, defaults to one fewer. You can change the parallelism of the common pool with the following statement, which must run before the common pool is first used:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
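To see which numbers apply on a given machine, the pool sizes can simply be printed. The following is a minimal sketch; the class name ParallelismDemo and the helper explicitParallelism are made up for illustration, and the printed values depend on the hardware and on whether the system property has been set.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismDemo {

    // Hypothetical helper: parallelism of a pool created with an explicit thread count.
    static int explicitParallelism(int threads) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + cores);

        // The common pool is the one used by parallel streams.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        // A pool created with the no-argument constructor.
        ForkJoinPool defaultPool = new ForkJoinPool();
        System.out.println("new ForkJoinPool(): " + defaultPool.getParallelism());
        defaultPool.shutdown();

        // A pool with an explicit size is not affected by the system property.
        System.out.println("new ForkJoinPool(8): " + explicitParallelism(8));
    }
}
```

Passing the size to the constructor is usually the simpler choice when only one computation needs a different thread count, because the system property affects every user of the common pool.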

RecursiveTask<V>

As mentioned earlier, ForkJoinPool is a thread pool. The purpose of RecursiveTask is to define a task that produces a result; the task is then submitted to the ForkJoinPool.

RecursiveTask is an abstract class with a single abstract method:

protected abstract V compute();

This method holds the task logic: it keeps splitting the task into subtasks until a piece is too small to split, computes that piece directly, and returns the combined result of the subtasks.

Here is a simple example to illustrate the use of each class.

This example adds up all the numbers in an array. The idea is to split the array in half, then split each of the two subarrays in half again, and so on, until a subarray holds 10 elements or fewer; such a piece is summed directly. The constructor is generated by Lombok's @AllArgsConstructor annotation.

    @AllArgsConstructor
    static class NumberAddTask extends RecursiveTask<Long> {

        // Store numbers
        private long[] numbers;
        // Start of calculation
        private int startIndex;
        // End of calculation
        private int endIndex;

        @Override
        protected Long compute() {
            int len = endIndex - startIndex;
            // 10 elements or fewer: do not split any further, sum directly
            if (len <= 10) {
                return execute();
            }
            // Split the subtasks on the left
            NumberAddTask leftTask = new NumberAddTask(numbers, startIndex, startIndex + len / 2);
            // Add subtasks to the ForkJoinPool
            leftTask.fork();
            // Create task on the right
            NumberAddTask rightTask = new NumberAddTask(numbers, startIndex + len / 2, endIndex);
            // Perform the task on the right
            Long rightSum = rightTask.compute();
            // Read the result of the subtask on the left. It will block here
            Long leftSum = leftTask.join();
            // Merge result
            return leftSum + rightSum;
        }

        private long execute() {
            long sum = 0;
            for (int i = startIndex; i < endIndex; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

Run code:

long startTime = System.currentTimeMillis();
// Generate an array holding 1, 2, 3, ..., 1000000
long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
// Create a task. The start position is 0 and the end position is the length of the array
NumberAddTask numberAddTask = new NumberAddTask(numbers, 0, numbers.length);
// Add tasks to the thread pool to run and get the total
Long sum = new ForkJoinPool().invoke(numberAddTask);
long time = System.currentTimeMillis() - startTime;
System.out.println("sum:" + sum + ", time consuming:" + time + "Millisecond");

Print:

sum:500000500000, time consuming:63Millisecond
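For readers who do not use Lombok, the same fork/compute/join pattern can be written with an explicit constructor. This is a sketch rather than the article's original code: the class name RangeSumTask, the THRESHOLD constant and the static sum helper are illustrative names, and the task is run on the common pool instead of a new ForkJoinPool. The result is checked against the closed form n(n+1)/2.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

class RangeSumTask extends RecursiveTask<Long> {

    // Same cut-off as in the example above (an arbitrary choice).
    private static final int THRESHOLD = 10;

    private final long[] numbers;
    private final int start; // inclusive
    private final int end;   // exclusive

    RangeSumTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int len = end - start;
        if (len <= THRESHOLD) {
            // Small enough: sum the range directly.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int middle = start + len / 2;
        RangeSumTask left = new RangeSumTask(numbers, start, middle);
        RangeSumTask right = new RangeSumTask(numbers, middle, end);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in this thread
        long leftSum = left.join();      // wait for the left half
        return leftSum + rightSum;
    }

    // Hypothetical convenience method: run the task on the common pool.
    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new RangeSumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        long expected = n * (n + 1) / 2;
        System.out.println("fork/join sum: " + sum(numbers) + ", closed form: " + expected);
    }
}
```

Computing one half in the current thread and forking only the other keeps the calling worker busy instead of leaving it blocked in join() while both halves run elsewhere.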

Spliterator

Spliterator is a new interface in Java 8. The name combines "split" and "iterator": the interface provides both splitting and iterating over a source of elements. A Spliterator is used together with Stream.

Usage:

// Create a sequential Stream
Stream<T> stream = StreamSupport.stream(spliterator, false);

// Create a parallel Stream
Stream<T> parallelStream = StreamSupport.stream(spliterator, true);

These two calls are how the default Collection.stream() and Collection.parallelStream() methods are implemented, each passing the collection's own spliterator().
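As a small illustration, the same calls can be made by hand on the spliterator of an ordinary List. The sketch below uses made-up names (StreamSupportDemo, sumSequential, sumParallel, isParallel); only StreamSupport.stream and the List API are taken from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StreamSupportDemo {

    // Sequential stream built directly from the list's spliterator.
    static long sumSequential(List<Long> values) {
        Stream<Long> stream = StreamSupport.stream(values.spliterator(), false);
        return stream.mapToLong(Long::longValue).sum();
    }

    // Parallel stream built from the same kind of spliterator.
    static long sumParallel(List<Long> values) {
        Stream<Long> stream = StreamSupport.stream(values.spliterator(), true);
        return stream.mapToLong(Long::longValue).sum();
    }

    // The boolean argument decides whether the resulting stream is parallel.
    static boolean isParallel(List<Long> values, boolean parallel) {
        return StreamSupport.stream(values.spliterator(), parallel).isParallel();
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println("sequential: " + sumSequential(values));
        System.out.println("parallel:   " + sumParallel(values));
        System.out.println("isParallel(true): " + isParallel(values, true));
    }
}
```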

The Spliterator interface declares four abstract methods, which an implementation must provide:

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}

boolean tryAdvance(Consumer<? super T> action): if an element remains, passes it to the Consumer and returns true; otherwise returns false.

Spliterator<T> trySplit(): tries to split off part of the elements. If the source can be split, a new Spliterator covering the split-off part is returned and this Spliterator keeps the rest. If it cannot be split any further, null is returned.

long estimateSize(): returns an estimate of the number of remaining elements; the value is exact when the Spliterator is SIZED.

int characteristics(): returns a set of flags describing properties of the Spliterator and its elements.
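The first three methods can be tried by hand on a spliterator that the JDK already provides. The sketch below uses Arrays.spliterator on an Integer array; the class and method names (SpliteratorMethodsDemo, drain, splitAndDrain) are invented for this illustration. It splits once, prints the size estimates, and then traverses both parts with tryAdvance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorMethodsDemo {

    // Pull every remaining element out of a spliterator with tryAdvance.
    static List<Integer> drain(Spliterator<Integer> spliterator) {
        List<Integer> out = new ArrayList<>();
        while (spliterator.tryAdvance(out::add)) {
            // tryAdvance returns false once nothing is left
        }
        return out;
    }

    // Split once, then collect the split-off part followed by the rest.
    static List<Integer> splitAndDrain(Integer[] values) {
        Spliterator<Integer> rest = Arrays.spliterator(values);
        long before = rest.estimateSize();
        Spliterator<Integer> prefix = rest.trySplit(); // null if no split is possible
        List<Integer> out = new ArrayList<>();
        if (prefix != null) {
            System.out.println("size before split: " + before
                    + ", split-off part: " + prefix.estimateSize()
                    + ", remaining: " + rest.estimateSize());
            out.addAll(drain(prefix));
        }
        out.addAll(drain(rest));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitAndDrain(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8}));
    }
}
```

For an ordered source such as an array, the part returned by trySplit comes before the part that stays behind, which is why draining the split-off part first reproduces the original order.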

The characteristics are as follows:

Characteristic  Meaning
ORDERED         The elements have a defined encounter order
DISTINCT        For any pair of traversed elements x and y, x.equals(y) is false
SORTED          The elements are traversed in a defined sort order
SIZED           The number of elements is known exactly
NONNULL         Traversed elements are never null
IMMUTABLE       The element source cannot be structurally modified (no elements added, replaced, or removed)
CONCURRENT      The element source can be safely modified by other threads without external synchronization
SUBSIZED        This Spliterator and every Spliterator split from it are SIZED

characteristics() returns the applicable flags combined with bitwise OR, for example:

@Override
public int characteristics() {
    return Spliterator.SIZED | Spliterator.SUBSIZED;
}
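Because the flags are combined with bitwise OR, a single flag can be tested with bitwise AND (Spliterator also offers hasCharacteristics for this). The sketch below inspects the spliterators of three standard collections; the class name CharacteristicsDemo and the helpers has and describe are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

class CharacteristicsDemo {

    // For a single flag this is equivalent to spliterator.hasCharacteristics(flag).
    static boolean has(Spliterator<?> spliterator, int flag) {
        return (spliterator.characteristics() & flag) != 0;
    }

    // List the names of a few flags that are set on the given spliterator.
    static String describe(Spliterator<?> s) {
        StringBuilder sb = new StringBuilder();
        if (has(s, Spliterator.ORDERED)) sb.append("ORDERED ");
        if (has(s, Spliterator.DISTINCT)) sb.append("DISTINCT ");
        if (has(s, Spliterator.SORTED)) sb.append("SORTED ");
        if (has(s, Spliterator.SIZED)) sb.append("SIZED ");
        if (has(s, Spliterator.SUBSIZED)) sb.append("SUBSIZED ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(3, 1, 2));
        System.out.println("ArrayList: " + describe(list.spliterator()));
        System.out.println("HashSet:   " + describe(new HashSet<>(list).spliterator()));
        System.out.println("TreeSet:   " + describe(new TreeSet<>(list).spliterator()));
    }
}
```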

Let's implement a custom Spliterator:

static class LongSpliterator implements Spliterator<Long> {

        private long[] array;
        private int index;
        private int end;

        public LongSpliterator(long[] array, int index, int end) {
            this.array = array;
            this.index = index;
            this.end = end;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (index >= 0 && index < end) {
                // Extract elements
                Long l = array[index++];
                // Pass the element to the consumer
                action.accept(l);
                return true;
            }
            return false;
        }

        /**
         * Try to split the remaining range.
         * Splitting rule: cut the remaining range in two; this spliterator keeps the second half
         * and the first half is handed to a new spliterator.
         * @return the new spliterator, or null when the range cannot be split any further
         */
        @Override
        public Spliterator<Long> trySplit() {
            // Split what is left to traverse, not the whole array
            int start = index;
            // Take the middle
            int middle = (start + end) >>> 1;
            // Fewer than two elements left: stop splitting
            if (start >= middle) {
                return null;
            }
            // The current index becomes the middle, so this spliterator now covers middle ~ end
            index = middle;
            // The new spliterator covers the first half, start ~ middle
            return new LongSpliterator(array, start, middle);
        }

        @Override
        public long estimateSize() {
            return end - index;
        }

        @Override
        public int characteristics() {
            return Spliterator.SIZED | Spliterator.SUBSIZED;
        }
    }

The source of this spliterator is a long array. The splitting rule is to cut the remaining range in two: the current spliterator keeps the second half, and the first half goes to a new spliterator, which can be split again in the same way.

Now let's use this spliterator to sum the numbers from 1 to 1000000. The test case is as follows:

    public void testDo() {
        // Create an array
        long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
        long startTime = System.currentTimeMillis();
        LongSpliterator spliterator = new LongSpliterator(numbers, 0, numbers.length);
        // Declare a parallel Stream
        Stream<Long> stream = StreamSupport.stream(spliterator, true);
        // Sum from 1 to 1000000; the result should be 500000500000
        Long sum = stream.reduce((n1, n2) -> n1 + n2).orElse(0L);
        long time = System.currentTimeMillis() - startTime;
        System.out.println("sum:" + sum + ", time consuming:" + time + "Millisecond");
    }

Print:

sum:500000500000, time consuming:32Millisecond

Keywords: Programming Java less

Added by Snorkel on Thu, 20 Feb 2020 09:54:11 +0200