• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala OneForOneStrategy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.actor.OneForOneStrategy的典型用法代码示例。如果您正苦于以下问题:Scala OneForOneStrategy类的具体用法?Scala OneForOneStrategy怎么用?Scala OneForOneStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了OneForOneStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ClientActor

//设置package包名称以及导入依赖的类
package org.http4s.akka

import scala.reflect.ClassTag

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, Props, Terminated}
import fs2.Task
import fs2.async.mutable.{Queue, Signal}

private class ClientActor[Out](props: Props, outQueue: Queue[Task, Out], closeSignal: Signal[Task, Boolean])
                              (implicit messageType: ClassTag[Out]) extends Actor {
  val serverActor = context actorOf props
  context watch serverActor
  
  def receive: Receive = {
    case Terminated(`serverActor`) =>
      closeSignal.set(true).unsafeRun()
      context stop self
    case messageType(m) if sender() == serverActor =>
      outQueue.enqueue1(m).unsafeRun()
    case m if sender() == serverActor =>
      org.log4s.getLogger.error(s"Server sent unhandled message ${m.getClass.getSimpleName} " +
        s"expecting a ${messageType.runtimeClass.getSimpleName}!")
    case m if sender() == self =>
      serverActor ! m
  }
  
  override def supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case _ => Stop
  }
} 
开发者ID:Lasering,项目名称:http4s-akka,代码行数:32,代码来源:ClientActor.scala


示例2: ClusterRouterSupervisorSpec

//设置package包名称以及导入依赖的类
package akka.cluster.oldrouting

import akka.testkit._
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.actor.OneForOneStrategy
import akka.cluster.routing._

object ClusterRouterSupervisorSpec {

  class KillableActor(testActor: ActorRef) extends Actor {

    def receive = {
      case "go away" ?
        throw new IllegalArgumentException("Goodbye then!")
    }

  }

}

class ClusterRouterSupervisorSpec extends AkkaSpec("""
  akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
  akka.remote.netty.tcp.port = 0
""") {

  import ClusterRouterSupervisorSpec._

  "Cluster aware routers" must {

    "use provided supervisor strategy" in {
      val router = system.actorOf(Props(classOf[KillableActor], testActor).withRouter(
        ClusterRouterConfig(RoundRobinRouter(supervisorStrategy = OneForOneStrategy() {
          case _ ?
            testActor ! "supervised"
            SupervisorStrategy.Stop
        }), ClusterRouterSettings(
          totalInstances = 1,
          maxInstancesPerNode = 1,
          allowLocalRoutees = true,
          useRole = None))), name = "therouter")

      router ! "go away"
      expectMsg("supervised")
    }

  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:ClusterRouterSupervisorSpec.scala


示例3: CustomOneForOneUser

//设置package包名称以及导入依赖的类
package com.example.supervisor

import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy}


class CustomOneForOneUser extends Actor with ActorLogging {

  // Map that keeps track of how often a given child has been restarted
  var restarts = Map.empty[ActorRef, Int].withDefaultValue(0)

  // Using the default parameters for a SupervisorStrategy mean "arbitarily
  // often over forever"
  override val supervisorStrategy = OneForOneStrategy() {

    case _: ArithmeticException =>

      restarts(sender) match {
        case tooManyRestarts if tooManyRestarts > 15 =>
          restarts -= sender
          Stop
        case n =>
          restarts = restarts.updated(sender, n+1)
          Restart
      }
  }

  override def receive: Receive = ???
} 
开发者ID:dkmn,项目名称:week-3-lecture-examples,代码行数:30,代码来源:CustomOneForOneUser.scala


示例4: TelemetryDumperSupervisor

//设置package包名称以及导入依赖的类
package sample.kamon

import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import sample.kamon.TelemetryDumperSupervisor.Saved

import scala.util.Random

class TelemetryDumperSupervisor extends Actor with ActorLogging {

  log.info("TelemetryDumperSupervisor started!")

  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case e: Exception =>
      log.error(e, "There was an error when trying to save telemetry data, restarting.")
      Restart
  }

  def receive: Receive = {
    case telemetry: Int =>
      Thread.sleep(40 * Random.nextInt(10))
      context.actorOf(TelemetryDumper.props(new SimpleBackoffStrategy, telemetry))
    case Saved(telemetry) =>
    //Make some post process
  }
}

object TelemetryDumperSupervisor {
  def props(): Props = Props(classOf[TelemetryDumperSupervisor])

  case class Saved(telemetry: Int)
} 
开发者ID:frossi85,项目名称:akka-kamon-sample,代码行数:33,代码来源:TelemetryDumperSupervisor.scala


示例5: Supervisor

//设置package包名称以及导入依赖的类
package akka_in_action.supervisor

import scala.concurrent.duration._
import akka.actor.{Actor, ActorLogging, ActorSystem, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._

class Supervisor extends Actor with ActorLogging {
  override def receive: Receive = {
    case "spawn" => context.actorOf(Props[Worker])
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 30 seconds) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }
}

class Worker extends Actor with ActorLogging {
  override def receive: Receive = ???
}

object StrategyApp extends App {
  implicit val system = ActorSystem("actor-system")
  import system.dispatcher

  system.actorOf(Props[Supervisor], "supervisor")
} 
开发者ID:rockdragon,项目名称:fourthgala,代码行数:31,代码来源:StrategyApp.scala


示例6: Supervisor

//设置package包名称以及导入依赖的类
package io.ticofab.scalarabbitmqexample.actor

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, Props}
import io.ticofab.scalarabbitmqexample.actor.QueueListener.{CloseYourEars, Listen}
import io.ticofab.scalarabbitmqexample.actor.Supervisor.{Begin, End}

object Supervisor {

  case object Begin

  case object End

  def props = Props[Supervisor]
}

class Supervisor extends Actor {
  val queueListener = context.actorOf(QueueListener.props)

  // very simple supervision strategy: if anything happens, stop the actor
  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
    case _: Exception => Stop
  }

  override def receive: Receive = {
    case Begin => queueListener ! Listen
    case End => queueListener ! CloseYourEars
  }
} 
开发者ID:ticofab,项目名称:Scala-RabbitMQ-Example,代码行数:30,代码来源:Supervisor.scala


示例7: CreateCheckingsAccountResponse

//设置package包名称以及导入依赖的类
package com.franklevering.ports.adapters.http.request.handlers

import java.util.UUID

import akka.actor.SupervisorStrategy._
import akka.actor.{OneForOneStrategy, Props}
import akka.http.scaladsl.model.StatusCodes
import com.franklevering.banking.domain.model.account.Account
import com.franklevering.banking.domain.model.account.CreateCheckingsAccount
import com.franklevering.ports.adapters.http.ImperativeRequestContext
import com.franklevering.ports.adapters.http.request.Request

import scala.concurrent.duration._

case class CreateCheckingsAccountResponse(id: String)

class AccountRequestHandler(ctx: ImperativeRequestContext) extends Request(ctx) {

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive: Receive = {
    case createCheckingsAccount: CreateCheckingsAccount =>
      val account = context.actorOf(Props(classOf[Account], UUID.randomUUID()))
      account ! createCheckingsAccount
    case createCheckingsAccountResponse: CreateCheckingsAccountResponse =>
      ctx.complete(StatusCodes.Created, createCheckingsAccountResponse)
      context.stop(self)
  }
} 
开发者ID:frankieleef,项目名称:banking,代码行数:36,代码来源:AccountRequestHandler.scala


示例8:

//设置package包名称以及导入依赖的类
package actors.workflow

import actors.WorkflowLog.LogMessage
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorLogging, OneForOneStrategy}
import com.amazonaws.{AmazonClientException, AmazonServiceException}

import scala.concurrent.duration._

trait AWSSupervisorStrategy extends Actor with ActorLogging {
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 20, loggingEnabled = true) {
    case ex: AmazonServiceException =>
      ex.getErrorCode match {
        case "ServiceUnavailable" | "Throttling" =>
          log.debug("Supervisor Authorized Restart")
          Restart
        case _ =>
          context.parent ! LogMessage(ex.toString)
          Stop
      }
      
    case _: AmazonClientException =>
      log.debug("Supervisor Authorized Restart")
      Restart

    case ex: Exception =>
      context.parent ! LogMessage(ex.toString)
      Stop
  }
} 
开发者ID:lifeway,项目名称:Chadash,代码行数:31,代码来源:AWSSupervisorStrategy.scala


示例9: CriticalProcessesManager

//设置package包名称以及导入依赖的类
package net.hvieira.yeoldeonlinestore.actor

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import net.hvieira.yeoldeonlinestore.actor.CriticalProcessesManager._
import net.hvieira.yeoldeonlinestore.actor.store.StoreManager
import net.hvieira.yeoldeonlinestore.actor.user.UserManager
import net.hvieira.yeoldeonlinestore.api.Item

object CriticalProcessesManager {
  private val STORE_MANAGER = "store-manager"
  private val USER_MANAGER = "user-manager"

  def props(itemProvider: () => Iterable[Item]) = Props(new CriticalProcessesManager(itemProvider))
}

class CriticalProcessesManager(private val itemProvider: () => Iterable[Item]) extends Actor {

  override def preStart(): Unit = {
    context.actorOf(StoreManager.props(3, itemProvider), STORE_MANAGER)
    context.actorOf(UserManager.props(), USER_MANAGER)
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.restart
  }

  override def receive: Receive = {
    case IntroduceUserManagerReq =>
      val possibleRef = context.child(USER_MANAGER)
      possibleRef match {
        case Some(ref) => sender ! IntroductionResponse(ref)
        case None => throw new IllegalStateException("User manager actor ref does not exist")
      }

    case IntroduceStoreManagerReq =>
      val possibleRef = context.child(STORE_MANAGER)
      possibleRef match {
        case Some(ref) => sender ! IntroductionResponse(ref)
        case None => throw new IllegalStateException("Store manager actor ref does not exist")
      }
  }
}

case object IntroduceUserManagerReq
case object IntroduceStoreManagerReq

final case class IntroductionResponse(ref: ActorRef) 
开发者ID:hvieira,项目名称:ye-olde-online-store-akka,代码行数:48,代码来源:CriticalProcessesManager.scala


示例10: SuppervisedFsu

//设置package包名称以及导入依赖的类
package com.wincom.dcim.sharded

import akka.actor.{Actor, ActorInitializationException, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy}


class SuppervisedFsu extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalArgumentException ? SupervisorStrategy.Restart
    case _: ActorInitializationException ? SupervisorStrategy.Restart
    case _: DeathPactException ? SupervisorStrategy.Restart
    case _: Exception ? SupervisorStrategy.Restart
  }
  val fsuActor = context.actorOf(Props[FsuActor], s"${self.path.name}")

  def receive = {
    case msg ? fsuActor forward msg
  }
} 
开发者ID:xtwxy,项目名称:akka-tests,代码行数:19,代码来源:SuppervisedFsu.scala


示例11: CrookedForeman

//设置package包名称以及导入依赖的类
package com.example.coalmine

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
import com.example.coalmine.Miner.{DoValuableWork, WorkDone}



object CrookedForeman {
  val props = Props[CrookedForeman]

  case object HireMiners
  case object BerateMiners
  case class DeathNotice(miner: ActorRef)
}

class CrookedForeman extends Actor with ActorLogging {
  import CrookedForeman._

  // We'll learn more about supervisor strategies next week... for now we
  // install this one so that dead children stay dead, to make our point.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Stop
  }

  override def receive: Receive = {

    case HireMiners => 1 to 5 foreach { i => {
                                        val miner = context.actorOf(Miner.props,
                                          s"Miner-no-$i")
                                        log.info(s"Hiring miner $i")
                                        context.watch(miner) }
                                      }

    case BerateMiners =>
      val numMiners = context.children.size
      log.info(s"I have $numMiners miner children, at your service! Berating!")
      context.children.foreach { miner => miner ! DoValuableWork }

    case WorkDone(v) => context.parent forward WorkDone(v)

    case Terminated(miner) =>
      log.info(s"Oh dear, $miner has died!")
      context.parent ! DeathNotice(miner)
  }
} 
开发者ID:dkmn,项目名称:week-2-lecture-examples,代码行数:47,代码来源:CrookedForeman.scala


示例12: Notifier

//设置package包名称以及导入依赖的类
package auctionsystem

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.event.LoggingReceive
import akka.actor.Props
import auctionsystem.Notifier.Notification
import auctionsystem.Notifier.RepeatRequest
import auctionsystem.exceptions.RemoteServerErrorException
import akka.actor.SupervisorStrategy.Resume
import akka.actor.SupervisorStrategy.Stop

class Notifier extends Actor {
  val auctionPublisher = context.actorSelection("akka.tcp://[email protected]:2552/user/auctionPublisher")
  var requestsMap = Map [ActorRef, Notification]()
  var idx = 0;
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3) {
     case _: RemoteServerErrorException =>
       println(s"An error occurred when trying to connect with remote server, node that caused failure: ${sender.path}. Resuming...")
       self ! RepeatRequest(requestsMap(sender), sender)
       Resume
 
     case e =>
       println(s"Something else went wrong: $e")
       Stop
  }
  
  def receive = LoggingReceive {
    case Notification(auction, highestBidder, highestBid) =>
      val notifierRequest = context.actorOf(Props(new NotifierRequest(auctionPublisher)), "notifierRequest" + idx);      
      idx = idx + 1
      val notification = Notification(auction, highestBidder, highestBid)
      requestsMap += (notifierRequest -> notification)
      notifierRequest ! notification
  }
}

object Notifier {
  final case class RepeatRequest(notification: Notification, notifierRequest: ActorRef)
  final case class Notification(auctionName: String, highestBidder: ActorRef, highestBid: Int);
} 
开发者ID:Stawicki,项目名称:reactive-scala,代码行数:44,代码来源:Notifier.scala


示例13: SearchDatabaseActor

//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration.Duration
import scala.io.Source._

class SearchDatabaseActor(filename: String) extends Actor {
  override def receive: Receive = {
    case Search(title) =>
      val lines = fromFile(filename).getLines
      sender ! SearchResponse(lines.filter(s => s.startsWith(title)).toArray)
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Stop
    }
}

case class SearchResponse(books: Array[String]) 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:21,代码来源:SearchDatabaseActor.scala


示例14: ClientActor

//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration._

class ClientActor extends Actor {
  override def receive: Receive = {
    case Ordered =>
      println("\033[33m:: Book was ordered\033[0m")
    case Searched(titles, prices) =>
      for (i <- titles.indices)
        println("\033[33m:: " + s"'${titles(i)}' costs ${prices(i)} PLN" + "\033[0m")
    case SearchError(message) =>
      println("\033[33m:: Search failed in database: " + message + "\033[0m")
    case StreamLine(line) =>
      println("\033[37m" + line + "\033[0m")
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(10, Duration(60, "seconds")) {
    case _: Exception => Resume
  }
} 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:23,代码来源:ClientActor.scala


示例15: StreamActor

//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.duration.Duration
import scala.io.Source.fromFile

class StreamActor extends Actor {
  override def receive: Receive = {
    case StreamBook(filename) =>
      val materializer = ActorMaterializer.create(context)  // Materializing and running a stream always requires a Materializer to be in implicit scope.
      val sink = Source
        .actorRef(1000, OverflowStrategy.dropNew)           // If the buffer is full when a new element arrives, drops the new element.
        .throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping)   // throttle - to slow down the stream to 1 element per second.
        .to(Sink.actorRef(sender, NotUsed))                 // Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.
        .run()(materializer)
      val lines = fromFile(filename).getLines
      lines.foreach(line => sink ! StreamLine(line))
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}

case class StreamBook(fileName: String)
case class StreamLine(line: String) 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala


示例16: OrderActor

//设置package包名称以及导入依赖的类
import java.io.FileWriter

import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration.Duration

class OrderActor() extends Actor {
  override def receive: Receive = {
    case Order(title: String) =>
      val currentSender = sender
      orderSingleton.orderBook(title)
      currentSender ! Ordered
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Escalate // because of other threads
    }
}

case class Order(title: String)
case class Ordered()

object orderSingleton {
  def orderBook(title: String): Unit = {
    new FileWriter("orders.txt", true) { // append to end of file
      write(s"$title\n")
      close()
    }
  }
} 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:33,代码来源:OrderActor.scala


示例17: AccountSupervisor

//设置package包名称以及导入依赖的类
package sample.blog

import akka.actor.{Actor, ActorInitializationException, ActorLogging, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import scala.concurrent.duration._



class AccountSupervisor extends Actor with ActorLogging {
  val counter = context.actorOf(Props[AccountEntity], "account-")

  context.watch(counter)

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
    case _: IllegalArgumentException     ? SupervisorStrategy.Resume
    case _: ActorInitializationException ? SupervisorStrategy.Stop
    case _: DeathPactException           ? SupervisorStrategy.Stop
    case _: Exception                    ? SupervisorStrategy.Restart
  }

  def receive = {
    case x:Terminated =>
      log.info(s"The child ${sender} terminated due to - ${x}")
    case msg ? {
      log.info(s"Supervisor of this message ${msg} - ${self.path}")
      counter forward msg
    }
  }
} 
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:29,代码来源:AccountSupervisor.scala


示例18: Server

//设置package包名称以及导入依赖的类
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._


import scala.collection.mutable




class Server extends Actor {
  import Listas._

  override def supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Resume
  }

  lazy val usuario = context.actorOf(Props(new Usuario), "usuario")

  def receive = {
    case MensajeEnviado(id, to, asunto, msg) => {
      usuario ! MensajeRecibido(to, id, asunto, msg)
      println("Se envío el correo a: " + to)
    }
    case ValidarCorreos(id, to) => {
      Server.existeCorreo(id).fold(l => Server.existeCorreo(to).fold(l2 => sender() ! "", r2 => sender() ! s"No existe el correo ${r2} al que se le va a enviar el mensaje"),
        r => sender() ! s"No existe el correo $r quien es el que envia el mensaje")
    }
    case ConsultarMail(m) =>{
      println(s"El usuario $m tiene ${listMailRec.filter(x => x.id == m).size} correos que son: ")
      sender() ! listMailRec.filter(x => x.id == m)
    }
    case CrearMail(mail) => {
      val valiMail = Server.validarCorreo(mail)
      valiMail.fold(l => sender() ! s"El usuario $l no es valido, no se puede crear!!",
        r => Server.existeCorreo(r).fold(l2 => sender() ! s"el usuario ${l2} ya existe!!", r2 => (Server.listCorreos += r2, sender() ! s"Se creó el usuario ${r2}!!")))
    }
    case ErrorEnviarMensaje(mail) => usuario ! ErrorEnviarMensaje(mail)
  }
}

object Server {
  val mailList = List("seven4n", "gmail", "hotmail", "yahoo")
  val listCorreos: mutable.MutableList[String] = mutable.MutableList()

  def validarCorreo(correo:String): Either[String, String] = {
    val co = correo.split("@")(1).split('.')(0)
    if(mailList.contains(co))
      Right(correo)
    else
      Left(correo)
  }

  def existeCorreo(correo:String): Either[String, String] = {
    if(Server.listCorreos.contains(correo))
      Left(correo)
    else
      Right(correo)
  }
} 
开发者ID:DIGelvezL,项目名称:Email,代码行数:60,代码来源:Server.scala


示例19: SimplifiedTweetProcessorActor

//设置package包名称以及导入依赖的类
package org.eigengo.rsa.ingest.v100

import java.util.UUID

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.ActorMaterializer
import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord, KafkaSerializer}
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.eigengo.rsa.Envelope

object SimplifiedTweetProcessorActor {

  def props(config: Config): Props = {
    val producerConf = KafkaProducer.Conf(
      config.getConfig("tweet-image-producer"),
      new StringSerializer,
      KafkaSerializer[Envelope](_.toByteArray)
    )
    Props(classOf[SimplifiedTweetProcessorActor], producerConf)
  }
}

class SimplifiedTweetProcessorActor(producerConf: KafkaProducer.Conf[String, Envelope]) extends Actor {
  private[this] val producer = KafkaProducer(conf = producerConf)
  implicit val _ = ActorMaterializer()

  import scala.concurrent.duration._
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
    case _ ? SupervisorStrategy.Restart
  }

  override def receive: Receive = {
    case TweetImage(handle, content) ?
      producer.send(KafkaProducerRecord("tweet-image", handle,
        Envelope(version = 100,
          handle = handle,
          ingestionTimestamp = System.nanoTime(),
          processingTimestamp = System.nanoTime(),
          messageId = UUID.randomUUID().toString,
          correlationId = UUID.randomUUID().toString,
          payload = content)))
    case SimplifiedTweet(handle, mediaUrls) ?
      mediaUrls.foreach { mediaUrl ?
        import context.dispatcher
        val request = HttpRequest(method = HttpMethods.GET, uri = Uri(mediaUrl))
        val timeout = 1000.millis
        Http(context.system).singleRequest(request).flatMap(_.entity.toStrict(timeout)).foreach { entity ?
          self ! TweetImage(handle, ByteString.copyFrom(entity.data.toArray))
        }
      }
  }

} 
开发者ID:eigengo,项目名称:reactive-summit-2016,代码行数:58,代码来源:SimplifiedTweetProcessorActor.scala


示例20: ExceptionDetector

//设置package包名称以及导入依赖的类
package me.invkrh.raft.kit

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart

class ExceptionDetector(actorName: String, probes: ActorRef*) extends Actor {
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case thr: Throwable =>
      probes foreach { _ ! thr }
      Restart // or make it configurable/controllable during the test
  }
  def receive: PartialFunction[Any, Unit] = {
    case p: Props => sender ! context.actorOf(p, actorName)
  }
} 
开发者ID:invkrh,项目名称:akka-raft,代码行数:16,代码来源:ExceptionDetector.scala



注:本文中的akka.actor.OneForOneStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ShuffledRDD类代码示例发布时间:2022-05-23
下一篇:
Scala KafkaStream类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap