• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Subscriber类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.reactivestreams.Subscriber的典型用法代码示例。如果您正苦于以下问题:Scala Subscriber类的具体用法?Scala Subscriber怎么用?Scala Subscriber使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Subscriber类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Sink

//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream

import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }

import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try


/**
  * Facade over `akka.stream.scaladsl.Sink` (imported here as `AkkaSink`).
  *
  * The first group of factories builds sinks from a `CollectionStage` fed with a builder
  * for the target collection; their materialized value is a `Future` of that collection.
  * The second group forwards one-to-one to the method of the same name on `AkkaSink`.
  */
object Sink {
  /** Sink materializing a `Future` of an immutable `Set` built via `Set.newBuilder`. */
  def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
  }

  /** Sink materializing a `Future` of an immutable `SortedSet`, using the implicit `ordering`. */
  def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
  }

  /** Sink of key/value pairs materializing a `Future` of an immutable `Map`. */
  def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
    AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
  }

  /** Sink materializing a `Future` of a `List` built via `List.newBuilder`. */
  def list[T]: AkkaSink[T, Future[List[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
  }

  // Akka's API: every method below delegates directly to the equally named AkkaSink method.
  def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)
  def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)
  def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled
  def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head
  def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption
  def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]
  def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]
  def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]
  def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)
  def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore
  def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)
  def combine[T, U](
    first: AkkaSink[U, _],
    second: AkkaSink[U, _],
    rest: AkkaSink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
    AkkaSink.combine[T, U](first, second, rest: _*)(strategy)
  // The implicit ExecutionContext is passed on implicitly to AkkaSink.foreachParallel.
  def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
    AkkaSink.foreachParallel[T](parallelism)(f)
  def fold[U, T](zero: U)(f: (U, T) => U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)
  def reduce[T](f: (T, T) => T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)
  def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)
  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
    AkkaSink.actorRef(ref, onCompleteMessage)
  // onFailureMessage defaults to wrapping the failure in akka.actor.Status.Failure.
  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
    onFailureMessage: (Throwable) => Any = Status.Failure): AkkaSink[T, NotUsed] =
    AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
  def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)
  def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala


示例2: SubscriberDrainStage

//设置package包名称以及导入依赖的类
package swave.core.impl.stages.drain

import org.reactivestreams.Subscriber
import swave.core.Stage
import swave.core.impl.Inport
import swave.core.impl.rs.ForwardToRunnerSubscription
import swave.core.impl.stages.DrainStage
import swave.core.macros.StageImplementation

// format: OFF
/**
  * Drain stage that relays the signals it receives to the given Reactive Streams
  * `Subscriber`.
  *
  * @param subscriber the subscriber that is handed a `ForwardToRunnerSubscription` on start
  *                   and then receives `onNext` / `onComplete` / `onError`
  */
@StageImplementation
private[core] final class SubscriberDrainStage(subscriber: Subscriber[AnyRef]) extends DrainStage {

  def kind = Stage.Kind.Drain.FromSubscriber(subscriber)

  // Once the inport is connected: request a dispatcher assignment, register for the
  // xStart signal and wait for it.
  connectInAndSealWith { in =>
    region.impl.requestDispatcherAssignment()
    region.impl.registerForXStart(this)
    awaitingXStart(in)
  }

  /** State entered after sealing: on xStart the subscriber gets its subscription. */
  def awaitingXStart(in: Inport): State = state(
    xStart = () => {
      subscriber.onSubscribe(new ForwardToRunnerSubscription(this))
      running(in)
    })

  /** Main state: relays elements, completion and errors to the subscriber. */
  def running(in: Inport): State = state(
    intercept = false,

    request = requestF(in),
    cancel = stopCancelF(in),

    onNext = (elem, _) => {
      subscriber.onNext(elem)
      stay()
    },

    onComplete = _ => {
      subscriber.onComplete()
      stop()
    },

    onError = (e, _) => {
      subscriber.onError(e)
      stop(e)
    },

    // An illegal request amount is reported to the subscriber as an error, then
    // this stage stops and cancels its inport.
    xEvent = { case ForwardToRunnerSubscription.IllegalRequest(n) =>
        subscriber.onError(new IllegalArgumentException(
          "The number of elements requested must be > 0 (see reactive-streams spec, rule 3.9)"))
        stopCancel(in)
    })
}
开发者ID:sirthias,项目名称:swave,代码行数:55,代码来源:SubscriberDrainStage.scala


示例3: SubscriberWhiteboxVerificationTest

//设置package包名称以及导入依赖的类
package ru.reajames
package reactivespecification

import org.scalatest.testng.TestNGSuiteLike
import java.util.concurrent.atomic.AtomicInteger
import concurrent.ExecutionContext.Implicits.global
import org.reactivestreams.{Subscriber, Subscription}
import org.reactivestreams.tck.{SubscriberWhiteboxVerification, TestEnvironment}
import org.reactivestreams.tck.SubscriberWhiteboxVerification.{SubscriberPuppet, WhiteboxSubscriberProbe}


/**
  * Runs the Reactive Streams TCK whitebox subscriber verification against `JmsSender`.
  * Every callback first delegates to `JmsSender` and then reports the signal to the probe.
  */
class SubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification[String](new TestEnvironment(300)) with TestNGSuiteLike
  with ActimeMQConnectionFactoryAware {

  val connectionHolder = new ConnectionHolder(connectionFactory)
  val counter = new AtomicInteger()

  // Each call yields a queue with a new numeric suffix taken from the counter.
  private def queue = {
    val sequenceNumber = counter.incrementAndGet
    Queue("subscriber-bb-verification-" + sequenceNumber)
  }

  /** Creates the subscriber under test, instrumented so that the probe sees every signal. */
  def createSubscriber(probe: WhiteboxSubscriberProbe[String]): Subscriber[String] =
    new JmsSender[String](connectionHolder, queue, _ createTextMessage _) {

      override def onSubscribe(subscription: Subscription): Unit = {
        super.onSubscribe(subscription)
        // The puppet lets the TCK drive demand and cancellation on this subscription.
        val puppet: SubscriberPuppet = new SubscriberPuppet {
          def triggerRequest(elements: Long): Unit = subscription.request(elements)
          def signalCancel(): Unit = subscription.cancel()
        }
        probe.registerOnSubscribe(puppet)
      }

      override def onNext(element: String): Unit = {
        super.onNext(element)
        probe.registerOnNext(element)
      }

      override def onComplete(): Unit = {
        super.onComplete()
        probe.registerOnComplete()
      }

      override def onError(th: Throwable): Unit = {
        super.onError(th)
        probe.registerOnError(th)
      }
    }

  /** Produces the test element for the given index. */
  def createElement(element: Int): String = s"message $element"
}
开发者ID:dobrynya,项目名称:reajames,代码行数:49,代码来源:SubscriberWhiteboxVerificationTest.scala


示例4: UseCase

//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import com.softwaremill.react.kafka.{ProducerMessage, ConsumerProperties, ProducerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }

// Implicits needed by ReactiveKafka and by running the stream below.
implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val kafka = new ReactiveKafka()

// Publisher side: consume string records from the "lowercaseStrings" topic.
val consumerProperties = ConsumerProperties(
  bootstrapServers = "localhost:9092",
  topic = "lowercaseStrings",
  groupId = "groupName",
  valueDeserializer = new StringDeserializer()
)
val publisher: Publisher[StringConsumerRecord] = kafka.consume(consumerProperties)

// Subscriber side: publish string messages to the "uppercaseStrings" topic.
val producerProperties = ProducerProperties(
  bootstrapServers = "localhost:9092",
  topic = "uppercaseStrings",
  valueSerializer = new StringSerializer()
)
val subscriber: Subscriber[StringProducerMessage] = kafka.publish(producerProperties)

// Upper-case every consumed value and feed the result to the subscriber.
val upperCased = Source.fromPublisher(publisher)
  .map(record => ProducerMessage(record.value().toUpperCase))
upperCased.to(Sink.fromSubscriber(subscriber)).run()
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:28,代码来源:UseCase.scala


示例5: App

//设置package包名称以及导入依赖的类
package kschool.pfm.mail.dvl

import akka.actor.{Props, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.KafkaMessages._
import kschool.pfm.mail.dvl.mail._
import kschool.pfm.mail.dvl.model._
import org.apache.kafka.common.serialization.StringDeserializer
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.Future

/**
  * Entry point: consumes records from the "alert_mail" Kafka topic, maps each record to a
  * contact lookup plus alarm details, and forwards the result to a `SenderActor`.
  */
object App {

  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem("ReactiveKafka")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val kafka = new ReactiveKafka()

    val consumerProperties = ConsumerProperties(
      bootstrapServers = "kafka:9092", // kafka
      topic = "alert_mail",
      groupId = "sendMailKschool",
      valueDeserializer = new StringDeserializer()
    )
    val publisher: Publisher[StringConsumerRecord] = kafka.consume(consumerProperties)

    // Wrap the sender actor as a Reactive Streams subscriber so it can terminate the stream.
    val senderActor = actorSystem.actorOf(Props[SenderActor])
    val mailSubscriber: Subscriber[(Future[Option[Contact]], String, String, String)] =
      ActorSubscriber[(Future[Option[Contact]], String, String, String)](senderActor)

    val alerts = Source.fromPublisher(publisher).map(retrieveContactFromMongo)
    alerts.to(Sink.fromSubscriber(mailSubscriber)).run()
  }

  /**
    * Splits the record value on "," and reads its first four fields as user, alarm type,
    * latitude and longitude.
    *
    * @param record the consumed Kafka record
    * @return the result of `db.KschoolDAO.findContact` for the user, followed by the alarm
    *         type, latitude and longitude fields
    */
  def retrieveContactFromMongo(record: StringConsumerRecord): (Future[Option[Contact]], String, String, String) = {
    val fields = record.value.split(",")

    // All four fields are read before the lookup is started.
    val userName = fields(0)
    val alarmType = fields(1)
    val lat = fields(2)
    val lon = fields(3)

    val contactLookup = db.KschoolDAO.findContact(userName)
    (contactLookup, alarmType, lat, lon)
  }

}
开发者ID:DVentas,项目名称:PFM-Kschool-Flink-Docker,代码行数:54,代码来源:App.scala


示例6: Fork

//设置package包名称以及导入依赖的类
package alexsmirnov.stream

import org.reactivestreams.Processor
import org.reactivestreams.Subscriber

  /**
    * Processor that fans every received element out to all of its branches.
    *
    * Each call to `subscribe` creates a new `BranchPublisher`, subscribes the given
    * subscriber to it and adds it to `branches`.
    */
  class Fork[A] extends Processor[A, A] with SubscriberBase[A] { self =>

    /** Branches created so far, most recently added first. */
    var branches: List[BranchPublisher] = Nil

    /** Publisher for one downstream subscriber; its start/stop hooks drive the upstream side. */
    class BranchPublisher extends PublisherBase[A] {
      def onStart(): Unit = { self.request(1L) }
      def onStop(): Unit = { self.cancel() }
    }

    /** Registers `sub` on a fresh branch. */
    def subscribe(sub: Subscriber[_ >: A]): Unit = {
      val pub = new BranchPublisher()
      pub.subscribe(sub)
      branches = pub +: branches
    }

    /** Sends the element to every branch, then requests one more element upstream. */
    def onNext(a: A): Unit = { branches.foreach(_.sendNext(a)); request(1L) }

    /** Propagates completion to every branch. */
    def onComplete(): Unit = branches.foreach(_.sendComplete())

    /** Propagates the error to every branch. */
    def onError(t: Throwable): Unit = branches.foreach(_.sendError(t))
  }
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:23,代码来源:Fork.scala


示例7: SubscriberBase

//设置package包名称以及导入依赖的类
package alexsmirnov.stream

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scala.concurrent.Future

  /**
    * Base trait for subscribers: stores the `Subscription` received in `onSubscribe` and
    * offers `request` / `cancel` helpers that signal it from a single-thread executor.
    */
  trait SubscriberBase[A] extends Subscriber[A] {
    // null until onSubscribe has stored a subscription.
    private[this] var subscription: Subscription = null
    private[this] val singleExecutor = Executors.newSingleThreadExecutor(streamsThreadFactory)
    private implicit val execContext: ExecutionContext = ExecutionContext.fromExecutorService(singleExecutor)

    /**
      * Stores the subscription. Following the Reactive Streams specification, a null
      * argument is rejected with a `NullPointerException` (rule 2.13) and a subscription
      * offered while one is already held is cancelled instead of raising an error (rule 2.5).
      */
    def onSubscribe(s: Subscription): Unit = {
      if (s == null) throw new NullPointerException("Subscription must not be null")
      if (subscription != null) s.cancel()
      else subscription = s
    }

    /**
      * Requests `n` elements from the stored subscription on the single-thread executor.
      * Fails with `IllegalArgumentException` when no subscription has been stored yet.
      */
    def request(n: Long): Unit = {
      require(subscription != null, "Subscriber has no subscription")
      Future(subscription.request(n))
    }

    /**
      * Cancels the stored subscription on the single-thread executor.
      * Fails with `IllegalArgumentException` when no subscription has been stored yet.
      */
    def cancel(): Unit = {
      require(subscription != null, "Subscriber has no subscription")
      Future(subscription.cancel())
    }

  }
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:28,代码来源:SubscriberBase.scala



注:本文中的org.reactivestreams.Subscriber类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala StanfordCoreNLP类代码示例发布时间:2022-05-23
下一篇:
Scala LinearLayout类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap