本文整理汇总了Scala中akka.actor.ActorContext类的典型用法代码示例。如果您正苦于以下问题：Scala ActorContext类的具体用法？Scala ActorContext怎么用？Scala ActorContext使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessorHierarchySpec
//设置package包名称以及导入依赖的类
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import akka.actor.ActorContext
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import scala.collection.immutable.TreeSet
import scala.util.control.NonFatal
import akka.stream.impl.ActorBasedFlowMaterializer
/**
 * Checks how many distinct actors an ActorBasedFlowMaterializer uses for a nested flow.
 *
 * Every stage sends the `ActorRef` of the actor context it runs in to `testActor`;
 * the test then asserts on the number of distinct refs and on their path depths.
 */
class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {

  val materializer = FlowMaterializer(MaterializerSettings())

  /** The `ActorRef` of whichever actor context is current for the calling code. */
  def self = ActorBasedFlowMaterializer.currentActorContext().self

  "An ActorBasedFlowMaterializer" must {

    "generate the right level of descendants" in {
      val f = Flow(() => {
        testActor ! self
        Flow(List(1)).map(x => { testActor ! self; x }).toProducer(materializer)
      }).take(3).foreach(x => {
        testActor ! self
        Flow(x).foreach(_ => testActor ! self).consume(materializer)
      }).toFuture(materializer)

      Await.result(f, 3.seconds)

      // Collect every reported ref until the probe has been idle for 250 ms.
      val refs = receiveWhile(idle = 250.millis) {
        case r: ActorRef => r
      }

      try {
        refs.toSet.size should be(8)
        // Group the distinct refs by the number of elements in their actor path.
        refs.distinct.map(_.path.elements.size).groupBy(x => x).mapValues(x => x.size) should be(Map(2 -> 2, 3 -> 6))
      } catch {
        case NonFatal(e) =>
          // Dump the sorted refs so a failing run shows the actual hierarchy.
          println(refs.map(_.toString).to[TreeSet].mkString("\n"))
          throw e
      }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:46,代码来源:ProcessorHierarchySpec.scala
示例2: LoggingReceive
//设置package包名称以及导入依赖的类
package akka.event
import language.existentials
import akka.actor.Actor.Receive
import akka.actor.ActorContext
import akka.actor.ActorCell
import akka.event.Logging.Debug
object LoggingReceive {
/**
 * A `Receive` wrapper that publishes a Debug event to the system event stream
 * every time `isDefinedAt` is consulted, then delegates to the wrapped handler.
 *
 * @param source  optional log source; when absent the actor behind `context` is used
 * @param r       the wrapped receive function
 * @param label   optional state label appended to the log message
 * @param context actor context; it is cast to `ActorCell` when `source` is empty
 */
class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) extends Receive {
  /** Convenience constructor without a state label. */
  def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None)

  /** Logs whether `o` is handled by `r` and returns that same answer. */
  def isDefinedAt(o: Any): Boolean = {
    val handled = r.isDefinedAt(o)
    val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
    context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o
      + (label match {
        case Some(l) => " in state " + l
        case _       => ""
      })))
    handled
  }

  /** Delegates straight to the wrapped receive function. */
  def apply(o: Any): Unit = r(o)
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:27,代码来源:LoggingReceive.scala
示例3: HttpClientProvider
//设置package包名称以及导入依赖的类
package org.zalando.react.nakadi.client.providers
import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, TrustManager, X509TrustManager}
import akka.actor.ActorContext
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.http.scaladsl.{Http, HttpsConnectionContext}
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Provides an outgoing Akka HTTP connection flow to `server:port`.
 *
 * @param actorContext         context whose actor system backs the HTTP extension
 * @param server               target host
 * @param port                 target port
 * @param isConnectionSSL      when true an HTTPS connection is created
 * @param acceptAnyCertificate when true (and SSL is on) a trust manager that performs
 *                             no certificate checks is installed
 * @param connectionTimeout    connecting timeout applied to the client settings
 */
class HttpClientProvider(actorContext: ActorContext,
                         server: String, port: Int,
                         isConnectionSSL: Boolean = false,
                         acceptAnyCertificate: Boolean = false,
                         connectionTimeout: FiniteDuration) {

  val http = Http(actorContext.system)

  // Client settings: caller-supplied connecting timeout, idle timeout disabled.
  private val settings =
    ClientConnectionSettings(actorContext.system)
      .withConnectingTimeout(connectionTimeout)
      .withIdleTimeout(Duration.Inf)

  val connection: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
    if (isConnectionSSL) {
      val sslContext =
        if (acceptAnyCertificate) {
          // Trust manager whose checks are no-ops: every certificate chain is accepted.
          val trustEverything: TrustManager = new X509TrustManager() {
            override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
            override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = {}
            override def getAcceptedIssuers(): Array[X509Certificate] = Array.empty
          }
          val permissive = SSLContext.getInstance("TLS")
          permissive.init(Array.empty, Array(trustEverything), new SecureRandom())
          permissive
        } else {
          SSLContext.getDefault
        }
      http.outgoingConnectionHttps(server, port, new HttpsConnectionContext(sslContext), settings = settings)
    } else {
      http.outgoingConnection(server, port, settings = settings)
    }
}
开发者ID:zalando-nakadi,项目名称:reactive-nakadi,代码行数:56,代码来源:HttpClientProvider.scala
示例4: RemoteStrategy
//设置package包名称以及导入依赖的类
package prisoners_dilemma
import akka.actor.ActorContext
import java.net.URI
/**
 * Strategizer whose moves are meant to come from a remote endpoint.
 *
 * The remote calls are not implemented yet: responses are hard-coded placeholder
 * strings and `parseResponse` is `???`, so any game started here currently throws.
 */
class RemoteStrategy(context:ActorContext, firstMove: URI)
  extends Strategizer {

  /** Starts a game from the (placeholder) first response. */
  def newGame(): RoundStrategy = {
    // call first URI
    val parsed = parseResponse("la la ala la ala")
    new RemoteRoundStrategy(context, parsed._1, parsed._2)
  }

  /** Turns a raw response into this round's move and the URI for the next round. Not implemented. */
  private def parseResponse(response: String ): (Move, URI) = ???

  /** One round: holds the current move and knows how to obtain the following round. */
  private class RemoteRoundStrategy(context:ActorContext, myMove: Move, nextURI: URI) extends RoundStrategy {
    val currentMove = myMove

    def next(m: Move) = {
      // call URI
      val following = parseResponse("bananas")
      new RemoteRoundStrategy(context, following._1, following._2)
    }
  }
}
开发者ID:Mharlin,项目名称:better-testing-workshop,代码行数:27,代码来源:RemoteStrategy.scala
示例5: ClusterRouterGroupSpecification
//设置package包名称以及导入依赖的类
package io.clouderite.commons.scala.berries.akka
import akka.actor.{ActorContext, ActorRef}
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.{ConsistentHashingGroup, Group, RandomGroup, TailChoppingGroup}
import scala.concurrent.duration.FiniteDuration
/**
 * Describes a cluster-aware group router.
 *
 * @param name actor name given to the router when it is created
 * @param path the single routee path the router targets
 * @param role cluster role passed as `useRole`
 */
case class ClusterRouterGroupSpecification(name: String, path: String, role: String) {

  /** Settings for up to 100 instances on `path`, local routees allowed, restricted to `role`. */
  def routerSettings(implicit context: ActorContext): ClusterRouterGroupSettings =
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List(path),
      allowLocalRoutees = true,
      useRole = Some(role))
}
/**
 * Parameters for a tail-chopping group.
 *
 * @param within   forwarded as the group's `within` argument
 * @param interval forwarded as the group's `interval` argument
 */
case class TailChoppingSpecification(within: FiniteDuration, interval: FiniteDuration) {

  /** Builds a `TailChoppingGroup` with no explicit routee paths. */
  def routerStrategy(implicit context: ActorContext): Group =
    TailChoppingGroup(Nil, within = within, interval = interval)
}
/**
 * Factory methods that create cluster group routers as children of the implicit actor context.
 * Each method wraps a local group in a `ClusterRouterGroup` using the specification's settings
 * and names the resulting actor after `specification.name`.
 */
object ClusterRouterGroupFactory {

  /** Creates a consistent-hashing cluster router using `hashMapping`. */
  def consistentHashingGroup(specification: ClusterRouterGroupSpecification)(hashMapping: ConsistentHashMapping)(implicit context: ActorContext): ActorRef = {
    val localGroup = ConsistentHashingGroup(Nil, hashMapping = hashMapping)
    val clusterRouter = ClusterRouterGroup(localGroup, specification.routerSettings)
    context.actorOf(clusterRouter.props(), name = specification.name)
  }

  /** Creates a random-routing cluster router. */
  def randomGroup(specification: ClusterRouterGroupSpecification)(implicit context: ActorContext): ActorRef = {
    val localGroup = RandomGroup(Nil)
    val clusterRouter = ClusterRouterGroup(localGroup, specification.routerSettings)
    context.actorOf(clusterRouter.props(), name = specification.name)
  }

