Spark source code series: spark job submission process

Spark supports the use of spark shell, spark SQL and spark submit, but the final calling code is submitted through spark submit. The example of spark submit introduced in the previous article:

# spark local mode submit job
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local
  /path/to/examples.jar \
  1000

Let's analyze the whole process of spark submit.

1. Spark submit script analysis

Spark submit script uses spark class to call SparkSubmit as the main class of the program:

#spark-submit.sh

#!/usr/bin/env bash

#Judge whether to configure the ${SPARK_HOME} variable. If not, execute the find spark home script to set it
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Call the SparkSubmit class with all parameters
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Spark submit the spark before submitting the job_ Determine the home variable. If it is not set, the find spark home script will be called to configure it according to the current spark package directory.

# find-spark-home.sh

#!/usr/bin/env bash

# Attempts to find a proper value for SPARK_HOME. Should be included using "source" directive.

FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"

# Short circuit if the user already has this set.
if [ ! -z "${SPARK_HOME}" ]; then
   exit 0
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
  # If we are not in the same directory as find_spark_home.py we are not pip installed so we don't
  # need to search the different Python directories for a Spark installation.
  # Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or
  # spark-submit in another directory we want to use that version of PySpark rather than the
  # pip installed version of PySpark.
  export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
else
  # We are pip installed, use the Python script to resolve a reasonable SPARK_HOME
  # Default to standard python interpreter unless told otherwise
  if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
     PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
  fi
  export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
fi

Then, calling spark-class to construct the job submission command is eventually submitted in java mode.

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# Use load spark env SH script loads environment variables and sets logs, hadoop configuration, JVM, classpath, etc
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Get the java path. If java is not configured_ Home, it is obtained through the java command, otherwise an error is reported
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Set SPARK_JARS_DIR, if Park_ If the jars directory does not exist in home, set it to the assembly directory
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
#If SPARK_JARS_DIR Directory does not exist and is not set SPARK_TESTING or SPARK_SQL_TESTING Error reporting. If it exists, then#Setup starts LAUNCH_CLASSPATH
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
# Construct java startup command and load launch_ Add the jar package under classpath, and use org apache. spark. launcher. Main is the main class and carries the parameters in spark submit
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode because it does not allow process substitution 
set +o posix

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Some JVM failures can cause errors to be printed to stdout (instead of stderr), which can cause confusion in the code that parses the initiator output. In these cases, check whether the exit code is an integer, and if not, treat it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
# The final constructed submission command is: env LD_LIBRARY_PATH=/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64 /usr/local/jdk/bin/java -cp /usr/hdp/current/spark2-client/conf/:/usr/hdp/current/spark2-client/jars/*:/usr/hdp/current/spark2-client/jars/../hive/*:/usr/hdp/3.0.1.0-187/hadoop/conf/:/usr/hdp/current/spark2-thriftserver/jars/ranger-spark-plugin-impl/*:/usr/hdp/current/spark2-thriftserver/jars/servlet-api/*:/usr/hdp/3.0.1.0-187/spark2/jars/dragon_jars/* -Xmx1g -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5055 org.apache.spark.deploy.SparkSubmit --master local --conf spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5055 --class org.apache.spark.examples.SparkPi spark-examples_2.11-2.4.0-cdh6.2.0.jar
exec "${CMD[@]}"

2. SparkSubmit calling process

After the job is submitted, org.org is finally called apache. Spark. deploy. SparkSubmit is the main class. Through debugging, you can also see that the Spark program entry is the main method of SparkSubmit:

After entering the main method, the call constructs the SparkSubmit object, calls the doSubmit method, and internally calls loadEnvironmentArguments to construct the parameter in the environment variable to the SparkSubmitArguments object, carries master, driverMemory, executorMemory and other information:

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging before submitting and track whether logging needs to be reset before the application starts. Log4j defaults is used by default Properties file log configuration.
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    // The parsing parameters are constructed as SparkSubmitArguments object, and loadEnvironmentArguments is called internally to parse the master, driverMemory, executorMemory and other running parameters
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    // Perform specific operations according to different operation types. Spark submit supports several parameter types -- kill, -- status, -- status, among which -- kill, -- status are used in standalone mode to kill spark jobs and query status
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

