• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala SupervisorStrategy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.actor.SupervisorStrategy的典型用法代码示例。如果您正苦于以下问题:Scala SupervisorStrategy类的具体用法?Scala SupervisorStrategy怎么用?Scala SupervisorStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SupervisorStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: GetAddress

//设置package包名称以及导入依赖的类
package wow.common.network

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, Props, SupervisorStrategy}
import akka.io.Tcp._
import akka.io.{IO, Tcp}

case object GetAddress


class TCPServer[A <: TCPSessionFactory](val factory: A, val address: String, val port: Int)
  extends Actor with ActorLogging {
  log.debug("Binding server with socket")
  IO(Tcp)(context.system) ! Bind(self, new InetSocketAddress(address, port))

  override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  override def postStop(): Unit = log.debug(s"Stopped TCP server for $address:$port")

  def receive: PartialFunction[Any, Unit] = {
    case Bound(localAddress) =>
      log.debug(s"TCP port opened at: ${localAddress.getHostString}:${localAddress.getPort}")

    case Connected(remote, local) =>
      log.debug(s"Remote connection set from $remote to $local")
      val handlerRef = context.actorOf(factory.props(sender), factory.PreferredName + TCPSession.PreferredName(remote))
      sender ! Register(handlerRef)

    case CommandFailed(_: Bind) => context stop self
  }
}

object TCPServer {
  def props[A <: TCPSessionFactory](companion: A, address: String, port: Int): Props = Props(classOf[TCPServer[A]],
    companion,
    address,
    port)

  val PreferredName = "tcp"
} 
开发者ID:SKNZ,项目名称:SpinaciCore,代码行数:42,代码来源:TCPServer.scala


示例2: CriticalProcessesManager

//设置package包名称以及导入依赖的类
package net.hvieira.yeoldeonlinestore.actor

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import net.hvieira.yeoldeonlinestore.actor.CriticalProcessesManager._
import net.hvieira.yeoldeonlinestore.actor.store.StoreManager
import net.hvieira.yeoldeonlinestore.actor.user.UserManager
import net.hvieira.yeoldeonlinestore.api.Item

object CriticalProcessesManager {
  private val STORE_MANAGER = "store-manager"
  private val USER_MANAGER = "user-manager"

  def props(itemProvider: () => Iterable[Item]) = Props(new CriticalProcessesManager(itemProvider))
}

class CriticalProcessesManager(private val itemProvider: () => Iterable[Item]) extends Actor {

  override def preStart(): Unit = {
    context.actorOf(StoreManager.props(3, itemProvider), STORE_MANAGER)
    context.actorOf(UserManager.props(), USER_MANAGER)
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.restart
  }

  override def receive: Receive = {
    case IntroduceUserManagerReq =>
      val possibleRef = context.child(USER_MANAGER)
      possibleRef match {
        case Some(ref) => sender ! IntroductionResponse(ref)
        case None => throw new IllegalStateException("User manager actor ref does not exist")
      }

    case IntroduceStoreManagerReq =>
      val possibleRef = context.child(STORE_MANAGER)
      possibleRef match {
        case Some(ref) => sender ! IntroductionResponse(ref)
        case None => throw new IllegalStateException("Store manager actor ref does not exist")
      }
  }
}

case object IntroduceUserManagerReq
case object IntroduceStoreManagerReq

final case class IntroductionResponse(ref: ActorRef) 
开发者ID:hvieira,项目名称:ye-olde-online-store-akka,代码行数:48,代码来源:CriticalProcessesManager.scala


示例3: TodoServiceProtocol

//设置package包名称以及导入依赖的类
package services.actor

import akka.actor.{ Actor, ActorLogging, Props, SupervisorStrategy }
import dto.Todo
import org.joda.time.DateTime
import shade.memcached.{ Configuration, Memcached }

object TodoServiceProtocol {

  sealed trait Request
  case object GetList extends Request
  case class CreateTodo(text: String, limitAt: DateTime) extends Request
  case class UpdateTodo(id: Long, text: String, limitAt: DateTime) extends Request

