Task submission process
Summary
The startup process of Spark's Master and Worker has already been covered. The next piece is the Executor process on the Worker, so this article continues with the whole process of Executor startup and task submission.
Spark-submit
A task is submitted to the cluster with spark-submit, which launches the main class of our application (WordCount in this example) through the startup script:

spark-submit --class cn.apache.spark.WordCount

bin/spark-class runs org.apache.spark.deploy.SparkSubmit and invokes its main method.

From there, the main method of our own Spark application is reached via the doRunMain method:

mainClass = Utils.classForName(childMainClass) loads class cn.apache.spark.WordCount by reflection, and then the main method of cn.apache.spark.WordCount is invoked through reflection.
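For concreteness, a minimal application of this shape might look like the sketch below. The article only gives the class name cn.apache.spark.WordCount, so the body is an assumed, typical word count; its main method is what SparkSubmit will eventually invoke by reflection.

```scala
package cn.apache.spark

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application body: only the class name comes from the article,
// the word-count logic is an assumption used to make the walkthrough concrete.
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)      // this constructor is analyzed below

    sc.textFile(args(0))                 // assumed input path argument
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))           // assumed output path argument

    sc.stop()
  }
}
```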
Let's look at SparkSubmit's main method
```scala
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  // Match the task type
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
```
The action here is SUBMIT, so the submit method is called.
```scala
private[spark] def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    ......
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          // childMainClass is the fully qualified name of the user application's main class
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      ......
    }
  }
  ......
  // Invoke the doRunMain defined above
  doRunMain()
}
```
Inside submit, doRunMain() is called, which in turn calls runMain. Let's look at runMain.
```scala
private def runMain(
  ......
  try {
    // Load the class by reflection
    mainClass = Class.forName(childMainClass, true, loader)
  } catch {
    ......
  }
  // Get the static main method via reflection
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  ......
  try {
    // Invoke the application's main method
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
```
The main flow is shown above, and the comments make it clear: the main method of the class we wrote is invoked by reflection. That is the overall flow of spark-submit.
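The same reflection mechanism can be reproduced outside of Spark. The sketch below is not Spark code; the class name and arguments are placeholders (the class must be on the classpath), but the Class.forName / getMethod / invoke sequence is the same pattern runMain uses.

```scala
import java.lang.reflect.Modifier

// Standalone demo of the reflection pattern used by runMain (not Spark code).
// "cn.apache.spark.WordCount" is the example class from the submit command above.
object ReflectMainDemo {
  def main(args: Array[String]): Unit = {
    val childMainClass = "cn.apache.spark.WordCount"
    val childArgs = Seq("input.txt", "output")   // placeholder arguments

    // Load the class by name
    val mainClass = Class.forName(childMainClass, true, getClass.getClassLoader)
    // Look up the static main(Array[String]) method
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")
    // Invoke it; the receiver is null because the method is static, and the
    // whole string array is passed as the single argument of main
    val argArray: AnyRef = childArgs.toArray
    mainMethod.invoke(null, argArray)
  }
}
```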
Executor startup process
SparkSubmit calls the main method of our program through reflection, and then starts executing our code.
A SparkContext object needs to be created in a Spark program, and we start with that object
SparkContext's constructor is very long; the parts we care about are as follows.
```scala
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  ......
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Create the driver-side environment through SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here and returns a SparkEnv object, which holds many
  // important attributes, the most important of which is the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the TaskScheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // Create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // Start the TaskScheduler
  taskScheduler.start()
  ......
}
```
SparkContext's constructor does three main things: it creates a SparkEnv, a TaskScheduler, and a DAGScheduler. Let's first look at what createTaskScheduler does.
```scala
// Create a TaskScheduler for the given master URL
private def createTaskScheduler(
  ......
  // Match the master URL to choose the deployment mode
  master match {
    ......
    // This is Spark's standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // First create the TaskScheduler
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // Very important
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      // Initialize the scheduler; the scheduling mode defaults to FIFO
      scheduler.initialize(backend)
      (backend, scheduler)
    ......
  }
}
```
Matching the master URL selects standalone mode. Here **SparkDeploySchedulerBackend** and **TaskSchedulerImpl** are created; these two objects are the core of task scheduling. Then scheduler.initialize(backend) is called to initialize the scheduler.
After the TaskScheduler has been initialized, we go back to our SparkContext constructor, which continues by calling taskScheduler.start() to start the TaskScheduler.
Let's look at the start method
```scala
override def start() {
  // Call the start method of the backend implementation
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
          SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
```
Here the backend is **SparkDeploySchedulerBackend**; let's look at its start method.
```scala
override def start() {
  // The start method of CoarseGrainedSchedulerBackend, which creates the DriverActor
  super.start()

  // The endpoint for executors to talk to us
  // The following prepares the arguments for launching the Java child process
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // The command splices the class name and arguments; it will eventually start the
  // org.apache.spark.executor.CoarseGrainedExecutorBackend child process
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // The important parameters are wrapped in an ApplicationDescription
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // Create the ClientActor here
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  // Start the ClientActor
  client.start()

  waitForRegistration()
}
```
Here the parameters needed to start the Executor (the class name plus its arguments) are wrapped into an ApplicationDescription, which is finally passed to a newly created AppClient whose start method is then called.
AppClient creation sequence diagram
start Method of AppClient
Next, focus on the start method
```scala
def start() {
  // Just launch an actor; it will call back into the listener.
  actor = actorSystem.actorOf(Props(new ClientActor))
}
```
In the start method, a ClientActor that communicates with the Master is created; its preStart lifecycle method is then invoked, in which it registers with the Master. Next, look at preStart.
```scala
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    // The ClientActor registers with the Master
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
```
Finally, this method is called to register with all Masters.
```scala
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    // Get a reference to the Master through actorSelection
    val actor = context.actorSelection(masterAkkaUrl)
    // Send an asynchronous RegisterApplication message to the Master
    actor ! RegisterApplication(appDescription)
  }
}
```
The RegisterApplication message the ClientActor sends carries an ApplicationDescription, which contains the resources the application requires, the name of the Executor class to start, and some parameters.
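The messaging underneath is plain Akka (classic) actors. The toy example below is not Spark code and assumes only the akka-actor library; the actor and message names are made up, but it shows the same actorSelection + ! fire-and-forget registration, and the reply pattern that the Master code in the next section performs.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Toy messages standing in for RegisterApplication / RegisteredApplication
case class Register(appName: String)
case class Registered(appId: String)

// Plays the role of the Master's receiver
class ToyMaster extends Actor {
  def receive = {
    case Register(name) =>
      println(s"ToyMaster: registering app $name")
      sender() ! Registered("app-0001")   // reply to the sender, like the real Master
  }
}

// Plays the role of the ClientActor
class ToyClient extends Actor {
  override def preStart(): Unit = {
    // Look up the master by its actor path (Spark uses a full akka.tcp:// URL here)
    // and fire off an asynchronous registration message
    context.actorSelection("/user/master") ! Register("WordCount")
  }
  def receive = {
    case Registered(appId) =>
      println(s"ToyClient: registered with id $appId")
      context.system.terminate()
  }
}

object AkkaRegistrationDemo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props(new ToyMaster), "master")
  system.actorOf(Props(new ToyClient), "client")
}
```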
Master's Receiver
```scala
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create the App; sender is the ClientActor
    val app = createApplication(description, sender)
    // Register the App
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the App
    persistenceEngine.addApplication(app)
    // Reply to the ClientActor that the app has been registered successfully
    sender ! RegisteredApplication(app.id, masterUrl)
    // Schedule resources
    schedule()
  }
}
```
registerApplication(app)
```scala
def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // Put the App into the Master's collections
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  actorToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
```
The Master saves the received application information into its collections and persists it, sends a RegisteredApplication message back to the ClientActor, and then runs the schedule() method, which traverses the workers collection and calls launchExecutor.
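Before looking at the real launchExecutor below, here is a deliberately simplified, self-contained toy of what one scheduling pass conceptually does (traverse workers that still have free cores and hand out executors). It is not the actual Master.schedule() implementation, and all the types in it are made up.

```scala
// Toy model only -- NOT Master.schedule(). It just illustrates the idea of
// "walk the workers collection and call launchExecutor for waiting apps".
case class ToyWorker(id: String, var coresFree: Int)
case class ToyApp(id: String, var coresLeft: Int)

object SchedulePassDemo {
  def launchExecutor(worker: ToyWorker, app: ToyApp, cores: Int): Unit = {
    worker.coresFree -= cores
    app.coresLeft -= cores
    println(s"launch executor for ${app.id} on ${worker.id} with $cores cores")
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(ToyWorker("worker-1", 4), ToyWorker("worker-2", 2))
    val waitingApps = Seq(ToyApp("app-1", 5))

    // One pass: give each waiting app cores from every worker that still has some free
    for (app <- waitingApps; worker <- workers if app.coresLeft > 0 && worker.coresFree > 0) {
      launchExecutor(worker, app, math.min(worker.coresFree, app.coresLeft))
    }
  }
}
```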
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record how many resources this worker has used
  worker.addExecutor(exec)
  // The Master sends a message to the Worker to start an Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The Master sends a message to the ClientActor telling it that an executor has been added
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```
Here Master sends a message to Worker to start Executor
```scala
worker.actor ! LaunchExecutor(masterUrl,
  exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
```
application.desc contains startup information for the Executor class
```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  ......
  appDirectories(appId) = appLocalDirs
  // Important: create an ExecutorRunner, which holds the Executor's launch
  // configuration and parameters
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    akkaUrl,
    conf,
    appLocalDirs,
    ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  // Start the ExecutorRunner
  manager.start()
  ......
```
The Worker's receiver gets the LaunchExecutor message; the appDesc object carries the Command, that is, the Executor implementation class and its parameters.
A thread is created in manager.start().
```scala
def start() {
  // Start a thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    // Use a child thread to help the Worker start the Executor subprocess
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
```
The fetchAndRunExecutor() method is called in the thread. Let's look at this method.
```scala
def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    // Build the command
    val command = builder.command()
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

    builder.directory(executorDir)
    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start the child process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    ......
  }
}
```
Here the class name and parameters are assembled into a command; we will not go into the details of the assembly. Finally, builder.start() launches a child JVM process through java.lang.ProcessBuilder, and the main class of that process is CoarseGrainedExecutorBackend.
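The launch itself is ordinary java.lang.ProcessBuilder usage. The sketch below is not Spark's command assembly; the java command line and driver URL are placeholders, but it shows the same directory / environment / start / waitFor pattern, with stdout and stderr redirected to files as fetchAndRunExecutor does.

```scala
import java.io.File

// Standalone sketch of launching a JVM child process the way fetchAndRunExecutor does.
// The command line here is a placeholder, not the one Spark actually assembles.
object ProcessLaunchDemo {
  def main(args: Array[String]): Unit = {
    val executorDir = new File("/tmp/executor-demo")
    executorDir.mkdirs()

    val command = java.util.Arrays.asList(
      "java", "-cp", sys.props("java.class.path"), "-Xmx1g",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",   // main class of the child
      "--driver-url", "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler")

    val builder = new ProcessBuilder(command)
    builder.directory(executorDir)                       // working directory of the child
    builder.environment.put("SPARK_LOCAL_DIRS", "/tmp")  // extra environment, as in the Worker
    // Redirect the child's stdout/stderr to files (Spark uses FileAppender instead)
    builder.redirectOutput(new File(executorDir, "stdout"))
    builder.redirectError(new File(executorDir, "stderr"))

    val process = builder.start()        // fork the child JVM
    val exitCode = process.waitFor()     // block until it exits
    println(s"child exited with code $exitCode")
  }
}
```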
At this point the Executor process starts
Executor creation sequence diagram
Startup of the Executor's task-scheduling objects
After the Executor process starts, its main method is executed first. The main code is as follows.
```scala
// Entry point of the Executor process
def main(args: Array[String]) {
  ......
  // Parse the arguments
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }

  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
    printUsageAndExit()
  }

  // Start the Executor
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
```
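The argument parsing above relies on Scala's list pattern matching with ::. A tiny self-contained version of the same idiom, with only a couple of made-up options, looks like this:

```scala
// Minimal standalone demo of the list-pattern parsing idiom used above;
// the option names here are illustrative, not the real executor options.
object ArgParseDemo {
  def main(args: Array[String]): Unit = {
    var driverUrl: String = null
    var cores: Int = 0
    var argv = args.toList
    while (argv.nonEmpty) {
      argv match {
        case "--driver-url" :: value :: tail =>
          driverUrl = value
          argv = tail
        case "--cores" :: value :: tail =>
          cores = value.toInt
          argv = tail
        case unknown :: tail =>
          System.err.println(s"Unrecognized option: $unknown")
          argv = tail
        case Nil => // unreachable inside the loop, kept for exhaustiveness
      }
    }
    println(s"driverUrl=$driverUrl, cores=$cores")
  }
}
```

Running it with --driver-url spark://driver:7077 --cores 4 prints both parsed values.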
The run method is then executed.
```scala
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  ......
  // Create the CoarseGrainedExecutorBackend actor through the actorSystem;
  // CoarseGrainedExecutorBackend is the one that communicates with the DriverActor
  env.actorSystem.actorOf(
    Props(classOf[CoarseGrainedExecutorBackend],
      driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
    name = "Executor")
  ......
  env.actorSystem.awaitTermination()
}
```
In the run method, an actor is created for CoarseGrainedExecutorBackend so that it can communicate with the DriverActor, and then its preStart lifecycle method is invoked.
```scala
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  // The Executor connects to the DriverActor
  driver = context.actorSelection(driverUrl)
  // The Executor sends a registration message to the DriverActor
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
```
Executor sends registered messages to DriverActor
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
The DriverActor's receiver then handles the message.
```scala
def receiveWithLogging = {
  // Registration message sent by the Executor to the DriverActor
  case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorDataMap.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      // The DriverActor replies to the Executor that registration succeeded
      sender ! RegisteredExecutor

      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val (host, _) = Utils.parseHostPort(hostPort)
      // Wrap the Executor's information
      val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        // Add the Executor's information object to the collection
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      // Later used to offer resources and launch the real tasks
      makeOffers()
    }
```
In the DriverActor's receiver, the Executor's information is wrapped in an ExecutorData object and stored in a map, and then the reply sender ! RegisteredExecutor is sent back to the CoarseGrainedExecutorBackend.
```scala
override def receiveWithLogging = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
```
When CoarseGrainedExecutorBackend receives the RegisteredExecutor message, it creates an Executor object to prepare for task execution. At this point the creation of the Executor is complete; task scheduling is described in the next section.