本文整理汇总了Scala中rx.lang.scala.Observable类的典型用法代码示例。如果您正苦于以下问题:Scala Observable类的具体用法?Scala Observable怎么用?Scala Observable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Observable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ObservableInPin
//设置package包名称以及导入依赖的类
package framboos.async
import rx.lang.scala.Observable
import rx.lang.scala.Subject
import rx.lang.scala.subjects._
import scala.concurrent.duration._
import framboos._
/** Factory that exposes the value of an input pin as an `Observable[Boolean]`. */
object ObservableInPin {

  /**
   * Polls the input pin `pinNumber` every 50 ms and publishes each change of its value.
   *
   * The returned observable is a `BehaviorSubject` seeded with the value read when this
   * method is called; afterwards only values that differ from the previously seen one are
   * pushed into it.
   *
   * NOTE(review): the subscription to the polling interval is never unsubscribed, so every
   * call keeps its own timer running — confirm that callers create one instance per pin.
   *
   * @param pinNumber number of the pin to watch (passed to `ReverseInPin`)
   * @return the subject carrying the pin values
   */
  def apply(pinNumber: Int): Observable[Boolean] = {
    val inPin = ReverseInPin(pinNumber)
    var lastValue = inPin.value
    val subject = BehaviorSubject(lastValue)

    // Dot notation instead of the postfix form `50 milliseconds`, which needs
    // `scala.language.postfixOps` and is rejected by newer compilers without it.
    Observable.interval(50.milliseconds).subscribe { _ =>
      val currentValue = inPin.value
      if (currentValue != lastValue) {
        // TODO access Akka logging?
        // log.debug(s"value of in#$pinNumber changed to $currentValue")
        subject.onNext(currentValue)
        lastValue = currentValue
      }
    }

    subject
  }
}
开发者ID:nwswanson,项目名称:thermopi,代码行数:28,代码来源:ObservableInPin.scala
示例2: SynchronizedManualRxEmitter
//设置package包名称以及导入依赖的类
package org.nephtys.rxjavamanualemitter
import rx.lang.scala.Observable
/**
 * Wrapper around a `ManualRxEmitter` whose `emit`, `complete` and `err` calls are
 * serialized on this instance's monitor before being delegated.
 */
class SynchronizedManualRxEmitter[T] {

  private val innerEmitter = new ManualRxEmitter[T]

  /** Forwards the element `t` to the wrapped emitter while holding this object's lock. */
  def emit(t: T) = synchronized {
    innerEmitter.emit(t)
  }

  /** Forwards the completion signal to the wrapped emitter while holding this object's lock. */
  def complete() = synchronized {
    innerEmitter.complete()
  }

  /** Forwards the error `e` to the wrapped emitter while holding this object's lock. */
  def err(e: Throwable) = synchronized {
    innerEmitter.err(e)
  }

  /** The observable of the wrapped emitter (returned without taking the lock). */
  def Observable: Observable[T] = innerEmitter.Observable
}
开发者ID:n3phtys,项目名称:rxjava-manualemitter,代码行数:21,代码来源:SynchronizedManualRxEmitter.scala
示例3: emit
//设置package包名称以及导入依赖的类
package org.nephtys.rxjavamanualemitter
import java.util.concurrent.atomic.AtomicBoolean
import rx.lang.scala.{Observable, Subscriber, Subscription}
import scala.collection.parallel.mutable.{ParHashMap, ParHashSet}
import rx.lang.scala
/**
 * Delivers `t` to every registered subscriber that is still subscribed.
 * Does nothing once the emitter has been completed or has errored.
 */
def emit(t : T) = {
  val terminated = completed.get() || errored.get()
  if (!terminated) {
    hashmap.foreach { subscriber =>
      if (!subscriber.isUnsubscribed) subscriber.onNext(t)
    }
  }
}
/**
 * Signals completion to every registered subscriber that is still subscribed, then marks
 * the emitter as completed and drops all registered subscribers.
 * Does nothing if the emitter has already been completed or has errored.
 */
def complete() = {
  val terminated = completed.get() || errored.get()
  if (!terminated) {
    hashmap.foreach { subscriber =>
      if (!subscriber.isUnsubscribed) subscriber.onCompleted()
    }
    // The flag is raised only after the subscribers have been notified.
    completed.set(true)
    hashmap.clear()
  }
}
/**
 * Signals the error `e` to every registered subscriber that is still subscribed, then marks
 * the emitter as errored and drops all registered subscribers.
 * Does nothing if the emitter has already been completed or has errored.
 */
