本文整理汇总了Scala中akka.stream.actor.ActorSubscriber类的典型用法代码示例。如果您正苦于以下问题:Scala ActorSubscriber类的具体用法?Scala ActorSubscriber怎么用?Scala ActorSubscriber使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorSubscriber类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: NakadiActorSubscriber
//设置package包名称以及导入依赖的类
package org.zalando.react.nakadi
import akka.actor.{ActorLogging, ActorRef, Props}
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, RequestStrategy}
import org.zalando.react.nakadi.NakadiMessages._
object NakadiActorSubscriber {
def props(consumerAndProps: ReactiveNakadiProducer, requestStrategyProvider: () => RequestStrategy) = {
Props(new NakadiActorSubscriber(consumerAndProps, requestStrategyProvider))
}
}
class NakadiActorSubscriber(producerAndProps: ReactiveNakadiProducer, requestStrategyProvider: () => RequestStrategy)
extends ActorSubscriber
with ActorLogging {
override protected val requestStrategy = requestStrategyProvider()
private val client: ActorRef = producerAndProps.nakadiClient
override def receive: Receive = {
case ActorSubscriberMessage.OnNext(element) => processElement(element.asInstanceOf[StringProducerMessage])
case ActorSubscriberMessage.OnError(ex) => handleError(ex)
case ActorSubscriberMessage.OnComplete => stop()
}
private def processElement(message: StringProducerMessage) = client ! message
private def handleError(ex: Throwable) = {
log.error(ex, "Stopping Nakadi subscriber due to fatal error.")
stop()
}
def stop() = {
context.stop(self)
}
}
开发者ID:zalando-nakadi,项目名称:reactive-nakadi,代码行数:39,代码来源:NakadiActorSubscriber.scala
示例2: KafkaWriter
//设置package包名称以及导入依赖的类
package wiii
import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}
object KafkaWriter {
def props(topic: String)(implicit mat: ActorMaterializer) = Props(new KafkaWriter(topic))
}
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
override def preStart(): Unit = initWriter()
override def receive: Receive = {
case _ =>
}
def initWriter(): Unit = {
val subscriberProps = new ReactiveKafka().producerActorProps(ProducerProperties(
bootstrapServers = "localhost:9092",
topic = topicName,
valueSerializer = TweetSerializer
))
val subscriber = context.actorOf(subscriberProps)
val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
val factory = new TwitterStreamFactory()
val twitterStream = factory.getInstance()
twitterStream.addListener(new StatusForwarder(actorRef))
twitterStream.filter(new FilterQuery("espn"))
Source.fromPublisher(publisher).map(s => ProducerMessage(Tweet(s.getUser.getName, s.getText)))
.runWith(Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](subscriber)))
}
}
class StatusForwarder(publisher: ActorRef) extends StatusListener {
def onStatus(status: Status): Unit = publisher ! status
//\\ nop all the others for now //\\
def onStallWarning(warning: StallWarning): Unit = {}
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
def onException(ex: Exception): Unit = {}
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala
示例3: QueueSubscriber
//设置package包名称以及导入依赖的类
package reactive.queue.router
import akka.actor.{ActorLogging, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import io.scalac.amqp.{Connection, Message}
object QueueSubscriber {
def props(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri): Props =
Props(classOf[QueueSubscriber],queueConnection, queueName, queueSubscriberUri)
}
class QueueSubscriber(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri) extends
ActorSubscriber with ActorLogging {
implicit val system = context.system
implicit val ec = context.dispatcher
implicit val materlizer = ActorMaterializer()
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
def receive = {
case OnNext(message: Message) => route(ByteString(message.body.toArray).decodeString("UTF-8"))
case OnComplete => log.info("*** on complete")
case OnError(error) => log.error(s"*** on error: $error")
case HttpResponse(status, _, _, _) => log.info(s"*** route response: ${status.intValue}")
}
def route(message: String): Unit = {
log.info(s"*** on next: $message")
try {
val httpResponse = for {
request <- Marshal(message).to[RequestEntity]
response <- Http().singleRequest(HttpRequest(method = HttpMethods.POST, uri = queueSubscriberUri, entity = request))
entity <- Unmarshal(response).to[HttpResponse]
} yield entity
httpResponse.pipeTo(self)
} catch {
case t: Throwable =>
log.error(s"*** on next: forward to uri $queueSubscriberUri failed on: $message with error: ${t.getMessage}")
queueConnection.publish(queueName, message)
log.info(s"*** on next: republished to queue $queueName")
}
}
}
开发者ID:objektwerks,项目名称:reactive.queue.router,代码行数:52,代码来源:QueueSubscriber.scala
示例4: HrSenderActor
//设置package包名称以及导入依赖的类
package it.wknd.reactive.frontend
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging
import scala.concurrent.ExecutionContext
class HrSenderActor(implicit val actorSystem: ActorSystem,
val ec: ExecutionContext,
val materializer: ActorMaterializer) extends ActorSubscriber with Logging {
private var inFlight = 0
private val http = Http()
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
override def inFlightInternally: Int = inFlight
}
def receive: Receive = {
case OnNext(event: String) =>
inFlight += 1
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = Uri("http://localhost:2525/hr"),
entity = HttpEntity(
ContentTypes.`application/json`,
event
))) onComplete {
tryy =>
log.info(tryy.toString)
inFlight -= 1
}
}
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:40,代码来源:HrSenderActor.scala
示例5: StepSenderActor
//设置package包名称以及导入依赖的类
package it.wknd.reactive.frontend
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging
import scala.concurrent.ExecutionContext
class StepSenderActor(implicit val actorSystem: ActorSystem,
val ec: ExecutionContext,
val materializer: ActorMaterializer) extends ActorSubscriber with Logging {
private var inFlight = 0
private val http = Http()
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
override def inFlightInternally: Int = inFlight
}
def receive: Receive = {
case OnNext(event: String) =>
inFlight += 1
http.singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = Uri("http://localhost:2525/step"),
entity = HttpEntity(
ContentTypes.`application/json`,
event
))) onComplete {
tryy =>
log.info(tryy.toString)
inFlight -= 1
}
}
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:41,代码来源:StepSenderActor.scala
示例6: NotifierActor
//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import it.wknd.reactive.Logging
import it.wknd.reactive.backend.model.HealthNotification
class NotifierActor extends ActorSubscriber with Logging {
private var worstHeartRate: Option[Int] = None
private var inFlight = 0
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
override def inFlightInternally: Int = inFlight
}
def receive: Receive = {
case OnNext(event: HealthNotification) =>
inFlight += 1
processEvent(event)
log.info(event.toString)
inFlight -= 1
}
def processEvent(decision: HealthNotification): Unit =
worstHeartRate = Some((decision.heartRate :: worstHeartRate.toList).max)
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:27,代码来源:NotifierActor.scala
示例7: TopicActor
//设置package包名称以及导入依赖的类
package actors
import akka.actor.{ActorRef, Terminated}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import models.response.View
import play.api.libs.json.{JsValue, Json}
object TopicActor {
case class Subscribe(actorRef: ActorRef)
}
class TopicActor extends ActorSubscriber with View.JsonWriter {
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
var subscribers: Set[ActorRef] = Set()
override def receive: Receive = {
case Terminated(actor) =>
subscribers = subscribers - actor
case TopicActor.Subscribe(sub) =>
context.watch(sub)
subscribers = subscribers + sub
case view: View =>
val json = view match {
case value: View.Event =>
Json.obj(("type", "Event"), ("value", Json.toJson(value)))
case value: View.Message =>
Json.obj(("type", "Message"), ("value", Json.toJson(value)))
case value: View.Gravity =>
Json.obj(("type", "Gravity"), ("value", Json.toJson(value)))
}
subscribers.foreach(a => a ! json)
}
}
开发者ID:Augment8,项目名称:slot,代码行数:35,代码来源:TopicActor.scala
示例8: InsertSink
//设置package包名称以及导入依赖的类
package com.github.jeroenr.tepkin
import akka.actor.{ActorLogging, ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy, RequestStrategy}
import com.github.jeroenr.bson.{BsonDocument, Bulk}
import com.github.jeroenr.tepkin.protocol.WriteConcern
import com.github.jeroenr.tepkin.protocol.command.Insert
import com.github.jeroenr.tepkin.protocol.message.Reply
class InsertSink(databaseName: String,
collectionName: String,
pool: ActorRef,
parallelism: Int,
ordered: Option[Boolean],
writeConcern: Option[BsonDocument])
extends ActorSubscriber with ActorLogging {
var requests = 0
override protected def requestStrategy: RequestStrategy = new MaxInFlightRequestStrategy(parallelism) {
override def inFlightInternally: Int = requests
}
override def receive: Receive = {
case OnNext(bulk: Bulk) =>
pool ! Insert(databaseName, collectionName, bulk.documents, ordered, writeConcern)
requests += 1
case reply: Reply =>
requests -= 1
if (requests == 0 && canceled) {
context.stop(self)
}
}
}
object InsertSink {
def props(databaseName: String,
collectionName: String,
pool: ActorRef,
parallelism: Int = 1,
ordered: Option[Boolean] = None,
writeConcern: Option[WriteConcern] = None): Props = {
Props(new InsertSink(databaseName, collectionName, pool, parallelism, ordered, writeConcern.map(_.toDoc)))
}
}
开发者ID:dgouyette,项目名称:tepkin,代码行数:49,代码来源:InsertSink.scala
示例9: WritingActor
//设置package包名称以及导入依赖的类
package wiii
import akka.actor.{Actor, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import wiii.Implicits._
object WritingActor {
def props(host: String, port: Int, path: String) = Props(new WritingActor(host, port, path))
}
class WritingActor(host: String, port: Int, path: String) extends Actor with ActorSubscriber with LazyLogging {
val filesys = {
val conf = new Configuration()
conf.set("fs.default.name", s"hdfs://$host:$port")
FileSystem.get(conf)
}
val stream = filesys.create(path, true)
def receive: Receive = {
case OnNext(bs: ByteString) =>
stream.write(bs.toArray)
logger.trace(s"wrote ${bs.size} to $path")
}
protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
}
开发者ID:jw3,项目名称:example-hdfs-docker,代码行数:33,代码来源:WritingActor.scala
示例10: App
//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl
import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem( "ReactiveKafka" )
implicit val materializer = ActorMaterializer( )
val kafka = new ReactiveKafka( )
val publisher: Publisher[StringConsumerRecord] = kafka.consume( ConsumerProperties(
bootstrapServers = "kafka:9092",//kafka
topic = "alert_mail",
groupId = "sendMailKschool",
valueDeserializer = new StringDeserializer( )
) )
val senderActor = actorSystem.actorOf( Props[SenderActor] )
val actorSenderMailSubscrtiber: Subscriber[(Future[Option[Contact]], String, String, String)] =
ActorSubscriber[(Future[Option[Contact]], String, String, String)](senderActor)
Source.fromPublisher( publisher )
.map( retrieveContactFromMongo )
.to( Sink.fromSubscriber( actorSenderMailSubscrtiber ) ).run( )
}
def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {
val splittedRecord = record.value.split(",")
val user = splittedRecord(0)
val typeAlarm = splittedRecord(1)
val latitude = splittedRecord(2)
val longitude = splittedRecord(3)
(db.KschoolDAO.findContact( user ), typeAlarm, latitude, longitude)
}
}
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala
示例11: ChatService
//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service
import akka.actor.ActorSystem
import akka.stream.{ FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.stream.actor.{ ActorPublisher, ActorSubscriber }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
class ChatService(implicit system: ActorSystem, mat: Materializer) {
val roomActor = system.actorOf(RoomActor.props("default"), "defaultroom")
val roomPub =
Source
.fromPublisher[String](ActorPublisher(roomActor))
.map(msg => TextMessage(Source.single(msg)))
def flow(name: String): Flow[Message, Message, Any] = {
val userActor = system.actorOf(UserActor.props(name, roomActor))
roomActor.tell(InternalProtocol.Join(name), userActor)
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val userOut = b.add(Source.fromPublisher(ActorPublisher(userActor)))
val userIn = b.add(Sink.fromSubscriber(ActorSubscriber(userActor)))
val fromMessage = b.add(msgToStringFlow)
val toMessage = b.add(Flow[String].map(msg => TextMessage(msg)))
fromMessage ~> userIn
userOut ~> toMessage
FlowShape(fromMessage.in, toMessage.out)
})
}
def msgToStringFlow: Flow[Message, String, Any] = Flow[Message].mapConcat {
case TextMessage.Strict(msg) => msg :: Nil
case tm: TextMessage =>
tm.textStream.runWith(Sink.ignore)
Nil
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore)
Nil
}
}
开发者ID:Technius,项目名称:chatty,代码行数:46,代码来源:ChatService.scala
示例12: UserActor
//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, WatermarkRequestStrategy }
import akka.stream.actor.{ ActorSubscriberMessage => Sub }
import akka.stream.actor.{ ActorPublisherMessage => Pub }
import InternalProtocol._
class UserActor private(name: String, roomActor: ActorRef) extends Actor
with ActorSubscriber
with ActorPublisher[String] {
val requestStrategy = new WatermarkRequestStrategy(100)
val msgQueue = collection.mutable.Queue[String]()
def receive = {
case Sub.OnNext(msg: String) =>
roomActor ! InboundMessage(name, msg)
case Pub.Request(n) => flushQueue(n)
case OutboundMessage(msg) =>
msgQueue += msg
if (isActive && totalDemand > 0) {
flushQueue(totalDemand)
}
case Sub.OnComplete =>
roomActor ! Leave(name)
}
def flushQueue(num: Long): Unit = {
var cur = 0
while (cur < num && msgQueue.size > 0) {
val msg = msgQueue.dequeue()
onNext(msg)
cur = cur + 1
}
}
}
object UserActor {
def props(name: String, roomActor: ActorRef): Props =
Props(new UserActor(name, roomActor))
}
开发者ID:Technius,项目名称:chatty,代码行数:44,代码来源:UserActor.scala
示例13: SummarizerPersistent
//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams
import akka.persistence.PersistentActor
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import scala.collection.mutable.{ Map => MMap }
import scalaz._
import Scalaz._
class SummarizerPersistent extends PersistentActor with ActorSubscriber with Logging {
private val balance = MMap.empty[String, Balance]
override def persistenceId = "transaction-netter"
private var inFlight = 0
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
override def inFlightInternally = inFlight
}
def receiveCommand = {
case OnNext(data: Transaction) =>
inFlight += 1
persistAsync(data) { _ =>
updateBalance(data)
inFlight -= 1
}
case LogSummaryBalance => logger.info("Balance so far: " + balance)
}
def receiveRecover = {
case d: Transaction => updateBalance(d)
}
def updateBalance(data: Transaction) = balance.get(data.accountNo).fold {
balance += ((data.accountNo, Balance(data.amount, data.debitCredit)))
} { b =>
balance += ((data.accountNo, b |+| Balance(data.amount, data.debitCredit)))
}
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:45,代码来源:SummarizerPersistent.scala
示例14: TransactionProcessor
//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Tcp, Source, Sink, Framing}
import akka.util.ByteString
import scala.concurrent.duration._
class TransactionProcessor(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {
def run(): Unit = {
implicit val mat = ActorMaterializer()
val summarizer = system.actorOf(Props[Summarizer])
logger.info(s"Receiver: binding to $host:$port")
Tcp().bind(host, port).runForeach { conn =>
val receiveSink =
conn.flow
.via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 4000, allowTruncation = true))
.map(_.utf8String)
.map(_.split(","))
.mapConcat(Transaction(_).toList)
.to(Sink.fromSubscriber(ActorSubscriber[Transaction](summarizer)))
receiveSink.runWith(Source.maybe)
}
import system.dispatcher
system.scheduler.schedule(0.seconds, 5.seconds, summarizer, LogSummaryBalance)
}
}
object TransactionProcessor extends App {
implicit val system = ActorSystem("processor")
new TransactionProcessor("127.0.0.1", 9982).run()
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:43,代码来源:TransactionProcessor.scala
示例15: Summarizer
//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams
import akka.actor.Actor
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import scala.collection.mutable.{ Map => MMap }
import scalaz._
import Scalaz._
class Summarizer extends Actor with ActorSubscriber with Logging {
private val balance = MMap.empty[String, Balance]
private var inFlight = 0
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
override def inFlightInternally = inFlight
}
def receive = {
case OnNext(data: Transaction) =>
inFlight += 1
updateBalance(data)
inFlight -= 1
case LogSummaryBalance => logger.info("Balance so far: " + balance)
}
def updateBalance(data: Transaction) = balance.get(data.accountNo).fold {
balance += ((data.accountNo, Balance(data.amount, data.debitCredit)))
} { b =>
balance += ((data.accountNo, b |+| Balance(data.amount, data.debitCredit)))
}
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:37,代码来源:Summarizer.scala
示例16: MessageReceiver
//设置package包名称以及导入依赖的类
package server
import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{MaxInFlightRequestStrategy, ActorSubscriber}
import akka.util.ByteString
object MessageReceiver {
def props(router: ActorRef): Props = Props(new MessageReceiver(router))
}
class MessageReceiver(router: ActorRef) extends ActorSubscriber {
import akka.stream.actor.ActorPublisherMessage._
import MessageReceiver._
val MaxBufferSize = 100
var uid = ""
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxBufferSize) {
override def inFlightInternally: Int = 0 //not safe :)
}
def receive = {
case OnNext(MyUid(inUid)) =>
uid = inUid
case OnNext(x:Message) =>
router ! x
case OnNext(x:Disconnect) =>
router ! CtrlDisconnect(uid)
case OnNext(x:ListUsers) =>
router ! CtrlListUsers(uid)
}
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:34,代码来源:MessageReceiver.scala
示例17: MessageReceiver
//设置package包名称以及导入依赖的类
package gatling.protocol.protoclient
import akka.actor.{ActorRef, Props}
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import akka.util.ByteString
import gatling.protocol.protoclient.MessageReceiver.{ReceivedMsg, ReceivedUserList}
object MessageReceiver {
def props(router: ActorRef): Props = Props(new MessageReceiver(router))
case class ReceivedMsg(payLoad: String)
case class ReceivedUserList(payLoad: String)
}
class MessageReceiver(router: ActorRef) extends ActorSubscriber {
val MaxBufferSize = 100
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxBufferSize) {
override def inFlightInternally: Int = 0 //not safe :)
}
def receive = {
case OnNext(x: ByteString) => {
val y = x.utf8String
if(y.contains(">")) {
router ! ReceivedMsg(y)
}
else if(y.contains(";")) {
router ! ReceivedUserList(y)
}
}
}
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:37,代码来源:MessageReceiver.scala
注:本文中的akka.stream.actor.ActorSubscriber类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论