本文整理汇总了Scala中monix.reactive.Observable类的典型用法代码示例。如果您正苦于以下问题：Scala Observable类的具体用法？Scala Observable怎么用？Scala Observable使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Observable类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ReactiveReact
//设置package包名称以及导入依赖的类
package slate
package views
import cats.Monoid
import japgolly.scalajs.react.{CallbackTo, Children}
import japgolly.scalajs.react.component.ScalaBuilder
import monix.execution.{CancelableFuture, Scheduler}
import monix.reactive.Observable
/** Extension methods that feed a scalajs-react component's state from a Monix `Observable`. */
object ReactiveReact {

  /** Pairs a piece of state with the future of the subscription that produces it. */
  case class ReactiveState[ST](reactivePart: ST, cancelableFuture: CancelableFuture[Unit])

  // todo: cancel on willUnmount
  /** Adds Observable-driven state updates to a `ScalaBuilder.Step4` builder. */
  implicit class ReactiveOps[P, ST, B](val builder: ScalaBuilder.Step4[P, Children.None, ST, B]) {

    /**
     * On `componentWillMount`, subscribes to the observable derived from the props and
     * rewrites the state for every emitted element.
     *
     * @param getReactivePart extracts the observable to subscribe to from the props
     * @param setReactivePart combines the current state with an emitted element
     * @param sch             scheduler the subscription runs on
     */
    @inline final def reactiveReplaceL[R]
    (getReactivePart: P => Observable[R], setReactivePart: (ST, R) => ST)
    (implicit sch: Scheduler): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { scope =>
        CallbackTo.lift { () =>
          val updates = getReactivePart(scope.props)
          // The subscription future is discarded on purpose (see the todo above).
          val _ = updates.foreach { next =>
            val () = scope.modState(state => setReactivePart(state, next)).runNow()
          }
        }
      }

    /** Variant for props that are themselves an `Observable[ST]`: each element replaces the state. */
    @inline final def reactiveReplace
    (implicit sch: Scheduler,
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      reactiveReplaceL[ST](propsAreReactiveEv, (_, latest) => latest)

    /**
     * Variant for props that are an `Observable[ST]` with a `Monoid[ST]`: the elements are
     * folded with the monoid and the folded value is written to the state.
     *
     * NOTE(review): `foldLeftF` presumably emits only once the source completes, so an
     * endless source would never update the state - confirm this is intended.
     */
    @inline final def reactiveMonoio
    (implicit sch: Scheduler, ST: Monoid[ST],
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { scope =>
        CallbackTo.lift { () =>
          // The evidence doubles as the conversion from the props to the observable.
          val folded = propsAreReactiveEv(scope.props).foldLeftF(ST.empty)(ST.combine)
          val _ = folded.foreach { combined =>
            val () = scope.setState(combined).runNow()
          }
        }
      }
  }
}
开发者ID:edmundnoble,项目名称:slate,代码行数:48,代码来源:ReactiveReact.scala
示例2: MyConnectableObservable
//设置package包名称以及导入依赖的类
package my.samples.observables
import com.typesafe.scalalogging.LazyLogging
import monix.execution.{ Cancelable, Scheduler }
import monix.execution.cancelables.{ BooleanCancelable, SingleAssignmentCancelable }
import monix.reactive.Observable
import monix.reactive.observables.ConnectableObservable
import monix.reactive.observers.Subscriber
import my.samples.services.ZombieConnectorService
import scala.concurrent.duration._
/**
 * A `ConnectableObservable` of one-second ticks whose `connect()` opens a connection to the
 * given service.
 *
 * NOTE(review): `connection` is a `SingleAssignmentCancelable`, so a second `connect()` call
 * presumably fails on the second assignment - confirm callers connect at most once.
 *
 * @param service the service connected on `connect()` and disconnected on cancel
 * @param s       scheduler used for the interval stream
 */
class MyConnectableObservable(service: ZombieConnectorService)(implicit s: Scheduler) extends ConnectableObservable[Long] with LazyLogging {

  private val serviceName = service.getClass.getName
  private[this] val connection = SingleAssignmentCancelable()

  /** Connects to the service and returns a cancelable that disconnects from it. */
  override def connect(): Cancelable = {
    logger.info(s"connecting to the service $serviceName")
    // Open the service connection first ...
    service.connect()
    // ... then register what has to happen when the connection is cancelled.
    val disconnectHook = BooleanCancelable { () => service.disconnect() }
    connection := disconnectHook
    connection
  }

  /** Cancels the connection, which runs the registered disconnect hook. */
  def close() = {
    logger.info(s"shutting down connection to service $serviceName")
    connection.cancel()
  }

  /** Subscribes the given subscriber to a fresh one-second interval stream. */
  override def unsafeSubscribeFn(subscriber: Subscriber[Long]): Cancelable = {
    val ticks = Observable.interval(1.second)
    ticks.subscribe(subscriber)
  }
}
/** Factory for [[MyConnectableObservable]]. */
object MyConnectableObservable {

  /** Builds a connectable observable for `service`, using the implicit scheduler. */
  def apply(service: ZombieConnectorService)(implicit s: Scheduler) = {
    val observable = new MyConnectableObservable(service)
    observable
  }
}
开发者ID:joesan,项目名称:monix-samples,代码行数:44,代码来源:MyConnectableObservable.scala
示例3: TaskTest
//设置package包名称以及导入依赖的类
package objektwerks.monix
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSuite, Matchers}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
 * Exercises basic Monix `Task` and `Observable` evaluation.
 *
 * Every asynchronous result is awaited on the test thread. Asserting inside an
 * `onComplete` / `runOnComplete` callback would run after the test body has returned,
 * so a failing assertion there could never fail the test.
 */
class TaskTest extends FunSuite with Matchers {

