本文整理汇总了Scala中akka.actor.Stash类的典型用法代码示例。如果您正苦于以下问题:Scala Stash类的具体用法?Scala Stash怎么用?Scala Stash使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Stash类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: RemoteActorProxyWithBecome
//设置package包名称以及导入依赖的类
package com.dazito.scala.akkademy.client
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, Stash}
import com.dazito.scala.dakkabase.messages.{Disconnected, Connected, GetRequest}
class RemoteActorProxyWithBecome extends Actor with Stash {
override def receive: Receive = {
case x: GetRequest =>
stash()
case _: Connected =>
context.become(online)
unstashAll()
}
def online: Receive = {
case x: GetRequest =>
processMessage(x)
case _: Disconnected =>
context.unbecome()
}
def processMessage(message: AnyRef) = {
// Do something
}
}
开发者ID:dazito,项目名称:LearningAkkaScalaClient,代码行数:30,代码来源:RemoteActorProxyWithBecome.scala
示例2: RemoteActorProxy
//设置package包名称以及导入依赖的类
package com.dazito.scala.akkademy.client
import akka.actor.Actor.Receive
import akka.actor.{Actor, Stash}
import com.dazito.scala.dakkabase.messages.{Disconnected, Connected, GetRequest}
class RemoteActorProxy extends Actor with Stash {
var isOnline = false;
override def receive: Receive = {
case message: GetRequest =>
if(isOnline) {
processMessage(message)
}
else {
stash()
}
case _: Connected =>
isOnline = true
unstashAll()
case _: Disconnected =>
isOnline = false;
}
def processMessage(message: AnyRef) = {
// Do something
}
}
开发者ID:dazito,项目名称:LearningAkkaScalaClient,代码行数:31,代码来源:RemoteActorProxy.scala
示例3: User
//设置package包名称以及导入依赖的类
package com.udemy.akka.become
import akka.actor.{Actor, Stash}
import akka.actor.Actor.Receive
import com.udemy.akka.become.UserStorage.{Connect, Disconnect, Operation}
case class User(username:String,email:String)
object UserStorage{
trait DBOperation
object DBOperation{
case object Create extends DBOperation
case object Read extends DBOperation
case object Update extends DBOperation
case object Delete extends DBOperation
}
case object Connect
case object Disconnect
case class Operation(operation:DBOperation,user:Option[User])
}
class UserStorage extends Actor with Stash{
override def receive: Receive = disconnected
def connected:Actor.Receive= {
case Disconnect=>
println("User storage disconnected from DB")
unstashAll()
context.unbecome()
case Operation(op,user)=>
println(s"User storage recieved $op to do in user : $user")
}
def disconnected:Actor.Receive = {
case Connect =>
println("User storage connected to DB")
unstashAll()
context.become(connected)
case _ =>
stash()
}
}
开发者ID:akshay-harale,项目名称:udemy-akka,代码行数:48,代码来源:User.scala
示例4: SchedulerActor
//设置package包名称以及导入依赖的类
package mesosphere.mesos.simulation
import akka.actor.{ Actor, Stash }
import akka.event.LoggingReceive
import mesosphere.marathon.stream._
import org.apache.mesos.Protos.{ FrameworkID, MasterInfo, Offer, TaskStatus }
import org.apache.mesos.{ Scheduler, SchedulerDriver }
import org.slf4j.LoggerFactory
import scala.collection.immutable.Seq
object SchedulerActor {
private val log = LoggerFactory.getLogger(getClass)
case class Registered(
frameworkId: FrameworkID,
master: MasterInfo)
case class ResourceOffers(offers: Seq[Offer])
}
class SchedulerActor(scheduler: Scheduler) extends Actor with Stash {
import SchedulerActor._
var driverOpt: Option[SchedulerDriver] = None
def receive: Receive = waitForDriver
def waitForDriver: Receive = LoggingReceive.withLabel("waitForDriver") {
case driver: SchedulerDriver =>
log.info("received driver")
driverOpt = Some(driver)
context.become(handleCmds(driver))
unstashAll()
case _ => stash()
}
def handleCmds(driver: SchedulerDriver): Receive = LoggingReceive.withLabel("handleCmds") {
case Registered(frameworkId, masterInfo) =>
scheduler.registered(driver, frameworkId, masterInfo)
case ResourceOffers(offers) =>
scheduler.resourceOffers(driver, offers)
case status: TaskStatus =>
scheduler.statusUpdate(driver, status)
}
override def postStop(): Unit = {
driverOpt.foreach { driver => scheduler.disconnected(driver) }
}
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:54,代码来源:SchedulerActor.scala
示例5: fold
//设置package包名称以及导入依赖的类
package com.github.wildprairie.common.actors.shared
import akka.actor.{Actor, ActorRef, Stash}
import scala.concurrent.{Future, Promise}
trait ActorFolder { this: Actor with Stash =>
def fold[B](actors: Traversable[ActorRef], input: Any)(accumulator: B)(
accumulate: PartialFunction[(Any, B), B]): Future[B] = {
val promise = Promise[B]()
if (actors.isEmpty) {
promise.success(accumulator)
} else {
actors.foreach(_ ! input)
context.become(collectAndDispatch(actors.size, promise)(accumulator)(accumulate))
}
promise.future
}
private[this] def collectAndDispatch[B](remaining: Int, promise: Promise[B])(accumulator: B)(
accumulate: PartialFunction[(Any, B), B]): Receive = {
case msg =>
val matched =
accumulate.runWith { res =>
if (remaining > 1) {
context.become(
collectAndDispatch(remaining - 1, promise)(res)(accumulate),
discardOld = true
)
} else {
promise.success(res)
unstashAll()
context.unbecome()
}
}.apply((msg, accumulator))
if (!matched) stash()
}
}
开发者ID:OpenWakfu,项目名称:wildprairie,代码行数:42,代码来源:ActorFolder.scala
示例6: User
//设置package包名称以及导入依赖的类
package database
import akka.actor.{Stash, Props, ActorSystem, Actor}
import database.UserStorage.{DBOperation, Connect, DisConnect, Operation}
case class User(username: String, email: String)
object UserStorage {
trait DBOperation
object DBOperation {
case object Create extends DBOperation
case object Update extends DBOperation
case object Read extends DBOperation
case object Delete extends DBOperation
}
case object Connect
case object DisConnect
case class Operation(dbOperation: DBOperation, user: Option[User])
}
class UserStorage extends Actor with Stash {
def connected: Actor.Receive = {
case DisConnect =>
println("User Storage Disconnect from DB")
context.unbecome()
case Operation(op, user) =>
println(s"User Storage receive ${op} to do in user: ${user} ")
}
def disconnected: Actor.Receive = {
case Connect =>
println(s"User Storage connected to DB")
unstashAll() // inqueue's message
context.become(connected)
case _ =>
stash()
}
override def receive: Receive = disconnected
}
object BecomeHotswap extends App {
val system = ActorSystem("Hotswap-Become")
val userStorage = system.actorOf(Props[UserStorage], "userStorage")
userStorage ! Connect
userStorage ! Operation(DBOperation.Create, Some(User("Admin", "[email protected]")) )
userStorage ! DisConnect
Thread.sleep(100)
system.shutdown()
}
开发者ID:astray1988,项目名称:AkkaExplore,代码行数:61,代码来源:User.scala
示例7: UnaryNode
//设置package包名称以及导入依赖的类
package hu.bme.mit.ire.nodes.unary
import akka.actor.{Actor, Stash}
import hu.bme.mit.ire._
import hu.bme.mit.ire.messages._
abstract class UnaryNode(val expectedTerminatorCount: Int = 1) extends Actor with Forwarder with Stash with TerminatorHandler {
val log = context.system.log
val name = self.path.name
def onChangeSet(changeSet: ChangeSet)
override def receive: Actor.Receive = {
case pause: Pause => context.become({
case resume: Resume => {
if (resume.messageID == pause.messageID) {
context.unbecome()
unstashAll()
} else stash()
}
case terminator: TerminatorMessage => handleTerminator(terminator)
case _ => stash()
})
case changeSet: ChangeSet => onChangeSet(changeSet)
case terminator: TerminatorMessage => handleTerminator(terminator)
case Primary | Secondary =>
throw new UnsupportedOperationException(s"$name received Beta-wrapped message")
case _:SizeRequest => sender() ! onSizeRequest()
case _ => throw new UnsupportedOperationException(s"$name received unknown message")
}
def onSizeRequest(): Long
}
开发者ID:FTSRG,项目名称:ire,代码行数:34,代码来源:UnaryNode.scala
示例8: PausableActor
//设置package包名称以及导入依赖的类
package com.packt.chapter10
import akka.actor.{Actor, ActorLogging, ActorRef, Stash}
import com.packt.chapter10.PausableActor.{Ready, Work}
object PausableActor {
case class Work(id: Int)
case object Ready
}
class PausableActor(target: ActorRef) extends Actor with ActorLogging with Stash {
def receive = {
case work: Work =>
target ! work
log.info(s"Received Work [${work.id}]. Sending and pausing.")
context.become(pausedBehavior, discardOld = false)
}
def pausedBehavior : Receive = {
case work: Work =>
stash()
case Ready if sender == target =>
log.info(s"[${target.path.name}] is ready again.")
context.unbecome()
unstashAll()
case Ready =>
log.info(s"Discarding [Ready] from other actor different from ${target.path.name}")
}
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:30,代码来源:PausableActor.scala
示例9: PersistingActor
//设置package包名称以及导入依赖的类
package com.mikemunhall.simpletwitterstats.server
import akka.actor.{Actor, Props, Stash}
import com.mikemunhall.simpletwitterstats.model.metrics.Tweet
import com.mikemunhall.simpletwitterstats.server.util.ParseUtil
import com.typesafe.scalalogging.StrictLogging
object PersistingActor {
def props = Props[PersistingActor]
case class Persist(tweet: Tweet)
}
class PersistingActor extends Actor with Stash with StrictLogging with ParseUtil {
import com.mikemunhall.simpletwitterstats.server.PersistingActor.Persist
import com.mikemunhall.simpletwitterstats.timeSeriesData
self ! "initialize"
def receive = uninitialized
def uninitialized: Receive = {
case "initialize" =>
logger.debug("Initializing PersistingActor and unstashing messages.")
unstashAll()
context.become(initialized)
case _ =>
logger.debug("PersistingActor invoked before initialization. Stashing message.")
stash()
}
def initialized: Receive = {
case Persist(tweet) =>
timeSeriesData.add(tweet)
}
}
开发者ID:mmunhall,项目名称:simple-twitter-stats,代码行数:36,代码来源:PersistingActor.scala
示例10: ParsingActor
//设置package包名称以及导入依赖的类
package com.mikemunhall.simpletwitterstats.server
import akka.actor.{Actor, Props, Stash}
import com.mikemunhall.simpletwitterstats.model.metrics.Tweet
import com.mikemunhall.simpletwitterstats.server.util.ParseUtil
import com.typesafe.scalalogging.StrictLogging
import twitter4j.Status
object ParsingActor {
case class Parse(status: Status)
}
class ParsingActor extends Actor with Stash with StrictLogging with ParseUtil {
import com.mikemunhall.simpletwitterstats.server.ParsingActor.Parse
import com.mikemunhall.simpletwitterstats.server.PersistingActor.Persist
val persistingActor = context.actorOf(PersistingActor.props, "persistingActor")
self ! "initialize"
def receive = uninitialized
def uninitialized: Receive = {
case "initialize" =>
logger.debug("Initializing ParsingActor and unstashing messages.")
unstashAll()
context.become(initialized)
case _ =>
logger.debug("ParsingActor invoked before initialization. Stashing message.")
stash()
}
def initialized: Receive = {
case Parse(status) =>
val urlEntities = status.getURLEntities
val tweet = Tweet(
status.getId,
dateToLocalDateTime(status.getCreatedAt),
emojisFromText(status.getText),
status.getHashtagEntities.map(_.getText).toList,
urlEntities.map(_.getExpandedURL).filterNot(_ == "").map(domainFromUrl(_)).toList,
// TODO: Externalize the domain filter
urlEntities.map(_.getExpandedURL).filter(d => d.contains("pic.twitter") || d.contains("instagram")).map(domainFromUrl(_)).toList
)
persistingActor ! Persist(tweet)
}
}
开发者ID:mmunhall,项目名称:simple-twitter-stats,代码行数:48,代码来源:ParsingActor.scala
示例11: OrderingExecutor
//设置package包名称以及导入依赖的类
package com.pagerduty.scheduler.akka
import akka.actor.{ActorRef, ActorRefFactory, Props, Stash}
import com.pagerduty.scheduler.model.{Task, TaskKey}
import scala.collection.immutable.SortedMap
object OrderingExecutor {
class OrderingExecutor(
orderingId: Task.OrderingId,
partitionExecutor: ActorRef,
taskExecutorFactory: (ActorRefFactory, Task, ActorRef) => ActorRef)
extends ExtendedLoggingFSM[OrderingExecutor.State, OrderingExecutor.Data]
with Stash {
import OrderingExecutor._
import PartitionExecutor._
startWith(Idle, Data(SortedMap.empty))
override val supervisorStrategy = Supervision.AlwaysEscalateStrategy
when(Idle) {
case Event(ExecuteOrderingTask(task), data) if task.orderingId == orderingId => {
assert(data.taskQueue.isEmpty)
execute(task)
goto(Executing)
}
}
when(Executing) {
case Event(ExecuteOrderingTask(task), data) if task.orderingId == orderingId => {
stay() using Data(data.taskQueue + (task.taskKey -> task))
}
case Event(TaskExecuted(taskKey), data) if taskKey.orderingId == orderingId => {
partitionExecutor ! OrderingTaskExecuted(taskKey)
if (data.taskQueue.isEmpty) {
goto(Idle)
} else {
val (taskKey, task) = data.taskQueue.head
val remaining = data.taskQueue.tail
execute(task)
stay() using Data(remaining)
}
}
}
def execute(task: Task): Unit = {
taskExecutorFactory(context, task, context.self)
}
}
开发者ID:PagerDuty,项目名称:scheduler,代码行数:53,代码来源:OrderingExecutor.scala
示例12: Pokedex
//设置package包名称以及导入依赖的类
package PokemonsDataStore
import java.io.FileReader
import Common.{FetchPokemonByIdRequest, FetchPokemonByIdResponse, GetStatsRequest, RequestStatus}
import PokemonsDataStore.PokemonModel.{PokemonStat, TypeRouter}
import akka.actor.{Actor, ActorRef, Props, Stash}
import java.nio.file.{Files, Paths}
import scala.io.Source
class Pokedex(pokeApiClient : ActorRef) extends Actor with Stash{
var pokemonsLeftToInit = 721
def OnPokemonReceived(response : FetchPokemonByIdResponse): Unit = {
response match {
case FetchPokemonByIdResponse(_, _, RequestStatus.Success) =>
// Spawning pokemon actor containing info retrieved from pokeApi
pokemonsLeftToInit -= 1
val currentPokemonId = 722 - pokemonsLeftToInit
context.actorOf(Props(new PokemonBase(response.result, typeRouter)), response.result.name)
if (pokemonsLeftToInit > 0) {
println("Fetching next pokemon (id: " + currentPokemonId + " )")
CreatePokemon(currentPokemonId)
}
else {
println("No more pokemon to spawn !")
unstashAll()
context.become(receive)
}
case FetchPokemonByIdResponse(request : FetchPokemonByIdRequest, _, RequestStatus.Error) =>
// Trying again. Not the best way to handle a failure, only serves as fail-safe for now
CreatePokemon(request.id)
}
}
var typeRouter : ActorRef = ActorRef.noSender
def CreatePokemon(id: Int): Unit = {
pokeApiClient.tell(FetchPokemonByIdRequest(id), self)
}
override def preStart(): Unit = {
super.preStart()
typeRouter = context.actorOf(Props[TypeRouter], "typeRouter")
CreatePokemon(1)
}
// I could have stashed request until all pokemons are up before answering but I prefer to be as reactive as possible,
// even if it means only having partial results
override def receive: Receive = {
case request : GetStatsRequest =>
val path = context.self.path + "/" + request.pokemonName
println("Received request for pokemon named " + request.pokemonName + ". Searching at path: " + path)
context.actorSelection(path).forward(request)
case response : FetchPokemonByIdResponse =>
OnPokemonReceived(response)
}
}
开发者ID:Sid3way,项目名称:SuperRadHomework,代码行数:62,代码来源:Pokedex.scala
示例13: Counter
//设置package包名称以及导入依赖的类
// Copyright (c) Microsoft. All rights reserved.
package it.helpers
import java.util.concurrent.Executors
import akka.actor.{Actor, Stash}
import scala.concurrent.ExecutionContext
class Counter extends Actor with Stash {
implicit val executionContext = ExecutionContext
.fromExecutorService(Executors.newFixedThreadPool(sys.runtime.availableProcessors))
private[this] var count: Long = 0
override def receive: Receive = ready
def ready: Receive = {
case "reset" ? {
context.become(busy)
count = 0
context.become(ready)
unstashAll()
}
case "inc" ? {
context.become(busy)
count += 1
context.become(ready)
unstashAll()
}
case "get" ? sender() ! count
}
def busy: Receive = {
case _ ? stash()
}
}
开发者ID:Azure,项目名称:toketi-iothubreact,代码行数:41,代码来源:Counter.scala
示例14: Pipe
//设置package包名称以及导入依赖的类
package fr.acinq.eclair
import akka.actor.{Actor, ActorLogging, ActorRef, Stash}
import fr.acinq.eclair.channel.Commitments.msg2String
import fr.acinq.eclair.channel.{INPUT_DISCONNECTED, INPUT_RECONNECTED}
import fr.acinq.eclair.wire.LightningMessage
class Pipe extends Actor with Stash with ActorLogging {
def receive = {
case (a: ActorRef, b: ActorRef) =>
unstashAll()
context become connected(a, b)
case msg => stash()
}
def connected(a: ActorRef, b: ActorRef): Receive = {
case msg: LightningMessage if sender() == a =>
log.debug(f"A ---${msg2String(msg)}%-6s--> B")
b forward msg
case msg: LightningMessage if sender() == b =>
log.debug(f"A <--${msg2String(msg)}%-6s--- B")
a forward msg
case [email protected]_DISCONNECTED =>
log.debug("DISCONNECTED")
// used for fuzzy testing (eg: send Disconnected messages)
a forward msg
b forward msg
context become disconnected(a, b)
}
def disconnected(a: ActorRef, b: ActorRef): Receive = {
case msg: LightningMessage if sender() == a =>
// dropped
log.info(f"A ---${msg2String(msg)}%-6s-X")
case msg: LightningMessage if sender() == b =>
// dropped
log.debug(f" X-${msg2String(msg)}%-6s--- B")
case [email protected]_RECONNECTED(r) =>
log.debug(s"RECONNECTED with $r")
// used for fuzzy testing (eg: send Disconnected messages)
a forward msg
b forward msg
r ! (a, b)
}
}
开发者ID:viacoin,项目名称:eclair,代码行数:50,代码来源:Pipe.scala
示例15: Ch5PingActor
//设置package包名称以及导入依赖的类
package com.akkastarting.chapter5
import akka.actor.{ActorLogging, Actor, Props, Stash}
class Ch5PingActor extends Actor with ActorLogging with Stash {
val child = context.system.actorOf(Ch5Ping1Actor.props, "Ping1Actor")
def zeroDone: Receive = {
case Done(message: String) =>
log.info(s"Received the first done -> $message")
context.become(oneDone)
case _ =>
stash()
}
def oneDone: Receive = {
case Done(message: String) =>
log.info(s"Received the second done -> $message")
unstashAll()
context.become(allDone)
case _ =>
stash()
}
def allDone: Receive = {
case Reset(message: String) =>
log.info(s"Received a reset -> $message")
context.become(receive)
case _ =>
stash()
}
def receive: Receive = {
case Work(message: String) =>
(child ! message)(self)
context.become(zeroDone)
case _ =>
stash()
}
}
object Ch5PingActor {
val props = Props[Ch5PingActor]
}
开发者ID:imjuni,项目名称:AkkaStartingUsingScala,代码行数:46,代码来源:Ch5PingActor.scala
示例16: GracefulKill
//设置package包名称以及导入依赖的类
package com.scalanerds.wireserver.handlers
import akka.actor.{Actor, ActorRef, Stash}
import akka.event.Logging
import akka.util.ByteString
import com.scalanerds.wireserver.messageTypes._
import com.scalanerds.wireserver.wire.Message
import com.scalanerds.wireserver.wire.opcodes._
object GracefulKill
abstract class MsgHandler extends Actor with Stash {
var connection: ActorRef = _
val log = Logging(context.system, this)
override def postStop(): Unit = {
if (connection == null)
log.debug("Walter died without processing valid messages")
else
log.debug("Walter died " + connection.path)
super.postStop()
}
def receive: Receive = uninitialized
def uninitialized: Receive = {
case GracefulKill => stop()
case ref: ActorRef =>
connection = ref
context.become(initialized)
unstashAll()
case _ => stash()
}
def initialized: Receive = {
def stop() {
log.debug("MsgHandler stop " + connection.path)
context stop self
}
}
开发者ID:scalanerds,项目名称:wireserver,代码行数:45,代码来源:MsgHandler.scala
示例17: MetricShipper
//设置package包名称以及导入依赖的类
package com.codekeepersinc.kamonlogstash
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Stash }
import akka.util.ByteString
import MetricShipper.ShipperConfig
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._
import io.circe.java8.time.encodeZonedDateTime
import scala.concurrent.duration.FiniteDuration
object MetricShipper {
case class ShipperConfig(
address: String,
port: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
retryAutoReset: FiniteDuration
)
def props(config: ShipperConfig): Props = Props(new MetricShipper(config))
}
class MetricShipper(config: ShipperConfig) extends Actor with ActorLogging with Stash {
protected final val LOG_SEPARATOR = "\r\n"
val watcher = context.actorOf(LogstashWatcher.props(self, config), "watcher")
implicit final val encodeZonedDatetime: Encoder[ZonedDateTime] = encodeZonedDateTime(ISO_OFFSET_DATE_TIME)
def receive: Receive = waitForClient
def connected(client: ActorRef): Receive = {
case l: List[MetricLogger.Metric] => client ! ByteString(l.map(_.asJson.noSpaces).mkString(LOG_SEPARATOR) + LOG_SEPARATOR, "UTF-8")
case LogstashWatcher.WaitForReconnection => context become (waitForClient, discardOld = true)
}
def waitForClient: Receive = {
case LogstashWatcher.UseClient(client) => {
unstashAll()
context become (connected(client), discardOld = true)
}
case l: List[MetricLogger.Metric] => stash()
}
override def preStart(): Unit = watcher ! LogstashWatcher.Connect
}
开发者ID:darienmt,项目名称:kamon-logstash,代码行数:56,代码来源:MetricShipper.scala
示例18: StatsdActor
//设置package包名称以及导入依赖的类
package com.sharpershape.scala.metrics.statsd
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, Props, Stash}
import akka.io.{IO, Udp}
import akka.util.ByteString
class StatsdActor (remote: InetSocketAddress) extends Actor with Stash {
import context.system
IO(Udp) ! Udp.SimpleSender
override def receive: Receive = {
case Udp.SimpleSenderReady =>
context.become(ready(sender()))
unstashAll()
case _ =>
stash()
}
def ready(socket: ActorRef): Receive = {
case metric: Statsd.Metric =>
socket ! Udp.Send(ByteString(metric.payload), remote)
}
}
object StatsdActor {
def props(remote: InetSocketAddress): Props = Props(new StatsdActor(remote))
}
开发者ID:SharperShape,项目名称:scala-metrics,代码行数:32,代码来源:StatsdActor.scala
示例19: User
//设置package包名称以及导入依赖的类
package com.github.lzenczuk.akka.course.become
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Stash}
import akka.actor.Actor.Receive
import com.github.lzenczuk.akka.course.become.User.{Message, OffLine, OnLine}
object User {
sealed trait UserCommands
case object OnLine extends UserCommands
case object OffLine extends UserCommands
case class Message(content:String) extends UserCommands
}
class User(name:String) extends Actor with ActorLogging with Stash{
def receive = offLint
def onLint:Actor.Receive = {
case OffLine =>
log.info(s"User $name off-line")
context.unbecome()
case Message(msg) =>
log.info(s"User $name receive message: $msg")
}
def offLint:Actor.Receive = {
case OnLine =>
log.info(s"User $name on-line")
unstashAll()
context.become(onLint)
case Message(_) =>
log.info(s"User $name off-line. Stash message to consume later.")
stash()
}
}
object BecomeMain extends App{
private val system: ActorSystem = ActorSystem("user-message-system")
private val user1: ActorRef = system.actorOf(Props(new User("Mark")))
user1 ! Message("Hi Mark")
user1 ! Message("How are you?")
user1 ! OffLine
user1 ! Message("Boom boom boom!")
user1 ! OnLine
user1 ! Message("UR on line!")
user1 ! OffLine
user1 ! Message("Off line?")
Thread.sleep(200L)
system.shutdown()
}
开发者ID:lzenczuk,项目名称:akka-app-one,代码行数:57,代码来源:BecomeMain.scala
示例20: preStart
//设置package包名称以及导入依赖的类
package dcos.metronome.repository
import akka.actor.{ Actor, ActorLogging, Stash }
import scala.concurrent.Future
import scala.util.{ Failure, Success }
trait LoadContentOnStartup[Id, Model] extends Actor with Stash with ActorLogging {
import LoadContentOnStartup._
//TODO: change me to zk ec
import context.dispatcher
override def preStart(): Unit = {
super.preStart()
context.become(waitForInit)
loadAll()
}
def repo: Repository[Id, Model]
def initialize(specs: List[Model]): Unit
def waitForInit: Receive = {
case init: Init[Model] =>
initialize(init.result)
context.become(receive)
unstashAll()
case _ => stash()
}
def loadAll(): Unit = {
val loadAllFuture = repo.ids().flatMap { ids =>
ids.foldLeft(Future.successful(List.empty[Model])) {
case (resultsFuture, id) => resultsFuture.flatMap { res =>
repo.get(id).map(_.map(_ :: res).getOrElse(res))
}
}
}
val me = self
loadAllFuture.onComplete {
case Success(result) => me ! Init(result)
case Failure(ex) =>
log.error(ex, "Can not load initial data. Give up.")
throw ex
}
}
}
object LoadContentOnStartup {
case class Init[T](result: List[T])
}
开发者ID:dcos,项目名称:metronome,代码行数:52,代码来源:LoadContentOnStartup.scala
注:本文中的akka.actor.Stash类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论