本文整理汇总了Scala中akka.actor.ActorRef类的典型用法代码示例。如果您正苦于以下问题：Scala ActorRef类的具体用法？Scala ActorRef怎么用？Scala ActorRef使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorRef类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: OrderProcessor
//设置package包名称以及导入依赖的类
package com.example
import java.util.UUID
import scaldi.Injector
import akka.actor.{Actor, ActorRef, PoisonPill}
import scaldi.akka.AkkaInjectable
import scala.math.BigDecimal.RoundingMode
/**
 * Actor that drives a single order through price calculation.
 *
 * It starts in the `idle` behaviour. When a `ProcessOrder` arrives it sends a
 * `CalculatePrice` to the injected `PriceCalculator` and switches to
 * `workingHard`, where it reports either the result or a cancellation to the
 * original sender and then sends itself a `PoisonPill`.
 */
class OrderProcessor(implicit inj: Injector) extends Actor with AkkaInjectable {
  import Messages._

  // Reference to the price calculator, obtained through `injectActorRef`.
  val priceCalculator = injectActorRef[PriceCalculator]

  def receive = idle

  /** Waits for an order, requests its price and remembers who asked. */
  val idle: Receive = {
    case order @ ProcessOrder(user: User, itemId: Long, netAmount: Int) =>
      println(s"Processing order for user $user.")
      priceCalculator ! CalculatePrice(netAmount)
      // Capture the sender before switching behaviour so the reply goes back to it.
      val requester = sender
      context.become(workingHard(order, requester))
  }

  /**
   * Behaviour used while a price calculation is outstanding.
   *
   * @param orderInfo the order being processed
   * @param reportTo  the actor that receives the final outcome
   */
  def workingHard(orderInfo: ProcessOrder, reportTo: ActorRef): Receive = {
    case GrossPriceCalculated(_, grossPrice) =>
      println("Processing order.....")
      replyAndStop(reportTo, OrderProcessed(UUID.randomUUID().toString, grossPrice))
    case CancelProcessing =>
      replyAndStop(reportTo, OrderProcessingFailed("Canceled.."))
  }

  /** Sends `reply` to `target`, then asks this actor to stop via `PoisonPill`. */
  private def replyAndStop(target: ActorRef, reply: Any): Unit = {
    target ! reply
    self ! PoisonPill
  }
}
/**
 * Actor that turns a net amount into a gross amount.
 *
 * For every `CalculatePrice` it multiplies the net amount by 1.19, rounds the
 * product half-up to zero decimal places and answers the sender with
 * `GrossPriceCalculated`.
 */
class PriceCalculator extends Actor {
  import Messages._

  def receive = {
    case CalculatePrice(netAmount) =>
      // Fixed multiplier (presumably a 19% tax rate; confirm with the domain owner).
      val factor = BigDecimal("1.19")
      val rounded = (netAmount * factor).setScale(0, RoundingMode.HALF_UP)
      // toIntExact throws if the rounded value does not fit into an Int.
      sender ! GrossPriceCalculated(netAmount, rounded.toIntExact)
  }
}
开发者ID:shigemk2,项目名称:my-scaldi-akka-sample,代码行数:48,代码来源:Order.scala
示例2: RegistryServiceActor
//设置package包名称以及导入依赖的类
package com.pacbio.secondary.smrtlink.actors
import java.util.UUID
import akka.actor.{Props, ActorRef, Actor}
import com.pacbio.common.actors.{PacBioActor, ActorRefFactoryProvider}
import com.pacbio.common.dependency.Singleton
import com.pacbio.secondary.smrtlink.models.{RegistryResourceUpdate, RegistryProxyRequest, RegistryResourceCreate}
// TODO(smcclellan): Add scaladoc
/**
 * Message protocol of the `RegistryServiceActor` class in this file. Each message
 * is answered by calling the `registryDao` method named in its comment.
 */
object RegistryServiceActor {
// Handled with `registryDao.getResources(id)`; `id` is optional.
case class GetResources(id: Option[String])
// Handled with `registryDao.getResource(uuid)`.
case class GetResource(uuid: UUID)
// Handled with `registryDao.createResource(create)`.
case class CreateResource(create: RegistryResourceCreate)
// Handled with `registryDao.updateResource(uuid, update)`.
case class UpdateResource(uuid: UUID, update: RegistryResourceUpdate)
// Handled with `registryDao.deleteResource(uuid)`.
case class DeleteResource(uuid: UUID)
// Handled with `registryDao.proxyRequest(uuid, req)`.
case class ProxyRequest(uuid: UUID, req: RegistryProxyRequest)
}
/**
 * Actor front-end for a `RegistryDao`.
 *
 * Every message from the `RegistryServiceActor` companion is mapped to the DAO
 * method of the same purpose, and the outcome is handed to `respondWith`.
 *
 * @param registryDao the DAO that performs the actual registry operations
 */
class RegistryServiceActor(registryDao: RegistryDao) extends PacBioActor {
  import RegistryServiceActor._

  def receive: Receive = {
    case GetResources(filterId) =>
      respondWith(registryDao.getResources(filterId))

    case GetResource(resourceId) =>
      respondWith(registryDao.getResource(resourceId))

    case CreateResource(request) =>
      respondWith(registryDao.createResource(request))

    case UpdateResource(resourceId, change) =>
      respondWith(registryDao.updateResource(resourceId, change))

    case DeleteResource(resourceId) =>
      respondWith(registryDao.deleteResource(resourceId))

    case ProxyRequest(resourceId, request) =>
      respondWith(registryDao.proxyRequest(resourceId, request))
  }
}
/**
 * Provides a `Singleton` wrapping the `ActorRef` of a `RegistryServiceActor`
 * created under the name "RegistryServiceActor".
 */
trait RegistryServiceActorRefProvider {
  this: RegistryDaoProvider with ActorRefFactoryProvider =>

  val registryServiceActorRef: Singleton[ActorRef] = Singleton { () =>
    // Resolve the factory first, then the DAO, matching the original evaluation order.
    val factory = actorRefFactory()
    val props = Props(classOf[RegistryServiceActor], registryDao())
    factory.actorOf(props, "RegistryServiceActor")
  }
}
/**
 * Provides a `Singleton` holding a directly constructed `RegistryServiceActor`.
 */
trait RegistryServiceActorProvider {
  this: RegistryDaoProvider =>

  // NOTE(review): the actor is instantiated with `new` rather than through an
  // actor factory; confirm that `PacBioActor` permits construction outside `actorOf`.
  val registryServiceActor: Singleton[RegistryServiceActor] =
    Singleton { () =>
      new RegistryServiceActor(registryDao())
    }
}
开发者ID:PacificBiosciences,项目名称:smrtflow,代码行数:46,代码来源:RegistryServiceActor.scala
示例3: GroupProcessorRegion
//设置package包名称以及导入依赖的类
package im.actor.server.group
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
/**
 * Factory for the cluster-sharding region (or proxy) of group processors.
 *
 * The `case` arrows in this object had been garbled to `?`; they are restored
 * to `=>` so the partial functions compile again.
 */
object GroupProcessorRegion {

  /**
   * Maps a `GroupEnvelope` to an (entity id, payload) pair.
   *
   * If the envelope carries a dialog envelope, that is the payload. Otherwise the
   * payload is the protobuf field selected by the query number when a query is
   * defined, or by the command number when it is not.
   */
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) =>
      (
        groupId.toString,
        dialogEnvelope
      )
    case env @ GroupEnvelope(groupId, _, command, query) =>
      (
        groupId.toString,
        // payload
        if (query.isDefined) {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(query.number))
        } else {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(command.number))
        }
      )
  }

  /** Derives the shard id from the group id modulo 100. */
  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case env: GroupEnvelope => (env.groupId % 100).toString // TODO: configurable
  }

  private val typeName = "GroupProcessor"

  /** Starts a sharding region whose entities are created from `props`. */
  private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  /** Starts a sharding region using `GroupProcessor.props`. */
  def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)

  /** Starts a sharding proxy (no role restriction) for the same entity type. */
  def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}
/** Wraps the `ActorRef` returned by `GroupProcessorRegion.start` / `startProxy`. */
case class GroupProcessorRegion(ref: ActorRef)
/** Wraps an `ActorRef`; nothing in this file constructs it. */
case class GroupViewRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala
示例4: AuctionSearch
//设置package包名称以及导入依赖的类
package reactive4.homework
import akka.actor.{Actor, ActorRef}
import akka.event.LoggingReceive
import reactive4.homework.AuctionSearch.{AddAuction, SearchAuction, SearchResult}
/**
 * Actor that keeps a title-to-auction index and answers substring searches.
 *
 * `AddAuction` registers (or replaces) an auction under its title;
 * `SearchAuction` replies to the sender with every auction whose title
 * contains the query string.
 */
class AuctionSearch extends Actor {
  // Title -> auction actor; replaced with an updated copy on every AddAuction.
  var map: Map[String, ActorRef] = Map()

  override def receive: Receive = LoggingReceive {
    case AddAuction(title, auction) =>
      map = map.updated(title, auction)

    case SearchAuction(query) =>
      val matches: List[ActorRef] =
        map.collect { case (title, auction) if title.contains(query) => auction }.toList
      sender() ! SearchResult(query, matches)
  }
}
/** Message protocol of the `AuctionSearch` actor. */
object AuctionSearch {
// Registers `auction` under `title` in the actor's map.
case class AddAuction(title: String, auction:ActorRef)
// Asks for all auctions whose title contains `query`.
case class SearchAuction(query: String)
// Reply to SearchAuction: the original query plus the matching auction actors.
case class SearchResult(query: String, auctions: List[ActorRef])
}
开发者ID:Passarinho4,项目名称:reactive-lab4,代码行数:27,代码来源:AuctionSearch.scala
示例5: CodebaseAnalyzerAkkaApp
//设置package包名称以及导入依赖的类
package tutor
import akka.actor.{ActorRef, ActorSystem}
import tutor.CodebaseAnalyzeAggregatorActor.AnalyzeDirectory
import scala.io.StdIn
/**
 * Console entry point: repeatedly prompts for a source folder and sends an
 * `AnalyzeDirectory` message to the controller actor until the user enters `:q`
 * or standard input ends. The actor system is terminated on the way out.
 */
object CodebaseAnalyzerAkkaApp extends App {
  val system = ActorSystem("CodebaseAnalyzer")
  val codebaseAnalyzerControllerActor: ActorRef = system.actorOf(CodebaseAnalyzerControllerActor.props())
  var shouldContinue = true
  try {
    while (shouldContinue) {
      println("please input source file folder or :q to quit")
      // StdIn.readLine() returns null at end of input (e.g. piped stdin or Ctrl-D).
      // Without this guard the loop would send AnalyzeDirectory(null) forever.
      Option(StdIn.readLine()) match {
        case None | Some(":q") =>
          shouldContinue = false
        case Some(input) =>
          codebaseAnalyzerControllerActor ! AnalyzeDirectory(input)
      }
    }
  } finally {
    println("good bye!")
    system.terminate()
  }
}
开发者ID:notyy,项目名称:CodeAnalyzerTutorial,代码行数:29,代码来源:CodebaseAnalyzerAkkaApp.scala
示例6: KillActor
//设置package包名称以及导入依赖的类
package com.alvinalexander.bubbles
import akka.actor.Actor
import akka.actor.ActorRef
/** Message asking `ActorManager` to stop the given actor. */
case class KillActor(actorRef: ActorRef)
/** Message carrying an actor count; `ActorManager` in this file has no dedicated case for it. */
case class SetInitialNumActors(num: Int)
/**
 * Tracks how many bubble actors are still alive and reacts to game events.
 *
 * Procedure-syntax methods (`def f { ... }`) were rewritten with an explicit
 * `: Unit =` result type, because procedure syntax is deprecated. Behaviour
 * is unchanged.
 *
 * @param bubbles the bubble actors managed by this actor
 */
class ActorManager(bubbles: Array[ActorRef]) extends Actor {

  // Number of bubbles not yet stopped through KillActor.
  private var activeActorCount = bubbles.size

  def receive = {
    case KillActor(actorRef) => doKillActorAction(actorRef)
    case GameOver => doGameOverAction
    case _ => // every other message is deliberately ignored
  }

  /** Stops `actorRef`; when the last bubble is gone, shows the win window and exits. */
  def doKillActorAction(actorRef: ActorRef): Unit = {
    context.stop(actorRef)
    activeActorCount -= 1
    if (activeActorCount == 0) {
      val mainFrameActor = getMainFrameActor
      mainFrameActor ! ShowYouWinWindow
      shutdownApplication
    }
  }

  /** Freezes all bubbles and asks the main frame to show the game-over window. */
  def doGameOverAction: Unit = {
    stopAllBubbles
    val mainFrameActor = getMainFrameActor
    mainFrameActor ! ShowGameOverWindow
    // TODO don't shut the app down until the 'Game Over' overlay is shown
    //shutdownApplication
  }

  // use StopMoving so the panel redraws properly at the end (vs. context.stop)
  def stopAllBubbles: Unit = {
    for (b <- bubbles) {
      b ! StopMoving
    }
  }

  /** Looks up the main frame actor as a sibling (path "../MAIN_FRAME_ACTOR_NAME"). */
  def getMainFrameActor = context.actorFor(Seq("..", MAIN_FRAME_ACTOR_NAME))

  /** Shuts the actor system down, waits three seconds, then exits the JVM. */
  def shutdownApplication: Unit = {
    context.system.shutdown
    Thread.sleep(3000)
    System.exit(0)
  }
}
开发者ID:arunhprasadbh,项目名称:AkkaKillTheCharactersGame,代码行数:54,代码来源:ActorManager.scala
示例7: TorrentStream
//设置package包名称以及导入依赖的类
package com.karasiq.torrentstream
import akka.actor.ActorRef
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.karasiq.bittorrent.format.Torrent
import com.karasiq.bittorrent.streams.TorrentSource
/** Pairs a byte `source` with a `size`; `TorrentStream.create` fills `size` from `pieces.size`. */
case class TorrentStream(size: Long, source: Source[ByteString, akka.NotUsed])
object TorrentStream {

  /**
   * Builds a [[TorrentStream]] for one file of a torrent.
   *
   * The lambda arrows in this method had been garbled to `?`; they are restored
   * to `=>` so the code compiles again.
   *
   * @param torrentManager actor passed to `TorrentSource.dispatcher`
   * @param torrent        the torrent to stream from
   * @param fileName       name of the wanted file; if no file has this name the
   *                       first file of the torrent is used
   * @param ranges         optional (start, end) pairs within the file; when empty
   *                       the whole file is streamed
   */
  def create(torrentManager: ActorRef, torrent: Torrent, fileName: String, ranges: Seq[(Long, Long)] = Nil): TorrentStream = {
    val file = torrent.data.files.find(_.name == fileName).getOrElse(torrent.data.files.head)
    val pieces = if (ranges.nonEmpty) {
      TorrentFileOffset.absoluteOffsets(torrent, ranges.map(o => TorrentFileOffset(file, o._1, o._2)))
    } else {
      TorrentFileOffset.file(torrent, file)
    }
    val source = Source.single(torrent)
      .via(TorrentSource.dispatcher(torrentManager))
      .flatMapConcat(dsp => TorrentSource.pieces(dsp.actorRef, pieces.pieces.toVector))
      .transform(() => new TorrentStreamingStage(torrent.data.pieceLength, pieces.offsets))
    TorrentStream(pieces.size, source)
  }
}
开发者ID:Karasiq,项目名称:torrentstream,代码行数:27,代码来源:TorrentStream.scala
示例8: ReportServiceActor
//设置package包名称以及导入依赖的类
package com.github.unknownnpc.remotedebugtool.actor
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import com.github.unknownnpc.remotedebugtool.config.{AppConfig, RemoteDebugToolConfig}
import com.github.unknownnpc.remotedebugtool.domain._
import com.github.unknownnpc.remotedebugtool.exception.ReportException
import com.github.unknownnpc.remotedebugtool.message.{MainAppActorStop, ReportServicePayload, ReportServicePrint}
import scala.collection.mutable.ListBuffer
/**
 * Collects breakpoint payloads as report rows and logs the formatted report on
 * request, then tells the main application actor to stop.
 *
 * The self-type alias was previously named `self`, which shadowed `Actor.self`
 * (this actor's own `ActorRef`) for the whole class body. It is now the plain
 * `this: AppConfig =>` form, so `self` means the actor reference again.
 *
 * @param mainAppActorRef actor notified with `MainAppActorStop` after printing
 */
class ReportServiceActor(mainAppActorRef: ActorRef) extends Actor with ActorLogging {
  this: AppConfig =>

  // Rows accumulated so far, in arrival order.
  val values = ListBuffer.empty[ReportRow]

  override def receive = {
    case ReportServicePayload(payload) =>
      log.debug(s"Print service received incoming payload: [$payload]")
      values += reportRowFrom(payload)

    case ReportServicePrint =>
      log.debug(s"Received print command")
      log.info(systemConfig.reportFormatter.format(values.toList))
      mainAppActorRef ! MainAppActorStop
  }

  /** Builds a report row from the payload and the server its breakpoint targets. */
  private def reportRowFrom(payload: BreakpointPayload) = {
    val testTarget = findServerById(payload.breakpoint.targetId)
    JvmReportRow(testTarget.id,
      testTarget.address,
      testTarget.port,
      payload.breakpoint.line,
      payload.breakpoint.className,
      payload.breakpointValue
    )
  }

  /** Returns the configured server with the given id, or throws `ReportException`. */
  private def findServerById(id: ID) = {
    servers.find(_.id == id).getOrElse(
      throw ReportException("Unable to match payload to server instance")
    )
  }
}
object ReportServiceActor {

  /**
   * `Props` for a `ReportServiceActor` mixed with `RemoteDebugToolConfig`.
   *
   * @param mainAppActorRef actor handed to the `ReportServiceActor` constructor
   */
  def props(mainAppActorRef: ActorRef): Props = {
    Props(new ReportServiceActor(mainAppActorRef) with RemoteDebugToolConfig)
  }
}
开发者ID:UnknownNPC,项目名称:remote-debug-tool,代码行数:54,代码来源:ReportServiceActor.scala
示例9: AddressTerminatedTopicBenchSpec
//设置package包名称以及导入依赖的类
package akka.event
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Props
import akka.testkit._
object AddressTerminatedTopicBenchSpec {

  /**
   * Test actor that subscribes itself to the `AddressTerminatedTopic` when it is
   * constructed, announces "started" to `testActor`, and unsubscribes in `postStop`.
   */
  class Subscriber(testActor: ActorRef) extends Actor {

    // Looked up on every use, exactly as the two direct calls did before.
    private def topic = AddressTerminatedTopic(context.system)

    topic.subscribe(self)
    testActor ! "started"

    override def postStop(): Unit =
      topic.unsubscribe(self)

    def receive = Actor.emptyBehavior
  }
}
/**
 * Starts 20000 `Subscriber` actors in a second actor system, waits until each
 * has reported "started", then shuts that system down, logging how long each
 * phase took.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class AddressTerminatedTopicBenchSpec extends AkkaSpec("akka.loglevel=INFO") {
  import AddressTerminatedTopicBenchSpec._

  "Subscribe and unsubscribe of AddressTerminated" must {

    "be quick" in {
      val sys = ActorSystem(system.name + "2", system.settings.config)
      try {
        val actorCount = 20000

        val startBegan = System.nanoTime()
        val subscriberProps = Props(classOf[Subscriber], testActor)
        val subscribers = Vector.fill(actorCount)(sys.actorOf(subscriberProps))
        // One "started" message per subscriber is expected within ten seconds.
        receiveN(actorCount, 10.seconds)
        val startMillis = (System.nanoTime() - startBegan).nanos.toMillis
        log.info("Starting {} actors took {} ms", actorCount, startMillis)

        val stopBegan = System.nanoTime()
        shutdown(sys, 10.seconds, verifySystemShutdown = true)
        val stopMillis = (System.nanoTime() - stopBegan).nanos.toMillis
        log.info("Stopping {} actors took {} ms", actorCount, stopMillis)
      } finally {
        // Only reached with a live system when something above failed.
        if (!sys.isTerminated) shutdown(sys)
      }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:AddressTerminatedTopicBenchSpec.scala
示例10: TestSetup
//设置package包名称以及导入依赖的类
package akka.io
import scala.annotation.tailrec
import scala.collection.immutable
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
import akka.testkit.SocketUtil._
import Tcp._
/**
 * Test support that binds a TCP server on a temporary address and establishes
 * client connections to it.
 *
 * Two defects are fixed here: the self-type arrow had been garbled to `?`, and
 * `bindServer` referenced `bindOptions`, which was not defined anywhere in the
 * visible code.
 */
trait TcpIntegrationSpecSupport { _: AkkaSpec =>

  class TestSetup(shouldBindServer: Boolean = true) {
    val bindHandler = TestProbe()
    val endpoint = temporaryServerAddress()

    if (shouldBindServer) bindServer()

    /** Sends `Bind` for `endpoint` and waits for the matching `Bound`. */
    def bindServer(): Unit = {
      val bindCommander = TestProbe()
      bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
      bindCommander.expectMsg(Bound(endpoint))
    }

    /**
     * Connects a new client to `endpoint` and registers a handler probe on both sides.
     *
     * @return (client handler probe, client connection actor,
     *          server handler probe, server connection actor)
     */
    def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
      val connectCommander = TestProbe()
      connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
      val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
      val clientHandler = TestProbe()
      connectCommander.sender() ! Register(clientHandler.ref)

      val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected]
      val serverHandler = TestProbe()
      bindHandler.sender() ! Register(serverHandler.ref)

      (clientHandler, connectCommander.sender(), serverHandler, bindHandler.sender())
    }

    /** Consumes `Received` messages from `handler` until `remaining` bytes have arrived. */
    @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
      if (remaining > 0) {
        val recv = handler.expectMsgType[Received]
        expectReceivedData(handler, remaining - recv.data.size)
      }

    /**
     * Socket options for the server-side bind; override to customise.
     * NOTE(review): added by analogy with `connectOptions` because `bindServer`
     * referenced it without a visible definition. Confirm `Nil` is the intended default.
     */
    def bindOptions: immutable.Traversable[SocketOption] = Nil

    /** Socket options for client connections; override to customise. */
    def connectOptions: immutable.Traversable[SocketOption] = Nil
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:TcpIntegrationSpecSupport.scala
示例11: LookupActor
//设置package包名称以及导入依赖的类
package sample.remote.calculator
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.ReceiveTimeout
import akka.actor.Terminated
/**
 * Looks up a remote actor by path and relays math operations to it.
 *
 * While `identifying`, it sends `Identify` to the path and retries every three
 * seconds (a `ReceiveTimeout` is scheduled to itself). Once the actor is found
 * it watches it and becomes `active`; when the watched actor terminates it
 * goes back to identifying.
 *
 * @param path actor path of the calculator to look up
 */
class LookupActor(path: String) extends Actor {

  sendIdentifyRequest()

  /** Sends `Identify` to `path` and schedules a retry trigger in three seconds. */
  def sendIdentifyRequest(): Unit = {
    val selection = context.actorSelection(path)
    selection ! Identify(path)
    import context.dispatcher
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  /** Behaviour used until the remote actor has answered the `Identify` request. */
  def identifying: Actor.Receive = {
    case ActorIdentity(`path`, Some(found)) =>
      context.watch(found)
      context.become(active(found))

    case ActorIdentity(`path`, None) =>
      println(s"Remote actor not available: $path")

    case ReceiveTimeout =>
      sendIdentifyRequest()

    case _ =>
      println("Not ready yet")
  }

  /**
   * Behaviour used while the remote calculator is known.
   *
   * @param actor the identified calculator; operations are sent to it
   */
  def active(actor: ActorRef): Actor.Receive = {
    case op: MathOp =>
      actor ! op

    case result: MathResult =>
      result match {
        case AddResult(left, right, sum) =>
          printf("Add result: %d + %d = %d\n", left, right, sum)
        case SubtractResult(left, right, difference) =>
          printf("Sub result: %d - %d = %d\n", left, right, difference)
      }

    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)

    case ReceiveTimeout =>
      // ignore
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:49,代码来源:LookupActor.scala
示例12: CustomRouteExample
//设置package包名称以及导入依赖的类
package sample.camel
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.builder.RouteBuilder
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.camel.CamelExtension
import akka.camel.CamelMessage
import akka.camel.Consumer
import akka.camel.Producer
/**
 * Camel sample: an HTTP consumer forwards request bodies through a transformer
 * to a producer whose `direct:welcome` endpoint is served by a custom route.
 *
 * Procedure-syntax methods (`def f { ... }`) were rewritten with an explicit
 * `: Unit =` result type, because procedure syntax is deprecated. Behaviour
 * is unchanged.
 */
object CustomRouteExample {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("some-system")
    val producer = system.actorOf(Props[RouteProducer])
    val mediator = system.actorOf(Props(classOf[RouteTransformer], producer))
    // Created for its side effect: starting the consumer actor.
    val consumer = system.actorOf(Props(classOf[RouteConsumer], mediator))
    CamelExtension(system).context.addRoutes(new CustomRouteBuilder)
  }

  /** Consumes from the Jetty endpoint and forwards each message to `transformer`. */
  class RouteConsumer(transformer: ActorRef) extends Actor with Consumer {
    def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"

    def receive = {
      // Forward a string representation of the message body to transformer
      case msg: CamelMessage => transformer.forward(msg.withBodyAs[String])
    }
  }

  /** Wraps the message body in "- ... -" and forwards it to `producer`. */
  class RouteTransformer(producer: ActorRef) extends Actor {
    def receive = {
      // example: transform message body "foo" to "- foo -" and forward result
      // to producer
      case msg: CamelMessage =>
        producer.forward(msg.mapBody((body: String) => "- %s -" format body))
    }
  }

  /** Producer actor for the `direct:welcome` endpoint. */
  class RouteProducer extends Actor with Producer {
    def endpointUri = "direct:welcome"
  }

  /** Route that answers `direct:welcome` with "Welcome <input body>". */
  class CustomRouteBuilder extends RouteBuilder {
    def configure: Unit = {
      from("direct:welcome").process(new Processor() {
        def process(exchange: Exchange): Unit = {
          // Create a 'welcome' message from the input message
          exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
        }
      })
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:59,代码来源:CustomRouteExample.scala
示例13: RemoteDeploymentWatcher
//设置package包名称以及导入依赖的类
package akka.remote
import akka.actor.InternalActorRef
import akka.actor.Terminated
import akka.actor.Actor
import akka.actor.ActorRef
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
 * Watches remote actors and, when one terminates, sends an extra
 * `DeathWatchNotification` to the supervisor recorded for it.
 *
 * The `case` arrows in `receive` had been garbled to `?`; they are restored to
 * `=>` so the class compiles again.
 */
private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import RemoteDeploymentWatcher._

  // Watched actor -> its supervisor.
  var supervisors = Map.empty[ActorRef, InternalActorRef]

  def receive = {
    case WatchRemote(a, supervisor: InternalActorRef) =>
      supervisors += (a -> supervisor)
      context.watch(a)

    case t @ Terminated(a) if supervisors isDefinedAt a =>
      // send extra DeathWatchNotification to the supervisor so that it will remove the child
      supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = t.existenceConfirmed,
        addressTerminated = t.addressTerminated))
      supervisors -= a

    case _: Terminated => // no supervisor recorded for this actor: nothing to do
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:29,代码来源:RemoteDeploymentWatcher.scala
示例14: ProtobufSerializer
//设置package包名称以及导入依赖的类
package akka.remote.serialization
import akka.actor.{ ExtendedActorSystem, ActorRef }
import akka.remote.WireFormats.ActorRefData
import akka.serialization.{ Serializer, Serialization }
import com.google.protobuf.Message
// NOTE(review): in the visible text the `object ProtobufSerializer {` brace was
// never closed, and the file-level imports (ActorRef, ActorRefData,
// ExtendedActorSystem, Serialization) suggest the object once held helper members
// that are missing here. The object is closed empty so the file parses, and the
// class is kept at top level; restore the original members if they are available.
object ProtobufSerializer {
}

/**
 * Serializer for protobuf `Message` instances.
 *
 * The `case` arrows had been garbled to `?`; they are restored to `=>`.
 */
class ProtobufSerializer extends Serializer {
  // Parameter type list of the static `parseFrom(byte[])` method looked up reflectively.
  val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])

  def includeManifest: Boolean = true

  def identifier = 2

  /** Serializes a protobuf message; any other object is rejected with IllegalArgumentException. */
  def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case m: Message => m.toByteArray
    case _ => throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
  }

  /**
   * Deserializes by invoking the static `parseFrom(byte[])` method of the manifest class.
   * Throws IllegalArgumentException when no manifest class is supplied.
   */
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
    clazz match {
      case None => throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
      case Some(c) => c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
    }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:27,代码来源:ProtobufSerializer.scala
示例15: PipeableFuture
//设置package包名称以及导入依赖的类
package akka.pattern
import language.implicitConversions
import scala.concurrent.{ Future, ExecutionContext }
import scala.util.{ Failure, Success }
import akka.actor.{ Status, ActorRef, Actor }
import akka.actor.ActorSelection
/**
 * Adds `pipeTo` / `to` operations to `Future`, which send the future's outcome
 * to an actor (or actor selection) once it completes.
 *
 * The `case` arrows had been garbled to `?`; they are restored to `=>` so the
 * trait compiles again.
 */
trait PipeToSupport {

  final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {

    /**
     * On completion, sends the value to `recipient`, or `Status.Failure` wrapping
     * the exception if the future failed. Returns the original future.
     */
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
      future onComplete {
        case Success(r) => recipient ! r
        case Failure(f) => recipient ! Status.Failure(f)
      }
      future
    }

    /** Same as `pipeTo`, but the recipient is an `ActorSelection`. */
    def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
      future onComplete {
        case Success(r) => recipient ! r
        case Failure(f) => recipient ! Status.Failure(f)
      }
      future
    }