  sealed trait Response
  case class Failure(cause: Throwable) extends Response
  case class ListGot(list: Seq[Todo]) extends Response
  case object Success extends Response

}

class TodoServiceActor extends Actor with ActorLogging {
  import TodoServiceProtocol._

  override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy

  val client = Memcached(Configuration(""))(context.dispatcher)

  override def receive: Receive = {
    case msg: Request =>
      val memcached = context.actorOf(TodoMemcachedActor.props(client))
      memcached forward msg
  }

}

object TodoServiceActor {
  def props: Props = Props(classOf[TodoServiceActor])
} 
开发者ID:hayasshi,项目名称:web-app-skeleton,代码行数:40,代码来源:TodoServiceActor.scala


示例4: SuppervisedFsu

//设置package包名称以及导入依赖的类
package com.wincom.dcim.sharded

import akka.actor.{Actor, ActorInitializationException, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy}


class SuppervisedFsu extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalArgumentException ? SupervisorStrategy.Restart
    case _: ActorInitializationException ? SupervisorStrategy.Restart
    case _: DeathPactException ? SupervisorStrategy.Restart
    case _: Exception ? SupervisorStrategy.Restart
  }
  val fsuActor = context.actorOf(Props[FsuActor], s"${self.path.name}")

  def receive = {
    case msg ? fsuActor forward msg
  }
} 
开发者ID:xtwxy,项目名称:akka-tests,代码行数:19,代码来源:SuppervisedFsu.scala


示例5: Notifier

//设置package包名称以及导入依赖的类
package auctionsystem

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.event.LoggingReceive
import akka.actor.Props
import auctionsystem.Notifier.Notification
import auctionsystem.Notifier.RepeatRequest
import auctionsystem.exceptions.RemoteServerErrorException
import akka.actor.SupervisorStrategy.Resume
import akka.actor.SupervisorStrategy.Stop

class Notifier extends Actor {
  val auctionPublisher = context.actorSelection("akka.tcp://[email protected]:2552/user/auctionPublisher")
  var requestsMap = Map [ActorRef, Notification]()
  var idx = 0;
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3) {
     case _: RemoteServerErrorException =>
       println(s"An error occurred when trying to connect with remote server, node that caused failure: ${sender.path}. Resuming...")
       self ! RepeatRequest(requestsMap(sender), sender)
       Resume
 
     case e =>
       println(s"Something else went wrong: $e")
       Stop
  }
  
  def receive = LoggingReceive {
    case Notification(auction, highestBidder, highestBid) =>
      val notifierRequest = context.actorOf(Props(new NotifierRequest(auctionPublisher)), "notifierRequest" + idx);      
      idx = idx + 1
      val notification = Notification(auction, highestBidder, highestBid)
      requestsMap += (notifierRequest -> notification)
      notifierRequest ! notification
  }
}

object Notifier {
  final case class RepeatRequest(notification: Notification, notifierRequest: ActorRef)
  final case class Notification(auctionName: String, highestBidder: ActorRef, highestBid: Int);
} 
开发者ID:Stawicki,项目名称:reactive-scala,代码行数:44,代码来源:Notifier.scala


示例6: SearchDatabaseActor

//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration.Duration
import scala.io.Source._

class SearchDatabaseActor(filename: String) extends Actor {
  override def receive: Receive = {
    case Search(title) =>
      val lines = fromFile(filename).getLines
      sender ! SearchResponse(lines.filter(s => s.startsWith(title)).toArray)
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Stop
    }
}

case class SearchResponse(books: Array[String]) 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:21,代码来源:SearchDatabaseActor.scala


示例7: ClientActor

//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration._

class ClientActor extends Actor {
  override def receive: Receive = {
    case Ordered =>
      println("\033[33m:: Book was ordered\033[0m")
    case Searched(titles, prices) =>
      for (i <- titles.indices)
        println("\033[33m:: " + s"'${titles(i)}' costs ${prices(i)} PLN" + "\033[0m")
    case SearchError(message) =>
      println("\033[33m:: Search failed in database: " + message + "\033[0m")
    case StreamLine(line) =>
      println("\033[37m" + line + "\033[0m")
  }

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(10, Duration(60, "seconds")) {
    case _: Exception => Resume
  }
} 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:23,代码来源:ClientActor.scala


示例8: StreamActor

//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.duration.Duration
import scala.io.Source.fromFile

class StreamActor extends Actor {
  override def receive: Receive = {
    case StreamBook(filename) =>
      val materializer = ActorMaterializer.create(context)  // Materializing and running a stream always requires a Materializer to be in implicit scope.
      val sink = Source
        .actorRef(1000, OverflowStrategy.dropNew)           // If the buffer is full when a new element arrives, drops the new element.
        .throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping)   // throttle - to slow down the stream to 1 element per second.
        .to(Sink.actorRef(sender, NotUsed))                 // Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.
        .run()(materializer)
      val lines = fromFile(filename).getLines
      lines.foreach(line => sink ! StreamLine(line))
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}

case class StreamBook(fileName: String)
case class StreamLine(line: String) 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala


示例9: OrderActor

//设置package包名称以及导入依赖的类
import java.io.FileWriter

import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}

import scala.concurrent.duration.Duration

class OrderActor() extends Actor {
  override def receive: Receive = {
    case Order(title: String) =>
      val currentSender = sender
      orderSingleton.orderBook(title)
      currentSender ! Ordered
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Escalate // because of other threads
    }
}

case class Order(title: String)
case class Ordered()

object orderSingleton {
  def orderBook(title: String): Unit = {
    new FileWriter("orders.txt", true) { // append to end of file
      write(s"$title\n")
      close()
    }
  }
} 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:33,代码来源:OrderActor.scala


示例10: RootActorSystem

//设置package包名称以及导入依赖的类
package com.wavesplatform.actor

import akka.actor.{ActorSystem, AllForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import com.typesafe.config.Config
import scorex.utils.ScorexLogging

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object RootActorSystem extends ScorexLogging {
  @volatile private var failed = false

  final class EscalatingStrategy extends SupervisorStrategyConfigurator {
    override def create(): SupervisorStrategy = AllForOneStrategy(loggingEnabled = false) {
      case t: Throwable =>
        failed = true
        log.error("Root actor got exception, escalate", t)
        SupervisorStrategy.Escalate
    }
  }

  def start(id: String, config: Config)(init: ActorSystem => Unit): Unit = {
    val system = ActorSystem(id, config)
    try {
      init(system)
    } catch {
      case e: Exception =>
        log.error(s"Error while initializing actor system $id", e)
        sys.exit(1)
    }

    Await.result(system.whenTerminated, Duration.Inf)
    if (failed) {
      sys.exit(1)
    } else {
      sys.exit(0)
    }
  }
} 
开发者ID:wavesplatform,项目名称:Waves,代码行数:40,代码来源:RootActorSystem.scala


示例11: AccountSupervisor

//设置package包名称以及导入依赖的类
package sample.blog

import akka.actor.{Actor, ActorInitializationException, ActorLogging, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import scala.concurrent.duration._



class AccountSupervisor extends Actor with ActorLogging {
  val counter = context.actorOf(Props[AccountEntity], "account-")

  context.watch(counter)

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
    case _: IllegalArgumentException     ? SupervisorStrategy.Resume
    case _: ActorInitializationException ? SupervisorStrategy.Stop
    case _: DeathPactException           ? SupervisorStrategy.Stop
    case _: Exception                    ? SupervisorStrategy.Restart
  }

  def receive = {
    case x:Terminated =>
      log.info(s"The child ${sender} terminated due to - ${x}")
    case msg ? {
      log.info(s"Supervisor of this message ${msg} - ${self.path}")
      counter forward msg
    }
  }
} 
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:29,代码来源:AccountSupervisor.scala


示例12: Supervision

//设置package包名称以及导入依赖的类
package com.pagerduty.scheduler.akka

import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{AllForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import com.pagerduty.scheduler.Scheduler

object Supervision {
  val AlwaysEscalateStrategy = AllForOneStrategy() {
    case _: Throwable => Escalate
  }

  
  def makeAlwaysEscalateTopicSupervisorStrategy(logger: Scheduler.Logging): AllForOneStrategy = {
    AllForOneStrategy() {
      case throwable: Throwable => {
        logger.reportActorSystemRestart(throwable)
        Escalate
      }
    }
  }
}

final class UserGuardianEscalateStrategy extends SupervisorStrategyConfigurator {
  override def create(): SupervisorStrategy = Supervision.AlwaysEscalateStrategy
} 
开发者ID:PagerDuty,项目名称:scheduler,代码行数:26,代码来源:Supervision.scala


示例13: SimplifiedTweetProcessorActor

//设置package包名称以及导入依赖的类
package org.eigengo.rsa.ingest.v100

import java.util.UUID

import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.ActorMaterializer
import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord, KafkaSerializer}
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.eigengo.rsa.Envelope

object SimplifiedTweetProcessorActor {

  def props(config: Config): Props = {
    val producerConf = KafkaProducer.Conf(
      config.getConfig("tweet-image-producer"),
      new StringSerializer,
      KafkaSerializer[Envelope](_.toByteArray)
    )
    Props(classOf[SimplifiedTweetProcessorActor], producerConf)
  }
}

class SimplifiedTweetProcessorActor(producerConf: KafkaProducer.Conf[String, Envelope]) extends Actor {
  private[this] val producer = KafkaProducer(conf = producerConf)
  implicit val _ = ActorMaterializer()

  import scala.concurrent.duration._
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
    case _ ? SupervisorStrategy.Restart
  }

  override def receive: Receive = {
    case TweetImage(handle, content) ?
      producer.send(KafkaProducerRecord("tweet-image", handle,
        Envelope(version = 100,
          handle = handle,
          ingestionTimestamp = System.nanoTime(),
          processingTimestamp = System.nanoTime(),
          messageId = UUID.randomUUID().toString,
          correlationId = UUID.randomUUID().toString,
          payload = content)))
    case SimplifiedTweet(handle, mediaUrls) ?
      mediaUrls.foreach { mediaUrl ?
        import context.dispatcher
        val request = HttpRequest(method = HttpMethods.GET, uri = Uri(mediaUrl))
        val timeout = 1000.millis
        Http(context.system).singleRequest(request).flatMap(_.entity.toStrict(timeout)).foreach { entity ?
          self ! TweetImage(handle, ByteString.copyFrom(entity.data.toArray))
        }
      }
  }

} 
开发者ID:eigengo,项目名称:reactive-summit-2016,代码行数:58,代码来源:SimplifiedTweetProcessorActor.scala


示例14: AkkaSupervision

//设置package包名称以及导入依赖的类
package trove.core.event

import akka.actor.SupervisorStrategy.{Resume, Stop}
import akka.actor.{ActorInitializationException, ActorKilledException, OneForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import grizzled.slf4j.Logging

class AkkaSupervision extends SupervisorStrategyConfigurator with Logging {
  override def create(): SupervisorStrategy = {
    logger.debug("Creating custom akka supervisor strategy")
    OneForOneStrategy() {
      case _: ActorInitializationException => Stop
      case _: ActorKilledException         => Stop
      case e: Exception                    =>
        logger.error("Exception in event listener", e)
        Resume
    }    
  }
} 
开发者ID:emanchgo,项目名称:trove,代码行数:19,代码来源:AkkaSupervision.scala


示例15: Intact

//设置package包名称以及导入依赖的类
package oriana

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{OneForOneStrategy, SupervisorStrategy, Props, Actor}

sealed trait FuseStatus
case object Intact extends FuseStatus
case object Blown extends FuseStatus

class Fuse(childSpec: Props) extends Actor {
  case object FuseBlown
  var status: FuseStatus = Intact
  val child = context.actorOf(childSpec)

  def receive = {
    case Fuse.ReadStatus => sender() ! status
    case FuseBlown => status = Blown
    case x if status == Intact => child forward x
  }

  override def supervisorStrategy = OneForOneStrategy() {
    case _: Throwable =>
      self ! FuseBlown
      Stop
  }
}

object Fuse {
  case object ReadStatus
  def props(childSpec: Props) = Props(new Fuse(childSpec))
} 
开发者ID:Norwae,项目名称:oriana,代码行数:32,代码来源:Fuse.scala


示例16: AuditNode

//设置package包名称以及导入依赖的类
package audit

import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import audit.collector.CollectorActor
import audit.collector.CollectorActor.CollectorRequest
import audit.viewer.ViewerActor
import audit.viewer.ViewerActor.ViewerRequest
import com.datastax.driver.core.Cluster

class AuditNode(cluster: Cluster.Builder, settings: AuditSettings) extends Actor {
  val session = cluster.build().connect()

  val collector = context.actorOf(CollectorActor.props(session), "collector")
  val viewer = context.actorOf(ViewerActor.props(session), "viewer")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: Exception => Escalate
  }

  override def receive: Receive = {
    case req: CollectorRequest => collector forward req
    case req: ViewerRequest => viewer forward req
  }
}

