summary
- Akka is a library developed using Scala. It is based on event driven toolkit for building high concurrency projects
- Akka characteristics:
- Provide an asynchronous, non blocking, high-performance event driven programming model
- The built-in fault tolerance mechanism allows the Actor to recover or reset in case of error
- Lightweight event processing (millions of actors per GB of heap memory. * lightweight event processing and heavyweight Division mainly depends on whether it depends on the operating system and hardware. Dependence is heavyweight and non dependence is lightweight
- It can build highly concurrent applications on a single machine or distributed applications in the network
Akka communication process
- Students create an actor system
- Create an actor ref message and send it to the teacher through actor Ref
- ActorRef sends the message to the message dispatcher
- The Message Dispatcher saves the messages to the MailBox of the target Actor in order
- Message Dispatcher puts the MailBox into a thread
- MailBox takes out the messages in order and finally passes them to the method received by TeacherActor
API introduction:
- Actor system: responsible for creating and supervising actors
- Actor system is a singleton object through which many actors can be created
- Use context System can get the reference of the ActorSystem that manages the Actor
- Implement Actor class
- Define class or singleton object to inherit Actor(import akka.actor.Actor)
- Implement the receive method to receive messages (without adding the loop & react method)
- You can implement the preStart() method (optional), which is executed after the Actor object is built and only once in the Actor life cycle
- Load Actor
- To create Akka's Actor object, you must first create ActorSystem
- Call actorsystem Actorof (props (Actor object), "Actor name") to load Actor
Actor Path
- Each Actor has a Path that can be referenced externally
type | route |
---|---|
Local Actor | akka://actorSystem Name / user/Actor name |
Remote Actor | akka.tcp://dest-actorSystem@ip address: port/user/Actor name |
Introductory case
- Load two actors (SenderActor & ReceiverActor) through the actor system, send messages from SenderActor, receive them in ReceiverActor, and then reply to messages
package com.akka.ex1 /** * Message format for submitting tasks * * @param msg Send message */ case class SubmitTaskMessage(msg: String) /** * Format of receipt information after task submission is successful * * @param msg Receipt information */ case class SuccessSubmitTaskMessage(msg: String) package com.akka.ex1 import akka.actor.Actor object SenderActor extends Actor { override def receive: Receive = { // Receive the message sent by entry: start case "start" => { println("SenderActor, received: start!") // Gets the path of the ReceiverActor val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor") // Send message to ReceiverActor receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.") } // Receive the receipt information returned by the ReceiverActor case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}") } } package com.akka.ex1 import akka.actor.Actor object ReceiverActor extends Actor { override def receive: Receive = { // Receive messages from SenderActor case SubmitTaskMessage(msg) => { println(s"ReceiverActor, received: ${msg}") // Reply to SenderActor sender ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.") } } } package com.akka.ex1 import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory object Entrance { def main(args: Array[String]): Unit = { /** * Create an Actor system to be responsible for creating and supervising actors * * @param name : scala.Predef.String Set a name for ActorSystem * @param config : com.typesafe.config.Config Configuration environment */ val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) /** * Load custom Actor objects through ActorSystem * * @param props : akka.actor.Props Specifies the Actor companion objects to manage * @param name : scala.Predef.String Sets the name of the specified Actor object */ val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor") // You must set a name for each Actor, otherwise you cannot pass context. From inside SenderActor Obtain the object of the ReceiverActor in the Actor selection mode // The actor will be prompted[ akka://actorSystem/user/receiverActor ] was not delivered actorSystem.actorOf(Props(ReceiverActor), "receiverActor") // Send "start" string to SenderActor senderActor ! "start" } } SenderActor, received: start! ReceiverActor, received: Hello ReceiverActor!, This is SenderActor. SenderActor, received: Hi!, This is ReceiverActor.
Timed task case
- Through actorsystem scheduler. Schedule() method to start the scheduled task
- Usage 1:
final def schedule( initialDelay : FiniteDuration, // Start for the first time, press the set time, and execute after delay interval : FiniteDuration, // How often is it executed (starting for the first time, immediately, without delay) receiver : ActorRef, // Set the Actor that the target receives the message message : Any) // Message to send (implicit executor : ExecutionContext, sender : ActorRef = {}) // Implicit parameter, need to import
- Usage 2:
final def schedule( initialDelay : FiniteDuration, // Start for the first time, press the set time, and execute after delay interval : FiniteDuration // How often is it executed (starting for the first time, immediately, without delay) )(f : => Unit) // Functions (messages) to be executed periodically (implicit executor : ExecutionContext) // Implicit parameter, need to import
package com.akka.ex2 import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory object MainActor { object ReceiverActor extends Actor { override def receive: Receive = { case x => println(x) } } def main(args: Array[String]): Unit = { // Create an Actor system to be responsible for creating and supervising actors val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) // Load custom Actor objects through ActorSystem val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor") // Import implicit parameter & Transformation import actorSystem.dispatcher import scala.concurrent.duration._ // Send messages to the ReceiverActor regularly through the timer // Method 1: adopt the message of Any data type parameter provided actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 111.") // Method 2: messages using custom functions actorSystem.scheduler.schedule(0 seconds, 5 seconds) { receiverActor ! "Hello ReceiverActor!, 222." } } }
Communication case between two processes
- The WorkerActor sends a "connect" message to the MasterActor
- MasterActor replies "success" message to WorkerActor
- The worker actor receives and prints the received message
package com.akka.master import akka.actor.Actor object MasterActor extends Actor { override def receive: Receive = { case "setup" => println("MasterActor started!") // Receive messages from WorkerActor case "connect" => { println("MasterActor, received: connect!") // Receipt information returned to the sender (WorkerActor) sender ! "success" } } } package com.akka.master import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory object Entrance { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor") // Send message to MasterActor masterActor ! "setup" } } package com.akka.worker import akka.actor.Actor // Path of WorkerActor: akka tcp://actorSystem @127.0.0.1:8081/user/WorkerActor object WorkerActor extends Actor { override def receive: Receive = { case "setup" => { println("WorkerActor started!") // Get MasterActor remotely val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor") // Send the string connect to the MasterActor masterActor ! "connect" } // Receive messages from MasterActor case "success" => println("MasterActor, received: success!") } } package com.akka.worker import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory object Entrance { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor") // Send message to WorkerActor workerActor ! "setup" } }
Implementation case of simple Spark communication framework
- Simulate the communication between Spark Master and multiple workers
- Operation steps:
-
Start MasterActor
1.1 after the masteractor object is built, start the scheduled task (for self-test, to remove the overtime WorkerActor) -
Start WorkerActor
2.1 after the workeractor object is built, encapsulate its own information into registration information and send it to MasterActor -
MasterActor receives the registration information of WorkerActor and saves it
3.1 receipt information to WorkerActor -
After the WorkerActor requests registration, it receives the information and prints Connection is successful!
4.1 start the scheduled task and send a heartbeat message to the MasterActor -
The MasterActor receives the heartbeat message from the WorkerActor and updates the last heartbeat time in the registration information of the WorkerActor to the current time
project name | explain |
---|---|
scala-spark-akka-common | Store public message entity classes |
scala-spark-akka-master | Akka Master node |
scala-spark-akka-worker | Akka Worker node |
package com.akka.spark.common /** * A class that holds information about registered workeractors * * @param workerId : WorkerActor Id(UUID) of * @param cpuCores : WorkerActor Number of CPU cores * @param memory : WorkerActor Memory size * @param lastHeartBeatTime : Last heartbeat time */ case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long) package com.akka.spark.common /** * WorkerActor Class submitting registration information * * @param workerId : WorkerActor Id(UUID) of * @param cpuCores : WorkerActor Number of CPU cores * @param memory : WorkerActor Memory size */ case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int) /** The singleton object that receives the receipt after successful registration*/ case object RegisterSuccessMessage /** * WorkerActor Information class that regularly triggers heartbeat to MasterActor * * @param workerId : WorkerActor Id(UUID) of * @param cpuCores : WorkerActor Number of CPU cores * @param memory : WorkerActor Memory size */ case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int) package com.akka.spark.master import com.typesafe.config.{Config, ConfigFactory} // Class used to read configuration file information object ConfigUtils { // 1. Get configuration file object private val config: Config = ConfigFactory.load() // 2. Get the time interval to check the heartbeat of WorkerActor val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval") // 3. Get WorkerActor heartbeat timeout val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout") } package com.akka.spark.master import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory object Master { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) actorSystem.actorOf(Props(MasterActor), "masterActor") } } package com.akka.spark.master import java.util.Date import akka.actor.Actor import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage} object MasterActor extends Actor { // 1. Define a variable Map set to save the registered WorkerActor information private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]() // MasterActor regularly checks the heartbeat of WorkerActor and removes the workers that have timed out override def preStart(): Unit = { // 1. Import time implicit parameter & conversion import context.dispatcher import scala.concurrent.duration._ // 2. Start the scheduled task (MasterActor self test removes the overtime WorkerActor) context.system.scheduler.schedule(0 seconds, ConfigUtils.`master.check.heartbeat.interval` seconds) { // 3. Workeractor that filters timeout (return the filtered workerId set) val timeOutWorkerMap = regWorkerMap.filter { keyVal => // Data format of keyVal: workerid - > workerinfo (workerid, cpucores, memory, lastheartbeattime) // 3.1 get the last heartbeat time of the current WorkerActor object val lastHeartBeatTime = keyVal._2.lastHeartBeatTime // 3.2 if timeout occurs, true; otherwise, false (current time - last heartbeat time) > maximum timeout * 1000 if ((new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)) true else false } // 4. Set of timed out workerids to be removed if (!timeOutWorkerMap.isEmpty) { regWorkerMap --= timeOutWorkerMap.map(_._1) // ArrayBuffer(5b9feb50-5c33-496b-a325-dd168360281b) } // 5. Valid workeractors are arranged in descending order according to the memory size val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverse println(s"Active WorkerActors: ${workerList}") } } override def receive: Receive = { // Receive the registration information of WorkerActor case WorkerRegisterMessage(workerId, cpuCores, memory) => { // Print the received registration information println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}") // Save the registration information to the hash table & and record the last heartbeat time regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime) // Receipt information for registered WorkerActor sender ! RegisterSuccessMessage } // Receive heartbeat message from WorkerActor case WorkerHeartBeatMessage(workerId, cpuCores, memory) => { println(s"MasterActor, received heartbeat: ${workerId}") // Updates the last heartbeat time of the specified WorkerActor object regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime) } } } package com.akka.spark.worker import com.typesafe.config.{Config, ConfigFactory} // Class used to read configuration file information object ConfigUtils { // 1. Get configuration file object private val config: Config = ConfigFactory.load() // 2. Get the heartbeat interval of WorkerActor val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval") } package com.akka.spark.worker import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigFactory object Worker { def main(args: Array[String]): Unit = { val actorSystem = ActorSystem("actorSystem", ConfigFactory.load()) actorSystem.actorOf(Props(WorkerActor), "workerActor") } } package com.akka.spark.worker import java.util.UUID import akka.actor.{Actor, ActorSelection} import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage} import scala.util.Random object WorkerActor extends Actor { // Represents a reference to the MasterActor private var masterActor: ActorSelection = _ // Registration information of WorkerActor private var workerId: String = _ private var cpuCores: Int = _ // Number of CPU cores private var memory: Int = _ // Memory size private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // Random range of CPU cores private val memoryList = List(512, 1024, 2048, 4096) // Random range of memory size override def preStart(): Unit = { // Gets a reference to the MasterActor masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor") // Random setting number workerId = UUID.randomUUID().toString // Randomly select the number of CPU cores and memory size of WorkerActor val r = new Random() cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length)) memory = memoryList(r.nextInt(memoryList.length)) // Encapsulate the registration information of WorkerActor val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory) // Send to MasterActor masterActor ! registerMessage } override def receive: Receive = { // Receipt information of successful registration case RegisterSuccessMessage => { println("Connection is successful!") // 1. Import time implicit parameter & conversion import context.dispatcher import scala.concurrent.duration._ // Send heartbeat message to MasterActor regularly context.system.scheduler.schedule(0 seconds, ConfigUtils.`worker.heartbeat.interval` seconds) { masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory) } } } }
If you feel helpful, welcome to like it ~ thank you!!