本文整理汇总了Scala中org.reactivestreams.Subscriber类的典型用法代码示例。如果您正苦于以下问题：Scala Subscriber类的具体用法？Scala Subscriber怎么用？Scala Subscriber使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Subscriber类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Sink
//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream
import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
/**
 * Facade over the Akka Streams `Sink` factories (imported in this file as `AkkaSink`),
 * extended with a few sinks that gather elements into immutable collections through
 * `CollectionStage`.
 */
object Sink {

  /** Sink built from a `CollectionStage` over a `Set` builder; materializes a `Future` of an immutable `Set`. */
  def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
  }

  /** Sink built from a `CollectionStage` over a `SortedSet` builder that uses the implicit `ordering`. */
  def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
  }

  /** Sink built from a `CollectionStage` over a `Map` builder; consumes key/value pairs. */
  def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
    AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
  }

  /** Sink built from a `CollectionStage` over a `List` builder; materializes a `Future` of a `List`. */
  def list[T]: AkkaSink[T, Future[List[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
  }

  // Akka's API: every method below delegates to the same-named factory on AkkaSink.

  def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)

  def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)

  def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled

  def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head

  def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption

  def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]

  def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]

  def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]

  def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)

  def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore

  def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)

  def combine[T, U](
    first: AkkaSink[U, _],
    second: AkkaSink[U, _],
    rest: AkkaSink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
    AkkaSink.combine[T, U](first, second, rest: _*)(strategy)

  def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
    AkkaSink.foreachParallel[T](parallelism)(f)

  def fold[U, T](zero: U)(f: (U, T) => U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)

  def reduce[T](f: (T, T) => T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)

  def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)

  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
    AkkaSink.actorRef(ref, onCompleteMessage)

  // onFailureMessage defaults to wrapping the failure in akka.actor.Status.Failure.
  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
    onFailureMessage: (Throwable) => Any = Status.Failure): AkkaSink[T, NotUsed] =
    AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)

  def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)

  def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala
示例2: SubscriberDrainStage
//设置package包名称以及导入依赖的类
package swave.core.impl.stages.drain
import org.reactivestreams.Subscriber
import swave.core.Stage
import swave.core.impl.Inport
import swave.core.impl.rs.ForwardToRunnerSubscription
import swave.core.impl.stages.DrainStage
import swave.core.macros.StageImplementation
// format: OFF
/**
 * Drain stage that relays the signals it receives to a wrapped Reactive Streams `Subscriber`.
 *
 * @param subscriber receives `onSubscribe`, `onNext`, `onComplete` and `onError` from this stage
 */
@StageImplementation
private[core] final class SubscriberDrainStage(subscriber: Subscriber[AnyRef]) extends DrainStage {

  def kind = Stage.Kind.Drain.FromSubscriber(subscriber)

  connectInAndSealWith { in =>
    region.impl.requestDispatcherAssignment()
    region.impl.registerForXStart(this)
    awaitingXStart(in)
  }

  /** Initial state: on `xStart`, hands the subscriber its subscription and switches to `running`. */
  def awaitingXStart(in: Inport): State = state(
    xStart = () => {
      subscriber.onSubscribe(new ForwardToRunnerSubscription(this))
      running(in)
    })

  /** Main state: passes upstream signals on to the subscriber and stops on completion or error. */
  def running(in: Inport): State = state(
    intercept = false,

    request = requestF(in),
    cancel = stopCancelF(in),

    onNext = (elem, _) => {
      subscriber.onNext(elem)
      stay()
    },

    onComplete = _ => {
      subscriber.onComplete()
      stop()
    },

    onError = (e, _) => {
      subscriber.onError(e)
      stop(e)
    },

    // An IllegalRequest event is reported to the subscriber as an error, then upstream is cancelled.
    xEvent = { case ForwardToRunnerSubscription.IllegalRequest(n) =>
      subscriber.onError(new IllegalArgumentException(
        "The number of elements requested must be > 0 (see reactive-streams spec, rule 3.9)"))
      stopCancel(in)
    })
}
开发者ID:sirthias,项目名称:swave,代码行数:55,代码来源:SubscriberDrainStage.scala
示例3: SubscriberWhiteboxVerificationTest
//设置package包名称以及导入依赖的类
package ru.reajames
package reactivespecification
import org.scalatest.testng.TestNGSuiteLike
import java.util.concurrent.atomic.AtomicInteger
import concurrent.ExecutionContext.Implicits.global
import org.reactivestreams.{Subscriber, Subscription}
import org.reactivestreams.tck.{SubscriberWhiteboxVerification, TestEnvironment}
import org.reactivestreams.tck.SubscriberWhiteboxVerification.{SubscriberPuppet, WhiteboxSubscriberProbe}
/**
 * Runs the Reactive Streams TCK whitebox subscriber verification against `JmsSender`.
 * Each subscriber under test is created with a queue carrying a fresh sequence number.
 */
class SubscriberWhiteboxVerificationTest
  extends SubscriberWhiteboxVerification[String](new TestEnvironment(300))
  with TestNGSuiteLike
  with ActimeMQConnectionFactoryAware {

  val connectionHolder = new ConnectionHolder(connectionFactory)
  val counter = new AtomicInteger()

  // A def, not a val: every access increments the counter and yields a new queue name.
  private def queue = Queue("subscriber-bb-verification-" + counter.incrementAndGet)

  /**
   * Builds a `JmsSender` whose callbacks first run the sender's own logic and then
   * report the signal to the TCK probe.
   */
  def createSubscriber(probe: WhiteboxSubscriberProbe[String]): Subscriber[String] =
    new JmsSender[String](connectionHolder, queue, _ createTextMessage _) {

      override def onSubscribe(sub: Subscription): Unit = {
        super.onSubscribe(sub)
        // The puppet lets the TCK drive demand and cancellation on the real subscription.
        val puppet = new SubscriberPuppet {
          def triggerRequest(elements: Long): Unit = sub.request(elements)
          def signalCancel(): Unit = sub.cancel()
        }
        probe.registerOnSubscribe(puppet)
      }

      override def onNext(element: String): Unit = {
        super.onNext(element)
        probe.registerOnNext(element)
      }

      override def onComplete(): Unit = {
        super.onComplete()
        probe.registerOnComplete()
      }

      override def onError(th: Throwable): Unit = {
        super.onError(th)
        probe.registerOnError(th)
      }
    }

  /** Test element for the given index, as required by the TCK. */
  def createElement(element: Int): String = "message " + element
}
开发者ID:dobrynya,项目名称:reajames,代码行数:49,代码来源:SubscriberWhiteboxVerificationTest.scala
示例4:
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ProducerMessage, ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
// Actor system and materializer picked up implicitly by ReactiveKafka and by run() below.
implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val kafka = new ReactiveKafka()

// Inbound side: records consumed from the "lowercaseStrings" topic.
val publisher: Publisher[StringConsumerRecord] = kafka.consume(
  ConsumerProperties(
    bootstrapServers = "localhost:9092",
    topic = "lowercaseStrings",
    groupId = "groupName",
    valueDeserializer = new StringDeserializer()
  )
)

// Outbound side: messages published to the "uppercaseStrings" topic.
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(
  ProducerProperties(
    bootstrapServers = "localhost:9092",
    topic = "uppercaseStrings",
    valueSerializer = new StringSerializer()
  )
)

// Upper-case every consumed value, wrap it in a ProducerMessage, and feed it to the subscriber.
val upperCased = Source.fromPublisher(publisher).map(m => ProducerMessage(m.value().toUpperCase))
upperCased.to(Sink.fromSubscriber(subscriber)).run()
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:28,代码来源:UseCase.scala
示例5: App
//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl
import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future
/**
 * Entry point that connects the Kafka topic "alert_mail" to a `SenderActor`:
 * each consumed record is turned into a contact lookup plus alarm details
 * and handed to the actor through an `ActorSubscriber`.
 */
object App {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val kafka = new ReactiveKafka()

    val publisher: Publisher[StringConsumerRecord] = kafka.consume(
      ConsumerProperties(
        bootstrapServers = "kafka:9092", //kafka
        topic = "alert_mail",
        groupId = "sendMailKschool",
        valueDeserializer = new StringDeserializer()
      )
    )

