• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala BackoffSupervisor类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.pattern.BackoffSupervisor的典型用法代码示例。如果您正苦于以下问题:Scala BackoffSupervisor类的具体用法?Scala BackoffSupervisor怎么用?Scala BackoffSupervisor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了BackoffSupervisor类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Main

//设置package包名称以及导入依赖的类
package com.example

import akka.actor.{ActorSystem,Props}
import akka.pattern.{Backoff,BackoffSupervisor}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import kamon.Kamon

/**
 * Command-line launcher for the server / writer / reader demo systems.
 *
 * With no arguments all three systems are started in this JVM; otherwise the
 * first argument ("Server", "Writer" or "Reader") selects a single one.
 */
object Main {
  def main(args: Array[String]): Unit = {
    Kamon.start()
    val requested = args.headOption
    // `forall` on None is true, so an empty argument list selects every role.
    def selected(role: String): Boolean = requested.forall(_ == role)

    if (selected("Server")) startServerSystem()
    if (selected("Writer")) startWriterSystem()
    if (selected("Reader")) startReaderSystem()
  }

  /**
   * Starts "ServerSystem" from the `server` configuration and, after an initial
   * one-second delay, sends "print" to the supervised server actor every second.
   */
  def startServerSystem() = {
    val system = ActorSystem("ServerSystem", ConfigFactory.load("server"))
    import system.dispatcher
    val serverActor = supervise(system, ServerActor.props, "serverActor")
    system.scheduler.schedule(1.second, 1.second) { serverActor ! "print" }
  }

  /**
   * Starts "WriterSystem" from the `writer` configuration and supervises a
   * writer actor that is handed a selection of the remote server actor.
   */
  def startWriterSystem(): Unit = {
    val config = ConfigFactory.load("writer")
    val serverHost = config.getString("serverHost")
    val serverPort = config.getInt("serverPort")
    val system = ActorSystem("WriterSystem", config)
    val serverSelection = system.actorSelection(serverActorPath(serverHost, serverPort))
    supervise(system, WriterActor.props(serverSelection), "writerActor")
    ()
  }

  /**
   * Starts "ReaderSystem" from the `reader` configuration, supervises a reader
   * actor pointed at the remote server actor, and sends it "print" every second
   * after an initial one-second delay.
   */
  def startReaderSystem() = {
    val config = ConfigFactory.load("reader")
    val serverHost = config.getString("serverHost")
    val serverPort = config.getInt("serverPort")
    val system = ActorSystem("ReaderSystem", config)
    import system.dispatcher
    val serverSelection = system.actorSelection(serverActorPath(serverHost, serverPort))
    val readerActor = supervise(system, ReaderActor.props(serverSelection, 1000), "readerActor")
    system.scheduler.schedule(1.second, 1.second) { readerActor ! "print" }
  }

  /**
   * Wraps `childProps` in a stop-triggered backoff supervisor and starts it.
   *
   * Both the supervisor and its child are given `name`, so the supervisor is
   * reachable at `/user/<name>`.
   *
   * @param system     actor system in which the supervisor is created
   * @param childProps props of the actor to supervise
   * @param name       name used for the supervisor and for its child
   * @return the reference of the started supervisor
   */
  def supervise(system: ActorSystem, childProps: Props, name: String) = {
    val supervisor = BackoffSupervisor.props(
      Backoff.onStop(
        childProps,
        childName = name,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      ))
    system.actorOf(supervisor, name = name)
  }

  /**
   * Remote path of the server actor.
   *
   * The system name must match the one used in `startServerSystem`; the
   * scraped listing had this segment mangled into "[email protected]".
   */
  private def serverActorPath(host: String, port: Int): String =
    s"akka.tcp://ServerSystem@$host:$port/user/serverActor"
}
开发者ID:enear,项目名称:server-writer-reader,代码行数:60,代码来源:ApplicationMain.scala


示例2: Main

//设置package包名称以及导入依赖的类
package sample.kamon

import akka.actor._
import akka.pattern.{Backoff, BackoffSupervisor}
import kamon.Kamon
import scala.concurrent.duration._

/**
 * Starts Kamon, creates the "application" actor system and runs a
 * `LongConsumer` under a stop-triggered backoff supervisor.
 */
object Main extends App {

  Kamon.start()

  val system = ActorSystem("application")

  val actorProps = Props[LongConsumer]

  val supervisor = {
    // Every failure is logged and answered with a restart of the child.
    val restartAndLog = OneForOneStrategy() {
      case ex =>
        system.log.error(ex, "There was an error in KafkaActor")
        SupervisorStrategy.Restart //Here we can add some log or send a notification
    }

    val backoffOptions = Backoff
      .onStop(
        actorProps,
        childName = "telemetry-consumer",
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
      )
      .withSupervisorStrategy(restartAndLog)

    BackoffSupervisor.props(backoffOptions)
  }

  system.actorOf(supervisor)
}
开发者ID:frossi85,项目名称:akka-kamon-sample,代码行数:33,代码来源:Main.scala


示例3: SimulateWindTurbines

//设置package包名称以及导入依赖的类
package sample.stream_actor

import akka.actor.ActorSystem
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import sample.WindTurbineSimulator

import scala.concurrent.duration._


/**
 * Starts `numberOfTurbines` simulated wind turbines, each one wrapped in its
 * own failure-triggered backoff supervisor and pointed at `endpoint`.
 */
object SimulateWindTurbines extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val endpoint = "ws://127.0.0.1:8080"
  val numberOfTurbines = 5

  /** Creates one supervised simulator identified by a fresh random UUID. */
  private def startSupervisedTurbine() = {
    val turbineId = java.util.UUID.randomUUID.toString
    val backoffOptions = Backoff.onFailure(
      WindTurbineSimulator.props(turbineId, endpoint),
      childName = turbineId,
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    )
    system.actorOf(BackoffSupervisor.props(backoffOptions), name = s"$turbineId-backoff-supervisor")
  }

  Source(1 to numberOfTurbines)
    .throttle(
      elements = 100, // number of elements to be taken from bucket
      per = 1.second,
      maximumBurst = 100, // capacity of bucket
      mode = ThrottleMode.shaping
    )
    .map(_ => startSupervisedTurbine())
    .runWith(Sink.ignore)
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:41,代码来源:SimulateWindTurbines.scala


示例4: BootstrapActor

//设置package包名称以及导入依赖的类
package com.microworkflow

import akka.actor.{Actor, Props}
import akka.pattern.BackoffSupervisor
import akka.stream.{ActorMaterializer, Materializer}

import scala.concurrent.duration._

/**
 * Actor whose only job is to start a backoff supervisor around the props
 * produced by `propsBuilder`; it ignores every message it receives.
 *
 * @param propsBuilder builds the supervised child's props from the
 *                     materializer created by this actor
 */
class BootstrapActor private(propsBuilder: Materializer => Props) extends Actor {

  override def receive: Receive = {
    case _ => () // all incoming messages are deliberately dropped
  }

  implicit val materializer = ActorMaterializer()
  val supervisorProps =
    BackoffSupervisor.props(
      propsBuilder(materializer)
      , "rest-router"
      , minBackoff = 5.seconds
      , maxBackoff = 50.seconds
      , randomFactor = 0.11
    )
  // The supervisor is started as soon as this actor is constructed.
  context.actorOf(supervisorProps)
}

/** Factory for [[BootstrapActor]] props. */
object BootstrapActor {

  /**
   * Builds the props for a `BootstrapActor`.
   *
   * @param propsBuilder function passed to the actor's constructor; it turns a
   *                     `Materializer` into the props of the supervised child
   * @return props that instantiate `BootstrapActor` with `propsBuilder`
   */
  def props(propsBuilder: Materializer => Props): Props =
    Props(classOf[BootstrapActor], propsBuilder)
}
开发者ID:polymorphic,项目名称:akka-http-svc.g8,代码行数:32,代码来源:BootstrapActor.scala


示例5: UserContext

//设置package包名称以及导入依赖的类
package org.dohrm.storyline.users

import java.util.concurrent.TimeUnit

import akka.actor.Props
import akka.pattern.BackoffSupervisor
import org.dohrm.auth0.directives.Auth0
import org.dohrm.storyline.users.actors.UserRepository
import org.dohrm.storyline.users.directives.UserSecurity
import org.dohrm.toolkit.context.{ActorContext, FutureContext, JdbcContext}

import scala.concurrent.duration.FiniteDuration


/**
 * Supplies `userRepositoryActor` as a `UserRepository` running under a
 * backoff supervisor (child name "crud-user", 3 s to 30 s backoff, random
 * factor 0.42).
 */
trait UserContext extends UserSecurity {
  self: ActorContext
    with Auth0
    with FutureContext
    with JdbcContext
  =>

  override val userRepositoryActor = {
    val shortestBackoff = FiniteDuration(3, TimeUnit.SECONDS)
    val longestBackoff = FiniteDuration(30, TimeUnit.SECONDS)
    val supervisorProps = BackoffSupervisor.props(
      Props(new UserRepository),
      "crud-user",
      shortestBackoff,
      longestBackoff,
      0.42
    )
    as.actorOf(supervisorProps)
  }

}
开发者ID:dohr-michael,项目名称:storyline,代码行数:34,代码来源:UserContext.scala


示例6: Main1

//设置package包名称以及导入依赖的类
package ex1

import java.util.concurrent.atomic.AtomicInteger

import akka.actor._
import akka.pattern.{Backoff, BackoffSupervisor}

import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Demo driver: runs a `BackOffSupervisorActor` under a failure-triggered
 * backoff supervisor and feeds it the numbers 1 to 100.
 */
object Main1 {
  /** Counter read and incremented by `BackOffSupervisorActor`. */
  val x = new AtomicInteger(0)

  /**
   * Sends 100 messages, 100 ms apart, to the supervised actor and then blocks
   * until the actor system has terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mofu")
    val ahoProp = BackoffSupervisor.props(
      Backoff.onFailure(
        childProps = BackOffSupervisorActor.props,
        childName = "aho",
        minBackoff = 1.milliseconds,
        maxBackoff = 2.seconds,
        randomFactor = 0.0
      )
    )
    val ref = system.actorOf(ahoProp)
    // The loop variable is named `i` so it does not shadow the `x` counter above.
    (1 to 100).foreach { i =>
      ref ! i
      Thread.sleep(100)
    }
    Await.result(system.whenTerminated, Duration.Inf)
  }
}

/**
 * Actor that fails on every message while `Main1.x` is below 10 and
 * terminates the actor system on the first message after that.
 */
class BackOffSupervisorActor extends Actor {
  import Main1.x

  def receive: Receive = {
    case _ =>
      if (x.get() < 10) {
        // Count the failure, then throw so the supervisor sees it.
        x.incrementAndGet()
        throw new RuntimeException(s"restarted ${x.get()} at ${System.currentTimeMillis()}")
      } else {
        println("yeah!")
        context.system.terminate()
      }
  }
}
/** Factory for [[BackOffSupervisorActor]] props. */
object BackOffSupervisorActor {
  /** Props that create a `BackOffSupervisorActor` with no constructor arguments. */
  def props = Props[BackOffSupervisorActor]
}
开发者ID:matsu-chara,项目名称:AkkaSandbox,代码行数:48,代码来源:BackOffSupervisorActor.scala


示例7: AuditApp

//设置package包名称以及导入依赖的类
package audit

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.pattern.BackoffSupervisor
import akka.stream.ActorMaterializer
import audit.collector.CollectorApi
import audit.viewer.ViewerApi
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.policies.{ConstantReconnectionPolicy, DefaultRetryPolicy}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Application entry point: configures a Cassandra cluster builder, starts the
 * audit node under a backoff supervisor and binds the combined collector and
 * viewer HTTP routes.
 */
object AuditApp extends App with CollectorApi with ViewerApi with Api {

  implicit val system = ActorSystem("Audit")
  implicit val executionContext = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val settings = new AuditSettings(ConfigFactory.load())
  import settings._

  // Cassandra client policies and the (not yet built) cluster builder.
  val retryPolicy = DefaultRetryPolicy.INSTANCE
  val reconnectionPolicy = new ConstantReconnectionPolicy(CassandraReconnectionDelay)
  val cluster = Cluster
    .builder()
    .addContactPoint(CassandraHost)
    .withRetryPolicy(retryPolicy)
    .withReconnectionPolicy(reconnectionPolicy)

  // Child "node" is backed off between 1 second and 2 minutes.
  val nodeSupervisor = BackoffSupervisor.props(
    AuditNode.props(cluster, settings),
    "node",
    1.second,
    2.minutes,
    0.2f
  )

  val auditNode = system.actorOf(nodeSupervisor)

  // Both roles are assigned the same supervised node.
  val collector = auditNode
  val viewer = auditNode

  val route = collectorRoute ~ viewerRoute

  val binding = Http().bindAndHandle(route, AppHost, AppPort)

  println("Up and running ...")

}
开发者ID:grzesiekw,项目名称:audit,代码行数:46,代码来源:AuditApp.scala


示例8: KafkaPublisher

//设置package包名称以及导入依赖的类
package nl.tradecloud.kafka

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

/**
 * Publishes messages through a `KafkaPublisherActor` that runs under a
 * backoff supervisor created in the supplied `ActorRefFactory`.
 *
 * @param system  actor system used for the dispatcher lookup, the Kafka
 *                configuration and the producer settings
 * @param mat     implicit materializer (not referenced directly in this class)
 * @param context factory in which the supervising actor is created
 */
class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
  import KafkaPublisher._

  implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")

  val kafkaConfig = KafkaConfig(system.settings.config)

  // Taken from the shared counter on first use; it suffixes both actor names.
  private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement

  /** Producer settings with String keys and byte-array values. */
  private def producerSettings =
    ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
      .withBootstrapServers(kafkaConfig.brokers)

  private val publishActor = {
    val childProps: Props = KafkaPublisherActor.props(kafkaConfig, producerSettings)
    val supervisorProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
      childProps,
      s"KafkaPublisherActor$publisherId",
      3.seconds,
      30.seconds,
      1.0,
      SupervisorStrategy.stoppingStrategy
    )
    context.actorOf(supervisorProps, s"KafkaBackoffPublisher$publisherId")
  }

  /**
   * Sends a `Publish` command for `msg` on `topic` to the publisher actor.
   *
   * @param topic topic name carried in the command
   * @param msg   payload carried in the command
   * @return the future of the promise carried in the command; presumably
   *         completed by the publisher actor (confirm in `KafkaPublisherActor`)
   */
  def publish(topic: String, msg: AnyRef): Future[Done] = {
    val ack = Promise[Done]()
    publishActor ! Publish(topic, msg, ack)
    ack.future
  }

}

/** Holds state shared by all [[KafkaPublisher]] instances. */
object KafkaPublisher {
  // Counter starting at 1; each publisher draws its id from it.
  private val KafkaClientIdSequenceNumber: AtomicInteger = new AtomicInteger(1)
}
开发者ID:tradecloud,项目名称:kafka-akka-extension,代码行数:53,代码来源:KafkaPublisher.scala


示例9: Ch4Ping1Actor

//设置package包名称以及导入依赖的类
package com.akkastarting.chapter4

import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.{Escalate, Stop, Restart, Resume}
import akka.actor.{Actor, ActorLogging, Props, OneForOneStrategy}
import akka.pattern.{BackoffOptions, Backoff, BackoffSupervisor}

/**
 * Forwards every `Bad` and `Good` message to two backoff-supervised actors
 * (`Ch4Ping2Actor` and `Ch4Ping3Actor`), keeping the original sender.
 */
class Ch4Ping1Actor extends Actor with ActorLogging {
  log.info("Ping1Actor Created, ...")

  val child2Supervisor = BackoffSupervisor.props(createStrategy(Props[Ch4Ping2Actor], "Ping2Actor"))
  val child3Supervisor = BackoffSupervisor.props(createStrategy(Props[Ch4Ping3Actor], "Ping3Actor"))
  // NOTE(review): both supervisors are created on the actor system, not as children of this actor.
  val child2 = context.system.actorOf(child2Supervisor, "Ping2ActorWithSupervisor")
  val child3 = context.system.actorOf(child3Supervisor, "Ping3ActorWithSupervisor")

  def receive = {
    case message @ (_: Bad | _: Good) =>
      child2.forward(message)
      child3.forward(message)
  }

  /**
   * Builds stop-triggered backoff options with manual reset and a supervisor
   * strategy that maps exception types to directives.
   *
   * @param props props of the actor to supervise
   * @param name  name given to the supervised child
   * @return the configured backoff options
   */
  def createStrategy(props: Props, name: String): BackoffOptions = {
    val exceptionPolicy = OneForOneStrategy() {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _                           => Escalate
    }

    Backoff
      .onStop(
        props,
        childName = name,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      )
      .withManualReset
      .withSupervisorStrategy(exceptionPolicy)
  }
}
开发者ID:imjuni,项目名称:AkkaStartingUsingScala,代码行数:45,代码来源:Ch4Ping1Actor.scala


示例10: CacheGuardian

//设置package包名称以及导入依赖的类
package com.carjump

import akka.actor._
import com.carjump.http.ReqParams
import net.ceedubs.ficus.Ficus._
import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.Directive
import akka.pattern.{ Backoff, BackoffSupervisor }

/** Factory for [[CacheGuardian]] props. */
object CacheGuardian {
  /** Props for a `CacheGuardian` that runs on the dispatcher named by `Cache.DispatcherName`. */
  def props = {
    val guardianProps = Props[CacheGuardian]
    guardianProps.withDispatcher(Cache.DispatcherName)
  }
}

/**
 * Runs a `Cache` actor under a stop-triggered backoff supervisor and forwards
 * every `ReqParams` message to that supervisor.
 *
 * All settings are read from the actor system's configuration under `ws.*`.
 */
class CacheGuardian extends Actor with ActorLogging {
  val conf = context.system.settings.config

  val url = conf.as[String]("ws.url")
  val pref = conf.as[String]("ws.path")
  val pullInterval = conf.as[FiniteDuration]("ws.interval")

  val minBackoffInterval = conf.as[FiniteDuration]("ws.min_backoff")
  val maxBackoffInterval = conf.as[FiniteDuration]("ws.max_backoff")

  // Logs the failure and stops the child. The pattern matches every Throwable,
  // so the `orElse` fallback below is never consulted.
  val decider: PartialFunction[Throwable, Directive] = {
    case ex: Throwable =>
      log.error(ex, "Cache has failed unexpectedly. We will try to recreate it later")
      akka.actor.SupervisorStrategy.Stop
  }

  val props = BackoffSupervisor.props(
    Backoff.onStop(childProps = Cache.props(url, pref, pullInterval), childName = "cache",
      minBackoff = minBackoffInterval, maxBackoff = maxBackoffInterval, randomFactor = 0.2)
      .withSupervisorStrategy(OneForOneStrategy()(decider.orElse(SupervisorStrategy.defaultStrategy.decider))))

  val fetcher = context.actorOf(props)

  override def receive: Receive = {
    case msg: ReqParams => fetcher forward msg
  }
}
开发者ID:haghard,项目名称:cache-demon,代码行数:41,代码来源:CacheGuardian.scala



注:本文中的akka.pattern.BackoffSupervisor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Cancel类代码示例发布时间:2022-05-23
下一篇:
Scala ChronoField类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap