Where Spark SQL optimizes window functions better than Hive SQL

Sometimes a select statement contains multiple window functions whose window definitions (OVER clauses) may be the same or different.

For identical windows, there is no need to partition and sort the data again: the functions can be merged into a single Window operator.

Take, for example, the case from "The implementation principle of window functions in Spark and Hive":

 select
    id,
    sq,
    cell_type,
    rank,
    row_number() over (partition by id order by rank) naturl_rank,
    rank() over (partition by id order by rank) as r,
    dense_rank() over (partition by cell_type order by id) as dr
 from window_test_table
 group by id, sq, cell_type, rank;

The windows of row_number() and rank() are identical, so both can be computed with a single partition-and-sort pass. For this query, the performance of Hive SQL is consistent with that of Spark SQL.
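The shared-window behavior follows from standard SQL semantics, so it can be reproduced on any engine with window-function support. Below is a minimal sketch using Python's built-in sqlite3 module (requires SQLite 3.25+); the table contents are invented sample data, not the original window_test_table:

```python
import sqlite3

# In-memory database with a small stand-in for window_test_table.
# The rows are hypothetical sample data for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("create table window_test_table (id int, sq text, cell_type int, rank int)")
conn.executemany(
    "insert into window_test_table values (?,?,?,?)",
    [(1, "a", 1, 10), (1, "b", 1, 20), (2, "c", 2, 5), (2, "d", 2, 15)],
)

# row_number() and rank() share the same OVER clause, so one
# partition-and-sort pass over (id, rank) can serve both of them;
# only dense_rank() needs a different partitioning.
rows = conn.execute("""
    select id, sq, cell_type, rank,
           row_number() over (partition by id order by rank) as naturl_rank,
           rank()       over (partition by id order by rank) as r,
           dense_rank() over (partition by cell_type order by id) as dr
    from window_test_table
    group by id, sq, cell_type, rank
""").fetchall()
for row in rows:
    print(row)
```

Within each id partition the two ranking functions agree here because the sample ranks are distinct; they would diverge only on ties.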

But in another case:


 select
    id,
    rank,
    row_number() over (partition by id order by rank) naturl_rank,
    sum(rank) over (partition by id) as snum
 from window_test_table

Although the two windows are not exactly the same, sum(rank) does not care about the order within the partition, so it can reuse the window of row_number().
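The order-insensitivity claim can be checked directly: when the frame spans the whole partition, adding an ORDER BY to the window does not change the result of sum(). A small sketch with sqlite3 (SQLite 3.25+; the rows are invented sample data):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table window_test_table (id int, rank int)")
conn.executemany(
    "insert into window_test_table values (?,?)",
    [(1, 10), (1, 20), (2, 5), (2, 15)],
)

# Without ORDER BY in the window, the frame is the entire partition.
unsorted = conn.execute("""
    select id, sum(rank) over (partition by id)
    from window_test_table order by id, rank
""").fetchall()

# With ORDER BY plus an explicit full-partition frame, the result must be
# identical: sorting the partition first (as row_number() requires)
# cannot change a whole-partition aggregate.
sorted_win = conn.execute("""
    select id, sum(rank) over (partition by id order by rank
                               rows between unbounded preceding
                                        and unbounded following)
    from window_test_table order by id, rank
""").fetchall()

print(unsorted == sorted_win)
```

Note the explicit frame clause: with ORDER BY but no frame, the SQL default is a running aggregate up to the current row, which is a different window.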

As the execution plans below show, Spark SQL evaluates both sum(rank) and row_number() on top of a single Exchange (one shuffle by id), while Hive SQL does not and runs a second MapReduce stage instead.

The execution plan of spark sql:

spark-sql> explain select id,rank,row_number() over(partition by id order by rank) naturl_rank,sum(rank) over(partition by id) as snum from window_test_table;
== Physical Plan ==
*(3) Project [id#13, rank#16, naturl_rank#8, snum#9L]
+- Window [row_number() windowspecdefinition(id#13, rank#16 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS naturl_rank#8], [id#13], [rank#16 ASC NULLS FIRST]
   +- *(2) Sort [id#13 ASC NULLS FIRST, rank#16 ASC NULLS FIRST], false, 0
      +- Window [sum(cast(rank#16 as bigint)) windowspecdefinition(id#13, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS snum#9L], [id#13]
         +- *(1) Sort [id#13 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#13, 200)
               +- Scan hive tmp.window_test_table [id#13, rank#16], HiveTableRelation `tmp`.`window_test_table`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#13, sq#14, cell_type#15, rank#16]
Time taken: 0.278 seconds, Fetched 1 row(s)


The execution plan of Hive SQL:

hive> explain select id,rank,row_number() over(partition by id order by rank) naturl_rank,sum(rank) over(partition by id) as snum from window_test_table;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: window_test_table
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Reduce Output Operator
              key expressions: id (type: int), rank (type: int)
              sort order: ++
              Map-reduce partition columns: id (type: int)
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Select Operator
          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int)
          outputColumnNames: _col0, _col3
          Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
          PTF Operator
            Function definitions:
                Input definition
                  input alias: ptf_0
                  output shape: _col0: int, _col3: int
                  type: WINDOWING
                Windowing table definition
                  input alias: ptf_1
                  name: windowingtablefunction
                  order by: _col3 ASC NULLS FIRST
                  partition by: _col0
                  raw input shape:
                  window functions:
                      window function definition
                        alias: row_number_window_0
                        name: row_number
                        window function: GenericUDAFRowNumberEvaluator
                        window frame: PRECEDING(MAX)~FOLLOWING(MAX)
                        isPivotResult: true
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col0 (type: int), _col3 (type: int), row_number_window_0 (type: int)
              outputColumnNames: _col0, _col3, row_number_window_0
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: int)
              sort order: +
              Map-reduce partition columns: _col0 (type: int)
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              value expressions: row_number_window_0 (type: int), _col3 (type: int)
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: int), KEY.reducesinkkey0 (type: int), VALUE._col3 (type: int)
          outputColumnNames: _col0, _col1, _col4
          Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
          PTF Operator
            Function definitions:
                Input definition
                  input alias: ptf_0
                  output shape: _col0: int, _col1: int, _col4: int
                  type: WINDOWING
                Windowing table definition
                  input alias: ptf_1
                  name: windowingtablefunction
                  order by: _col1 ASC NULLS FIRST
                  partition by: _col1
                  raw input shape:
                  window functions:
                      window function definition
                        alias: sum_window_1
                        arguments: _col4
                        name: sum
                        window function: GenericUDAFSumLong
                        window frame: PRECEDING(MAX)~FOLLOWING(MAX)
            Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: _col1 (type: int), _col4 (type: int), _col0 (type: int), sum_window_1 (type: bigint)
              outputColumnNames: _col0, _col1, _col2, _col3
              Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
              File Output Operator
                compressed: false
                Statistics: Num rows: 13 Data size: 104 Basic stats: COMPLETE Column stats: NONE
                table:
                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.244 seconds, Fetched: 106 row(s)


Keywords: Big Data hive SQL Apache Hadoop

Added by serverman on Tue, 07 Apr 2020 17:52:21 +0300