DAGScheduler: stage partitioning, creation, and submission
In this article, I will follow a Spark job from start to finish and connect all the steps involved in its execution: dividing the DAG into stages, creating the task set, allocating resources, serializing tasks, dispatching them to executors, running them, and returning results to the driver. Using the call chain of a job as the thread that ties spark-core together, we can get an overall picture of what each core module of Spark does, form an impression of the overall framework, and then dig into each module separately. Working step by step in this way eventually gives a deeper and more complete grasp of spark-core. The main purpose of this article, though, is simply to clarify the whole running process of a Spark job.
Entry: SparkContext.runJob
We know that lazy execution in Spark allows operators to be chained together like a pipeline, forming a streaming style of computation. Personally, I think this is an important reason why Spark performs better than MapReduce, and later MapReduce-based optimization frameworks such as Tez and Mahout lean on the same idea: chain the operators that can be pipelined so that intermediate results are not repeatedly written to disk. But I digress; let's get back to this method. As the comment says, it is the entry point of all action operators in Spark.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
// Calling the runJob method of DAGScheduler
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// Update progress bar information printed by console
progressBar.foreach(_.finishAll())
// checkpoint for processing RDD
rdd.doCheckpoint()
}
- First, the closure is cleaned to remove unnecessary references. This is mainly done to help serialization: unnecessary references may point to non-serializable objects and make the whole function non-serializable. User code is often not careful about this, and Spark takes it into account, which also lowers the bar for users.
- Then DAGScheduler.runJob is invoked to actually submit the job.
This method is simple and needs no further explanation. To make the entry point concrete, a small sketch of calling it directly follows.
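The sketch below drives SparkContext.runJob directly to count the elements of each partition, which is essentially what an action like count() does through this same entry point. The object name, app name, and local master are my own choices for illustration, not anything from the article.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Minimal, self-contained example of calling the runJob entry point directly.
object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runJob-sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // func runs on each partition on the executors; resultHandler receives
    // (partitionIndex, result) back on the driver.
    val perPartitionCounts = new Array[Long](rdd.partitions.length)
    sc.runJob(
      rdd,
      (ctx: TaskContext, it: Iterator[Int]) => it.size.toLong,
      rdd.partitions.indices,
      (index: Int, count: Long) => perPartitionCounts(index) = count)

    println(perPartitionCounts.sum)  // the same value rdd.count() would return
    sc.stop()
  }
}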
DAGScheduler.submitJob
After a few intermediate calls, execution eventually reaches this method.
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  // Check for illegal partitions
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // The next job id, incremented by 1 each time
  val jobId = nextJobId.getAndIncrement()
  // If there are no partitions to run, there is nothing to do; just return a successful waiter
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Post a job-submitted event to the DAG scheduler's event loop
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
The logic of this method is also simple: do a few checks, then post a job-submitted event to the event handler inside the DAGScheduler. The DAGScheduler has its own event loop, a very conventional one that uses a single thread to process events from a queue, so we will not expand on it here; a simplified sketch of the pattern follows. After the event is posted, the handleJobSubmitted method of the DAGScheduler is eventually invoked. You can see that there are many similar handler methods in the DAGScheduler, each corresponding to a different event type; the dispatch logic lives in DAGSchedulerEventProcessLoop.doOnReceive and is not expanded here either. Staying on the main line of job execution, let's continue with handleJobSubmitted.
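The sketch below is my own simplified version of this event-loop pattern, not Spark's actual DAGSchedulerEventProcessLoop; the class and event names are illustrative. It shows the idea: a single daemon thread takes events off a blocking queue and dispatches each one to a handler, just as doOnReceive dispatches to handleJobSubmitted and friends.

import java.util.concurrent.LinkedBlockingDeque

// Illustrative event types; the real scheduler has many more.
sealed trait SchedulerEvent
case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
case class StageCompletedEvent(stageId: Int) extends SchedulerEvent

class SimpleEventLoop(name: String) {
  private val eventQueue = new LinkedBlockingDeque[SchedulerEvent]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          // Block until an event arrives, then dispatch it (mirrors doOnReceive)
          eventQueue.take() match {
            case JobSubmittedEvent(jobId)     => println(s"handleJobSubmitted($jobId)")
            case StageCompletedEvent(stageId) => println(s"handleStageCompleted($stageId)")
          }
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts the thread to shut the loop down
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: SchedulerEvent): Unit = eventQueue.put(event)
  def stop(): Unit = eventThread.interrupt()
}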
handleJobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the final stage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  // Set up the active job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  // Update some bookkeeping
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Post an event to the event bus
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Submit the final stage
  submitStage(finalStage)
}
We will not go into the bookkeeping updates here.
The key call is createResultStage, which creates the final stage. This step divides the RDD computation graph (the DAG) into stages according to its shuffle dependencies; the action operator at the end corresponds to a ResultStage, which is then submitted via submitStage.
In the following sections we focus on how the DAG is partitioned and how stages are created, which is the main responsibility of the DAGScheduler.
Partitioning and Creation of Stages
DAGScheduler.createResultStage
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // First create the parent stages this stage depends on
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // With the parent stages in hand, the final stage can be created
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
The focus is on creating the parent stages.
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
getShuffleDependencies
This method uses a stack to perform a depth-first traversal of the RDD's dependencies. You can see that when a shuffle dependency is found, it is recorded and the traversal does not continue past it.
So this method only finds the nearest shuffle dependencies above the given RDD in the DAG; it does not cross them to look for shuffle dependencies further upstream.
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// Depth-first traversal with stack
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// If it's a shuffle dependency, record it and don't continue looking upward past it
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
// For a narrow dependency, keep traversing up through its parent RDD
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
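To see what this means in practice, here is a small spark-shell style illustration; the variable names and sample data are mine. With two shuffles in the lineage, only the nearest shuffle dependency is returned for the final RDD.

// Assumes a SparkContext named sc, e.g. in spark-shell.
val words   = sc.parallelize(Seq("a", "b", "a", "c"))
val counts  = words.map((_, 1)).reduceByKey(_ + _)              // shuffle dependency #1
val grouped = counts.map { case (k, v) => (v, k) }.groupByKey() // shuffle dependency #2

// When collect() triggers createResultStage for grouped's RDD,
// getShuffleDependencies(grouped) only returns dependency #2 (from groupByKey).
// Dependency #1 (from reduceByKey) is found later, by getMissingAncestorShuffleDependencies,
// when the parent ShuffleMapStage for #2 is being created.
grouped.collect()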
getOrCreateShuffleMapStage
Let's move on to another important method, the one that creates a stage for a shuffle dependency.
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      // Get all ancestor shuffle dependencies for which no stage has been created yet
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
As you can see, this method creates stages for all ancestor shuffle dependencies that do not yet have one.
Let's look at the process of creating ShuffleMapStage:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  // Note that the rdd of a ShuffleMapStage is the rdd on the input (map) side of the shuffle
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  // Get the parent stages; these methods form a recursive call chain
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // A Stage mostly encapsulates references; the most important one here is the mapOutputTracker
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

  // Update some bookkeeping
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  // Register the shuffle in the map output tracker
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
The key steps are:
- Create all parent stages
- Construct the ShuffleMapStage object. The most important reference it holds is the mapOutputTracker, whose job is to track the locations of map output during a shuffle. As we will see later, the map output is partitioned, sorted, and serialized by the shuffle manager and then stored by the blockManager; its location, identified by a blockId, is reported back to the driver, where a MapOutputTrackerMaster component maintains the output locations of all map tasks across all stages.
- Register the newly created shuffle with the MapOutputTrackerMaster, which in effect just adds an entry to a mapping structure (a simplified sketch of this bookkeeping follows).
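The sketch below is a highly simplified version of that bookkeeping idea, not Spark's actual MapOutputTrackerMaster. Apart from the registerShuffle(shuffleId, numMaps) shape, which mirrors the call in the code above, the class and method names are mine; the real component stores richer status objects per map task.

import scala.collection.concurrent.TrieMap

// Where a completed map task says its shuffle output block lives (illustrative fields).
case class MapOutputLocation(executorHost: String, blockId: String)

class SimpleMapOutputRegistry {
  // shuffleId -> one optional output location per map partition
  private val shuffles = new TrieMap[Int, Array[Option[MapOutputLocation]]]()

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
    // Reserve one empty slot per map partition; filled in as map tasks report their output
    shuffles.putIfAbsent(shuffleId, Array.fill[Option[MapOutputLocation]](numMaps)(None))
  }

  def registerMapOutput(shuffleId: Int, mapId: Int, loc: MapOutputLocation): Unit =
    shuffles(shuffleId)(mapId) = Some(loc)

  def outputsFor(shuffleId: Int): Seq[Option[MapOutputLocation]] =
    shuffles(shuffleId).toSeq
}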
Summary
A brief summary of the stage creation process: these methods call each other recursively; while traversing the RDD dependencies, a ShuffleMapStage is created for every shuffle dependency, and once all upstream stages have been created, the ResultStage is created.
Stage Submission
Next, let's look at the last step that DAGScheduler is responsible for during the job run: stage submission
submitStage
The first is the submitStage method.
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
This method is relatively simple:
If some parent stages are missing (their output is not yet available), those parent stages are submitted first and the current stage is put into the waiting queue.
If all parent stages are available, or there are no parent stages at all, the current stage is submitted by calling submitMissingTasks. A toy sketch of this submit-parents-first pattern follows.
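To isolate the pattern from the surrounding details, here is a toy sketch; it is not Spark code and all names are made up. One deliberate difference: the toy drains the waiting set synchronously the moment a stage "finishes", whereas Spark does this asynchronously via submitWaitingChildStages when task-completion events arrive.

import scala.collection.mutable

// Toy model: a stage only runs once all of its parents have finished;
// otherwise the parents are submitted first and the stage parks in a waiting set.
case class ToyStage(id: Int, parents: Seq[ToyStage])

object SubmitStageSketch {
  private val waiting = mutable.Set[ToyStage]()
  private val finished = mutable.Set[Int]()

  def submit(stage: ToyStage): Unit = {
    if (finished(stage.id) || waiting(stage)) return       // mirrors the waiting/running guard
    val missing = stage.parents.filterNot(p => finished(p.id))
    if (missing.isEmpty) {
      println(s"running stage ${stage.id}")
      finished += stage.id
      // Drain any waiting stages whose parents are now all done
      waiting.filter(_.parents.forall(p => finished(p.id))).foreach { s =>
        waiting -= s
        submit(s)
      }
    } else {
      waiting += stage          // park until the parents are done
      missing.foreach(submit)   // submit the missing parents first
    }
  }

  def main(args: Array[String]): Unit = {
    val s0 = ToyStage(0, Nil)
    val s1 = ToyStage(1, Seq(s0))
    val s2 = ToyStage(2, Seq(s1))
    submit(s2)   // prints: running stage 0, running stage 1, running stage 2
  }
}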
submitMissingTasks
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")

  // First figure out the indexes of partition ids to compute.
  // i.e. find out which partitions have not yet been computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
  // with this Stage
  val properties = jobIdToActiveJob(jobId).properties

  // Update bookkeeping
  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  // Update the internal bookkeeping of the outputCommitCoordinator
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // Work out the preferred locations of each task. For a stage that reads shuffle output,
  // the preferred locations are computed via the mapOutputTracker.
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // Record information about the latest attempt of this stage
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  // Post an event to the event bus
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  // Serialization of the task data: the RDD and the ShuffleDependency (or func) are serialized here
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    // The serialized RDD and ShuffleDependency are shipped to the executors via a broadcast variable.
    // A broadcast variable writes its data to memory or disk through the blockManager;
    // the executors then pull the data remotely over rpc.
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  val tasks: Seq[Task[_]] = try {
    // Serialize the accumulators used to collect task metrics.
    // There is an interesting detail in how accumulators are serialized; see their readObject method.
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        // Create a Task for each partition
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    // From here the DAGScheduler hands the baton to the task scheduler
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}
This method is relatively long, but it is arguably the most important one in the DAG scheduler's job submission path. Its main job is to build a task set for the stage being submitted: a Task is created for each partition that needs to be computed, and all of those Tasks are grouped into a TaskSet. The main steps are:
- Update some bookkeeping
- Work out the preferred locations of each task. For a stage that reads the output of another shuffle, the preferred locations are computed via the mapOutputTracker.
- Post a stage-submitted event to the event bus.
- Serialize the RDD together with the ShuffleDependency (for a ShuffleMapStage) or the computation function func (for a ResultStage) and broadcast the bytes for shipping to executors; a small sketch of this round trip follows the list.
- Serialize the accumulators used to collect task run statistics; there is an interesting detail in how they are deserialized, which you can see in their readObject method.
- Create a Task for each partition to be computed: a ShuffleMapTask or a ResultTask depending on the stage type.
- Finally, call taskScheduler.submitTasks to submit the task set.
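As a small illustration of the serialize-then-deserialize round trip behind taskBinary, the sketch below uses Spark's public JavaSerializer on placeholder strings; the real code uses the configured closure serializer and ships the bytes to the executors through a broadcast variable, where each task deserializes its own copy of the RDD and its dependency (or func).

import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Minimal sketch of the serialize -> bytes -> deserialize mechanism behind taskBinary.
object TaskBinarySketch {
  def main(args: Array[String]): Unit = {
    val ser = new JavaSerializer(new SparkConf()).newInstance()

    // Placeholder payload standing in for (rdd, shuffleDep) or (rdd, func)
    val payload: AnyRef = ("pretend-this-is-the-rdd", "pretend-this-is-the-shuffle-dep")

    val bytes: ByteBuffer = ser.serialize(payload)        // driver side: what goes into taskBinary
    val copy = ser.deserialize[(String, String)](bytes)   // executor side: each task gets its own copy
    println(copy)
  }
}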
At this point the DAGScheduler has fulfilled its mission and handed the baton to the TaskScheduler, whose show begins next.
In the next article we will continue with TaskSchedulerImpl and how it submits tasks, which is mainly about resource allocation; locality, blacklisting, load balance, and other issues all come into play there.
Open questions
- How are the preferred locations of a task computed?
- What is the role of the OutputCommitCoordinator?
- What is the underlying mechanism of broadcast variables? A later article will analyze broadcast variables specifically; they are in fact built on the blockManager (arguably the most important piece of Spark's infrastructure).