object AuditNode {
  def props(cluster: Cluster.Builder, settings: AuditSettings) = Props(new AuditNode(cluster, settings))
} 
开发者ID:grzesiekw,项目名称:audit,代码行数:30,代码来源:AuditNode.scala


示例17: GeneralSupervisor

//设置package包名称以及导入依赖的类
package com.bisphone.sarf.implv1.util

import akka.actor.{Actor, AllForOneStrategy, Props, SupervisorStrategy}

import scala.concurrent.duration._


private[implv1] class GeneralSupervisor (
   props: Props,
   maxNrOfRetries: Int,
   withinTimeRange: FiniteDuration,
   loggingEnabled: Boolean,
   recoveryStrategy: PartialFunction[Throwable, SupervisorStrategy.Directive]
) extends Actor {

   override val supervisorStrategy = AllForOneStrategy(
      maxNrOfRetries = maxNrOfRetries,
      withinTimeRange = withinTimeRange,
      loggingEnabled = loggingEnabled
   )(recoveryStrategy)

   val ref = context.actorOf(props, "actor")

   override def receive: Receive = {
      case msg => ref.tell(msg, sender)
   }

   override def postStop (): Unit = {
      context.children.foreach { child => context stop child }
   }

}

object GeneralSupervisor {
   def props (
      props: Props,
      maxNrOfRetries: Int,
      withinTimeRange: FiniteDuration,
      loggingEnabled: Boolean
   )(
      recoveryStrategy: PartialFunction[Throwable, SupervisorStrategy.Directive]
   ) = Props {
      new GeneralSupervisor(
         props,
         maxNrOfRetries,
         withinTimeRange,
         loggingEnabled,
         recoveryStrategy
      )
   }
} 
开发者ID:bisphone,项目名称:SARF,代码行数:52,代码来源:GeneralSupervisor.scala


示例18: LogProcSupervisor

//设置package包名称以及导入依赖的类
package chapter4

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{OneForOneStrategy, SupervisorStrategy, Actor, Props}


class LogProcSupervisor(dbSupervisorProps:Props) extends Actor{

      //if file corrupted,keep deal the next msg
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(){
    case _:CorruptedFileException=>Resume
  }
    //by dbSupervisorProps, create db supervisor
  val dbSupervisor=context.actorOf(dbSupervisorProps)
  //this supervisor monitoer the logProcessor
  val logProcProps=Props(new LogProcessor(dbSupervisor))
  val logProcessor=context.actorOf(logProcProps)

  def receive={
    case m=> logProcessor forward(m)
  }

} 
开发者ID:wjingyao2008,项目名称:firsttry,代码行数:24,代码来源:LogProcSupervisor.scala


示例19: DiscoveryGuardian

//设置package包名称以及导入依赖的类
package linguistic

import java.time.LocalDateTime
import java.util.TimeZone

import akka.actor.{Actor, ActorLogging, Props, SupervisorStrategy}

import scala.concurrent.duration._

object DiscoveryGuardian {
  def props(env: String, httpPort: Int, hostName: String) =
    Props(new DiscoveryGuardian(env, httpPort, hostName))
      .withDispatcher(HttpServer.HttpDispatcher)
}

class DiscoveryGuardian(env: String, httpPort: Int, hostName: String) extends Actor with ActorLogging {
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

  val system = context.system
  val config = context.system.settings.config
  val timeout = FiniteDuration(config.getDuration("constructr.join-timeout").toNanos, NANOSECONDS)
  system.scheduler.scheduleOnce(timeout)(self ! 'Discovered)(system.dispatcher)

  override def receive: Receive = {
    case 'Discovered =>
      context.system.actorOf(HttpServer.props(httpPort, hostName,
        config.getString("akka.http.ssl.keypass"), config.getString("akka.http.ssl.storepass")), "http-server")

      println(Console.GREEN +
        """
              ___   ___   ___  __   __  ___   ___
             / __| | __| | _ \ \ \ / / | __| | _ \
             \__ \ | _|  |   /  \ V /  | _|  |   /
             |___/ |___| |_|_\   \_/   |___| |_|_\

        """ + Console.RESET)

      val tz = TimeZone.getDefault.getID
      val greeting = new StringBuilder()
        .append('\n')
        .append("=================================================================================================")
        .append('\n')
        .append(s"? ? ?  Environment: ${env} TimeZone: $tz Started at ${LocalDateTime.now}  ? ? ?")
        .append('\n')
        .append(s"? ? ?  ConstructR service-discovery: ${config.getString("constructr.coordination.class-name")} on ${config.getString("constructr.coordination.host")}  ? ? ?")
        .append('\n')
        .append(s"? ? ?  Akka cluster: ${config.getInt("akka.remote.netty.tcp.port")}  ? ? ?")
        .append('\n')
        .append(s"? ? ?  Akka seeds: ${config.getStringList("akka.cluster.seed-nodes")}  ? ? ?")
        .append('\n')
        .append(s"? ? ?  Cassandra domain points: ${config.getStringList("cassandra-journal.contact-points")}  ? ? ?")
        .append('\n')
        .append(s"? ? ?  Server online at https://${config.getString("akka.http.interface")}:${httpPort}   ? ? ?")
        .append('\n')
        .append("=================================================================================================")
      system.log.info(greeting.toString)
  }
} 
开发者ID:haghard,项目名称:linguistic,代码行数:59,代码来源:DiscoveryGuardian.scala


示例20: KafkaPublisher

//设置package包名称以及导入依赖的类
package nl.tradecloud.kafka

import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}

class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
  import KafkaPublisher._

  implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")

  val kafkaConfig = KafkaConfig(system.settings.config)

  private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement

  private def publisherSettings = {
    val keySerializer = new StringSerializer
    val valueSerializer = new ByteArraySerializer

    ProducerSettings(system, keySerializer, valueSerializer).withBootstrapServers(kafkaConfig.brokers)
  }

  private val publisherProps: Props = KafkaPublisherActor.props(kafkaConfig, publisherSettings)
  private val backoffPublisherProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
    publisherProps, s"KafkaPublisherActor$publisherId", 3.seconds,
    30.seconds, 1.0, SupervisorStrategy.stoppingStrategy
  )
  private val publishActor = context.actorOf(backoffPublisherProps, s"KafkaBackoffPublisher$publisherId")

  def publish(topic: String, msg: AnyRef): Future[Done] = {
    val completed: Promise[Done] = Promise()

    publishActor ! Publish(topic, msg, completed)

    completed.future
  }

}

object KafkaPublisher {
  private val KafkaClientIdSequenceNumber = new AtomicInteger(1)
} 
开发者ID:tradecloud,项目名称:kafka-akka-extension,代码行数:53,代码来源:KafkaPublisher.scala



注:本文中的akka.actor.SupervisorStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Futures类代码示例发布时间:2022-05-23
下一篇:
Scala SessionState类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap