Spark task assignment: TaskSchedulerImpl source code analysis

TaskSchedulerImpl

As mentioned in the previous article, after the DAGScheduler divides the job's computation chain into multiple stages according to shuffle dependencies, it starts by submitting the last ResultStage; because of the dependencies between stages, stages are actually submitted from the top of the computation chain downward. Each submitted stage is split into multiple tasks, the preferred locations of each task are computed, and objects such as the RDD, ShuffleDependency and TaskMetrics are serialized for remote transmission. Finally, all tasks of a stage are packaged into a TaskSet and submitted to TaskSchedulerImpl. This article analyzes TaskSchedulerImpl, starting with a translation of its class description:

The main job of TaskSchedulerImpl is to schedule tasks, while the actual tasks are shipped out through a scheduling backend (SchedulerBackend). Different cluster types correspond to different SchedulerBackend implementations; for example, local mode uses LocalSchedulerBackend. TaskScheduler itself has only one implementation, TaskSchedulerImpl, which handles the general logic, such as deciding the scheduling order among multiple jobs and driving speculative execution.

The initialize() and start() methods of TaskSchedulerImpl must be called before any task set is submitted to it.

A quick aside: where are these two methods called? Both are called during SparkContext initialization; see the SparkContext initialization code for details.

A note on thread safety: since multiple threads may submit tasks at the same time, the external public methods must be synchronized to keep the internal state and bookkeeping consistent.
In addition, some SchedulerBackend methods first acquire their own lock and then the lock of the TaskSchedulerImpl object. You should therefore avoid trying to acquire the backend's lock while holding the TaskSchedulerImpl lock, since that can cause a deadlock. In other words, some operations need to hold both the SchedulerBackend lock and the TaskSchedulerImpl lock, and whenever multiple locks must be held at the same time they should always be acquired in the same order; this is what prevents deadlock.
For example, suppose two methods both need lock A and lock B. If method m1 acquires A and then B, while m2 acquires B and then A, then two threads t1 and t2 executing m1 and m2 concurrently may reach a state where t1 holds A and waits for B while t2 holds B and waits for A. Since neither releases its lock while waiting, this is a deadlock. A minimal sketch of the safe, consistent-ordering pattern follows.
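
The sketch below is plain Scala, not Spark code: both methods that need the two locks acquire them in the same fixed order (A before B), so the circular wait described above cannot occur.

object LockOrderingExample {
  private val lockA = new Object
  private val lockB = new Object

  // Both methods need A and B and acquire them in the same order, so no deadlock is possible.
  def m1(): Unit = lockA.synchronized { lockB.synchronized { /* work that needs A and B */ } }
  def m2(): Unit = lockA.synchronized { lockB.synchronized { /* other work that needs A and B */ } }

  def main(args: Array[String]): Unit = {
    val t1 = new Thread(() => (1 to 10000).foreach(_ => m1()))
    val t2 = new Thread(() => (1 to 10000).foreach(_ => m2()))
    t1.start(); t2.start(); t1.join(); t2.join()
    println("finished without deadlock")
  }
}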

Well, let's continue the analysis of the task submission logic.

submitTasks

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
// Lock and update bookkeeping and state
this.synchronized {
  // Create a TaskSetManager to further encapsulate the Task set
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stage = taskSet.stageId
  // Update the mapping relationship between stageId and TaskSetManager. Due to the failure retry mechanism, a stage may be tried multiple times
  val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  // Add the TaskSetManager of this attempt to the map
  stageTaskSets(taskSet.stageAttemptId) = manager
  // Check whether another attempt of this stage is still running; a duplicate active submission must throw an exception
  val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    ts.taskSet != taskSet && !ts.isZombie
  }
  if (conflictingTaskSet) {
    throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
      s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
  }
  // The newly created TaskSetManager is added to the scheduling pool. The pool determines how multiple TaskSets are ordered.
  // The pool (queue) name can be set through taskSet.properties, which is populated from a ThreadLocal property of SparkContext
  schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  // Start a timer that periodically warns if the job has not accepted any resources yet
  if (!isLocal && !hasReceivedTask) {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          logWarning("Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered " +
            "and have sufficient resources")
        } else {
          this.cancel()
        }
      }
    }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
  }
  hasReceivedTask = true
}
// At this point the lock has been released
// Notify the scheduling backend that there are tasks to run
backend.reviveOffers()
}
  • Get the lock and update some bookkeeping
  • Encapsulate the new task set as a TaskSetManager and add it to the scheduling pool (a usage sketch of selecting a pool through SparkContext properties follows this list)
  • Call SchedulerBackend to assign available resources to the tasks
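
As the comment in the code above notes, the pool a TaskSetManager ends up in is chosen through taskSet.properties, which is filled from a thread-local property on SparkContext. A small usage sketch, assuming a fair-scheduler setup (the application name and pool name here are made up):

import org.apache.spark.{SparkConf, SparkContext}

object PoolExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("pool-example")
      .set("spark.scheduler.mode", "FAIR")   // enable fair scheduling pools
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread carry this property in their TaskSet;
    // schedulableBuilder.addTaskSetManager uses it to pick the pool.
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    sc.parallelize(1 to 100).count()

    // Clear the property to fall back to the default pool.
    sc.setLocalProperty("spark.scheduler.pool", null)
    sc.stop()
  }
}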

CoarseGrainedSchedulerBackend.reviveOffers

override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

This method sends a ReviveOffers message to the DriverEndpoint through the RPC module. Since the endpoint lives in the same process, this is effectively a local call; going through RPC mainly keeps the code modules uniform. In DriverEndpoint's receive method we can see that when the ReviveOffers message arrives, the makeOffers method is called. A toy sketch of this message-passing pattern follows.
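
The sketch below is plain Scala (not Spark's RPC framework, whose types are omitted here): the backend enqueues a ReviveOffers message into an in-process endpoint's inbox, and the endpoint's receive loop reacts by making offers.

import java.util.concurrent.LinkedBlockingQueue

object ReviveOffersSketch {
  case object ReviveOffers

  private val inbox = new LinkedBlockingQueue[Any]()

  // Stands in for driverEndpoint.send(ReviveOffers)
  def send(msg: Any): Unit = inbox.put(msg)

  // Stands in for DriverEndpoint.receive
  def receiveOne(): Unit = inbox.take() match {
    case ReviveOffers => println("makeOffers(): match pending tasks against free executors")
    case other        => println(s"unhandled message: $other")
  }

  def main(args: Array[String]): Unit = {
    val receiver = new Thread(() => receiveOne())
    receiver.start()
    send(ReviveOffers)
    receiver.join()
  }
}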

DriverEndpoint.makeOffers

private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  // Acquire lock, synchronize
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors that are being killed
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // Encapsulate all available executors into resource (WorkerOffer) objects
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // Hand these available resources to TaskSchedulerImpl for scheduling
    // TaskSchedulerImpl will comprehensively consider the factors such as task locality, blacklist, scheduling order of scheduling pool, etc., and return the TaskDescription set
    // The TaskDescription object is a complete description of a Task,
    // It includes serialized task data, which executor the task runs on, dependent files and jar packages
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

This method encapsulates all currently available resources (executors) into WorkerOffer objects and hands them to TaskSchedulerImpl. TaskSchedulerImpl takes task locality, the blacklist, the scheduling order of the pools and other factors into account and returns a set of TaskDescription objects. A TaskDescription is a complete description of a task: it includes the serialized task data, the executor the task will run on, and the files and jar packages it depends on. A toy sketch of what such an offer carries follows.
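
The following is a toy sketch (simplified types, not Spark's WorkerOffer class) of what an offer carries and how many task slots a batch of offers represents, assuming one CPU per task as with the default value of spark.task.cpus:

object OfferSlotsSketch {
  // A stripped-down stand-in for WorkerOffer: which executor, on which host, with how many free cores.
  case class Offer(executorId: String, host: String, freeCores: Int)

  val cpusPerTask = 1   // assumed, mirrors the default of spark.task.cpus

  def totalSlots(offers: Seq[Offer]): Int = offers.map(_.freeCores / cpusPerTask).sum

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec-1", "host-a", 4), Offer("exec-2", "host-b", 2))
    println(s"available task slots: ${totalSlots(offers)}")   // 6
  }
}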

It can also be seen here that the scheduling backend has relatively few responsibilities: it mainly manages executors and holds the RPC references used to send task data. Most of the scheduling work is done by TaskSchedulerImpl.
Next, let's analyze the most important method in task scheduling, TaskSchedulerImpl.resourceOffers.

TaskSchedulerImpl.resourceOffers

// This method is called by the scheduling backend, which tells TaskSchedulerImpl the available executor resources.
// TaskSchedulerImpl decides which tasks actually run, based on TaskSet priority (the scheduling pool), the blacklist, locality and other factors.
// Tasks are assigned to executors in round-robin fashion so that computing resources are used more evenly.
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// Flag whether a new executor is available to join
var newExecAvail = false
// This loop has two main purposes:
// 1. Update bookkeeping, such as the mapping from physical nodes to executors, from racks to hosts, and the tasks running on each host and executor
// 2. Check whether any new executors have become available
for (o <- offers) {
  if (!hostToExecutors.contains(o.host)) {
    hostToExecutors(o.host) = new HashSet[String]()
  }
  if (!executorIdToRunningTaskIds.contains(o.executorId)) {
    hostToExecutors(o.host) += o.executorId
    executorAdded(o.executorId, o.host)
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
    newExecAvail = true
  }
  for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  }
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
// Trigger the timeout check of the blacklist. Blacklisted nodes or executors have a timeout period;
// no tasks may be submitted to them before the timeout expires, after which these resources are put back into use
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

// Filter out the computing resources in the blacklist according to the latest blacklist, including host and executor
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
  offers.filter { offer =>
    !blacklistTracker.isNodeBlacklisted(offer.host) &&
      !blacklistTracker.isExecutorBlacklisted(offer.executorId)
  }
}.getOrElse(offers)

// Shuffle the offers so the distribution is more even; scala.util.Random is used for the shuffle
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// How many tasks each executor can accept: cores / CPUS_PER_TASK
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
// Number of cpu cores per executor
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// All task sets are sorted by priority through the scheduling pool to obtain the sorted task set
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// If a new executor is added, each TaskSetManager needs to be notified
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// For each task set, allocate resources to it
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  // Iterate over locality levels, from the most local (strictest) to the least local
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    // Each locality level is allocated over multiple rounds.
    // In each round the executors are polled in turn, and each executor is offered at most one task,
    // so after one round every executor has received at most one task. In most cases one round does not use up the executors' resources;
    // that is fine, we simply run another round, and we exit the loop once there are no resources left or no more tasks can be launched at the current locality level
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}

if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
}
  • Update bookkeeping, such as the mapping from physical nodes to executors, from racks to hosts, and the tasks running on each host and executor; check whether any new executors have become available
  • Trigger the timeout check of the blacklist: blacklisted nodes or executors have a timeout period during which no tasks may be submitted to them, and once the timeout expires these resources are put back into use. Then filter out blacklisted resources (both hosts and executors) according to the up-to-date blacklist
  • Sort all task sets by priority through the scheduling pool to obtain the ordered list of task sets
  • For each task set, allocate tasks to executors in round-robin fashion. Allocation proceeds in multiple rounds: each round polls all executors in turn and offers each executor one task that satisfies the current locality requirement (see the sketch after this list)
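
The following is a simplified, self-contained sketch (toy types, not Spark's) of the loop structure in resourceOffers and resourceOfferSingleTaskSet: for each locality level, from the strictest to the loosest, it keeps running round-robin rounds over the executors until a round launches nothing, and in each round an executor is offered at most one task.

object AllocationSketch {
  case class Exec(id: String, host: String, var freeCores: Int)
  case class Task(id: Int, preferredHost: Option[String])

  // A task "fits" an executor at NODE_LOCAL only if the executor is on the task's preferred host;
  // at ANY, every task fits.
  def fits(task: Task, exec: Exec, level: String): Boolean = level match {
    case "NODE_LOCAL" => task.preferredHost.contains(exec.host)
    case _            => true
  }

  def main(args: Array[String]): Unit = {
    val execs = Seq(Exec("e1", "hostA", 2), Exec("e2", "hostB", 2))
    val pending = scala.collection.mutable.Buffer(
      Task(0, Some("hostA")), Task(1, Some("hostA")), Task(2, None), Task(3, None))
    val assigned = scala.collection.mutable.Map[Int, String]()

    for (level <- Seq("NODE_LOCAL", "ANY")) {
      var launchedThisRound = true
      while (launchedThisRound) {                      // multiple rounds per locality level
        launchedThisRound = false
        for (exec <- execs if exec.freeCores >= 1) {   // round robin: at most one task per executor per round
          pending.find(fits(_, exec, level)).foreach { t =>
            pending -= t
            assigned(t.id) = exec.id
            exec.freeCores -= 1
            launchedThisRound = true
          }
        }
      }
    }
    // Prints (0,e1), (1,e1), (2,e2), (3,e2): node-local tasks fill hostA first, the rest go to hostB
    println(assigned.toSeq.sortBy(_._1).mkString(", "))
  }
}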

TaskSchedulerImpl.resourceOfferSingleTaskSet

private def resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
// Poll each executor and assign a Task to it
for (i <- 0 until shuffledOffers.size) {
  val execId = shuffledOffers(i).executorId
  val host = shuffledOffers(i).host
  // Check whether the cpu resources on this executor are sufficient
  if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
      // Ask the TaskSetManager for a task that can run on this executor under the maximum allowed locality level
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        // If a task satisfying the locality constraint is found for this executor,
        // add it to the collection that was passed in
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager(tid) = taskSet
        taskIdToExecutorId(tid) = execId
        executorIdToRunningTaskIds(execId).add(tid)
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    } catch {
      case e: TaskNotSerializableException =>
        logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
        // Do not offer resources for this task, but don't throw an error to allow other
        // task sets to be submitted.
        return launchedTask
    }
  }
}
return launchedTask
}

This method allocates tasks to all available executors in round-robin fashion. In one round of allocation each executor gets at most one task; this is done to "scatter" the tasks as much as possible and spread them evenly across all executors.

TaskSetManager.resourceOffer

@throws[TaskNotSerializableException]
def resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
// First check the blacklist
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
  blacklist.isNodeBlacklistedForTaskSet(host) ||
    blacklist.isExecutorBlacklistedForTaskSet(execId)
}

if (!isZombie && !offerBlacklisted) {
  val curTime = clock.getTimeMillis()

  var allowedLocality = maxLocality

  // Recalculate the locality level based on the locality wait time
  if (maxLocality != TaskLocality.NO_PREF) {
    allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) {
      // We're not allowed to search for farther-away tasks
      allowedLocality = maxLocality
    }
  }

  // Find a task that can run on this executor at the specified locality level
  dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
    // Found a task; do some bookkeeping and return a task description
    val task = tasks(index)
    // Assign a taskId
    val taskId = sched.newTaskId()
    // Update various bookkeeping
    copiesRunning(index) += 1
    // Number of task attempts
    val attemptNum = taskAttempts(index).size
    val info = new TaskInfo(taskId, index, attemptNum, curTime,
      execId, host, taskLocality, speculative)
    taskInfos(taskId) = info
    // Record the task information for each attempt
    taskAttempts(index) = info :: taskAttempts(index)
    // Update our locality level for delay scheduling:
    // record the locality index and launch time used to compute the locality wait.
    // Tasks with NO_PREF (no locality preference) do not affect these delay-scheduling variables
    if (maxLocality != TaskLocality.NO_PREF) {
      currentLocalityIndex = getLocalityIndex(taskLocality)
      lastLaunchTime = curTime
    }
    // Serialize the task
    val serializedTask: ByteBuffer = try {
      ser.serialize(task)
    } catch {
      // If the task cannot be serialized, then there's no point to re-attempt the task,
      // as it will always fail. So just abort the whole task-set.
      case NonFatal(e) =>
        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
        logError(msg, e)
        abort(s"$msg Exception during serialization: $e")
        throw new TaskNotSerializableException(e)
    }
    // If the serialized size exceeds the threshold, print a warning
    if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
      !emittedTaskSizeWarning) {
      emittedTaskSizeWarning = true
      logWarning(s"Stage ${task.stageId} contains a task of very large size " +
        s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
        s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    }
    // Update the running-task bookkeeping in the scheduling pool
    addRunningTask(taskId)

    // We used to log the time it takes to serialize the task, but task size is already
    // a good proxy to task serialization time.
    // val timeTaken = clock.getTime() - startTime
    val taskName = s"task ${info.id} in stage ${taskSet.id}"
    logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
      s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")

    // Notify the DAGScheduler that the task has started; the event is posted to the event bus
    sched.dagScheduler.taskStarted(task, info)
    // Wrap everything into a TaskDescription object and return it to the caller
    new TaskDescription(
      taskId,
      attemptNum,
      execId,
      taskName,
      index,
      addedFiles,
      addedJars,
      task.localProperties,
      serializedTask)
  }
} else {
  None
}
}

The purpose of this method is to pick, for the given executor and locality level, a task that satisfies the constraints; the chosen task is finally wrapped in a TaskDescription object and returned. A rough sketch of the delay-scheduling idea behind the allowedLocality computation follows.
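
The sketch below is toy code, not TaskSetManager's actual getAllowedLocalityLevel; the wait values are made up but play the role of the spark.locality.wait settings. The idea: stay at the current locality level until we have gone longer than that level's wait without launching a task, then fall back to the next, less strict, level.

object DelaySchedulingSketch {
  val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val waitMs = Array(3000L, 3000L, 3000L, Long.MaxValue)   // assumed per-level waits

  var currentLevelIndex = 0
  var lastLaunchTime    = 0L   // in the real code this is refreshed whenever a task is launched

  // Returns the strictest locality level we are still allowed to insist on at time `now`.
  def allowedLevel(now: Long): String = {
    while (currentLevelIndex < levels.length - 1 &&
           now - lastLaunchTime >= waitMs(currentLevelIndex)) {
      // Waited too long without a launch at this level: relax to the next level.
      lastLaunchTime += waitMs(currentLevelIndex)
      currentLevelIndex += 1
    }
    levels(currentLevelIndex)
  }

  def main(args: Array[String]): Unit = {
    println(allowedLevel(now = 1000L))    // PROCESS_LOCAL: still within the 3s wait
    println(allowedLevel(now = 4000L))    // NODE_LOCAL: the PROCESS_LOCAL wait has expired
    println(allowedLevel(now = 20000L))   // ANY: every intermediate wait has expired
  }
}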

Summary

To summarize task assignment: assigning suitable tasks to the given computing resources is accomplished mainly by TaskSchedulerImpl and TaskSetManager, and the maintenance and checking of task locality during allocation is done inside TaskSetManager.
Next, let's analyze how, once the runnable tasks have been determined, they are sent to the chosen executors.

DriverEndpoint.makeOffers

First we go back to the DriverEndpoint.makeOffers method. Its call to TaskSchedulerImpl.resourceOffers hands control over to TaskSchedulerImpl, which performs the task assignment and finally returns the assigned tasks, wrapped as TaskDescription objects, to the DriverEndpoint (DriverEndpoint is an inner class of the scheduling backend). The DriverEndpoint then calls its launchTasks method to send these tasks to the corresponding executors for execution.

DriverEndpoint.launchTasks

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          // Note that as soon as a single task's size exceeds the threshold, the whole taskSet it belongs to is aborted
          // Why? Because all tasks in a taskSet have essentially the same size and differ only in partition number,
          // so if one task exceeds the threshold the others will too;
          // there is no point in trying the remaining tasks, and aborting the taskSet directly is more efficient
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // Maintain cpu resource information
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // Send the task to the specified executor through rpc
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

This method is straightforward: it serializes (encodes) each TaskDescription and checks whether its size exceeds the threshold; if it does not, the task is sent to the specified executor through that executor's RPC endpoint reference. A minimal sketch of the size check follows.
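
The sketch below is toy code, not the scheduler backend; the 128 MB limit mirrors the documented default of spark.rpc.message.maxSize. It simply serializes a payload and decides whether to abort or send.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SizeCheckSketch {
  val maxRpcMessageSize: Int = 128 * 1024 * 1024   // bytes

  // Stand-in for TaskDescription.encode: Java-serialize an object into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {   // obj must be java.io.Serializable
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val fakeTask = "x" * 1024                    // stand-in for an encoded TaskDescription
    val payload  = serialize(fakeTask)
    if (payload.length >= maxRpcMessageSize) {
      println(s"task of ${payload.length} bytes exceeds $maxRpcMessageSize bytes, aborting the task set")
    } else {
      println(s"sending ${payload.length} bytes to the executor over RPC")
    }
  }
}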

Summary

Well, after this long call chain, our tasks finally leave the driver and head for the executors. Reviewing the journey of a task from its creation in the driver to its final dispatch, the main steps are:

  • DAGScheduler splits the job's computation chain into multiple stages according to shuffle dependencies, submits the stages, and for each stage creates multiple tasks (ShuffleMapTask or ResultTask) from the stage's information, packages them into a TaskSet and hands the TaskSet to TaskScheduler
  • TaskSchedulerImpl adds the received task set to the scheduling pool and then notifies the scheduling backend (SchedulerBackend)
  • After receiving notification of the new task submission, CoarseGrainedSchedulerBackend collects the currently available executors and hands them to TaskSchedulerImpl
  • TaskSchedulerImpl polls the available executors, taking into account the offered computing resources, the tasks' locality requirements and the blacklist, and assigns tasks in round-robin fashion. After going through the locality levels and multiple rounds of allocation, it obtains the assignment of tasks to executors, wraps it in TaskDescription objects and returns them to the SchedulerBackend
  • Once the SchedulerBackend has these assignments, it knows which task should go to which executor, and it sends the tasks over the network through the RPC interface
