本文整理汇总了Scala中akka.persistence.PersistentActor类的典型用法代码示例。如果您正苦于以下问题:Scala PersistentActor类的具体用法?Scala PersistentActor怎么用?Scala PersistentActor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PersistentActor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Counter
//设置package包名称以及导入依赖的类
package reactive4.performance
import akka.persistence.{PersistentActor, RecoveryCompleted}
/**
 * Persistent counter: stores one `IncrementedEvent` per `IncrementCommand` and
 * prints elapsed time (measured from actor construction) after recovery and on
 * `PrintCounter`.
 */
class Counter extends PersistentActor {
  import Counter._

  override def persistenceId: String = "test-persistent-actor-counter"

  var state = CounterState(0)

  // Captured once at construction; both timing printouts are relative to it.
  val objectCreatedTime = System.nanoTime()

  /** Seconds elapsed since this actor instance was constructed. */
  private def secondsSinceCreation = (System.nanoTime() - objectCreatedTime).toSeconds

  /** Applies one increment to the in-memory state (the event carries no data). */
  def updateState(event: IncrementedEvent) = {
    state = state.copy(counter = state.counter + 1)
  }

  override def receiveRecover: Receive = {
    case increment: IncrementedEvent =>
      updateState(increment)
    case _: RecoveryCompleted =>
      println(s"Recovery time in seconds: $secondsSinceCreation")
  }

  override def receiveCommand: Receive = {
    case IncrementCommand =>
      persist(IncrementedEvent()) { stored => updateState(stored) }
    case PrintCounter =>
      println(s"Counter value: ${state.counter}")
      println(s"Write time in seconds: $secondsSinceCreation")
  }
}
/** Protocol, event and state types for the `Counter` actor. */
object Counter {
  // Commands
  case object IncrementCommand
  case object PrintCounter

  // Persisted event (carries no payload)
  case class IncrementedEvent()

  // In-memory state
  case class CounterState(counter: Long)

  /** Adds a nanoseconds-to-seconds conversion to `Long` values. */
  implicit class Nanoseconds(nanoseconds: Long) {
    /** This nanosecond count expressed as fractional seconds. */
    def toSeconds = nanoseconds.toDouble / 1000000000d
  }
}
开发者ID:Passarinho4,项目名称:reactive-lab4,代码行数:41,代码来源:Counter.scala
示例2: commandHandler
//设置package包名称以及导入依赖的类
package lib.tools
import akka.actor.ActorLogging
import akka.persistence.PersistentActor
import lib.models.PayLoad
import scala.concurrent.duration._
/**
 * Base trait for persistent "monitor" actors.
 *
 * Implementors supply `commandHandler` and `recoverHandler`; anything they do
 * not handle falls through to the default handlers below, which only log a
 * warning.
 */
trait Monitor extends PersistentActor with ActorLogging {
  // NOTE(review): no ReceiveTimeout case is visible in this trait; presumably
  // implementors handle it in commandHandler - confirm.
  context.setReceiveTimeout(120.seconds)

  /** The actor's name doubles as its persistence id. */
  final val persistenceId: String = self.path.name

  /** Command handling supplied by the concrete monitor. */
  def commandHandler: Receive

  /** Recovery handling supplied by the concrete monitor. */
  def recoverHandler: Receive

  // The concrete handlers must come FIRST: the defaults match every message
  // (recover) / every PayLoad (command), so placing them first would make the
  // concrete handlers unreachable for those messages.
  final def receiveRecover: Receive = recoverHandler orElse defaultRecoverHandler

  final def receiveCommand: Receive = commandHandler orElse defaultCommandHandler

  /** Fallback for `PayLoad` commands the concrete monitor does not handle. */
  final def defaultCommandHandler: Receive = {
    case payLoad: PayLoad =>
      log.warning(s"No command handler for Payload [$payLoad] for the monitor $persistenceId")
  }

  /** Fallback for any recovery message the concrete monitor does not handle. */
  final def defaultRecoverHandler: Receive = {
    case msg =>
      log.warning(s"No recover handler for msg [$msg] for the monitor $persistenceId")
  }
}
开发者ID:sharma-rohit,项目名称:mormont,代码行数:25,代码来源:Monitor.scala
示例3: ChunkedHermesGameFileEntries
//设置package包名称以及导入依赖的类
package proton.game.hermes
import java.util.UUID
import akka.actor.ActorLogging
import akka.event.LoggingReceive
import akka.persistence.PersistentActor
import scala.collection.mutable.ListBuffer
/** Messages, events and results exchanged with the `ChunkedHermesGameFileEntries` actor. */
object ChunkedHermesGameFileEntries {
  val gameFileEntriesRegionName: String = "hermesGameFileEntries"

  // Commands accepted by the actor.
  trait EntriesMessage
  case class AppendEntries(entries: Seq[HermesGameFileEntry]) extends EntriesMessage
  case class GetEntries() extends EntriesMessage

  // Events written to the journal.
  trait EntriesEvent
  case class EntriesAppended(entries: Seq[HermesGameFileEntry]) extends EntriesEvent

  // Replies sent back to the requester.
  trait EntriesResult
  case class EntriesAppendedResult(id: UUID, count: Int) extends EntriesResult
  case class GetEntriesResult(id: UUID, entries: Seq[HermesGameFileEntry]) extends EntriesResult
}
/**
 * Persistent actor that accumulates `HermesGameFileEntry` chunks.
 *
 * The actor's name must be a UUID string; it is used both as the id returned
 * in replies and as the suffix of the persistence id.
 *
 * @param moduleSettings supplies `chunkedTimeout`, used as the receive timeout
 */
