本文整理汇总了Scala中akka.actor.OneForOneStrategy类的典型用法代码示例。如果您正苦于以下问题:Scala OneForOneStrategy类的具体用法?Scala OneForOneStrategy怎么用?Scala OneForOneStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了OneForOneStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ClientActor
//设置package包名称以及导入依赖的类
package org.http4s.akka
import scala.reflect.ClassTag
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, Props, Terminated}
import fs2.Task
import fs2.async.mutable.{Queue, Signal}
/**
 * Spawns a child from `props`, watches it, and relays traffic between that child and fs2.
 *
 * Messages of type `Out` coming from the child are enqueued on `outQueue`; messages this actor
 * sends to itself are passed on to the child. `closeSignal` is set to `true` when the child
 * terminates.
 */
private class ClientActor[Out](props: Props, outQueue: Queue[Task, Out], closeSignal: Signal[Task, Boolean])
                              (implicit messageType: ClassTag[Out]) extends Actor {
  val serverActor = context.actorOf(props)
  context.watch(serverActor)

  def receive: Receive = {
    case Terminated(`serverActor`) =>
      // The watched child is gone: flag closure, then stop ourselves.
      closeSignal.set(true).unsafeRun()
      context.stop(self)

    case fromServer if sender() == serverActor =>
      fromServer match {
        case messageType(typed) =>
          outQueue.enqueue1(typed).unsafeRun()
        case unexpected =>
          org.log4s.getLogger.error(
            s"Server sent unhandled message ${unexpected.getClass.getSimpleName} expecting a ${messageType.runtimeClass.getSimpleName}!")
      }

    case fromSelf if sender() == self =>
      serverActor ! fromSelf
  }

  // Any failure of the child stops it; the Terminated case above then reacts.
  override def supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case _ => Stop
  }
}
开发者ID:Lasering,项目名称:http4s-akka,代码行数:32,代码来源:ClientActor.scala
示例2: ClusterRouterSupervisorSpec
//设置package包名称以及导入依赖的类
package akka.cluster.oldrouting
import akka.testkit._
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.actor.OneForOneStrategy
import akka.cluster.routing._
/** Fixtures shared by the `ClusterRouterSupervisorSpec` test class. */
object ClusterRouterSupervisorSpec {

  /**
   * Actor that throws an `IllegalArgumentException` when it receives the string "go away".
   *
   * @param testActor accepted by the constructor but not used in this actor's body
   */
  class KillableActor(testActor: ActorRef) extends Actor {
    def receive = {
      // The scraped source had "?" where the case arrow belongs; restored to "=>".
      case "go away" =>
        throw new IllegalArgumentException("Goodbye then!")
    }
  }
}
/**
 * Checks that a cluster-aware router applies the supervisor strategy passed to it.
 *
 * The strategy under test reports "supervised" to `testActor` and stops the failing routee.
 */
class ClusterRouterSupervisorSpec extends AkkaSpec("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port = 0
""") {
  import ClusterRouterSupervisorSpec._

  "Cluster aware routers" must {
    "use provided supervisor strategy" in {
      val router = system.actorOf(Props(classOf[KillableActor], testActor).withRouter(
        ClusterRouterConfig(RoundRobinRouter(supervisorStrategy = OneForOneStrategy() {
          // The scraped source had "?" where the case arrow belongs; restored to "=>".
          case _ =>
            testActor ! "supervised"
            SupervisorStrategy.Stop
        }), ClusterRouterSettings(
          totalInstances = 1,
          maxInstancesPerNode = 1,
          allowLocalRoutees = true,
          useRole = None))), name = "therouter")

      // Makes the routee throw, which must trigger the custom strategy.
      router ! "go away"
      expectMsg("supervised")
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:ClusterRouterSupervisorSpec.scala
示例3: CustomOneForOneUser
//设置package包名称以及导入依赖的类
package com.example.supervisor
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy}
/**
 * Supervisor that counts restarts per child itself instead of relying on the strategy's
 * built-in retry limits.
 */
class CustomOneForOneUser extends Actor with ActorLogging {
  // Restart count per child; children never seen before count as 0.
  var restarts = Map.empty[ActorRef, Int].withDefaultValue(0)

  // Constructed with default limits, so the cut-off below is the only one in effect.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: ArithmeticException =>
      val failedChild = sender()
      val restartsSoFar = restarts(failedChild)
      if (restartsSoFar > 15) {
        // Give up on this child and forget its counter.
        restarts -= failedChild
        Stop
      } else {
        restarts = restarts.updated(failedChild, restartsSoFar + 1)
        Restart
      }
  }

  override def receive: Receive = ???
}
开发者ID:dkmn,项目名称:week-3-lecture-examples,代码行数:30,代码来源:CustomOneForOneUser.scala
示例4: TelemetryDumperSupervisor
//设置package包名称以及导入依赖的类
package sample.kamon
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import sample.kamon.TelemetryDumperSupervisor.Saved
import scala.util.Random
/**
 * Creates one `TelemetryDumper` child per incoming `Int` telemetry value and restarts any child
 * that fails with an `Exception`.
 */
class TelemetryDumperSupervisor extends Actor with ActorLogging {
  log.info("TelemetryDumperSupervisor started!")

  // Strategy logging is disabled; the failure is logged explicitly below instead.
  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case e: Exception =>
      log.error(e, "There was an error when trying to save telemetry data, restarting.")
      Restart
  }

  def receive: Receive = {
    case telemetry: Int =>
      // Blocks this actor for a random multiple of 40 ms (0 to 360 ms) before spawning.
      val pauseMillis = 40 * Random.nextInt(10)
      Thread.sleep(pauseMillis)
      val dumperProps = TelemetryDumper.props(new SimpleBackoffStrategy, telemetry)
      context.actorOf(dumperProps)

    case Saved(_) =>
      // Placeholder: post-processing is not implemented.
      ()
  }
}
/** Factory and message protocol for `TelemetryDumperSupervisor`. */
object TelemetryDumperSupervisor {

  /** Message matched in the supervisor's `receive`; carries an `Int` telemetry value. */
  case class Saved(telemetry: Int)

  /** Props that create a `TelemetryDumperSupervisor` with its no-argument constructor. */
  def props(): Props = Props(classOf[TelemetryDumperSupervisor])
}
开发者ID:frossi85,项目名称:akka-kamon-sample,代码行数:33,代码来源:TelemetryDumperSupervisor.scala
示例5: Supervisor
//设置package包名称以及导入依赖的类
package akka_in_action.supervisor
import scala.concurrent.duration._
import akka.actor.{Actor, ActorLogging, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
/**
 * Spawns a `Worker` child on every "spawn" message and supervises the children with a
 * per-exception directive, allowing at most 10 retries within 30 seconds.
 */
class Supervisor extends Actor with ActorLogging {
  override def receive: Receive = {
    case "spawn" => context.actorOf(Props[Worker])
  }

  // `30.seconds` replaces the postfix form `30 seconds`, which needs the
  // scala.language.postfixOps feature import to compile cleanly.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 30.seconds) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }
}
/** Child actor spawned by `Supervisor`; its behaviour is left unimplemented (`???`). */
class Worker extends Actor with ActorLogging {

  // Evaluating `???` throws NotImplementedError.
  override def receive: Receive = ???
}
/** Entry point: starts the actor system and creates the top-level "supervisor" actor. */
object StrategyApp extends App {
  // Explicit type on the implicit; the unused `import system.dispatcher` was dropped because
  // nothing in this object requires an ExecutionContext.
  implicit val system: ActorSystem = ActorSystem("actor-system")

  system.actorOf(Props[Supervisor], "supervisor")
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:31,代码来源:StrategyApp.scala
示例6: Supervisor
//设置package包名称以及导入依赖的类
package io.ticofab.scalarabbitmqexample.actor
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, Props}
import io.ticofab.scalarabbitmqexample.actor.QueueListener.{CloseYourEars, Listen}
import io.ticofab.scalarabbitmqexample.actor.Supervisor.{Begin, End}
/** Message protocol and factory for the `Supervisor` actor. */
object Supervisor {

  /** Props that create a `Supervisor`. */
  def props = Props[Supervisor]

  /** Makes the supervisor send `Listen` to its queue listener. */
  case object Begin

  /** Makes the supervisor send `CloseYourEars` to its queue listener. */
  case object End
}
/**
 * Owns a single `QueueListener` child and translates `Begin` / `End` into the listener's
 * `Listen` / `CloseYourEars` commands.
 */
class Supervisor extends Actor {
  val queueListener = context.actorOf(QueueListener.props)

  // Deliberately minimal supervision: any Exception in the child stops it, without logging.
  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case _: Exception => Stop
  }

  override def receive: Receive = {
    case Begin =>
      queueListener ! Listen
    case End =>
      queueListener ! CloseYourEars
  }
}
开发者ID:ticofab,项目名称:Scala-RabbitMQ-Example,代码行数:30,代码来源:Supervisor.scala
示例7: CreateCheckingsAccountResponse
//设置package包名称以及导入依赖的类
package com.franklevering.ports.adapters.http.request.handlers
import java.util.UUID
import akka.actor.SupervisorStrategy._
import akka.actor.{OneForOneStrategy, Props}
import akka.http.scaladsl.model.StatusCodes
import com.franklevering.banking.domain.model.account.Account
import com.franklevering.banking.domain.model.account.CreateCheckingsAccount
import com.franklevering.ports.adapters.http.ImperativeRequestContext
import com.franklevering.ports.adapters.http.request.Request
import scala.concurrent.duration._
/** Message that `AccountRequestHandler` answers with HTTP 201 Created before stopping itself. */
case class CreateCheckingsAccountResponse(id: String)
/**
 * Per-request actor: forwards a `CreateCheckingsAccount` command to a newly created `Account`
 * child and completes the HTTP request when the response message arrives.
 *
 * @param ctx request context that is completed with the response
 */
class AccountRequestHandler(ctx: ImperativeRequestContext) extends Request(ctx) {
  // `1.minute` replaces the postfix form `1 minute`, which needs the
  // scala.language.postfixOps feature import to compile cleanly.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      // Each command gets its own Account child, constructed with a random UUID.
      val account = context.actorOf(Props(classOf[Account], UUID.randomUUID()))
      account ! createCheckingsAccount

    case createCheckingsAccountResponse: CreateCheckingsAccountResponse =>
      ctx.complete(StatusCodes.Created, createCheckingsAccountResponse)
      // The handler stops once the request is answered.
      context.stop(self)
  }
}
开发者ID:frankieleef,项目名称:banking,代码行数:36,代码来源:AccountRequestHandler.scala
示例8: AWSSupervisorStrategy
//设置package包名称以及导入依赖的类
package actors.workflow
import actors.WorkflowLog.LogMessage
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorLogging, OneForOneStrategy}
import com.amazonaws.{AmazonClientException, AmazonServiceException}
import scala.concurrent.duration._
/**
 * Mix-in supervisor strategy for actors whose children call AWS.
 *
 * Children are restarted (up to 20 retries) on "ServiceUnavailable" / "Throttling" service errors
 * and on other `AmazonClientException`s. Every other failure is reported to this actor's parent
 * as a `LogMessage` and the child is stopped.
 */
trait AWSSupervisorStrategy extends Actor with ActorLogging {
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 20, loggingEnabled = true) {
    // Must stay ahead of the AmazonClientException case so service errors are judged by code.
    case serviceError: AmazonServiceException =>
      val errorCode = serviceError.getErrorCode
      if (errorCode == "ServiceUnavailable" || errorCode == "Throttling") {
        log.debug("Supervisor Authorized Restart")
        Restart
      } else {
        context.parent ! LogMessage(serviceError.toString)
        Stop
      }

    case _: AmazonClientException =>
      log.debug("Supervisor Authorized Restart")
      Restart

    case failure: Exception =>
      context.parent ! LogMessage(failure.toString)
      Stop
  }
}
开发者ID:lifeway,项目名称:Chadash,代码行数:31,代码来源:AWSSupervisorStrategy.scala
示例9: CriticalProcessesManager
//设置package包名称以及导入依赖的类
package net.hvieira.yeoldeonlinestore.actor
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import net.hvieira.yeoldeonlinestore.actor.CriticalProcessesManager._
import net.hvieira.yeoldeonlinestore.actor.store.StoreManager
import net.hvieira.yeoldeonlinestore.actor.user.UserManager
import net.hvieira.yeoldeonlinestore.api.Item
/** Child-actor names and the `Props` factory for `CriticalProcessesManager`. */
object CriticalProcessesManager {
  // Names under which the two children are registered and later looked up.
  private val STORE_MANAGER = "store-manager"
  private val USER_MANAGER = "user-manager"

  /** Props for a manager whose store child is created with `itemProvider`. */
  def props(itemProvider: () => Iterable[Item]) =
    Props(new CriticalProcessesManager(itemProvider))
}
/**
 * Parent of the store-manager and user-manager actors. It creates both in `preStart`, restarts
 * them on any failure, and hands out their `ActorRef`s on request.
 *
 * @param itemProvider passed to `StoreManager.props` when the store manager is created
 */
class CriticalProcessesManager(private val itemProvider: () => Iterable[Item]) extends Actor {

  override def preStart(): Unit = {
    context.actorOf(StoreManager.props(3, itemProvider), STORE_MANAGER)
    context.actorOf(UserManager.props(), USER_MANAGER)
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.restart
  }

  override def receive: Receive = {
    case IntroduceUserManagerReq =>
      introduce(USER_MANAGER, "User manager actor ref does not exist")
    case IntroduceStoreManagerReq =>
      introduce(STORE_MANAGER, "Store manager actor ref does not exist")
  }

  // Replies to the sender with the named child's ref; throws IllegalStateException if absent.
  private def introduce(childName: String, missingMessage: String): Unit = {
    val childRef = context.child(childName).getOrElse(throw new IllegalStateException(missingMessage))
    sender() ! IntroductionResponse(childRef)
  }
}
// Request: asks CriticalProcessesManager for the ActorRef of its user-manager child.
case object IntroduceUserManagerReq
// Request: asks CriticalProcessesManager for the ActorRef of its store-manager child.
case object IntroduceStoreManagerReq
// Reply to either request above, carrying the requested child's ActorRef.
final case class IntroductionResponse(ref: ActorRef)
开发者ID:hvieira,项目名称:ye-olde-online-store-akka,代码行数:48,代码来源:CriticalProcessesManager.scala
示例10: SuppervisedFsu
//设置package包名称以及导入依赖的类
package com.wincom.dcim.sharded
import akka.actor.{Actor, ActorInitializationException, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy}
/**
 * Supervising wrapper around a single `FsuActor` child: forwards every message to the child and
 * restarts it on any `Exception`.
 */
class SuppervisedFsu extends Actor {
  // The scraped source had "?" where each case arrow belongs; restored to "=>".
  // All four cases resolve to Restart; the specific ones are listed explicitly.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalArgumentException     => SupervisorStrategy.Restart
    case _: ActorInitializationException => SupervisorStrategy.Restart
    case _: DeathPactException           => SupervisorStrategy.Restart
    case _: Exception                    => SupervisorStrategy.Restart
  }

  // The child is given the same name as this supervisor.
  val fsuActor = context.actorOf(Props[FsuActor], s"${self.path.name}")

  def receive = {
    // `forward` keeps the original sender.
    case msg => fsuActor forward msg
  }
}
开发者ID:xtwxy,项目名称:akka-tests,代码行数:19,代码来源:SuppervisedFsu.scala
示例11: CrookedForeman
//设置package包名称以及导入依赖的类
package com.example.coalmine
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
import com.example.coalmine.Miner.{DoValuableWork, WorkDone}
/** Factory and message protocol for `CrookedForeman`. */
object CrookedForeman {

  /** Tells the foreman to create and watch its miners. */
  case object HireMiners

  /** Tells the foreman to send `DoValuableWork` to every child. */
  case object BerateMiners

  /** Sent to the foreman's parent when a watched miner terminates. */
  case class DeathNotice(miner: ActorRef)

  val props = Props[CrookedForeman]
}
/**
 * Hires five `Miner` children, sends them work on demand, relays their results to its parent,
 * and reports every miner death to the parent.
 */
class CrookedForeman extends Actor with ActorLogging {
  import CrookedForeman._

  // Failed children are stopped rather than restarted, so a dead miner stays dead.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Stop
  }

  override def receive: Receive = {
    case HireMiners =>
      for (i <- 1 to 5) {
        val miner = context.actorOf(Miner.props, s"Miner-no-$i")
        log.info(s"Hiring miner $i")
        // Watching makes a Terminated message arrive when this miner dies.
        context.watch(miner)
      }

    case BerateMiners =>
      val numMiners = context.children.size
      log.info(s"I have $numMiners miner children, at your service! Berating!")
      context.children.foreach(_ ! DoValuableWork)

    case WorkDone(v) =>
      context.parent.forward(WorkDone(v))

    case Terminated(miner) =>
      log.info(s"Oh dear, $miner has died!")
      context.parent ! DeathNotice(miner)
  }
}
开发者ID:dkmn,项目名称:week-2-lecture-examples,代码行数:47,代码来源:CrookedForeman.scala
示例12: Notifier
//设置package包名称以及导入依赖的类
package auctionsystem
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.event.LoggingReceive
import akka.actor.Props
import auctionsystem.Notifier.Notification
import auctionsystem.Notifier.RepeatRequest
import auctionsystem.exceptions.RemoteServerErrorException
import akka.actor.SupervisorStrategy.Resume
import akka.actor.SupervisorStrategy.Stop
/**
 * Creates one `NotifierRequest` child per incoming `Notification` and remembers which
 * notification each child was given.
 */
class Notifier extends Actor {
  val auctionPublisher = context.actorSelection("akka.tcp://[email protected]:2552/user/auctionPublisher")
  // Notification handed to each child, looked up again when that child fails.
  var requestsMap = Map[ActorRef, Notification]()
  // Suffix that keeps child names unique.
  var idx = 0

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3) {
    case _: RemoteServerErrorException =>
      println(s"An error occurred when trying to connect with remote server, node that caused failure: ${sender.path}. Resuming...")
      // NOTE(review): RepeatRequest is sent to self, but `receive` below has no case for it,
      // so as written the retry is never carried out; confirm the intended handling.
      self ! RepeatRequest(requestsMap(sender), sender)
      Resume
    case e =>
      println(s"Something else went wrong: $e")
      Stop
  }

  def receive = LoggingReceive {
    case incoming @ Notification(_, _, _) =>
      val childName = s"notifierRequest$idx"
      val notifierRequest = context.actorOf(Props(new NotifierRequest(auctionPublisher)), childName)
      idx += 1
      requestsMap += (notifierRequest -> incoming)
      notifierRequest ! incoming
  }
}
/** Message protocol of the `Notifier` actor. */
object Notifier {

  /** Auction update: auction name, highest bidder, and highest bid. */
  final case class Notification(auctionName: String, highestBidder: ActorRef, highestBid: Int)

  /** A notification paired with the `NotifierRequest` child it was originally given to. */
  final case class RepeatRequest(notification: Notification, notifierRequest: ActorRef)
}
开发者ID:Stawicki,项目名称:reactive-scala,代码行数:44,代码来源:Notifier.scala
示例13: SearchDatabaseActor
//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration.Duration
import scala.io.Source._
/**
 * Answers `Search(title)` with every line of `filename` that starts with `title`.
 *
 * @param filename path of the text file to scan; it is opened afresh for every search
 */
class SearchDatabaseActor(filename: String) extends Actor {
  override def receive: Receive = {
    case Search(title) =>
      val source = fromFile(filename)
      // `toArray` reads all matches before the file is closed; closing in `finally` fixes the
      // file-handle leak of the original, which never closed the Source.
      val matches =
        try source.getLines().filter(_.startsWith(title)).toArray
        finally source.close()
      sender() ! SearchResponse(matches)
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Stop
    }
}
/** Reply sent by `SearchDatabaseActor`: the file lines that start with the searched title. */
case class SearchResponse(books: Array[String])
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:21,代码来源:SearchDatabaseActor.scala
示例14: ClientActor
//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration._
/**
 * Console front-end actor: prints order confirmations, search results, search errors and
 * streamed lines, wrapped in ANSI colour escape sequences.
 */
class ClientActor extends Actor {
  // The original spelled ESC as the octal escape "\033", which is deprecated and rejected by
  // newer Scala compilers; "\u001b" is the same character, so the output is unchanged.
  private val Yellow = "\u001b[33m"
  private val White = "\u001b[37m"
  private val Reset = "\u001b[0m"

  override def receive: Receive = {
    case Ordered =>
      println(Yellow + ":: Book was ordered" + Reset)
    case Searched(titles, prices) =>
      for (i <- titles.indices)
        println(Yellow + ":: " + s"'${titles(i)}' costs ${prices(i)} PLN" + Reset)
    case SearchError(message) =>
      println(Yellow + ":: Search failed in database: " + message + Reset)
    case StreamLine(line) =>
      println(White + line + Reset)
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(10, Duration(60, "seconds")) {
    case _: Exception => Resume
  }
}
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:23,代码来源:ClientActor.scala
示例15: StreamActor
//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import scala.concurrent.duration.Duration
import scala.io.Source.fromFile
/**
 * Streams the lines of a file back to the requester, throttled to one element per second.
 */
class StreamActor extends Actor {
  override def receive: Receive = {
    case StreamBook(filename) =>
      // Running a stream requires a materializer; one is created per request here.
      val materializer = ActorMaterializer.create(context)
      val sink = Source
        .actorRef(1000, OverflowStrategy.dropNew) // when the 1000-element buffer is full, new elements are dropped
        .throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping) // at most 1 element per second
        .to(Sink.actorRef(sender, NotUsed)) // deliver to the requester; NotUsed is the completion message
        .run()(materializer)

      // Close the file after all lines were handed to the stream; the original leaked the handle.
      val file = fromFile(filename)
      try file.getLines().foreach(line => sink ! StreamLine(line))
      finally file.close()
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}
// Request handled by StreamActor: stream the lines of the named file to the sender.
case class StreamBook(fileName: String)
// One line of the streamed file, pushed into the stream by StreamActor.
case class StreamLine(line: String)
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala
示例16: OrderActor
//设置package包名称以及导入依赖的类
import java.io.FileWriter
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration.Duration
/**
 * Handles `Order` messages by appending the title to the orders file (via `orderSingleton`)
 * and replying `Ordered` to the requester.
 */
class OrderActor() extends Actor {
  override def receive: Receive = {
    case Order(title: String) =>
      orderSingleton.orderBook(title)
      sender() ! Ordered
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      // because of other threads
      case _: Exception => Escalate
    }
}
// Request handled by OrderActor: order the book with this title.
case class Order(title: String)
// Confirmation for an order. NOTE(review): OrderActor sends the bare name `Ordered` (no
// parentheses), i.e. the companion object rather than an instance; confirm this is intended.
case class Ordered()
/** File-backed order log. */
object orderSingleton {

  /**
   * Appends `title` followed by a newline to "orders.txt".
   *
   * The writer is closed in a `finally` block. The original closed it from the initializer of
   * an anonymous `FileWriter` subclass, which skipped `close()` whenever `write` threw.
   *
   * @param title book title to record
   */
  def orderBook(title: String): Unit = {
    val writer = new FileWriter("orders.txt", true) // true = append to end of file
    try writer.write(s"$title\n")
    finally writer.close()
  }
}
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:33,代码来源:OrderActor.scala
示例17: AccountSupervisor
//设置package包名称以及导入依赖的类
package sample.blog
import akka.actor.{Actor, ActorInitializationException, ActorLogging, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import scala.concurrent.duration._
/**
 * Supervises a single `AccountEntity` child: forwards every message to it and logs when a
 * `Terminated` notification arrives.
 */
class AccountSupervisor extends Actor with ActorLogging {
  val counter = context.actorOf(Props[AccountEntity], "account-")
  context.watch(counter)

  // The scraped source had "?" where each case arrow belongs; restored to "=>".
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
    case _: IllegalArgumentException     => SupervisorStrategy.Resume
    case _: ActorInitializationException => SupervisorStrategy.Stop
    case _: DeathPactException           => SupervisorStrategy.Stop
    case _: Exception                    => SupervisorStrategy.Restart
  }

  def receive = {
    case x: Terminated =>
      log.info(s"The child ${sender} terminated due to - ${x}")
    case msg =>
      log.info(s"Supervisor of this message ${msg} - ${self.path}")
      // `forward` keeps the original sender.
      counter forward msg
  }
}
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:29,代码来源:AccountSupervisor.scala
示例18: Server
//设置package包名称以及导入依赖的类
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import scala.collection.mutable
/**
 * Mail server actor: relays sent messages to the `usuario` child, validates sender and
 * recipient addresses, returns a user's received mail, and registers new addresses.
 */
class Server extends Actor {
  import Listas._

  // Children that fail with an Exception simply resume.
  override def supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Resume
  }

  lazy val usuario = context.actorOf(Props(new Usuario), "usuario")

  def receive = {
    case MensajeEnviado(id, to, asunto, msg) =>
      usuario ! MensajeRecibido(to, id, asunto, msg)
      println(s"Se envío el correo a: $to")

    case ValidarCorreos(id, to) =>
      // existeCorreo returns Left when the address IS registered, Right when it is not.
      Server.existeCorreo(id) match {
        case Left(_) =>
          Server.existeCorreo(to) match {
            case Left(_) =>
              sender() ! ""
            case Right(missingRecipient) =>
              sender() ! s"No existe el correo ${missingRecipient} al que se le va a enviar el mensaje"
          }
        case Right(missingSender) =>
          sender() ! s"No existe el correo $missingSender quien es el que envia el mensaje"
      }

    case ConsultarMail(m) =>
      val received = listMailRec.filter(x => x.id == m)
      println(s"El usuario $m tiene ${received.size} correos que son: ")
      sender() ! received

    case CrearMail(mail) =>
      Server.validarCorreo(mail) match {
        case Left(invalid) =>
          sender() ! s"El usuario $invalid no es valido, no se puede crear!!"
        case Right(valid) =>
          Server.existeCorreo(valid) match {
            case Left(taken) =>
              sender() ! s"el usuario ${taken} ya existe!!"
            case Right(fresh) =>
              Server.listCorreos += fresh
              sender() ! s"Se creó el usuario ${fresh}!!"
          }
      }

    case ErrorEnviarMensaje(mail) =>
      usuario ! ErrorEnviarMensaje(mail)
  }
}
/** Shared address registry and validation helpers for the `Server` actor. */
object Server {
  /** Mail providers accepted by [[validarCorreo]]. */
  val mailList = List("seven4n", "gmail", "hotmail", "yahoo")

  /** Addresses registered so far. */
  val listCorreos: mutable.MutableList[String] = mutable.MutableList()

  /**
   * Checks that the provider of an address (the text between the first "@" and the next ".")
   * is one of [[mailList]].
   *
   * Addresses without an "@" or without a domain part are reported as invalid; the original
   * indexed the split result directly and threw `ArrayIndexOutOfBoundsException` on them.
   *
   * @param correo address to validate
   * @return `Right(correo)` when the provider is accepted, `Left(correo)` otherwise
   */
  def validarCorreo(correo: String): Either[String, String] = {
    val provider = correo.split("@").lift(1).flatMap(_.split('.').headOption)
    if (provider.exists(mailList.contains)) Right(correo) else Left(correo)
  }

  /**
   * Looks an address up in [[listCorreos]].
   *
   * @param correo address to look up
   * @return `Left(correo)` when the address is already registered, `Right(correo)` when it is not
   */
  def existeCorreo(correo: String): Either[String, String] = {
    if (Server.listCorreos.contains(correo))
      Left(correo)
    else
      Right(correo)
  }
}
开发者ID:DIGelvezL,项目名称:Email,代码行数:60,代码来源:Server.scala
示例19: SimplifiedTweetProcessorActor
//设置package包名称以及导入依赖的类
package org.eigengo.rsa.ingest.v100
import java.util.UUID
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.ActorMaterializer
import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord, KafkaSerializer}
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.eigengo.rsa.Envelope
/** Factory for `SimplifiedTweetProcessorActor`. */
object SimplifiedTweetProcessorActor {

  /**
   * Builds the actor's `Props` from the "tweet-image-producer" section of `config`.
   *
   * @param config configuration containing a "tweet-image-producer" sub-config
   */
  def props(config: Config): Props = {
    val imageProducerConfig = config.getConfig("tweet-image-producer")
    val keySerializer = new StringSerializer
    val valueSerializer = KafkaSerializer[Envelope](_.toByteArray)
    val producerConf = KafkaProducer.Conf(imageProducerConfig, keySerializer, valueSerializer)

    Props(classOf[SimplifiedTweetProcessorActor], producerConf)
  }
}
/**
 * Downloads the media referenced by a `SimplifiedTweet` and publishes each downloaded image to
 * the "tweet-image" Kafka topic, wrapped in an `Envelope`.
 *
 * The scraped source had "?" where each `=>` arrow belongs; the arrows are restored here.
 *
 * @param producerConf configuration for the Kafka producer owned by this actor
 */
class SimplifiedTweetProcessorActor(producerConf: KafkaProducer.Conf[String, Envelope]) extends Actor {
  private[this] val producer = KafkaProducer(conf = producerConf)
  implicit val _ = ActorMaterializer()

  import scala.concurrent.duration._

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
    case _ => SupervisorStrategy.Restart
  }

  override def receive: Receive = {
    case TweetImage(handle, content) =>
      // Publish the image keyed by the tweet handle.
      producer.send(KafkaProducerRecord("tweet-image", handle,
        Envelope(version = 100,
          handle = handle,
          ingestionTimestamp = System.nanoTime(),
          processingTimestamp = System.nanoTime(),
          messageId = UUID.randomUUID().toString,
          correlationId = UUID.randomUUID().toString,
          payload = content)))

    case SimplifiedTweet(handle, mediaUrls) =>
      mediaUrls.foreach { mediaUrl =>
        import context.dispatcher
        val request = HttpRequest(method = HttpMethods.GET, uri = Uri(mediaUrl))
        val timeout = 1000.millis
        // The downloaded body comes back to this actor as a TweetImage message. A failed
        // Future is not handled here (`foreach` runs only on success).
        Http(context.system).singleRequest(request).flatMap(_.entity.toStrict(timeout)).foreach { entity =>
          self ! TweetImage(handle, ByteString.copyFrom(entity.data.toArray))
        }
      }
  }
}
开发者ID:eigengo,项目名称:reactive-summit-2016,代码行数:58,代码来源:SimplifiedTweetProcessorActor.scala
示例20: ExceptionDetector
//设置package包名称以及导入依赖的类
package me.invkrh.raft.kit
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
/**
 * Test helper: creates a child from any received `Props`, replies with the child's `ActorRef`,
 * and sends every failure of a child to all `probes` before restarting it.
 *
 * @param actorName name given to each created child
 * @param probes    actors that receive every `Throwable` thrown by a child
 */
class ExceptionDetector(actorName: String, probes: ActorRef*) extends Actor {

  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case thr: Throwable =>
      probes.foreach(probe => probe ! thr)
      Restart // or make it configurable/controllable during the test
  }

  def receive: PartialFunction[Any, Unit] = {
    case p: Props =>
      val child = context.actorOf(p, actorName)
      sender() ! child
  }
}
开发者ID:invkrh,项目名称:akka-raft,代码行数:16,代码来源:ExceptionDetector.scala
注:本文中的akka.actor.OneForOneStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论