本文整理汇总了Scala中akka.actor.SupervisorStrategy类的典型用法代码示例。如果您正苦于以下问题:Scala SupervisorStrategy类的具体用法?Scala SupervisorStrategy怎么用?Scala SupervisorStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SupervisorStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GetAddress
//设置package包名称以及导入依赖的类
package wow.common.network
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, Props, SupervisorStrategy}
import akka.io.Tcp._
import akka.io.{IO, Tcp}
case object GetAddress
class TCPServer[A <: TCPSessionFactory](val factory: A, val address: String, val port: Int)
extends Actor with ActorLogging {
log.debug("Binding server with socket")
IO(Tcp)(context.system) ! Bind(self, new InetSocketAddress(address, port))
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
override def postStop(): Unit = log.debug(s"Stopped TCP server for $address:$port")
def receive: PartialFunction[Any, Unit] = {
case Bound(localAddress) =>
log.debug(s"TCP port opened at: ${localAddress.getHostString}:${localAddress.getPort}")
case Connected(remote, local) =>
log.debug(s"Remote connection set from $remote to $local")
val handlerRef = context.actorOf(factory.props(sender), factory.PreferredName + TCPSession.PreferredName(remote))
sender ! Register(handlerRef)
case CommandFailed(_: Bind) => context stop self
}
}
object TCPServer {
def props[A <: TCPSessionFactory](companion: A, address: String, port: Int): Props = Props(classOf[TCPServer[A]],
companion,
address,
port)
val PreferredName = "tcp"
}
开发者ID:SKNZ,项目名称:SpinaciCore,代码行数:42,代码来源:TCPServer.scala
示例2: CriticalProcessesManager
//设置package包名称以及导入依赖的类
package net.hvieira.yeoldeonlinestore.actor
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import net.hvieira.yeoldeonlinestore.actor.CriticalProcessesManager._
import net.hvieira.yeoldeonlinestore.actor.store.StoreManager
import net.hvieira.yeoldeonlinestore.actor.user.UserManager
import net.hvieira.yeoldeonlinestore.api.Item
object CriticalProcessesManager {
private val STORE_MANAGER = "store-manager"
private val USER_MANAGER = "user-manager"
def props(itemProvider: () => Iterable[Item]) = Props(new CriticalProcessesManager(itemProvider))
}
class CriticalProcessesManager(private val itemProvider: () => Iterable[Item]) extends Actor {
override def preStart(): Unit = {
context.actorOf(StoreManager.props(3, itemProvider), STORE_MANAGER)
context.actorOf(UserManager.props(), USER_MANAGER)
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ => SupervisorStrategy.restart
}
override def receive: Receive = {
case IntroduceUserManagerReq =>
val possibleRef = context.child(USER_MANAGER)
possibleRef match {
case Some(ref) => sender ! IntroductionResponse(ref)
case None => throw new IllegalStateException("User manager actor ref does not exist")
}
case IntroduceStoreManagerReq =>
val possibleRef = context.child(STORE_MANAGER)
possibleRef match {
case Some(ref) => sender ! IntroductionResponse(ref)
case None => throw new IllegalStateException("Store manager actor ref does not exist")
}
}
}
case object IntroduceUserManagerReq
case object IntroduceStoreManagerReq
final case class IntroductionResponse(ref: ActorRef)
开发者ID:hvieira,项目名称:ye-olde-online-store-akka,代码行数:48,代码来源:CriticalProcessesManager.scala
示例3: TodoServiceProtocol
//设置package包名称以及导入依赖的类
package services.actor
import akka.actor.{ Actor, ActorLogging, Props, SupervisorStrategy }
import dto.Todo
import org.joda.time.DateTime
import shade.memcached.{ Configuration, Memcached }
object TodoServiceProtocol {
sealed trait Request
case object GetList extends Request
case class CreateTodo(text: String, limitAt: DateTime) extends Request
case class UpdateTodo(id: Long, text: String, limitAt: DateTime) extends Request
sealed trait Response
case class Failure(cause: Throwable) extends Response
case class ListGot(list: Seq[Todo]) extends Response
case object Success extends Response
}
class TodoServiceActor extends Actor with ActorLogging {
import TodoServiceProtocol._
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
val client = Memcached(Configuration(""))(context.dispatcher)
override def receive: Receive = {
case msg: Request =>
val memcached = context.actorOf(TodoMemcachedActor.props(client))
memcached forward msg
}
}
object TodoServiceActor {
def props: Props = Props(classOf[TodoServiceActor])
}
开发者ID:hayasshi,项目名称:web-app-skeleton,代码行数:40,代码来源:TodoServiceActor.scala
示例4: SuppervisedFsu
//设置package包名称以及导入依赖的类
package com.wincom.dcim.sharded
import akka.actor.{Actor, ActorInitializationException, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy}
class SuppervisedFsu extends Actor {
override val supervisorStrategy = OneForOneStrategy() {
case _: IllegalArgumentException ? SupervisorStrategy.Restart
case _: ActorInitializationException ? SupervisorStrategy.Restart
case _: DeathPactException ? SupervisorStrategy.Restart
case _: Exception ? SupervisorStrategy.Restart
}
val fsuActor = context.actorOf(Props[FsuActor], s"${self.path.name}")
def receive = {
case msg ? fsuActor forward msg
}
}
开发者ID:xtwxy,项目名称:akka-tests,代码行数:19,代码来源:SuppervisedFsu.scala
示例5: Notifier
//设置package包名称以及导入依赖的类
package auctionsystem
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.event.LoggingReceive
import akka.actor.Props
import auctionsystem.Notifier.Notification
import auctionsystem.Notifier.RepeatRequest
import auctionsystem.exceptions.RemoteServerErrorException
import akka.actor.SupervisorStrategy.Resume
import akka.actor.SupervisorStrategy.Stop
class Notifier extends Actor {
val auctionPublisher = context.actorSelection("akka.tcp://[email protected]:2552/user/auctionPublisher")
var requestsMap = Map [ActorRef, Notification]()
var idx = 0;
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3) {
case _: RemoteServerErrorException =>
println(s"An error occurred when trying to connect with remote server, node that caused failure: ${sender.path}. Resuming...")
self ! RepeatRequest(requestsMap(sender), sender)
Resume
case e =>
println(s"Something else went wrong: $e")
Stop
}
def receive = LoggingReceive {
case Notification(auction, highestBidder, highestBid) =>
val notifierRequest = context.actorOf(Props(new NotifierRequest(auctionPublisher)), "notifierRequest" + idx);
idx = idx + 1
val notification = Notification(auction, highestBidder, highestBid)
requestsMap += (notifierRequest -> notification)
notifierRequest ! notification
}
}
object Notifier {
final case class RepeatRequest(notification: Notification, notifierRequest: ActorRef)
final case class Notification(auctionName: String, highestBidder: ActorRef, highestBid: Int);
}
开发者ID:Stawicki,项目名称:reactive-scala,代码行数:44,代码来源:Notifier.scala
示例6: SearchDatabaseActor
//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration.Duration
import scala.io.Source._
class SearchDatabaseActor(filename: String) extends Actor {
override def receive: Receive = {
case Search(title) =>
val lines = fromFile(filename).getLines
sender ! SearchResponse(lines.filter(s => s.startsWith(title)).toArray)
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Stop
}
}
case class SearchResponse(books: Array[String])
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:21,代码来源:SearchDatabaseActor.scala
示例7: ClientActor
//设置package包名称以及导入依赖的类
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration._
class ClientActor extends Actor {
override def receive: Receive = {
case Ordered =>
println("\033[33m:: Book was ordered\033[0m")
case Searched(titles, prices) =>
for (i <- titles.indices)
println("\033[33m:: " + s"'${titles(i)}' costs ${prices(i)} PLN" + "\033[0m")
case SearchError(message) =>
println("\033[33m:: Search failed in database: " + message + "\033[0m")
case StreamLine(line) =>
println("\033[37m" + line + "\033[0m")
}
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Resume
}
}
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:23,代码来源:ClientActor.scala
示例8: StreamActor
//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import scala.concurrent.duration.Duration
import scala.io.Source.fromFile
class StreamActor extends Actor {
override def receive: Receive = {
case StreamBook(filename) =>
val materializer = ActorMaterializer.create(context) // Materializing and running a stream always requires a Materializer to be in implicit scope.
val sink = Source
.actorRef(1000, OverflowStrategy.dropNew) // If the buffer is full when a new element arrives, drops the new element.
.throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping) // throttle - to slow down the stream to 1 element per second.
.to(Sink.actorRef(sender, NotUsed)) // Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.
.run()(materializer)
val lines = fromFile(filename).getLines
lines.foreach(line => sink ! StreamLine(line))
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Restart
}
}
case class StreamBook(fileName: String)
case class StreamLine(line: String)
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala
示例9: OrderActor
//设置package包名称以及导入依赖的类
import java.io.FileWriter
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import scala.concurrent.duration.Duration
class OrderActor() extends Actor {
override def receive: Receive = {
case Order(title: String) =>
val currentSender = sender
orderSingleton.orderBook(title)
currentSender ! Ordered
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Escalate // because of other threads
}
}
case class Order(title: String)
case class Ordered()
object orderSingleton {
def orderBook(title: String): Unit = {
new FileWriter("orders.txt", true) { // append to end of file
write(s"$title\n")
close()
}
}
}
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:33,代码来源:OrderActor.scala
示例10: RootActorSystem
//设置package包名称以及导入依赖的类
package com.wavesplatform.actor
import akka.actor.{ActorSystem, AllForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import com.typesafe.config.Config
import scorex.utils.ScorexLogging
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object RootActorSystem extends ScorexLogging {
@volatile private var failed = false
final class EscalatingStrategy extends SupervisorStrategyConfigurator {
override def create(): SupervisorStrategy = AllForOneStrategy(loggingEnabled = false) {
case t: Throwable =>
failed = true
log.error("Root actor got exception, escalate", t)
SupervisorStrategy.Escalate
}
}
def start(id: String, config: Config)(init: ActorSystem => Unit): Unit = {
val system = ActorSystem(id, config)
try {
init(system)
} catch {
case e: Exception =>
log.error(s"Error while initializing actor system $id", e)
sys.exit(1)
}
Await.result(system.whenTerminated, Duration.Inf)
if (failed) {
sys.exit(1)
} else {
sys.exit(0)
}
}
}
开发者ID:wavesplatform,项目名称:Waves,代码行数:40,代码来源:RootActorSystem.scala
示例11: AccountSupervisor
//设置package包名称以及导入依赖的类
package sample.blog
import akka.actor.{Actor, ActorInitializationException, ActorLogging, DeathPactException, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import scala.concurrent.duration._
class AccountSupervisor extends Actor with ActorLogging {
val counter = context.actorOf(Props[AccountEntity], "account-")
context.watch(counter)
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2) {
case _: IllegalArgumentException ? SupervisorStrategy.Resume
case _: ActorInitializationException ? SupervisorStrategy.Stop
case _: DeathPactException ? SupervisorStrategy.Stop
case _: Exception ? SupervisorStrategy.Restart
}
def receive = {
case x:Terminated =>
log.info(s"The child ${sender} terminated due to - ${x}")
case msg ? {
log.info(s"Supervisor of this message ${msg} - ${self.path}")
counter forward msg
}
}
}
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:29,代码来源:AccountSupervisor.scala
示例12: Supervision
//设置package包名称以及导入依赖的类
package com.pagerduty.scheduler.akka
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{AllForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import com.pagerduty.scheduler.Scheduler
object Supervision {
val AlwaysEscalateStrategy = AllForOneStrategy() {
case _: Throwable => Escalate
}
def makeAlwaysEscalateTopicSupervisorStrategy(logger: Scheduler.Logging): AllForOneStrategy = {
AllForOneStrategy() {
case throwable: Throwable => {
logger.reportActorSystemRestart(throwable)
Escalate
}
}
}
}
final class UserGuardianEscalateStrategy extends SupervisorStrategyConfigurator {
override def create(): SupervisorStrategy = Supervision.AlwaysEscalateStrategy
}
开发者ID:PagerDuty,项目名称:scheduler,代码行数:26,代码来源:Supervision.scala
示例13: SimplifiedTweetProcessorActor
//设置package包名称以及导入依赖的类
package org.eigengo.rsa.ingest.v100
import java.util.UUID
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.ActorMaterializer
import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord, KafkaSerializer}
import com.google.protobuf.ByteString
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.eigengo.rsa.Envelope
object SimplifiedTweetProcessorActor {
def props(config: Config): Props = {
val producerConf = KafkaProducer.Conf(
config.getConfig("tweet-image-producer"),
new StringSerializer,
KafkaSerializer[Envelope](_.toByteArray)
)
Props(classOf[SimplifiedTweetProcessorActor], producerConf)
}
}
class SimplifiedTweetProcessorActor(producerConf: KafkaProducer.Conf[String, Envelope]) extends Actor {
private[this] val producer = KafkaProducer(conf = producerConf)
implicit val _ = ActorMaterializer()
import scala.concurrent.duration._
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
case _ ? SupervisorStrategy.Restart
}
override def receive: Receive = {
case TweetImage(handle, content) ?
producer.send(KafkaProducerRecord("tweet-image", handle,
Envelope(version = 100,
handle = handle,
ingestionTimestamp = System.nanoTime(),
processingTimestamp = System.nanoTime(),
messageId = UUID.randomUUID().toString,
correlationId = UUID.randomUUID().toString,
payload = content)))
case SimplifiedTweet(handle, mediaUrls) ?
mediaUrls.foreach { mediaUrl ?
import context.dispatcher
val request = HttpRequest(method = HttpMethods.GET, uri = Uri(mediaUrl))
val timeout = 1000.millis
Http(context.system).singleRequest(request).flatMap(_.entity.toStrict(timeout)).foreach { entity ?
self ! TweetImage(handle, ByteString.copyFrom(entity.data.toArray))
}
}
}
}
开发者ID:eigengo,项目名称:reactive-summit-2016,代码行数:58,代码来源:SimplifiedTweetProcessorActor.scala
示例14: AkkaSupervision
//设置package包名称以及导入依赖的类
package trove.core.event
import akka.actor.SupervisorStrategy.{Resume, Stop}
import akka.actor.{ActorInitializationException, ActorKilledException, OneForOneStrategy, SupervisorStrategy, SupervisorStrategyConfigurator}
import grizzled.slf4j.Logging
class AkkaSupervision extends SupervisorStrategyConfigurator with Logging {
override def create(): SupervisorStrategy = {
logger.debug("Creating custom akka supervisor strategy")
OneForOneStrategy() {
case _: ActorInitializationException => Stop
case _: ActorKilledException => Stop
case e: Exception =>
logger.error("Exception in event listener", e)
Resume
}
}
}
开发者ID:emanchgo,项目名称:trove,代码行数:19,代码来源:AkkaSupervision.scala
示例15: Intact
//设置package包名称以及导入依赖的类
package oriana
import akka.actor.SupervisorStrategy.Stop
import akka.actor.{OneForOneStrategy, SupervisorStrategy, Props, Actor}
sealed trait FuseStatus
case object Intact extends FuseStatus
case object Blown extends FuseStatus
class Fuse(childSpec: Props) extends Actor {
case object FuseBlown
var status: FuseStatus = Intact
val child = context.actorOf(childSpec)
def receive = {
case Fuse.ReadStatus => sender() ! status
case FuseBlown => status = Blown
case x if status == Intact => child forward x
}
override def supervisorStrategy = OneForOneStrategy() {
case _: Throwable =>
self ! FuseBlown
Stop
}
}
object Fuse {
case object ReadStatus
def props(childSpec: Props) = Props(new Fuse(childSpec))
}
开发者ID:Norwae,项目名称:oriana,代码行数:32,代码来源:Fuse.scala
示例16: AuditNode
//设置package包名称以及导入依赖的类
package audit
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import audit.collector.CollectorActor
import audit.collector.CollectorActor.CollectorRequest
import audit.viewer.ViewerActor
import audit.viewer.ViewerActor.ViewerRequest
import com.datastax.driver.core.Cluster
class AuditNode(cluster: Cluster.Builder, settings: AuditSettings) extends Actor {
val session = cluster.build().connect()
val collector = context.actorOf(CollectorActor.props(session), "collector")
val viewer = context.actorOf(ViewerActor.props(session), "viewer")
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case e: Exception => Escalate
}
override def receive: Receive = {
case req: CollectorRequest => collector forward req
case req: ViewerRequest => viewer forward req
}
}
object AuditNode {
def props(cluster: Cluster.Builder, settings: AuditSettings) = Props(new AuditNode(cluster, settings))
}
开发者ID:grzesiekw,项目名称:audit,代码行数:30,代码来源:AuditNode.scala
示例17: GeneralSupervisor
//设置package包名称以及导入依赖的类
package com.bisphone.sarf.implv1.util
import akka.actor.{Actor, AllForOneStrategy, Props, SupervisorStrategy}
import scala.concurrent.duration._
private[implv1] class GeneralSupervisor (
props: Props,
maxNrOfRetries: Int,
withinTimeRange: FiniteDuration,
loggingEnabled: Boolean,
recoveryStrategy: PartialFunction[Throwable, SupervisorStrategy.Directive]
) extends Actor {
override val supervisorStrategy = AllForOneStrategy(
maxNrOfRetries = maxNrOfRetries,
withinTimeRange = withinTimeRange,
loggingEnabled = loggingEnabled
)(recoveryStrategy)
val ref = context.actorOf(props, "actor")
override def receive: Receive = {
case msg => ref.tell(msg, sender)
}
override def postStop (): Unit = {
context.children.foreach { child => context stop child }
}
}
object GeneralSupervisor {
def props (
props: Props,
maxNrOfRetries: Int,
withinTimeRange: FiniteDuration,
loggingEnabled: Boolean
)(
recoveryStrategy: PartialFunction[Throwable, SupervisorStrategy.Directive]
) = Props {
new GeneralSupervisor(
props,
maxNrOfRetries,
withinTimeRange,
loggingEnabled,
recoveryStrategy
)
}
}
开发者ID:bisphone,项目名称:SARF,代码行数:52,代码来源:GeneralSupervisor.scala
示例18: LogProcSupervisor
//设置package包名称以及导入依赖的类
package chapter4
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{OneForOneStrategy, SupervisorStrategy, Actor, Props}
class LogProcSupervisor(dbSupervisorProps:Props) extends Actor{
//if file corrupted,keep deal the next msg
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(){
case _:CorruptedFileException=>Resume
}
//by dbSupervisorProps, create db supervisor
val dbSupervisor=context.actorOf(dbSupervisorProps)
//this supervisor monitoer the logProcessor
val logProcProps=Props(new LogProcessor(dbSupervisor))
val logProcessor=context.actorOf(logProcProps)
def receive={
case m=> logProcessor forward(m)
}
}
开发者ID:wjingyao2008,项目名称:firsttry,代码行数:24,代码来源:LogProcSupervisor.scala
示例19: DiscoveryGuardian
//设置package包名称以及导入依赖的类
package linguistic
import java.time.LocalDateTime
import java.util.TimeZone
import akka.actor.{Actor, ActorLogging, Props, SupervisorStrategy}
import scala.concurrent.duration._
object DiscoveryGuardian {
def props(env: String, httpPort: Int, hostName: String) =
Props(new DiscoveryGuardian(env, httpPort, hostName))
.withDispatcher(HttpServer.HttpDispatcher)
}
class DiscoveryGuardian(env: String, httpPort: Int, hostName: String) extends Actor with ActorLogging {
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
val system = context.system
val config = context.system.settings.config
val timeout = FiniteDuration(config.getDuration("constructr.join-timeout").toNanos, NANOSECONDS)
system.scheduler.scheduleOnce(timeout)(self ! 'Discovered)(system.dispatcher)
override def receive: Receive = {
case 'Discovered =>
context.system.actorOf(HttpServer.props(httpPort, hostName,
config.getString("akka.http.ssl.keypass"), config.getString("akka.http.ssl.storepass")), "http-server")
println(Console.GREEN +
"""
___ ___ ___ __ __ ___ ___
/ __| | __| | _ \ \ \ / / | __| | _ \
\__ \ | _| | / \ V / | _| | /
|___/ |___| |_|_\ \_/ |___| |_|_\
""" + Console.RESET)
val tz = TimeZone.getDefault.getID
val greeting = new StringBuilder()
.append('\n')
.append("=================================================================================================")
.append('\n')
.append(s"? ? ? Environment: ${env} TimeZone: $tz Started at ${LocalDateTime.now} ? ? ?")
.append('\n')
.append(s"? ? ? ConstructR service-discovery: ${config.getString("constructr.coordination.class-name")} on ${config.getString("constructr.coordination.host")} ? ? ?")
.append('\n')
.append(s"? ? ? Akka cluster: ${config.getInt("akka.remote.netty.tcp.port")} ? ? ?")
.append('\n')
.append(s"? ? ? Akka seeds: ${config.getStringList("akka.cluster.seed-nodes")} ? ? ?")
.append('\n')
.append(s"? ? ? Cassandra domain points: ${config.getStringList("cassandra-journal.contact-points")} ? ? ?")
.append('\n')
.append(s"? ? ? Server online at https://${config.getString("akka.http.interface")}:${httpPort} ? ? ?")
.append('\n')
.append("=================================================================================================")
system.log.info(greeting.toString)
}
}
开发者ID:haghard,项目名称:linguistic,代码行数:59,代码来源:DiscoveryGuardian.scala
示例20: KafkaPublisher
//设置package包名称以及导入依赖的类
package nl.tradecloud.kafka
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, Props, SupervisorStrategy}
import akka.kafka.ProducerSettings
import akka.pattern.BackoffSupervisor
import akka.stream.Materializer
import nl.tradecloud.kafka.command.Publish
import nl.tradecloud.kafka.config.KafkaConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
class KafkaPublisher(system: ActorSystem)(implicit mat: Materializer, context: ActorRefFactory) {
import KafkaPublisher._
implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("dispatchers.kafka-dispatcher")
val kafkaConfig = KafkaConfig(system.settings.config)
private lazy val publisherId = KafkaClientIdSequenceNumber.getAndIncrement
private def publisherSettings = {
val keySerializer = new StringSerializer
val valueSerializer = new ByteArraySerializer
ProducerSettings(system, keySerializer, valueSerializer).withBootstrapServers(kafkaConfig.brokers)
}
private val publisherProps: Props = KafkaPublisherActor.props(kafkaConfig, publisherSettings)
private val backoffPublisherProps: Props = BackoffSupervisor.propsWithSupervisorStrategy(
publisherProps, s"KafkaPublisherActor$publisherId", 3.seconds,
30.seconds, 1.0, SupervisorStrategy.stoppingStrategy
)
private val publishActor = context.actorOf(backoffPublisherProps, s"KafkaBackoffPublisher$publisherId")
def publish(topic: String, msg: AnyRef): Future[Done] = {
val completed: Promise[Done] = Promise()
publishActor ! Publish(topic, msg, completed)
completed.future
}
}
object KafkaPublisher {
private val KafkaClientIdSequenceNumber = new AtomicInteger(1)
}
开发者ID:tradecloud,项目名称:kafka-akka-extension,代码行数:53,代码来源:KafkaPublisher.scala
注:本文中的akka.actor.SupervisorStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论