    val senderActor = actorSystem.actorOf(Props[SenderActor])

    // Expose the actor as a Reactive Streams subscriber so it can terminate the stream.
    val mailSubscriber: Subscriber[(Future[Option[Contact]], String, String, String)] =
      ActorSubscriber[(Future[Option[Contact]], String, String, String)](senderActor)

    val alerts = Source.fromPublisher(publisher).map(retrieveContactFromMongo)
    alerts.to(Sink.fromSubscriber(mailSubscriber)).run()
  }

  /**
   * Splits the record value on "," and starts a contact lookup for the first field.
   *
   * @param record consumed record; its value is read as `user,typeAlarm,latitude,longitude`
   * @return the pending contact lookup together with alarm type, latitude and longitude
   * @throws ArrayIndexOutOfBoundsException if the value yields fewer than four fields
   */
  def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {
    val fields = record.value.split(",")
    // All four fields are read before the lookup starts, so a short record fails without touching the DAO.
    val (user, typeAlarm, latitude, longitude) = (fields(0), fields(1), fields(2), fields(3))
    (db.KschoolDAO.findContact(user), typeAlarm, latitude, longitude)
  }
}
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala
示例6: Fork
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import org.reactivestreams.Processor
import org.reactivestreams.Subscriber
/**
 * Processor that passes every element, completion and error it receives to all
 * branch publishers created through `subscribe`.
 */
class Fork[A] extends Processor[A, A] with SubscriberBase[A] { self =>

  // Registered branches, most recently subscribed first.
  var branches: List[BranchPublisher] = Nil

  /** Publisher for one downstream subscriber; its start/stop hooks drive the Fork's own upstream subscription. */
  class BranchPublisher extends PublisherBase[A] {
    def onStart(): Unit = self.request(1L)
    def onStop(): Unit = self.cancel()
  }

  /** Creates a new branch for `sub`, subscribes it, and adds the branch to `branches`. */
  def subscribe(sub: Subscriber[_ >: A]): Unit = {
    val pub = new BranchPublisher()
    pub.subscribe(sub)
    branches = pub +: branches
  }

  /** Sends `a` to every branch, then requests one more element from upstream. */
  def onNext(a: A): Unit = {
    branches.foreach(_.sendNext(a))
    request(1L)
  }

  def onComplete(): Unit = branches.foreach(_.sendComplete())

  def onError(t: Throwable): Unit = branches.foreach(_.sendError(t))
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:23,代码来源:Fork.scala
示例7: SubscriberBase
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scala.concurrent.Future
/**
 * Base `Subscriber` that stores the subscription it is given and submits
 * `request`/`cancel` calls to a single-thread executor instead of running them inline.
 */
trait SubscriberBase[A] extends Subscriber[A] {
  // Stays null until onSubscribe stores a subscription.
  private[this] var subscription: Subscription = null
  private[this] val singleExecutor = Executors.newSingleThreadExecutor(streamsThreadFactory)
  private implicit val execContext: ExecutionContext = ExecutionContext.fromExecutorService(singleExecutor)

  /**
   * Stores the subscription.
   *
   * @throws IllegalArgumentException if a subscription has already been stored
   */
  // NOTE(review): Reactive Streams rule 2.5 asks a subscriber that already holds a subscription
  // to cancel the new one rather than throw — confirm whether failing here is intentional.
  def onSubscribe(s: Subscription): Unit = {
    require(subscription == null, "Subscriber already has subscription")
    subscription = s
  }

  /**
   * Asynchronously requests `n` elements from the stored subscription.
   *
   * @throws IllegalArgumentException if no subscription has been stored yet
   */
  def request(n: Long): Unit = {
    require(subscription != null, "Subscriber has no subscription")
    // The returned Future is discarded on purpose; the call is fire-and-forget.
    Future(subscription.request(n))
  }

  /**
   * Asynchronously cancels the stored subscription.
   *
   * @throws IllegalArgumentException if no subscription has been stored yet
   */
  def cancel(): Unit = {
    require(subscription != null, "Subscriber has no subscription")
    Future(subscription.cancel())
  }
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:28,代码来源:SubscriberBase.scala
注：本文中的org.reactivestreams.Subscriber类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论