• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ActorRef类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 akka.actor.ActorRef 的典型用法代码示例。如果您正苦于以下问题：Scala ActorRef 类的具体用法？Scala ActorRef 怎么用？Scala ActorRef 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 ActorRef 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。

示例1: OrderProcessor

//设置package包名称以及导入依赖的类
package com.example

import java.util.UUID

import scaldi.Injector
import akka.actor.{Actor, ActorRef, PoisonPill}
import scaldi.akka.AkkaInjectable

import scala.math.BigDecimal.RoundingMode

/**
 * Actor that handles one order from request to completion.
 *
 * It starts in the `idle` behaviour, asks the injected `PriceCalculator` for a
 * gross price when a `ProcessOrder` arrives, and then switches to `workingHard`
 * until the order is either cancelled or priced. In both cases the original
 * requester is notified and the actor sends itself a `PoisonPill`.
 */
class OrderProcessor(implicit inj: Injector) extends Actor with AkkaInjectable {
  import Messages._

  val priceCalculator = injectActorRef [PriceCalculator]

  def receive = idle

  /** Initial behaviour: waits for an order and kicks off the price calculation. */
  val idle: Receive = {
    case orderInfo @ ProcessOrder(user: User, itemId: Long, netAmount: Int) =>
      println(s"Processing order for user $user.")
      priceCalculator ! CalculatePrice(netAmount)
      // Remember who asked, because later messages come from other senders.
      val requester = sender()
      context.become(workingHard(orderInfo, requester))
  }

  /**
   * Behaviour used while a price calculation is outstanding.
   *
   * @param orderInfo the order being processed
   * @param reportTo  the actor that receives the final outcome
   */
  def workingHard(orderInfo: ProcessOrder, reportTo: ActorRef): Receive = {
    case CancelProcessing =>
      replyAndStop(reportTo, OrderProcessingFailed("Canceled.."))
    case GrossPriceCalculated(_, grossPrice) =>
      println("Processing order.....")
      replyAndStop(reportTo, OrderProcessed(UUID.randomUUID().toString, grossPrice))
  }

  /** Sends `outcome` to `target` and then asks this actor to stop via `PoisonPill`. */
  private def replyAndStop(target: ActorRef, outcome: Any): Unit = {
    target ! outcome
    self ! PoisonPill
  }
}

/**
 * Actor that turns a net amount into a gross amount.
 *
 * For every `CalculatePrice` it multiplies the net amount by 1.19, rounds the
 * product half-up to a whole number and replies to the sender with
 * `GrossPriceCalculated(netAmount, gross)`.
 */
class PriceCalculator extends Actor {
  import Messages._

  def receive = {
    case CalculatePrice(netAmount) =>
      // presumably 1.19 models a 19% tax rate; confirm with the domain owner
      val gross = netAmount * BigDecimal("1.19")
      val grossCent = gross.setScale(0, RoundingMode.HALF_UP).toIntExact
      sender() ! GrossPriceCalculated(netAmount, grossCent)
  }
}
开发者ID:shigemk2,项目名称:my-scaldi-akka-sample,代码行数:48,代码来源:Order.scala


示例2: RegistryServiceActor

//设置package包名称以及导入依赖的类
package com.pacbio.secondary.smrtlink.actors

import java.util.UUID

import akka.actor.{Props, ActorRef, Actor}
import com.pacbio.common.actors.{PacBioActor, ActorRefFactoryProvider}
import com.pacbio.common.dependency.Singleton
import com.pacbio.secondary.smrtlink.models.{RegistryResourceUpdate, RegistryProxyRequest, RegistryResourceCreate}

// TODO(smcclellan): Add scaladoc

/**
 * Message protocol of the `RegistryServiceActor` class. Each message is
 * answered by calling the `RegistryDao` method of the corresponding name.
 */
object RegistryServiceActor {
  // Lists resources; `id` is passed unchanged to `registryDao.getResources`.
  case class GetResources(id: Option[String])
  // Fetches the single resource identified by `uuid`.
  case class GetResource(uuid: UUID)
  // Creates a resource from the given creation record.
  case class CreateResource(create: RegistryResourceCreate)
  // Applies `update` to the resource identified by `uuid`.
  case class UpdateResource(uuid: UUID, update: RegistryResourceUpdate)
  // Deletes the resource identified by `uuid`.
  case class DeleteResource(uuid: UUID)
  // Hands `req` to `registryDao.proxyRequest` for the resource identified by `uuid`.
  case class ProxyRequest(uuid: UUID, req: RegistryProxyRequest)
}

/**
 * Actor front-end for a `RegistryDao`: every supported message is translated
 * into the DAO call of the same name and the result is handed to `respondWith`.
 *
 * @param registryDao the data-access object that performs the actual work
 */
class RegistryServiceActor(registryDao: RegistryDao) extends PacBioActor {
  import RegistryServiceActor._

