Background
Translate PySpark code that implements a piece of business logic into Spark SQL, and then use that Spark SQL to backfill the historical data for the past six months (run day by day);
Core Point
1) Translate the PySpark code into Spark SQL;
2) Based on the Spark SQL, backfill the historical data for the past half year (run day by day);
Realization
1) First, translate the PySpark code into Spark SQL; most of it translates directly. For intermediate results that several later statements reuse, a CACHE TABLE can hold the shared data so it is not recomputed again and again (see the short sketch after this list).
2) Second, after the Spark SQL is ready, the data for the past six months has to be backfilled. There are usually two ways: Approach 1, loop through the dates and run them one day at a time; Approach 2, run the days in parallel.
What I tried first was the parallel run, so I sorted out the tables involved in the business logic: one input table; one output table; n intermediate tables.
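A minimal sketch of the CACHE TABLE idea mentioned in step 1), assuming a hypothetical source table dw.fact_orders with invented column names; the point is only that a result reused by several later statements is cached once instead of being recomputed each time:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("cache table sketch") \
    .enableHiveSupport() \
    .getOrCreate()

# Cache the shared intermediate result once
spark.sql("""
    CACHE TABLE cached_orders AS
    SELECT order_id, buyer_id, gmv
    FROM dw.fact_orders
    WHERE dt = '20210215'
""")

# Later statements read the cached table instead of re-scanning the source
spark.sql("SELECT buyer_id, SUM(gmv) AS gmv FROM cached_orders GROUP BY buyer_id")
spark.sql("SELECT COUNT(DISTINCT buyer_id) AS buyers FROM cached_orders")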
Parallel Running Code Framework:
from multiprocessing import Process
from pyspark.sql import SparkSession

sc = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .enableHiveSupport() \
    .getOrCreate()

date_range_list = ['20210215', '20201130', '20201214']

def run_sql_process(date):
    # The business-logic SQL, with the run date as a placeholder
    run_sql = " xxx ... "
    run_sql = run_sql.replace('${UOW_TO_DT}', date)
    run_sql_list = run_sql.split(';')
    for run_sql in run_sql_list:
        if run_sql:
            sc.sql(run_sql)

def mutil_process():
    process_list = []
    for date in date_range_list:
        # Create one process per date so the SQL for each day runs at the same time
        p = Process(target=run_sql_process, args=(date,))
        p.start()
        process_list.append(p)
    for p in process_list:
        p.join()

if __name__ == '__main__':
    mutil_process()
    print("Program run successfully!!!!")
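In the framework above, date_range_list is hard-coded; a sketch of how the list could instead be generated for the past six months (one entry per day, assuming roughly 183 days for half a year), using only the standard library:

from datetime import date, timedelta

def build_date_range(end_date, days=183):
    # Return ['yyyymmdd', ...] for the `days` days ending at end_date
    return [(end_date - timedelta(days=i)).strftime('%Y%m%d') for i in range(days)]

date_range_list = build_date_range(date(2021, 2, 15))
print(date_range_list[:3])  # ['20210215', '20210214', '20210213']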
Parallel Run Strategy 1:
Using Python's multiprocessing, one Spark session ran the Spark SQL for several days at the same time. It turned out not to work: even after reducing the parallelism from 5 down to 2, it still failed with OOM (insufficient resources).
Guess: a Spark session requests a fixed amount of resources when it starts, so when multiple Spark SQL statements run inside it at the same time, their combined usage exceeds that allocation and memory overflows.
In the code above, multiple run_sql statements were executed at once under the same session (each run_sql is just a plain INSERT OVERWRITE TABLE ... statement).
Parallel Run Strategy 2:
Then it dawned on me that my parallelism was unreasonable: the total resources had not changed, I had simply crammed several more Spark SQL tasks into the same session at once, so there was obviously no parallel speed-up, only the risk of everything failing, because the total resources are fixed. The correct way is to submit the tasks in parallel via spark-submit, so that each task runs with its own resources.
Instead of running multiple run_sql statements (INSERT OVERWRITE TABLE ...) at once under the same session as in the code above, each day is submitted as its own application:

Parallel Task 1:
/apache/spark2.3/bin/spark-submit --master yarn --deploy-mode client --queue xxx \
    --conf spark.pyspark.driver.python=/usr/share/anaconda2/bin/python \
    --conf spark.pyspark.python=/usr/share/anaconda2/bin/python \
    --executor-cores 4 --executor-memory 20g --driver-memory 10g \
    /apache/releases/spark-2.3.1.1.1.6-bin-ebay/python/pyspark/tests.py

Parallel Task 2:
/apache/spark2.3/bin/spark-submit --master yarn --deploy-mode client --queue xxx \
    --conf spark.pyspark.driver.python=/usr/share/anaconda2/bin/python \
    --conf spark.pyspark.python=/usr/share/anaconda2/bin/python \
    --executor-cores 4 --executor-memory 20g --driver-memory 10g \
    /apache/releases/spark-2.3.1.1.1.6-bin-ebay/python/pyspark/tests2.py
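A sketch of launching one spark-submit per date from a small driver script, so each day gets its own application and its own resources. The queue name and Spark path are taken from the commands above; run_one_day.py is a hypothetical per-day script that reads the date from its command-line arguments.

import subprocess

SPARK_SUBMIT = "/apache/spark2.3/bin/spark-submit"
dates = ['20210215', '20201130', '20201214']

procs = []
for d in dates:
    cmd = [
        SPARK_SUBMIT,
        "--master", "yarn",
        "--deploy-mode", "client",
        "--queue", "xxx",
        "--executor-cores", "4",
        "--executor-memory", "20g",
        "--driver-memory", "10g",
        "run_one_day.py",  # hypothetical per-day script; takes the date as an argument
        d,
    ]
    # Each call starts an independent Spark application on YARN
    procs.append(subprocess.Popen(cmd))

# Wait for all submitted applications to finish
for p in procs:
    p.wait()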
This approach is feasible in theory, but it has its own problem: when Spark SQL jobs insert into different partitions of the same table in parallel, a lock error is reported. So the intermediate tables were all turned into temporary tables carrying the run date in their names, so that runs for different days never conflict. In the end, since the historical data only needs to be backfilled once, I ran it linearly, one day at a time, on the company's visual submission platform; the next step is to schedule the job to run daily.
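A minimal sketch of the date-suffixed intermediate tables, so that parallel runs for different days never write to the same intermediate table; the database, table, and column names here are invented for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("date-suffixed temp tables") \
    .enableHiveSupport() \
    .getOrCreate()

def run_one_day(date):
    # Every intermediate table name carries the run date, so two days never lock the same table
    mid_table = "tmp_db.order_mid_{}".format(date)
    spark.sql("DROP TABLE IF EXISTS {}".format(mid_table))
    spark.sql("""
        CREATE TABLE {mid} AS
        SELECT order_id, buyer_id, gmv
        FROM dw.fact_orders
        WHERE dt = '{dt}'
    """.format(mid=mid_table, dt=date))

    # Only the final output table is shared; each run writes its own partition
    spark.sql("""
        INSERT OVERWRITE TABLE dw.order_summary PARTITION (dt = '{dt}')
        SELECT buyer_id, SUM(gmv)
        FROM {mid}
        GROUP BY buyer_id
    """.format(mid=mid_table, dt=date))

    # Clean up the per-day intermediate table once the day is done
    spark.sql("DROP TABLE IF EXISTS {}".format(mid_table))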