Master registration mechanism
Application registration
The previous article analyzed the initialization of SparkContext, which ends by sending a RegisterApplication message to the Master.
Now let's see how the Master responds when it receives this message.
First, note that the Master class extends ThreadSafeRpcEndpoint, so the RPC environment delivers messages to it one at a time.
Let's look at Master's receive() method.
Here we only look at the code related to application registration.
Spark allows several Masters to run at the same time for high availability, but only one of them is active (state ALIVE) and the others are STANDBY. A STANDBY Master ignores application registration requests, so only the active Master responds to them.
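As a rough illustration of this gating, the sketch below models the Master's recovery states with a hypothetical enumeration, RecoveryStateModel, whose value names follow Spark's RecoveryState (STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY). It is not Spark's code; it only encodes the two checks that appear in this article: RegisterApplication is ignored only when the state is STANDBY, while RequestSubmitDriver is accepted only when the state is ALIVE.

```scala
// Hypothetical model of the Master's recovery states (names mirror Spark's
// RecoveryState values, but this is not Spark's own code).
object RecoveryStateModel extends Enumeration {
  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}

import RecoveryStateModel._

// Application branch: the request is ignored only when the master is STANDBY.
def acceptsAppRegistration(state: RecoveryStateModel.Value): Boolean =
  state != STANDBY

// Driver-submission branch: the request is rejected unless the master is ALIVE.
def acceptsDriverSubmission(state: RecoveryStateModel.Value): Boolean =
  state == ALIVE
```

Note the asymmetry: a Master that is RECOVERING passes the first check, because the application branch only tests for STANDBY, but it fails the second.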
Its main job is to add the application information to the Master's in-memory caches and send a RegisteredApplication message to the application's driver. Finally, it calls schedule(); what schedule() does will be covered in a later post.
```scala
override def receive: PartialFunction[Any, Unit] = {
  // Process registration messages from applications
  case RegisterApplication(description, driver) =>
    // TODO Prevent repeated registrations from some driver
    // If the current master is STANDBY rather than active, it ignores the request
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      logInfo("Registering app " + description.name)
      // Create an ApplicationInfo object from the registration information.
      // The application ID is generated here, in the format
      //   val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
      // where nextAppNumber is a counter that starts at 0 and is incremented per app
      val app = createApplication(description, driver)
      registerApplication(app)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      // Persist the application through the persistence engine (used for master recovery)
      persistenceEngine.addApplication(app)
      // Send the response to the driver
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
}
```
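The application ID pattern quoted in the comment above can be tried in isolation. The sketch below uses a hypothetical AppIdGenerator class, not Spark's code; it assumes a yyyyMMddHHmmss timestamp format and pins the time zone to UTC so that the output is reproducible.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

// Hypothetical helper reproducing the pattern "app-%s-%04d".format(timestamp, nextAppNumber).
// ASSUMPTION: the timestamp is formatted as yyyyMMddHHmmss.
class AppIdGenerator(timeZone: TimeZone) {
  private val createDateFormat = {
    val f = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    f.setTimeZone(timeZone)
    f
  }
  // Counter starting at 0, incremented after each ID is handed out
  private var nextAppNumber = 0

  def newAppId(submitDate: Date): String = {
    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }
}

val gen = new AppIdGenerator(TimeZone.getTimeZone("UTC"))
val submitDate = new Date(0L) // 1970-01-01 00:00:00 UTC
println(gen.newAppId(submitDate)) // app-19700101000000-0000
println(gen.newAppId(submitDate)) // app-19700101000000-0001
```

The zero-padded counter keeps IDs from the same second distinct and lets them sort in submission order.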
```scala
/**
 * Process registration from an application:
 * add the ApplicationInfo to the in-memory caches
 * and add the application to the waiting queue.
 * @param app
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  // Get the address of the application's driver
  val appAddress = app.driver.address
  // If an application from this driver address is already registered,
  // treat this as a duplicate registration and return without changes
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Join the waiting queue
  waitingApps += app
}
```
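The bookkeeping in registerApplication() can be modelled with plain mutable maps. In this sketch, AppInfoSketch and AppRegistrySketch are hypothetical stand-ins that keep only an ID and a driver address. Unlike the real method, which returns Unit, register() here returns a Boolean so the duplicate case is visible to the caller.

```scala
import scala.collection.mutable

// Hypothetical, stripped-down stand-in for ApplicationInfo
final case class AppInfoSketch(id: String, driverAddress: String)

// Hypothetical stand-in for the Master's application lookup tables
class AppRegistrySketch {
  val apps = mutable.HashSet[AppInfoSketch]()
  val idToApp = mutable.HashMap[String, AppInfoSketch]()
  val addressToApp = mutable.HashMap[String, AppInfoSketch]()
  val waitingApps = mutable.ArrayBuffer[AppInfoSketch]()

  // Returns false and changes nothing if an app from the same driver address
  // is already registered, mirroring the early return in registerApplication()
  def register(app: AppInfoSketch): Boolean =
    if (addressToApp.contains(app.driverAddress)) false
    else {
      apps += app
      idToApp(app.id) = app
      addressToApp(app.driverAddress) = app
      waitingApps += app // stays queued until schedule() assigns resources
      true
    }
}

val reg = new AppRegistrySketch
reg.register(AppInfoSketch("app-1", "host-a:7078")) // accepted
reg.register(AppInfoSketch("app-2", "host-a:7078")) // rejected: same driver address
```

Keying the duplicate check on the driver address rather than the app ID makes sense because the ID is freshly generated on every registration, so a retrying driver would never collide by ID.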
Now let's see what the driver does after receiving the response. Since we are analyzing standalone mode, the response is received by StandaloneAppClient, which shows that the AppClient is the component the application uses to communicate with the cluster.
```scala
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // Record the application ID and mark the registration as successful
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
}
```
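A minimal model of this driver-side state might look like the following, assuming a String in place of the master's RpcEndpointRef and a callback in place of the listener. RegisteredApplicationMsg and AppClientStateSketch are hypothetical names. The original fields are updated through set() calls, which suggests atomic holders shared across threads; the sketch keeps that choice with AtomicReference and AtomicBoolean.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical message and client; not Spark's classes
final case class RegisteredApplicationMsg(appId: String, masterRef: String)

class AppClientStateSketch(onConnected: String => Unit) {
  val appId = new AtomicReference[String]()
  val registered = new AtomicBoolean(false)
  @volatile var master: Option[String] = None

  def receive: PartialFunction[Any, Unit] = {
    case RegisteredApplicationMsg(id, masterRef) =>
      appId.set(id)            // remember the ID assigned by the Master
      registered.set(true)     // mark registration as successful
      master = Some(masterRef) // talk to this (active) master from now on
      onConnected(appId.get)   // notify the listener
  }
}
```

Usage: construct the client with a callback, then feed it the message, for example client.receive(RegisteredApplicationMsg("app-19700101000000-0000", "master-1")).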
Driver registration
When an application is submitted with spark-submit in cluster deploy mode, the Driver is registered first: the submitting client sends a RequestSubmitDriver message to the Master. Let's see how the Master handles it.
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RequestSubmitDriver(description) =>
    // Only an ALIVE Master accepts driver submissions; otherwise reply with a failure
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      // Update the in-memory caches and the persistence engine below
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      // Add the driver to the waiting queue of drivers
      waitingDrivers += driver
      drivers.add(driver)
      // Trigger scheduling
      schedule()
      // Send the response back to the submitting client
      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }
}
```
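The decision logic of the RequestSubmitDriver branch can be sketched without any RPC machinery. MasterState, DriverSketch, SubmitDriverResponseSketch and DriverSubmissionSketch are hypothetical names. The driver IDs here come from a plain counter, whereas the real Master also embeds a timestamp, and scheduling is reduced to appending to a waiting queue.

```scala
import scala.collection.mutable

// Hypothetical master states; only the ALIVE / not-ALIVE distinction matters here
sealed trait MasterState
case object Alive extends MasterState
case object Standby extends MasterState

final case class DriverSketch(id: String, mainClass: String)
final case class SubmitDriverResponseSketch(success: Boolean, driverId: Option[String], message: String)

class DriverSubmissionSketch(var state: MasterState) {
  val waitingDrivers = mutable.ArrayBuffer[DriverSketch]()
  private var nextDriverNumber = 0

  def submit(mainClass: String): SubmitDriverResponseSketch =
    if (state != Alive) {
      // Refuse and report the current state, as the real branch does
      SubmitDriverResponseSketch(false, None,
        s"Current state: $state. Can only accept driver submissions in ALIVE state.")
    } else {
      // ASSUMPTION: counter-only ID for illustration
      val driver = DriverSketch("driver-%04d".format(nextDriverNumber), mainClass)
      nextDriverNumber += 1
      waitingDrivers += driver // schedule() would later launch it on a Worker
      SubmitDriverResponseSketch(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
    }
}
```

Because this path uses receiveAndReply, the client always gets an answer, even when the Master is not ALIVE; compare the application path above, where a STANDBY Master stays silent.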
Worker registration
When a Worker starts, it likewise sends a RegisterWorker message to the Master.
```scala
// In Master.receive()
case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
  // The registration message includes the Worker's number of cores and amount of memory
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  // If the current Master is STANDBY, tell the Worker so
  if (state == RecoveryState.STANDBY) {
    workerRef.send(MasterInStandby)
  } else if (idToWorker.contains(id)) {
    // A Worker with this ID is already registered
    workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      workerRef, workerWebUiUrl)
    // Add the Worker's information to the in-memory caches
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
      // Trigger scheduling
      schedule()
    } else {
      val workerAddress = worker.endpoint.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }
```
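The three possible outcomes of Worker registration (Master in standby, duplicate ID, duplicate address) can be summarized in a small model. WorkerReply, WorkerSketch and WorkerRegistrySketch are hypothetical names, and the sketch assumes that any existing entry at the same address means rejection; the real registerWorker() has extra cases, such as replacing a stale Worker from the same address after recovery, that are left out here.

```scala
import scala.collection.mutable

// Hypothetical replies mirroring MasterInStandby, RegisterWorkerFailed and RegisteredWorker
sealed trait WorkerReply
case object MasterInStandbyReply extends WorkerReply
final case class RegisterWorkerFailedReply(reason: String) extends WorkerReply
case object RegisteredWorkerReply extends WorkerReply

final case class WorkerSketch(id: String, address: String, cores: Int, memoryMb: Int)

class WorkerRegistrySketch(var standby: Boolean) {
  val idToWorker = mutable.HashMap[String, WorkerSketch]()
  val addressToWorker = mutable.HashMap[String, WorkerSketch]()

  def register(w: WorkerSketch): WorkerReply =
    if (standby) MasterInStandbyReply
    else if (idToWorker.contains(w.id)) RegisterWorkerFailedReply("Duplicate worker ID")
    else if (addressToWorker.contains(w.address))
      RegisterWorkerFailedReply("Attempted to re-register worker at same address: " + w.address)
    else {
      idToWorker(w.id) = w
      addressToWorker(w.address) = w
      RegisteredWorkerReply // the real Master would also persist the Worker and call schedule()
    }
}
```

Checking the ID before the address matches the order of the branches in the Master code above.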