  def receive: Receive = {
    case GetResources(filterId) =>
      respondWith(registryDao.getResources(filterId))
    case GetResource(resourceId) =>
      respondWith(registryDao.getResource(resourceId))
    case CreateResource(record) =>
      respondWith(registryDao.createResource(record))
    case UpdateResource(resourceId, changes) =>
      respondWith(registryDao.updateResource(resourceId, changes))
    case DeleteResource(resourceId) =>
      respondWith(registryDao.deleteResource(resourceId))
    case ProxyRequest(resourceId, request) =>
      respondWith(registryDao.proxyRequest(resourceId, request))
  }
}

/**
 * Provides a `Singleton` holding the `ActorRef` of a `RegistryServiceActor`
 * created under the name "RegistryServiceActor" by the mixed-in actor-ref factory.
 */
trait RegistryServiceActorRefProvider {
  this: RegistryDaoProvider with ActorRefFactoryProvider =>

  val registryServiceActorRef: Singleton[ActorRef] = Singleton { () =>
    // Resolve the factory before the DAO, in the same order as a single nested call.
    val factory = actorRefFactory()
    val props = Props(classOf[RegistryServiceActor], registryDao())
    factory.actorOf(props, "RegistryServiceActor")
  }
}

/**
 * Provides a `Singleton` that constructs a `RegistryServiceActor` instance
 * directly from the mixed-in `RegistryDaoProvider`.
 */
trait RegistryServiceActorProvider {
  this: RegistryDaoProvider =>

  // NOTE(review): Akka normally rejects `new` on an Actor outside `actorOf`;
  // confirm how this singleton is consumed (e.g. only through a test harness).
  val registryServiceActor: Singleton[RegistryServiceActor] = Singleton { () =>
    new RegistryServiceActor(registryDao())
  }
}
开发者ID:PacificBiosciences,项目名称:smrtflow,代码行数:46,代码来源:RegistryServiceActor.scala


示例3: GroupProcessorRegion

//设置package包名称以及导入依赖的类
package im.actor.server.group

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

/**
 * Factory for the cluster-sharding region (or region proxy) that hosts
 * "GroupProcessor" entities, wrapped in a `GroupProcessorRegion`.
 */
object GroupProcessorRegion {
  /**
   * Maps a `GroupEnvelope` to (entity id, payload). The entity id is the group
   * id as a string. The payload is the dialog envelope when one is present,
   * otherwise the query field if a query is defined, else the command field.
   */
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) =>
      (
        groupId.toString,
        dialogEnvelope
      )
    case env @ GroupEnvelope(groupId, _, command, query) =>
      (
        groupId.toString,
        // payload
        if (query.isDefined) {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(query.number))
        } else {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(command.number))
        }
      )
  }

  /** Derives the shard id from the group id modulo 100. */
  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case env: GroupEnvelope => (env.groupId % 100).toString // TODO: configurable
  }

  private val typeName = "GroupProcessor"

  /** Starts the shard region using `props` for the entity actors. */
  private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  /** Starts the shard region with the default `GroupProcessor.props`. */
  def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)

  /** Starts a proxy to the region without restricting it to a role. */
  def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}

/** Wrapper around the `ActorRef` returned by cluster sharding for "GroupProcessor" entities. */
case class GroupProcessorRegion(ref: ActorRef)

/** Wrapper around an `ActorRef`; presumably the group view region (not constructed in this file). */
case class GroupViewRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala


示例4: AuctionSearch

//设置package包名称以及导入依赖的类
package reactive4.homework

import akka.actor.{Actor, ActorRef}
import akka.event.LoggingReceive
import reactive4.homework.AuctionSearch.{AddAuction, SearchAuction, SearchResult}

/**
 * Actor that indexes auctions by title and answers substring searches.
 *
 * `AddAuction` stores (or replaces) the auction registered under a title.
 * `SearchAuction` replies to the sender with a `SearchResult` listing every
 * auction whose title contains the query string.
 */
class AuctionSearch extends Actor {

  var map:Map[String, ActorRef] = Map()

  override def receive: Receive = LoggingReceive {
    case AddAuction(title, auction) =>
      map = map.updated(title, auction)
    case SearchAuction(query) =>
      val matching: List[ActorRef] =
        map.collect { case (title, auction) if title.contains(query) => auction }.toList
      sender() ! SearchResult(query, matching)
  }
}

/** Message protocol of the `AuctionSearch` actor. */
object AuctionSearch {

  // Registers `auction` under `title`.
  case class AddAuction(title: String, auction:ActorRef)
  // Asks for all auctions whose title contains `query`.
  case class SearchAuction(query: String)
  // Reply to `SearchAuction`: the original query and the matching auctions.
  case class SearchResult(query: String, auctions: List[ActorRef])

}
开发者ID:Passarinho4,项目名称:reactive-lab4,代码行数:27,代码来源:AuctionSearch.scala


示例5: CodebaseAnalyzerAkkaApp

//设置package包名称以及导入依赖的类
package tutor

import akka.actor.{ActorRef, ActorSystem}
import tutor.CodebaseAnalyzeAggregatorActor.AnalyzeDirectory

