How to invoke a Python script in a Spark Scala/Java application

Abstract: This article introduces how to call a Python script from a Spark Scala program; the procedure for a Spark Java program is essentially the same.

This article is shared from the Huawei Cloud community post "[Spark] How to invoke a Python script in a Spark Scala/Java application", author: little rabbit 615.

1. PythonRunner

For programs running on the JVM (i.e. Scala and Java programs), Spark provides the PythonRunner class. By simply calling PythonRunner's main method, you can invoke a Python script from a Scala or Java program. In terms of implementation, PythonRunner is based on Py4J: it constructs a gateway server instance that lets the Python process communicate with the JVM over a local network socket.

    // Launch a Py4J gateway server for the process to connect to; this will let it see our
    // Java system properties and such
    val localhost = InetAddress.getLoopbackAddress()
    val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
      .authToken(secret)
      .javaPort(0)
      .javaAddress(localhost)
      .callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
      .build()
    val thread = new Thread(new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions {
        gatewayServer.start()
      }
    })
    thread.setName("py4j-gateway-init")
    thread.setDaemon(true)
    thread.start()
 
    // Wait until the gateway server has started, so that we know which port is it bound to.
    // `gatewayServer.start()` will start a new thread and run the server code there, after
    // initializing the socket, so the thread started above will end as soon as the server is
    // ready to serve connections.
    thread.join()

After starting the gateway server, PythonRunner constructs a child process through ProcessBuilder to execute the Python script. Once the script finishes, it checks the exit code to determine whether execution succeeded; if it failed, it throws an exception, and finally it shuts down the gateway server.

    // Launch Python process
    val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
    try {
      val process = builder.start()
 
      new RedirectThread(process.getInputStream, System.out, "redirect output").start()
 
      val exitCode = process.waitFor()
      if (exitCode != 0) {
        throw new SparkUserAppException(exitCode)
      }
    } finally {
      gatewayServer.shutdown()
    }
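
The excerpt above omits the environment setup: in the full Spark source, PythonRunner also puts pyFiles on the child process's PYTHONPATH and exports the gateway's listening port and auth secret to the child environment (PYSPARK_GATEWAY_PORT and PYSPARK_GATEWAY_SECRET). For illustration only, a Python script launched this way could talk back to the JVM through Py4J roughly as follows; this is a minimal sketch, not part of the original sample, and it assumes the py4j package is importable by the chosen interpreter:

    import os
    from py4j.java_gateway import JavaGateway, GatewayParameters

    # Port and secret are exported by PythonRunner before the script is launched.
    port = int(os.environ["PYSPARK_GATEWAY_PORT"])
    secret = os.environ["PYSPARK_GATEWAY_SECRET"]

    # Connect to the gateway server that the JVM side started.
    gateway = JavaGateway(gateway_parameters=GatewayParameters(
        port=port, auth_token=secret, auto_convert=True))

    # JVM classes and objects are now reachable through gateway.jvm,
    # e.g. reading a system property of the driver JVM.
    print(gateway.jvm.System.getProperty("java.version"))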

2. How to call it

2.1 Calling code

Three parameters need to be passed to PythonRunner's main method:

  • pythonFile: the Python script to execute
  • pyFiles: additional Python files that need to be added to PYTHONPATH
  • otherArgs: the array of arguments passed to the Python script

    val pythonFile = args(0)
    val pyFiles = args(1)
    val otherArgs = args.slice(2, args.length)

The complete sample code is as follows. Scala sample code:

package com.huawei.bigdata.spark.examples
 
import org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSession
 
object RunPythonExample {
  def main(args: Array[String]) {
    val pyFilePath = args(0)
    val pyFiles = args(1)
    val spark = SparkSession
      .builder()
      .appName("RunPythonExample")
      .getOrCreate()
 
    runPython(pyFilePath, pyFiles)
 
    spark.stop()
  }
 
  def runPython(pyFilePath: String, pyFiles: String): Unit = {
    val inputPath = "/input"
    val outputPath = "/output"
    // Pass each option flag and its value as separate array elements so that
    // argparse on the Python side receives them as distinct arguments.
    PythonRunner.main(Array(pyFilePath, pyFiles, "-i", inputPath, "-o", outputPath))
  }
}

Python sample code:

#!/usr/bin/env python
# coding: utf-8
import sys
import argparse
 
argparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", required=True)
argparser.add_argument('--output', '-o', help="output path", required=True)
arglist = argparser.parse_args()
 
def getTargetPath(input_path, output_path):
    try:
        print("input path: {}".format(input_path))
        print("output path: {}".format(output_path))
        return True
    except Exception as ex:
        print("error with: {}".format(ex))
        return False
 
if __name__ == "__main__":
    ret = getTargetPath(arglist.input, arglist.output)
    if ret:
        sys.exit(0)
    else:
        sys.exit(1)
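
To connect the two samples (an illustration, not part of the original article): given the Scala code above, PythonRunner ends up launching a command line roughly equivalent to <pythonExec> <pyFilePath> -i /input -o /output (see the ProcessBuilder call in section 1), so inside the script the arguments arrive as separate argv elements:

    # Illustration only: the argument vector the launched script would see,
    # assuming the script file is named test.py as in the run command of section 2.2.
    import sys
    print(sys.argv)   # e.g. ['test.py', '-i', '/input', '-o', '/output']

If the script exits with a non-zero status, the exit-code check in PythonRunner shown in section 1 turns it into a SparkUserAppException and the Spark application fails.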

2.2 Run command

To execute a Python script, you need to set pythonExec, i.e. the interpreter used to run the script. By default, the interpreter is python (Spark 2.4 and earlier) or python3 (Spark 3.0 and later).

    // Spark 2.4.5
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python")
 
    //Spark 3.1.1
    val sparkConf = new SparkConf()
    val secret = Utils.createSecret(sparkConf)
    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
      .orElse(sparkConf.get(PYSPARK_PYTHON))
      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
      .orElse(sys.env.get("PYSPARK_PYTHON"))
      .getOrElse("python3")

If you want to specify pythonExec manually, you need to set it as an environment variable before execution (it cannot be passed in via spark-defaults.conf). In cluster mode it can be set with --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3"; on the driver side it can also be set with export PYSPARK_PYTHON=python3.
If you need to upload a Python package, you can upload it with --archives (for example, a python.tar.gz archive).

To enable the application to access the .py script file, you also need to add the Python script with --files so that it is uploaded to YARN.

The reference run command is as follows (the two trailing arguments, test.py test.py, are the pythonFile and pyFiles parameters described in section 2.1):

    spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py

If you need to use a Python environment other than the one installed on the nodes, you can upload a Python archive with --archives and then point pythonExec at it through the environment variables, for example:

    spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
 
