本文整理汇总了Scala中akka.pattern.BackoffSupervisor类的典型用法代码示例。如果您正苦于以下问题:Scala BackoffSupervisor类的具体用法?Scala BackoffSupervisor怎么用?Scala BackoffSupervisor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BackoffSupervisor类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Main
//设置package包名称以及导入依赖的类
package com.example
import akka.actor.{ActorSystem,Props}
import akka.pattern.{Backoff,BackoffSupervisor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import kamon.Kamon
object Main {
def main(args: Array[String]): Unit = {
Kamon.start()
if (args.isEmpty || args.head == "Server")
startServerSystem()
if (args.isEmpty || args.head == "Writer")
startWriterSystem()
if (args.isEmpty || args.head == "Reader")
startReaderSystem()
}
def startServerSystem() = {
val system = ActorSystem("ServerSystem", ConfigFactory.load("server"))
import system.dispatcher
val serverActor = supervise(system, ServerActor.props, "serverActor")
system.scheduler.schedule(1 second, 1 seconds){serverActor ! "print"}
}
def startWriterSystem() = {
val config = ConfigFactory.load("writer")
val serverHost = config.getString("serverHost")
val serverPort = config.getInt("serverPort")
val system = ActorSystem("WriterSystem", config)
val serverSelection = system.actorSelection(s"akka.tcp://[email protected]$serverHost:$serverPort/user/serverActor")
val writerActor = supervise(system, WriterActor.props(serverSelection), "writerActor")
}
def startReaderSystem() = {
val config = ConfigFactory.load("reader")
val serverHost = config.getString("serverHost")
val serverPort = config.getInt("serverPort")
val system = ActorSystem("ReaderSystem", config)
import system.dispatcher
val serverSelection = system.actorSelection(s"akka.tcp://[email protected]$serverHost:$serverPort/user/serverActor")
val readerActor = supervise(system, ReaderActor.props(serverSelection, 1000), "readerActor")
system.scheduler.schedule(1 second, 1 seconds){readerActor ! "print"}
}
def supervise(system: ActorSystem, childProps: Props, name: String) = {
val supervisor = BackoffSupervisor.props(
Backoff.onStop(
childProps,
childName = name,
minBackoff = 3 seconds,
maxBackoff = 30 seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
))
system.actorOf(supervisor, name = name)
}
}
开发者ID:enear,项目名称:server-writer-reader,代码行数:60,代码来源:ApplicationMain.scala
示例2: Main
//设置package包名称以及导入依赖的类
package sample.kamon
import akka.actor._
import akka.pattern.{Backoff, BackoffSupervisor}
import kamon.Kamon
import scala.concurrent.duration._
object Main extends App {
Kamon.start()
val system = ActorSystem("application")
val actorProps = Props[LongConsumer]
val supervisor = BackoffSupervisor.props(
Backoff.onStop(
actorProps,
childName = "telemetry-consumer",
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
).withSupervisorStrategy(
OneForOneStrategy() {
case ex =>
system.log.error(ex, "There was an error in KafkaActor")
SupervisorStrategy.Restart //Here we can add some log or send a notification
})
)
system.actorOf(supervisor)
}
开发者ID:frossi85,项目名称:akka-kamon-sample,代码行数:33,代码来源:Main.scala
示例3: SimulateWindTurbines
//设置package包名称以及导入依赖的类
package sample.stream_actor
import akka.actor.ActorSystem
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import sample.WindTurbineSimulator
import scala.concurrent.duration._
object SimulateWindTurbines extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val endpoint = "ws://127.0.0.1:8080"
val numberOfTurbines = 5
Source(1 to numberOfTurbines)
.throttle(
elements = 100, //number of elements to be taken from bucket
per = 1.second,
maximumBurst = 100, //capacity of bucket
mode = ThrottleMode.shaping
)
.map { _ =>
val id = java.util.UUID.randomUUID.toString
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
WindTurbineSimulator.props(id, endpoint),
childName = id,
minBackoff = 1.second,
maxBackoff = 30.seconds,
randomFactor = 0.2
))
system.actorOf(supervisor, name = s"$id-backoff-supervisor")
}
.runWith(Sink.ignore)
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:41,代码来源:SimulateWindTurbines.scala
示例4: BootstrapActor
//设置package包名称以及导入依赖的类
package com.microworkflow
import akka.actor.{Actor, Props}
import akka.pattern.BackoffSupervisor
import akka.stream.{ActorMaterializer, Materializer}
import scala.concurrent.duration._
class BootstrapActor private(propsBuilder: Materializer ? Props) extends Actor {
override def receive: Receive = {
case _ ?
}
implicit val materializer = ActorMaterializer()
val supervisorProps =
BackoffSupervisor.props(
propsBuilder(materializer)
, "rest-router"
, minBackoff = 5.seconds
, maxBackoff = 50.seconds
, randomFactor = 0.11
)
context.actorOf(supervisorProps)
}
object BootstrapActor {
def props(propsBuilder: Materializer ? Props): Props =
Props(classOf[BootstrapActor], propsBuilder)
}
开发者ID:polymorphic,项目名称:akka-http-svc.g8,代码行数:32,代码来源:BootstrapActor.scala
示例5:
//设置package包名称以及导入依赖的类
package org.dohrm.storyline.users
import java.util.concurrent.TimeUnit
import akka.actor.Props
import akka.pattern.BackoffSupervisor
import org.dohrm.auth0.directives.Auth0
import org.dohrm.storyline.users.actors.UserRepository
import org.dohrm.storyline.users.directives.UserSecurity
import org.dohrm.toolkit.context.{ActorContext, FutureContext, JdbcContext}
import scala.concurrent.duration.FiniteDuration
trait UserContext extends UserSecurity {
self: ActorContext
with Auth0
with FutureContext
with JdbcContext
=>
override val userRepositoryActor = as.actorOf(
BackoffSupervisor.props(
Props(new UserRepository),
"crud-user",
FiniteDuration(3, TimeUnit.SECONDS),
FiniteDuration(30, TimeUnit.SECONDS),
0.42
)
)
}
开发者ID:dohr-michael,项目名称:storyline,代码行数:34,代码来源:UserContext.scala
示例6: Main1
//设置package包名称以及导入依赖的类
package ex1
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.Await
import scala.concurrent.duration._
object Main1 {
val x = new AtomicInteger(0)
def main(args: Array[String]) {
val system = ActorSystem("mofu")
val ahoProp = BackoffSupervisor.props(
Backoff.onFailure(
childProps = BackOffSupervisorActor.props,
childName = "aho",
minBackoff = 1.milliseconds,
maxBackoff = 2.seconds,
randomFactor = 0
)
)
val ref = system.actorOf(ahoProp)
(1 to 100).foreach { x =>
ref ! x
Thread.sleep(100)
}
Await.result(system.whenTerminated, Duration.Inf)
}
}
class BackOffSupervisorActor extends Actor {
import Main1.x
def receive: Receive = {
case _ if x.get() < 10 =>
x.incrementAndGet()
throw new RuntimeException(s"restarted ${x.get()} at ${System.currentTimeMillis()}")
case _ =>
println("yeah!")
context.system.terminate()
}
}
object BackOffSupervisorActor {
def props = Props(classOf[BackOffSupervisorActor])
}
开发者ID:matsu-chara,项目名称:AkkaSandbox,代码行数:48,代码来源:BackOffSupervisorActor.scala
示例7: AuditApp
//设置package包名称以及导入依赖的类
package audit
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.pattern.BackoffSupervisor
import akka.stream.ActorMaterializer
import audit.collector.CollectorApi
import audit.viewer.ViewerApi
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.policies.{ConstantReconnectionPolicy, DefaultRetryPolicy}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object AuditApp extends App with CollectorApi with ViewerApi with Api {
implicit val system = ActorSystem("Audit")
implicit val executionContext = system.dispatcher
implicit val materializer = ActorMaterializer()
val settings = new AuditSettings(ConfigFactory.load())
import settings._
val retryPolicy = DefaultRetryPolicy.INSTANCE
val reconnectionPolicy = new ConstantReconnectionPolicy(CassandraReconnectionDelay)
val cluster = Cluster.builder().
addContactPoint(CassandraHost).
withRetryPolicy(retryPolicy).
withReconnectionPolicy(reconnectionPolicy)
val nodeSupervisor = BackoffSupervisor.props(AuditNode.props(cluster, settings), "node", 1.second, 2.minutes, 0.2f)
val auditNode = system.actorOf(nodeSupervisor)
val collector = auditNode
val viewer = auditNode
val route = collectorRoute ~ viewerRoute
val binding = Http().bindAndHandle(route, AppHost, AppPort)
println("Up and running ...")
}
开发者ID:grzesiekw,项目名称:audit,代码行数:46,代码来源:AuditApp.scala
示例8: KafkaPublisher
//设置package包名称以及导入依赖的类
package nl.tradecloud.kafka
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
import KafkaPublisher._
implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")
val kafkaConfig = KafkaConfig(system.settings.config)
private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement
private def publisherSettings = {
val keySerializer = new StringSerializer
val valueSerializer = new ByteArraySerializer
ProducerSettings(system, keySerializer, valueSerializer).withBootstrapServers(kafkaConfig.brokers)
}
private val publisherProps: Props = KafkaPublisherActor.props(kafkaConfig, publisherSettings)
private val backoffPublisherProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
publisherProps, s"KafkaPublisherActor$publisherId", 3.seconds,
30.seconds, 1.0, SupervisorStrategy.stoppingStrategy
)
private val publishActor = context.actorOf(backoffPublisherProps, s"KafkaBackoffPublisher$publisherId")
def publish(topic: String, msg: AnyRef): Future[Done] = {
val completed: Promise[Done] = Promise()
publishActor ! Publish(topic, msg, completed)
completed.future
}
}
object KafkaPublisher {
private val KafkaClientIdSequenceNumber = new AtomicInteger(1)
}
开发者ID:tradecloud,项目名称:kafka-akka-extension,代码行数:53,代码来源:KafkaPublisher.scala
示例9: Ch4Ping1Actor
//设置package包名称以及导入依赖的类
package com.akkastarting.chapter4
import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.{Escalate, Stop, Restart, Resume}
import akka.actor.{Actor, ActorLogging, Props, OneForOneStrategy}
import akka.pattern.{BackoffOptions, Backoff, BackoffSupervisor}
class Ch4Ping1Actor extends Actor with ActorLogging {
log.info("Ping1Actor Created, ...")
val child2Supervisor = BackoffSupervisor.props(createStrategy(Props[Ch4Ping2Actor], "Ping2Actor"))
val child3Supervisor = BackoffSupervisor.props(createStrategy(Props[Ch4Ping3Actor], "Ping3Actor"))
val child2 = context.system.actorOf(child2Supervisor, "Ping2ActorWithSupervisor")
val child3 = context.system.actorOf(child3Supervisor, "Ping3ActorWithSupervisor")
def receive = {
case message: Bad =>
(child2 ! message)(sender)
(child3 ! message)(sender)
case message: Good =>
(child2 ! message)(sender)
(child3 ! message)(sender)
}
def createStrategy(props: Props, name: String): BackoffOptions = {
Backoff.onStop(props,
childName = name,
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2)
.withManualReset
.withSupervisorStrategy(OneForOneStrategy() {
case ex: ArithmeticException =>
Resume
case ex: NullPointerException =>
Restart
case ex: IllegalArgumentException =>
Stop
case _ =>
Escalate
})
}
}
开发者ID:imjuni,项目名称:AkkaStartingUsingScala,代码行数:45,代码来源:Ch4Ping1Actor.scala
示例10: CacheGuardian
//设置package包名称以及导入依赖的类
package com.carjump
import akka.actor._
import com.carjump.http.ReqParams
import net.ceedubs.ficus.Ficus._
import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.Directive
import akka.pattern.{ Backoff, BackoffSupervisor }
object CacheGuardian {
def props = Props[CacheGuardian].withDispatcher(Cache.DispatcherName)
}
class CacheGuardian extends Actor with ActorLogging {
val conf = context.system.settings.config
val url = conf.as[String]("ws.url")
val pref = conf.as[String]("ws.path")
val pullInterval = conf.as[FiniteDuration]("ws.interval")
val minBackoffInterval = conf.as[FiniteDuration]("ws.min_backoff")
val maxBackoffInterval = conf.as[FiniteDuration]("ws.max_backoff")
val decider: PartialFunction[Throwable, Directive] = {
case ex: Throwable ?
log.error(ex, "Cache has failed unexpectedly. We will try to recreate it later")
akka.actor.SupervisorStrategy.Stop
}
val props = BackoffSupervisor.props(
Backoff.onStop(childProps = Cache.props(url, pref, pullInterval), childName = "cache",
minBackoff = minBackoffInterval, maxBackoff = maxBackoffInterval, randomFactor = 0.2)
.withSupervisorStrategy(OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))))
val fetcher = context.actorOf(props)
override def receive: Receive = {
case msg: ReqParams ? (fetcher forward msg)
}
}
开发者ID:haghard,项目名称:cache-demon,代码行数:41,代码来源:CacheGuardian.scala
注:本文中的akka.pattern.BackoffSupervisor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论