• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ActorContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.actor.ActorContext的典型用法代码示例。如果您正苦于以下问题:Scala ActorContext类的具体用法?Scala ActorContext怎么用?Scala ActorContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ActorContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ProcessorHierarchySpec

//设置package包名称以及导入依赖的类
package akka.stream

import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import akka.actor.ActorContext
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import scala.collection.immutable.TreeSet
import scala.util.control.NonFatal
import akka.stream.impl.ActorBasedFlowMaterializer

class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {

  val materializer = FlowMaterializer(MaterializerSettings())

  def self = ActorBasedFlowMaterializer.currentActorContext().self

  "An ActorBasedFlowMaterializer" must {

    "generate the right level of descendants" in {
      val f = Flow(() ? {
        testActor ! self
        Flow(List(1)).map(x ? { testActor ! self; x }).toProducer(materializer)
      }).take(3).foreach(x ? {
        testActor ! self
        Flow(x).foreach(_ ? testActor ! self).consume(materializer)
      }).toFuture(materializer)
      Await.result(f, 3.seconds)
      val refs = receiveWhile(idle = 250.millis) {
        case r: ActorRef ? r
      }
      try {
        refs.toSet.size should be(8)
        refs.distinct.map(_.path.elements.size).groupBy(x ? x).mapValues(x ? x.size) should be(Map(2 -> 2, 3 -> 6))
      } catch {
        case NonFatal(e) ?
          println(refs.map(_.toString).to[TreeSet].mkString("\n"))
          throw e
      }
    }

  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:46,代码来源:ProcessorHierarchySpec.scala


示例2: LoggingReceive

//设置package包名称以及导入依赖的类
package akka.event

import language.existentials

import akka.actor.Actor.Receive
import akka.actor.ActorContext
import akka.actor.ActorCell
import akka.event.Logging.Debug

object LoggingReceive {

  
class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) extends Receive {
  def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None)
  def isDefinedAt(o: Any): Boolean = {
    val handled = r.isDefinedAt(o)
    val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
    context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o
      + (label match {
        case Some(l) ? " in state " + l
        case _       ? ""
      })))
    handled
  }
  def apply(o: Any): Unit = r(o)
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:27,代码来源:LoggingReceive.scala


示例3: HttpClientProvider

//设置package包名称以及导入依赖的类
package org.zalando.react.nakadi.client.providers

import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, TrustManager, X509TrustManager}

import akka.actor.ActorContext
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
import scala.concurrent.duration._


class HttpClientProvider(actorContext: ActorContext,
                         server: String, port: Int,
                         isConnectionSSL: Boolean = false,
                         acceptAnyCertificate: Boolean = false,
                         connectionTimeout: FiniteDuration) {

  val http = Http(actorContext.system)

  private val settings = {
    ClientConnectionSettings
      .apply(actorContext.system)
      .withConnectingTimeout(connectionTimeout)
      .withIdleTimeout(Duration.Inf)
  }

  val connection: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {

    isConnectionSSL match {
      case true =>
        val sslContext = if (!acceptAnyCertificate) SSLContext.getDefault else {

          val permissiveTrustManager: TrustManager = new X509TrustManager() {
            override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
            override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
            override def getAcceptedIssuers(): Array[X509Certificate] = Array.empty
          }

          val ctx = SSLContext.getInstance("TLS")
          ctx.init(Array.empty, Array(permissiveTrustManager), new SecureRandom())
          ctx
        }
        http.outgoingConnectionHttps(server, port, new HttpsConnectionContext(sslContext), settings = settings)
      case false =>
        http.outgoingConnection(server, port, settings = settings)
    }
  }

} 
开发者ID:zalando-nakadi,项目名称:reactive-nakadi,代码行数:56,代码来源:HttpClientProvider.scala


示例4: RemoteStrategy

//设置package包名称以及导入依赖的类
package prisoners_dilemma

import akka.actor.ActorContext
import java.net.URI

class RemoteStrategy(context:ActorContext, firstMove: URI)
extends Strategizer {

  def newGame(): RoundStrategy = {
     // call first URI
     val (move, uri) = parseResponse("la la ala la ala")
     new RemoteRoundStrategy(context, move, uri)
  }

  private def parseResponse(response: String ): (Move, URI) = ???

 private class RemoteRoundStrategy(context:ActorContext, myMove: Move, nextURI: URI) extends RoundStrategy {
  val currentMove = myMove
  def next(m: Move) = {
    // call URI
    val (nextMove, laterURI) = parseResponse("bananas")
    new RemoteRoundStrategy(context, nextMove, laterURI)
  }
 }

} 
开发者ID:Mharlin,项目名称:better-testing-workshop,代码行数:27,代码来源:RemoteStrategy.scala


示例5: ClusterRouterGroupSpecification

//设置package包名称以及导入依赖的类
package io.clouderite.commons.scala.berries.akka

import akka.actor.{ActorContext, ActorRef}
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.{ConsistentHashingGroup, Group, RandomGroup, TailChoppingGroup}

import scala.concurrent.duration.FiniteDuration

case class ClusterRouterGroupSpecification(name: String, path: String, role: String) {
  def routerSettings(implicit context: ActorContext): ClusterRouterGroupSettings = {
    ClusterRouterGroupSettings(totalInstances = 100, routeesPaths = List(path), allowLocalRoutees = true, useRole = Some(role))
  }
}

case class TailChoppingSpecification(within: FiniteDuration, interval: FiniteDuration) {
  def routerStrategy(implicit context: ActorContext): Group = {
    TailChoppingGroup(Nil, within = within, interval = interval)
  }
}

object ClusterRouterGroupFactory {
  def consistentHashingGroup(specification: ClusterRouterGroupSpecification)(hashMapping: ConsistentHashMapping)(implicit context: ActorContext): ActorRef = {
    context.actorOf(
      ClusterRouterGroup(
        ConsistentHashingGroup(Nil, hashMapping = hashMapping),
        specification.routerSettings
      ).props(), name = specification.name)
  }

  def randomGroup(specification: ClusterRouterGroupSpecification)(implicit context: ActorContext): ActorRef = {
    context.actorOf(
      ClusterRouterGroup(
        RandomGroup(Nil),
        specification.routerSettings
      ).props(), name = specification.name)
  }

  def tailChoppingGroup(specification: ClusterRouterGroupSpecification, choppingSpecification: TailChoppingSpecification)(implicit context: ActorContext): ActorRef = {
    context.actorOf(ClusterRouterGroup(
      choppingSpecification.routerStrategy,
      specification.routerSettings)
      .props(), name = specification.name)
  }
} 
开发者ID:clouderite,项目名称:scala-berries,代码行数:46,代码来源:ClusterRouterGroupFactory.scala


示例6: LoggingReceive

//设置package包名称以及导入依赖的类
package akka.event

import language.existentials
import akka.actor.Actor.Receive
import akka.actor.ActorContext
import akka.actor.ActorCell
import akka.actor.DiagnosticActorLogging
import akka.event.Logging.{ LogEvent, LogLevel }
import akka.actor.AbstractActor
import scala.runtime.BoxedUnit

object LoggingReceive {

  
class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String], logLevel: LogLevel)(implicit context: ActorContext) extends Receive {
  def this(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) = this(source, r, label, Logging.DebugLevel)
  def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None, Logging.DebugLevel)
  def isDefinedAt(o: Any): Boolean = {
    val handled = r.isDefinedAt(o)
    if (context.system.eventStream.logLevel >= logLevel) {
      val src = source getOrElse context.asInstanceOf[ActorCell].actor
      val (str, clazz) = LogSource.fromAnyRef(src)
      val message = "received " + (if (handled) "handled" else "unhandled") + " message " + o + " from " + context.sender() +
        (label match {
          case Some(l) ? " in state " + l
          case _ ? ""
        })
      val event = src match {
        case a: DiagnosticActorLogging ? LogEvent(logLevel, str, clazz, message, a.log.mdc)
        case _ ? LogEvent(logLevel, str, clazz, message)
      }
      context.system.eventStream.publish(event)
    }
    handled
  }
  def apply(o: Any): Unit = r(o)
} 
开发者ID:rorygraves,项目名称:perf_tester,代码行数:38,代码来源:LoggingReceive.scala


示例7: LocalFileStreamSource

//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source

import java.io.File
import java.nio.file.Path

import akka.actor.{Actor, ActorContext, ActorRef, Props}
import akka.stream.actor.ActorPublisher
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.model.{Query, RequestContext, SonicMessage}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.file.{FileWatcher, FileWatcherWorker, LocalFilePublisher}
import spray.json._


class LocalFileStreamSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(query, actorContext, context) {

