• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Publisher类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.reactivestreams.Publisher的典型用法代码示例。如果您正苦于以下问题:Scala Publisher类的具体用法?Scala Publisher怎么用?Scala Publisher使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Publisher类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Sink

//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream

import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }

import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try


object Sink {
  def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
  }

  def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
  }

  def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
    AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
  }

  def list[T]: AkkaSink[T, Future[List[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
  }

  // Akka's API
  def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)
  def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)
  def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled
  def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head
  def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption
  def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]
  def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]
  def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]
  def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)
  def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore
  def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)
  def combine[T, U](
    first: AkkaSink[U, _],
    second: AkkaSink[U, _],
    rest: AkkaSink[U, _]*)(strategy: Int ? Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
    AkkaSink.combine[T, U](first, second, rest: _*)(strategy)
  def foreachParallel[T](parallelism: Int)(f: T ? Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
    AkkaSink.foreachParallel[T](parallelism)(f)
  def fold[U, T](zero: U)(f: (U, T) ? U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)
  def reduce[T](f: (T, T) ? T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)
  def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)
  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
    AkkaSink.actorRef(ref, onCompleteMessage)
  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
    onFailureMessage: (Throwable) ? Any = Status.Failure): AkkaSink[T, NotUsed] =
    AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
  def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)
  def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
} 
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala


示例2: SwaveIdentityProcessorVerification

//设置package包名称以及导入依赖的类
package swave.core.tck

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{IdentityProcessorVerification, TestEnvironment}
import org.scalatest.testng.TestNGSuiteLike
import org.testng.SkipException
import org.testng.annotations.AfterClass
import swave.core._

abstract class SwaveIdentityProcessorVerification[T](val testEnv: TestEnvironment, publisherShutdownTimeout: Long)
    extends IdentityProcessorVerification[T](testEnv, publisherShutdownTimeout) with TestNGSuiteLike
    with StreamEnvShutdown {

  def this(printlnDebug: Boolean) =
    this(
      new TestEnvironment(Timeouts.defaultTimeout.toMillis, printlnDebug),
      Timeouts.publisherShutdownTimeout.toMillis)

  def this() = this(false)

  override def createFailedPublisher(): Publisher[T] =
    Spout.failing[T](new Exception("Nope")).drainTo(Drain.toPublisher()).get

  // Publishers created by swave don't support fanout by default
  override def maxSupportedSubscribers: Long = 1L

  override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit =
    throw new SkipException("Not relevant for publisher w/o fanout support")

  override lazy val publisherExecutorService: ExecutorService =
    Executors.newFixedThreadPool(3)

  @AfterClass
  def shutdownPublisherExecutorService(): Unit = {
    publisherExecutorService.shutdown()
    publisherExecutorService.awaitTermination(3, TimeUnit.SECONDS)
  }
} 
开发者ID:sirthias,项目名称:swave,代码行数:40,代码来源:SwaveIdentityProcessorVerification.scala


示例3: SwavePublisherVerification

//设置package包名称以及导入依赖的类
package swave.core.tck

import org.reactivestreams.Publisher
import org.reactivestreams.tck.{PublisherVerification, TestEnvironment}
import org.scalatest.testng.TestNGSuiteLike
import org.testng.SkipException
import swave.core._

abstract class SwavePublisherVerification[T](val testEnv: TestEnvironment, publisherShutdownTimeout: Long)
    extends PublisherVerification[T](testEnv, publisherShutdownTimeout) with TestNGSuiteLike with StreamEnvShutdown {

  def this(printlnDebug: Boolean) =
    this(
      new TestEnvironment(Timeouts.defaultTimeout.toMillis, printlnDebug),
      Timeouts.publisherShutdownTimeout.toMillis)

  def this() = this(false)

  override def createFailedPublisher(): Publisher[T] =
    Spout.failing[T](new Exception("Nope")).drainTo(Drain.toPublisher()).get

  override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit =
    throw new SkipException("Not relevant for publisher w/o fanout support")
} 
开发者ID:sirthias,项目名称:swave,代码行数:25,代码来源:SwavePublisherVerification.scala


示例4: EventDao

//设置package包名称以及导入依赖的类
package dao

import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import org.reactivestreams.Publisher
import play.api.libs.json.{Format, JsSuccess, Json}

import scala.concurrent.Future

abstract class EventDao[Event](
    implicit val mat: Materializer,
    format: Format[Event]) {

  def store(str: Event): Future[QueueOfferResult] =
    in.offer(str)

  val (
    in: SourceQueueWithComplete[Event],
    out: Publisher[Event]
  ) =
    Source
      .queue[Event](3, OverflowStrategy.backpressure)
      .map(Json.toJson(_))
      .map(Json.stringify)
      .via(eventStore)
      .map(Json.parse)
      .map(Json.fromJson[Event])
      .collect { case JsSuccess(event, _) => event }
      .toMat(Sink.asPublisher(fanout = true))(Keep.both)
      .run()

  protected def eventStore: Flow[String, String, NotUsed]
} 
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:35,代码来源:EventDao.scala


示例5:

//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ProducerMessage, ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val publisher: Publisher[StringConsumerRecord] = kafka.consume(ConsumerProperties(
 bootstrapServers = "localhost:9092",
 topic = "lowercaseStrings",
 groupId = "groupName",
 valueDeserializer = new StringDeserializer()
))

val subscriber: Subscriber[StringProducerMessage] = kafka.publish(ProducerProperties(
  bootstrapServers = "localhost:9092",
  topic = "uppercaseStrings",
  valueSerializer = new StringSerializer()
))

Source.fromPublisher(publisher).map(m => ProducerMessage(m.value().toUpperCase))
  .to(Sink.fromSubscriber(subscriber)).run() 
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:28,代码来源:UseCase.scala


示例6: App

//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl

import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future

object App {

  def main(args: Array[String]): Unit = {

    implicit val actorSystem = ActorSystem( "ReactiveKafka" )
    implicit val materializer = ActorMaterializer( )

    val kafka = new ReactiveKafka( )

    val publisher: Publisher[StringConsumerRecord] = kafka.consume( ConsumerProperties(
      bootstrapServers = "kafka:9092",//kafka
      topic = "alert_mail",
      groupId = "sendMailKschool",
      valueDeserializer = new StringDeserializer( )
    ) )

    val senderActor = actorSystem.actorOf( Props[SenderActor] )
    val actorSenderMailSubscrtiber: Subscriber[(Future[Option[Contact]], String, String, String)] =
            ActorSubscriber[(Future[Option[Contact]], String, String, String)](senderActor)

    Source.fromPublisher( publisher )
      .map( retrieveContactFromMongo )
      .to( Sink.fromSubscriber( actorSenderMailSubscrtiber ) ).run( )

  }

  def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {

    val splittedRecord = record.value.split(",")
    val user = splittedRecord(0)
    val typeAlarm = splittedRecord(1)
    val latitude = splittedRecord(2)
    val longitude = splittedRecord(3)

    (db.KschoolDAO.findContact( user ), typeAlarm, latitude, longitude)

  }

} 
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala


示例7: PersistenceSources

//设置package包名称以及导入依赖的类
package akkaviz.persistence

import akka.stream.scaladsl.Source
import io.getquill._
import io.getquill.naming.SnakeCase
import org.reactivestreams.Publisher
import monifu.concurrent.Implicits.globalScheduler

import scala.util.control.NonFatal

object PersistenceSources {

  private[this] lazy val db = source(new CassandraStreamSourceConfig[SnakeCase]("akkaviz.cassandra"))

  def of(ref: String): Source[ReceivedRecord, _] = {
    try {
      Source.fromPublisher(db.run(Queries.getAllFor)(ref))
    } catch {
      case NonFatal(e) =>
        Source.failed(e)
    }
  }

  def between(ref: String, ref2: String): Source[ReceivedRecord, _] = {
    try {
      Source.fromPublisher(db.run(Queries.getBetween)(ref, ref2, To))
    } catch {
      case NonFatal(e) =>
        Source.failed(e)
    }
  }

} 
开发者ID:blstream,项目名称:akka-viz,代码行数:34,代码来源:PersistenceSources.scala


示例8: FutureAsyncHttpClientHandler

//设置package包名称以及导入依赖的类
package com.softwaremill.sttp.asynchttpclient.future

import java.nio.ByteBuffer

import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import com.softwaremill.sttp.{FutureMonad, SttpHandler}
import org.asynchttpclient.{
  AsyncHttpClient,
  AsyncHttpClientConfig,
  DefaultAsyncHttpClient
}
import org.reactivestreams.Publisher

import scala.concurrent.{ExecutionContext, Future}

class FutureAsyncHttpClientHandler private (
    asyncHttpClient: AsyncHttpClient,
    closeClient: Boolean)(implicit ec: ExecutionContext)
    extends AsyncHttpClientHandler[Future, Nothing](asyncHttpClient,
                                                    new FutureMonad,
                                                    closeClient) {

  override protected def streamBodyToPublisher(
      s: Nothing): Publisher[ByteBuffer] = s // nothing is everything

  override protected def publisherToStreamBody(
      p: Publisher[ByteBuffer]): Nothing =
    throw new IllegalStateException("This handler does not support streaming")
}

object FutureAsyncHttpClientHandler {

  
  def usingClient(client: AsyncHttpClient)(implicit ec: ExecutionContext =
                                             ExecutionContext.Implicits.global)
    : SttpHandler[Future, Nothing] =
    new FutureAsyncHttpClientHandler(client, closeClient = false)
} 
开发者ID:softwaremill,项目名称:sttp,代码行数:39,代码来源:FutureAsyncHttpClientHandler.scala


示例9: ScalazAsyncHttpClientHandler

//设置package包名称以及导入依赖的类
package com.softwaremill.sttp.asynchttpclient.scalaz

import java.nio.ByteBuffer

import com.softwaremill.sttp.{MonadAsyncError, SttpHandler}
import com.softwaremill.sttp.asynchttpclient.AsyncHttpClientHandler
import org.asynchttpclient.{
  AsyncHttpClient,
  AsyncHttpClientConfig,
  DefaultAsyncHttpClient
}
import org.reactivestreams.Publisher

import scalaz.{-\/, \/-}
import scalaz.concurrent.Task

class ScalazAsyncHttpClientHandler private (asyncHttpClient: AsyncHttpClient,
                                            closeClient: Boolean)
    extends AsyncHttpClientHandler[Task, Nothing](asyncHttpClient,
                                                  TaskMonad,
                                                  closeClient) {

  override protected def streamBodyToPublisher(
      s: Nothing): Publisher[ByteBuffer] = s // nothing is everything

  override protected def publisherToStreamBody(
      p: Publisher[ByteBuffer]): Nothing =
    throw new IllegalStateException("This handler does not support streaming")
}

object ScalazAsyncHttpClientHandler {
  def apply(): SttpHandler[Task, Nothing] =
    new ScalazAsyncHttpClientHandler(new DefaultAsyncHttpClient(),
                                     closeClient = true)
  def usingConfig(cfg: AsyncHttpClientConfig): SttpHandler[Task, Nothing] =
    new ScalazAsyncHttpClientHandler(new DefaultAsyncHttpClient(cfg),
                                     closeClient = true)
  def usingClient(client: AsyncHttpClient): SttpHandler[Task, Nothing] =
    new ScalazAsyncHttpClientHandler(client, closeClient = false)
}

private[scalaz] object TaskMonad extends MonadAsyncError[Task] {
  override def unit[T](t: T): Task[T] = Task.point(t)

  override def map[T, T2](fa: Task[T], f: (T) => T2): Task[T2] = fa.map(f)

  override def flatMap[T, T2](fa: Task[T], f: (T) => Task[T2]): Task[T2] =
    fa.flatMap(f)

  override def async[T](
      register: ((Either[Throwable, T]) => Unit) => Unit): Task[T] =
    Task.async { cb =>
      register {
        case Left(t)  => cb(-\/(t))
        case Right(t) => cb(\/-(t))
      }
    }

  override def error[T](t: Throwable): Task[T] = Task.fail(t)
} 
开发者ID:softwaremill,项目名称:sttp,代码行数:61,代码来源:ScalazAsyncHttpClientHandler.scala


示例10: Merge

//设置package包名称以及导入依赖的类
package alexsmirnov.stream

import org.reactivestreams.Publisher

class Merge[A] extends PublisherBase[A] { self =>
  private[this] var branches: List[BranchSubscriber] = Nil
  class BranchSubscriber extends SubscriberBase[A] {
    def onNext(a: A) { self.sendNext(a); request(1L) }
    def onComplete() { self.sendComplete() }
    def onError(t: Throwable) { self.sendError(t) }
  }
  
  def addPublisher(p: Publisher[A]) {
    val sub = new BranchSubscriber
    p.subscribe(sub)
    branches = sub +: branches
  }
  override def onStart() = branches.foreach { _.request(1) }
  override def onStop() = branches.foreach { _.cancel() }
} 
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:21,代码来源:Merge.scala



注:本文中的org.reactivestreams.Publisher类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ConfigHelper类代码示例发布时间:2022-05-23
下一篇:
Scala Optional类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap