本文整理汇总了Scala中org.reactivestreams.Publisher类的典型用法代码示例。如果您正苦于以下问题:Scala Publisher类的具体用法?Scala Publisher怎么用?Scala Publisher使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Publisher类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Sink
//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream
import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
object Sink {
def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
}
def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
}
def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
}
def list[T]: AkkaSink[T, Future[List[T]]] = {
AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
}
// Akka's API
def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)
def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)
def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled
def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head
def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption
def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]
def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]
def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]
def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)
def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore
def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)
def combine[T, U](
first: AkkaSink[U, _],
second: AkkaSink[U, _],
rest: AkkaSink[U, _]*)(strategy: Int ? Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
AkkaSink.combine[T, U](first, second, rest: _*)(strategy)
def foreachParallel[T](parallelism: Int)(f: T ? Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
AkkaSink.foreachParallel[T](parallelism)(f)
def fold[U, T](zero: U)(f: (U, T) ? U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)
def reduce[T](f: (T, T) ? T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)
def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
AkkaSink.actorRef(ref, onCompleteMessage)
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
onFailureMessage: (Throwable) ? Any = Status.Failure): AkkaSink[T, NotUsed] =
AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)
def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala
示例2: SwaveIdentityProcessorVerification
//设置package包名称以及导入依赖的类
package swave.core.tck
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{IdentityProcessorVerification, TestEnvironment}
import org.scalatest.testng.TestNGSuiteLike
import org.testng.SkipException
import org.testng.annotations.AfterClass
import swave.core._
abstract class SwaveIdentityProcessorVerification[T](val testEnv: TestEnvironment, publisherShutdownTimeout: Long)
extends IdentityProcessorVerification[T](testEnv, publisherShutdownTimeout) with TestNGSuiteLike
with StreamEnvShutdown {
def this(printlnDebug: Boolean) =
this(
new TestEnvironment(Timeouts.defaultTimeout.toMillis, printlnDebug),
Timeouts.publisherShutdownTimeout.toMillis)
def this() = this(false)
override def createFailedPublisher(): Publisher[T] =
Spout.failing[T](new Exception("Nope")).drainTo(Drain.toPublisher()).get
// Publishers created by swave don't support fanout by default
override def maxSupportedSubscribers: Long = 1L
override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit =
throw new SkipException("Not relevant for publisher w/o fanout support")
override lazy val publisherExecutorService: ExecutorService =
Executors.newFixedThreadPool(3)
@AfterClass
def shutdownPublisherExecutorService(): Unit = {
publisherExecutorService.shutdown()
publisherExecutorService.awaitTermination(3, TimeUnit.SECONDS)
}
}
开发者ID:sirthias,项目名称:swave,代码行数:40,代码来源:SwaveIdentityProcessorVerification.scala
示例3: SwavePublisherVerification
//设置package包名称以及导入依赖的类
package swave.core.tck
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{PublisherVerification, TestEnvironment}
import org.scalatest.testng.TestNGSuiteLike
import org.testng.SkipException
import swave.core._
abstract class SwavePublisherVerification[T](val testEnv: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[T](testEnv, publisherShutdownTimeout) with TestNGSuiteLike with StreamEnvShutdown {
def this(printlnDebug: Boolean) =
this(
new TestEnvironment(Timeouts.defaultTimeout.toMillis, printlnDebug),
Timeouts.publisherShutdownTimeout.toMillis)
def this() = this(false)
override def createFailedPublisher(): Publisher[T] =
Spout.failing[T](new Exception("Nope")).drainTo(Drain.toPublisher()).get
override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit =
throw new SkipException("Not relevant for publisher w/o fanout support")
}
开发者ID:sirthias,项目名称:swave,代码行数:25,代码来源:SwavePublisherVerification.scala
示例4: EventDao
//设置package包名称以及导入依赖的类
package dao
import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import org.reactivestreams.Publisher
import play.api.libs.json.{Format, JsSuccess, Json}
import scala.concurrent.Future
abstract class EventDao[Event](
implicit val mat: Materializer,
format: Format[Event]) {
def store(str: Event): Future[QueueOfferResult] =
in.offer(str)
val (
in: SourceQueueWithComplete[Event],
out: Publisher[Event]
) =
Source
.queue[Event](3, OverflowStrategy.backpressure)
.map(Json.toJson(_))
.map(Json.stringify)
.via(eventStore)
.map(Json.parse)
.map(Json.fromJson[Event])
.collect { case JsSuccess(event, _) => event }
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
.run()
protected def eventStore: Flow[String, String, NotUsed]
}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:35,代码来源:EventDao.scala
示例5:
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ProducerMessage, ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val publisher: Publisher[StringConsumerRecord] = kafka.consume(ConsumerProperties(
bootstrapServers = "localhost:9092",
topic = "lowercaseStrings",
groupId = "groupName",
valueDeserializer = new StringDeserializer()
))
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
bootstrapServers = "localhost:9092",
topic = "uppercaseStrings",
valueSerializer = new StringSerializer()
))
Source.fromPublisher(publisher).map(m => ProducerMessage(m.value().toUpperCase))
.to(Sink.fromSubscriber(subscriber)).run()
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:28,代码来源:UseCase.scala
示例6: App
//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl
import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future
object App {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem( "ReactiveKafka" )
implicit val materializer = ActorMaterializer( )
val kafka = new ReactiveKafka( )
val publisher: Publisher[StringConsumerRecord] = kafka.consume( ConsumerProperties(
bootstrapServers = "kafka:9092",//kafka
topic = "alert_mail",
groupId = "sendMailKschool",
valueDeserializer = new StringDeserializer( )
) )
val senderActor = actorSystem.actorOf( Props[SenderActor] )
val actorSenderMailSubscrtiber: Subscriber[(Future[Option[Contact]], String, String, String)] =
ActorSubscriber[(Future[Option[Contact]], String, String, String)](senderActor)
Source.fromPublisher( publisher )
.map( retrieveContactFromMongo )
.to( Sink.fromSubscriber( actorSenderMailSubscrtiber ) ).run( )
}
def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {
val splittedRecord = record.value.split(",")
val user = splittedRecord(0)
val typeAlarm = splittedRecord(1)
val latitude = splittedRecord(2)
val longitude = splittedRecord(3)
(db.KschoolDAO.findContact( user ), typeAlarm, latitude, longitude)
}
}
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala
示例7: PersistenceSources
//设置package包名称以及导入依赖的类
package akkaviz.persistence
import akka.stream.scaladsl.Source
import io.getquill._
import io.getquill.naming.SnakeCase
import org.reactivestreams.Publisher
import monifu.concurrent.Implicits.globalScheduler
import scala.util.control.NonFatal
object PersistenceSources {
private[this] lazy val db = source(new CassandraStreamSourceConfig[SnakeCase]("akkaviz.cassandra"))
def of(ref: String): Source[ReceivedRecord, _] = {
try {
Source.fromPublisher(db.run(Queries.getAllFor)(ref))
} catch {
case NonFatal(e) =>
Source.failed(e)
}
}
def between(ref: String, ref2: String): Source[ReceivedRecord, _] = {
try {
Source.fromPublisher(db.run(Queries.getBetween)(ref, ref2, To))
} catch {
case NonFatal(e) =>
Source.failed(e)
}
}
}
开发者ID:blstream,项目名称:akka-viz,代码行数:34,代码来源:PersistenceSources.scala
示例8: FutureAsyncHttpClientHandler
//设置package包名称以及导入依赖的类
package com.softwaremill.sttp.asynchttpclient.future
import java.nio.ByteBuffer
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import com.softwaremill.sttp.{FutureMonad, SttpHandler}
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
import org.reactivestreams.Publisher
import scala.concurrent.{ExecutionContext, Future}
class FutureAsyncHttpClientHandler private (
asyncHttpClient: AsyncHttpClient,
closeClient: Boolean)(implicit ec: ExecutionContext)
extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient,
new FutureMonad,
closeClient) {
override protected def streamBodyToPublisher(
s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
override protected def publisherToStreamBody(
p: Publisher[ByteBuffer]): Nothing =
throw new IllegalStateException("This handler does not support streaming")
}
object FutureAsyncHttpClientHandler {
def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext =
ExecutionContext.Implicits.global)
: SttpHandler[Future, Nothing] =
new FutureAsyncHttpClientHandler(client, closeClient = false)
}
开发者ID:softwaremill,项目名称:sttp,代码行数:39,代码来源:FutureAsyncHttpClientHandler.scala
示例9: ScalazAsyncHttpClientHandler
//设置package包名称以及导入依赖的类
package com.softwaremill.sttp.asynchttpclient.scalaz
import java.nio.ByteBuffer
import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import org.asynchttpclient.{
AsyncHttpClient,
AsyncHttpClientConfig,
DefaultAsyncHttpClient
}
import org.reactivestreams.Publisher
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
class ScalazAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient,
closeClient: Boolean)
extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient,
TaskMonad,
closeClient) {
override protected def streamBodyToPublisher(
s: Nothing): Publisher[ByteBuffer] = s // nothing is everything
override protected def publisherToStreamBody(
p: Publisher[ByteBuffer]): Nothing =
throw new IllegalStateException("This handler does not support streaming")
}
object ScalazAsyncHttpClientHandler {
def apply(): SttpHandler[Task, Nothing] =
new ScalazAsyncHttpClientHandler(new DefaultAsyncHttpClient(),
closeClient = true)
def usingConfig(cfg: AsyncHttpClientConfig): SttpHandler[Task, Nothing] =
new ScalazAsyncHttpClientHandler(new DefaultAsyncHttpClient(cfg),
closeClient = true)
def usingClient(client: AsyncHttpClient): SttpHandler[Task, Nothing] =
new ScalazAsyncHttpClientHandler(client, closeClient = false)
}
private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
override def unit[T](t: T): Task[T] = Task.point(t)
override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)
override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
fa.flatMap(f)
override def async[T](
register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
Task.async { cb =>
register {
case Left(t) => cb(-\/(t))
case Right(t) => cb(\/-(t))
}
}
override def error[T](t: Throwable): Task[T] = Task.fail(t)
}
开发者ID:softwaremill,项目名称:sttp,代码行数:61,代码来源:ScalazAsyncHttpClientHandler.scala
示例10: Merge
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import org.reactivestreams.Publisher
class Merge[A] extends PublisherBase[A] { self =>
private[this] var branches: List[BranchSubscriber] = Nil
class BranchSubscriber extends SubscriberBase[A] {
def onNext(a: A) { self.sendNext(a); request(1L) }
def onComplete() { self.sendComplete() }
def onError(t: Throwable) { self.sendError(t) }
}
def addPublisher(p: Publisher[A]) {
val sub = new BranchSubscriber
p.subscribe(sub)
branches = sub +: branches
}
override def onStart() = branches.foreach { _.request(1) }
override def onStop() = branches.foreach { _.cancel() }
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:21,代码来源:Merge.scala
注:本文中的org.reactivestreams.Publisher类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论