Spark source code analysis: Master registration mechanism

Master registration mechanism

Application registration

The previous article analyzed the initialization of the SparkContext, which ends by sending a RegisterApplication message to the Master.
Now let's see how the Master responds after receiving this message.
First, note that the Master class extends ThreadSafeRpcEndpoint.
Take a look at Master's receive(); here we only look at the code related to application registration.
Spark allows multiple Masters to exist, but only one of them is ACTIVE while the others are STANDBY, and only the ACTIVE Master responds to application registration requests.
The main work is to add the application information to the Master's cache structures and to send a RegisteredApplication message back to the application's driver. Finally, schedule() is called; what schedule() does will be covered in a following post.

override def receive: PartialFunction[Any, Unit] = {
  // Process a registration request from an application
  case RegisterApplication(description, driver) =>
    // TODO Prevent repeated registrations from some driver
    // If the current Master is STANDBY rather than ACTIVE, do not respond
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      // Create the ApplicationInfo object from the registration information.
      // The application id is generated here, in the format
      //   val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
      // where nextAppNumber starts at 0 and is incremented for each application
      val app = createApplication(description, driver)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      // Persist the ApplicationInfo so it can be recovered on failover
      persistenceEngine.addApplication(app)
      // Send the response back to the driver
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
}
/**
 * Process a registration request from an application:
 *   - add the ApplicationInfo to the in-memory caches
 *   - add the application to the waiting queue
 * @param app the application to register
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the address of the application's driver
  val appAddress = app.driver.address
  // If an app is already registered at this driver address, treat it as
  // a duplicate registration and return without registering again
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  applicationMetricsSystem.registerSource(app.appSource)

  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Join the waiting queue
  waitingApps += app
}
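The application id format quoted in the comments above is easy to reproduce. The sketch below is a standalone model, not Spark's actual Master: the "yyyyMMddHHmmss" date pattern and the AppIdSketch name are assumptions for illustration; only the "app-%s-%04d" format string and the counter starting at 0 come from the code above.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// Minimal sketch of the app id scheme described in the comments above.
// nextAppNumber is a simple counter starting at 0, as in the Master.
object AppIdSketch {
  private val createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  private var nextAppNumber = 0

  def newAppId(submitDate: Date): String = {
    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }
}
```

So the first application registered gets an id ending in -0000, the second one -0001, and so on.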

Now let's see what the driver does after receiving the response. Since we are analyzing standalone mode, it is the StandaloneAppClient that receives the response here; from this we can see that the AppClient is what communicates with the cluster.

override def receive: PartialFunction[Any, Unit] = {

  case RegisteredApplication(appId_, masterRef) =>
    // Record the application id and mark the registration as successful
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
}
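Note that appId and registered are written via set(), i.e. they are atomic holders: the RPC thread flips them while other client threads poll them. A minimal sketch of this pattern, where the class name and onRegistered helper are made up for illustration and only the two fields mirror StandaloneAppClient:

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Sketch of the client-side bookkeeping: the RPC handler thread sets
// these atomics, and other threads can safely read them at any time.
class RegistrationState {
  val appId = new AtomicReference[String](null)
  val registered = new AtomicBoolean(false)

  // Called when a RegisteredApplication response arrives
  def onRegistered(id: String): Unit = {
    appId.set(id)
    registered.set(true)
  }
}
```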

Driver registration

When you submit a task with spark-submit, the Driver is registered first: a RequestSubmitDriver message is sent to the Master. Let's see how the Master handles it.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    // The Master can only process driver registrations while in the ALIVE state
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      // Update the in-memory caches below
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      // Add the driver to the waiting queue
      waitingDrivers += driver
      drivers.add(driver)
      // Trigger scheduling
      schedule()

      // Send the response back to the submitter
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
}
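The ALIVE-state gate above can be modeled in a few lines. This is a standalone sketch, not Spark's classes: DriverGateSketch and its submitDriver helper are invented names, and the (success, message) tuple stands in for the fields of SubmitDriverResponse.

```scala
// Minimal model of the ALIVE-state gate: a submission only succeeds
// when the Master is ALIVE, mirroring the branch in receiveAndReply.
object DriverGateSketch {
  sealed trait RecoveryState
  case object ALIVE extends RecoveryState
  case object STANDBY extends RecoveryState

  // Returns (success, message), like the SubmitDriverResponse fields
  def submitDriver(state: RecoveryState, driverId: String): (Boolean, String) =
    if (state != ALIVE)
      (false, s"Current state is not alive: $state. Can only accept driver submissions in ALIVE state.")
    else
      (true, s"Driver successfully submitted as $driverId")
}
```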

Worker registration

When a Worker starts, it also sends a registration message to the Master.

case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  // Note that the worker's registration includes its core count and memory size
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  // If the current Master is STANDBY, notify the worker
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // Add the worker information to the caches
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // Trigger scheduling
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }


Added by Muddy_Funster on Mon, 02 Dec 2019 11:41:42 +0200