class ChunkedHermesGameFileEntries(moduleSettings: HermesGameTickerModuleSettings) extends PersistentActor with ActorLogging {
  import context._
  import ChunkedHermesGameFileEntries._

  private val _id: UUID = UUID.fromString(self.path.name)
  private val _entries = new ListBuffer[HermesGameFileEntry]()

  // NOTE(review): a receive timeout is configured but no ReceiveTimeout case is
  // visible in receiveCommand - confirm where it is meant to be handled.
  setReceiveTimeout(moduleSettings.chunkedTimeout)

  override def persistenceId: String = "hermes-game-file-entries-" + _id.toString

  override def receiveRecover: Receive = {
    case event: EntriesEvent => updateState(event)
  }

  /** Applies a journaled event to the in-memory entry buffer. */
  def updateState(e: EntriesEvent) = e match {
    case EntriesAppended(entries) => _entries ++= entries
  }

  override def receiveCommand: Receive = LoggingReceive {
    case AppendEntries(entries) =>
      if (entries.nonEmpty) {
        persist(EntriesAppended(entries)) { e =>
          updateState(e)
          sender ! EntriesAppendedResult(_id, entries.size)
        }
      } else {
        // Nothing to journal for an empty batch; acknowledge immediately.
        sender ! EntriesAppendedResult(_id, 0)
      }

    case GetEntries() =>
      // Reply with an immutable snapshot: handing out the ListBuffer itself
      // would share this actor's mutable state with the receiver.
      sender ! GetEntriesResult(_id, _entries.toList)
  }
}
开发者ID:Morgan-Stanley,项目名称:proton,代码行数:58,代码来源:ChunkedHermesGameFileEntries.scala
示例4: AuthActor
//设置package包名称以及导入依赖的类
package antikkor.example.auth
import akka.persistence.PersistentActor
import Model._
/**
 * Persistent authentication actor: issues tokens for valid users, verifies
 * tokens, and invalidates them. `Authenticated` and `Invalidated` events are
 * journaled; verification is answered from in-memory state only.
 */
class AuthActor extends PersistentActor with AuthAdapterActor {
  override val persistenceId: String = "AuthActor"

  private var authenticator: Authenticator = Authenticator()

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
  }

  override def receiveCommand: Receive = {
    case Authenticate(user: User) => authenticate(user)
    case Verify(token) => verify(token)
    case Invalidate(token) => invalidate(token)
  }

  /** Folds an event into the authenticator state. */
  private def updateState(event: Event): Unit = event match {
    case Authenticated(user, token) =>
      authenticator = authenticator.signIn(user, token)
    case Invalidated(user, token) =>
      authenticator = authenticator.signOut(token)
    case Verified(user, token) =>
      () // verification does not change state
  }

  /** Journals a sign-in for a valid user; rejects an invalid one without persisting. */
  private def authenticate(user: User): Unit = {
    if (!authenticator.isValidUser(user)) {
      sender() ! translate(InvalidUser(user))
    } else {
      persist(Authenticated(user, authenticator.nextToken)) { stored =>
        updateState(stored)
        sender() ! translate(stored)
      }
    }
  }

  /** Answers whether the token is currently valid; nothing is persisted. */
  private def verify(token: Token): Unit = {
    val owner = authenticator.validateToken(token)
    owner match {
      case Some(user) => sender() ! translate(Verified(user, token))
      case None => sender() ! translate(InvalidToken(token))
    }
  }

  /** Journals a sign-out when the token is valid; otherwise replies `InvalidToken`. */
  private def invalidate(token: Token): Unit = {
    val owner = authenticator.validateToken(token)
    owner match {
      case Some(user) =>
        persist(Invalidated(user, token)) { stored =>
          updateState(stored)
          sender() ! translate(stored)
        }
      case None =>
        sender() ! translate(InvalidToken(token))
    }
  }
}
开发者ID:btlines,项目名称:antikkor,代码行数:51,代码来源:AuthActor.scala
示例5: BlogActor
//设置package包名称以及导入依赖的类
package antikkor.example.blog
import akka.persistence.PersistentActor
import antikkor.example.blog.Model._
/**
 * Persistent blog actor: journals a `Published` event per `Publish` command
 * and keeps the posts in memory, newest first.
 */
class BlogActor extends PersistentActor with BlogAdapterActor {
  var posts: List[Post] = Nil

  override def persistenceId: String = "BlogActor"

  /** Prepends the published post to the in-memory list. */
  private def updateState(event: Published): Unit = {
    posts = event.post :: posts
  }

  override def receiveRecover: Receive = {
    case published: Published => updateState(published)
  }

  override def receiveCommand: Receive = {
    case Publish(post) =>
      persist(Published(post)) { stored =>
        updateState(stored)
        sender() ! translate(stored)
      }
    case AllPosts =>
      sender() ! translate(Posts(posts))
  }
}
开发者ID:btlines,项目名称:antikkor,代码行数:25,代码来源:BlogActor.scala
示例6: CustomerAccountActor
//设置package包名称以及导入依赖的类
package sk.bsmk.akka.actors.customers
import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, SnapshotOffer}
/**
 * Persistent customer-account actor tracking a point balance and a tier.
 *
 * Accepts `AddPointsCommand`, `ResetPointsCommand`, `ChangeTierCommand`, and
 * the plain strings "snapshot" (save a snapshot) and "print" (print state).
 *
 * @param persistenceId journal/snapshot identifier for this account
 */