def err(e : Throwable) = {
  val terminated = completed.get() || errored.get()
  if (!terminated) {
    hashmap.foreach { subscriber =>
      if (!subscriber.isUnsubscribed) subscriber.onError(e)
    }
    // The flag is raised only after the subscribers have been notified.
    errored.set(true)
    hashmap.clear()
  }
}
/** Registers `subscriber` so that later `emit`/`complete`/`err` calls reach it. */
private def subscribefunc(subscriber : Subscriber[T]) : Unit = {
  hashmap += subscriber
}
/** Observable view of this emitter: each new subscriber is registered through `subscribefunc`. */
val Observable : Observable[T] = scala.Observable.apply[T] { newSubscriber => subscribefunc(newSubscriber) }
}
开发者ID:n3phtys,项目名称:rxjava-manualemitter,代码行数:48,代码来源:ManualRxEmitter.scala
示例4: RunInterval
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core
import scala.concurrent.duration._
import akka.actor._
import rx.lang.scala.Observable
import net.ruippeixotog.scalafbp.component.ComponentActor.OnAllOutputPortsClosed
import net.ruippeixotog.scalafbp.component._
/**
 * Component that sends a unit signal on its `out` port at the period (in milliseconds)
 * received on the `interval` port, until something arrives on the `stop` port.
 */
case object RunInterval extends Component {
  val name = "core/RunInterval"
  val description = "Sends a signal periodically"
  val icon = Some("clock-o")

  val intervalPort = InPort[Long]("interval", "Interval at which signals are emitted (ms)")
  val stopPort = InPort[Unit]("stop", "Stop the emission")
  val inPorts = List(intervalPort, stopPort)

  val outPort = OutPort[Unit]("out", "A signal sent at the given interval")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    override val terminationPolicy = List(OnAllOutputPortsClosed)

    // switchMap: each newly received period replaces the ticker built from the previous one.
    // The actor stops itself when the resulting stream completes.
    intervalPort.stream
      .switchMap(periodMs => Observable.interval(periodMs.millis).map(_ => ()))
      .doOnCompleted(context.stop(self))
      .pipeTo(outPort)

    // Any packet on the stop port terminates the actor.
    stopPort.stream.foreach(_ => context.stop(self))
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:34,代码来源:RunInterval.scala
示例5: RunTimeout
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core
import scala.concurrent.duration._
import akka.actor._
import rx.lang.scala.Observable
import net.ruippeixotog.scalafbp.component.ComponentActor.OnAllOutputPortsClosed
import net.ruippeixotog.scalafbp.component._
/**
 * Component that sends a single unit signal on its `out` port after the delay
 * (in milliseconds) received on the `time` port.
 */
case object RunTimeout extends Component {
  val name = "core/RunTimeout"
  val description = "Sends a signal after the given time"
  val icon = Some("clock-o")

  val timePort = InPort[Long]("time", "Time after which a signal will be sent (ms)")
  val inPorts = List(timePort)

  val outPort = OutPort[Unit]("out", "A signal sent after the given time")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    override val terminationPolicy = List(OnAllOutputPortsClosed)

    // Only the first delay value is used; the actor stops itself once the stream completes.
    timePort.stream
      .take(1)
      .flatMap(delayMs => Observable.timer(delayMs.millis).map(_ => ()))
      .doOnCompleted(context.stop(self))
      .pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:31,代码来源:RunTimeout.scala
示例6: RepeatDelayed
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.core
import scala.concurrent.duration._
import akka.actor.Props
import rx.lang.scala.Observable
import spray.json.JsValue
import net.ruippeixotog.scalafbp.component._
/**
 * Component that forwards every packet received on `in` to `out` after the delay
 * (in milliseconds) most recently received on the `delay` port.
 */
case object RepeatDelayed extends Component {
  val name = "core/RepeatDelayed"
  val description = "Forwards packets after a set delay"
  val icon = Some("clock-o")

  val inPort = InPort[JsValue]("in", "Packet to forward with a delay")
  val delayPort = InPort[Long]("delay", "Delay length (ms)")
  val inPorts = List(inPort, delayPort)

  val outPort = OutPort[JsValue]("out", "Forwarded packet")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) {
    // Pair each incoming packet with the latest delay value and re-emit it after that delay.
    // The pipeline is wired for its side effect only, so its result is not bound to a field.
    inPort.stream
      .withLatestFrom(delayPort.stream)((_, _))
      .flatMap { case (in, delay) => Observable.just(in).delay(delay.millis) }
      .pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:30,代码来源:RepeatDelayed.scala
示例7: MapConcat
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.component.stream
import akka.actor.Props
import rx.lang.scala.Observable
import spray.json.{ JsArray, JsValue }
import net.ruippeixotog.scalafbp.component._
import net.ruippeixotog.scalafbp.util.NashornEngine
/**
 * Component that applies a user-supplied function to each element received on `in`,
 * expects an array as the result, and emits the array's elements one by one on `out`.
 */
case object MapConcat extends Component {
  val name = "stream/MapConcat"
  // Typo fixed in the user-facing text: "flatterns" -> "flattens".
  val description = "Transforms the elements of a stream into arrays of elements and flattens them"
  val icon = Some("code")

  val inPort = InPort[JsValue]("in", "The stream to transform")
  val funcPort = InPort[String]("func", "The function with argument x to use for transformation. " +
    "Must return an array. While not defined, all elements pass untouched.")
  val inPorts = List(inPort, funcPort)

  val outPort = OutPort[JsValue]("out", "The transformed stream")
  val outPorts = List(outPort)

  val instanceProps = Props(new ComponentActor(this) with NashornEngine {
    // Until a function arrives on `func`, each element is wrapped in a one-element array,
    // so it passes through unchanged after flattening.
    val defaultFunc = Observable.just[JsFunction](JsArray(_))
    val func = defaultFunc ++ funcPort.stream.map(JsFunction(_))

    inPort.stream
      .withLatestFrom(func) { (x, f) => f(x) }
      .flatMapIterable {
        case JsArray(elems) => elems
        // A non-array result is reported as an error on the stream.
        case js => throw new IllegalArgumentException(
          s"The value ${js.compactPrint} returned by the function is not an array")
      }
      .pipeTo(outPort)
  })
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:34,代码来源:MapConcat.scala
示例8: request
//设置package包名称以及导入依赖的类
package com.mishiranu.instantimage.async
import com.squareup.okhttp.{OkHttpClient, Request, Response}
import rx.lang.scala.Observable
import scala.language.implicitConversions
import java.util.concurrent.TimeUnit
/**
 * Mixin that provides a shared OkHttp client and an `execute()` extension on
 * `Request.Builder` returning the response as an `Observable`.
 */
trait RxHttp {
  import scala.util.control.NonFatal

  private val client = new OkHttpClient()

  // Connect, read and write timeouts are all 15 seconds.
  client.setConnectTimeout(15000, TimeUnit.MILLISECONDS)
  client.setReadTimeout(15000, TimeUnit.MILLISECONDS)
  client.setWriteTimeout(15000, TimeUnit.MILLISECONDS)

  private val userAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:51.0) Gecko/20100101 Firefox/51.0"

  /**
   * Starts building a request for `url` with the fixed `User-Agent` header.
   *
   * @param url target URL
   * @return a builder that can be customized further and then executed
   */
  def request(url: String): Request.Builder = {
    new Request.Builder().url(url).header("User-Agent", userAgent)
  }

  /** Adds `execute()` to a `Request.Builder`. */
  class ExecutableBuilder(builder: Request.Builder) {

    /**
     * Builds the request and performs the call when the observable is subscribed to.
     *
     * @return an observable that emits the response and completes, or signals the failure
     */
    def execute(): Observable[Response] = Observable { subscriber =>
      try {
        subscriber.onNext(client.newCall(builder.build()).execute())
        subscriber.onCompleted()
      } catch {
        // Only non-fatal failures are routed to the subscriber; fatal errors
        // (e.g. OutOfMemoryError) are left to propagate.
        case NonFatal(t) => subscriber.onError(t)
      }
    }
  }

  /** Implicit conversion that makes `execute()` available on `Request.Builder`. */
  implicit def executable(request: Request.Builder): ExecutableBuilder = new ExecutableBuilder(request)
}
开发者ID:Mishiranu,项目名称:Instant-Image,代码行数:36,代码来源:RxHttp.scala
示例9: TokenReader
//设置package包名称以及导入依赖的类
package com.piotrglazar.receiptlottery.core
import com.piotrglazar.receiptlottery.Token
import org.springframework.beans.factory.annotation.{Value, Autowired}
import org.springframework.stereotype.Component
import rx.lang.scala.Observable
import scala.io.Source
import scala.util.{Failure, Success, Try}
/**
 * Reads tokens, one per non-empty line, from the classpath resource configured
 * under the `token.file` property.
 */
@Component
class TokenReader @Autowired()(@Value("${token.file}") private val path: String) {

  /**
   * @return an observable emitting every token of the resource, or an observable that
   *         signals the failure if the resource could not be read
   */
  def readTokens(): Observable[Token] = {
    readContent() match {
      case Success(items) => Observable.from(items)
      case Failure(e) => Observable.error(e)
    }
  }

  /** Loads all tokens eagerly; any exception raised while reading is captured in the `Try`. */
  private def readContent(): Try[List[Token]] =
    Try {
      val source = Source.fromInputStream(getClass.getResourceAsStream(path))
      try {
        // `toList` forces the lazy line iterator before the source is closed below.
        source.getLines()
          .filter(_.nonEmpty)
          .map(Token)
          .toList
      } finally {
        // Release the underlying stream; it was previously left open.
        source.close()
      }
    }
}
开发者ID:piotrglazar,项目名称:receipt-lottery,代码行数:30,代码来源:TokenReader.scala
示例10: OfferMatcherLaunchTokensActor
//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.flow.impl
import akka.actor.{ Actor, Cancellable, Props }
import mesosphere.marathon.core.flow.LaunchTokenConfig
import mesosphere.marathon.core.instance.update.{ InstanceChange, InstanceUpdated }
import mesosphere.marathon.core.matcher.manager.OfferMatcherManager
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import rx.lang.scala.{ Observable, Subscription }
import scala.concurrent.duration._
/** Companion holding the `Props` factory for the launch-token actor. */
private[flow] object OfferMatcherLaunchTokensActor {

  /**
   * @param conf                  launch token configuration handed to the actor
   * @param taskStatusObservables source of instance change events handed to the actor
   * @param offerMatcherManager   manager handed to the actor
   * @return `Props` that create a new `OfferMatcherLaunchTokensActor` from the arguments
   */
  def props(
    conf: LaunchTokenConfig,
    taskStatusObservables: TaskChangeObservables,
    offerMatcherManager: OfferMatcherManager): Props =
    Props(new OfferMatcherLaunchTokensActor(conf, taskStatusObservables, offerMatcherManager))
}
/**
 * Actor that periodically resets the launch tokens of the `OfferMatcherManager` and adds
 * one extra token for every `InstanceUpdated` event of a running instance whose health is
 * unknown or reported as healthy.
 */
private class OfferMatcherLaunchTokensActor(
  conf: LaunchTokenConfig,
  taskStatusObservables: TaskChangeObservables,
  offerMatcherManager: OfferMatcherManager)
    extends Actor {

  // Both are assigned in preStart and released in postStop.
  var taskStatusUpdateSubscription: Subscription = _
  var periodicSetToken: Cancellable = _

  /** Subscribes to all instance changes and starts the periodic token refresh. */
  override def preStart(): Unit = {
    // Every instance change is forwarded to this actor as a message.
    val allChanges: Observable[InstanceChange] = taskStatusObservables.forAll
    taskStatusUpdateSubscription = allChanges.subscribe(change => self ! change)

    import context.dispatcher
    val refreshInterval = conf.launchTokenRefreshInterval().millis
    periodicSetToken = context.system.scheduler.schedule(0.seconds, refreshInterval) {
      offerMatcherManager.setLaunchTokens(conf.launchTokens())
    }
  }

  /** Releases the event subscription and cancels the periodic refresh. */
  override def postStop(): Unit = {
    taskStatusUpdateSubscription.unsubscribe()
    periodicSetToken.cancel()
  }

  override def receive: Receive = {
    // An absent health value counts the same as `true`.
    case InstanceUpdated(instance, _, _) if instance.isRunning && instance.state.healthy.forall(_ == true) =>
      offerMatcherManager.addLaunchTokens(1)
  }
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:49,代码来源:OfferMatcherLaunchTokensActor.scala
示例11: refillOfferMatcherManagerLaunchTokens
//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.flow
import akka.event.EventStream
import mesosphere.marathon.MarathonSchedulerDriverHolder
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.flow.impl.{ OfferReviverDelegate, OfferMatcherLaunchTokensActor, ReviveOffersActor }
import mesosphere.marathon.core.leadership.LeadershipModule
import mesosphere.marathon.core.matcher.manager.OfferMatcherManager
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import org.slf4j.LoggerFactory
import rx.lang.scala.Observable
/**
 * Registers the launch-token actor with the leadership module under the name
 * "offerMatcherLaunchTokens".
 *
 * @param conf                  launch token configuration passed to the actor
 * @param taskStatusObservables source of instance change events passed to the actor
 * @param offerMatcherManager   manager passed to the actor
 */
def refillOfferMatcherManagerLaunchTokens(
  conf: LaunchTokenConfig,
  taskStatusObservables: TaskChangeObservables,
  offerMatcherManager: OfferMatcherManager): Unit = {
  lazy val launchTokensProps =
    OfferMatcherLaunchTokensActor.props(conf, taskStatusObservables, offerMatcherManager)

  leadershipModule.startWhenLeader(launchTokensProps, "offerMatcherLaunchTokens")
}
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:26,代码来源:FlowModule.scala
示例12: TaskChangeObservablesImpl
//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.task.bus.impl
import mesosphere.marathon.core.instance.update.InstanceChange
import mesosphere.marathon.core.task.bus.TaskChangeObservables
import mesosphere.marathon.state.PathId
import rx.lang.scala.{ Observable, Subscription }
/** `TaskChangeObservables` implementation backed by an `InternalTaskChangeEventStream`. */
private[bus] class TaskChangeObservablesImpl(eventStream: InternalTaskChangeEventStream)
    extends TaskChangeObservables {

  /** Equivalent to `forRunSpecId(PathId.empty)`. */
  override def forAll: Observable[InstanceChange] = forRunSpecId(PathId.empty)

  /**
   * @param appId path id passed to the event stream when subscribing and unsubscribing
   * @return an observable that registers each of its subscribers with the event stream
   */
  override def forRunSpecId(appId: PathId): Observable[InstanceChange] =
    Observable { subscriber =>
      // Install the clean-up hook first so that unsubscribing removes the registration.
      val deregistration = Subscription(eventStream.unsubscribe(subscriber, appId))
      subscriber.add(deregistration)
      eventStream.subscribe(subscriber, appId)
    }
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:20,代码来源:TaskChangeObservablesImpl.scala
示例13: RxScala
//设置package包名称以及导入依赖的类
package org.zalando.benchmarks
import akka.actor.ActorSystem
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.{ComputationScheduler, ExecutionContextScheduler}
/** RxScala variant of the "computation followed by async publishing" benchmark. */
class RxScala(system: ActorSystem) {
  import ComputationFollowedByAsyncPublishing._

  /**
   * Builds `numTasks` jobs, computes each one, publishes the results with at most 1024
   * publications in flight, sums `computeResult` over them and prints the total.
   */
  def benchmark: Unit = {
    // looks nice, not sure if correct, blows up the heap
    // Dot notation replaces the postfix forms (`Computer compute`, `system dispatcher`),
    // which require `scala.language.postfixOps`.
    Observable
      .from((1 to numTasks).map(Job))
      .subscribeOn(ComputationScheduler())
      .map(Computer.compute)
      // NOTE(review): a second subscribeOn on the same chain — confirm whether observeOn was intended.
      .subscribeOn(ExecutionContextScheduler(system.dispatcher))
      .flatMap(1024, r => Observable.from(Publisher.publish(r, system))(system.dispatcher))
      .foldLeft(0) { case (s, r) => s + computeResult(r) }
      .foreach(println)
  }
}
开发者ID:narayana-glassbeam,项目名称:scala-concurrency-playground,代码行数:22,代码来源:RxScala.scala
示例14: StatsReportingDao
//设置package包名称以及导入依赖的类
package com.flipkart.connekt.commons.dao
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.StringDocument
import com.couchbase.client.java.query.Query
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import rx.lang.scala.Observable
import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
/** DAO that stores, reads and increments string-encoded counters in a Couchbase bucket. */
class StatsReportingDao(bucket: Bucket) extends Dao {

  /** Expiry passed to `upsert` and `counter`: 15 days expressed in seconds. */
  val ttl = 15.days.toSeconds.toInt

  /**
   * Upserts every `(key, value)` pair as a `StringDocument`, blocking until all upserts
   * have finished, and returns the last emitted document.
   * NOTE(review): `last` on an empty input fails — confirm callers never pass an empty list.
   */
  def put(kv: List[(String, Long)]) =
    Observable.from(kv).flatMap { entry =>
      val (key, value) = entry
      val document = StringDocument.create(key, ttl, value.toString)
      rx.lang.scala.JavaConversions.toScalaObservable(bucket.async().upsert(document))
    }.last.toBlocking.single

  /**
   * Fetches the documents for `keys`, blocking until all lookups have finished.
   *
   * @return the content of every non-null document, parsed as `Long`, by key
   */
  def get(keys: List[String]): Predef.Map[String, Long] = {
    val lookups = Observable.from(keys).flatMap { key =>
      rx.lang.scala.JavaConversions
        .toScalaObservable(bucket.async().get(StringDocument.create(key)))
        .filter(_ != null)
        .map(d => key -> d.content().toLong)
    }
    lookups.toList.toBlocking.single.toMap
  }

  /**
   * Calls the bucket's `counter` for every `(key, value)` pair, passing `value` as both
   * the second and third argument, blocking until all calls have finished.
   * NOTE(review): `last` on an empty input fails — confirm callers never pass an empty list.
   */
  def counter(kvList: List[(String, Long)]) = {
    Observable.from(kvList).flatMap { entry =>
      val (key, amount) = entry
      rx.lang.scala.JavaConversions.toScalaObservable(bucket.async().counter(key, amount, amount, ttl))
    }.last.toBlocking.single
  }

  /**
   * Lists the ids of all documents whose id starts with `prefixString`.
   * Failures are logged and rethrown.
   *
   * NOTE(review): `prefixString` is interpolated into the query text unescaped — confirm
   * it never comes from untrusted input, or switch to a parameterized query.
   */
  def prefix(prefixString: String): List[String] = {
    try {
      val result = bucket.query(
        Query.simple(s"SELECT META(${bucket.name()}).id FROM ${bucket.name()} WHERE META(${bucket.name()}).id LIKE '$prefixString%'")
      )
      result.iterator().asScala.map(row => row.value().get("id").toString).toList
    } catch {
      case e: Exception =>
        ConnektLogger(LogFile.SERVICE).error("StatsReportingDao prefix search failure", e)
        throw e
    }
  }
}
/** Companion providing a factory for `StatsReportingDao`. */
object StatsReportingDao {

  /** Creates a DAO working on `bucket`. */
  def apply(bucket: Bucket): StatsReportingDao =
    new StatsReportingDao(bucket)
}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:57,代码来源:StatsReportingDao.scala
示例15: ObserveBlockingQueue
//设置package包名称以及导入依赖的类
package nl.knaw.dans.experiments.hazelcast
import java.util.concurrent.{BlockingQueue, TimeUnit}
import com.hazelcast.core.HazelcastInstanceNotActiveException
import rx.lang.scala.{Observable, Scheduler}
import rx.lang.scala.schedulers.NewThreadScheduler
import rx.lang.scala.subscriptions.CompositeSubscription
import scala.concurrent.duration.Duration
package object queue {

  /** Adds `observe` to any `BlockingQueue`, exposing polled elements as an `Observable`. */
  implicit class ObserveBlockingQueue[T](val queue: BlockingQueue[T]) extends AnyVal {

    /**
     * Repeatedly polls the queue on a worker of `scheduler` and emits each polled element.
     *
     * @param timeout   maximum time a single poll waits for an element
     * @param scheduler scheduler providing the polling worker (a new thread by default)
     * @param running   checked before every poll; the stream completes once it returns false
     * @return an observable of the polled elements
     */
    def observe(timeout: Duration, scheduler: Scheduler = NewThreadScheduler())(running: () => Boolean): Observable[T] =
      Observable { subscriber =>
        val worker = scheduler.createWorker
        val polling = worker.scheduleRec {
          try {
            if (!running()) subscriber.onCompleted()
            else {
              // `poll` returns null on timeout, which Option turns into None (nothing emitted).
              val polled = Option(queue.poll(timeout.toMillis, TimeUnit.MILLISECONDS))
              println(s"received: $polled")
              polled.foreach(subscriber.onNext)
            }
          } catch {
            // This Hazelcast exception is treated as normal completion.
            case _: HazelcastInstanceNotActiveException =>
              subscriber.onCompleted()
            case e: Throwable =>
              println(s" caught ${e.getClass.getSimpleName}: ${e.getMessage}")
              subscriber.onError(e)
          }
        }
        // Unsubscribing the subscriber releases both the recursive task and the worker.
        subscriber.add(CompositeSubscription(polling, worker))
      }
  }
}
开发者ID:rvanheest-DANS-KNAW,项目名称:Hazelcast-experiments,代码行数:38,代码来源:package.scala
示例16: RxController
//设置package包名称以及导入依赖的类
package controllers
import javax.inject.Singleton
import javax.inject.Inject
import play.api.mvc._
import play.Logger
import models._
import controllers._
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
import rx.lang.scala.subjects.PublishSubject
import services._
import scala.concurrent.Future
/** Controller exposing the first price produced by the `IPriceService` stream. */
@Singleton
class RxController @Inject()(priceService: IPriceService) extends Controller {

  /** Synchronous endpoint: waits on the request thread for the first price. */
  def prices = Action { implicit request =>
    Logger.info("RX called.")
    Ok("RxScala Price suggested is = " + firstSuggestedPrice())
  }

  /**
   * Asynchronous endpoint: the wait for the first price happens inside the `Future`,
   * so the action returns without blocking on the observable first.
   */
  def pricesAsync = Action.async { implicit request =>
    Logger.info("RX Async called.")
    import play.api.libs.concurrent.Execution.Implicits.defaultContext
    Future { Ok("RxScala Price suggested is = " + firstSuggestedPrice()) }
  }

  /**
   * Subscribes to the price stream on the IO scheduler, prints the first value and
   * blocks the calling thread until that value is available.
   */
  private def firstSuggestedPrice() = {
    val sourceObservable = priceService.generatePrices
    Observable.create {
      sourceObservable.subscribe
    }.subscribeOn(IOScheduler())
      .take(1)
      .flatMap { x => println(x); Observable.just(x) }
      .toBlocking
      .first
  }
}
开发者ID:manojtc,项目名称:ReactiveWebStore,代码行数:45,代码来源:RxController.scala
示例17: generatePrices
//设置package包名称以及导入依赖的类
package services
import javax.inject.Singleton
import play.Logger
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
import rx.lang.scala.subjects.PublishSubject
import scala.concurrent.Future
import scala.util.Random.nextDouble
/** Service contract for obtaining generated prices as a stream. */
trait IPriceService {
/** @return an observable of generated price values */
def generatePrices: Observable[Double]
}
/**
 * Price service fed by a background task that pushes a random value in [0, 1000)
 * into a `PublishSubject` roughly once per second.
 */
@Singleton
class PriceService extends IPriceService {

  // Kept as a `var` so the public member (including its setter) stays unchanged.
  var doubleInfiniteStreamSubject = PublishSubject.apply[Double]()

  import scala.concurrent.ExecutionContext.Implicits.global

  // Endless producer started at construction time: sleep one second, then emit.
  Future {
    Stream.continually(nextDouble * 1000.0).foreach { x =>
      Thread.sleep(1000)
      doubleInfiniteStreamSubject.onNext(x)
    }
  }

  /**
   * Takes the next 10 values from the subject (each shifted by +10), sums them and
   * emits `sum - sum * 0.9`.
   *
   * @return an observable emitting that single value
   */
  override def generatePrices: Observable[Double] = {
    // Values whose integer part is odd after the +10 shift.
    val oddPrices = Observable.create {
      doubleInfiniteStreamSubject.subscribe
    }.subscribeOn(IOScheduler())
      .flatMap { x => Observable.from(Iterable.fill(1)(x + 10)) }
      .filter { x => x.toInt % 2 != 0 }

    // Values whose integer part is even after the +10 shift.
    val evenPrices = Observable.create {
      doubleInfiniteStreamSubject.subscribe
    }.subscribeOn(IOScheduler())
      .flatMap { x => Observable.from(Iterable.fill(1)(x + 10)) }
      .filter { x => x.toInt % 2 == 0 }

    val mergedPrices = Observable
      .empty
      .subscribeOn(IOScheduler())
      .merge(oddPrices)
      .merge(evenPrices)
      .take(10)
      .foldLeft(0.0)(_ + _)
      .flatMap { x => Observable.just(x - (x * 0.9)) }

    mergedPrices
  }
}
开发者ID:manojtc,项目名称:ReactiveWebStore,代码行数:48,代码来源:PriceService.scala
示例18: PublishSubjectSpec
//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject
import name.mikulskibartosz.rxscala.tests.assert.Assertions._
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.{Observable, Observer, Subscription}
/** Specification of `PublishSubject`: only values emitted after subscription are observed. */
class PublishSubjectSpec extends FlatSpec with Matchers {

  "A publish subject" should "emit all values emitted by the source after the subject subscribed to it" in {
    // The observer handed to the source is captured so the test can emit more values later.
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { downstream =>
      capturedObserver = downstream //do not do it at home ;)
      downstream.onNext(1)
      downstream.onNext(2)
      Subscription()
    }

    when {
      val subject = PublishSubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(3)
      capturedObserver.onNext(4)
      // 1 and 2 were emitted while the source was being subscribed and are not expected here.
      subscriber.assertValues(3, 4)
    }
  }

  //NOTE every subject can subscribe to more than one observable, but this one does not do anything interesting, therefore it is easy to explain its behaviour
  it should "emit values from multiple sources" in {
    var firstCaptured: Observer[Int] = null
    val firstSource = Observable.create[Int] { downstream =>
      firstCaptured = downstream
      Subscription()
    }

    var secondCaptured: Observer[Int] = null
    val secondSource = Observable.create[Int] { downstream =>
      secondCaptured = downstream
      Subscription()
    }

    when {
      val subject = PublishSubject[Int]()
      firstSource.subscribe(subject)
      secondSource.subscribe(subject)
      subject
    } assert { subscriber =>
      firstCaptured.onNext(1)
      secondCaptured.onNext(2)
      subscriber.assertValues(1, 2)
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:59,代码来源:PublishSubjectSpec.scala
示例19: AsyncSubjectSpec
//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.{Subscription, Observable}
import rx.lang.scala.subjects.AsyncSubject
import name.mikulskibartosz.rxscala.tests.assert.Assertions._
/** Specification of `AsyncSubject`: last value on completion, error on failure, nothing otherwise. */
class AsyncSubjectSpec extends FlatSpec with Matchers {

  "An async subject" should "emit the last value emitted by the observable it observes" in {
    val source = Observable.from(Array(1, 2, 3))

    when {
      val subject = AsyncSubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertValue(3)
    }
  }

  it should "emit a failure if the observable has failed" in {
    val failingSource = Observable.error(new IllegalStateException())

    when {
      val subject = AsyncSubject[Int]()
      failingSource.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertError(classOf[IllegalStateException])
    }
  }

  it should "not emit a value if the observable has not completed" in {
    val neverCompleting = Observable.create[Int] { downstream =>
      downstream.onNext(1)
      downstream.onNext(2)
      downstream.onNext(3)
      //NOTE the function does not call the onCompleted method
      Subscription()
    }

    when {
      val subject = AsyncSubject[Int]()
      neverCompleting.subscribe(subject)
      subject
    } assert { subscriber =>
      subscriber.assertNoValues()
      subscriber.assertNotCompleted()
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:56,代码来源:AsyncSubjectSpec.scala
示例20: ReplaySubjectSpec
//设置package包名称以及导入依赖的类
package name.mikulskibartosz.rxscala.tests.subject
import name.mikulskibartosz.rxscala.tests.assert.Assertions._
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.scala.subjects.ReplaySubject
import rx.lang.scala.{Observable, Observer, Subscription}
/** Specification of `ReplaySubject`: earlier values are replayed, optionally bounded by size. */
class ReplaySubjectSpec extends FlatSpec with Matchers {

  "A replay subject" should "emit all value emitted by its source" in {
    // The observer handed to the source is captured so the test can emit more values later.
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { downstream =>
      capturedObserver = downstream //do not do it at home ;)
      downstream.onNext(1)
      downstream.onNext(2)
      Subscription()
    }

    when {
      val subject = ReplaySubject[Int]()
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(3)
      capturedObserver.onNext(4)
      subscriber.assertValues(1, 2, 3, 4)
    }
  }

  it should "cache a specified number of items emitted before subscribing and emit all added afterwards" in {
    var capturedObserver: Observer[Int] = null
    val source = Observable.create[Int] { downstream =>
      capturedObserver = downstream //do not do it at home ;)
      downstream.onNext(1)
      downstream.onNext(2)
      downstream.onNext(3)
      downstream.onNext(4)
      Subscription()
    }

    when {
      // Buffer limited to 3 items, so of the four early values only 2, 3 and 4 are expected.
      val subject = ReplaySubject.withSize[Int](3)
      source.subscribe(subject)
      subject
    } assert { subscriber =>
      capturedObserver.onNext(5)
      capturedObserver.onNext(6)
      subscriber.assertValues(2, 3, 4, 5, 6)
    }
  }
}
开发者ID:mikulskibartosz,项目名称:RXScalaMeetup,代码行数:55,代码来源:ReplaySubjectSpec.scala
注:本文中的rx.lang.scala.Observable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论