本文整理汇总了Scala中akka.stream.actor.ActorPublisher类的典型用法代码示例。如果您正苦于以下问题:Scala ActorPublisher类的具体用法?Scala ActorPublisher怎么用?Scala ActorPublisher使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorPublisher类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: deliverBuf
//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query
import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
private[akka] trait DeliveryBuffer[T] {
_: ActorPublisher[T] with ActorLogging =>
var buf = Vector.empty[T]
def deliverBuf(): Unit =
if (buf.nonEmpty && totalDemand > 0) {
if (buf.size == 1) {
// optimize for this common case
onNextWithLogging(buf.head)
buf = Vector.empty
} else if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNextWithLogging
} else {
buf foreach onNextWithLogging
buf = Vector.empty
}
}
def onNextWithLogging(element: T): Unit = {
log.debug(s"sending event $element")
onNext(element)
}
}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:34,代码来源:DeliveryBuffer.scala
示例2: LocalFileStreamSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import java.io.File
import java.nio.file.Path
import akka.actor.{Actor, ActorContext, ActorRef, Props}
import akka.stream.actor.ActorPublisher
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.model.{Query, RequestContext, SonicMessage}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.file.{FileWatcher, FileWatcherWorker, LocalFilePublisher}
import spray.json._
class LocalFileStreamSource(query: Query, actorContext: ActorContext, context: RequestContext)
extends SonicdSource(query, actorContext, context) {
lazy val publisher: Props = {
val path = getConfig[String]("path")
val tail = getOption[Boolean]("tail").getOrElse(true)
val glob = FileWatcher.parseGlob(path)
val workerProps = { dir: Path ? Props(classOf[FileWatcherWorker], dir) }
val watchers = LocalFilePublisher.getWatchers(glob, actorContext, workerProps)
Props(classOf[LocalFileStreamPublisher], query.id.get, query.query, tail, glob.fileFilterMaybe, watchers, context)
}
}
class LocalFileStreamPublisher(val queryId: Long,
val rawQuery: String,
val tail: Boolean,
val fileFilterMaybe: Option[String],
val watchersPair: Vector[(File, ActorRef)],
val ctx: RequestContext)
extends Actor with ActorPublisher[SonicMessage] with SonicdLogging with LocalFilePublisher {
override def parseUTF8Data(raw: String): JsValue = JsString(raw)
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:42,代码来源:LocalFileStreamSource.scala
示例3: LocalJsonStreamSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.source
import java.io.File
import java.nio.file.Path
import akka.actor._
import akka.stream.actor.ActorPublisher
import build.unstable.sonic.JsonProtocol._
import build.unstable.sonic.model.{Query, RequestContext, SonicMessage}
import build.unstable.sonicd.SonicdLogging
import build.unstable.sonicd.source.file.{FileWatcher, FileWatcherWorker, LocalFilePublisher}
import spray.json._
class LocalJsonStreamSource(query: Query, actorContext: ActorContext, context: RequestContext)
extends SonicdSource(query, actorContext, context) {
val publisher: Props = {
val path = getConfig[String]("path")
val tail = getOption[Boolean]("tail").getOrElse(true)
val glob = FileWatcher.parseGlob(path)
val workerProps = { dir: Path ? Props(classOf[FileWatcherWorker], dir) }
val watchers = LocalFilePublisher.getWatchers(glob, actorContext, workerProps)
Props(classOf[LocalJsonPublisher], query.id.get, query.query, tail, glob.fileFilterMaybe, watchers, context)
}
}
class LocalJsonPublisher(val queryId: Long,
val rawQuery: String,
val tail: Boolean,
val fileFilterMaybe: Option[String],
val watchersPair: Vector[(File, ActorRef)],
val ctx: RequestContext)
extends Actor with ActorPublisher[SonicMessage] with SonicdLogging with LocalFilePublisher {
override def parseUTF8Data(raw: String): JsValue = raw.parseJson
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:40,代码来源:LocalJsonStreamSource.scala
示例4: MockSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.service
import akka.actor.{Actor, ActorContext, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import build.unstable.sonic.model._
import build.unstable.sonicd.SonicdLogging
import scala.collection.mutable
class MockSource(query: Query, actorContext: ActorContext, context: RequestContext)
extends DataSource(query, actorContext, context) {
override def publisher: Props = Props(classOf[ProxyPublisher])
}
class ProxyPublisher extends Actor with ActorPublisher[SonicMessage] with SonicdLogging {
val buffer = mutable.Queue.empty[SonicMessage]
override def unhandled(message: Any): Unit = {
log.warning(">>>>>>>>>>>>>>>> unhandled message for mock publisher: {}", message)
}
override def receive: Receive = {
case c: StreamCompleted ?
if (isActive && totalDemand > 0) {
onNext(c)
onCompleteThenStop()
} else context.become({
case Request(_) ?
onNext(c)
onCompleteThenStop()
})
case m: SonicMessage ?
if (isActive && totalDemand > 0) onNext(m)
else buffer.enqueue(m)
case r: Request ?
while (isActive && totalDemand > 0 && buffer.nonEmpty) {
onNext(buffer.dequeue())
}
}
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:44,代码来源:MockSource.scala
示例5: TaxRate
//设置package包名称以及导入依赖的类
package rcb.tpcdi
import akka.actor.ActorLogging
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage._
case class TaxRate(tx_id: String, tx_name: String, tx_rate: Double)
class Producer extends ActorPublisher[TaxRate] with ActorLogging {
val rnd = new java.util.Random()
def receive = {
case Request(cnt) =>
log.debug("Received request ({}) from subscriber", cnt)
sendTaxRates()
case Cancel =>
log.info("Cancel message received -- stopping")
context.stop(self)
case _ =>
}
def sendTaxRates() {
while(isActive && totalDemand > 0) {
onNext(nextTaxRate())
}
}
def nextTaxRate(): TaxRate = {
TaxRate("US1",
"U.S. Income Tax Bracket for the poor",
(math floor rnd.nextDouble() * 1E5) / 1E5)
}
}
开发者ID:bahadley,项目名称:akka-stream-demo,代码行数:38,代码来源:TaxRateProducer.scala
示例6: NotificationsApp
//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import com.softwaremill.macwire.wire
import com.typesafe.config.ConfigFactory
import it.wknd.reactive.backend.flow.EventGraph
import it.wknd.reactive.backend.model.{HealthNotification, HeartRate, Step}
import it.wknd.reactive.backend.source.{HrActorSource, SourceProvider, StepActorSource}
import scala.concurrent.Future
object NotificationsApp extends App {
implicit val config = ConfigFactory.load()
implicit val actorSystem = ActorSystem("hr-backend")
implicit val ec = actorSystem.dispatcher
implicit val materializer = ActorMaterializer()
lazy val sourceProvider = wire[SourceProvider]
val hrActor = actorSystem.actorOf(Props[HrActorSource])
val hrPub = ActorPublisher[HeartRate](hrActor)
val stepActor = actorSystem.actorOf(Props[StepActorSource])
val stepPub = ActorPublisher[Step](stepActor)
RunnableGraph fromGraph {
EventGraph(
stepSource = Source.fromPublisher(stepPub),
hrSource = Source.fromPublisher(hrPub),
sink = Sink.actorSubscriber[HealthNotification](Props[NotifierActor]))
} run()
val bindingFuture: Future[ServerBinding] =
Http().bindAndHandle(sourceProvider.routes(hrActor = hrActor, stepActor = stepActor), "localhost", 2525)
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:42,代码来源:NotificationsApp.scala
示例7: HrActorSource
//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend.source
import akka.actor.Actor
import akka.stream.actor.ActorPublisher
import it.wknd.reactive.backend.model.HeartRate
class HrActorSource extends Actor with ActorPublisher[HeartRate] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[HeartRate] = List.empty
def receive: Receive = {
case event: HeartRate =>
if (totalDemand == 0) items = items :+ event
else onNext(event)
case Request(demand) =>
if (demand > items.size) {
items foreach onNext
items = List.empty
} else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach onNext
}
case other =>
println(s"got other $other")
}
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:31,代码来源:HrActorSource.scala
示例8: StepActorSource
//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend.source
import akka.actor.Actor
import akka.stream.actor.ActorPublisher
import it.wknd.reactive.backend.model.Step
class StepActorSource extends Actor with ActorPublisher[Step] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[Step] = List.empty
def receive: Receive = {
case event: Step =>
if (totalDemand == 0) items = items :+ event
else onNext(event)
case Request(demand: Long) =>
if (demand > items.size) {
items foreach onNext
items = List.empty
} else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach onNext
}
case other =>
println(s"got other $other")
}
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:31,代码来源:StepActorSource.scala
示例9: EventStreamActor
//设置package包名称以及导入依赖的类
package actors
import java.util.UUID
import akka.actor.Actor.Receive
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import play.api.libs.json.{JsObject, JsString, JsValue}
class EventStreamActor extends ActorPublisher[JsValue] {
import EventStreamActor._
import akka.stream.actor.ActorPublisherMessage._
override def receive: Receive = {
case DataUpdated(js) => onNext(js)
case ErrorOccurred(msg) => onNext(JsObject(Seq("error" -> JsString(msg))))
case Request(_) => ()
case Cancel => context.stop(self)
}
}
object EventStreamActor {
def props = Props(new EventStreamActor)
case class DataUpdated(jsValue: JsValue)
case class ErrorOccurred(message: String)
val name = "event-stream-actor"
val pathPattern = s"/user/$name-*"
def name(maybeUserId: Option[UUID]): String = {
val randomPart = UUID.randomUUID().toString.split("-").apply(0)
val userPart = maybeUserId.map(_.toString).getOrElse("unregistered")
s"$name-$userPart-$randomPart"
}
def userSpecificPathPattern(userId: UUID): String = {
s"/user/$name-${userId.toString}-*"
}
}
开发者ID:getArtemUsername,项目名称:play-and-events,代码行数:43,代码来源:EventStreamActor.scala
示例10: WebSocketActorPublisher
//设置package包名称以及导入依赖的类
package smarthouse.restapi.http.ws
import akka.actor.{ActorLogging, ActorRef, Props, Terminated}
import akka.stream.actor.ActorPublisher
import scala.annotation.tailrec
class WebSocketActorPublisher(handler: ActorRef, bufferSize: Int) extends ActorPublisher[String] with ActorLogging {
case object Updated
import akka.stream.actor.ActorPublisherMessage._
import scala.collection.mutable
val queue = mutable.Queue[String]()
var updated = false
override def preStart() {
handler ! WebSocketHandler.PublisherCreated
context.watch(handler)
}
def receive: Receive = {
case stats: String =>
if (queue.size == bufferSize) queue.dequeue()
queue += stats
if (!updated) {
updated = true
self ! Updated
}
case Updated =>
deliver()
case Request(amount) =>
deliver()
case Cancel =>
context.stop(self)
case Terminated(`handler`) =>
context.stop(self)
}
@tailrec final def deliver(): Unit = {
if (queue.isEmpty && totalDemand != 0) {
updated = false
} else if (totalDemand > 0 && queue.nonEmpty) {
onNext(queue.dequeue())
deliver()
}
}
}
object WebSocketActorPublisher {
def props(handler: ActorRef, bufferSize: Int): Props = Props(classOf[WebSocketActorPublisher], handler, bufferSize)
}
开发者ID:andrewobukhov,项目名称:smart-house,代码行数:57,代码来源:WebSocketActorPublisher.scala
示例11: KafkaEventPublisher
//设置package包名称以及导入依赖的类
package events
import akka.stream.actor.ActorPublisher
import models.KafkaEvents.Event
import play.api.Logger
class KafkaEventPublisher extends ActorPublisher[Event] {
override def preStart(): Unit = {
super.preStart()
context.system.eventStream.subscribe(self, classOf[Event])
}
override def postStop(): Unit = {
super.postStop()
context.system.eventStream.unsubscribe(self)
}
override def receive = {
case e: Event =>
Logger.info(s"send event: $e to kafka")
onNext(e)
case other =>
Logger.warn(s"Receive unsupported event $other")
}
}
开发者ID:fsanaulla,项目名称:Akka-Kafka-Producer,代码行数:29,代码来源:KafkaEventPublisher.scala
示例12: CloseConnection
//设置package包名称以及导入依赖的类
package szymonbaranczyk.exitFlow
import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorPublisher
import com.typesafe.scalalogging.LazyLogging
import szymonbaranczyk.dataLayer.{BulletState, CloseHandle}
import scala.concurrent.ExecutionContext
case class CloseConnection()
class GameDataPublisher(val gameDataBus: GameDataBus, val gameId: Int, playerActor: ActorRef) extends ActorPublisher[GameData] with LazyLogging {
override def preStart = {
gameDataBus.subscribe(self, gameId)
playerActor ! CloseHandle(self)
}
override def receive: Receive = {
case msg: GameData =>
if (isActive && totalDemand > 0) {
onNext(msg)
}
case CloseConnection() =>
gameDataBus.unsubscribe(self, gameId)
onCompleteThenStop()
}
}
object GameDataPublisher {
def props(implicit ctx: ExecutionContext, gameDataBus: GameDataBus, gameId: Int, playerActor:ActorRef): Props = Props(new GameDataPublisher(gameDataBus, gameId,playerActor))
}
case class GameData(playersData: Seq[PlayerData], bulletData: Seq[BulletState])
case class PlayerData(x: Int,y: Int,rotation: Int,turretRotation: Int,id: String, meta:String)
开发者ID:szymonbaranczyk,项目名称:webmmo,代码行数:38,代码来源:GameDataPublisher.scala
示例13: MyPublisher
//设置package包名称以及导入依赖的类
package bidding.client.console
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import bidding.model.AuctionItem
import scala.concurrent.ExecutionContext
class MyPublisher extends ActorPublisher[AuctionItem]{
override def preStart: Unit = {
context.system.eventStream.subscribe(self, classOf[AuctionItem])
}
override def receive: Receive = {
case msg: AuctionItem =>
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}
object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}
开发者ID:oleksandr-iskhakov,项目名称:bidding-server,代码行数:28,代码来源:MyPublisher.scala
示例14: TweetPublisher
//设置package包名称以及导入依赖的类
package de.codecentric.dcos_intro
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
class TweetPublisher extends ActorPublisher[Tweet] {
override def receive: Receive = {
case t: Tweet => {
if (isActive && totalDemand > 0) {
onNext(t)
}
}
case Cancel => context.stop(self)
case Request(_) => {}
}
}
开发者ID:ftrossbach,项目名称:intro-to-dcos,代码行数:18,代码来源:TweetPublisher.scala
示例15: StreamingActor
//设置package包名称以及导入依赖的类
package wiii
import java.io.FileNotFoundException
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
object StreamingActor {
def props(host: String, port: Int, path: String) = Props(new StreamingActor(host, port, path))
}
class StreamingActor(host: String, port: Int, path: String) extends ActorPublisher[ByteString] with LazyLogging {
val filesys = {
val conf = new Configuration()
conf.set("fs.default.name", s"hdfs://$host:$port")
FileSystem.get(conf)
}
val chunkSize = 1024
val arr = Array.ofDim[Byte](chunkSize)
def receive: Receive = {
case Request(cnt) =>
val uri = new Path(path)
uri match {
case p if !filesys.exists(p) => throw new FileNotFoundException(s"$p does not exist")
case p if !filesys.getFileStatus(p).isFile => throw new FileNotFoundException(s"$p is not a file")
case p =>
val is = filesys.open(p)
val readBytes = is.read(arr)
onNext(ByteString.fromArray(arr, 0, readBytes))
}
case Cancel => context.stop(self)
case _ =>
}
}
开发者ID:jw3,项目名称:example-hdfs-docker,代码行数:42,代码来源:StreamingActor.scala
示例16: MatSample
//设置package包名称以及导入依赖的类
package com.github.uryyyyyyy.akka.stream.helloworld
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl._
object MatSample {
def main (args: Array[String]) {
val system = ActorSystem.create("sample")
implicit val materializer = ActorMaterializer.create(system)
val source: Source[Int, ActorRef] = Source.actorPublisher[Int](Props[SampleActor])
val rg: RunnableGraph[ActorRef] = source.to(Sink.foreach(println))
val actor = rg.run()
Thread.sleep(1000) // ??????sleep
actor ! Message(1)
actor ! Message(2)
actor ! Message(3)
actor ! END
system.terminate()
}
}
case class Message(n: Int)
case object END
class SampleActor extends ActorPublisher[Int] {
def receive = {
case Message(n) => onNext(n)
case END => onComplete() // ?????????????????fold???????Downstream????????????????????????fold????????
}
}
开发者ID:uryyyyyyy,项目名称:akkaSample,代码行数:39,代码来源:MatSample.scala
示例17: EducatedActor
//设置package包名称以及导入依赖的类
package com.example.eventuate
import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorPublisher
import com.rbmhtechnology.eventuate.crdt.Apology
import com.rbmhtechnology.eventuate.{EventsourcedActor, Versioned}
object EducatedActor {
def props(id: String, eventLog: ActorRef): Props = Props(new EducatedActor(id,eventLog))
case class MatchApology(matchId: String, removed: String, player: String)
case class AcceptApology(matchId: String, player: String)
case class ApologyAccepted(matchId: String, player:String)
}
class EducatedActor(val id: String, val eventLog: ActorRef) extends EventsourcedActor with ActorPublisher[EducatedActor.MatchApology] {
import EducatedActor._
override def onCommand: Receive = {
case AcceptApology(matchId,player) => persist(ApologyAccepted(matchId,player)){
case e => sender() ! e
}
}
override def onEvent: Receive = {
case Apology(Versioned(e1,_,_,_),Versioned(e2,_,_,_)) if (!recovering) => {
//case Apology(Versioned(e1,_,_,_),Versioned(e2,_,_,_)) => {
val matchId = lastEmitterAggregateId.get.replaceFirst("CERMatch_match-","")
onNext(MatchApology(matchId,e1.asInstanceOf[String], e2.asInstanceOf[String]))
}
}
}
开发者ID:gabrielgiussi,项目名称:eventuate-cerdt-example,代码行数:37,代码来源:EducatedActor.scala
示例18: ConnectionAgent
//设置package包名称以及导入依赖的类
package com.bisphone.sarf.implv1.tcpservice
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import com.bisphone.sarf.IOCommand
import org.slf4j.Logger
import scala.collection.mutable
private[implv1] class ConnectionAgent (logger: Logger) extends ActorPublisher[IOCommand] {
val queue = mutable.Queue.empty[IOCommand]
def tryDeliver (): Unit = {
if (totalDemand > 0 && queue.nonEmpty) onNext(queue dequeue)
}
private def tryPush (cmd: IOCommand): Unit = {
queue enqueue cmd
tryDeliver()
}
def receive: Receive = {
case cmd: IOCommand => tryPush(cmd)
case Request(_) => tryDeliver()
case Cancel =>
if (logger.isDebugEnabled()) logger.debug(s"Actor($self) received 'Cancel' signal!")
context stop self
}
override def preStart (): Unit = {
if (logger.isDebugEnabled()) logger.debug(s"Actor($self) preStart")
}
override def postStop (): Unit = {
if (logger.isDebugEnabled()) logger.debug(s"Actor($self) postStop")
}
}
private[implv1] object ConnectionAgent {
def props (logger: Logger) = Props {
new ConnectionAgent(logger)
}
}
开发者ID:bisphone,项目名称:SARF,代码行数:48,代码来源:ConnectionAgent.scala
示例19: ChatService
//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service
import akka.actor.ActorSystem
import akka.stream.{ FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.stream.actor.{ ActorPublisher, ActorSubscriber }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
class ChatService(implicit system: ActorSystem, mat: Materializer) {
val roomActor = system.actorOf(RoomActor.props("default"), "defaultroom")
val roomPub =
Source
.fromPublisher[String](ActorPublisher(roomActor))
.map(msg => TextMessage(Source.single(msg)))
def flow(name: String): Flow[Message, Message, Any] = {
val userActor = system.actorOf(UserActor.props(name, roomActor))
roomActor.tell(InternalProtocol.Join(name), userActor)
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val userOut = b.add(Source.fromPublisher(ActorPublisher(userActor)))
val userIn = b.add(Sink.fromSubscriber(ActorSubscriber(userActor)))
val fromMessage = b.add(msgToStringFlow)
val toMessage = b.add(Flow[String].map(msg => TextMessage(msg)))
fromMessage ~> userIn
userOut ~> toMessage
FlowShape(fromMessage.in, toMessage.out)
})
}
def msgToStringFlow: Flow[Message, String, Any] = Flow[Message].mapConcat {
case TextMessage.Strict(msg) => msg :: Nil
case tm: TextMessage =>
tm.textStream.runWith(Sink.ignore)
Nil
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
}
开发者ID:Technius,项目名称:chatty,代码行数:46,代码来源:ChatService.scala
示例20: UserActor
//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, WatermarkRequestStrategy }
import akka.stream.actor.{ ActorSubscriberMessage => Sub }
import akka.stream.actor.{ ActorPublisherMessage => Pub }
import InternalProtocol._
class UserActor private(name: String, roomActor: ActorRef) extends Actor
with ActorSubscriber
with ActorPublisher[String] {
val requestStrategy = new WatermarkRequestStrategy(100)
val msgQueue = collection.mutable.Queue[String]()
def receive = {
case Sub.OnNext(msg: String) =>
roomActor ! InboundMessage(name, msg)
case Pub.Request(n) => flushQueue(n)
case OutboundMessage(msg) =>
msgQueue += msg
if (isActive && totalDemand > 0) {
flushQueue(totalDemand)
}
case Sub.OnComplete =>
roomActor ! Leave(name)
}
def flushQueue(num: Long): Unit = {
var cur = 0
while (cur < num && msgQueue.size > 0) {
val msg = msgQueue.dequeue()
onNext(msg)
cur = cur + 1
}
}
}
object UserActor {
def props(name: String, roomActor: ActorRef): Props =
Props(new UserActor(name, roomActor))
}
开发者ID:Technius,项目名称:chatty,代码行数:44,代码来源:UserActor.scala
注:本文中的akka.stream.actor.ActorPublisher类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论