class CustomerAccountActor(val persistenceId: String) extends PersistentActor with ActorLogging {
  var state: CustomerAccount = CustomerAccount.initial

  override def receiveRecover: Receive = {
    case event @ PointsChangedEvent(actualPointBalance) =>
      log.debug("recovering with event: {}", event)
      state = state.copy(pointBalance = actualPointBalance)
    case event @ TierChangedEvent(actualTier) =>
      // Tier changes are journaled by ChangeTierCommand, so they must be
      // replayed too; otherwise the tier silently resets after a restart.
      log.debug("recovering with event: {}", event)
      state = state.withTier(actualTier)
    case offer @ SnapshotOffer(_, snapshot: CustomerAccount) =>
      log.debug("recovering with snapshot: {}", offer)
      state = snapshot
  }

  override def receiveCommand: Receive = {
    case AddPointsCommand(pointsToAdd) =>
      // The event stores the resulting balance, not the delta.
      persist(PointsChangedEvent(state.pointBalance + pointsToAdd)) { event =>
        state = state.withPoints(event.actualPointBalance)
      }
    case ResetPointsCommand =>
      persist(PointsChangedEvent(0)) { _ =>
        state = state.resetPoints
      }
    case ChangeTierCommand(newTier) =>
      persist(TierChangedEvent(newTier)) { event =>
        state = state.withTier(event.actualTier)
      }
    case "snapshot" =>
      log.debug("saving snapshot")
      saveSnapshot(state)
    case "print" =>
      println(state)
  }
}
/** Factory for `CustomerAccountActor` props. */
object CustomerAccountActor {
  /** Props that create a `CustomerAccountActor` with the given persistence id. */
  def props(persistenceId: String): Props = {
    Props(new CustomerAccountActor(persistenceId))
  }
}
/** Marker for every command understood by the customer-account actor. */
sealed trait CustomerAccountActorCommand

// The commands now extend the sealed marker trait, which previously had no
// implementations; this is purely additive for existing callers.
case class CreateAccountCommand(name: String) extends CustomerAccountActorCommand
case class AddPointsCommand(points: Int) extends CustomerAccountActorCommand
case object ResetPointsCommand extends CustomerAccountActorCommand
case class ChangeTierCommand(newTier: Tier) extends CustomerAccountActorCommand

// Journaled events; each carries the resulting value rather than a delta.
case class PointsChangedEvent(actualPointBalance: Int)
case class TierChangedEvent(actualTier: Tier)
开发者ID:bassmake,项目名称:hi-akka-persistence,代码行数:62,代码来源:CustomerAccountActor.scala
示例7: AbstractAccount
//设置package包名称以及导入依赖的类
package com.franklevering.banking.domain.model.account
import java.util.UUID
import akka.persistence.PersistentActor
import akka.persistence.journal.Tagged
import com.franklevering.common.domain.event.Event
import com.franklevering.ports.adapters.http.request.handlers.CreateCheckingsAccountResponse
/** Common base for the account state types below. */
abstract class AbstractAccount()

// Checkings account: state, command and event.
case class CheckingsAccount(id: UUID, balance: BigDecimal = 0.0) extends AbstractAccount()
case class CreateCheckingsAccount(name: String)
case class CheckingsAccountCreated(id: UUID, balance: BigDecimal = 0.0) extends Event

// Savings account: state, command and event.
case class SavingsAccount(id: UUID, balance: BigDecimal = 0.0) extends AbstractAccount()
case class CreateSavingsAccount(id: UUID)
case class SavingsAccountCreated(id: UUID, balance: BigDecimal = 0.0) extends Event

// Withdrawal: command and event.
case class WithdrawMoney(id: UUID, amount: BigDecimal)
case class MoneyWithdrawn(id: UUID, amount: BigDecimal) extends Event
/**
 * Persistent bank-account actor.
 *
 * Events are journaled wrapped in `Tagged`; `receiveRecover` matches the
 * unwrapped event types.
 *
 * @param id account identifier, also used to build the persistence id
 */
class Account(id: UUID) extends PersistentActor {
  var state: Option[AbstractAccount] = None
  var balance: BigDecimal = 0.0

  override def persistenceId: String = s"account-$id"

  override def receiveRecover: Receive = {
    case created: CheckingsAccountCreated => updateState(created)
    // Withdrawals are journaled too, so they must be replayed; otherwise the
    // balance is wrong after a restart.
    case withdrawn: MoneyWithdrawn => updateState(withdrawn)
  }

  override def receiveCommand: Receive = {
    case _: CreateCheckingsAccount =>
      persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { tagged =>
        tagged.payload match {
          case created: CheckingsAccountCreated =>
            updateState(created)
            sender ! CreateCheckingsAccountResponse(created.id.toString)
        }
      }

    case withdrawMoney: WithdrawMoney =>
      // NOTE(review): throwing here fails the actor rather than replying to the
      // sender; kept as in the original - confirm this is intended.
      if ((balance - withdrawMoney.amount) < 0) {
        throw new Exception("Not enough funds")
      }
      persist(Tagged(MoneyWithdrawn(withdrawMoney.id, withdrawMoney.amount), Set("withdrawn"))) { tagged =>
        tagged.payload match {
          case withdrawn: MoneyWithdrawn => updateState(withdrawn)
        }
      }
  }