import scala.io.StdIn

/**
 * Console entry point: repeatedly reads a folder path from standard input and
 * sends it to the controller actor as `AnalyzeDirectory`, until the user types
 * `:q` or standard input is exhausted. The actor system is terminated on exit.
 */
object CodebaseAnalyzerAkkaApp extends App {

  val system = ActorSystem("CodebaseAnalyzer")
  val codebaseAnalyzerControllerActor: ActorRef = system.actorOf(CodebaseAnalyzerControllerActor.props())

  var shouldContinue = true
  try {
    while (shouldContinue) {
      println("please input source file folder or :q to quit")
      // readLine() returns null once stdin reaches end-of-stream. Without this
      // guard the loop would spin forever sending AnalyzeDirectory(null).
      Option(StdIn.readLine()) match {
        case None | Some(":q") =>
          shouldContinue = false
        case Some(folder) =>
          codebaseAnalyzerControllerActor ! AnalyzeDirectory(folder)
      }
    }
  } finally {
    println("good bye!")
    system.terminate()
  }
}
开发者ID:notyy,项目名称:CodeAnalyzerTutorial,代码行数:29,代码来源:CodebaseAnalyzerAkkaApp.scala


示例6: KillActor

//设置package包名称以及导入依赖的类
package com.alvinalexander.bubbles

import akka.actor.Actor
import akka.actor.ActorRef

// Asks the ActorManager to stop `actorRef` and count it as no longer active.
case class KillActor(actorRef: ActorRef)
// Carries an actor count; not handled in this file. Presumably consumed elsewhere (TODO confirm).
case class SetInitialNumActors(num: Int)


/**
 * Supervises the bubble actors of the game.
 *
 * It stops individual bubbles on `KillActor`, shows the win window and shuts
 * the application down once every bubble has been stopped, and on `GameOver`
 * tells all bubbles to stop moving and shows the game-over window. Any other
 * message is ignored.
 *
 * @param bubbles the bubble actors managed by this actor
 */
class ActorManager(bubbles: Array[ActorRef]) extends Actor {

  // Number of bubbles not yet stopped through KillActor.
  private var activeActorCount = bubbles.size

  def receive = {
    case KillActor(actorRef) => doKillActorAction(actorRef)
    case GameOver => doGameOverAction
    case _ =>
  }

  /** Stops `actorRef`; when it was the last active bubble, shows the win window and exits. */
  def doKillActorAction(actorRef: ActorRef): Unit = {
    context.stop(actorRef)
    activeActorCount -= 1
    if (activeActorCount == 0) {
      val mainFrameActor = getMainFrameActor
      mainFrameActor ! ShowYouWinWindow
      shutdownApplication
    }
  }

  /** Freezes all bubbles and asks the main frame to show the game-over window. */
  def doGameOverAction: Unit = {
    stopAllBubbles
    val mainFrameActor = getMainFrameActor
    mainFrameActor ! ShowGameOverWindow
    // TODO don't shut the app down until the 'Game Over' overlay is shown
    //shutdownApplication
  }

  // use StopMoving so the panel redraws properly at the end (vs. context.stop)
  def stopAllBubbles: Unit = {
    for (b <- bubbles) {
      b ! StopMoving
    }
  }

  /** Looks up the main frame actor as a sibling of this actor. */
  def getMainFrameActor = context.actorFor(Seq("..", MAIN_FRAME_ACTOR_NAME))

  /** Shuts the actor system down, waits three seconds, then exits the JVM. */
  def shutdownApplication: Unit = {
    context.system.shutdown
    Thread.sleep(3000)
    System.exit(0)
  }

}
开发者ID:arunhprasadbh,项目名称:AkkaKillTheCharactersGame,代码行数:54,代码来源:ActorManager.scala


示例7: TorrentStream

//设置package包名称以及导入依赖的类
package com.karasiq.torrentstream

import akka.actor.ActorRef
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.karasiq.bittorrent.format.Torrent
import com.karasiq.bittorrent.streams.TorrentSource

/**
 * A streamable selection of torrent data.
 *
 * @param size   size of the selection as reported by the piece offsets (presumably bytes; TODO confirm)
 * @param source stream that emits the selected data as `ByteString` chunks
 */
case class TorrentStream(size: Long, source: Source[ByteString, akka.NotUsed])