  /** Creates a tail-chopping cluster router from `choppingSpecification`. */
  def tailChoppingGroup(specification: ClusterRouterGroupSpecification, choppingSpecification: TailChoppingSpecification)(implicit context: ActorContext): ActorRef = {
    val localGroup = choppingSpecification.routerStrategy
    val clusterRouter = ClusterRouterGroup(localGroup, specification.routerSettings)
    context.actorOf(clusterRouter.props(), name = specification.name)
  }
}
开发者ID:clouderite,项目名称:scala-berries,代码行数:46,代码来源:ClusterRouterGroupFactory.scala
示例6: LoggingReceive
//设置package包名称以及导入依赖的类
package akka.event
import language.existentials
import akka.actor.Actor.Receive
import akka.actor.ActorContext
import akka.actor.ActorCell
import akka.actor.DiagnosticActorLogging
import akka.event.Logging.{ LogEvent, LogLevel }
import akka.actor.AbstractActor
import scala.runtime.BoxedUnit
object LoggingReceive {
/**
 * A `Receive` wrapper that publishes a log event at `logLevel` every time
 * `isDefinedAt` is consulted, then delegates to the wrapped handler.
 *
 * @param source   optional log source; when absent the actor behind `context` is used
 * @param r        the wrapped receive function
 * @param label    optional state label appended to the log message
 * @param logLevel level of the published event; nothing is published unless the
 *                 event stream's level is at least this value
 * @param context  actor context; it is cast to `ActorCell` when `source` is empty
 */
class LoggingReceive(source: Option[AnyRef], r: Receive, label: Option[String], logLevel: LogLevel)(implicit context: ActorContext) extends Receive {
  /** Logs at debug level with the given label. */
  def this(source: Option[AnyRef], r: Receive, label: Option[String])(implicit context: ActorContext) = this(source, r, label, Logging.DebugLevel)

  /** Logs at debug level without a label. */
  def this(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) = this(source, r, None, Logging.DebugLevel)

  /** Logs whether `o` is handled by `r` (when the level is enabled) and returns that answer. */
  def isDefinedAt(o: Any): Boolean = {
    val handled = r.isDefinedAt(o)
    if (context.system.eventStream.logLevel >= logLevel) {
      val src = source getOrElse context.asInstanceOf[ActorCell].actor
      val (str, clazz) = LogSource.fromAnyRef(src)
      val message = "received " + (if (handled) "handled" else "unhandled") + " message " + o + " from " + context.sender() +
        (label match {
          case Some(l) => " in state " + l
          case _       => ""
        })
      // Carry the MDC along when the source exposes diagnostic logging.
      val event = src match {
        case a: DiagnosticActorLogging => LogEvent(logLevel, str, clazz, message, a.log.mdc)
        case _                         => LogEvent(logLevel, str, clazz, message)
      }
      context.system.eventStream.publish(event)
    }
    handled
  }

  /** Delegates straight to the wrapped receive function. */
  def apply(o: Any): Unit = r(o)
}
开发者ID:rorygraves,项目名称:perf_tester,代码行数:38,代码来源:LoggingReceive.scala
示例7: LocalFileStreamSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import java.io.File
import java.nio.file.Path
import akka.actor.{Actor, ActorContext, ActorRef, Props}
import akka.stream.actor.ActorPublisher
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.model.{Query, RequestContext, SonicMessage}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.file.{FileWatcher, FileWatcherWorker, LocalFilePublisher}
import spray.json._
/**
 * Data source that builds the `Props` for a [[LocalFileStreamPublisher]].
 *
 * Reads the required `path` setting and the optional `tail` setting (default `true`)
 * from the query configuration.
 */
class LocalFileStreamSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(query, actorContext, context) {

  lazy val publisher: Props = {
    val path = getConfig[String]("path")
    val tail = getOption[Boolean]("tail").getOrElse(true)

    val glob = FileWatcher.parseGlob(path)
    // Factory producing the Props of a watcher worker for a given directory.
    val workerProps = { dir: Path => Props(classOf[FileWatcherWorker], dir) }
    val watchers = LocalFilePublisher.getWatchers(glob, actorContext, workerProps)

    // NOTE(review): query.id.get throws if the query has no id — confirm ids are always set here.
    Props(classOf[LocalFileStreamPublisher], query.id.get, query.query, tail, glob.fileFilterMaybe, watchers, context)
  }
}
/**
 * Publisher actor for local files; the publishing behaviour itself comes from the
 * mixed-in `LocalFilePublisher`. This class only defines how a raw line is encoded.
 */
class LocalFileStreamPublisher(
    val queryId: Long,
    val rawQuery: String,
    val tail: Boolean,
    val fileFilterMaybe: Option[String],
    val watchersPair: Vector[(File, ActorRef)],
    val ctx: RequestContext)
  extends Actor
  with ActorPublisher[SonicMessage]
  with SonicdLogging
  with LocalFilePublisher {

  /** Wraps the raw text in a JSON string without further parsing. */
  override def parseUTF8Data(raw: String): JsValue = {
    JsString(raw)
  }
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:42,代码来源:LocalFileStreamSource.scala
示例8: SonicSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import java.net.InetSocketAddress
import akka.actor.{ActorContext, Props}
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.client.SonicPublisher
import build.unstable.sonic.model.{Query, RequestContext, SonicCommand}
import build.unstable.sonic.scaladsl.Sonic._
import spray.json.JsObject
/**
 * Data source that forwards a query to another sonic server.
 *
 * Reads `host`, optional `port` (default 8889) and `config` from the query configuration.
 */
class SonicSource(q: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(q, actorContext, context) {

  val host: String = getConfig[String]("host")
  val port: Int = getOption[Int]("port").getOrElse(8889)
  val config: JsObject = getConfig[JsObject]("config")
  val addr = new InetSocketAddress(host, port)

  // The forwarded query keeps the original text, auth and trace id but uses the nested config.
  val query: SonicCommand = Query(q.query, config, q.auth).copy(trace_id = q.traceId)

  // Supervisor name is derived from the address hash code.
  val supervisorName = s"sonic_${addr.hashCode()}"

  lazy val publisher: Props = {
    // Reuse the child with this name if it already exists, otherwise create it.
    val sonicSupervisor = actorContext.child(supervisorName) match {
      case Some(existing) => existing
      case None           => actorContext.actorOf(sonicSupervisorProps(addr), supervisorName)
    }
    Props(classOf[SonicPublisher], sonicSupervisor, query, false)
  }
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:32,代码来源:SonicSource.scala
示例9: SonicdSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import akka.actor.ActorContext
import build.unstable.sonic.model.{DataSource, Query, RequestContext}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.SonicdSource._
import build.unstable.sonicd.system.actor.SonicdController._
import spray.json.JsonFormat
import build.unstable.sonic.JsonProtocol._
/**
 * Base class for sonicd data sources, adding typed access to the query's
 * `sonicdConfig` fields with debug logging of every lookup.
 */
abstract class SonicdSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) with SonicdLogging {

  /**
   * Reads a mandatory configuration field.
   *
   * @throws MissingConfigurationException when the key is absent or converts to `None`
   */
  def getConfig[T: JsonFormat](key: String): T = {
    val maybeValue = query.sonicdConfig.fields.get(key).flatMap(_.convertTo[Option[T]])
    val value = maybeValue match {
      case Some(found) => found
      case None        => throw new MissingConfigurationException(key)
    }
    log.debug("getConfig({})={}", key, value)
    value
  }

  /** Reads an optional configuration field; `None` when absent or when it converts to `None`. */
  def getOption[T: JsonFormat](key: String): Option[T] = {
    val field = query.sonicdConfig.fields.get(key)
    val value = field.flatMap(_.convertTo[Option[T]])
    log.debug("getOption({})={}", key, value)
    value
  }
}
/** Companion holding the exception type raised for missing configuration. */
object SonicdSource {

  /**
   * Signals that a required configuration field is absent.
   *
   * @param missing name of the missing field, embedded in the exception message
   */
  class MissingConfigurationException(missing: String)
    extends Exception(s"config is missing '$missing' field")
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:34,代码来源:SonicdSource.scala
示例10: SyntheticSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import akka.actor.{ActorContext, Props}
import build.unstable.sonic.model.{Query, RequestContext}
import build.unstable.sonic.server.source.SyntheticPublisher
import spray.json.JsObject
import build.unstable.sonic.JsonProtocol._
/**
 * Data source backed by a `SyntheticPublisher`.
 *
 * All settings are optional; `progress-delay` defaults to 10 and `indexed` to false.
 */
class SyntheticSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends SonicdSource(query, actorContext, context) {

  val publisher: Props = {
    val seedOpt = getOption[Int]("seed")
    val sizeOpt = getOption[Int]("size")
    val progressDelay = getOption[Int]("progress-delay").getOrElse(10)
    val isIndexed = getOption[Boolean]("indexed").getOrElse(false)

    // user pre-defined schema
    val schemaOpt = getOption[JsObject]("schema")

    Props(
      classOf[SyntheticPublisher],
      seedOpt, sizeOpt, progressDelay, query.query, isIndexed, schemaOpt, context)
  }
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:24,代码来源:SyntheticSource.scala
示例11: MockSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.service
import akka.actor.{Actor, ActorContext, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import build.unstable.sonic.model._
import build.unstable.sonicd.SonicdLogging
import scala.collection.mutable
/** Data source whose publisher is always a [[ProxyPublisher]]. */
class MockSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) {

  override def publisher: Props = {
    val publisherClass = classOf[ProxyPublisher]
    Props(publisherClass)
  }
}
/**
 * Publisher that relays every `SonicMessage` it receives to its subscriber,
 * buffering messages while there is no demand.
 */
class ProxyPublisher extends Actor with ActorPublisher[SonicMessage] with SonicdLogging {

  // Messages received while the subscriber had no outstanding demand.
  val buffer = mutable.Queue.empty[SonicMessage]

  override def unhandled(message: Any): Unit = {
    log.warning(">>>>>>>>>>>>>>>> unhandled message for mock publisher: {}", message)
  }

  override def receive: Receive = {

    case c: StreamCompleted =>
      if (isActive && totalDemand > 0) {
        onNext(c)
        onCompleteThenStop()
      } else context.become({
        // No demand yet: wait for the next Request, then emit the completion and stop.
        case Request(_) =>
          onNext(c)
          onCompleteThenStop()
      })

    case m: SonicMessage =>
      if (isActive && totalDemand > 0) onNext(m)
      else buffer.enqueue(m)

    case r: Request =>
      // Drain as much of the buffer as the current demand allows.
      while (isActive && totalDemand > 0 && buffer.nonEmpty) {
        onNext(buffer.dequeue())
      }
  }
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:44,代码来源:MockSource.scala
示例12: ConfigurationStore
//设置package包名称以及导入依赖的类
package phu.quang.le.utils
import scala.collection.mutable.Map
import akka.actor.ActorContext
/** Process-wide mutable registry of configuration objects keyed by name. */
object ConfigurationStore {

  val entries = Map[String, AnyRef]()

  /** Stores `value` under `key`, replacing any previous entry for that key. */
  def put(key: String, value: AnyRef): Unit = {
    entries += ((key, value))
  }

  /**
   * Looks up a stored value intended to be of type `A`.
   *
   * NOTE(review): `A` is erased at runtime, so `isInstanceOf[A]` cannot tell stored
   * values apart; this appears to return the first stored value whatever its type.
   * A type-safe lookup would need a `ClassTag` bound, which would change the
   * signature used by generic callers — confirm before changing.
   */
  def get[A] = entries.values.find(_.isInstanceOf[A]).asInstanceOf[Option[A]]
}
/** Mixin giving access to values previously registered in `ConfigurationStore`. */
trait Configured {

  /**
   * Returns the stored configuration value for `A`.
   * Throws `NoSuchElementException` when the store yields nothing.
   */
  def configured[A](implicit actorContext: ActorContext): A = {
    val stored: Option[A] = ConfigurationStore.get[A]
    stored.get
  }
}
/** Mixin for registering configuration objects in `ConfigurationStore`. */
trait Configuration {

  /**
   * Evaluates `f` once, stores the result under its runtime class name and returns it.
   */
  def configure[R <: AnyRef](f: => R) = {
    val instance = f
    val storeKey = instance.getClass.getName
    ConfigurationStore.put(storeKey, instance)
    instance
  }
}
开发者ID:p-le,项目名称:manga-batch,代码行数:28,代码来源:Config.scala
示例13: Settings
//设置package包名称以及导入依赖的类
package com.mikemunhall.simpletwitterstats
import akka.actor.{ActorContext, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import com.typesafe.config.Config
/** Extension id for [[Settings]]; also allows lookup straight from an `ActorContext`. */
object Settings extends ExtensionId[Settings] with ExtensionIdProvider {

  override def lookup: Settings.type = Settings

  /** Builds the extension from the actor system's configuration. */
  override def createExtension(system: ExtendedActorSystem): Settings = {
    val config = system.settings.config
    new Settings(config, system)
  }

  /** Resolves the extension for the system that owns `context`. */
  def apply(context: ActorContext): Settings = {
    val owningSystem = context.system
    apply(owningSystem)
  }
}
/**
 * Typed view over the `simple-twitter-stats` configuration section.
 * Each value is read from `config` when its enclosing object is first initialised.
 */
class Settings(config: Config, extendedSystem: ExtendedActorSystem) extends Extension {

  /** HTTP API binding settings. */
  object API {
    object Http {
      val Port: Int = config.getInt("simple-twitter-stats.api.http.port")
      val Interface: String = config.getString("simple-twitter-stats.api.http.interface")
    }
  }

  /** Twitter client OAuth credentials. */
  object Server {
    object Twitter {
      object Client {
        object OAuth {

          object Consumer {
            val key: String = config.getString("simple-twitter-stats.server.twitter.client.oauth.consumer.key")
            val secret: String = config.getString("simple-twitter-stats.server.twitter.client.oauth.consumer.secret")
          }

          object Access {
            val token: String = config.getString("simple-twitter-stats.server.twitter.client.oauth.access.token")
            val secret: String = config.getString("simple-twitter-stats.server.twitter.client.oauth.access.secret")
          }
        }
      }
    }
  }
}
开发者ID:mmunhall,项目名称:simple-twitter-stats,代码行数:40,代码来源:Settings.scala
示例14: sendMessageToNextTask
//设置package包名称以及导入依赖的类
package actors.traits
import akka.actor.{ActorContext, Props, ActorRef}
/**
 * Advances a route slip by one step: replaces its payload with `newMessage`
 * and delivers it to the first task remaining on the slip.
 *
 * If the slip asks to forward to the sender and no original sender is recorded yet,
 * `previousSender` is recorded. When the delivered task is the last one, only the bare
 * message is sent (with the original sender as sender when one is recorded); otherwise
 * the updated slip itself is sent on.
 *
 * NOTE(review): `routeSlip.head` throws on an empty route slip — confirm callers never
 * pass an exhausted slip.
 *
 * @param previousSender      sender recorded as original sender when required
 * @param oldRouteSlipMessage the slip received by the current task
 * @param newMessage          payload for the next task
 */
def sendMessageToNextTask(previousSender: ActorRef, oldRouteSlipMessage: RouteSlipMessage, newMessage: AnyRef)(implicit context: ActorContext): Unit = {
  val withMessage = oldRouteSlipMessage.copy(message = newMessage)

  //lazy val ForwardToFutureActor = context.actorOf(Props[ForwardToFutureActor], "ForwardToFutureActor")
  val withSender =
    if (withMessage.forwardToSender && withMessage.originalSender.isEmpty)
      withMessage.copy(originalSender = Some(previousSender))
    else
      withMessage

  val nextTask = withSender.routeSlip.head
  val remaining = withSender.routeSlip.tail
  val slip = withSender.copy(routeSlip = remaining)

  if (remaining.isEmpty) {
    // Last task: deliver the payload only.
    slip.originalSender match {
      case Some(original) => nextTask.tell(slip.message, original)
      case None           => nextTask ! slip.message
    }
  } else {
    nextTask ! slip
  }
}
}
开发者ID:GoranSchumacher,项目名称:massive-actors,代码行数:30,代码来源:RouteSlip.scala
示例15: Internals
//设置package包名称以及导入依赖的类
package com.github.astonbitecode.zoocache
import akka.actor.{ Props, ActorRef, ActorContext, ActorSystem }
import scala.concurrent.ExecutionContextExecutor
/**
 * Internal abstraction over "something that can create actors", so the same code
 * works with either an `ActorContext` or an `ActorSystem`.
 */
private[astonbitecode] object Internals {

  /** Minimal actor-creation capability plus the dispatcher that goes with it. */
  trait ActorCreatable {
    val dispatcher: ExecutionContextExecutor
    def actorOf(props: Props): ActorRef
  }

  /** Creates actors as children of the wrapped context. */
  case class CreateFromActorContext(actorContext: ActorContext) extends ActorCreatable {
    override val dispatcher: ExecutionContextExecutor = actorContext.dispatcher
    override def actorOf(props: Props): ActorRef = {
      actorContext.actorOf(props)
    }
  }

  /** Creates actors through the wrapped actor system. */
  case class CreateFromActorSystem(actorSystem: ActorSystem) extends ActorCreatable {
    override val dispatcher: ExecutionContextExecutor = actorSystem.dispatcher
    override def actorOf(props: Props): ActorRef = {
      actorSystem.actorOf(props)
    }
  }

  /** Implicit conversions into [[ActorCreatable]]. */
  object implicits {
    implicit def actorContext2ActorCreatable(actorContext: ActorContext): ActorCreatable =
      CreateFromActorContext(actorContext)

    implicit def actorSystem2ActorCreatable(actorSystem: ActorSystem): ActorCreatable =
      CreateFromActorSystem(actorSystem)
  }
}
开发者ID:astonbitecode,项目名称:scakka-zoo-cache,代码行数:33,代码来源:Internals.scala
示例16: BotRunner
//设置package包名称以及导入依赖的类
import akka.actor.{Props, ActorRef, ActorContext, ActorSystem}
import io.scalac.slack.api.{Start, BotInfo}
import io.scalac.slack.common.actors.SlackBotActor
import io.scalac.slack.websockets.WebSocket
import io.scalac.slack.{MessageEventBus, BotModules}
import io.scalac.slack.bots.system.{CommandsRecognizerBot, HelpBot}
import io.scalac.slack.common.Shutdownable
/**
 * Entry point that starts the Slack bot actor system and registers the example bots.
 */
object BotRunner extends Shutdownable {
  val system = ActorSystem("SlackBotSystem")
  val eventBus = new MessageEventBus
  val slackBot = system.actorOf(Props(classOf[SlackBotActor], new ExampleBotsBundle(), eventBus, this, None), "slack-bot")

  var botInfo: Option[BotInfo] = None

  /** Starts the bot and blocks until the actor system terminates. */
  def main(args: Array[String]): Unit = {
    try {
      slackBot ! Start

      system.awaitTermination()
      println("Shutdown successful...")
    } catch {
      case e: Exception =>
        // Interpolate the exception: passing two arguments to println relied on
        // argument auto-tupling and printed a tuple instead of a message.
        println(s"An unhandled exception occurred... $e")
        system.shutdown()
        system.awaitTermination()
    }
  }

  // Release resources when the JVM exits.
  sys.addShutdownHook(shutdown())

  /** Asks the bot to release its websocket, then shuts the actor system down and waits for it. */
  override def shutdown(): Unit = {
    slackBot ! WebSocket.Release
    system.shutdown()
    system.awaitTermination()
  }

  /** Bundle that registers the bots as children of the supplied context. */
  class ExampleBotsBundle() extends BotModules {
    override def registerModules(context: ActorContext, websocketClient: ActorRef) = {
      context.actorOf(Props(classOf[CommandsRecognizerBot], eventBus), "commandProcessor")
      context.actorOf(Props(classOf[HelpBot], eventBus), "helpBot")
      context.actorOf(Props(classOf[MarketBot], eventBus), "MarketBot")
    }
  }
}
开发者ID:justin-yan,项目名称:marketbot,代码行数:46,代码来源:BotRunner.scala
示例17: FutureMonitor
//设置package包名称以及导入依赖的类
package com.vivint.ceph.lib
import akka.actor.{ ActorContext, Kill }
import akka.event.LoggingAdapter
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success,Failure}
/** Helpers for observing futures from actor code. */
object FutureMonitor {

  /**
   * Logs the start and the outcome of `f` and returns `f` unchanged.
   *
   * @param log  adapter receiving the debug and error entries
   * @param f    future to observe
   * @param desc description used in every log entry
   * @param ex   execution context on which the logging callback runs
   */
  def logSuccess[T](log: LoggingAdapter, f: Future[T], desc: String)(implicit ex: ExecutionContext): Future[T] = {
    log.debug(s"${desc} : pulling state")
    f.onComplete {
      case Success(_) => log.debug("{} : success", desc)
      // Named `cause` so it no longer shadows the implicit execution context `ex`.
      case Failure(cause) =>
        log.error(cause, "{}: failure", desc)
    }
    f
  }

  /**
   * Sends `Kill` to the calling actor and logs an error if `f` fails.
   *
   * The previous version declared a local `failed` flag and tested it immediately
   * after initialising it to false, so its "stop self" branch could never run; that
   * dead branch is removed here without changing behaviour.
   *
   * @param f           future to watch
   * @param log         adapter receiving the error entry
   * @param description text appended to the error message
   * @param context     context of the actor to kill; its dispatcher runs the callback
   */
  def crashSelfOnFailure(f: Future[Any], log: LoggingAdapter, description: String)(
    implicit context: ActorContext): Unit = {
    // Resolve the ref up front so the callback never touches the actor context
    // from a dispatcher thread.
    val target = context.self
    f.onComplete {
      case Failure(cause) =>
        target ! Kill
        log.error(cause, s"Unexpected error for ${description}")
      case Success(_) => ()
    }(context.dispatcher)
  }
}
开发者ID:vivint-smarthome,项目名称:ceph-on-mesos,代码行数:33,代码来源:FutureMonitor.scala
示例18: Ex01Manager
//设置package包名称以及导入依赖的类
package ekiaa.akka.otp.examples.ex01
import akka.actor.{Actor, Props, ActorContext, ActorRef}
import akka.pattern.ask
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.Future
import scala.concurrent.duration._
/** Companion holding the single manager reference and the client-side API. */
object Ex01Manager {

  var instance: Option[ActorRef] = None

  /** Creates the manager as a child of `context` unless one has already been recorded. */
  def start(context: ActorContext) = {
    synchronized {
      if (instance.isEmpty) {
        instance = Some(context.actorOf(props, "Ex01Manager"))
      }
    }
  }

  def props: Props = Props(new Ex01Manager)

  private final case class StartWorker(name: String)

  /**
   * Asks the manager to start the worker called `name` (30 second ask timeout).
   * Completes with `None` when no manager has been started.
   */
  def startWorker(name: String): Future[Any] = {
    instance match {
      case Some(manager) =>
        ask(manager, StartWorker(name))(30.seconds)
      case None =>
        // The value is already known: no need to schedule work on an execution context.
        Future.successful(None)
    }
  }
}
/**
 * Manager actor: answers `StartWorker` with the worker registered under that name,
 * starting it first when it is not registered. All other messages are ignored.
 */
class Ex01Manager extends Actor with StrictLogging {

  import Ex01Manager._

  override def preStart() = {
    logger.debug(s"Ex01Manager started with path ${context.self.path}")
  }

  val receive: Receive = {
    case StartWorker(name) =>
      // Capture the sender now; it must not be read from inside the future callback.
      val replyTo = sender()
      val existing = Ex01Worker.map.get(name)
      existing match {
        case result @ Some(_) =>
          replyTo ! result
        case None =>
          import scala.concurrent.ExecutionContext.Implicits.global
          Ex01Worker.start(name).map { started => replyTo ! started }
      }
    case _ =>
  }
}
开发者ID:ekiaa,项目名称:akka-otp,代码行数:62,代码来源:Ex01Manager.scala
示例19: Ex01WorkerSup
//设置package包名称以及导入依赖的类
package ekiaa.akka.otp.examples.ex01
import akka.actor.{Actor, ActorContext, Props, ActorRef}
import akka.pattern.ask
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.duration._
import scala.concurrent.Future
/** Companion holding the single supervisor reference and the client-side API. */
object Ex01WorkerSup {

  var instance: Option[ActorRef] = None

  /** Creates the supervisor as a child of `context` unless one has already been recorded. */
  def start(context: ActorContext) = {
    synchronized {
      if (instance.isEmpty) {
        instance = Some(context.actorOf(props, "Ex01WorkerSup"))
      }
    }
  }

  def props: Props = Props(new Ex01WorkerSup)

  private final case class StartChild(args: Any)

  /**
   * Asks the supervisor to start a child with `args` (30 second ask timeout).
   * Completes with `None` when no supervisor has been started.
   */
  def startChild(args: Any): Future[Any] = {
    instance match {
      case Some(supervisor) =>
        ask(supervisor, StartChild(args))(30.seconds)
      case _ =>
        // The value is already known: no need to schedule work on an execution context.
        Future.successful(None)
    }
  }
}
/**
 * Supervisor actor: on `StartChild` it starts a worker in its own context and
 * replies with the result. All other messages are ignored.
 */
class Ex01WorkerSup extends Actor with StrictLogging {

  import Ex01WorkerSup._

  override def preStart() = {
    logger.debug(s"Ex01WorkerSup started with path ${context.self.path}")
  }

  val receive: Receive = {
    case StartChild(args) =>
      import scala.concurrent.ExecutionContext.Implicits.global
      // Capture the sender now; it must not be read from inside the future callback.
      val replyTo = sender()
      val started = Ex01Worker.start(args, context)
      started.map { outcome => replyTo ! outcome }
    case _ =>
  }
}
开发者ID:ekiaa,项目名称:akka-otp,代码行数:59,代码来源:Ex01WorkerSup.scala
示例20: Sink
//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow.elements
import akka.actor.{Actor, ActorContext, ActorLogging, Props}
import pl.edu.agh.messages.{Get, GetGroupedOut, GetOut, ResultMessage}
/**
 * Actor that accumulates the payloads of `ResultMessage`s and serves them back on request.
 *
 * NOTE(review): `R` is erased at runtime, so the `data: R` pattern cannot actually
 * check the payload type.
 */
class Sink[R] extends Actor with ActorLogging {

  // Everything received so far, in arrival order.
  var out = List.empty[R]

  def receive = {
    case ResultMessage(data: R) =>
      //log.info("CHILD")
      out = out :+ data

    case GetOut =>
      //This line is needed to feed workflow feature
      //out = List.empty[R]
      sender() ! out

    case GetGroupedOut(size: Int) =>
      //This line is needed to feed workflow feature
      //out = List.empty[R]
      sender() ! out.grouped(size)

    case Get =>
      // Replies with this actor instance itself rather than its ActorRef.
      sender() ! this
  }
}
/** Factory methods for [[Sink]] actors. */
object Sink {

  /** Creates an unnamed sink as a child of `context`. */
  // The element type is now passed through to `props`; it was silently inferred as Nothing.
  def apply[R](context: ActorContext) = context.actorOf(Sink.props[R])

  /** Creates a sink called `name` as a child of `context`. */
  def apply[R](name: String, context: ActorContext) = context.actorOf(Sink.props[R], name)

  /** Props for a sink of element type `R`. */
  def props[R] = Props[Sink[R]]
}
开发者ID:kkrzys,项目名称:akkaflow,代码行数:35,代码来源:Sink.scala
注：本文中的akka.actor.ActorContext类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论