• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala PersistentActor类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.persistence.PersistentActor的典型用法代码示例。如果您正苦于以下问题:Scala PersistentActor类的具体用法?Scala PersistentActor怎么用?Scala PersistentActor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了PersistentActor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Counter

//设置package包名称以及导入依赖的类
package reactive4.performance

import akka.persistence.{PersistentActor, RecoveryCompleted}

class Counter extends PersistentActor {
  import Counter._

  override def persistenceId: String = "test-persistent-actor-counter"

  var state = CounterState(0)
  val objectCreatedTime = System.nanoTime()

  def updateState(event: IncrementedEvent) = this.state = CounterState(state.counter + 1)

  override def receiveRecover: Receive = {
    case event: IncrementedEvent => updateState(event)
    case event: RecoveryCompleted => println(s"Recovery time in seconds: ${(System.nanoTime() - objectCreatedTime).toSeconds}")
  }

  override def receiveCommand: Receive = {
    case IncrementCommand => persist(IncrementedEvent())(updateState)
    case PrintCounter => {
      println(s"Counter value: ${state.counter}")
      println(s"Write time in seconds: ${(System.nanoTime() - objectCreatedTime).toSeconds}")
    }
  }

}
object Counter {

  case object IncrementCommand
  case object PrintCounter
  case class IncrementedEvent()

  case class CounterState(counter: Long)

  implicit class Nanoseconds(nanoseconds: Long) {
    def toSeconds = nanoseconds / 1000000000d
  }
} 
开发者ID:Passarinho4,项目名称:reactive-lab4,代码行数:41,代码来源:Counter.scala


示例2: commandHandler

//设置package包名称以及导入依赖的类
package lib.tools

import akka.actor.ActorLogging
import akka.persistence.PersistentActor
import lib.models.PayLoad

import scala.concurrent.duration._


trait Monitor extends PersistentActor with ActorLogging {
  context.setReceiveTimeout(120.seconds)

  final val persistenceId: String = self.path.name
  def commandHandler: Receive
  def recoverHandler:Receive
  final def receiveRecover: Receive = defaultRecoverHandler orElse recoverHandler
  final def receiveCommand: Receive = defaultCommandHandler orElse commandHandler
  final def defaultCommandHandler: Receive = {
    case payLoad: PayLoad => log.warning(s"No command handler for Payload [$payLoad] for the monitor $persistenceId")
  }
  final def defaultRecoverHandler: Receive = {
    case msg => log.warning(s"No recover handler for msg [$msg] for the monitor $persistenceId")
  }
} 
开发者ID:sharma-rohit,项目名称:mormont,代码行数:25,代码来源:Monitor.scala


示例3: ChunkedHermesGameFileEntries

//设置package包名称以及导入依赖的类
package proton.game.hermes

import java.util.UUID

import akka.actor.ActorLogging
import akka.event.LoggingReceive
import akka.persistence.PersistentActor

import scala.collection.mutable.ListBuffer

object ChunkedHermesGameFileEntries {
  trait EntriesMessage
  case class AppendEntries(entries: Seq[HermesGameFileEntry]) extends EntriesMessage
  case class GetEntries() extends EntriesMessage

  trait EntriesEvent
  case class EntriesAppended(entries: Seq[HermesGameFileEntry]) extends EntriesEvent

  trait EntriesResult
  case class EntriesAppendedResult(id: UUID, count: Int) extends EntriesResult
  case class GetEntriesResult(id: UUID, entries: Seq[HermesGameFileEntry]) extends EntriesResult

  val gameFileEntriesRegionName = "hermesGameFileEntries"
}

class ChunkedHermesGameFileEntries(moduleSettings: HermesGameTickerModuleSettings) extends PersistentActor with ActorLogging {
  import context._
  import ChunkedHermesGameFileEntries._

  private val _id: UUID = UUID.fromString(self.path.name)
  private val _entries = new ListBuffer[HermesGameFileEntry]()

  setReceiveTimeout(moduleSettings.chunkedTimeout)

  override def receiveRecover: Receive = {
    case event: EntriesEvent => updateState(event)
  }

  def updateState(e: EntriesEvent) = e match {
    case EntriesAppended(entries) => _entries ++= entries
  }

  override def receiveCommand: Receive = LoggingReceive {
    case AppendEntries(entries) =>
      if (entries.nonEmpty) {
        persist(EntriesAppended(entries))(e => {
          updateState(e)
          sender ! EntriesAppendedResult(_id, entries.size)
        })
      } else {
        sender ! EntriesAppendedResult(_id, 0)
      }
    case GetEntries() => sender ! GetEntriesResult(_id, _entries)
  }

  override def persistenceId: String = "hermes-game-file-entries-" + _id.toString
} 
开发者ID:Morgan-Stanley,项目名称:proton,代码行数:58,代码来源:ChunkedHermesGameFileEntries.scala


示例4: AuthActor

//设置package包名称以及导入依赖的类
package antikkor.example.auth

import akka.persistence.PersistentActor
import Model._

class AuthActor extends PersistentActor with AuthAdapterActor {
  override val persistenceId: String        = "AuthActor"
  private  var authenticator: Authenticator = Authenticator()

  override def receiveCommand: Receive = {
    case Authenticate(user: User) => authenticate(user)
    case Verify(token) => verify(token)
    case Invalidate(token) => invalidate(token)
  }

  private def authenticate(user: User): Unit =
    if (authenticator.isValidUser(user)) {
      persist(Authenticated(user, authenticator.nextToken)) { event =>
        updateState(event)
        sender() ! translate(event)
      }
    } else {
      sender() ! translate(InvalidUser(user))
    }

  private def verify(token: Token): Unit =
    authenticator.validateToken(token) match {
      case None => sender() ! translate(InvalidToken(token))
      case Some(user) => sender() ! translate(Verified(user, token))
    }

  private def invalidate(token: Token): Unit =
    authenticator.validateToken(token) match {
      case None => sender() ! translate(InvalidToken(token))
      case Some(user) => persist(Invalidated(user, token)) { event =>
        updateState(event)
        sender() ! translate(event)
      }
    }

  private def updateState(event: Event): Unit = event match {
    case Authenticated(user, token) => authenticator = authenticator.signIn(user, token)
    case Verified(user, token) => ()
    case Invalidated(user, token) => authenticator = authenticator.signOut(token)
  }

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
  }
} 
开发者ID:btlines,项目名称:antikkor,代码行数:51,代码来源:AuthActor.scala


示例5: BlogActor

//设置package包名称以及导入依赖的类
package antikkor.example.blog

import akka.persistence.PersistentActor
import antikkor.example.blog.Model._

class BlogActor extends PersistentActor with BlogAdapterActor {
  var posts: List[Post] = Nil

  override def persistenceId: String = "BlogActor"

  override def receiveCommand: Receive = {
    case AllPosts => sender() ! translate(Posts(posts))
    case Publish(post) => persist(Published(post)) { event =>
      updateState(event)
      sender() ! translate(event)
    }
  }

  override def receiveRecover: Receive = {
    case event: Published => updateState(event)
  }

  private def updateState(event: Published): Unit = posts ::= event.post
} 
开发者ID:btlines,项目名称:antikkor,代码行数:25,代码来源:BlogActor.scala


示例6: CustomerAccountActor

//设置package包名称以及导入依赖的类
package sk.bsmk.akka.actors.customers

import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}

class CustomerAccountActor(val persistenceId: String) extends PersistentActor with ActorLogging {

  var state: CustomerAccount = CustomerAccount.initial

  override def receiveRecover: Receive = {
    case [email protected](actualPointBalance) =>
      log.debug("recovering with event: {}", event)
      state = state.copy(pointBalance = actualPointBalance)

    case [email protected](_, snapshot: CustomerAccount) =>
      log.debug("recovering with snapshot: {}", offer)
      state = snapshot
  }

  override def receiveCommand: Receive = {

    case AddPointsCommand(pointsToAdd) =>
      persist(PointsChangedEvent(state.pointBalance + pointsToAdd)) { event =>
        state = state.withPoints(event.actualPointBalance)
      }

    case ResetPointsCommand =>
      persist(PointsChangedEvent(0)) { _ =>
        state = state.resetPoints
      }

    case ChangeTierCommand(newTier) =>
      persist(TierChangedEvent(newTier)) { event =>
        state = state.withTier(event.actualTier)
      }

    case "snapshot" =>
      log.debug("saving snapshot")
      saveSnapshot(state)
    case "print" =>
      println(state)
  }

}

object CustomerAccountActor {

  def props(persistenceId: String): Props = Props(new CustomerAccountActor(persistenceId))

}

sealed trait CustomerAccountActorCommand