  lazy val publisher: Props = {
    val path = getConfig[String]("path")
    val tail = getOption[Boolean]("tail").getOrElse(true)

    val glob = FileWatcher.parseGlob(path)
    val workerProps = { dir: Path ? Props(classOf[FileWatcherWorker], dir) }
    val watchers = LocalFilePublisher.getWatchers(glob, actorContext, workerProps)

    Props(classOf[LocalFileStreamPublisher], query.id.get, query.query, tail, glob.fileFilterMaybe, watchers, context)

  }
}

class LocalFileStreamPublisher(val queryId: Long,
                               val rawQuery: String,
                               val tail: Boolean,
                               val fileFilterMaybe: Option[String],
                               val watchersPair: Vector[(File, ActorRef)],
                               val ctx: RequestContext)
  extends Actor with ActorPublisher[SonicMessage] with SonicdLogging with LocalFilePublisher {

  override def parseUTF8Data(raw: String): JsValue = JsString(raw)

} 
开发者ID:ernestrc,项目名称:sonicd,代码行数:42,代码来源:LocalFileStreamSource.scala


示例8: SonicSource

//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source

import java.net.InetSocketAddress

import akka.actor.{ActorContext, Props}
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.client.SonicPublisher
import build.unstable.sonic.model.{Query, RequestContext, SonicCommand}
import build.unstable.sonic.scaladsl.Sonic._
import spray.json.JsObject

class SonicSource(q: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(q, actorContext, context) {

  val host: String = getConfig[String]("host")
  val port: Int = getOption[Int]("port").getOrElse(8889)
  val config: JsObject = getConfig[JsObject]("config")
  val addr = new InetSocketAddress(host, port)

  val query: SonicCommand = Query(q.query, config, q.auth).copy(trace_id = q.traceId)

  val supervisorName = s"sonic_${addr.hashCode()}"

  lazy val publisher: Props = {
    val sonicSupervisor = actorContext.child(supervisorName).getOrElse {
      actorContext.actorOf(sonicSupervisorProps(addr), supervisorName)
    }

    Props(classOf[SonicPublisher], sonicSupervisor, query, false)
  }
} 
开发者ID:ernestrc,项目名称:sonicd,代码行数:32,代码来源:SonicSource.scala


示例9: SonicdSource

//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source

import akka.actor.ActorContext
import build.unstable.sonic.model.{DataSource, Query, RequestContext}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.SonicdSource._
import build.unstable.sonicd.system.actor.SonicdController._
import spray.json.JsonFormat
import build.unstable.sonic.JsonProtocol._

abstract class SonicdSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) with SonicdLogging {

  def getConfig[T: JsonFormat](key: String): T = {
    val value = query.sonicdConfig.fields.get(key).flatMap(_.convertTo[Option[T]])
      .getOrElse(throw new MissingConfigurationException(key))
    log.debug("getConfig({})={}", key, value)
    value
  }

  def getOption[T: JsonFormat](key: String): Option[T] = {
    val value = query.sonicdConfig.fields.get(key).flatMap(_.convertTo[Option[T]])
    log.debug("getOption({})={}", key, value)
    value
  }

}

object SonicdSource {

  class MissingConfigurationException(missing: String) extends Exception(s"config is missing '$missing' field")

} 
开发者ID:ernestrc,项目名称:sonicd,代码行数:34,代码来源:SonicdSource.scala


示例10: SyntheticSource

//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source

import akka.actor.{ActorContext, Props}
import build.unstable.sonic.model.{Query, RequestContext}
import build.unstable.sonic.server.source.SyntheticPublisher
import spray.json.JsObject
import build.unstable.sonic.JsonProtocol._

class SyntheticSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(query, actorContext, context) {

  val publisher: Props = {
    val seed = getOption[Int]("seed")
    val size = getOption[Int]("size")
    val progress = getOption[Int]("progress-delay").getOrElse(10)
    val indexed = getOption[Boolean]("indexed").getOrElse(false)

    //user pre-defined schema
    val schema = getOption[JsObject]("schema")

    Props(classOf[SyntheticPublisher], seed, size, progress, query.query, indexed, schema, context)
  }
} 
开发者ID:ernestrc,项目名称:sonicd,代码行数:24,代码来源:SyntheticSource.scala


示例11: MockSource

//设置package包名称以及导入依赖的类
package build.unstable.sonicd.service

import akka.actor.{Actor, ActorContext, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import build.unstable.sonic.model._
import build.unstable.sonicd.SonicdLogging

import scala.collection.mutable

class MockSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) {

  override def publisher: Props = Props(classOf[ProxyPublisher])
}

class ProxyPublisher extends Actor with ActorPublisher[SonicMessage] with SonicdLogging {

  val buffer = mutable.Queue.empty[SonicMessage]

  override def unhandled(message: Any): Unit = {
    log.warning(">>>>>>>>>>>>>>>> unhandled message for mock publisher: {}", message)
  }

  override def receive: Receive = {
    case c: StreamCompleted ?
      if (isActive && totalDemand > 0) {
        onNext(c)
        onCompleteThenStop()
      } else context.become({
        case Request(_) ?
          onNext(c)
          onCompleteThenStop()
      })
    case m: SonicMessage ?
      if (isActive && totalDemand > 0) onNext(m)
      else buffer.enqueue(m)
    case r: Request ?
      while (isActive && totalDemand > 0 && buffer.nonEmpty) {
        onNext(buffer.dequeue())
      }
  }
} 
开发者ID:ernestrc,项目名称:sonicd,代码行数:44,代码来源:MockSource.scala


示例12: ConfigurationStore

//设置package包名称以及导入依赖的类
package phu.quang.le.utils

import scala.collection.mutable.Map
import akka.actor.ActorContext

object ConfigurationStore {
  val entries = Map[String, AnyRef]()
  
  def put(key: String, value: AnyRef) {
    entries += ((key, value))
  }
  
  def get[A] = entries.values.find(_.isInstanceOf[A]).asInstanceOf[Option[A]]
}

trait Configured {
  def configured[A](implicit actorContext: ActorContext): A = 
    ConfigurationStore.get[A].get
}

trait Configuration {
  def configure[R <: AnyRef](f: => R) = {
    val a = f
    ConfigurationStore.put(a.getClass.getName, a)
    a
  }
} 
开发者ID:p-le,项目名称:manga-batch,代码行数:28,代码来源:Config.scala


示例13: Settings

//设置package包名称以及导入依赖的类
package com.mikemunhall.simpletwitterstats

import akka.actor.{ActorContext, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import com.typesafe.config.Config

object Settings extends ExtensionId[Settings] with ExtensionIdProvider {

  override def lookup = Settings
  override def createExtension(system: ExtendedActorSystem) = new Settings(system.settings.config, system)
  def apply(context: ActorContext): Settings = apply(context.system)
}

class Settings(config: Config, extendedSystem: ExtendedActorSystem) extends Extension {

  object API {
    object Http {
      val Port = config.getInt("simple-twitter-stats.api.http.port")
      val Interface = config.getString("simple-twitter-stats.api.http.interface")
    }
  }

  object Server {
    object Twitter {
      object Client {
        object OAuth {
          object Consumer {
            val key = config.getString("simple-twitter-stats.server.twitter.client.oauth.consumer.key")
            val secret = config.getString("simple-twitter-stats.server.twitter.client.oauth.consumer.secret")
          }
          object Access {
            val token = config.getString("simple-twitter-stats.server.twitter.client.oauth.access.token")
            val secret = config.getString("simple-twitter-stats.server.twitter.client.oauth.access.secret")
          }
        }
      }
    }
  }

} 
开发者ID:mmunhall,项目名称:simple-twitter-stats,代码行数:40,代码来源:Settings.scala


示例14: sendMessageToNextTask

//设置package包名称以及导入依赖的类
package actors.traits

import akka.actor.{ActorContext, Props, ActorRef}


  def sendMessageToNextTask(previousSender: ActorRef, oldRouteSlipMessage: RouteSlipMessage, newMessage: AnyRef)(implicit context: ActorContext) {
    var copySlip: RouteSlipMessage = oldRouteSlipMessage.copy(message = newMessage)

    if(copySlip.forwardToSender && copySlip.originalSender.isEmpty) {

      //lazy val ForwardToFutureActor = context.actorOf(Props[ForwardToFutureActor], "ForwardToFutureActor")
      copySlip = copySlip.copy(originalSender = Some(previousSender))
    }

    val nextTask = copySlip.routeSlip.head
    val newSlip = copySlip.routeSlip.tail
    copySlip = copySlip.copy(routeSlip = newSlip)

    if (newSlip.isEmpty) {
      if(copySlip.originalSender.isDefined) {
        nextTask.tell(copySlip.message, copySlip.originalSender.get)
      } else {
        nextTask ! copySlip.message
      }
    } else {
      nextTask ! copySlip
    }
  }
} 
开发者ID:GoranSchumacher,项目名称:massive-actors,代码行数:30,代码来源:RouteSlip.scala


示例15: Internals

//设置package包名称以及导入依赖的类
package com.github.astonbitecode.zoocache

import akka.actor.{ Props, ActorRef, ActorContext, ActorSystem }
import scala.concurrent.ExecutionContextExecutor

private[astonbitecode] object Internals {
  trait ActorCreatable {
    val dispatcher: ExecutionContextExecutor
    def actorOf(props: Props): ActorRef
  }

  case class CreateFromActorContext(actorContext: ActorContext) extends ActorCreatable {
    override val dispatcher: ExecutionContextExecutor = actorContext.dispatcher
    override def actorOf(props: Props): ActorRef = actorContext.actorOf(props)
  }

  case class CreateFromActorSystem(actorSystem: ActorSystem) extends ActorCreatable {
    override val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher
    override def actorOf(props: Props): ActorRef = actorSystem.actorOf(props)
  }

  object implicits {

    implicit def actorContext2ActorCreatable(actorContext: ActorContext): ActorCreatable = {
      CreateFromActorContext(actorContext)
    }

    implicit def actorSystem2ActorCreatable(actorSystem: ActorSystem): ActorCreatable = {
      CreateFromActorSystem(actorSystem)
    }
  }
} 
开发者ID:astonbitecode,项目名称:scakka-zoo-cache,代码行数:33,代码来源:Internals.scala


示例16: BotRunner

//设置package包名称以及导入依赖的类
import akka.actor.{Props, ActorRef, ActorContext, ActorSystem}
import io.scalac.slack.api.{Start, BotInfo}
import io.scalac.slack.common.actors.SlackBotActor
import io.scalac.slack.websockets.WebSocket
import io.scalac.slack.{MessageEventBus, BotModules}
import io.scalac.slack.bots.system.{CommandsRecognizerBot, HelpBot}
import io.scalac.slack.common.Shutdownable

object BotRunner extends Shutdownable {
  val system = ActorSystem("SlackBotSystem")
  val eventBus = new MessageEventBus
  val slackBot = system.actorOf(Props(classOf[SlackBotActor], new ExampleBotsBundle(), eventBus, this, None), "slack-bot")
  var botInfo: Option[BotInfo] = None

  def main(args: Array[String]) {
    try {
      slackBot ! Start

      system.awaitTermination()
      println("Shutdown successful...")
    } catch {
      case e: Exception =>
        println("An unhandled exception occurred...", e)
        system.shutdown()
        system.awaitTermination()
    }
  }

  sys.addShutdownHook(shutdown())

  override def shutdown(): Unit = {
    slackBot ! WebSocket.Release
    system.shutdown()
    system.awaitTermination()
  }

  class ExampleBotsBundle() extends BotModules {
    override def registerModules(context: ActorContext, websocketClient: ActorRef) = {
      context.actorOf(Props(classOf[CommandsRecognizerBot], eventBus), "commandProcessor")
      context.actorOf(Props(classOf[HelpBot], eventBus), "helpBot")
      context.actorOf(Props(classOf[MarketBot], eventBus), "MarketBot")
    }
  }

} 
开发者ID:justin-yan,项目名称:marketbot,代码行数:46,代码来源:BotRunner.scala


示例17: FutureMonitor

//设置package包名称以及导入依赖的类
package com.vivint.ceph.lib

import akka.actor.{ ActorContext, Kill }
import akka.event.LoggingAdapter
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success,Failure}

object FutureMonitor {
  def logSuccess[T](log: LoggingAdapter, f: Future[T], desc: String)(implicit ex: ExecutionContext): Future[T] = {
    log.debug(s"${desc} : pulling state")
    f.onComplete {
      case Success(_) => log.debug("{} : success", desc)
      case Failure(ex) =>
        log.error(ex, "{}: failure", desc)
    }
    f
  }

  def crashSelfOnFailure(f: Future[Any], log: LoggingAdapter, description: String)(
    implicit context: ActorContext): Unit = {
    var failed = false
    if (failed) // prevent infinite loops if all children actors get restarted
      context.stop(context.self)
    else
      f.onFailure {
        case ex =>
          failed = true
          context.self ! Kill
          log.error(ex, s"Unexpected error for ${description}")
      }(context.dispatcher)
  }
} 
开发者ID:vivint-smarthome,项目名称:ceph-on-mesos,代码行数:33,代码来源:FutureMonitor.scala


示例18: Ex01Manager

//设置package包名称以及导入依赖的类
package ekiaa.akka.otp.examples.ex01

import akka.actor.{Actor, Props, ActorContext, ActorRef}
import akka.pattern.ask
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.Future
import scala.concurrent.duration._

object Ex01Manager {

  var instance: Option[ActorRef] = None

  def start(context: ActorContext) = {
    synchronized {
      instance match {
        case None =>
          instance = Some(context.actorOf(props, "Ex01Manager"))
        case _ =>
      }
    }
  }

  def props: Props = Props(new Ex01Manager)

  private final case class StartWorker(name: String)

  def startWorker(name: String): Future[Any] = {
    instance match {
      case Some(manager) =>
        ask(manager, StartWorker(name))(30.seconds)
      case None =>
        import scala.concurrent.ExecutionContext.Implicits.global
        Future { None }
    }
  }

}

class Ex01Manager extends Actor with StrictLogging {

  override def preStart() = {
    logger.debug(s"Ex01Manager started with path ${context.self.path}")
  }

  import Ex01Manager._

  val receive: Receive = {
    case StartWorker(name) =>
      val replyTo = sender()
      Ex01Worker.map.get(name) match {
        case result @ Some(_) =>
          replyTo ! result
        case None =>
          import scala.concurrent.ExecutionContext.Implicits.global
          Ex01Worker.start(name).map(result => { replyTo ! result })
      }
    case _ =>
  }

} 
开发者ID:ekiaa,项目名称:akka-otp,代码行数:62,代码来源:Ex01Manager.scala


示例19: Ex01WorkerSup

//设置package包名称以及导入依赖的类
package ekiaa.akka.otp.examples.ex01

import akka.actor.{Actor, ActorContext, Props, ActorRef}
import akka.pattern.ask
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.duration._

import scala.concurrent.Future

object Ex01WorkerSup {

  var instance: Option[ActorRef] = None

  def start(context: ActorContext) = {
    synchronized {
      instance match {
        case None =>
          instance = Some(context.actorOf(props, "Ex01WorkerSup"))
        case _ =>
      }
    }
  }

  def props: Props = Props(new Ex01WorkerSup)

  private final case class StartChild(args: Any)

  def startChild(args: Any): Future[Any] = {
    instance match {
      case Some(supervisor) =>
        ask(supervisor, StartChild(args))(30.seconds)
      case _ =>
        import scala.concurrent.ExecutionContext.Implicits.global
        Future { None }
    }
  }
}

class Ex01WorkerSup extends Actor with StrictLogging {

  override def preStart() = {
    logger.debug(s"Ex01WorkerSup started with path ${context.self.path}")
  }

  import Ex01WorkerSup._

  val receive: Receive = {

    case StartChild(args) =>
      import scala.concurrent.ExecutionContext.Implicits.global
      val replyTo = sender()
      Ex01Worker.start(args, context).map(result => { replyTo ! result })
    case _ =>

  }

} 
开发者ID:ekiaa,项目名称:akka-otp,代码行数:59,代码来源:Ex01WorkerSup.scala


示例20: Sink

//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow.elements

import akka.actor.{Actor, ActorContext, ActorLogging, Props}
import pl.edu.agh.messages.{Get, GetGroupedOut, GetOut, ResultMessage}

class Sink[R] extends Actor with ActorLogging {

  var out = List.empty[R]

  def receive = {
    case ResultMessage(data: R) =>
      //log.info("CHILD")
      out :+= data
    case GetOut =>
      val o = out
      //This line is needed to feed workflow feature
      //out = List.empty[R]
      sender ! o
    case GetGroupedOut(size: Int) =>
      val o = out
      //This line is needed to feed workflow feature
      //out = List.empty[R]
      sender ! o.grouped(size)
    case Get =>
      sender ! this
  }

}

object Sink {
  def apply[R](context: ActorContext) = context.actorOf(Sink.props)
  def apply[R](name: String, context: ActorContext) = context.actorOf(Sink.props, name)
  def props[R] = Props[Sink[R]]
} 
开发者ID:kkrzys,项目名称:akkaflow,代码行数:35,代码来源:Sink.scala



注:本文中的akka.actor.ActorContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala compact类代码示例发布时间:2022-05-23
下一篇:
Scala forAll类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap