Akka series (7): Actor persistence with Akka Persistence

I am covering this topic now because the project I have been working on recently ran into some difficulties in this area, so I would like to share my experience. When using Akka, we often need to store the internal state of an Actor. While the system runs normally there is nothing to worry about. But when something goes wrong (an Actor fails and has to be restarted, memory runs out, or the whole system crashes) and we have no plan in place, the Actor's state is lost when the system restarts. Key data disappears and the system's data becomes inconsistent. As a mature framework for production applications, Akka provides a solution for this: Akka Persistence.

Why do we need persistent Actors?

Data consistency is an eternal theme: a system with good performance that cannot guarantee the correctness of its data cannot be called a good system. Every running system will fail at some point. Ensuring that it recovers its data correctly afterwards, without leaving the data in a confused state, is a hard problem. When using the Actor model, our instinct is to avoid touching the database whenever we can (here we assume the database itself is safe and reliable and keeps data correct and consistent, for example a managed cloud database). On the one hand, a large volume of data operations puts tremendous pressure on the database. On the other hand, even if the database can cope, operations such as counters or updates on large tables take far more time than operating directly in memory, which greatly hurts performance. Some may then ask: if memory is so fast, why not keep all the data in memory? The answer is obvious: when a machine crashes or runs out of memory, the data is likely to be lost and cannot be recovered. So is there a better solution that meets these needs while keeping the performance cost low? The answer is the Akka Persistence mentioned above.

Core Architecture of Akka persistence

Before diving into Akka Persistence, let's look at its core design idea. We can restore an Actor's state from something we saved earlier: logs, database rows, or files. So the essence of Akka Persistence is easy to understand. While the Actor processes messages, it saves some data. When the Actor is restored, it rebuilds its own state from that data.
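To make the idea concrete, here is a minimal, library-free Scala sketch. It does not use the Akka API, and the names CounterEvent, Added, Removed and replay are hypothetical. Instead of storing the state itself, we store each change as an event and rebuild the state by replaying the events in order.

```scala
// Library-free model of event sourcing, the idea behind Akka Persistence.
// All names here are hypothetical and not part of Akka.
object EventReplaySketch {
  sealed trait CounterEvent
  final case class Added(n: Int) extends CounterEvent
  final case class Removed(n: Int) extends CounterEvent

  // Apply one saved event to the current state.
  def applyEvent(state: Int, event: CounterEvent): Int = event match {
    case Added(n)   => state + n
    case Removed(n) => state - n
  }

  // Rebuild the state from scratch by folding over the events in the order they were saved.
  def replay(initial: Int, events: Seq[CounterEvent]): Int =
    events.foldLeft(initial)(applyEvent)

  def main(args: Array[String]): Unit = {
    val journal = Vector(Added(10), Removed(3), Added(5))
    println(replay(0, journal)) // prints 12
  }
}
```

The state (12) is never written anywhere. It is derived from the events, so as long as the events survive a crash, the state can always be rebuilt.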

So Akka Persistence has the following key components:

  • PersistentActor: any Actor that needs persistence must extend PersistentActor and define three key members:

 def persistenceId = "example" //Unique identifier of this persistent Actor, used when persisting and querying its data

 def receiveCommand: Receive = ??? //Message handling during normal operation; this is where the Actor persists the events it needs

 def receiveRecover: Receive = ??? //Logic executed during recovery (e.g. after a restart), replaying persisted events and snapshots

receiveCommand plays the same role as receive in an ordinary Actor; the other two members are what a persistent Actor must implement in addition.
Two other key concepts of a persistent Actor are the Journal and the Snapshot. The Journal stores persisted events and the Snapshot store keeps snapshots of the Actor's state; both play a crucial role in restoring the Actor's state.
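The pure Scala sketch below is a hypothetical model of how a Journal and a Snapshot work together during recovery, not Akka's implementation: start from the latest snapshot, then replay only the events whose sequence number is greater than the snapshot's. The names Event, Snapshot and recover are assumptions for illustration.

```scala
// Hypothetical in-memory model of Journal + Snapshot recovery (not the Akka API).
object SnapshotRecoverySketch {
  final case class Event(seqNr: Long, amount: Int)   // one journal entry
  final case class Snapshot(seqNr: Long, state: Int) // state as of event seqNr

  // Start from the snapshot if one exists, otherwise from the initial state,
  // then replay only the journal entries written after that point.
  def recover(initial: Int, snapshot: Option[Snapshot], journal: Seq[Event]): Int = {
    val (fromSeqNr, start) = snapshot match {
      case Some(s) => (s.seqNr, s.state)
      case None    => (0L, initial)
    }
    journal
      .filter(_.seqNr > fromSeqNr)
      .foldLeft(start)((state, e) => state + e.amount)
  }
}
```

With a snapshot taken after event 5 of 7, only two events are replayed; without it all seven are. Both paths give the same state, which is why snapshots only speed recovery up and never change its result.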

A demo of Akka persistence

I will first use a demo to give you a basic understanding of how to use Akka Persistence and roughly how it works, and then discuss some practical problems you may run into.

Suppose we have the following scenario: a big red envelope (a cash gift) of 10,000 yuan that many people may try to grab at the same instant, with each person possibly getting a different amount. The scenario is simple and there are many ways to implement it, but the data must stay correct. The most common way to guarantee that is the database. Anyone familiar with it knows this is not a great solution, though: it needs locks and a large number of database operations, so performance is poor. Can we use an Actor to meet this requirement instead? Of course we can.

Let's first define a lottery command:

case class LotteryCmd(
  userId: Long, // Participating User Id
  username: String, //Participating User Name
  email: String // Participating User Mailbox
)

Then we implement a lottery Actor that extends PersistentActor:

import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted, SaveSnapshotSuccess, SnapshotOffer}

case class LuckyEvent(  //Event: the user won part of the red envelope
    userId: Long,
    luckyMoney: Int
)
case class FailureEvent(  //Event: the draw failed
    userId: Long,
    reason: String
)
case class Lottery(
    totalAmount: Int,  //Total amount of the red envelope
    remainAmount: Int  //Remaining amount of the red envelope
) {
  def update(luckyMoney: Int) = {
    copy(
      remainAmount = remainAmount - luckyMoney
    )
  }
}
class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging{
  override def persistenceId: String = "lottery-actor-1"

  var state = initState  //Initialize the Actor's state

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  //During recovery, rebuild the state from persisted events
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot //Restore the state from a snapshot
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  //Update the state

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      doLottery(lc) match {     //Draw, then handle the result depending on its type
        case le: LuckyEvent =>  //Won a random amount: persist the event before replying
          persist(le) { event =>
            updateState(event)
            increaseEvtCountAndSnapshot()
            sender() ! event
          }
        case fe: FailureEvent =>  //The red envelope is used up; nothing to persist
          sender() ! fe
      }
    case "saveSnapshot" =>  //Snapshot command received: store a snapshot of the current state
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>  //Snapshot stored; older snapshots could be deleted here (the original `???` would throw at runtime)
      log.info(s"snapshot saved: ${metadata}")
  }

  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  //Every five persisted events, store a snapshot of the current state
      self ! "saveSnapshot"
    }
  }

  def doLottery(lc: LotteryCmd) = {  //The drawing logic
    if (state.remainAmount > 0) {
      val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1  //Between 1 and remainAmount, inclusive
      LuckyEvent(lc.userId, luckyMoney)
    }
    else {
      FailureEvent(lc.userId, "Come earlier next time, the red envelope has been used up!")
    }
  }
}

The program is quite simple, and the key points are commented. If you know something about Actors it should be easy to follow; if anything is unclear, see my earlier articles in this series. Now let's test the red envelope Actor we just wrote:

import java.util.concurrent.{ExecutorService, Executors}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global  //Needed to handle the Future returned by ask
import scala.concurrent.duration._
import scala.util.Success

object PersistenceTest extends App {
  val lottery = Lottery(10000, 10000)
  val system = ActorSystem("example-05")
  val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1")  //Create the lottery Actor
  val pool: ExecutorService = Executors.newFixedThreadPool(10)
  val r = (1 to 100).map(i =>
    new LotteryRun(lotteryActor, LotteryCmd(i.toLong, "godpan", "xx@gmail.com"))  //Create 100 lottery requests
  )
  r.foreach(pool.execute(_))  //Submit the requests from a thread pool to simulate many participants at the same time
  Thread.sleep(5000)
  pool.shutdown()
  system.terminate()
}

class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { //One lottery request
  implicit val timeout: Timeout = Timeout(3.seconds)
  def run(): Unit = {
    (lotteryActor ? lotteryCmd).onComplete {  //Print a different message depending on the returned event
      case Success(le: LuckyEvent) => println(s"Congratulations! User ${le.userId} won a red envelope of ${le.luckyMoney} yuan")
      case Success(fe: FailureEvent) => println(fe.reason)
      case _ => println("System error, please try again")  //Unexpected reply or ask timeout
    }
  }
}

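Running the program prints one line per request: either the amount the user won or, once the 10,000 yuan have been used up, the message that the red envelope is gone.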
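The draw rule itself can be checked without Akka. The sketch below copies the rule from doLottery (Random.nextInt(remain) + 1, so each win is between 1 and the remaining amount) into a hypothetical helper, DrawRuleSketch.draw. It shows that the wins can never add up to more than the total, because each draw is bounded by what is left. In the Actor this holds without any locks because messages are processed one at a time.

```scala
import scala.util.Random

// Hypothetical helper mirroring the rule in doLottery; not part of the demo project.
object DrawRuleSketch {
  // Amounts won by up to `participants` users, in arrival order. Users who arrive
  // after the money runs out get nothing (the FailureEvent case in the Actor).
  def draw(total: Int, participants: Int, rnd: Random): Vector[Int] = {
    var remain = total
    (1 to participants).toVector.flatMap { _ =>
      if (remain > 0) {
        val lucky = rnd.nextInt(remain) + 1 // between 1 and remain, inclusive
        remain -= lucky
        Some(lucky)
      } else None
    }
  }
}
```

Passing a seeded Random, as in DrawRuleSketch.draw(10000, 100, new Random(42)), makes a run repeatable, which is handy in unit tests.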

Next, here are the steps a PersistentActor goes through during a run, to help you understand how it works:

  • 1. Initialize the PersistentActor

    • 1.1 On the first start, initialization is the same as for an ordinary Actor.

    • 1.2 When the Actor is restarted, its state is recovered from the data it persisted earlier.

      • 1.2.1 Recovery from snapshots is fast, but a snapshot is not saved for every persisted event. When a snapshot is available, the Actor first restores its state from the latest snapshot.

      • 1.2.2 Recovery from events (logs, database records, etc.) is the key step: the Actor replays the persisted events (only those after the snapshot, if there is one) to rebuild its state.

  • 2. Process incoming commands: turn each one into an event that needs to be persisted (an event should contain only the essential data), persist it with the PersistentActor's persistence method (persist in the example above; batch persistence is covered later), and once persistence succeeds run the follow-up logic, such as updating the Actor's state or sending messages to other Actors.

  • 3. If we want to store snapshots, we can choose how often to do so, for example one snapshot every 100 persisted events; the frequency should reflect the actual business scenario. We can also perform some operations after a snapshot has been stored successfully.
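The three steps can be modelled in a few lines of plain Scala. This is a hypothetical, single-threaded sketch that imitates the idea of a PersistentActor but is NOT the Akka API: the Store class stands in for both the Journal and the snapshot store, and the amount won is passed in rather than drawn at random so that the behaviour is deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the PersistentActor lifecycle; not the Akka API.
object LifecycleSketch {
  final case class LuckyEvent(userId: Long, luckyMoney: Int)
  final case class Snapshot(seqNr: Long, remain: Int)

  // Stand-in for the Journal plus the snapshot store.
  final class Store {
    val journal = ArrayBuffer.empty[(Long, LuckyEvent)] // (sequence number, event)
    var snapshot: Option[Snapshot] = None
  }

  final class Lottery(store: Store, total: Int, snapshotInterval: Int) {
    // Step 1: start from the latest snapshot if there is one...
    private var seqNr: Long = store.snapshot.map(_.seqNr).getOrElse(0L)
    private var remain: Int = store.snapshot.map(_.remain).getOrElse(total)
    // ...then replay only the events persisted after it.
    store.journal.filter(_._1 > seqNr).foreach { case (n, e) => applyEvent(n, e) }

    private def applyEvent(n: Long, e: LuckyEvent): Unit = {
      seqNr = n
      remain -= e.luckyMoney
    }

    def remaining: Int = remain

    // Step 2: turn a command into an event, persist it, then update the state.
    def handle(userId: Long, amount: Int): Option[LuckyEvent] =
      if (remain <= 0) None
      else {
        val event = LuckyEvent(userId, math.min(amount, remain))
        val next = seqNr + 1
        store.journal += ((next, event)) // "persist" first...
        applyEvent(next, event)          // ...then change the in-memory state
        // Step 3: store a snapshot every `snapshotInterval` events.
        if (seqNr % snapshotInterval == 0) store.snapshot = Some(Snapshot(seqNr, remain))
        Some(event)
      }
  }
}
```

Creating a second Lottery over the same Store plays the role of a restart: it picks up the snapshot taken after event 5 and replays events 6 and 7, ending in the same state as before the "crash".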

Overall, this is roughly how a PersistentActor operates at runtime. If you are interested in exactly how it persists events and how the recovery mechanism works, it is worth reading the Akka source code.

Configuration for Akka persistence

First, we need the corresponding dependencies; add the following to build.sbt:

libraryDependencies ++= Seq(
  "com.typesafe.akka"         %% "akka-actor"       % "2.4.16", //Akka actor core
  "com.typesafe.akka"         %% "akka-persistence" % "2.4.16", //Akka Persistence
  "org.iq80.leveldb"           % "leveldb"          % "0.7",    //LevelDB, pure Java port
  "org.fusesource.leveldbjni"  % "leveldbjni-all"   % "1.8",    //LevelDB JNI (native) bindings
  "com.twitter"               %% "chill-akka"       % "0.8.0"   //Kryo-based serialization for events
)

In addition, add the following configuration to application.conf:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "log/journal"
akka.persistence.snapshot-store.local.dir = "log/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false  # Use the pure Java LevelDB port instead of the native library; fine for a local demo, not recommended in production

akka.actor.serializers {
  kryo = "com.twitter.chill.akka.AkkaSerializer"
}

akka.actor.serialization-bindings {
  "scala.Product" = kryo
  "akka.persistence.PersistentRepr" = kryo
}

At this point the Akka Persistence demo is complete and runs normally. Interested readers can download the source code (Source Link).

Advanced Akka persistence

1. Persistence plugins

Some readers may ask: I am not very familiar with LevelDB, or I feel single-machine storage is not safe; are there plugins that support distributed data storage, such as a cloud database? Of course there are, and I have collected a few for you:

  • 1. akka-persistence-sql-async: supports MySQL and PostgreSQL and uses a fully asynchronous database driver to provide an asynchronous, non-blocking API. Our company uses a modified version of it, and it works very well. Project address

  • 2. akka-persistence-cassandra: the officially recommended plugin. It uses Cassandra, a database with very fast write performance, is one of the more popular plugins, and also supports Persistence Query. Project address

  • 3. akka-persistence-redis: Redis also fits the Akka Persistence use case well; readers familiar with Redis can use it. Project address

  • 4. akka-persistence-jdbc: of course JDBC cannot be missing; it works with JDBC databases and supports both Scala and Java. Project address

For how to use each plugin, see the introduction in its project; in my experience they are fairly easy to set up.

2. Batch persistence

As mentioned above, our company uses the akka-persistence-sql-async plugin, so events and snapshots are persisted to a database. At first, just like the demo above, every event was persisted to the database individually. During performance testing, however, our business scenario put relatively high pressure on the database: once it reached more than 1,000 reads and writes per second, each persist took more than 15 ms (we were using a cloud database). As a result the Actor could only handle 60 to 70 persisted events per second, while the business required the Actor to return a result within 3 seconds, so a large number of messages timed out without any reply. This caused further problems: many messages could not be processed, system errors increased sharply, and the user experience degraded. Having found the problem, can we optimize it? Since inserting events one by one is slow, can we insert them in batches? Akka Persistence provides the persistAll method for this. Below I modify the demo above to use batch persistence:
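The figures above follow from simple arithmetic. persist makes the Actor wait for each write before it handles the next command, so with about 15 ms per write it can persist at most 1000 / 15 ≈ 66 events per second. The rough sketch below assumes, hypothetically, that a batch write takes about as long as a single-event write; that is the reason batching helps, although real batch writes will be somewhat slower. The helper names are illustrative only.

```scala
// Back-of-the-envelope model (hypothetical helpers, not measured data).
object ThroughputSketch {
  // Upper bound on persisted events per second when the Actor performs one
  // write at a time and each write (single event or whole batch) takes writeMillis.
  def eventsPerSecond(writeMillis: Double, batchSize: Int): Double =
    1000.0 / writeMillis * batchSize

  // How many events can be persisted before a reply deadline such as 3 seconds.
  def eventsWithin(deadlineSeconds: Double, writeMillis: Double, batchSize: Int): Double =
    deadlineSeconds * eventsPerSecond(writeMillis, batchSize)
}
```

With one event per write, only about 200 events fit inside the 3-second window. With batches of 10 the bound rises to about 2,000, under the assumption stated above.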

import akka.actor.{ActorLogging, ActorRef, Cancellable}
import akka.persistence.{PersistentActor, RecoveryCompleted, SaveSnapshotSuccess, SnapshotOffer}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

class LotteryActorN(initState: Lottery) extends PersistentActor with ActorLogging{
  import context.dispatcher  //ExecutionContext required by the scheduler

  override def persistenceId: String = "lottery-actor-2"

  var state = initState  //Initialize the Actor's state

  override def receiveRecover: Receive = {
    case event: LuckyEvent =>
      updateState(event)  //During recovery, rebuild the state from persisted events
    case SnapshotOffer(_, snapshot: Lottery) =>
      log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")
      state = snapshot //Restore the state from a snapshot
    case RecoveryCompleted => log.info("the actor recover completed")
  }

  def updateState(le: LuckyEvent) =
    state = state.update(le.luckyMoney)  //Update the state

  val lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)] = ArrayBuffer()  //Waiting participants and their senders

  //Timer: every 100 ms, draw for whoever is waiting in the queue
  val tick: Cancellable =
    context.system.scheduler.schedule(0.milliseconds, 100.milliseconds, self, "doLottery")

  override def postStop(): Unit = {
    tick.cancel()  //Stop the timer when the Actor stops
    super.postStop()
  }

  override def receiveCommand: Receive = {
    case lc: LotteryCmd =>
      lotteryQueue += ((lc, sender()))  //Add the participant to the lottery queue
      log.info(s"the lotteryQueue size is ${lotteryQueue.size}")
      if (lotteryQueue.size >= 5)  //Trigger the draw once five participants are waiting
        joinN()
    case "doLottery" =>
      if (lotteryQueue.nonEmpty)
        joinN()
    case "saveSnapshot" =>  //Snapshot command received: store a snapshot of the current state
      saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>  //Snapshot stored; older snapshots could be deleted here (the original `???` would throw at runtime)
      log.info(s"snapshot saved: ${metadata}")
  }

  private def joinN(): Unit = {  //Draw for the whole queue and persist the winners in one batch
    val rs = doLotteryN(lotteryQueue)
    val success = rs.collect { case (event: LuckyEvent, ref) => event -> ref }  //Winners, in queue order
    val failure = rs.collect { case (event: FailureEvent, ref) => event -> ref }  //Participants who got nothing
    //persistAll calls the handler once per event, in order, so the senders can be
    //dequeued in the same order (a Map keyed by event would merge identical events)
    val replyTo = mutable.Queue(success.map(_._2): _*)
    if (success.nonEmpty)
      persistAll(success.map(_._1).toList) { event =>  //Batch-persist the winning events
        updateState(event)
        increaseEvtCountAndSnapshot()
        replyTo.dequeue() ! event
      }
    failure.foreach {
      case (event, ref) => ref ! event
    }
    lotteryQueue.clear()  //Clear the queue
  }

  private def increaseEvtCountAndSnapshot() = {
    val snapShotInterval = 5
    if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) {  //Every five persisted events, store a snapshot of the current state
      self ! "saveSnapshot"
    }
  }

  private def doLotteryN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = {  //The drawing logic for a whole batch
    var remainAmount = state.remainAmount  //Local running total, since state is only updated after persisting
    lotteryQueue.map(lq =>
      if (remainAmount > 0) {
        val luckyMoney = scala.util.Random.nextInt(remainAmount) + 1
        remainAmount = remainAmount - luckyMoney
        (LuckyEvent(lq._1.userId, luckyMoney), lq._2)
      }
      else {
        (FailureEvent(lq._1.userId, "Come earlier next time, the red envelope has been used up!"), lq._2)
      }
    )
  }
}

This is the modified lottery Actor, which persists events in batches. Replying to each sender makes the processing logic slightly more complex, and a real scenario may be more complex still; the related source code is also in the project mentioned above.
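The buffering policy used by LotteryActorN (flush when the queue reaches a size limit, or when the timer fires and the queue is not empty) can be tested on its own. The following is a hypothetical, Akka-free sketch: each flush is recorded in `writes` where the Actor would call persistAll, and the class name Batcher is an assumption for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Akka-free model of the batching policy in LotteryActorN.
object BatchingSketch {
  final class Batcher[A](maxBatch: Int) {
    private val buffer = ArrayBuffer.empty[A]
    val writes = ArrayBuffer.empty[Vector[A]] // each entry stands for one persistAll call

    // Like receiving a LotteryCmd: buffer it, flush once the batch is full.
    def onCommand(cmd: A): Unit = {
      buffer += cmd
      if (buffer.size >= maxBatch) flush()
    }

    // Like the 100 ms "doLottery" tick: flush whatever is waiting, if anything.
    def onTick(): Unit = if (buffer.nonEmpty) flush()

    private def flush(): Unit = {
      writes += buffer.toVector
      buffer.clear()
    }
  }
}
```

With a limit of 5, twelve commands followed by one tick produce three writes of sizes 5, 5 and 2. The size limit bounds the batch, while the timer bounds how long a lone request can wait for its reply.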

3. Persistence Query

In addition, Akka Persistence provides a Query interface for querying persisted events. Whether you need it depends on your actual business scenario, so I will not go into it here. I also wrote a small demo of it in the project, which you can try if you are interested.


Added by Sianide on Wed, 12 Jun 2019 01:39:59 +0300