本文整理汇总了Scala中org.reactivestreams.Subscription类的典型用法代码示例。如果您正苦于以下问题:Scala Subscription类的具体用法?Scala Subscription怎么用?Scala Subscription使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Subscription类的2个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: SubscriberWhiteboxVerificationTest
//设置package包名称以及导入依赖的类
package ru.reajames
package reactivespecification
import org.scalatest.testng.TestNGSuiteLike
import java.util.concurrent.atomic.AtomicInteger
import concurrent.ExecutionContext.Implicits.global
import org.reactivestreams.{Subscriber, Subscription}
import org.reactivestreams.tck.{SubscriberWhiteboxVerification, TestEnvironment}
import org.reactivestreams.tck.SubscriberWhiteboxVerification.{SubscriberPuppet, WhiteboxSubscriberProbe}
class SubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification[String](new TestEnvironment(300)) with TestNGSuiteLike
with ActimeMQConnectionFactoryAware {
val connectionHolder = new ConnectionHolder(connectionFactory)
val counter = new AtomicInteger()
private def queue = Queue("subscriber-bb-verification-" + counter.incrementAndGet)
def createSubscriber(probe: WhiteboxSubscriberProbe[String]): Subscriber[String] = {
new JmsSender[String](connectionHolder, queue, _ createTextMessage _) {
override def onSubscribe(subscription: Subscription): Unit = {
super.onSubscribe(subscription)
probe.registerOnSubscribe(new SubscriberPuppet {
def triggerRequest(elements: Long) = subscription.request(elements)
def signalCancel() = subscription.cancel()
})
}
override def onNext(element: String): Unit = {
super.onNext(element)
probe.registerOnNext(element)
}
override def onComplete(): Unit = {
super.onComplete()
probe.registerOnComplete()
}
override def onError(th: Throwable): Unit = {
super.onError(th)
probe.registerOnError(th)
}
}
}
def createElement(element: Int): String = "message " + element
}
开发者ID:dobrynya,项目名称:reajames,代码行数:49,代码来源:SubscriberWhiteboxVerificationTest.scala
示例2: onSubscribe
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scala.concurrent.Future
trait SubscriberBase[A] extends Subscriber[A] {
private[this] var subscription: Subscription = null
private[this] val singleExecutor = Executors.newSingleThreadExecutor(streamsThreadFactory)
private implicit val execContext = ExecutionContext.fromExecutorService(singleExecutor)
def onSubscribe(s: Subscription) = {
require(subscription == null, "Subscriber already has subscription")
subscription = s
}
def request(n: Long) {
require(subscription != null, "Subscriber has no subscription")
Future(subscription.request(n))
}
def cancel() {
require(subscription != null, "Subscriber has no subscription")
Future(subscription.cancel())
}
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:28,代码来源:SubscriberBase.scala
注:本文中的org.reactivestreams.Subscription类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论