object TorrentStream {
  /**
   * Builds a `TorrentStream` for one file of a torrent.
   *
   * @param torrentManager actor handed to `TorrentSource.dispatcher`
   * @param torrent        the torrent to read from
   * @param fileName       name of the wanted file; if no file has this name the
   *                       first file of the torrent is used (throws if there are no files)
   * @param ranges         optional (start, end) pairs restricting the stream; when
   *                       empty the whole file is streamed
   * @return the stream together with the size reported by the selected offsets
   */
  def create(torrentManager: ActorRef, torrent: Torrent, fileName: String, ranges: Seq[(Long, Long)] = Nil): TorrentStream = {
    val file = torrent.data.files.find(_.name == fileName).getOrElse(torrent.data.files.head)
    val pieces = if (ranges.nonEmpty) {
      TorrentFileOffset.absoluteOffsets(torrent, ranges.map { case (start, end) => TorrentFileOffset(file, start, end) })
    } else {
      TorrentFileOffset.file(torrent, file)
    }
    val source = Source.single(torrent)
      .via(TorrentSource.dispatcher(torrentManager))
      .flatMapConcat(dsp => TorrentSource.pieces(dsp.actorRef, pieces.pieces.toVector))
      .transform(() => new TorrentStreamingStage(torrent.data.pieceLength, pieces.offsets))

    TorrentStream(pieces.size, source)
  }
}
开发者ID:Karasiq,项目名称:torrentstream,代码行数:27,代码来源:TorrentStream.scala


示例8: ReportServiceActor

//设置package包名称以及导入依赖的类
package com.github.unknownnpc.remotedebugtool.actor

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import com.github.unknownnpc.remotedebugtool.config.{AppConfig, RemoteDebugToolConfig}
import com.github.unknownnpc.remotedebugtool.domain._
import com.github.unknownnpc.remotedebugtool.exception.ReportException
import com.github.unknownnpc.remotedebugtool.message.{MainAppActorStop, ReportServicePayload, ReportServicePrint}

import scala.collection.mutable.ListBuffer

/**
 * Actor that collects breakpoint payloads as report rows and prints them on demand.
 *
 * `ReportServicePayload` appends one row to the buffer. `ReportServicePrint`
 * logs the formatted rows and then sends `MainAppActorStop` to the main
 * application actor.
 *
 * @param mainAppActorRef actor notified once the report has been printed
 */
class ReportServiceActor(mainAppActorRef: ActorRef) extends Actor with ActorLogging {

  // NOTE(review): this alias shares its name with Actor.self; confirm it does
  // not hide the actor's own reference where one is expected.
  self: AppConfig =>

  val values = ListBuffer.empty[ReportRow]

  override def receive = {

    case ReportServicePayload(payload) =>
      log.debug(s"Print service received incoming payload: [$payload]")
      values.append(reportRowFrom(payload))

    case ReportServicePrint =>
      log.debug(s"Received print command")
      val report = systemConfig.reportFormatter.format(values.toList)
      log.info(report)
      mainAppActorRef ! MainAppActorStop

  }

  /** Builds the report row for `payload` from its breakpoint and the matching server entry. */
  private def reportRowFrom(payload: BreakpointPayload) = {
    val breakpoint = payload.breakpoint
    val server = findServerById(breakpoint.targetId)
    JvmReportRow(
      server.id,
      server.address,
      server.port,
      breakpoint.line,
      breakpoint.className,
      payload.breakpointValue
    )
  }

  /** Returns the configured server with the given id, or throws `ReportException` if there is none. */
  private def findServerById(id: ID) =
    servers.find(_.id == id) match {
      case Some(server) => server
      case None         => throw ReportException("Unable to match payload to server instance")
    }

}

object ReportServiceActor {
  /** `Props` for a `ReportServiceActor` configured through `RemoteDebugToolConfig`. */
  def props(mainAppActorRef: ActorRef): Props = {
    def create = new ReportServiceActor(mainAppActorRef) with RemoteDebugToolConfig
    Props(create)
  }
}
开发者ID:UnknownNPC,项目名称:remote-debug-tool,代码行数:54,代码来源:ReportServiceActor.scala


示例9: AddressTerminatedTopicBenchSpec

//设置package包名称以及导入依赖的类
package akka.event

import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.Props
import akka.testkit._

object AddressTerminatedTopicBenchSpec {

  /**
   * Test actor that subscribes itself to the `AddressTerminatedTopic` on
   * construction, reports "started" to `testActor`, and unsubscribes when stopped.
   */
  class Subscriber(testActor: ActorRef) extends Actor {
    private def topic = AddressTerminatedTopic(context.system)

    topic.subscribe(self)
    testActor ! "started"

    override def postStop(): Unit = topic.unsubscribe(self)

    def receive = Actor.emptyBehavior
  }
}

/**
 * Timing check: starts 20000 subscriber actors in a second actor system and
 * then shuts that system down, logging how long each phase took.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class AddressTerminatedTopicBenchSpec extends AkkaSpec("akka.loglevel=INFO") {
  import AddressTerminatedTopicBenchSpec._

  "Subscribe and unsubscribe of AddressTerminated" must {

    "be quick" in {
      // Milliseconds elapsed since a System.nanoTime() reading.
      def millisSince(startNanos: Long): Long = (System.nanoTime() - startNanos).nanos.toMillis

      val sys = ActorSystem(system.name + "2", system.settings.config)
      try {
        val num = 20000

        val startedAt = System.nanoTime()
        val subscriberProps = Props(classOf[Subscriber], testActor)
        val subscribers = Vector.fill(num)(sys.actorOf(subscriberProps))
        // Every subscriber reports "started" to the test actor.
        receiveN(num, 10.seconds)
        log.info("Starting {} actors took {} ms", num, millisSince(startedAt))

        val stoppingAt = System.nanoTime()
        shutdown(sys, 10.seconds, verifySystemShutdown = true)
        log.info("Stopping {} actors took {} ms", num, millisSince(stoppingAt))
      } finally {
        if (!sys.isTerminated) shutdown(sys)
      }
    }

  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:AddressTerminatedTopicBenchSpec.scala


示例10: TestSetup

//设置package包名称以及导入依赖的类
package akka.io

import scala.annotation.tailrec
import scala.collection.immutable
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
import akka.testkit.SocketUtil._
import Tcp._

/** Helpers for TCP integration specs; must be mixed into an `AkkaSpec`. */
trait TcpIntegrationSpecSupport { _: AkkaSpec =>

  /**
   * Fixture with a temporary server endpoint and a probe acting as bind handler.
   *
   * @param shouldBindServer when true the server is bound during construction
   */
  class TestSetup(shouldBindServer: Boolean = true) {
    val bindHandler = TestProbe()
    val endpoint = temporaryServerAddress()

    if (shouldBindServer) bindServer()

    /** Binds `bindHandler` to `endpoint` and waits for the `Bound` confirmation. */
    def bindServer(): Unit = {
      val bindCommander = TestProbe()
      bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
      bindCommander.expectMsg(Bound(endpoint))
    }

    /**
     * Connects a new client to `endpoint` and registers a handler probe on each side.
     *
     * @return (client handler, client connection, server handler, server connection)
     */
    def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
      val connectCommander = TestProbe()
      connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
      val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
      val clientHandler = TestProbe()
      connectCommander.sender() ! Register(clientHandler.ref)

      val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected]
      val serverHandler = TestProbe()
      bindHandler.sender() ! Register(serverHandler.ref)

      (clientHandler, connectCommander.sender(), serverHandler, bindHandler.sender())
    }

    /** Consumes `Received` messages from `handler` until `remaining` bytes have been seen. */
    @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
      if (remaining > 0) {
        val recv = handler.expectMsgType[Received]
        expectReceivedData(handler, remaining - recv.data.size)
      }

    /** Socket options used by `bindServer`; override to customise the server side. */
    def bindOptions: immutable.Traversable[SocketOption] = Nil

    /** Socket options used when connecting; override to customise the client side. */
    def connectOptions: immutable.Traversable[SocketOption] = Nil
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:TcpIntegrationSpecSupport.scala


示例11: LookupActor

//设置package包名称以及导入依赖的类
package sample.remote.calculator

import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.ReceiveTimeout
import akka.actor.Terminated

/**
 * Actor that locates the actor at `path` and relays math operations to it.
 *
 * While `identifying` it sends an `Identify` request and repeats it whenever a
 * scheduled `ReceiveTimeout` arrives. Once the target is identified it is
 * watched and the actor becomes `active`, forwarding `MathOp` messages and
 * printing results. If the target terminates, identification starts again.
 *
 * @param path actor path of the target actor
 */
class LookupActor(path: String) extends Actor {

  sendIdentifyRequest()

  /** Sends `Identify(path)` to the selection and schedules a `ReceiveTimeout` to self after 3 seconds. */
  def sendIdentifyRequest(): Unit = {
    import context.dispatcher
    val target = context.actorSelection(path)
    target ! Identify(path)
    context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
  }

  def receive = identifying

  /** Behaviour used until the target actor has answered the `Identify` request. */
  def identifying: Actor.Receive = {
    case ActorIdentity(`path`, Some(found)) =>
      context.watch(found)
      context.become(active(found))
    case ActorIdentity(`path`, None) =>
      println(s"Remote actor not available: $path")
    case ReceiveTimeout =>
      sendIdentifyRequest()
    case _ =>
      println("Not ready yet")
  }

  /** Behaviour used while `actor` is known and watched. */
  def active(actor: ActorRef): Actor.Receive = {
    case op: MathOp =>
      actor ! op
    case result: MathResult =>
      result match {
        case AddResult(left, right, sum) =>
          printf("Add result: %d + %d = %d\n", left, right, sum)
        case SubtractResult(left, right, difference) =>
          printf("Sub result: %d - %d = %d\n", left, right, difference)
      }
    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)
    case ReceiveTimeout =>
      // ignore
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:49,代码来源:LookupActor.scala


示例12: CustomRouteExample

//设置package包名称以及导入依赖的类
package sample.camel

import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.builder.RouteBuilder
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.camel.CamelExtension
import akka.camel.CamelMessage
import akka.camel.Consumer
import akka.camel.Producer

/**
 * Camel sample wiring a consumer, a transformer and a producer actor together
 * with a custom Camel route that builds a welcome message.
 */
object CustomRouteExample {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("some-system")
    val producer = system.actorOf(Props[RouteProducer])
    val mediator = system.actorOf(Props(classOf[RouteTransformer], producer))
    val consumer = system.actorOf(Props(classOf[RouteConsumer], mediator))
    CamelExtension(system).context.addRoutes(new CustomRouteBuilder)
  }

  /** Consumes from the jetty endpoint and forwards the body as a string to `transformer`. */
  class RouteConsumer(transformer: ActorRef) extends Actor with Consumer {
    def endpointUri = "jetty:http://0.0.0.0:8877/camel/welcome"

    def receive = {
      // Forward a string representation of the message body to transformer
      case msg: CamelMessage => transformer.forward(msg.withBodyAs[String])
    }
  }

  /** Wraps the message body in "- ... -" and forwards the result to `producer`. */
  class RouteTransformer(producer: ActorRef) extends Actor {
    def receive = {
      // example: transform message body "foo" to "- foo -" and forward result
      // to producer
      case msg: CamelMessage =>
        producer.forward(msg.mapBody((body: String) => "- %s -" format body))
    }
  }

  /** Produces messages to the "direct:welcome" endpoint. */
  class RouteProducer extends Actor with Producer {
    def endpointUri = "direct:welcome"
  }

  /** Route from "direct:welcome" that replaces the body with "Welcome <input body>". */
  class CustomRouteBuilder extends RouteBuilder {
    def configure: Unit = {
      from("direct:welcome").process(new Processor() {
        def process(exchange: Exchange): Unit = {
          // Create a 'welcome' message from the input message
          exchange.getOut.setBody("Welcome %s" format exchange.getIn.getBody)
        }
      })
    }
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:59,代码来源:CustomRouteExample.scala


示例13: RemoteDeploymentWatcher

//设置package包名称以及导入依赖的类
package akka.remote

import akka.actor.InternalActorRef
import akka.actor.Terminated
import akka.actor.Actor
import akka.actor.ActorRef
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }


/**
 * Watches remotely deployed actors on behalf of their supervisors.
 *
 * `WatchRemote` records the supervisor for an actor and starts watching the
 * actor. When a recorded actor terminates, an additional
 * `DeathWatchNotification` is sent to its supervisor and the record is dropped.
 */
private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
  import RemoteDeploymentWatcher._
  // Watched actor -> the supervisor to notify when it terminates.
  var supervisors = Map.empty[ActorRef, InternalActorRef]

  def receive = {
    case WatchRemote(a, supervisor: InternalActorRef) =>
      supervisors += (a -> supervisor)
      context.watch(a)

    case t @ Terminated(a) if supervisors isDefinedAt a =>
      // send extra DeathWatchNotification to the supervisor so that it will remove the child
      supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = t.existenceConfirmed,
        addressTerminated = t.addressTerminated))
      supervisors -= a

    // Termination of an actor that has no recorded supervisor: nothing to do.
    case _: Terminated =>
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:29,代码来源:RemoteDeploymentWatcher.scala


示例14: ProtobufSerializer

//设置package包名称以及导入依赖的类
package akka.remote.serialization

import akka.actor.{ ExtendedActorSystem, ActorRef }
import akka.remote.WireFormats.ActorRefData
import akka.serialization.{ Serializer, Serialization }
import com.google.protobuf.Message

object ProtobufSerializer {
  // NOTE(review): this object arrived without members and without a closing
  // brace. Its original contents appear to have been lost; restore them from
  // the upstream source if they are needed.
}

/**
 * Serializer for `com.google.protobuf.Message` instances.
 *
 * Serialization calls `toByteArray` on the message. Deserialization needs the
 * message class and reflectively invokes its static `parseFrom(Array[Byte])`.
 */
class ProtobufSerializer extends Serializer {
  // Parameter type list used to look up parseFrom(byte[]) via reflection.
  val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
  def includeManifest: Boolean = true
  def identifier = 2

  /** @throws IllegalArgumentException if `obj` is not a protobuf `Message` */
  def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case m: Message => m.toByteArray
    case _          => throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
  }

  /** @throws IllegalArgumentException if no message class is supplied */
  def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
    clazz match {
      case None    => throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
      case Some(c) => c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
    }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:27,代码来源:ProtobufSerializer.scala


示例15: PipeableFuture

//设置package包名称以及导入依赖的类
package akka.pattern

import language.implicitConversions
import scala.concurrent.{ Future, ExecutionContext }
import scala.util.{ Failure, Success }
import akka.actor.{ Status, ActorRef, Actor }
import akka.actor.ActorSelection

/** Adds `pipeTo` / `to` syntax for sending the outcome of a `Future` to an actor. */
trait PipeToSupport {

