本文整理汇总了Scala中akka.stream.actor.ActorPublisherMessage.Request类的典型用法代码示例。如果您正苦于以下问题：Scala Request类的具体用法？Scala Request怎么用？Scala Request使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Request类的6个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: MockSource
//设置package包名称以及导入依赖的类
package build.unstable.sonicd.service
import akka.actor.{Actor, ActorContext, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import build.unstable.sonic.model._
import build.unstable.sonicd.SonicdLogging
import scala.collection.mutable
/**
 * Data source whose publisher is a `ProxyPublisher` actor.
 *
 * The three constructor arguments are handed unchanged to `DataSource`.
 */
class MockSource(query: Query, actorContext: ActorContext, context: RequestContext)
  extends DataSource(query, actorContext, context) {

  /** Props that instantiate `ProxyPublisher` through its no-argument constructor. */
  override def publisher: Props = {
    Props(classOf[ProxyPublisher])
  }
}
/**
 * Mock publisher that relays every `SonicMessage` it receives into the stream.
 *
 * Messages that arrive while there is no downstream demand are queued in
 * `buffer` and flushed on the next `Request`. A `StreamCompleted` message is
 * emitted as the final element, after which the stream is completed and the
 * actor stops.
 */
class ProxyPublisher extends Actor with ActorPublisher[SonicMessage] with SonicdLogging {

  /** Messages received while they could not be emitted, in arrival order. */
  val buffer = mutable.Queue.empty[SonicMessage]

  /** Logs a warning for any message that the current behaviour did not handle. */
  override def unhandled(message: Any): Unit = {
    log.warning(">>>>>>>>>>>>>>>> unhandled message for mock publisher: {}", message)
  }

  override def receive: Receive = {
    case c: StreamCompleted =>
      // Flush first so the completion marker can never overtake queued messages.
      drainBuffer()
      if (isActive && totalDemand > 0) emitCompletion(c)
      else context.become(awaitingDemand(c))

    case m: SonicMessage =>
      if (isActive && totalDemand > 0) onNext(m)
      else buffer.enqueue(m)

    case _: Request =>
      drainBuffer()
  }

  /**
   * Behaviour used once `StreamCompleted` has been received but could not be
   * emitted yet. Each `Request` first flushes the queued messages; the
   * completion marker is only sent when demand is still left afterwards, so
   * buffered messages are no longer dropped.
   */
  private def awaitingDemand(c: StreamCompleted): Receive = {
    case Request(_) =>
      drainBuffer()
      // Remaining demand after draining implies the buffer is empty.
      if (isActive && totalDemand > 0) emitCompletion(c)
  }

  /** Emits queued messages for as long as the stream is active and demand remains. */
  private def drainBuffer(): Unit = {
    while (isActive && totalDemand > 0 && buffer.nonEmpty) {
      onNext(buffer.dequeue())
    }
  }

  /** Emits the completion marker as the last element, then completes the stream and stops. */
  private def emitCompletion(c: StreamCompleted): Unit = {
    onNext(c)
    onCompleteThenStop()
  }
}
开发者ID:ernestrc,项目名称:sonicd,代码行数:44,代码来源:MockSource.scala
示例2: ActorForwarder
//设置package包名称以及导入依赖的类
package com.olegych.scastie.util
import akka.actor.{Actor, ActorRef}
import akka.stream.actor.ActorPublisher
import scala.collection.mutable.{Queue => MQueue}
import akka.stream.actor.ActorPublisherMessage.Request
import scala.reflect.ClassTag
/**
 * Publisher actor that forwards every message of type `T` it receives into the
 * stream, queueing messages while there is no downstream demand.
 *
 * @param callback invoked right after each element is emitted, with that
 *                 element and a flag computed as `totalDemand == 0L` before
 *                 the batch is sent
 * @tparam T element type; the `ClassTag` context bound makes the `case msg: T`
 *           type test checkable at runtime
 */
class ActorForwarder[T: ClassTag](
    callback: (T, Boolean) => Unit
) extends Actor
    with ActorPublisher[T] {

  /** Messages waiting for downstream demand, oldest first. */
  private var buffer = MQueue.empty[T]

  override def receive: Receive = {
    case msg: T =>
      buffer.enqueue(msg)
      deliver()
    case _: Request =>
      deliver()
  }

  /** Emits as many queued messages as the current demand allows. */
  private def deliver(): Unit = {
    if (totalDemand > 0) {
      // totalDemand is a Long. A plain `toInt` wraps for values above
      // Int.MaxValue (Long.MaxValue.toInt == -1), which would make splitAt
      // hand out nothing, so the demand is clamped before narrowing.
      val batchSize = math.min(totalDemand, Int.MaxValue.toLong).toInt
      val (deliverNow, deliverLater) = buffer.splitAt(batchSize)
      buffer = deliverLater
      // NOTE(review): this is evaluated inside the `totalDemand > 0` branch and
      // before any onNext, so it is always false here. Kept unchanged so that
      // callers observe the same flag as before; confirm the intended meaning.
      val noDemmand = totalDemand == 0L
      deliverNow.foreach { msg =>
        onNext(msg)
        callback(msg, noDemmand)
      }
    }
  }
}
开发者ID:scalacenter,项目名称:scastie,代码行数:40,代码来源:ActorForwarder.scala
示例3: TweetPublisher
//设置package包名称以及导入依赖的类
package de.codecentric.dcos_intro
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
/**
 * Unbuffered publisher of `Tweet` elements.
 *
 * A tweet is emitted only if the stream is active and demand is available at
 * the moment it arrives; otherwise it is discarded.
 */
class TweetPublisher extends ActorPublisher[Tweet] {

  override def receive: Receive = {
    // Emit immediately when downstream can take the element.
    case tweet: Tweet if isActive && totalDemand > 0 =>
      onNext(tweet)

    // No demand (or stream not active): the tweet is dropped on purpose.
    case _: Tweet =>
      ()

    // Downstream cancelled the subscription, so the actor shuts down.
    case Cancel =>
      context.stop(self)

    // Nothing is buffered, so a new request needs no action.
    case _: Request =>
      ()
  }
}
开发者ID:ftrossbach,项目名称:intro-to-dcos,代码行数:18,代码来源:TweetPublisher.scala
示例4: StreamingActor
//设置package包名称以及导入依赖的类
package wiii
import java.io.FileNotFoundException
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
/** Factory for the `StreamingActor` publisher. */
object StreamingActor {

  /**
   * Builds the `Props` for a `StreamingActor`.
   *
   * @param host passed to the actor constructor as `host`
   * @param port passed to the actor constructor as `port`
   * @param path passed to the actor constructor as `path`
   */
  def props(host: String, port: Int, path: String): Props = {
    Props(new StreamingActor(host, port, path))
  }
}
/**
 * Publishes the content of one HDFS file as a stream of `ByteString` chunks.
 *
 * The file is opened lazily on the first `Request` and read sequentially in
 * chunks of at most `chunkSize` bytes, one chunk per unit of demand. When the
 * end of the file is reached the stream is completed and the actor stops; the
 * input stream is closed when the actor stops.
 *
 * @param host host used to build the `hdfs://host:port` file system URI
 * @param port port used to build the `hdfs://host:port` file system URI
 * @param path path of the file to stream
 */
class StreamingActor(host: String, port: Int, path: String) extends ActorPublisher[ByteString] with LazyLogging {
  val filesys = {
    val conf = new Configuration()
    conf.set("fs.default.name", s"hdfs://$host:$port")
    FileSystem.get(conf)
  }

  val chunkSize = 1024

  /** Reusable read buffer; `ByteString.fromArray` copies out of it before it is refilled. */
  val arr = Array.ofDim[Byte](chunkSize)

  /** The open file stream, created on first demand so the file is opened once instead of per request. */
  private var input: Option[java.io.InputStream] = None

  def receive: Receive = {
    case Request(_) => deliverChunks()
    case Cancel => context.stop(self)
    case _ =>
  }

  /**
   * Opens the configured file.
   *
   * @throws FileNotFoundException if the path does not exist or is not a regular file
   */
  private def openInput(): java.io.InputStream = {
    val uri = new Path(path)
    uri match {
      case p if !filesys.exists(p) => throw new FileNotFoundException(s"$p does not exist")
      case p if !filesys.getFileStatus(p).isFile => throw new FileNotFoundException(s"$p is not a file")
      case p => filesys.open(p)
    }
  }

  /** Reads and emits chunks while demand remains; completes the stream at end of file. */
  private def deliverChunks(): Unit = {
    val is = input.getOrElse {
      val opened = openInput()
      input = Some(opened)
      opened
    }
    var endOfFile = false
    while (!endOfFile && isActive && totalDemand > 0) {
      val readBytes = is.read(arr)
      // read returns -1 at end of stream; that value must never reach fromArray.
      if (readBytes < 0) endOfFile = true
      else if (readBytes > 0) onNext(ByteString.fromArray(arr, 0, readBytes))
    }
    if (endOfFile) onCompleteThenStop()
  }

  /** Releases the HDFS stream on completion, cancellation or failure. */
  override def postStop(): Unit = {
    input.foreach(_.close())
    input = None
    super.postStop()
  }
}
开发者ID:jw3,项目名称:example-hdfs-docker,代码行数:42,代码来源:StreamingActor.scala
示例5: ConnectionAgent
//设置package包名称以及导入依赖的类
package com.bisphone.sarf.implv1.tcpservice
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import com.bisphone.sarf.IOCommand
import org.slf4j.Logger
import scala.collection.mutable
/**
 * Publisher actor that queues incoming `IOCommand`s and emits them into the
 * stream as downstream demand allows.
 *
 * @param logger logger used for debug-level lifecycle messages
 */
private[implv1] class ConnectionAgent (logger: Logger) extends ActorPublisher[IOCommand] {

  /** Commands waiting for downstream demand, oldest first. */
  val queue = mutable.Queue.empty[IOCommand]

  /**
   * Emits queued commands until either the demand or the queue is exhausted.
   *
   * Draining in a loop matters: emitting only a single element per call leaves
   * the rest of the queue stuck when one `Request` asks for several elements.
   */
  def tryDeliver (): Unit = {
    while (totalDemand > 0 && queue.nonEmpty) onNext(queue.dequeue())
  }

  /** Enqueues the command and immediately attempts delivery. */
  private def tryPush (cmd: IOCommand): Unit = {
    queue enqueue cmd
    tryDeliver()
  }

  def receive: Receive = {
    case cmd: IOCommand => tryPush(cmd)
    case Request(_) => tryDeliver()
    case Cancel =>
      if (logger.isDebugEnabled()) logger.debug(s"Actor($self) received 'Cancel' signal!")
      context stop self
  }

  override def preStart (): Unit = {
    if (logger.isDebugEnabled()) logger.debug(s"Actor($self) preStart")
  }

  override def postStop (): Unit = {
    if (logger.isDebugEnabled()) logger.debug(s"Actor($self) postStop")
  }
}
/** Factory for the `ConnectionAgent` publisher actor. */
private[implv1] object ConnectionAgent {

  /**
   * Builds the `Props` for a `ConnectionAgent`.
   *
   * @param logger logger handed to the actor's constructor
   */
  def props(logger: Logger): Props = Props(new ConnectionAgent(logger))
}
开发者ID:bisphone,项目名称:SARF,代码行数:48,代码来源:ConnectionAgent.scala
示例6: NanoboardMessagePublisher
//设置package包名称以及导入依赖的类
package com.karasiq.nanoboard.server.streaming
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import com.karasiq.nanoboard.streaming.NanoboardEvent
import scala.annotation.tailrec
/**
 * Publishes `NanoboardEvent`s taken from the actor system's event stream.
 *
 * Events that cannot be emitted right away are kept in a bounded buffer of at
 * most `maxBufferSize` elements; when the buffer is full the oldest event is
 * discarded to make room for the newest one.
 */
private[server] class NanoboardMessagePublisher extends ActorPublisher[NanoboardEvent] {

  /** Subscribes this actor to all `NanoboardEvent`s on the system event stream. */
  override def preStart(): Unit = {
    super.preStart()
    context.system.eventStream.subscribe(self, classOf[NanoboardEvent])
  }

  /** Removes the event stream subscription when the actor stops. */
  override def postStop(): Unit = {
    context.system.eventStream.unsubscribe(self)
    super.postStop()
  }

  val maxBufferSize = 20

  /** Pending events, oldest first. */
  var messageBuffer = Vector.empty[NanoboardEvent]

  override def receive: Receive = {
    case m: NanoboardEvent =>
      if (messageBuffer.isEmpty && totalDemand > 0) {
        // Nothing is queued ahead of it, so the event can go out directly.
        onNext(m)
      } else {
        if (messageBuffer.length >= maxBufferSize) {
          // Buffer full: drop the oldest event in favour of the new one.
          messageBuffer = messageBuffer.tail :+ m
        } else {
          messageBuffer :+= m
        }
        deliverBuffer()
      }

    case Request(_) =>
      deliverBuffer()

    case Cancel =>
      context.stop(self)
  }

  /**
   * Emits as many buffered events as the current demand allows.
   *
   * The demand is clamped to `Int.MaxValue` before it is used as a split
   * index. A single pass is sufficient because the buffer can never hold more
   * than `Int.MaxValue` elements; re-entering while demand stays above
   * `Int.MaxValue` and the buffer is already empty would never terminate.
   */
  final def deliverBuffer(): Unit = {
    if (totalDemand > 0 && messageBuffer.nonEmpty) {
      val batchSize = math.min(totalDemand, Int.MaxValue.toLong).toInt
      val (use, keep) = messageBuffer.splitAt(batchSize)
      messageBuffer = keep
      use foreach onNext
    }
  }
}
开发者ID:Karasiq,项目名称:nanoboard,代码行数:58,代码来源:NanoboardMessagePublisher.scala
注：本文中的akka.stream.actor.ActorPublisherMessage.Request类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论