Spark Source Code Analysis: Executor Startup and Task Submission

Task submission process

Summary

Previous articles walked through the startup of Spark's Master and Worker. The next piece is the Executor process on the Worker, so this article analyzes the whole flow of Executor startup and task submission.

Spark-submit

It is spark-submit that submits an application to the cluster.
The launch script starts the application's main class, for example WordCount (the master URL and jar path below are only illustrative):
spark-submit --class cn.apache.spark.WordCount --master spark://node1:7077 wordcount.jar

  1. bin/spark-class launches org.apache.spark.deploy.SparkSubmit, and the main method of that class is called

  2. The main class of our own Spark application is handed in through the doRunMain method
    class cn.apache.spark.WordCount

  3. mainClass = Utils.classForName(childMainClass) loads that class by reflection

  4. The main method of class cn.apache.spark.WordCount is then invoked through reflection

Let's look at SparkSubmit's main method

  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    //Match on the action type
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

The action here is SUBMIT, so the submit method is called

  private[spark] def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)


    def doRunMain(): Unit = {
      . . . . . .
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            //childMainClass is the fully qualified name of our own App's main class
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        . . . . . .
      }
    }
    . . . . . . .
    //Call the doRunMain defined above
    doRunMain()
  }

In submit, doRunMain() is called, which in turn calls runMain. Let's look at runMain.

  private def runMain(
    . . . . . . 

    try {
      //Load the main class by reflection
      mainClass = Class.forName(childMainClass, true, loader)
    } catch {
        . . . . . . 
    }

    //Get the main method of the class by reflection
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    . . . . . . 

    try {
      //Calling App's main method
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

That is the main flow: as the comments above show, the main method of the class we wrote is invoked through reflection.
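The mechanism is plain JVM reflection. Here is a minimal, self-contained sketch of the same idea (the class name and arguments are illustrative, reusing the WordCount example above):

  import java.lang.reflect.Modifier

  object ReflectMainDemo {
    def main(args: Array[String]): Unit = {
      //Load a class by name and invoke its static main, just as SparkSubmit.runMain does
      val mainClass = Class.forName("cn.apache.spark.WordCount", true,
        Thread.currentThread().getContextClassLoader)
      val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
      require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
      mainMethod.invoke(null, Array("hdfs:///input", "hdfs:///output"))
    }
  }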

SparkSubmit Sequence Diagram

Executor startup process

SparkSubmit calls the main method of our program through reflection, and from there our code starts executing.
Every Spark program needs to create a SparkContext object, so that is where we start.
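For reference, a minimal Spark 1.x-style driver program (submitted with spark-submit as above); the app name and input path are placeholders:

  import org.apache.spark.{SparkConf, SparkContext}

  object WordCount {
    def main(args: Array[String]): Unit = {
      //Creating the SparkContext is what sets the whole machinery below in motion
      val conf = new SparkConf().setAppName("WordCount")
      val sc = new SparkContext(conf)
      sc.textFile("hdfs:///input/words.txt")
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(println)
      sc.stop()
    }
  }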

SparkContext's constructor is very long; the parts we care about are the following

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  . . . . . . 

 private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    //Create the driver-side SparkEnv via SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }
  //createSparkEnv is called here, and a SparkEnv object is returned. There are many important attributes in this object, the most important of which is ActorSystem.
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)


  //Create taskScheduler
  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  //Create DAGScheduler
  dagScheduler = new DAGScheduler(this)

  //Start the TaskScheduler
  taskScheduler.start()
    . . . . . 
}

SparkContext's constructor does three main things: it creates a SparkEnv, a TaskScheduler, and a DAGScheduler. Let's first look at what createTaskScheduler does.

 //Create a TaskScheduler based on the given master URL
  private def createTaskScheduler(
     .....

    //Match on the master URL to select the cluster mode
    master match {
        . . . . . . 

      //This is Spark's Standalone mode
      case SPARK_REGEX(sparkUrl) =>
        //First create the TaskScheduler
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)

        //Very important
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        //Initializes a scheduler, which defaults to FIFO
        scheduler.initialize(backend)
        (backend, scheduler)

        . . . . . 
    }       
}

The master URL matches the Standalone mode, so a **SparkDeploySchedulerBackend** and a **TaskSchedulerImpl** are created. These two objects are the core of starting task scheduling; scheduler.initialize(backend) is then called to initialize the scheduler.
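The SPARK_REGEX used in the match is just a regular expression over the master URL. A small sketch of how that pattern behaves (the regex mirrors the constant used in createTaskScheduler; the URL is an example):

  object MasterUrlDemo {
    //A spark:// URL, possibly with several comma-separated master addresses
    val SPARK_REGEX = """spark://(.*)""".r

    def main(args: Array[String]): Unit = {
      "spark://node1:7077,node2:7077" match {
        case SPARK_REGEX(sparkUrl) =>
          val masterUrls = sparkUrl.split(",").map("spark://" + _)
          println(masterUrls.mkString(", "))   //spark://node1:7077, spark://node2:7077
        case other =>
          println(s"not a standalone master URL: $other")
      }
    }
  }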

With the TaskScheduler initialized, go back to the SparkContext constructor, which continues by calling
taskScheduler.start() to start the TaskScheduler.
Let's look at the start method

override def start() {
    //Call the start method of the backend implementation
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

Here the backend is **SparkDeploySchedulerBackend**; let's look at its start method.

override def start() {
    //super.start() is CoarseGrainedSchedulerBackend.start(), in which a DriverActor is created
    super.start()

    // The endpoint for executors to talk to us
    //The following prepares the parameters for launching the executor Java subprocess
    val driverUrl = AkkaUtils.address(
      AkkaUtils.protocol(actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts

    //Assemble the command; the org.apache.spark.executor.CoarseGrainedExecutorBackend subprocess will eventually be started with it
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")

    //Wrap the important parameters in an ApplicationDescription
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)

    //Create the AppClient (which will create the ClientActor)
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    //Start ClientActor
    client.start()

    waitForRegistration()
  }

Here the parameters needed to start the Executor are collected. The class name plus its arguments are wrapped in an ApplicationDescription, which is then passed into a newly created AppClient, and its start method is called.
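Conceptually, ApplicationDescription is just a holder for everything the Master needs in order to launch executors for this application. A simplified sketch of its shape (field names are approximate, not a verbatim copy of the Spark source):

  //What the Worker will eventually run: class name, arguments, environment, classpath, JVM opts
  case class Command(
      mainClass: String,
      arguments: Seq[String],
      environment: Map[String, String],
      classPathEntries: Seq[String],
      libraryPathEntries: Seq[String],
      javaOpts: Seq[String])

  //What the ClientActor sends to the Master when registering the application
  case class ApplicationDescription(
      name: String,                //application name, e.g. "WordCount"
      maxCores: Option[Int],       //optional cap on the total number of cores
      memoryPerExecutorMB: Int,    //memory per executor
      command: Command,            //how to start CoarseGrainedExecutorBackend
      appUiUrl: String)            //link back to the driver's web UI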

AppClient Creation Sequence Diagram

start Method of AppClient

Next, focus on the start method

  def start() {
    // Just launch an actor; it will call back into the listener.

    actor = actorSystem.actorOf(Props(new ClientActor))
  }

In the start method, a ClientActor that communicates with the Master is created; Akka then calls its preStart method, in which the actor registers with the Master. Let's look at preStart.

  override def preStart() {
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      try {
        //ClientActor registers with Master
        registerWithMaster()
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }

Finally, this method is called to register with all of the Masters.

    def tryRegisterAllMasters() {
      for (masterAkkaUrl <- masterAkkaUrls) {
        logInfo("Connecting to master " + masterAkkaUrl + "...")
        //Get a reference to the Master through actorSelection
        val actor = context.actorSelection(masterAkkaUrl)
        //Asynchronously send the RegisterApplication message to the Master
        actor ! RegisterApplication(appDescription)
      }
    }

The RegisterApplication message carries the ApplicationDescription, which holds the resources the application needs, the name of the Executor class to start, and other parameters.
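The communication pattern here is plain Akka: actorOf triggers preStart, actorSelection looks up an actor by its URL, and ! is a fire-and-forget send. A self-contained toy sketch of that pattern (ToyMaster, ToyClient and the message classes are invented for illustration; they are not Spark classes):

  import akka.actor.{Actor, ActorSystem, Props}

  case class RegisterApplication(appName: String)
  case class RegisteredApplication(appId: String)

  class ToyMaster extends Actor {
    def receive = {
      case RegisterApplication(name) =>
        println(s"registering app $name")
        sender() ! RegisteredApplication("app-20190705-0001")
    }
  }

  class ToyClient(masterUrl: String) extends Actor {
    //Akka calls preStart automatically right after actorOf creates the actor
    override def preStart(): Unit = {
      context.actorSelection(masterUrl) ! RegisterApplication("WordCount")
    }
    def receive = {
      case RegisteredApplication(id) => println(s"registered, got id $id")
    }
  }

  object ToyAkkaDemo extends App {
    val system = ActorSystem("toy")
    system.actorOf(Props[ToyMaster], "ToyMaster")
    system.actorOf(Props(new ToyClient("akka://toy/user/ToyMaster")), "ToyClient")
  }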
On the Master side, the receiver handles this message:

 case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        //Create the app; the sender here is the ClientActor
        val app = createApplication(description, sender)
        //Register the app
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //Persist the app (for Master recovery)
        persistenceEngine.addApplication(app)
        //Reply to the ClientActor that the app has been registered successfully
        sender ! RegisteredApplication(app.id, masterUrl)
        //Schedule: assign executors on the workers
        schedule()
      }
    }

registerApplication(app)

 def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.path.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
    //Record the app in the Master's collections
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    actorToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app
  }

The Master saves the received information in its collections and persists it, sends a RegisteredApplication message back to the ClientActor, and then runs the schedule() method, which walks the workers collection and calls launchExecutor.
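Roughly speaking, schedule() pairs the waiting apps with alive workers that still have enough memory and cores, and calls launchExecutor for each assignment. A heavily simplified, self-contained toy model of that loop (ToyWorker and ToyApp are invented; the real method also handles the spread-out strategy and driver scheduling):

  //Toy model of the scheduling loop, not Spark's actual classes
  case class ToyWorker(id: String, var coresFree: Int, var memoryFree: Int)
  case class ToyApp(id: String, memoryPerExecutorMB: Int, var coresLeft: Int)

  object ToySchedule {
    def main(args: Array[String]): Unit = {
      val workers = Seq(ToyWorker("w1", 4, 4096), ToyWorker("w2", 8, 8192))
      val waitingApps = Seq(ToyApp("app-1", 1024, 6))

      for (app <- waitingApps if app.coresLeft > 0;
           worker <- workers
           if app.coresLeft > 0 && worker.coresFree > 0 && worker.memoryFree >= app.memoryPerExecutorMB) {
        val coresToUse = math.min(worker.coresFree, app.coresLeft)
        worker.coresFree -= coresToUse
        worker.memoryFree -= app.memoryPerExecutorMB
        app.coresLeft -= coresToUse
        //In the real Master this is where launchExecutor(worker, exec) is called
        println(s"launch executor for ${app.id} on ${worker.id} with $coresToUse cores")
      }
    }
  }

The real launchExecutor is as follows: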

  def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    //Record how many resources are used by the worker
    worker.addExecutor(exec)
    //Master sends a message to Worker to start Executor
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    //Master notifies the ClientActor that an executor has been added
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

Here the Master sends a message to the Worker to start the Executor:

worker.actor ! LaunchExecutor(masterUrl,
  exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)

application.desc contains the startup information for the Executor class.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
         . . . . . 

          appDirectories(appId) = appLocalDirs
          //Important: create an ExecutorRunner, which holds the Executor's launch configuration and parameters
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            akkaUrl,
            conf,
            appLocalDirs, ExecutorState.LOADING)
          executors(appId + "/" + execId) = manager
          //Start the ExecutorRunner
          manager.start()

            . . . . . . 
          }
        }
      }

The Worker's receiver gets the message to start the Executor; the appDesc object holds the Command, i.e. the Executor's implementation class and its parameters.

A thread is created in manager.start().

  def start() {
    //Start a thread
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      //Use a separate thread to launch the Executor subprocess on behalf of the Worker
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = new Thread() {
      override def run() {
        killProcess(Some("Worker shutting down"))
      }
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }

The fetchAndRunExecutor() method is called in the thread. Let's look at this method.

def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
        sparkHome.getAbsolutePath, substituteVariables)
      //Build commands
      val command = builder.command()
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

      builder.directory(executorDir)
      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      //Start the subprocess
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        command.mkString("\"", "\" \"", "\""), "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      //Wait for the subprocess to exit
      val exitCode = process.waitFor()
      . . . . 
    }
  }

Here the class name and parameters are assembled into a command (the details of the assembly do not concern us). Finally, builder.start() launches a subprocess; the main class of that process is CoarseGrainedExecutorBackend.
At this point the Executor process has started.
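Under the hood this is the standard java.lang.ProcessBuilder mechanism. A minimal sketch of launching a child JVM with its stdout/stderr redirected to files (the working directory, jar and class name are made up):

  import java.io.File
  import java.util.Arrays

  object LaunchChildJvm {
    def main(args: Array[String]): Unit = {
      val workDir = new File("/tmp/executor-dir")
      workDir.mkdirs()
      val cmd = Arrays.asList("java", "-Xmx1g", "-cp", "app.jar", "com.example.ChildMain")
      val builder = new ProcessBuilder(cmd)
      builder.directory(workDir)
      //Redirect the child's output, similar in spirit to ExecutorRunner's FileAppender
      builder.redirectOutput(new File(workDir, "stdout"))
      builder.redirectError(new File(workDir, "stderr"))
      val process = builder.start()
      val exitCode = process.waitFor()   //block until the child JVM exits
      println(s"child exited with code $exitCode")
    }
  }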

Executor Creation Sequence Diagram

Starting the Executor's Task Scheduling Objects

Once the Executor process starts, its main method runs first. The main code is as follows

  //Executor process startup entry
  def main(args: Array[String]) {
    . . . . 

    //Parse the command-line arguments
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }

    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }
    //Start Executor
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  }

Then the run method is executed

  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL])   

        . . . . . 

      //Create the CoarseGrainedExecutorBackend actor through the actorSystem
      //CoarseGrainedExecutorBackend communicates with the DriverActor
      env.actorSystem.actorOf(
        Props(classOf[CoarseGrainedExecutorBackend],
          driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
        name = "Executor")

            . . . . . . 
      }
      env.actorSystem.awaitTermination()
    }
  }

In the run method, an actor is created for CoarseGrainedExecutorBackend so that it can communicate with the DriverActor; Akka then invokes its preStart lifecycle method.

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    //Executor connects to DriverActor
    driver = context.actorSelection(driverUrl)
    //Executor sends messages to DriverActor
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

The Executor sends a registration message to the DriverActor:
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)

The DriverActor's receiver handles the message:

def receiveWithLogging = {
      //Registration message sent by Executor to DriverActor
      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorDataMap.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          //DriverActor sends a successful registration message to Executor
          sender ! RegisteredExecutor

          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)

         //Encapsulating Executor's information
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            //Add Executor's information object to the collection
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))

          //makeOffers() will later offer resources and launch the actual tasks
          makeOffers()
        }

In the DriverActor's receiver, the Executor's information is wrapped in an ExecutorData object and stored in a map, and a reply is sent back to the CoarseGrainedExecutorBackend with sender ! RegisteredExecutor.

override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

When CoarseGrainedExecutorBackend receives the RegisteredExecutor message, it creates an Executor object to prepare for task execution. At this point the creation of the Executor is complete.
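For a rough idea of what that Executor object prepares: it keeps a thread pool and will later run each task it receives on it. A toy sketch of that pattern (ToyExecutor is invented for illustration; it is not Spark's Executor class):

  import java.util.concurrent.Executors

  //Toy version of the pattern: a pool of threads that runs incoming task bodies
  class ToyExecutor {
    private val threadPool = Executors.newCachedThreadPool()

    def launchTask(taskId: Long, work: () => Unit): Unit = {
      threadPool.execute(new Runnable {
        override def run(): Unit = {
          println(s"running task $taskId")
          work()
        }
      })
    }

    def stop(): Unit = threadPool.shutdown()
  }

  object ToyExecutorDemo extends App {
    val exec = new ToyExecutor
    exec.launchTask(1L, () => println("task body"))
    Thread.sleep(100)
    exec.stop()
  }

Task scheduling itself is described in the next section.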