  /**
   * Wrapper that sends the result of `future` to a recipient once it completes:
   * the value itself on success, `Status.Failure(cause)` on failure.
   */
  final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {
    /** Pipes the outcome to an `ActorRef`; returns the original future. */
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
      future onComplete {
        case Success(r) => recipient ! r
        case Failure(f) => recipient ! Status.Failure(f)
      }
      future
    }
    /** Pipes the outcome to an `ActorSelection`; returns the original future. */
    def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
      future onComplete {
        case Success(r) => recipient ! r
        case Failure(f) => recipient ! Status.Failure(f)
      }
      future
    }
    // The `to` variants return this wrapper rather than the future.
    def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, Actor.noSender)
    def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = {
      pipeTo(recipient)(sender)
      this
    }
    def to(recipient: ActorSelection): PipeableFuture[T] = to(recipient, Actor.noSender)
    def to(recipient: ActorSelection, sender: ActorRef): PipeableFuture[T] = {
      pipeToSelection(recipient)(sender)
      this
    }
  }

  /** Implicit conversion that makes the piping methods available on any `Future`. */
  implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] = new PipeableFuture(future)
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:41,代码来源:PipeToSupport.scala


示例16: AddressTerminatedTopic

//设置package包名称以及导入依赖的类
package akka.event

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.AddressTerminated
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider


/**
 * Registry of actors that want to receive `AddressTerminated` messages.
 *
 * The subscriber set lives in an `AtomicReference` and is only replaced via
 * compare-and-set, retrying when another update got in first.
 */
