Before JDK 7, processing a data set in parallel was cumbersome. First, you had to explicitly split the data structure holding the data into several sub-parts. Second, you had to assign a separate thread to each sub-part. Third, you had to synchronize the threads at the right moments to avoid unwanted race conditions, wait for them all to finish, and finally merge the partial results.
Doug Lea's fork/join framework, introduced in JDK 7, makes these operations more stable and less error-prone.
Main contents of this section:
1. Use parallel streams to process data in parallel
2. Performance analysis of parallel streams
3. The fork/join framework
4. Use Spliterator to split a stream
After working through this section, you should be able to:
1. Use parallel streams skillfully to speed up business logic
2. Understand how streams work internally, to avoid misusing them
3. Control how data is split into chunks with a Spliterator
Parallel stream
You can obtain a parallel stream by calling the parallelStream method on a collection data source. A parallel stream is a stream that splits its content into multiple chunks and processes each chunk on a different thread, automatically spreading the workload across the cores of a multi-core processor.
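For instance, a minimal sketch (the list contents here are purely illustrative) of getting a parallel stream straight from a collection:
import java.util.Arrays;
import java.util.List;

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// parallelStream() returns a parallel stream directly from the collection data source
int sum = numbers.parallelStream()
                 .mapToInt(Integer::intValue)
                 .sum();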
Consider the following task: given a positive integer n, calculate the sum 1 + 2 + ... + n.
Implementation using a sequential stream:
private static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum); }
Converting the sequential stream above into a parallel stream looks like this:
private static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum); }
That is, a sequential stream can be turned into a parallel stream simply by calling parallel().
Note, however, that a stream only starts executing when a terminal operation is invoked, so whether the pipeline runs sequentially or in parallel is determined by the last parallel()/sequential() call before the terminal operation, for example:
list.stream().parallel().filter(e -> e.age > 20).sequential().map(...).parallel().collect(...);
In this case, the filtering is not executed in parallel and the mapping sequentially, as the code might suggest; instead, the entire pipeline is executed as a parallel stream, because parallel() is the last such call.
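You can check this with BaseStream.isParallel(), which reports the mode the whole pipeline will run in (a minimal sketch; the stream contents are illustrative):
Stream<Integer> stream = Stream.of(1, 2, 3).parallel().sequential().parallel();
// true: the last parallel()/sequential() call before the terminal operation decides for the whole pipeline
System.out.println(stream.isParallel());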
Configure the thread pool used by parallel streams
Parallel streams internally use the default ForkJoinPool, whose default number of threads equals the processor count returned by Runtime.getRuntime().availableProcessors(). This can be changed through a system property (System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12")), but it is a global setting that affects all parallel streams. In general, a thread count equal to the processor count is a reasonable value and does not need to be changed.
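A minimal sketch of the override (the value 12 is only an example, and the property must be set before the common pool is first used):
import java.util.concurrent.ForkJoinPool;

// must run before the common ForkJoinPool is first touched, e.g. at the very start of main()
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
System.out.println(ForkJoinPool.commonPool().getParallelism());   // 12 after the override
System.out.println(Runtime.getRuntime().availableProcessors());   // the processor count the default is based on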
Testing stream performance
Intuitively, one would expect a parallel stream to outperform a sequential stream doing the same work. In software engineering, however, the golden rule of performance optimization is to measure. The following program measures the summation implemented in four different ways:
import java.util.stream.LongStream;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;
import win.elegentjs.util.CostUtil;

@Slf4j
public class SumSample {

    /**
     * Performance test of sequential and parallel streams:
     * sum the integers from 1 to 100 million.
     */
    public static void main(String[] args) {
        CostUtil.cost(() -> log.info("==> for: 1 + ... + 100_000_000, result: {}", forSum(100_000_000)));
        log.info("================================================================================");
        CostUtil.cost(() -> log.info("==> sequential: 1 + ... + 100_000_000, result: {}", sequentialSum(100_000_000)));
        log.info("================================================================================");
        CostUtil.cost(() -> log.info("==> parallel: 1 + ... + 100_000_000, result: {}", parallelSum(100_000_000)));
        log.info("================================================================================");
        CostUtil.cost(() -> log.info("==> longParallel: 1 + ... + 100_000_000, result: {}", longParallelSum(100_000_000)));
    }

    /** Accumulation with a plain for loop */
    private static long forSum(long n) {
        long result = 0;
        for (int i = 1; i <= n; i++) {
            result += i;
        }
        return result;
    }

    /** Sequential stream accumulation */
    private static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }

    /** Parallel stream accumulation */
    private static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    /** Accumulation with the primitive LongStream range */
    private static long longParallelSum(long n) {
        return LongStream.rangeClosed(1L, n).parallel().reduce(0L, Long::sum);
    }
}

// result:
2022-01-18 10:53:59.035 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> for: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:53:59.039 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 58
2022-01-18 10:53:59.039 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sequential: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:00.459 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 1420
2022-01-18 10:54:00.459 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.627 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> parallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.628 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 4167
2022-01-18 10:54:04.628 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-================================================================================
2022-01-18 10:54:04.688 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> longParallel: 1 + ... + 100_000_000, result: 5000000050000000
2022-01-18 10:54:04.688 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 60
All four methods sum the numbers from 1 to 100 million; the results above were produced on an i7 2.4 GHz 6-core/12-thread CPU. Surprisingly, the parallel stream is not the fastest but the slowest, while the plain single-threaded for loop performs best.
The reasons:
- iterate produces boxed Long objects, which have to be unboxed before they can be summed: for every accumulation step the next stream element is first unboxed, and the result is boxed again.
- iterate is hard to split into independent chunks that can run in parallel, because each application of the function depends on the result of the previous one; in essence, iterate must execute sequentially. Marking the stream as parallel does not make it genuinely parallelizable; on the contrary, it adds extra overhead and hurts performance.
This comparison shows that parallel programming is complex and sometimes counterintuitive. Used incorrectly (here, with iterate, an operation that is hard to parallelize), it can even make performance worse. It is therefore essential to understand what actually happens behind the parallel method.
LongStream.rangeClosed instead of iterate
For the summation example, LongStream.rangeClosed can efficiently replace iterate for parallel computation. Its advantages are:
- LongStream.rangeClosed works directly on primitive long values, so there is no boxing and unboxing cost
- LongStream.rangeClosed produces a range of numbers that can easily be split into independent chunks.
In the measurement above, its parallel execution is roughly 70 times faster than the iterate-based parallel version, showing that it actually exploits parallelism.
Why is the parallel stream still slower than the for loop?
As the results above show, LongStream.rangeClosed is still slightly slower than the for loop, because:
Parallelization has a cost: the stream must be recursively split, the reduction operation for each substream must be assigned to a different thread, and the partial results must finally be combined. Moving data between cores is also expensive.
Use parallel streams correctly
To gain performance from a parallel stream, you first have to use it correctly: if the computed result is wrong, it does not matter how fast it is produced.
The main source of errors when misusing parallel streams is an algorithm that mutates shared state. For example:
class Accumulator {
    public long total = 0;

    public void add(long value) {
        total += value;
    }
}

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

// result:
2022-01-18 11:40:16.943 [main] INFO win.elegentjs.java8.stream.parallel.SumSample-==> sideEffectSum: 1 + ... + 100_000_000, result: 1037016191509285
2022-01-18 11:40:16.944 [main] INFO win.elegentjs.util.CostUtil-==> cost time: 40
As the example shows, the computation is fast but the result is wrong. The reason is that total += value is not an atomic operation, so there is a race condition. Fixing it with synchronization would defeat the purpose of parallelism. When writing parallel streams, always consider whether multiple threads might modify the mutable state of a shared object.
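A side-effect-free alternative (a minimal sketch; the method name is illustrative) lets the stream perform the reduction itself instead of mutating shared state:
/** Correct under parallel execution: no shared mutable state, the reduction is done by the stream */
public static long noSideEffectSum(long n) {
    return LongStream.rangeClosed(1, n).parallel().sum();
}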
Efficient use of parallel streams
Some suggestions for efficient use of parallel streams:
- If in doubt, measure. A parallel stream is not always faster, and the outcome is sometimes counterintuitive, so check its performance with an appropriate benchmark (see the timing sketch after this list).
- Watch out for automatic boxing and unboxing, which can hurt performance badly. Where possible, use the primitive streams IntStream, LongStream and DoubleStream.
- Some operations are inherently slower on a parallel stream than on a sequential stream, in particular operations that depend on element order, such as limit() and findFirst().
- Consider the total computational cost of the pipeline. If N is the number of elements and Q the approximate cost of pushing one element through the pipeline, then N * Q is a rough estimate of the total cost. A higher Q means a better chance that a parallel stream pays off. (Summing 1...N with a for loop beats the parallel stream because Q is tiny, even though N is large.)
- For small amounts of data, a parallel stream is almost never the right choice: the overhead of parallelization is significant and cannot be amortized over just a few elements.
- Consider whether the underlying data structure is easy to decompose. For example, the primitive streams created by the range factory methods can be split quickly. Later you can take full control of the decomposition process with a custom Spliterator.
- Also consider the cost of the merging step of the terminal operation. If merging is expensive, the cost of combining the partial results produced by each substream can outweigh the performance gained through parallelism.
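For the "if in doubt, measure" point above, the simplest check is to time the candidate implementations yourself. A minimal sketch (the helper name is illustrative; for serious benchmarking a harness such as JMH is preferable):
import java.util.function.LongSupplier;

/** Runs the task ten times and returns the fastest run in milliseconds */
private static long measureMillis(LongSupplier task) {
    long fastest = Long.MAX_VALUE;
    for (int i = 0; i < 10; i++) {
        long start = System.nanoTime();
        task.getAsLong();                                           // execute the candidate implementation
        fastest = Math.min(fastest, (System.nanoTime() - start) / 1_000_000);
    }
    return fastest;
}

// e.g. compare measureMillis(() -> sequentialSum(100_000_000)) with measureMillis(() -> parallelSum(100_000_000))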
Decomposability of some common stream sources:
- ArrayList: excellent
- LinkedList: poor
- IntStream.range / LongStream.rangeClosed: excellent
- Stream.iterate: poor
- HashSet: good
- TreeSet: good
Fork/Join framework
To use parallel streams correctly, it is important to understand how they are implemented. Behind a parallel stream sits the Fork/Join framework.
//TODO: to be added
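The core idea, as a minimal sketch (the class name and threshold are illustrative, and this is not the exact code parallel streams use internally): a RecursiveTask splits the work in half, forks one half, computes the other, and joins the partial results.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumTask extends RecursiveTask<Long> {

    private static final long THRESHOLD = 10_000;   // illustrative: below this size, sum sequentially
    private final long start;
    private final long end;

    public ForkJoinSumTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        long mid = (start + end) / 2;
        ForkJoinSumTask left = new ForkJoinSumTask(start, mid);
        ForkJoinSumTask right = new ForkJoinSumTask(mid + 1, end);
        left.fork();                          // schedule the left half asynchronously
        long rightResult = right.compute();   // compute the right half in the current thread
        return left.join() + rightResult;     // wait for the left half and combine the results
    }

    public static void main(String[] args) {
        long sum = new ForkJoinPool().invoke(new ForkJoinSumTask(1, 100_000_000L));
        System.out.println(sum);              // 5000000050000000
    }
}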
Spliterator
//TODO: to be added
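The core idea, as a minimal sketch (the list contents are illustrative): trySplit() hands part of the elements off to a second Spliterator, which is how a parallel stream carves its source into chunks.
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
Spliterator<Integer> first = numbers.spliterator();
Spliterator<Integer> second = first.trySplit();             // splits off roughly half of the elements
System.out.println(first.estimateSize());                   // 4
System.out.println(second.estimateSize());                  // 4
first.forEachRemaining(n -> System.out.print(n + " "));     // the elements that stayed with the original spliterator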
Summary
- Internal iteration allows you to process a stream in parallel without explicitly using and coordinating different threads in your code.
- Although it is easy to process a stream in parallel, there is no guarantee that the program will run faster in every case. The behavior and performance of parallel software are sometimes counterintuitive, so always measure to make sure you are not slowing the program down.
- Processing a data set in parallel, as a parallel stream does, can improve performance, especially when there are many elements to process or when processing a single element is particularly time-consuming.
- From a performance point of view, choosing the right data structure, for example preferring primitive streams over streams of boxed objects, is almost always more important than trying to parallelize certain operations.
- The fork/join framework lets you recursively split a parallelizable task into smaller tasks, execute them on different threads, and then combine the results of the subtasks to produce the overall result.
- A Spliterator defines how a parallel stream splits the data it traverses.