  /**
   * Folds an event into the account state.
   *
   * Handles account creation and withdrawals. The original had no
   * `MoneyWithdrawn` case, so every withdrawal ended in a `MatchError`.
   */
  def updateState(evt: Event): Unit = {
    evt match {
      case created: CheckingsAccountCreated =>
        state = Some(CheckingsAccount(created.id, created.balance))
        balance = created.balance

      case withdrawn: MoneyWithdrawn =>
        balance = balance - withdrawn.amount
        // Keep the balance stored inside the account state in sync.
        state = state.map {
          case checkings: CheckingsAccount => checkings.copy(balance = balance)
          case savings: SavingsAccount => savings.copy(balance = balance)
          case other => other
        }
    }
  }
}
开发者ID:frankieleef,项目名称:banking,代码行数:59,代码来源:Account.scala
示例8: migrationName
//设置package包名称以及导入依赖的类
package im.actor.server.migrations
import java.time.Instant
import akka.actor.{ Actor, ActorLogging, ActorSystem }
import akka.http.scaladsl.util.FastFuture
import akka.persistence.PersistentActor
import akka.util.Timeout
import im.actor.config.ActorConfig
import im.actor.server.KeyValueMappings
import shardakka.{ InstantCodec, ShardakkaExtension }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
/**
 * A one-off migration whose completion time is recorded in a key-value store
 * so that it runs at most once.
 */
trait Migration {
  /** Key under which completion of this migration is recorded. */
  protected def migrationName: String

  /** Maximum time `migrate()` blocks waiting for the migration. */
  protected def migrationTimeout: Duration

  /** Starts the actual migration work. */
  protected def startMigration()(implicit system: ActorSystem): Future[Unit]

  /**
   * Runs the migration unless it is already recorded as completed, blocking
   * the caller for up to `migrationTimeout`.
   *
   * A failure is logged and rethrown, so it surfaces from `Await.result`.
   */
  def migrate()(implicit system: ActorSystem): Unit = {
    import system.dispatcher
    implicit val kvTimeout: Timeout = Timeout(ActorConfig.defaultTimeout)
    val migrations = ShardakkaExtension(system).simpleKeyValue[Instant](KeyValueMappings.Migrations, InstantCodec)

    Await.result(migrations.get(migrationName) flatMap {
      case Some(date) =>
        system.log.debug(s"Migration $migrationName will not run. Already completed at $date")
        FastFuture.successful(())
      case _ =>
        system.log.warning(s"Migration $migrationName started")
        startMigration() flatMap { _ =>
          system.log.info(s"Migration $migrationName finished")
          // Record completion so later runs are skipped.
          migrations.upsert(migrationName, Instant.now())
        }
    } recover {
      case e =>
        system.log.error(e, s"Migration $migrationName failed!!!")
        throw e
    }, migrationTimeout)
  }
}
/**
 * Base actor for migrations; fails the supplied promise when the actor is
 * about to be restarted after a failure.
 *
 * @param promise completed with the failure cause on restart
 */
abstract class Migrator(promise: Promise[Unit]) extends Actor with ActorLogging {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error(reason, "Migrator failure")
    super.preRestart(reason, message)
    // tryFailure rather than failure: preRestart can run more than once, and
    // Promise.failure throws IllegalStateException if already completed.
    promise.tryFailure(reason)
  }
}
/**
 * `Migrator` variant backed by a persistent actor; additionally fails the
 * promise when journal recovery fails.
 *
 * @param promise completed with the failure cause on recovery failure
 */
abstract class PersistentMigrator(promise: Promise[Unit]) extends Migrator(promise) with PersistentActor {
  override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = {
    super.onRecoveryFailure(cause, event)
    // tryFailure: the promise is shared with Migrator.preRestart (and owned by
    // the caller), so it may already be completed when this runs.
    promise.tryFailure(cause)
  }
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:60,代码来源:Migration.scala
示例9: ShiteTalker
//设置package包名称以及导入依赖的类
package com.mooneyserver.dublinpubs.shite_talking.protocol.actors
import akka.actor.ActorLogging
import akka.persistence.{PersistentActor, RecoveryCompleted}
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.models.{DrunkenWaffleReceived, DrunkenWaffle, ShiteTalkerIsYapping}
/**
 * Persistent actor for one chat participant. Its persistence id is the actor
 * name, and it journals every `DrunkenWaffle` it sent or was the target of.
 */
class ShiteTalker extends PersistentActor with ActorLogging {
  val persistenceId: String = self.path.name

  override def receiveRecover: Receive = {
    // Must come before the catch-all below, which would otherwise shadow it.
    case RecoveryCompleted => log.info(s"$persistenceId has fully recovered")
    case a: Any => log.info(s"$persistenceId is recovering: with some message $a")
  }