private[akka] final class AddressTerminatedTopic extends Extension {

  private val subscribers = new AtomicReference[Set[ActorRef]](Set.empty[ActorRef])

  /** Adds `subscriber` to the set. */
  @tailrec def subscribe(subscriber: ActorRef): Unit = {
    val snapshot = subscribers.get
    val swapped = subscribers.compareAndSet(snapshot, snapshot + subscriber)
    if (!swapped) subscribe(subscriber) // retry
  }

  /** Removes `subscriber` from the set. */
  @tailrec def unsubscribe(subscriber: ActorRef): Unit = {
    val snapshot = subscribers.get
    val swapped = subscribers.compareAndSet(snapshot, snapshot - subscriber)
    if (!swapped) unsubscribe(subscriber) // retry
  }

  /** Sends `msg` to every current subscriber, with no sender. */
  def publish(msg: AddressTerminated): Unit = {
    for (subscriber <- subscribers.get) subscriber.tell(msg, ActorRef.noSender)
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:AddressTerminatedTopic.scala


示例17: TcpIncomingConnection

//设置package包名称以及导入依赖的类
package akka.io

import java.nio.channels.SocketChannel
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.Inet.SocketOption


/**
 * Connection actor for an accepted (incoming) TCP channel.
 *
 * On construction it calls `signDeathPact` with the bind handler and registers
 * the channel with the registry with no initial interest ops. When the
 * `ChannelRegistration` arrives it completes the connection setup.
 */
private[io] class TcpIncomingConnection(_tcp: TcpExt,
                                        _channel: SocketChannel,
                                        registry: ChannelRegistry,
                                        bindHandler: ActorRef,
                                        options: immutable.Traversable[SocketOption],
                                        readThrottling: Boolean)
  extends TcpConnection(_tcp, _channel, readThrottling) {

  signDeathPact(bindHandler)

  registry.register(channel, initialOps = 0)

  def receive = {
    case registration: ChannelRegistration => completeConnect(registration, bindHandler, options)
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:25,代码来源:TcpIncomingConnection.scala


示例18: SnapshotDirectoryFailureSpec

//设置package包名称以及导入依赖的类
package akka.persistence

import akka.testkit.{ ImplicitSender, EventFilter, TestEvent, AkkaSpec }
import java.io.{ IOException, File }
import akka.actor.{ ActorInitializationException, Props, ActorRef }

object SnapshotDirectoryFailureSpec {
  // Path that the spec occupies with a plain file so it cannot serve as the snapshot directory.
  val inUseSnapshotPath = "target/inUseSnapshotPath"

  /**
   * Processor that saves every received string as a snapshot and reports the
   * resulting sequence number, or any other message, to `probe`.
   */
  class TestProcessor(name: String, probe: ActorRef) extends Processor {

    override def persistenceId: String = name

    override def preStart(): Unit = ()

    def receive = {
      case s: String               => saveSnapshot(s)
      case SaveSnapshotSuccess(md) => probe ! md.sequenceNr
      case other                   => probe ! other
    }
  }
}

/**
 * Verifies that actor initialization fails when the configured local snapshot
 * directory path is already taken by a regular file.
 */
class SnapshotDirectoryFailureSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotDirectoryFailureSpec", extraConfig = Some(
  s"""
    |akka.persistence.snapshot-store.local.dir = "${SnapshotDirectoryFailureSpec.inUseSnapshotPath}"
  """.stripMargin))) with ImplicitSender {

  import SnapshotDirectoryFailureSpec._

  val file = new File(inUseSnapshotPath)

  /** Creates the blocking file; fails the spec if it cannot be created. */
  override protected def atStartup(): Unit = {
    if (!file.createNewFile()) throw new IOException(s"Failed to create test file [${file.getCanonicalFile}]")
  }

  /** Removes the blocking file; fails if it cannot be deleted. */
  override protected def afterTermination(): Unit = {
    if (!file.delete()) throw new IOException(s"Failed to delete test file [${file.getCanonicalFile}]")
  }

  "A local snapshot store configured with an failing directory name " must {
    "throw an exception at startup" in {
      EventFilter[ActorInitializationException](occurrences = 1).intercept {
        val processor = system.actorOf(Props(classOf[TestProcessor], "SnapshotDirectoryFailureSpec-1", testActor))
        processor ! "blahonga"
      }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:SnapshotDirectoryFailureSpec.scala


示例19: PublishSubscribe

//设置package包名称以及导入依赖的类
package docs.camel

/**
 * Camel publish/subscribe sample: two subscribers and one publisher share a
 * JMS topic, and an HTTP-facing bridge relays incoming bodies to the publisher.
 */
object PublishSubscribe {
  //#PubSub
  import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
  import akka.camel.{ Producer, CamelMessage, Consumer }

  /** Consumer that prints every message received from `uri`, prefixed with `name`. */
  class Subscriber(name: String, uri: String) extends Actor with Consumer {
    def endpointUri = uri

    def receive = {
      case msg: CamelMessage =>
        println("%s received: %s".format(name, msg.body))
    }
  }

  /** Producer that sends messages to `uri` without waiting for a reply. */
  class Publisher(name: String, uri: String) extends Actor with Producer {

    def endpointUri = uri

    // one-way communication with JMS
    override def oneway = true
  }

  /** Consumer on `uri` that passes each body to `publisher` and acknowledges the sender. */
  class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
    def endpointUri = uri

    def receive = {
      case msg: CamelMessage =>
        val body = msg.bodyAs[String]
        publisher ! body
        sender() ! "message published"
    }
  }

  // Add below to a Boot class
  // Setup publish/subscribe example
  val system = ActorSystem("some-system")
  val jmsUri = "jms:topic:test"
  val jmsSubscriber1 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-1", jmsUri))
  val jmsSubscriber2 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-2", jmsUri))
  val jmsPublisher = system.actorOf(Props(classOf[Publisher], "jms-publisher", jmsUri))
  val jmsPublisherBridge = system.actorOf(Props(classOf[PublisherBridge], "jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))
  //#PubSub
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:45,代码来源:PublishSubscribe.scala


示例20: TwitterService

//设置package包名称以及导入依赖的类
package com.inspiringsolutions.tweet.services

import javax.inject.{Inject, Singleton}

import akka.actor.{ActorRef, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.inspiringsolutions.tweet.actors.CompleteStream
import com.inspiringsolutions.tweet.models.{LimitNotice, Tweet}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods._
import org.slf4j.LoggerFactory
import play.api.Play

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

/**
 * Reads the tweet stream for a hash tag, parses each line-delimited JSON
 * element and delivers the parse attempts to a consumer actor.
 */
@Singleton
class TwitterService @Inject() (twitterStreamService: TwitterStreamProducerService)  {

  private val log = LoggerFactory.getLogger(getClass)

  implicit lazy val actorSystem: ActorSystem = Play.unsafeApplication.injector.instanceOf[ActorSystem]
  implicit val materializer = ActorMaterializer()

  implicit val formats = DefaultFormats

  /**
   * Starts streaming for `hashTag` and sends every parsed element to
   * `streamConsumerRef`, followed by `CompleteStream` when the stream ends.
   * If the stream cannot be produced the error is logged and nothing is sent.
   *
   * @param streamConsumerRef actor receiving `Try`-wrapped tweets or limit notices
   * @param hashTag           hash tag passed to the stream producer
   */
  def processStreamToActorRef(streamConsumerRef: ActorRef, hashTag: String): Unit = {
    val streamFuture = twitterStreamService.produceStream(hashTag)

    streamFuture.onComplete {
      case Failure(ex) => log.error("Error while processing stream", ex)
      case Success(stream) =>
        // Accumulate chunks until one contains "\r\n", then start over with the next chunk.
        stream.scan("")((acc, curr) => {
          if (acc.contains("\r\n"))
            curr.utf8String
          else
            acc + curr.utf8String
        })
        .filter(_.contains("\r\n")).filterNot(_.trim.isEmpty)
        .map { tryParse }
        .runWith(Sink.actorRef(streamConsumerRef, CompleteStream))
    }
  }

  /**
   * Parses `json` as a `Tweet`, falling back to a `LimitNotice`. Parsing runs
   * inside `Try`, so malformed JSON becomes a `Failure` element instead of an
   * exception that would fail the whole stream.
   */
  private def tryParse(json: String) =
    Try(parse(json)).flatMap { parsed =>
      Try(parsed.extract[Tweet]).orElse(Try(parsed.extract[LimitNotice]))
    }
}
开发者ID:gitter-badger,项目名称:inspiring-tooling,代码行数:56,代码来源:TwitterService.scala



注：本文中的 akka.actor.ActorRef 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala AtomicInteger类代码示例发布时间:2022-05-23
下一篇:
Scala FlatSpec类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap