Akka of Scala concurrent programming

summary

  • Akka is a library developed using Scala. It is based on event driven toolkit for building high concurrency projects
  • Akka characteristics:
  1. Provide an asynchronous, non blocking, high-performance event driven programming model
  2. The built-in fault tolerance mechanism allows the Actor to recover or reset in case of error
  3. Lightweight event processing (millions of actors per GB of heap memory. * lightweight event processing and heavyweight Division mainly depends on whether it depends on the operating system and hardware. Dependence is heavyweight and non dependence is lightweight
  4. It can build highly concurrent applications on a single machine or distributed applications in the network

Akka communication process

  1. Students create an actor system
  2. Create an actor ref message and send it to the teacher through actor Ref
  3. ActorRef sends the message to the message dispatcher
  4. The Message Dispatcher saves the messages to the MailBox of the target Actor in order
  5. Message Dispatcher puts the MailBox into a thread
  6. MailBox takes out the messages in order and finally passes them to the method received by TeacherActor

API introduction:

  • Actor system: responsible for creating and supervising actors
  1. Actor system is a singleton object through which many actors can be created
  2. Use context System can get the reference of the ActorSystem that manages the Actor
  • Implement Actor class
  1. Define class or singleton object to inherit Actor(import akka.actor.Actor)
  2. Implement the receive method to receive messages (without adding the loop & react method)
  3. You can implement the preStart() method (optional), which is executed after the Actor object is built and only once in the Actor life cycle
  • Load Actor
  1. To create Akka's Actor object, you must first create ActorSystem
  2. Call actorsystem Actorof (props (Actor object), "Actor name") to load Actor

Actor Path

  • Each Actor has a Path that can be referenced externally
typeroute
Local Actorakka://actorSystem Name / user/Actor name
Remote Actorakka.tcp://dest-actorSystem@ip address: port/user/Actor name

Introductory case

  • Load two actors (SenderActor & ReceiverActor) through the actor system, send messages from SenderActor, receive them in ReceiverActor, and then reply to messages

package com.akka.ex1

/**
 * Message format for submitting tasks
 *
 * @param msg Send message
 */
case class SubmitTaskMessage(msg: String)

/**
 * Format of receipt information after task submission is successful
 *
 * @param msg Receipt information
 */
case class SuccessSubmitTaskMessage(msg: String)


package com.akka.ex1

import akka.actor.Actor

object SenderActor extends Actor {
  override def receive: Receive = {
    // Receive the message sent by entry: start
    case "start" => {
      println("SenderActor, received: start!")
      // Gets the path of the ReceiverActor
      val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")
      // Send message to ReceiverActor
      receiverActor ! SubmitTaskMessage("Hello ReceiverActor!, This is SenderActor.")
    }
    // Receive the receipt information returned by the ReceiverActor
    case SuccessSubmitTaskMessage(msg) => println(s"SenderActor, received: ${msg}")
  }
}


package com.akka.ex1

import akka.actor.Actor

object ReceiverActor extends Actor {
  override def receive: Receive = {
    // Receive messages from SenderActor
    case SubmitTaskMessage(msg) => {
      println(s"ReceiverActor, received: ${msg}")
      // Reply to SenderActor
      sender ! SuccessSubmitTaskMessage("Hi!, This is ReceiverActor.")
    }
  }
}


package com.akka.ex1

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    /**
     * Create an Actor system to be responsible for creating and supervising actors
     *
     * @param name : scala.Predef.String Set a name for ActorSystem
     * @param config : com.typesafe.config.Config Configuration environment
     */
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())

    /**
     * Load custom Actor objects through ActorSystem
     *
     * @param props : akka.actor.Props Specifies the Actor companion objects to manage
     * @param name : scala.Predef.String Sets the name of the specified Actor object
     */
    val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")
    // You must set a name for each Actor, otherwise you cannot pass context. From inside SenderActor Obtain the object of the ReceiverActor in the Actor selection mode
    // The actor will be prompted[ akka://actorSystem/user/receiverActor ] was not delivered
    actorSystem.actorOf(Props(ReceiverActor), "receiverActor")

    // Send "start" string to SenderActor
    senderActor ! "start"
  }
}
SenderActor, received: start!
ReceiverActor, received: Hello ReceiverActor!, This is SenderActor.
SenderActor, received: Hi!, This is ReceiverActor.

Timed task case

  • Through actorsystem scheduler. Schedule() method to start the scheduled task
  • Usage 1:
	final def schedule(
		initialDelay : FiniteDuration, // Start for the first time, press the set time, and execute after delay
		interval : FiniteDuration, // How often is it executed (starting for the first time, immediately, without delay)
		receiver : ActorRef, // Set the Actor that the target receives the message
		message : Any) // Message to send
	(implicit executor : ExecutionContext, sender : ActorRef = {}) // Implicit parameter, need to import

  • Usage 2:
	final def schedule(
		initialDelay : FiniteDuration, // Start for the first time, press the set time, and execute after delay
		interval : FiniteDuration // How often is it executed (starting for the first time, immediately, without delay)
	)(f : => Unit) // Functions (messages) to be executed periodically
	(implicit executor : ExecutionContext) // Implicit parameter, need to import

package com.akka.ex2

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object MainActor {
  object ReceiverActor extends Actor {
    override def receive: Receive = {
      case x => println(x)
    }
  }

  def main(args: Array[String]): Unit = {
    // Create an Actor system to be responsible for creating and supervising actors
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    // Load custom Actor objects through ActorSystem
    val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")

    // Import implicit parameter & Transformation
    import actorSystem.dispatcher
    import scala.concurrent.duration._

    // Send messages to the ReceiverActor regularly through the timer
    // Method 1: adopt the message of Any data type parameter provided
    actorSystem.scheduler.schedule(3 seconds, 2 seconds, receiverActor, "Hello ReceiverActor!, 111.")

    // Method 2: messages using custom functions
    actorSystem.scheduler.schedule(0 seconds, 5 seconds) {
      receiverActor ! "Hello ReceiverActor!, 222."
    }
  }
}

Communication case between two processes

  1. The WorkerActor sends a "connect" message to the MasterActor
  2. MasterActor replies "success" message to WorkerActor
  3. The worker actor receives and prints the received message
package com.akka.master

import akka.actor.Actor

object MasterActor extends Actor {
  override def receive: Receive = {
    case "setup" => println("MasterActor started!")
    // Receive messages from WorkerActor
    case "connect" => {
      println("MasterActor, received: connect!")
      // Receipt information returned to the sender (WorkerActor)
      sender ! "success"
    }
  }
}

package com.akka.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")
    // Send message to MasterActor
    masterActor ! "setup"
  }
}


package com.akka.worker

import akka.actor.Actor

// Path of WorkerActor: akka tcp://actorSystem @127.0.0.1:8081/user/WorkerActor
object WorkerActor extends Actor {
  override def receive: Receive = {
    case "setup" => {
      println("WorkerActor started!")
      // Get MasterActor remotely
      val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")
      // Send the string connect to the MasterActor
      masterActor ! "connect"
    }
    // Receive messages from MasterActor
    case "success" => println("MasterActor, received: success!")
  }
}

package com.akka.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Entrance {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")
    // Send message to WorkerActor
    workerActor ! "setup"
  }
}

Implementation case of simple Spark communication framework

  • Simulate the communication between Spark Master and multiple workers

  • Operation steps:
  1. Start MasterActor
    1.1 after the masteractor object is built, start the scheduled task (for self-test, to remove the overtime WorkerActor)

  2. Start WorkerActor
    2.1 after the workeractor object is built, encapsulate its own information into registration information and send it to MasterActor

  3. MasterActor receives the registration information of WorkerActor and saves it
    3.1 receipt information to WorkerActor

  4. After the WorkerActor requests registration, it receives the information and prints Connection is successful!
    4.1 start the scheduled task and send a heartbeat message to the MasterActor

  5. The MasterActor receives the heartbeat message from the WorkerActor and updates the last heartbeat time in the registration information of the WorkerActor to the current time

project nameexplain
scala-spark-akka-commonStore public message entity classes
scala-spark-akka-masterAkka Master node
scala-spark-akka-workerAkka Worker node
package com.akka.spark.common

/**
 * A class that holds information about registered workeractors
 *
 * @param workerId : WorkerActor Id(UUID) of
 * @param cpuCores : WorkerActor Number of CPU cores
 * @param memory : WorkerActor Memory size
 * @param lastHeartBeatTime : Last heartbeat time
 */
case class WorkerInfo(workerId: String, cpuCores: Int, memory: Int, lastHeartBeatTime: Long)


package com.akka.spark.common

/**
 * WorkerActor Class submitting registration information
 *
 * @param workerId : WorkerActor Id(UUID) of
 * @param cpuCores : WorkerActor Number of CPU cores
 * @param memory : WorkerActor Memory size
 */
case class WorkerRegisterMessage(workerId: String, cpuCores: Int, memory: Int)

/** The singleton object that receives the receipt after successful registration*/
case object RegisterSuccessMessage

/**
 * WorkerActor Information class that regularly triggers heartbeat to MasterActor
 *
 * @param workerId : WorkerActor Id(UUID) of
 * @param cpuCores : WorkerActor Number of CPU cores
 * @param memory : WorkerActor Memory size
 */
case class WorkerHeartBeatMessage(workerId: String, cpuCores: Int, memory: Int)


package com.akka.spark.master

import com.typesafe.config.{Config, ConfigFactory}

// Class used to read configuration file information
object ConfigUtils {
  // 1. Get configuration file object
  private val config: Config = ConfigFactory.load()
  // 2. Get the time interval to check the heartbeat of WorkerActor
  val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")
  // 3. Get WorkerActor heartbeat timeout
  val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout")
}


package com.akka.spark.master

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Master {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(MasterActor), "masterActor")
  }
}


package com.akka.spark.master

import java.util.Date
import akka.actor.Actor
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerInfo, WorkerRegisterMessage}

object MasterActor extends Actor {
  // 1. Define a variable Map set to save the registered WorkerActor information
  private val regWorkerMap = scala.collection.mutable.Map[String, WorkerInfo]()

  // MasterActor regularly checks the heartbeat of WorkerActor and removes the workers that have timed out
  override def preStart(): Unit = {
    // 1. Import time implicit parameter & conversion
    import context.dispatcher
    import scala.concurrent.duration._

    // 2. Start the scheduled task (MasterActor self test removes the overtime WorkerActor)
    context.system.scheduler.schedule(0 seconds, ConfigUtils.`master.check.heartbeat.interval` seconds) {
      // 3. Workeractor that filters timeout (return the filtered workerId set)
      val timeOutWorkerMap = regWorkerMap.filter {
        keyVal => // Data format of keyVal: workerid - > workerinfo (workerid, cpucores, memory, lastheartbeattime)
          // 3.1 get the last heartbeat time of the current WorkerActor object
          val lastHeartBeatTime = keyVal._2.lastHeartBeatTime
          // 3.2 if timeout occurs, true; otherwise, false (current time - last heartbeat time) > maximum timeout * 1000
          if ((new Date().getTime - lastHeartBeatTime) > (ConfigUtils.`master.check.heartbeat.timeout` * 1000)) true else false
      }

      // 4. Set of timed out workerids to be removed
      if (!timeOutWorkerMap.isEmpty) {
        regWorkerMap --= timeOutWorkerMap.map(_._1) // ArrayBuffer(5b9feb50-5c33-496b-a325-dd168360281b)
      }

      // 5. Valid workeractors are arranged in descending order according to the memory size
      val workerList = regWorkerMap.map(_._2).toList.sortBy(_.memory).reverse
      println(s"Active WorkerActors: ${workerList}")
    }
  }

  override def receive: Receive = {
    // Receive the registration information of WorkerActor
    case WorkerRegisterMessage(workerId, cpuCores, memory) => {
      // Print the received registration information
      println(s"MasterActor, received info: ${workerId}, ${cpuCores}, ${memory}")

      // Save the registration information to the hash table & and record the last heartbeat time
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)

      // Receipt information for registered WorkerActor
      sender ! RegisterSuccessMessage
    }

    // Receive heartbeat message from WorkerActor
    case WorkerHeartBeatMessage(workerId, cpuCores, memory) => {
      println(s"MasterActor, received heartbeat: ${workerId}")
      // Updates the last heartbeat time of the specified WorkerActor object
      regWorkerMap += workerId -> WorkerInfo(workerId, cpuCores, memory, new Date().getTime)
    }
  }
}


package com.akka.spark.worker

import com.typesafe.config.{Config, ConfigFactory}

// Class used to read configuration file information
object ConfigUtils {
  // 1. Get configuration file object
  private val config: Config = ConfigFactory.load()
  // 2. Get the heartbeat interval of WorkerActor
  val `worker.heartbeat.interval`: Int = config.getInt("worker.heartbeat.interval")
}


package com.akka.spark.worker

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Worker {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())
    actorSystem.actorOf(Props(WorkerActor), "workerActor")
  }
}


package com.akka.spark.worker

import java.util.UUID
import akka.actor.{Actor, ActorSelection}
import com.akka.spark.common.{RegisterSuccessMessage, WorkerHeartBeatMessage, WorkerRegisterMessage}
import scala.util.Random

object WorkerActor extends Actor {
  // Represents a reference to the MasterActor
  private var masterActor: ActorSelection = _
  // Registration information of WorkerActor
  private var workerId: String = _
  private var cpuCores: Int = _ // Number of CPU cores
  private var memory: Int = _ // Memory size
  private val cpuCoreList = List(1, 2, 3, 4, 6, 8) // Random range of CPU cores
  private val memoryList = List(512, 1024, 2048, 4096) // Random range of memory size

  override def preStart(): Unit = {
    // Gets a reference to the MasterActor
    masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8080/user/masterActor")

    // Random setting number
    workerId = UUID.randomUUID().toString

    // Randomly select the number of CPU cores and memory size of WorkerActor
    val r = new Random()
    cpuCores = cpuCoreList(r.nextInt(cpuCoreList.length))
    memory = memoryList(r.nextInt(memoryList.length))

    // Encapsulate the registration information of WorkerActor
    val registerMessage = WorkerRegisterMessage(workerId, cpuCores, memory)
    // Send to MasterActor
    masterActor ! registerMessage
  }

  override def receive: Receive = {
    // Receipt information of successful registration
    case RegisterSuccessMessage => {
      println("Connection is successful!")

      // 1. Import time implicit parameter & conversion
      import context.dispatcher
      import scala.concurrent.duration._

      // Send heartbeat message to MasterActor regularly
      context.system.scheduler.schedule(0 seconds, ConfigUtils.`worker.heartbeat.interval` seconds) {
        masterActor ! WorkerHeartBeatMessage(workerId, cpuCores, memory)
      }
    }
  }
}

If you feel helpful, welcome to like it ~ thank you!!

Keywords: Scala Back-end

Added by reeferd on Sat, 12 Feb 2022 12:37:20 +0200