Let's learn about the use of reduce in Java 8 (7) - Stream. Under the hood, parallel streams run on the fork/join framework.
The core idea of the fork/join framework is to split a large task into multiple subtasks, run them, and then combine the subtask results into one final result.
ForkJoinPool
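The core class of the fork/join framework is java.util.concurrent.ForkJoinPool. As the name suggests, it is a thread pool. A pool created with new ForkJoinPool() has a parallelism equal to the number of available processors; the common pool that parallel streams use defaults to one fewer, because the calling thread also takes part in the work. You can change the common pool's parallelism with the following statement, which must run before the common pool is first used:

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");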
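As a rough illustration of the pool sizes discussed above, the following sketch prints the processor count, the common pool's parallelism, and the parallelism of an explicitly sized pool. The class name `PoolParallelismSketch` and the helper `explicitPoolParallelism` are made up for this example; the printed numbers depend on the machine.

```java
import java.util.concurrent.ForkJoinPool;

final class PoolParallelismSketch {

    // Hypothetical helper: builds a pool with the requested parallelism and reports it back.
    static int explicitPoolParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        // Parallelism of the shared pool that parallel streams use by default.
        System.out.println("common pool: " + ForkJoinPool.getCommonPoolParallelism());
        // The system property only affects the common pool, not a pool sized explicitly.
        System.out.println("explicit pool: " + explicitPoolParallelism(8));
    }
}
```

An explicitly sized pool is one way to keep a long-running job from competing with parallel streams for the common pool's threads.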
RecursiveTask<V>
As mentioned earlier, ForkJoinPool is a thread pool, so it needs tasks to run. A RecursiveTask represents a task that returns a result and can be submitted to a ForkJoinPool.
RecursiveTask is an abstract class with a single abstract method:

    protected abstract V compute();

This method holds the task logic: it either splits the work into subtasks and combines their results, or, once the work is too small to split any further, computes the result directly.
Here is a simple example to illustrate the use of each class.
This example adds up all the numbers in an array. The idea is to split the array range in half, then split each half again, and so on, until a range holds 10 elements or fewer; such a range is summed directly.
    // Requires java.util.concurrent.RecursiveTask and Lombok's @AllArgsConstructor
    @AllArgsConstructor
    static class NumberAddTask extends RecursiveTask<Long> {
        // Numbers to add
        private long[] numbers;
        // Start of the range (inclusive)
        private int startIndex;
        // End of the range (exclusive)
        private int endIndex;

        @Override
        protected Long compute() {
            int len = endIndex - startIndex;
            // 10 elements or fewer: do not split, sum directly
            if (len <= 10) {
                return execute();
            }
            // Create the subtask for the left half
            NumberAddTask leftTask = new NumberAddTask(numbers, startIndex, startIndex + len / 2);
            // Hand the left subtask to the ForkJoinPool to run asynchronously
            leftTask.fork();
            // Create the subtask for the right half
            NumberAddTask rightTask = new NumberAddTask(numbers, startIndex + len / 2, endIndex);
            // Run the right subtask in the current thread
            Long rightSum = rightTask.compute();
            // Read the result of the left subtask; this blocks until it is done
            Long leftSum = leftTask.join();
            // Merge the results
            return leftSum + rightSum;
        }

        private long execute() {
            long sum = 0;
            for (int i = startIndex; i < endIndex; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }
Run code:
    long startTime = System.currentTimeMillis();
    // Generate an array holding 1, 2, 3, ..., 1000000
    long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
    // Create a task. The start position is 0 and the end position is the length of the array
    NumberAddTask numberAddTask = new NumberAddTask(numbers, 0, numbers.length);
    // Run the task in a ForkJoinPool and get the total
    Long sum = new ForkJoinPool().invoke(numberAddTask);
    long time = System.currentTimeMillis() - startTime;
    System.out.println("sum:" + sum + ", time consuming:" + time + "Millisecond");
Print:
    sum:500000500000, time consuming:63Millisecond
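For readers who want to compile the example without Lombok, here is a minimal self-contained sketch of the same idea. The class name `ForkJoinSumSketch`, the nested `SumTask`, the `THRESHOLD` constant, and the `parallelSum` helper are names chosen for this illustration and are not part of the original code. The result is cross-checked against the closed form n(n+1)/2.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

final class ForkJoinSumSketch {

    // Ranges of this many elements or fewer are summed directly (same cutoff as above).
    static final int THRESHOLD = 10;

    static final class SumTask extends RecursiveTask<Long> {
        private final long[] numbers;
        private final int start; // inclusive
        private final int end;   // exclusive

        SumTask(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            int len = end - start;
            if (len <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += numbers[i];
                }
                return sum;
            }
            int middle = start + len / 2;
            SumTask left = new SumTask(numbers, start, middle);
            SumTask right = new SumTask(numbers, middle, end);
            left.fork();                     // schedule the left half asynchronously
            long rightSum = right.compute(); // compute the right half in this thread
            return left.join() + rightSum;   // wait for the left half and merge
        }
    }

    // Hypothetical helper: sums the array on a private pool that is shut down afterwards.
    static long parallelSum(long[] numbers) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(numbers, 0, numbers.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        // Compare the fork/join result with the closed-form sum.
        System.out.println(parallelSum(numbers) == n * (n + 1) / 2);
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of leaving it blocked in join() while both halves run elsewhere.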
Spliterator
Spliterator is a new interface in Java 8. Its name combines "split" and "iterator": the interface provides both splitting and iterating over elements. A Spliterator is typically used together with Stream.
Usage:
    // Create a sequential Stream
    Stream<T> sequentialStream = StreamSupport.stream(spliterator, false);
    // Create a parallel Stream
    Stream<T> parallelStream = StreamSupport.stream(spliterator, true);

These two calls are how the default Collection.stream() and Collection.parallelStream() methods are implemented, with the collection's own spliterator() passed as the first argument.
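To make the relationship with Collection.stream() concrete, the sketch below builds a stream by hand from a list's spliterator and compares the result with the stream the list creates itself. `StreamSupportSketch` and `sumViaStreamSupport` are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StreamSupportSketch {

    // Builds a stream by hand from the collection's own spliterator and sums it.
    static long sumViaStreamSupport(List<Long> values, boolean parallel) {
        Stream<Long> stream = StreamSupport.stream(values.spliterator(), parallel);
        return stream.mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        long viaCollection = values.stream().mapToLong(Long::longValue).sum();
        // Both hand-built streams should agree with the one from Collection.stream().
        System.out.println(viaCollection == sumViaStreamSupport(values, false));
        System.out.println(viaCollection == sumViaStreamSupport(values, true));
    }
}
```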
The Spliterator interface declares four abstract methods, which developers need to implement themselves.
    public interface Spliterator<T> {
        boolean tryAdvance(Consumer<? super T> action);
        Spliterator<T> trySplit();
        long estimateSize();
        int characteristics();
    }
boolean tryAdvance(Consumer<? super T> action): takes the next element, if there is one, and passes it to the Consumer. Returns true if an element was processed, otherwise false.
Spliterator<T> trySplit(): performs the split operation. If the remaining elements can be split, a new Spliterator covering part of them is returned; if they cannot be split any further, null is returned.
long estimateSize(): returns an estimate of the number of remaining elements.
int characteristics(): returns the set of characteristics of the Spliterator and its elements.
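The first three methods can be tried out by hand on a spliterator that the JDK already provides. The following sketch uses the spliterator of an ArrayList; the class `SpliteratorWalkSketch` and the helper `drain` are hypothetical names for this illustration, and the exact sizes after the split depend on the collection's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

final class SpliteratorWalkSketch {

    // Drains a spliterator with tryAdvance and returns how many elements it produced.
    static <T> int drain(Spliterator<T> spliterator) {
        int[] count = {0};
        while (spliterator.tryAdvance(element -> count[0]++)) {
            // tryAdvance returns false once no elements remain
        }
        return count[0];
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 8; i++) {
            values.add(i);
        }
        Spliterator<Integer> rest = values.spliterator();
        System.out.println("before split: " + rest.estimateSize());

        // trySplit hands off part of the elements, or returns null if it cannot split.
        Spliterator<Integer> prefix = rest.trySplit();
        if (prefix != null) {
            System.out.println("prefix: " + prefix.estimateSize() + ", rest: " + rest.estimateSize());
            System.out.println("drained from prefix: " + drain(prefix));
        }
        System.out.println("drained from rest: " + drain(rest));
    }
}
```

Whatever the split sizes are, the elements drained from the two parts together should add up to the original eight, which is the property a parallel stream relies on.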
The characteristics list is as follows:
Characteristic | Meaning |
---|---|
ORDERED | Elements have a defined encounter order |
DISTINCT | For every pair of traversed elements x and y, x.equals(y) returns false |
SORTED | Elements are traversed in a defined sort order |
SIZED | The number of elements is known exactly |
NONNULL | Traversed elements are guaranteed not to be null |
IMMUTABLE | The element source cannot be modified |
CONCURRENT | The element source can be safely modified by other threads at the same time without external synchronization |
SUBSIZED | This Spliterator and all Spliterators split from it are SIZED |
characteristics() is used as follows:
    @Override
    public int characteristics() {
        return Spliterator.SIZED | Spliterator.SUBSIZED;
    }
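Because the characteristics are combined into one int bitmask, they can be tested with a bitwise AND. The sketch below checks a few JDK collections this way; `CharacteristicsSketch` and the helper `has` are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Spliterator;
import java.util.TreeSet;

final class CharacteristicsSketch {

    // True if every bit in flags is set in the spliterator's characteristics.
    static boolean has(Spliterator<?> spliterator, int flags) {
        return (spliterator.characteristics() & flags) == flags;
    }

    public static void main(String[] args) {
        Spliterator<Integer> list = new ArrayList<Integer>().spliterator();
        Spliterator<Integer> hashSet = new HashSet<Integer>().spliterator();
        Spliterator<Integer> treeSet = new TreeSet<Integer>().spliterator();

        System.out.println("ArrayList SIZED|SUBSIZED: " + has(list, Spliterator.SIZED | Spliterator.SUBSIZED));
        System.out.println("HashSet DISTINCT: " + has(hashSet, Spliterator.DISTINCT));
        System.out.println("TreeSet SORTED: " + has(treeSet, Spliterator.SORTED));
        // hasCharacteristics is the built-in equivalent of the helper above.
        System.out.println("ArrayList ORDERED: " + list.hasCharacteristics(Spliterator.ORDERED));
    }
}
```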
Let's implement a custom Spliterator:
    // Requires java.util.Spliterator and java.util.function.Consumer
    static class LongSpliterator implements Spliterator<Long> {
        private long[] array;
        // Index of the next element to traverse
        private int index;
        // End of the range (exclusive)
        private int end;

        public LongSpliterator(long[] array, int index, int end) {
            this.array = array;
            this.index = index;
            this.end = end;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (index >= 0 && index < end) {
                // Take the next element
                Long l = array[index++];
                // Pass it to the consumer
                action.accept(l);
                return true;
            }
            return false;
        }

        /**
         * Try to split the remaining range.
         * Split rule: the range is cut in half; this Spliterator keeps the second half
         * and the first half goes to a new Spliterator.
         * @return a Spliterator over the first half, or null to end splitting
         */
        @Override
        public Spliterator<Long> trySplit() {
            // Split only the part that has not been traversed yet
            int start = index;
            // Take the middle
            int middle = (start + end) >>> 1;
            // Fewer than two elements left: stop splitting
            if (start >= middle) {
                return null;
            }
            // The range of the current Spliterator becomes middle ~ end
            index = middle;
            // The new Spliterator covers the first half, start ~ middle
            return new LongSpliterator(array, start, middle);
        }

        @Override
        public long estimateSize() {
            return end - index;
        }

        @Override
        public int characteristics() {
            return Spliterator.SIZED | Spliterator.SUBSIZED;
        }
    }
The data source of this Spliterator is a long array. The split rule is to cut the remaining range in half: the current Spliterator keeps the second half, and the first half goes to a new Spliterator, which can be split again in the same way.
Now let's use this Spliterator to add up the numbers from 1 to 1000000. The test case is as follows:
    public void testDo() {
        // Create an array holding 1, 2, 3, ..., 1000000
        long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
        long startTime = System.currentTimeMillis();
        LongSpliterator spliterator = new LongSpliterator(numbers, 0, numbers.length);
        // Create a parallel Stream
        Stream<Long> stream = StreamSupport.stream(spliterator, true);
        // Sum of 1 to 1000000; the result should be 500000500000
        Long sum = stream.reduce((n1, n2) -> n1 + n2).orElse(0L);
        long time = System.currentTimeMillis() - startTime;
        System.out.println("sum:" + sum + ", time consuming:" + time + "Millisecond");
    }
Print:
    sum:500000500000, time consuming:32Millisecond
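As a point of comparison, the JDK already ships spliterators for arrays, so the same total can be computed without a custom class. The sketch below is an illustration rather than a benchmark; `BuiltInParallelSumSketch` and `parallelSum` are hypothetical names. It works on a LongStream, so the elements are not boxed into Long objects.

```java
import java.util.Arrays;
import java.util.stream.LongStream;

final class BuiltInParallelSumSketch {

    // Sums the array with a parallel LongStream backed by the JDK's own array spliterator.
    static long parallelSum(long[] numbers) {
        return Arrays.stream(numbers).parallel().sum();
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println("sum:" + parallelSum(numbers));
    }
}
```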