// Load environment variable parameters, which can be carried through the spark summit command, or in spark default Conf configuration file, and the corresponding parameter name is shown in the following code
private def loadEnvironmentArguments(): Unit = {
    master = Option(master)
      .orElse(sparkProperties.get("spark.master"))
      .orElse(env.get("MASTER"))
      .orNull
    driverExtraClassPath = Option(driverExtraClassPath)
      .orElse(sparkProperties.get("spark.driver.extraClassPath"))
      .orNull
    driverExtraJavaOptions = Option(driverExtraJavaOptions)
      .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
      .orNull
    driverExtraLibraryPath = Option(driverExtraLibraryPath)
      .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
      .orNull
    driverMemory = Option(driverMemory)
      .orElse(sparkProperties.get("spark.driver.memory"))
      .orElse(env.get("SPARK_DRIVER_MEMORY"))
      .orNull
    driverCores = Option(driverCores)
      .orElse(sparkProperties.get("spark.driver.cores"))
      .orNull
    executorMemory = Option(executorMemory)
      .orElse(sparkProperties.get("spark.executor.memory"))
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orNull
    executorCores = Option(executorCores)
      .orElse(sparkProperties.get("spark.executor.cores"))
      .orElse(env.get("SPARK_EXECUTOR_CORES"))
      .orNull
    totalExecutorCores = Option(totalExecutorCores)
      .orElse(sparkProperties.get("spark.cores.max"))
      .orNull
    name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
    jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
    files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
    pyFiles = Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull
    ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
    ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
    packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
    packagesExclusions = Option(packagesExclusions)
      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
    repositories = Option(repositories)
      .orElse(sparkProperties.get("spark.jars.repositories")).orNull
    deployMode = Option(deployMode)
      .orElse(sparkProperties.get("spark.submit.deployMode"))
      .orElse(env.get("DEPLOY_MODE"))
      .orNull
    numExecutors = Option(numExecutors)
      .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
    queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
    keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
    principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
    dynamicAllocationEnabled =
      sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)

    // Try to set main class from JAR if no --class argument is given
    if (mainClass == null && !isPython && !isR && primaryResource != null) {
      val uri = new URI(primaryResource)
      val uriScheme = uri.getScheme()

      uriScheme match {
        case "file" =>
          try {
            Utils.tryWithResource(new JarFile(uri.getPath)) { jar =>
              // Note that this might still return null if no main-class is set; we catch that later
              mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
            }
          } catch {
            case _: Exception =>
              error(s"Cannot load main class from JAR $primaryResource")
          }
        case _ =>
          error(
            s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
            "Please specify a class through --class.")
      }
    }

    // Global defaults. These should be keep to minimum to avoid confusing behavior.
    master = Option(master).getOrElse("local[*]")

    // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
    if (master.startsWith("yarn")) {
      name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
    }

    // Set name from main class if not given
    name = Option(name).orElse(Option(mainClass)).orNull
    if (name == null && primaryResource != null) {
      name = new File(primaryResource).getName()
    }

    // Action should be SUBMIT unless otherwise specified
    action = Option(action).getOrElse(SUBMIT)

The submit method parses the environment variables, gets the class name, parameters and classpath information of the concrete execution class, then calls the runMain method to adjust the specific execution class through the reflection mechanism.

 private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
     // The first step is to prepare the startup environment by setting the appropriate classpath, system properties, and application parameters to run the sub main classes based on the cluster manager and deployment mode.
     val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
      // Use this startup environment to call the main method of the child main class
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        // When submitting a job, if -- proxy user is used to set the proxy user, the proxy user will be used to run the job
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }
 private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sparkConf: SparkConf,
      childMainClass: String,
      verbose: Boolean): Unit = {
    // Print out detailed class name, parameters, classpath, default configuration information, etc. in verbose mode
    if (verbose) {
      logInfo(s"Main class:\n$childMainClass")
      logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
      // sysProps may contain sensitive information, so redact before printing
      logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      logInfo("\n")
    }
    // Judgment parameter spark driver. Whether userclasspathfirst is set. The default value is false. When set to true, a variable class loader ChildFirstURLClassLoader is constructed. When loading classes and resources, it takes precedence over the parent class loader to provide its own URL.
    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)
    //Load the jar package under childClasspath
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        logWarning(s"Failed to load $childMainClass.", e)
        if (childMainClass.contains("thriftserver")) {
          logInfo(s"Failed to load main class $childMainClass.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          logInfo(s"Failed to load hive class.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    }
    // Judge whether the program main class is a subclass of SparkApplication. If not, it will be constructed as JavaMainApplication class. JavaMainApplication is also a subclass of SparkApplication, which is equivalent to making a layer of encapsulation and reflecting the mainclass through the start method of SparkApplication
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      // SPARK-4170
      if (classOf[scala.App].isAssignableFrom(mainClass)) {
        logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
      }
      new JavaMainApplication(mainClass)
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      //Call the SparkApplicationstart method to make reflection calls. The following two lines reflect code
      app.start(childArgs.toArray, sparkConf)
      //val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
      //mainMethod.invoke(null, args)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

So far, the steps of submitting Spark Application program through spark submit are completed, and then the next step is to enter the specific application code. In this example, to realize the calculation of SparkPi, first construct SparkSession (build SparkContext internally), and then implement the specific algorithm through the transform and action operators provided by spark and submit it for execution.

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}

3. SparkSession structure

When developing Spark programs in earlier versions of Spark, sparkcontext is used as the entry. RDD is an important API in Spark, but its creation and operation must use the API provided by sparkcontext; For things other than RDD, we need to use other Context. For example, for stream processing, we have to use StreamingContext; SqlContext is used for SQL; For hive, use HiveContext. However, the APIs provided by DataSet and Dataframe are gradually called new standard APIs. We need a pointcut to build them. Therefore, a new entry point: SparkSession is introduced in Spark 2.0.

SparkSession is essentially a combination of SQLContext, HiveContext and StreamingContext, so the API s available on SQLContext and HiveContext can also be used on SparkSession. SparkSession encapsulates sparkContext internally, so the calculation is actually completed by sparkContext.

The design of SparkSession follows the factory design pattern. In the Spark program, the SparkSession.builder object is first used for a series of configurations, including appName (setting program name), master (running mode: local, yarn, standalone, etc.), enablehivesupport (enabling hive metadata, i.e. using HiveContext) Withextensions (inject custom extensions into [[SparkSession]]. This allows users to add analyzer rules, optimizer rules, planning policies or custom parsers). It also provides config method, setting custom parameters, etc.

The getOrCreate method initializes specific sparkcentertext (described in the next issue) and SparkSession objects:

def getOrCreate(): SparkSession = synchronized {
      assertOnDriver()
      // Get the Session object from the current Session thread. If it is still active, you only need to reload the next configuration
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }

      // Locking ensures that the defaultsession is set only once
      SparkSession.synchronized {
        // If the current thread has no active session, get from the global session
        session = defaultSession.get()
        if ((session ne null) && !session.sparkContext.isStopped) {
          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
          if (options.nonEmpty) {
            logWarning("Using an existing SparkSession; some configuration may not take effect.")
          }
          return session
        }

        // There are no active or global default sessions. Create a new This is created through sparkContext
        val sparkContext = userSuppliedContext.getOrElse {
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }

          // set a random app name if not given.
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(java.util.UUID.randomUUID().toString)
          }
          //When you create a sparkcontext object, you will eventually call new Context(SparkConf) for construction
          SparkContext.getOrCreate(sparkConf)
          // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
        }

        applyExtensions(
          sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
          extensions)

        session = new SparkSession(sparkContext, None, None, extensions)
        options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
        setDefaultSession(session)
        setActiveSession(session)

        //Register the successfully instantiated context with singleton. This should be at the end of the class definition so that the singleton is updated only if there are no exceptions in the instance's construction.
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            defaultSession.set(null)
          }
        })
      }

      return session
    }
  }

 

Keywords: Spark

Added by wmac on Sun, 19 Dec 2021 13:12:36 +0200