    // The `to` variants return this PipeableFuture so calls can be chained.
    def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, Actor.noSender)

    def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = {
      pipeTo(recipient)(sender)
      this
    }

    def to(recipient: ActorSelection): PipeableFuture[T] = to(recipient, Actor.noSender)

    def to(recipient: ActorSelection, sender: ActorRef): PipeableFuture[T] = {
      pipeToSelection(recipient)(sender)
      this
    }
  }

  /** Implicit conversion that makes the operations above available on any `Future`. */
  implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] = new PipeableFuture(future)
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:41,代码来源:PipeToSupport.scala
示例16: AddressTerminatedTopic
//设置package包名称以及导入依赖的类
package akka.event
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.AddressTerminated
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
/**
 * Extension holding the set of actors interested in `AddressTerminated`.
 *
 * The subscriber set lives in an `AtomicReference` and is replaced with
 * compare-and-set; a failed swap is retried with a fresh snapshot.
 */
private[akka] final class AddressTerminatedTopic extends Extension {

  private val subscribers = new AtomicReference[Set[ActorRef]](Set.empty[ActorRef])

  /** Adds `subscriber` to the set, retrying until the swap succeeds. */
  @tailrec def subscribe(subscriber: ActorRef): Unit = {
    val snapshot = subscribers.get
    val swapped = subscribers.compareAndSet(snapshot, snapshot + subscriber)
    if (!swapped) subscribe(subscriber) // retry
  }

  /** Removes `subscriber` from the set, retrying until the swap succeeds. */
  @tailrec def unsubscribe(subscriber: ActorRef): Unit = {
    val snapshot = subscribers.get
    val swapped = subscribers.compareAndSet(snapshot, snapshot - subscriber)
    if (!swapped) unsubscribe(subscriber) // retry
  }

  /** Sends `msg` to every current subscriber, with no sender. */
  def publish(msg: AddressTerminated): Unit =
    for (target <- subscribers.get) target.tell(msg, ActorRef.noSender)
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:AddressTerminatedTopic.scala
示例17: TcpIncomingConnection
//设置package包名称以及导入依赖的类
package akka.io
import java.nio.channels.SocketChannel
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
/**
 * Connection actor for an accepted (incoming) TCP channel.
 *
 * On construction it calls `signDeathPact(bindHandler)` and registers the channel
 * with the registry with no initial interest ops. The connect is completed when
 * the `ChannelRegistration` arrives.
 *
 * The `case` arrow in `receive` had been garbled to `?`; it is restored to `=>`.
 */
private[io] class TcpIncomingConnection(_tcp: TcpExt,
                                        _channel: SocketChannel,
                                        registry: ChannelRegistry,
                                        bindHandler: ActorRef,
                                        options: immutable.Traversable[SocketOption],
                                        readThrottling: Boolean)
  extends TcpConnection(_tcp, _channel, readThrottling) {

  signDeathPact(bindHandler)

  registry.register(channel, initialOps = 0)

  def receive = {
    case registration: ChannelRegistration => completeConnect(registration, bindHandler, options)
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:25,代码来源:TcpIncomingConnection.scala
示例18: SnapshotDirectoryFailureSpec
//设置package包名称以及导入依赖的类
package akka.persistence
import akka.testkit.{ ImplicitSender, EventFilter, TestEvent, AkkaSpec }
import java.io.{ IOException, File }
import akka.actor.{ ActorInitializationException, Props, ActorRef }
/**
 * Fixtures for `SnapshotDirectoryFailureSpec`.
 *
 * The `case` arrows in `TestProcessor.receive` had been garbled to `?`; they are
 * restored to `=>` so the object compiles again.
 */
object SnapshotDirectoryFailureSpec {
  // Path used as the snapshot directory; the spec creates a regular file there.
  val inUseSnapshotPath = "target/inUseSnapshotPath"

  /**
   * Processor that saves every String it receives as a snapshot and reports to `probe`.
   *
   * @param name  used as the persistence id
   * @param probe receives the sequence number of successful snapshots and any other message
   */
  class TestProcessor(name: String, probe: ActorRef) extends Processor {

    override def persistenceId: String = name

    // Overridden with an empty body, so no default start-up behaviour runs.
    override def preStart(): Unit = ()

    def receive = {
      case s: String => saveSnapshot(s)
      case SaveSnapshotSuccess(md) => probe ! md.sequenceNr
      case other => probe ! other
    }
  }
}
/**
 * Verifies that an actor using the local snapshot store fails to initialise when
 * the configured snapshot directory path is occupied by a regular file.
 *
 * Procedure-syntax overrides (`def f() { ... }`) were rewritten with an explicit
 * `: Unit =` result type, because procedure syntax is deprecated. Behaviour
 * is unchanged.
 */
class SnapshotDirectoryFailureSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotDirectoryFailureSpec", extraConfig = Some(
  s"""
    |akka.persistence.snapshot-store.local.dir = "${SnapshotDirectoryFailureSpec.inUseSnapshotPath}"
    |""".stripMargin))) with ImplicitSender {

  import SnapshotDirectoryFailureSpec._

  val file = new File(inUseSnapshotPath)

  // Occupy the snapshot path with a plain file before the test runs.
  override protected def atStartup(): Unit = {
    if (!file.createNewFile()) throw new IOException(s"Failed to create test file [${file.getCanonicalFile}]")
  }

  // Remove the blocking file again once the actor system has terminated.
  override protected def afterTermination(): Unit = {
    if (!file.delete()) throw new IOException(s"Failed to delete test file [${file.getCanonicalFile}]")
  }

  "A local snapshot store configured with an failing directory name " must {
    "throw an exception at startup" in {
      EventFilter[ActorInitializationException](occurrences = 1).intercept {
        val processor = system.actorOf(Props(classOf[TestProcessor], "SnapshotDirectoryFailureSpec-1", testActor))
        processor ! "blahonga"
      }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:SnapshotDirectoryFailureSpec.scala
示例19: PublishSubscribe
//设置package包名称以及导入依赖的类
package docs.camel
/**
 * Documentation sample: two JMS subscribers, one JMS publisher and an HTTP
 * bridge that publishes incoming request bodies to the JMS topic.
 */
object PublishSubscribe {
  //#PubSub
  import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
  import akka.camel.{ Producer, CamelMessage, Consumer }

  /** Consumer that prints every message received from `uri`, prefixed with `name`. */
  class Subscriber(name: String, uri: String) extends Actor with Consumer {
    def endpointUri = uri

    def receive = {
      case msg: CamelMessage =>
        println("%s received: %s".format(name, msg.body))
    }
  }

  /** Producer that sends to `uri` without waiting for a reply. */
  class Publisher(name: String, uri: String) extends Actor with Producer {
    def endpointUri = uri

    // one-way communication with JMS
    override def oneway = true
  }

  /** Consumer on `uri` that hands each body to `publisher` and acknowledges the sender. */
  class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
    def endpointUri = uri

    def receive = {
      case msg: CamelMessage =>
        publisher ! msg.bodyAs[String]
        sender() ! "message published"
    }
  }

  // Add below to a Boot class
  // Setup publish/subscribe example
  val system = ActorSystem("some-system")
  val jmsUri = "jms:topic:test"
  val jmsSubscriber1 =
    system.actorOf(Props(classOf[Subscriber], "jms-subscriber-1", jmsUri))
  val jmsSubscriber2 =
    system.actorOf(Props(classOf[Subscriber], "jms-subscriber-2", jmsUri))
  val jmsPublisher =
    system.actorOf(Props(classOf[Publisher], "jms-publisher", jmsUri))
  val jmsPublisherBridge =
    system.actorOf(Props(classOf[PublisherBridge], "jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))
  //#PubSub
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:45,代码来源:PublishSubscribe.scala
示例20: TwitterService
//设置package包名称以及导入依赖的类
package com.inspiringsolutions.tweet.services
import javax.inject.{Inject, Singleton}
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.inspiringsolutions.tweet.actors.CompleteStream
import com.inspiringsolutions.tweet.models.{LimitNotice, Tweet}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods._
import org.slf4j.LoggerFactory
import play.api.Play
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
/**
 * Reads a Twitter stream for a hashtag, splits it into CRLF-terminated JSON
 * messages, parses each one and sends the parse results to an actor.
 *
 * Changes from the previous version:
 *  - `processStreamToActorRef` uses an explicit `: Unit =` instead of the
 *    deprecated procedure syntax;
 *  - `tryParse` now runs `parse(json)` inside `Try`, so malformed JSON becomes a
 *    `Failure` element instead of an exception thrown inside the stream stage.
 */
@Singleton
class TwitterService @Inject() (twitterStreamService: TwitterStreamProducerService) {
  private val log = LoggerFactory.getLogger(getClass)

  implicit lazy val actorSystem: ActorSystem = Play.unsafeApplication.injector.instanceOf[ActorSystem]
  implicit val materializer = ActorMaterializer()
  implicit val formats = DefaultFormats

  /**
   * Starts consuming the stream for `hashTag` and sends every parsed element to
   * `streamConsumerRef`; `CompleteStream` is sent when the stream completes.
   * A failure to obtain the stream is logged and nothing is sent.
   *
   * @param streamConsumerRef actor receiving one `Try` per message
   * @param hashTag           hashtag passed to the stream producer service
   */
  def processStreamToActorRef(streamConsumerRef: ActorRef, hashTag: String): Unit = {
    val streamFuture = twitterStreamService.produceStream(hashTag)
    streamFuture.onComplete {
      case Failure(ex) => log.error("Error while processing stream", ex)
      case Success(stream) =>
        // Accumulate chunks until the accumulator contains CRLF; once it does, the
        // next chunk starts a fresh accumulator.
        stream.scan("")((acc, curr) => {
          if (acc.contains("\r\n"))
            curr.utf8String
          else
            acc + curr.utf8String
        })
          .filter(_.contains("\r\n")).filterNot(_.trim.isEmpty)
          .map { tryParse }
          .runWith(Sink.actorRef(streamConsumerRef, CompleteStream))
    }
  }

  /**
   * Parses `json` and extracts a `Tweet`, falling back to a `LimitNotice`.
   * Returns a `Failure` if the text is not valid JSON or matches neither shape.
   */
  private def tryParse(json: String) =
    Try(parse(json)).flatMap { parsed =>
      Try(parsed.extract[Tweet]).orElse(Try(parsed.extract[LimitNotice]))
    }
}
开发者ID:gitter-badger,项目名称:inspiring-tooling,代码行数:56,代码来源:TwitterService.scala
注：本文中的akka.actor.ActorRef类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论