  override def receiveCommand: Receive = {
    case ShiteTalkerIsYapping(_) => log.info(s"$persistenceId has been marked as Active")

    // Message authored by this participant: journal it and acknowledge.
    case msg: DrunkenWaffle if msg.wafflerId == persistenceId =>
      persistAsync(msg) { _ =>
        log.info(s"$persistenceId has stored a message that it owns")
        sender() ! DrunkenWaffleReceived(msg.waffleId)
      }

    // Message addressed to this participant: journal it only.
    case msg: DrunkenWaffle if msg.targetOfAbuseId == persistenceId =>
      persistAsync(msg) { _ =>
        log.info(s"$persistenceId has stored a message that it received")
        // if client is active then send to Client
        // else stash the messages in state
      }

    case a: Any => log.warning(s"Unexpected message received by Shite Talker: $a")
  }
}
开发者ID:irishshagua,项目名称:dublin-pubs-shite-talking,代码行数:34,代码来源:ShiteTalker.scala
示例10: UserState
//设置package包名称以及导入依赖的类
package com.softwaremill.sandbox.application
import akka.actor.{ActorLogging, ActorRef, Status}
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.persistence.PersistentActor
import com.softwaremill.sandbox.application.UserActor.{CreateUser, UserCreated}
import com.softwaremill.tagging.@@
/** In-memory state of a user; holds only the user's name. */
case class UserState(name: String)
/**
 * Persistent user actor: journals a `UserCreated` event per `CreateUser`
 * command and replies with `Status.Success` carrying the user name.
 *
 * The `Thread.sleep` calls are kept exactly as in the original.
 */
class UserActor extends PersistentActor with ActorLogging {
  var userState: Option[UserState] = None

  override def persistenceId: String = s"UA-${self.path.name}"

  override def receiveRecover: Receive = {
    case created: UserCreated =>
      userState = Some(UserState(created.name))
  }

  override def receiveCommand: Receive = {
    case command: CreateUser =>
      // Capture the sender up front so the reply target is fixed.
      val replyTo = sender()
      log.debug("creating used")
      Thread.sleep(2500)
      persist(UserCreated(command.name)) { stored =>
        userState = Some(UserState(stored.name))
        Thread.sleep(1500)
        log.debug("user created")
        replyTo ! Status.Success(stored.name)
      }
  }
}
/** A command addressed to a specific user. */
trait UserCommand {
/** Id of the user this command targets; read by UserActorMessageExtractor.entityId. */
def userId: String
}
/** Protocol and type aliases for `UserActor`. */
object UserActor {
  /** Command: create the user `name` under entity `userId`. */
  case class CreateUser(userId: String, name: String) extends UserCommand

  /** Event: a user with the given name was created. */
  case class UserCreated(name: String)

  /** Tag type used to mark the user region's `ActorRef`. */
  trait UserRegionTag

  /** An `ActorRef` tagged as the user region. */
  type UserRegion = ActorRef @@ UserRegionTag
}
/**
 * Sharding message extractor for `UserActor` (10 shards): routes each
 * `UserCommand` to the entity named by its `userId`.
 */
class UserActorMessageExtractor extends HashCodeMessageExtractor(10) {
  /**
   * @return the command's `userId`, or `null` for any other message. The
   *         `null` follows the sharding extractor contract, under which such
   *         a message is treated as unhandled; the original threw a
   *         `MatchError` instead.
   */
  override def entityId(message: Any): String = message match {
    case command: UserCommand => command.userId
    case _ => null
  }
}
开发者ID:aludwiko,项目名称:sandbox,代码行数:52,代码来源:UserActor.scala
示例11: HmdaPersistentActor
//设置package包名称以及导入依赖的类
package hmda.persistence.model
import java.util.concurrent.TimeUnit
import akka.actor.ReceiveTimeout
import akka.persistence.PersistentActor
import akka.stream.ActorMaterializer
import hmda.persistence.messages.CommonMessages.{ Event, Shutdown }
import hmda.persistence.PersistenceConfig._
import scala.concurrent.duration.Duration
/**
 * Base class for HMDA persistent actors.
 *
 * Sets a receive timeout from configuration, stops itself once the timeout
 * fires (via a `Shutdown` message to itself), and replays journaled `Event`s
 * through `updateState`.
 */
abstract class HmdaPersistentActor extends PersistentActor with HmdaActor {
  implicit val system = context.system
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  /** Applies an event to the actor's state; supplied by subclasses. */
  def updateState(event: Event): Unit

  override def preStart(): Unit = {
    super.preStart()
    // The configured value is interpreted as seconds.
    val timeoutSeconds = configuration.getInt("hmda.persistent-actor-timeout")
    val receiveTimeout = Duration.create(timeoutSeconds, TimeUnit.SECONDS)
    context.setReceiveTimeout(receiveTimeout)
  }

  override def receiveRecover: Receive = {
    case replayed: Event => updateState(replayed)
  }

  override def receiveCommand: Receive = {
    case Shutdown =>
      context.stop(self)
    case ReceiveTimeout =>
      self ! Shutdown
  }
}
开发者ID:cfpb,项目名称:hmda-platform,代码行数:39,代码来源:HmdaPersistentActor.scala
示例12: UserManager
//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor
import java.time.Instant
import java.util.UUID
import akka.actor.Props
import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._
import scala.collection.mutable
/**
 * Persistent registry of users keyed by username. Rejects duplicate usernames
 * and journals a `UserCreatedEvent` for every accepted `CreateUserCommand`.
 */
sealed class UserManager extends PersistentActor {
  private val usernameToEntry: mutable.Map[String, UserEntry] = mutable.Map.empty

  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  override def receiveRecover: Receive = {
    case userCreated: UserCreatedEvent => apply(userCreated)
  }

  override def receiveCommand: Receive = {
    case createUser: CreateUserCommand => handle(createUser)
  }

  /** Registers the created user in the in-memory index. */
  private def apply(userCreated: UserCreatedEvent): Unit = {
    val entry = userCreated.userEntry
    usernameToEntry += (entry.username -> entry)
  }

  /** Creates the user unless the username is already taken. */
  private def handle(createUser: CreateUserCommand): Unit = {
    val alreadyTaken = usernameToEntry.contains(createUser.username)
    if (alreadyTaken) {
      sender ! CreateUserResult(createUser, Left(UsernameAlreadyExist))
    } else {
      val userId = UserId(UUID.randomUUID())
      val originalSender = sender
      val created = UserCreatedEvent(
        Instant.now(),
        UserEntry(userId, createUser.username, createUser.hashedPassword)
      )
      persist(created) { stored =>
        apply(stored)
        originalSender ! CreateUserResult(createUser, Right(userId))
      }
    }
  }
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:43,代码来源:UserManager.scala
示例13: UnitWorker
//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor
import java.time.Instant
import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._
/**
 * Persistent actor holding the current amount stored in one unit.
 *
 * Deposits are rejected when non-positive or when they would exceed
 * `maximumCapacity`; withdrawals are rejected when non-positive or larger than
 * the current amount. Accepted operations are journaled as events.
 *
 * @param userId          owner of the unit, recorded in every event
 * @param unitId          unit identifier, also used for the persistence id
 * @param maximumCapacity upper bound for the stored amount
 * @param unitType        not used within this class
 */
sealed class UnitWorker(userId: UserId, unitId: UnitId, maximumCapacity: Long, unitType: UnitType) extends PersistentActor {
  private var currentAmount: Long = 0

  override def persistenceId: String = s"unitworker-${unitId.id}"

  override def receiveRecover: Receive = {
    case deposit: DepositEvent => apply(deposit)
    case withdraw: WithdrawEvent => apply(withdraw)
  }

  private def apply(deposit: DepositEvent): Unit = currentAmount = currentAmount + deposit.amountDeposited

  private def apply(withdraw: WithdrawEvent): Unit = currentAmount = currentAmount - withdraw.amountWithdrawn

  override def receiveCommand: Receive = {
    case deposit: DepositCommand => handle(deposit)
    case withdraw: WithdrawCommand => handle(withdraw)
  }

  /** Validates and journals a deposit, replying with the new amount or an error. */
  private def handle(deposit: DepositCommand): Unit = {
    if (deposit.amountToDeposit <= 0) {
      sender ! DepositResult(deposit, Left(InvalidDepositAmount))
    } else if (deposit.amountToDeposit > maximumCapacity - currentAmount) {
      // Compared against the remaining room instead of computing
      // `currentAmount + amountToDeposit`, which can overflow Long and wrongly
      // accept a huge deposit. The checks in this class keep currentAmount
      // within 0..maximumCapacity, so the subtraction itself cannot overflow.
      sender ! DepositResult(deposit, Left(DepositExceedCapacity))
    } else {
      val originalSender = sender
      persist(DepositEvent(Instant.now(), userId, unitId, deposit.amountToDeposit)) { event =>
        apply(event)
        originalSender ! DepositResult(deposit, Right(currentAmount))
      }
    }
  }

  /** Validates and journals a withdrawal, replying with the new amount or an error. */
  private def handle(withdraw: WithdrawCommand): Unit = {
    if (withdraw.amountToWithdraw <= 0) {
      sender ! WithdrawResult(withdraw, Left(InvalidWithdrawAmount))
    } else if (withdraw.amountToWithdraw > currentAmount) {
      sender ! WithdrawResult(withdraw, Left(WithdrawExceedAvailableAmount))
    } else {
      val originalSender = sender
      persist(WithdrawEvent(Instant.now(), userId, unitId, withdraw.amountToWithdraw)) { event =>
        apply(event)
        originalSender ! WithdrawResult(withdraw, Right(currentAmount))
      }
    }
  }
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:56,代码来源:UnitWorker.scala
示例14: UserWorker
//设置package包名称以及导入依赖的类
package homeworkzen.domain.command.actor
import java.time.Instant
import java.util.UUID
import akka.actor.Props
import akka.persistence.PersistentActor
import homeworkzen.domain.command.message._
import homeworkzen.model._
/**
 * Persistent per-user actor that owns one `UnitWorker` child per created unit
 * and forwards unit commands to the matching child.
 */
sealed class UserWorker extends PersistentActor {
  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  override def receiveRecover: Receive = {
    case unitCreated: UnitCreatedEvent => apply(unitCreated)
  }

  override def receiveCommand: Receive = {
    case createUnit: CreateUnitCommand => handle(createUnit)
    case unitCommand: UnitCommand => forward(unitCommand)
  }

  /** Spawns the `UnitWorker` child for a created unit, named after the unit id. */
  private def apply(unitCreated: UnitCreatedEvent): Unit = {
    val workerProps = Props(
      new UnitWorker(unitCreated.userId, unitCreated.unitId, unitCreated.maximumCapacity, unitCreated.unitType)
    )
    val workerName = unitCreated.unitId.id.toString
    context.actorOf(workerProps, workerName)
  }

  /** Journals a new unit when the requested capacity is positive. */
  private def handle(createUnit: CreateUnitCommand): Unit = {
    val capacityIsValid = createUnit.maximumCapacity > 0
    if (capacityIsValid) {
      val unitId = UnitId(UUID.randomUUID())
      val originalSender = sender
      val created = UnitCreatedEvent(
        Instant.now(), createUnit.userId, unitId, createUnit.maximumCapacity, createUnit.unitType
      )
      persist(created) { stored =>
        apply(stored)
        originalSender ! CreateUnitResult(createUnit, Right(unitId))
      }
    } else {
      sender ! CreateUnitResult(createUnit, Left(InvalidMaximumCapacityValue))
    }
  }

  /** Forwards the command to the unit's child, or replies with the command's failure message. */
  private def forward(unitCommand: UnitCommand): Unit = {
    val target = context.child(unitCommand.unitId.id.toString)
    target match {
      case None => sender ! unitCommand.unitForwardFailureMessage
      case Some(worker) => worker forward unitCommand
    }
  }
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:48,代码来源:UserWorker.scala
示例15: PersistAsyncActor
//设置package包名称以及导入依赖的类
package akka.persistence.pg.perf
import akka.actor.{Props, ActorLogging}
import akka.persistence.PersistentActor
import akka.persistence.pg.perf.Messages.{Alter, Altered}
/**
 * Persistent actor that turns each `Alter(txt)` into ten `Altered` events
 * written with `persistAllAsync`; the handler replies with `txt` once per
 * stored event. Replayed events are ignored.
 *
 * @param persistenceId journal identifier for this actor
 */
class PersistAsyncActor(override val persistenceId: String) extends PersistentActor with ActorLogging {

  // Recovery deliberately ignores everything.
  override def receiveRecover: Receive = {
    case _ =>
  }

  override def receiveCommand: Receive = {
    case Alter(txt) =>
      val created = System.currentTimeMillis()
      // Ten events sharing one timestamp, suffixed _1 .. _10.
      val events = (1 to 10).map(i => Altered(s"${txt}_$i", created))
      persistAllAsync(events) { _ =>
        sender ! txt
      }
  }
}
/** Factory for `PersistAsyncActor` props. */
object PersistAsyncActor {
  /** Props that create a `PersistAsyncActor` with the given persistence id. */
  def props(persistenceId: String) = {
    Props(new PersistAsyncActor(persistenceId))
  }
}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:25,代码来源:PersistAsyncActor.scala
示例16: Borrower
//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst
import akka.actor.{ActorLogging, Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.{PersistentActor, RecoveryCompleted}
/**
 * Persistent borrower entity. Journals `BorrowerCreated` for `CreateBorrower`
 * and, on `ReceiveTimeout`, asks its parent to passivate it.
 */
class Borrower extends PersistentActor with ActorLogging {
  import Borrower._

  override def persistenceId: String = "borrower-" + self.path.name

  override def receiveRecover: Receive = {
    case e: BorrowerEvent => updateState(e)
    case RecoveryCompleted => // nothing to do once recovery finishes
  }

  override def receiveCommand: Receive = {
    case CreateBorrower(catalogId, borrowerId, name) =>
      persist(BorrowerCreated(catalogId, borrowerId, name)) { e =>
        updateState(e)
        sender() ! e
      }
    // On timeout, ask the parent to passivate this entity; the parent is
    // asked to send `Stop` back as the stop message.
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
    case Stop => context.stop(self)
  }

  /** Applies an event to the state; `BorrowerCreated` currently changes nothing. */
  def updateState(e: BorrowerEvent) = e match {
    case i: BorrowerCreated =>
  }
}
/** Protocol, sharding extractors and props for the `Borrower` entity. */
object Borrower {
  /** A command addressed to one borrower within one catalog. */
  sealed trait BorrowerCommand {
    val catalogId: Int
    val borrowerId: Int
  }
  final case class CreateBorrower(catalogId: Int, borrowerId: Int, name: String) extends BorrowerCommand

  /** An event concerning one borrower. */
  sealed trait BorrowerEvent {
    val borrowerId: Int
  }
  final case class BorrowerCreated(catalogId: Int, borrowerId: Int, name: String) extends BorrowerEvent

  val numberOfShards = 128

  /** Entity key of the form "catalogId_borrowerId", shared by both extractors. */
  private def entityKey(command: BorrowerCommand): String =
    s"${command.catalogId}_${command.borrowerId}"

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case command: BorrowerCommand => (entityKey(command), command)
  }

  // The shard id is the absolute value of the key's hash modulo numberOfShards.
  val extractShardId: ShardRegion.ExtractShardId = {
    case command: BorrowerCommand =>
      val bucket = entityKey(command).hashCode % numberOfShards
      math.abs(bucket).toString
  }

  def props() = Props(new Borrower())
}
开发者ID:ColinScott,项目名称:bathurst,代码行数:59,代码来源:Borrower.scala
示例17: FriendActor
//设置package包名称以及导入依赖的类
package com.packt.chapter6
import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, Recovery, RecoveryCompleted, SnapshotOffer}
/** Factory for `FriendActor` props. */
object FriendActor {
  /** Props that create a `FriendActor` with the given id and recovery strategy. */
  def props(friendId: String, recoveryStrategy: Recovery) = {
    Props(new FriendActor(friendId, recoveryStrategy))
  }
}
/**
 * Persistent actor tracking a friend list, with snapshot support and a
 * caller-supplied recovery strategy.
 *
 * @param friendId used as the persistence id
 * @param r        recovery strategy returned from `recovery`
 */
class FriendActor(friendId: String, r: Recovery) extends PersistentActor with ActorLogging {
  override val persistenceId = friendId
  override val recovery = r

  var state = FriendState()

  /** Folds a friend event into the current state. */
  def updateState(event: FriendEvent) = {
    state = state.update(event)
  }

  val receiveRecover: Receive = {
    case SnapshotOffer(_, recoveredState: FriendState) =>
      log.info(s"Snapshot offered: $recoveredState")
      state = recoveredState
    case evt: FriendEvent =>
      log.info(s"Replaying event: $evt")
      updateState(evt)
    case RecoveryCompleted =>
      log.info(s"Recovery completed. Current state: $state")
  }

  val receiveCommand: Receive = {
    case AddFriend(friend) =>
      persist(FriendAdded(friend)) { added => updateState(added) }
    case RemoveFriend(friend) =>
      persist(FriendRemoved(friend)) { removed => updateState(removed) }
    case "snap" =>
      saveSnapshot(state)
    case "print" =>
      log.info(s"Current state: $state")
  }
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:34,代码来源:FriendActor.scala
示例18: StockPersistenceActor
//设置package包名称以及导入依赖的类
package com.packt.chapter6
import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}
/** Factory for `StockPersistenceActor` props. */
object StockPersistenceActor {
  /** Props that create a `StockPersistenceActor` for the given stock id. */
  def props(stockId: String) = {
    Props(new StockPersistenceActor(stockId))
  }
}
/**
 * Persistent actor recording the value history of one stock.
 *
 * @param stockId used as the persistence id
 */
class StockPersistenceActor(stockId: String) extends PersistentActor with ActorLogging {
  override val persistenceId = stockId

  var state = StockHistory()

  /** Folds an appended value into the history. */
  def updateState(event: ValueAppended) = {
    state = state.update(event)
  }

  val receiveRecover: Receive = {
    case RecoveryCompleted =>
      log.info(s"Recovery completed. Current state: $state")
    case evt: ValueAppended =>
      updateState(evt)
  }

  val receiveCommand: Receive = {
    case ValueUpdate(value) =>
      persist(ValueAppended(StockValue(value))) { appended => updateState(appended) }
    case "print" =>
      log.info(s"Current state: $state")
  }

  override def postStop() = {
    log.info(s"Stopping [${self.path}]")
  }
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:28,代码来源:StockPersistenceActor.scala
示例19: SamplePersistenceActor
//设置package包名称以及导入依赖的类
package com.packt.chapter6
import akka.persistence.{PersistentActor, SnapshotOffer}
/**
 * Sample persistent actor tracking active users, with snapshot support.
 * Understands `UserUpdate`, `ShutdownPersistentActor` and the plain strings
 * "snap" (save a snapshot) and "print" (print state).
 */
class SamplePersistenceActor extends PersistentActor {
  override val persistenceId = "unique-id-1"

  var state = ActiveUsers()

  /** Folds an event into the active-users state. */
  def updateState(event: Event) = {
    state = state.update(event)
  }

  val receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: ActiveUsers) =>
      state = snapshot
    case evt: Event =>
      updateState(evt)
  }

  val receiveCommand: Receive = {
    case UserUpdate(userId, Add) =>
      persist(AddUserEvent(userId)) { added => updateState(added) }
    case UserUpdate(userId, Remove) =>
      persist(RemoveUserEvent(userId)) { removed => updateState(removed) }
    case "snap" =>
      saveSnapshot(state)
    case "print" =>
      println(state)
    case ShutdownPersistentActor =>
      context.stop(self)
  }

  override def postStop() = {
    println(s"Stopping [${self.path}]")
  }
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:26,代码来源:SamplePersistenceActor.scala
示例20: TodoRepositoryProcessor
//设置package包名称以及导入依赖的类
package com.todos.repository
import java.util.UUID
import akka.actor.{Actor, ActorLogging, Props}
import akka.persistence.{PersistentActor, Recovery}
import com.todos.command.{CreateTodo, RemoveTodo}
import com.todos.event.utils.ProcessedEvent
import com.todos.event.{TodoCreated, TodoRemoved}
import com.todos.response.Success
/**
 * Write-side processor for todos: journals `TodoCreated` / `TodoRemoved`,
 * acknowledges the sender, and publishes each stored event on the system
 * event stream together with its sequence number.
 *
 * Recovery is disabled (`Recovery.none`), so no state is rebuilt on start.
 */
class TodoRepositoryProcessor() extends PersistentActor with ActorLogging {
  log.info("Started {}", self.path.name)

  def persistenceId: String = self.path.name

  override def recovery: Recovery = Recovery.none

  override def receiveRecover: Receive = Actor.emptyBehavior

  def receiveCommand: Receive = {
    case cmd: CreateTodo =>
      val created = TodoCreated(
        id = UUID.randomUUID(),
        title = cmd.title,
        completed = cmd.completed
      )
      persist(created) { persistedEvent =>
        sender() ! Success()
        context.system.eventStream.publish(ProcessedEvent(persistedEvent, lastSequenceNr))
      }

    case cmd: RemoveTodo =>
      val removed = TodoRemoved(cmd.id)
      persist(removed) { persistedEvent =>
        sender() ! Success()
        context.system.eventStream.publish(ProcessedEvent(persistedEvent, lastSequenceNr))
      }
  }
}
/** Name constant and props factory for `TodoRepositoryProcessor`. */
object TodoRepositoryProcessor {
  val name: String = "todo-repository-processor"

  /** Props that create a `TodoRepositoryProcessor` by class. */
  def props(): Props = Props(classOf[TodoRepositoryProcessor])
}
开发者ID:benniekrijger,项目名称:todo-service,代码行数:54,代码来源:TodoRepositoryProcessor.scala
注:本文中的akka.persistence.PersistentActor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论