  // Upper bound for awaited results; the slowest stream below needs about two seconds.
  private val awaitAtMost = 5.seconds

  test("run async task") {
    val task = Task { 1 + 2 }
    val future = task.runAsync
    scala.concurrent.Await.result(future, awaitAtMost) shouldBe 3
  }

  test("run on complete task") {
    val task = Task { 1 + 2 }
    // Bridge the callback to a promise so the outcome can be awaited and asserted here.
    val outcome = scala.concurrent.Promise[Int]()
    task.runOnComplete {
      case Success(sum)     => outcome.trySuccess(sum); ()
      case Failure(failure) => outcome.tryFailure(failure); ()
    }
    scala.concurrent.Await.result(outcome.future, awaitAtMost) shouldBe 3
  }

  test("run async observable task") {
    // Ticks 0, 1, 2, ... -> even ticks -> doubled -> each duplicated -> first three: 0, 0, 4.
    val task = {
      Observable.interval(1.second)
        .filter(_ % 2 == 0)
        .map(_ * 2)
        .flatMap(x => Observable.fromIterable(Seq(x, x)))
        .take(3)
        .toListL
    }
    val future = task.runAsync
    scala.concurrent.Await.result(future, awaitAtMost).sum shouldBe 4
  }

  test("coeval task") {
    val task = Task.eval{ 1 + 2 }
    val evaluating = task.coeval
    // Left means the task could not be evaluated synchronously.
    evaluating.value match {
      case Left(_) => throw new IllegalArgumentException("coeval task failed")
      case Right(sum) => sum shouldBe 3
    }
  }
}
开发者ID:objektwerks,项目名称:typelevel,代码行数:48,代码来源:TaskTest.scala
示例4: SampleApp1
//设置package包名称以及导入依赖的类
package se.ta
import scala.concurrent.duration._
import scala.scalajs.js.JSApp
import japgolly.scalajs.react._
import japgolly.scalajs.react.vdom.prefix_<^._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalajs.dom.document
/** Scala.js sample that renders a tick counter into the document body once per second. */
object SampleApp1 extends JSApp {

  /** Virtual DOM for a single tick value. */
  def viewTick(tick: Long) =
    <.div(
      <.p(s"Tick: $tick seconds")
    )

  /** React component whose props are the current tick. */
  val Tick =
    ReactComponentB[Long]("Tick")
      .render_P(current => viewTick(current))
      .build

  /** Subscribes to a one-second interval and renders the first ten ticks. */
  def main(): Unit = {
    println("Starting counting 10 ticks")
    val ticks = Observable.interval(1.second)
    // Re-render the component as a side effect of every emitted tick.
    val rendered = ticks.doOnNext(n => ReactDOM.render(Tick(n), document.body))
    rendered
      .take(10)
      .dump("0")
      .subscribe()
  }
}
开发者ID:ThomasAlexandre,项目名称:scalajs-elm,代码行数:35,代码来源:SampleApp1.scala
示例5: SubjectSubscription
//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.util
import com.hypertino.hyperbus.transport.api.matchers.RequestMatcher
import monix.eval.Task
import monix.execution.Ack.Stop
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.{ConcurrentSubject, Subject}
import scala.util.Success
/**
 * A subscription backed by a Monix `Subject`, matched against requests through a
 * `RequestMatcher`.
 *
 * Subclasses supply the subject, the matcher and the `add` / `remove` hooks.
 *
 * @param scheduler scheduler exposed to subclasses and used for the future callbacks
 * @tparam T type of the published events
 */
abstract class SubjectSubscription[T](implicit val scheduler: Scheduler) extends FuzzyMatcher {
  type eventType = T

  // FuzzyIndex properties
  def requestMatcher: RequestMatcher
  override def indexProperties: Seq[FuzzyIndexItemMetaInfo] = requestMatcher.indexProperties
  override def matches(other: Any): Boolean = requestMatcher.matches(other)

  // Subject properties
  protected val subject: Subject[eventType, eventType]

  /** Calls `remove()` and then completes the subject. */
  def cancel(): Unit = {
    remove()
    subject.onComplete()
  }

  /**
   * Pushes `t` to the subject and returns the acknowledgement as a `Task`.
   * When the subject answers `Stop`, `remove()` is called.
   */
  def publish(t: eventType): Task[Ack] = {
    Task.fromFuture(subject.onNext(t).andThen {
      case Success(Stop) => remove()
    })
  }

  // Alias so the anonymous Cancelable below can reach the outer cancel(); inside it the
  // name `cancel` refers to the Cancelable's own method.
  private def cancel_1() = cancel()

  /**
   * Observable view of the subject. Subscribing calls `add()`; cancelling the returned
   * cancelable cancels this whole subscription and then the underlying subject subscription.
   */
  val observable: Observable[eventType] = new Observable[eventType] {
    override def unsafeSubscribeFn(subscriber: Subscriber[eventType]): Cancelable = {
      val original: Cancelable = subject.unsafeSubscribeFn(subscriber)
      add()
      new Cancelable {
        override def cancel(): Unit = {
          cancel_1()
          original.cancel()
        }
      }
    }
  }

  /** Hook called from `cancel()` and when the subject answers `Stop`. */
  protected def remove(): Unit

  /** Hook called each time `observable` is subscribed to. */
  protected def add(): Unit
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:52,代码来源:SubjectSubscription.scala
示例6: PlainEndpoint
//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.transport.resolvers
import com.hypertino.hyperbus.model.RequestBase
import com.hypertino.hyperbus.transport.api.{NoTransportRouteException, ServiceEndpoint, ServiceResolver}
import monix.reactive.Observable
/** Service endpoint described by a host name and an optional port. */
case class PlainEndpoint(hostname: String, port: Option[Int]) extends ServiceEndpoint
/**
 * Resolves services from a fixed, in-memory map of service name to endpoints.
 *
 * @param endpointsMap endpoints keyed by the authority part of the request's HRL
 */
class PlainResolver(endpointsMap: Map[String, Seq[PlainEndpoint]]) extends ServiceResolver {

  /** Convenience constructor for a single service with a single endpoint. */
  def this(serviceName: String, hostname: String, port: Option[Int]) =
    this(Map(serviceName -> Seq(PlainEndpoint(hostname, port))))

  /**
   * Looks up the endpoints for the authority of the message's HRL.
   *
   * @return an observable emitting the configured endpoints, or one failing with
   *         `NoTransportRouteException` when the authority is not in the map
   */
  override def serviceObservable(message: RequestBase): Observable[Seq[ServiceEndpoint]] = {
    endpointsMap.get(message.headers.hrl.authority) match {
      case Some(endpoints) => Observable.now(endpoints)
      case None => Observable.raiseError(new NoTransportRouteException(s"Can't resolve service for ${message.headers.hrl}"))
    }
  }
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:19,代码来源:PlainResolver.scala
示例7: Implicits
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.body
import java.io.InputStream
import java.nio.ByteBuffer
import fr.hmil.roshttp.body.JSONBody._
import monix.reactive.Observable
/** Implicit conversions from plain values to the JSON and body-part wrappers. */
object Implicits {

  /** Wraps a `String` as a `JSONString`. */
  implicit def stringToJSONString(value: String): JSONString = new JSONString(value)

  /** Wraps an `Int` as a `JSONNumber`. */
  implicit def intToJSONNumber(value: Int): JSONNumber = new JSONNumber(value)

  /** Wraps a `Float` as a `JSONNumber`. */
  implicit def floatToJSONNumber(value: Float): JSONNumber = new JSONNumber(value)

  /** Wraps a `Double` as a `JSONNumber`. */
  implicit def doubleToJSONNumber(value: Double): JSONNumber = new JSONNumber(value)

  /** Wraps a `JSONObject` as a `JSONBody`. */
  implicit def JSONObjectToJSONBody(obj: JSONObject): JSONBody = JSONBody(obj)

  /** Wraps a `ByteBuffer` as a `ByteBufferBody`. */
  implicit def byteBufferToByteBufferBody(buffer: ByteBuffer): BodyPart = ByteBufferBody(buffer)

  /**
   * Wraps an `InputStream` as a `StreamBody`.
   * Despite its name, this conversion takes an `InputStream`, not an `Observable`.
   */
  implicit def observableToStreamBody(is: InputStream): BodyPart = {
    val chunks: Observable[ByteBuffer] = Observable.fromInputStream(is).map(ByteBuffer.wrap)
    StreamBody(chunks)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:20,代码来源:Implicits.scala
示例8: MultiPartBody
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.body
import java.nio.ByteBuffer
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.util.Random
/**
 * A multipart body made of named parts separated by a randomly generated boundary.
 *
 * @param parts     body parts keyed by their form field name
 * @param subtype   multipart subtype used in the content type, `form-data` by default
 * @param scheduler implicit scheduler required by the constructor signature
 */
class MultiPartBody(parts: Map[String, BodyPart], subtype: String = "form-data")(implicit scheduler: Scheduler)
  extends BodyPart {

  /** Boundary string: four dashes followed by 24 random lower-case alphanumeric characters. */
  val boundary = "----" + Random.alphanumeric.take(24).mkString.toLowerCase

  override def contentType: String = s"multipart/$subtype; boundary=$boundary"

  /**
   * Streams every part preceded by its encapsulation header, followed by the closing boundary.
   *
   * An empty `parts` map yields only the closing boundary; it previously failed with an
   * exception from `reduceLeft` on an empty collection.
   */
  override def content: Observable[ByteBuffer] = {
    // Prepend the multipart encapsulation boundary and part headers to each body part.
    val encapsulatedParts = parts.map { case (name, part) =>
      partHeader(name, part) +: part.content
    }
    // Join the body parts; fall back to an empty stream when there are none.
    val joined = encapsulatedParts
      .reduceLeftOption((acc, elem) => acc ++ elem)
      .getOrElse(Observable.empty[ByteBuffer])
    // Append the closing boundary.
    joined :+ ByteBuffer.wrap(s"\r\n--$boundary--\r\n".getBytes("utf-8"))
  }

  /**
   * Builds the boundary line and headers that introduce one body part.
   *
   * NOTE(review): `name` is inserted into the header unescaped - confirm callers never pass
   * names containing quotes or line breaks.
   */
  private def partHeader(name: String, part: BodyPart): ByteBuffer =
    ByteBuffer.wrap(
      ("\r\n--" + boundary + "\r\n" +
        "Content-Disposition: form-data; name=\"" + name + "\"\r\n" +
        s"Content-Type: ${part.contentType}\r\n" +
        "\r\n").getBytes("utf-8")
    )
}
/** Factory for [[MultiPartBody]]. */
object MultiPartBody {

  /** Builds a multipart body with the default subtype from `name -> part` pairs. */
  def apply(parts: (String, BodyPart)*)(implicit scheduler: Scheduler): MultiPartBody = {
    val partsByName = parts.toMap
    new MultiPartBody(partsByName)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:41,代码来源:MultiPartBody.scala
示例9: StreamHttpResponse
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response
import java.nio.ByteBuffer
import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.util.HeaderMap
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.concurrent.Future
/**
 * HTTP response that exposes its body as an `Observable` of `ByteBuffer` chunks.
 *
 * @param statusCode status code of the response
 * @param headers    response headers
 * @param body       observable of body chunks
 */
class StreamHttpResponse(
val statusCode: Int,
val headers: HeaderMap[String],
val body: Observable[ByteBuffer])
extends HttpResponse
/** Factory that builds a [[StreamHttpResponse]] without consuming the body stream. */
object StreamHttpResponse extends HttpResponseFactory[StreamHttpResponse] {

  /**
   * Wraps the header fields and the untouched body stream into an already completed future.
   *
   * @param header     response header carrying the status code and headers
   * @param bodyStream body chunks, handed over as-is
   * @param config     backend configuration (not used here)
   */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[StreamHttpResponse] = {
    val response = new StreamHttpResponse(header.statusCode, header.headers, bodyStream)
    Future.successful(response)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:27,代码来源:StreamHttpResponse.scala
示例10: SimpleHttpResponse
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response
import java.nio.ByteBuffer
import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.exceptions.ResponseException
import fr.hmil.roshttp.util.{HeaderMap, Utils}
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
/**
 * HTTP response whose body is held as a `String`.
 *
 * @param statusCode status code of the response
 * @param headers    response headers
 * @param body       response body as a string
 */
class SimpleHttpResponse(
val statusCode: Int,
val headers: HeaderMap[String],
val body: String)
extends HttpResponse
/** Factory that collects a streamed body into a [[SimpleHttpResponse]]. */
object SimpleHttpResponse extends HttpResponseFactory[SimpleHttpResponse] {

  /**
   * Buffers every chunk of `bodyStream` and, once the stream ends, decodes the bytes
   * with the charset taken from the `content-type` header.
   *
   * @param header     response header carrying the status code and headers
   * @param bodyStream body chunks to collect
   * @param config     backend configuration (not used by the collector)
   * @return a future completed with the response, or failed with a `ResponseException`
   *         wrapping the stream error
   */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[SimpleHttpResponse] = {

    // NOTE(review): null is passed when the header is absent; presumably
    // Utils.charsetFromContentType handles it - confirm.
    val charset = Utils.charsetFromContentType(header.headers.getOrElse("content-type", null))
    val buffers = mutable.Queue[ByteBuffer]()
    val promise = Promise[SimpleHttpResponse]()

    val streamCollector = bodyStream.
      foreach(elem => buffers.enqueue(elem)).
      map({ _ =>
        val body = recomposeBody(buffers, charset)
        new SimpleHttpResponse(header.statusCode, header.headers, body)
      })

    // Extractors instead of a type test: the type argument of Success is erased at runtime.
    streamCollector.onComplete({
      case Success(response) =>
        promise.trySuccess(response)
      case Failure(cause) =>
        promise.tryFailure(new ResponseException(cause, header))
    })

    promise.future
  }

  /**
   * Concatenates the buffered chunks and decodes them with `charset`.
   *
   * The target buffer is sized from the bytes actually remaining in the chunks, so a chunk
   * larger than the configured chunk size no longer overflows it, and a chunk whose position
   * is not zero is no longer over-counted.
   */
  private def recomposeBody(seq: mutable.Queue[ByteBuffer], charset: String): String = {
    val totalBytes = seq.foldLeft(0)((count, chunk) => count + chunk.remaining)
    // Capacity equals the payload size, so the limit already marks the end of the data.
    val buffer = ByteBuffer.allocate(totalBytes)
    seq.foreach(chunk => buffer.put(chunk))
    Utils.getStringFromBuffer(buffer, charset)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:61,代码来源:SimpleHttpResponse.scala
示例11: StreamingPressureTest
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp
import java.nio.ByteBuffer
import fr.hmil.roshttp.body.StreamBody
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import utest._
/**
 * Stress tests that push a large stream through the client in both directions to check
 * that streaming does not leak. Both tests talk to a server at SERVER_URL.
 */
object StreamingPressureTest extends TestSuite {
private val SERVER_URL = "http://localhost:3000"
// Number of 8192-byte chunks uploaded: 8192 * 1,000,000 = 8,192,000,000 bytes.
private val ONE_MILLION = 1000000
val tests = this{
// Send approx. 8 gigs of data to the server to check that there is no leak
"Upload streams do not leak" - {
// Skipped when running in a real browser.
if (!JsEnvUtils.isRealBrowser) {
HttpRequest(s"$SERVER_URL/streams/in")
// Endless iterator of fresh 8192-byte direct buffers, cut off after ONE_MILLION chunks.
.post(StreamBody(Observable.fromIterator(new Iterator[ByteBuffer]() {
override def hasNext: Boolean = true
override def next(): ByteBuffer = ByteBuffer.allocateDirect(8192)
})
.take(ONE_MILLION)))
.map(r => r.body ==> "Received 8192000000 bytes.")
// NOTE(review): recovering here turns any failure, including a failed check above,
// into a successful future after printing the stack trace - confirm this is intended.
.recover({
case e: Throwable =>
e.printStackTrace()
})
}
}
// Receive approx. 8 gigs of data to ensure that there is no leak
"Download streams do not leak" - {
// Due to browser incompatibility and node memory leak, run this test only in the JVM
if (JsEnvUtils.userAgent == "jvm") {
HttpRequest(s"$SERVER_URL/streams/out")
.stream()
// Sum the limit of every received chunk to get the total number of bytes.
.flatMap(_.body.map(_.limit.asInstanceOf[Long]).reduce((l, r) => l + r).runAsyncGetFirst)
.map(_.get ==> 8192000000L)
}
}
}
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:47,代码来源:StreamingPressureTest.scala
示例12: RouteGuideMonixService
//设置package包名称以及导入依赖的类
package io.grpc.routeguide
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.logging.Logger
import concurrency.AtomicRef
import monix.eval.Task
import monix.reactive.Observable
/**
 * Monix-based implementation of the RouteGuide service that keeps received notes per location.
 *
 * @param features known features, searched by `findFeature`
 */
class RouteGuideMonixService(features: Seq[Feature]) extends RouteGuideGrpcMonix.RouteGuide {

  val logger: Logger = Logger.getLogger(classOf[RouteGuideMonixService].getName)

  // Notes received so far, grouped by the location they were sent for.
  private val routeNotes: AtomicRef[Map[Point, Seq[RouteNote]]] = new AtomicRef(Map.empty)

  /**
   * Stores each incoming note and answers with every note recorded for the same location,
   * the new one included.
   */
  override def routeChat(notes: Observable[RouteNote]): Observable[RouteNote] =
    notes.flatMap { incoming =>
      addNote(incoming)
      val atSameLocation = getNotes(incoming.getLocation)
      Observable.fromIterable(atSameLocation)
    }

  /** Finds the feature at `point`, or builds a new `Feature` carrying only that location. */
  private def findFeature(point: Point): Feature =
    features
      .find { candidate =>
        val location = candidate.getLocation
        location.latitude == point.latitude && location.longitude == point.longitude
      }
      .getOrElse(new Feature(location = Some(point)))

  /** Notes currently recorded for `point`; empty when there are none. */
  private def getNotes(point: Point): Seq[RouteNote] = {
    val snapshot = routeNotes.get
    snapshot.getOrElse(point, Seq.empty)
  }

  /** Appends `note` to the notes recorded for its location. */
  private def addNote(note: RouteNote): Unit = {
    routeNotes.updateAndGet { current =>
      val location = note.getLocation
      val extended = current.getOrElse(location, Seq.empty) :+ note
      current + (location -> extended)
    }
  }
}
开发者ID:btlines,项目名称:grpcexample,代码行数:41,代码来源:RouteGuideMonixService.scala
示例13: ConsulServiceResolver
//设置package包名称以及导入依赖的类
package com.hypertino.transport.resolvers
import java.util
import com.hypertino.hyperbus.model.RequestBase
import com.hypertino.hyperbus.transport.api.{NoTransportRouteException, ServiceEndpoint, ServiceResolver}
import com.hypertino.hyperbus.transport.resolvers.PlainEndpoint
import com.orbitz.consul.Consul
import com.orbitz.consul.cache.{ConsulCache, ServiceHealthCache, ServiceHealthKey}
import com.orbitz.consul.model.health.ServiceHealth
import monix.execution.Scheduler
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject
/**
 * Resolves service endpoints from Consul health information.
 *
 * @param consul    Consul client used to create the health caches
 * @param scheduler scheduler used by the subject that publishes endpoint updates
 */
class ConsulServiceResolver(consul: Consul)
                           (implicit val scheduler: Scheduler) extends ServiceResolver {

  /** Uses a Consul client built with the default builder settings. */
  def this()(implicit scheduler: Scheduler) = this(Consul.builder().build())

  /**
   * Streams the endpoints of the service named in the message's HRL. A new list is
   * published each time the Consul health cache notifies its listener.
   *
   * NOTE(review): each call creates and starts a new `ServiceHealthCache` that is never
   * stopped in this class - confirm whether it should be stopped on unsubscribe.
   *
   * @return an observable of endpoint lists, or one failing with
   *         `NoTransportRouteException` when the HRL names no service
   */
  override def serviceObservable(message: RequestBase): Observable[Seq[ServiceEndpoint]] = {
    message.headers.hrl.service.map { serviceName =>
      val subject = ConcurrentSubject.publishToOne[Seq[ServiceEndpoint]]
      val healthClient = consul.healthClient
      val svHealth = ServiceHealthCache.newCache(healthClient, serviceName)

      val listener = new ConsulCache.Listener[ServiceHealthKey, ServiceHealth] {
        override def notify(newValues: util.Map[ServiceHealthKey, ServiceHealth]): Unit = {
          import scala.collection.JavaConverters._

          val seq = newValues
            .asScala
            // Keep only instances whose checks are all passing or warning.
            .filter {
              _._2
                .getChecks
                .asScala
                .forall(check => isPassing(check.getStatus))
            }
            .map { i =>
              PlainEndpoint(i._1.getHost, Some(i._1.getPort))
            }
            .toSeq
          subject.onNext(seq)
        }
      }
      svHealth.addListener(listener)
      svHealth.start()
      subject
    } getOrElse {
      Observable.raiseError(
        new NoTransportRouteException(s"Request doesn't specify service name: ${message.headers.hrl}")
      )
    }
  }

  /** True for the check statuses `passing` and `warning` (case-insensitive); false for null. */
  private def isPassing(s: String) = s != null && (s.equalsIgnoreCase("passing") || s.equalsIgnoreCase("warning"))
}
开发者ID:hypertino,项目名称:hyperbus-consul-resolver,代码行数:57,代码来源:ConsulServiceResolver.scala
示例14: scrap
//设置package包名称以及导入依赖的类
package com.airbnbData.service
//import org.http4s.client.Client
import com.airbnbData.model.command.PropertyAndAirbnbUserCommand
import play.api.libs.ws.{WSClient => Client}
import scalaz.Kleisli
import slick.jdbc.JdbcBackend.DatabaseDef
import monix.eval.Task
import monix.reactive.Observable
/**
 * Scraping operations expressed as `Kleisli` arrows over their dependencies.
 * Both operations are abstract; implementations decide how the given functions are combined.
 */
trait AirbnbScrapService {
// Effect type the composed operation runs in.
type Box[A] = Task[A]
// Dependencies of a full operation: an HTTP client and a database.
type Dependencies = (Client, DatabaseDef)
// An operation that needs both dependencies and yields an A inside Box.
type Operation[A] = Kleisli[Box, Dependencies, A]
/**
 * Task-based operation built from the three given functions.
 *
 * @param save      takes a batch of commands and a database, yields an Int
 * @param scrap     takes a client, yields a sequence of optional commands
 * @param deleteAll takes a database, yields an Int
 * @return an operation over (Client, DatabaseDef) yielding a String
 */
def scrap(
save: Seq[PropertyAndAirbnbUserCommand] => Kleisli[Task, DatabaseDef, Int],
scrap: () => Kleisli[Task, Client, Seq[Option[PropertyAndAirbnbUserCommand]]],
deleteAll: () => Kleisli[Task, DatabaseDef, Int]
): Operation[String]
/**
 * Observable-based variant that works on one command at a time.
 *
 * @param save  takes a single command and a database, yields an observable of Int
 * @param scrap takes a client, yields an observable of commands
 * @return a Task-based operation over (Client, DatabaseDef) yielding Unit
 */
def scrap2(
save: PropertyAndAirbnbUserCommand => Kleisli[Observable, DatabaseDef, Int],
scrap: () => Kleisli[Observable, Client, PropertyAndAirbnbUserCommand]
): Kleisli[Task, (Client, DatabaseDef), Unit]
}
开发者ID:Hxucaa,项目名称:scrappy,代码行数:29,代码来源:AirbnbScrapService.scala
注：本文中的monix.reactive.Observable类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论