case class CreateAccountCommand(name: String)
case class AddPointsCommand(points: Int)
case object ResetPointsCommand

case class ChangeTierCommand(newTier: Tier)

case class PointsChangedEvent(actualPointBalance: Int)
case class TierChangedEvent(actualTier: Tier) 
开发者ID:bassmake,项目名称:hi-akka-persistence,代码行数:62,代码来源:CustomerAccountActor.scala


示例7: AbstractAccount

//设置package包名称以及导入依赖的类
package com.franklevering.banking.domain.model.account

import java.util.UUID

import akka.persistence.PersistentActor
import akka.persistence.journal.Tagged
import com.franklevering.common.domain.event.Event
import com.franklevering.ports.adapters.http.request.handlers.CreateCheckingsAccountResponse

abstract class AbstractAccount()

case class CheckingsAccountCreated(id: UUID, balance: BigDecimal = 0.0) extends Event
case class CreateCheckingsAccount(name: String)
case class CheckingsAccount(id: UUID, balance: BigDecimal = 0.0) extends AbstractAccount ()

case class SavingsAccountCreated(id: UUID, balance: BigDecimal = 0.0) extends Event
case class CreateSavingsAccount(id: UUID)
case class SavingsAccount(id: UUID, balance: BigDecimal = 0.0) extends AbstractAccount()

case class WithdrawMoney(id: UUID, amount: BigDecimal)
case class MoneyWithdrawn(id: UUID, amount: BigDecimal) extends Event

class Account(id: UUID) extends PersistentActor {

  var state: Option[AbstractAccount] = None
  var balance: BigDecimal = 0.0

  override def receiveRecover: Receive = {
    case checkingsAccountCreated: CheckingsAccountCreated => updateState(checkingsAccountCreated)
  }

  override def receiveCommand: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
        val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
        updateState(checkingsAccountCreatedEvent)
        sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
      }
    case withdrawMoney: WithdrawMoney =>
      if ((balance - withdrawMoney.amount) < 0) {
        throw new Exception("Not enough funds")
      }

      persist(Tagged(MoneyWithdrawn(withdrawMoney.id, withdrawMoney.amount), Set("withdrawn"))) { event =>
        updateState(event.payload.asInstanceOf[MoneyWithdrawn])
      }
  }

  def updateState(evt: Event): Unit = {
    evt match {
      case checkingsAccountCreated: CheckingsAccountCreated =>
        state = Some(CheckingsAccount(checkingsAccountCreated.id, checkingsAccountCreated.balance))
        balance = checkingsAccountCreated.balance
    }
  }

  override def persistenceId: String = s"account-$id"
} 
开发者ID:frankieleef,项目名称:banking,代码行数:59,代码来源:Account.scala


示例8: migrationName

//设置package包名称以及导入依赖的类
package im.actor.server.migrations

import java.time.Instant

import akka.actor.{ Actor, ActorLogging, ActorSystem }
import akka.http.scaladsl.util.FastFuture
import akka.persistence.PersistentActor
import akka.util.Timeout
import im.actor.config.ActorConfig
import im.actor.server.KeyValueMappings
import shardakka.{ InstantCodec, ShardakkaExtension }

import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }

trait Migration {

  protected def migrationName: String

  protected def migrationTimeout: Duration

  protected def startMigration()(implicit system: ActorSystem): Future[Unit]

  def migrate()(implicit system: ActorSystem): Unit = {
    import system.dispatcher
    implicit val kvTimeout = Timeout(ActorConfig.defaultTimeout)
    val migrations = ShardakkaExtension(system).simpleKeyValue[Instant](KeyValueMappings.Migrations, InstantCodec)
    Await.result(migrations.get(migrationName) flatMap {
      case Some(date) ?
        system.log.debug(s"Migration $migrationName will not run. Already completed at $date")
        FastFuture.successful(())
      case _ ?
        system.log.warning(s"Migration $migrationName started")
        startMigration() flatMap { _ ?
          system.log.info(s"Migration $migrationName finished")
          migrations.upsert(migrationName, Instant.now())
        }
    } recover {
      case e ?
        system.log.error(e, s"Migration $migrationName failed!!!")
        throw e
    }, migrationTimeout)
  }
}

abstract class Migrator(promise: Promise[Unit]) extends Actor with ActorLogging {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(reason, "Migrator failure")
    super.preRestart(reason, message)
    promise.failure(reason)
  }
}

abstract class PersistentMigrator(promise: Promise[Unit]) extends Migrator(promise) with PersistentActor {
  override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
    super.onRecoveryFailure(cause, event)
    promise.failure(cause)
  }
} 
开发者ID:wex5,项目名称:dangchat-server,代码行数:60,代码来源:Migration.scala


示例9: ShiteTalker

//设置package包名称以及导入依赖的类
package com.mooneyserver.dublinpubs.shite_talking.protocol.actors

import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted}
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.models.{DrunkenWaffleReceived, DrunkenWaffle, ShiteTalkerIsYapping}

class ShiteTalker extends PersistentActor with ActorLogging {

  val persistenceId: String = self.path.name

  override def receiveRecover: Receive = {
    case a: Any => log.info(s"$persistenceId is recovering: with some message $a")
    case RecoveryCompleted => log.info(s"$persistenceId has fully recovered")
  }

  override def receiveCommand: Receive = {
    case ShiteTalkerIsYapping(_) => log.info(s"$persistenceId has been marked as Active")
    case msg: DrunkenWaffle if msg.wafflerId == persistenceId =>
      persistAsync(msg) {
        waffle =>
          log.info(s"$persistenceId has stored a message that it owns")
          sender() ! DrunkenWaffleReceived(msg.waffleId)
      }
    case msg: DrunkenWaffle if msg.targetOfAbuseId == persistenceId =>
      persistAsync(msg) {
        waffle =>
          log.info(s"$persistenceId has stored a message that it received")
          // if client is active then send to Client
          // else stash the messages in state
      }
    case a: Any => log.warning(s"Unexpected message received by Shite Talker: $a")
  }
} 
开发者ID:irishshagua,项目名称:dublin-pubs-shite-talking,代码行数:34,代码来源:ShiteTalker.scala


示例10: UserState

//设置package包名称以及导入依赖的类
package com.softwaremill.sandbox.application

import akka.actor.{ActorLogging, ActorRef, Status}
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.persistence.PersistentActor
import com.softwaremill.sandbox.application.UserActor.{CreateUser, UserCreated}
import [email protected]@

case class UserState(name: String)

class UserActor extends PersistentActor with ActorLogging {

  var userState: Option[UserState] = None

  override def receiveRecover: Receive = {
    case event: UserCreated => userState = Some(UserState(event.name))
  }
  override def receiveCommand: Receive = {
    case command: CreateUser =>
      val currentSender = sender()
      log.debug("creating used")
      Thread.sleep(2500)
      persist(UserCreated(command.name)) { e =>
        userState = Some(UserState(e.name))
        Thread.sleep(1500)
        log.debug("user created")
        currentSender ! Status.Success(e.name)
      }
  }
  override def persistenceId: String = s"UA-${self.path.name}"
}

trait UserCommand {
  def userId: String
}

object UserActor {
  case class CreateUser(userId: String, name: String) extends UserCommand
  case class UserCreated(name: String)

  trait UserRegionTag

  type UserRegion = ActorRef @@ UserRegionTag
}

class UserActorMessageExtractor extends HashCodeMessageExtractor(10) {

  override def entityId(message: Any): String = message match {
    case command: UserCommand => command.userId
  }
} 
开发者ID:aludwiko,项目名称:sandbox,代码行数:52,代码来源:UserActor.scala


示例11: HmdaPersistentActor

//设置package包名称以及导入依赖的类
package hmda.persistence.model

import java.util.concurrent.TimeUnit

import akka.actor.ReceiveTimeout
import akka.persistence.PersistentActor
import akka.stream.ActorMaterializer
import hmda.persistence.messages.CommonMessages.{ Event, Shutdown }
import hmda.persistence.PersistenceConfig._
import scala.concurrent.duration.Duration

abstract class HmdaPersistentActor extends PersistentActor with HmdaActor {

  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  def updateState(event: Event): Unit

  override def preStart(): Unit = {
    super.preStart()
    val timeout = configuration.getInt("hmda.persistent-actor-timeout")
    context.setReceiveTimeout(Duration.create(timeout, TimeUnit.SECONDS))
  }

  override def receiveCommand: Receive = {
    case ReceiveTimeout =>
      self ! Shutdown

    case Shutdown =>
      context stop self
  }

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
  }

} 
开发者ID:cfpb,项目名称:hmda-platform,代码行数:39,代码来源:HmdaPersistentActor.scala


示例12: UserManager

//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor

import java.time.Instant
import java.util.UUID

import akka.actor.Props
import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._

import scala.collection.mutable

sealed class UserManager extends PersistentActor {
  private val usernameToEntry: mutable.Map[String, UserEntry] = mutable.Map.empty

  override def receiveRecover: Receive = {
    case userCreated: UserCreatedEvent => apply(userCreated)
  }

  private def apply(userCreated: UserCreatedEvent): Unit = {
    usernameToEntry += userCreated.userEntry.username -> userCreated.userEntry
  }

  override def receiveCommand: Receive = {
    case createUser: CreateUserCommand => handle(createUser)
  }

  private def handle(createUser: CreateUserCommand): Unit = {
    if (usernameToEntry.contains(createUser.username)) {
      sender ! CreateUserResult(createUser, Left(UsernameAlreadyExist))
    } else {
      val userId = UserId(UUID.randomUUID())
      val originalSender = sender
      persist(UserCreatedEvent(Instant.now(), UserEntry(userId, createUser.username, createUser.hashedPassword))) { event =>
        apply(event)
        originalSender ! CreateUserResult(createUser, Right(userId))
      }
    }
  }

  override def persistenceId: String = self.path.parent.name + "-" + self.path.name
} 
开发者ID:anopse,项目名称:HomeworkZen,代码行数:43,代码来源:UserManager.scala


示例13: UnitWorker

//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor

import java.time.Instant

import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._

sealed class UnitWorker(userId: UserId, unitId: UnitId, maximumCapacity: Long, unitType: UnitType) extends PersistentActor {
  private var currentAmount: Long = 0

  override def receiveRecover: Receive = {
    case deposit: DepositEvent => apply(deposit)
    case withdraw: WithdrawEvent => apply(withdraw)
  }

  private def apply(deposit: DepositEvent): Unit = currentAmount = currentAmount + deposit.amountDeposited

  private def apply(withdraw: WithdrawEvent): Unit = currentAmount = currentAmount - withdraw.amountWithdrawn

  override def receiveCommand: Receive = {
    case deposit: DepositCommand => handle(deposit)
    case withdraw: WithdrawCommand => handle(withdraw)
  }

  private def handle(deposit: DepositCommand): Unit = {
    if (deposit.amountToDeposit <= 0) {
      sender ! DepositResult(deposit, Left(InvalidDepositAmount))
    } else if (currentAmount + deposit.amountToDeposit > maximumCapacity) {
      sender ! DepositResult(deposit, Left(DepositExceedCapacity))
    } else {
      val originalSender = sender
      persist(DepositEvent(Instant.now(), userId, unitId, deposit.amountToDeposit)) { event =>
        apply(event)
        originalSender ! DepositResult(deposit, Right(currentAmount))
      }
    }
  }

  private def handle(withdraw: WithdrawCommand): Unit = {
    if (withdraw.amountToWithdraw <= 0) {
      sender ! WithdrawResult(withdraw, Left(InvalidWithdrawAmount))
    } else if (withdraw.amountToWithdraw > currentAmount) {
      sender ! WithdrawResult(withdraw, Left(WithdrawExceedAvailableAmount))
    } else {
      val originalSender = sender
      persist(WithdrawEvent(Instant.now(), userId, unitId, withdraw.amountToWithdraw)) { event =>
        apply(event)
        originalSender ! WithdrawResult(withdraw, Right(currentAmount))
      }
    }
  }

  override def persistenceId: String = s"unitworker-${unitId.id}"
} 
开发者ID:anopse,项目名称:HomeworkZen,代码行数:56,代码来源:UnitWorker.scala


示例14: UserWorker

//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor

import java.time.Instant
import java.util.UUID

import akka.actor.Props
import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._

sealed class UserWorker extends PersistentActor {
  override def receiveRecover: Receive = {
    case unitCreated: UnitCreatedEvent => apply(unitCreated)
  }

  private def apply(unitCreated: UnitCreatedEvent): Unit = {
    val props = Props(new UnitWorker(unitCreated.userId, unitCreated.unitId, unitCreated.maximumCapacity, unitCreated.unitType))
    context.actorOf(props, unitCreated.unitId.id.toString)
  }

  override def receiveCommand: Receive = {
    case createUnit: CreateUnitCommand => handle(createUnit)
    case unitCommand: UnitCommand => forward(unitCommand)
  }

  private def handle(createUnit: CreateUnitCommand): Unit = {
    if (createUnit.maximumCapacity <= 0) {
      sender ! CreateUnitResult(createUnit, Left(InvalidMaximumCapacityValue))
    } else {
      val unitId = UnitId(UUID.randomUUID())
      val originalSender = sender
      persist(UnitCreatedEvent(Instant.now(), createUnit.userId, unitId, createUnit.maximumCapacity, createUnit.unitType)) { event =>
        apply(event)
        originalSender ! CreateUnitResult(createUnit, Right(unitId))
      }
    }
  }

  private def forward(unitCommand: UnitCommand): Unit = {
    context.child(unitCommand.unitId.id.toString) match {
      case Some(child) => child forward unitCommand
      case None => sender ! unitCommand.unitForwardFailureMessage
    }
  }

  override def persistenceId: String =  self.path.parent.name + "-" + self.path.name
} 
开发者ID:anopse,项目名称:HomeworkZen,代码行数:48,代码来源:UserWorker.scala


示例15: PersistAsyncActor

//设置package包名称以及导入依赖的类
package akka.persistence.pg.perf

import akka.actor.{Props, ActorLogging}
import akka.persistence.PersistentActor
import akka.persistence.pg.perf.Messages.{Alter, Altered}

class PersistAsyncActor(override val persistenceId: String) extends PersistentActor with ActorLogging {

  override def receiveRecover: Receive = { case _ => }

  override def receiveCommand: Receive = {
    case Alter(txt) =>
      val created = System.currentTimeMillis()
      val events = 1 to 10 map { i =>
        Altered(s"${txt}_$i", created)
      }
      persistAllAsync(events) { _ => sender ! txt }
  }

}

object PersistAsyncActor {
  def props(persistenceId: String) = Props(new PersistAsyncActor(persistenceId))
} 
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:25,代码来源:PersistAsyncActor.scala


示例16: Borrower

//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst

import akka.actor.{ActorLogging, Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.{PersistentActor, RecoveryCompleted}


class Borrower extends PersistentActor with ActorLogging {

  import Borrower._

  override def persistenceId: String = "borrower-" + self.path.name

  override def receiveRecover = {
    case e: BorrowerEvent => updateState(e)
    case RecoveryCompleted =>  }

  override def receiveCommand = {
    case CreateBorrower(catalogId, borrowerId, name) => persist(BorrowerCreated(catalogId, borrowerId, name))(e => {
      updateState(e)
      sender() ! e
    })
    case ReceiveTimeout ? context.parent ! Passivate(stopMessage = Stop)
    case Stop ? context.stop(self)
  }

  def updateState(e: BorrowerEvent) = e match {
    case i: BorrowerCreated =>
  }
}

object Borrower {
  sealed trait BorrowerCommand {
    val catalogId: Int
    val borrowerId: Int
  }

  final case class CreateBorrower(catalogId: Int, borrowerId: Int, name: String) extends BorrowerCommand

  sealed trait BorrowerEvent {
    val borrowerId: Int
  }

  final case class BorrowerCreated(catalogId: Int, borrowerId: Int, name: String) extends BorrowerEvent

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case i: BorrowerCommand => (s"${i.catalogId}_${i.borrowerId}", i)
  }

  val numberOfShards = 128

  val extractShardId: ShardRegion.ExtractShardId = {
    case i: BorrowerCommand => math.abs(s"${i.catalogId}_${i.borrowerId}".hashCode % numberOfShards).toString
  }

  def props() = Props(new Borrower())
} 
开发者ID:ColinScott,项目名称:bathurst,代码行数:59,代码来源:Borrower.scala


示例17: FriendActor

//设置package包名称以及导入依赖的类
package com.packt.chapter6

import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, Recovery, RecoveryCompleted, SnapshotOffer}

object FriendActor {
  def props(friendId: String, recoveryStrategy: Recovery) = Props(new FriendActor(friendId, recoveryStrategy))
}

class FriendActor(friendId: String, r: Recovery) extends PersistentActor with ActorLogging {
  override val persistenceId = friendId
  override val recovery = r

  var state = FriendState()
  def updateState(event: FriendEvent) = state = state.update(event)

  val receiveRecover: Receive = {
    case evt: FriendEvent =>
      log.info(s"Replaying event: $evt")
      updateState(evt)
    case SnapshotOffer(_, recoveredState : FriendState) =>
      log.info(s"Snapshot offered: $recoveredState")
      state = recoveredState
    case RecoveryCompleted => log.info(s"Recovery completed. Current state: $state")
  }

  val receiveCommand: Receive = {
    case AddFriend(friend) => persist(FriendAdded(friend))(updateState)
    case RemoveFriend(friend) => persist(FriendRemoved(friend))(updateState)
    case "snap" => saveSnapshot(state)
    case "print" => log.info(s"Current state: $state")
  }
} 
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:34,代码来源:FriendActor.scala


示例18: StockPersistenceActor

//设置package包名称以及导入依赖的类
package com.packt.chapter6

import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}

object StockPersistenceActor {
  def props(stockId: String) = Props(new StockPersistenceActor(stockId))
}

class StockPersistenceActor(stockId: String) extends PersistentActor with ActorLogging {
  override val persistenceId = stockId

  var state = StockHistory()
  def updateState(event: ValueAppended) = state = state.update(event)

  val receiveRecover: Receive = {
    case evt: ValueAppended => updateState(evt)
    case RecoveryCompleted => log.info(s"Recovery completed. Current state: $state")
  }

  val receiveCommand: Receive = {
    case ValueUpdate(value) => persist(ValueAppended(StockValue(value)))(updateState)
    case "print" => log.info(s"Current state: $state")
  }

  override def postStop() = log.info(s"Stopping [${self.path}]")
} 
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:28,代码来源:StockPersistenceActor.scala


示例19: SamplePersistenceActor

//设置package包名称以及导入依赖的类
package com.packt.chapter6

import akka.persistence.{PersistentActor, SnapshotOffer}

class SamplePersistenceActor extends PersistentActor {
  override val persistenceId = "unique-id-1"

  var state = ActiveUsers()
  def updateState(event: Event) = state = state.update(event)

  val receiveRecover: Receive = {
    case evt: Event => updateState(evt)
    case SnapshotOffer(_, snapshot: ActiveUsers) => state = snapshot
  }

  val receiveCommand: Receive = {
    case UserUpdate(userId, Add) => persist(AddUserEvent(userId))(updateState)
    case UserUpdate(userId, Remove) => persist(RemoveUserEvent(userId))(updateState)
    case "snap"  => saveSnapshot(state)
    case "print" => println(state)
    case ShutdownPersistentActor => context.stop(self)
  }

  override def postStop() = println(s"Stopping [${self.path}]")
} 
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:26,代码来源:SamplePersistenceActor.scala


示例20: TodoRepositoryProcessor

//设置package包名称以及导入依赖的类
package com.todos.repository

import java.util.UUID

import akka.actor.{Actor, ActorLogging, Props}
import akka.persistence.{PersistentActor, Recovery}
import com.todos.command.{CreateTodo, RemoveTodo}
import com.todos.event.utils.ProcessedEvent
import com.todos.event.{TodoCreated, TodoRemoved}
import com.todos.response.Success

class TodoRepositoryProcessor() extends PersistentActor with ActorLogging {
  log.info("Started {}", self.path.name)

  def persistenceId: String = self.path.name

  def receiveCommand: Receive = {
    case cmd: CreateTodo =>
      persist(
        event = TodoCreated(
          id = UUID.randomUUID(),
          title = cmd.title,
          completed = cmd.completed
        )
      ) { persistedEvent =>
        sender() ! Success()

        context.system.eventStream.publish(ProcessedEvent(persistedEvent, lastSequenceNr))
      }
    case cmd: RemoveTodo =>
      persist(event = TodoRemoved(cmd.id)) { persistedEvent =>
        sender() ! Success()

        context.system.eventStream.publish(ProcessedEvent(persistedEvent, lastSequenceNr))
      }
  }

  override def recovery: Recovery = Recovery.none

  override def receiveRecover: Receive = Actor.emptyBehavior

}

object TodoRepositoryProcessor {
  val name: String = "todo-repository-processor"

  def props(): Props = {
    Props(
      classOf[TodoRepositoryProcessor]
    )
  }

} 
开发者ID:benniekrijger,项目名称:todo-service,代码行数:54,代码来源:TodoRepositoryProcessor.scala



注:本文中的akka.persistence.PersistentActor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Worker类代码示例发布时间:2022-05-23
下一篇:
Scala Logger类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap