• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Observable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中monix.reactive.Observable类的典型用法代码示例。如果您正苦于以下问题:Scala Observable类的具体用法是什么?Scala Observable怎么用?有哪些Scala Observable的使用例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。



在下文中一共展示了Observable类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ReactiveReact

//设置package包名称以及导入依赖的类
package slate
package views

import cats.Monoid
import japgolly.scalajs.react.{CallbackTo, Children}
import japgolly.scalajs.react.component.ScalaBuilder
import monix.execution.{CancelableFuture, Scheduler}
import monix.reactive.Observable

/** Builder extensions that push values emitted by a monix `Observable` into the
  * state of a scalajs-react component when the component is about to mount.
  */
object ReactiveReact {
  /** A piece of state together with the future of the subscription feeding it. */
  case class ReactiveState[ST](reactivePart: ST, cancelableFuture: CancelableFuture[Unit])

  // todo: cancel on willUnmount
  implicit class ReactiveOps[P, ST, B](val builder: ScalaBuilder.Step4[P, Children.None, ST, B]) {

    /** Subscribes, on `componentWillMount`, to the observable derived from the props
      * and folds every emitted value into the current state.
      *
      * @param getReactivePart extracts the observable to follow from the props
      * @param setReactivePart combines the current state with an emitted value
      * @param sch             scheduler the subscription runs on
      */
    @inline final def reactiveReplaceL[R]
    (getReactivePart: P => Observable[R], setReactivePart: (ST, R) => ST)
    (implicit sch: Scheduler): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { scope =>
        CallbackTo.lift { () =>
          val updates = getReactivePart(scope.props)
          // The future returned by `foreach` is deliberately discarded.
          val _ = updates.foreach { next =>
            val () = scope.modState(state => setReactivePart(state, next)).runNow()
          }
        }
      }

    /** Variant for components whose props are themselves the observable of states:
      * every emitted value replaces the state wholesale.
      */
    @inline final def reactiveReplace
    (implicit sch: Scheduler,
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      reactiveReplaceL[ST](props => propsAreReactiveEv(props), (_, latest) => latest)

    /** Variant that combines the emitted values with the `Monoid[ST]` via `foldLeftF`
      * and sets each value produced by that fold as the component state.
      */
    @inline final def reactiveMonoio
    (implicit sch: Scheduler, ST: Monoid[ST],
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { scope =>
        CallbackTo.lift { () =>
          val folded = propsAreReactiveEv(scope.props).foldLeftF(ST.empty)(ST.combine)
          // The future returned by `foreach` is deliberately discarded.
          val _ = folded.foreach { combined =>
            val () = scope.setState(combined).runNow()
          }
        }
      }

  }

}
开发者ID:edmundnoble,项目名称:slate,代码行数:48,代码来源:ReactiveReact.scala


示例2: MyConnectableObservable

//设置package包名称以及导入依赖的类
package my.samples.observables

import com.typesafe.scalalogging.LazyLogging
import monix.execution.{ Cancelable, Scheduler }
import monix.execution.cancelables.{ BooleanCancelable, SingleAssignmentCancelable }
import monix.reactive.Observable
import monix.reactive.observables.ConnectableObservable
import monix.reactive.observers.Subscriber
import my.samples.services.ZombieConnectorService

import scala.concurrent.duration._

/** A `ConnectableObservable` of `Long` ticks whose connection lifecycle is tied to a
  * `ZombieConnectorService`.
  *
  * @param service the service that is connected on `connect()` and disconnected when
  *                the returned cancelable is cancelled
  * @param s       scheduler in implicit scope for the observable operations used here
  */
class MyConnectableObservable(service: ZombieConnectorService)(implicit s: Scheduler) extends ConnectableObservable[Long] with LazyLogging {

  private[this] val connection = SingleAssignmentCancelable()
  private val serviceName = service.getClass.getName

  /** Connects the service and registers the disconnect action on the shared cancelable. */
  override def connect(): Cancelable = {
    logger.info(s"connecting to the service $serviceName")

    // Step 1: open the connection to the service.
    service.connect()

    // Step 2: describe what has to happen once the connection is cancelled.
    val onDisconnect = BooleanCancelable(() => service.disconnect())
    connection := onDisconnect

    connection
  }

  /** Logs the shutdown and cancels the connection cancelable. */
  def close() = {
    logger.info(s"shutting down connection to service $serviceName")
    connection.cancel()
  }

  /** Subscribes the given subscriber to a one-second interval observable. */
  override def unsafeSubscribeFn(subscriber: Subscriber[Long]): Cancelable = {
    val ticks = Observable.interval(1.second)
    ticks.subscribe(subscriber)
  }
}
/** Factory for [[MyConnectableObservable]]. */
object MyConnectableObservable {
  /** Builds a connectable observable for `service`, using the scheduler in implicit scope. */
  def apply(service: ZombieConnectorService)(implicit s: Scheduler) = {
    val observable = new MyConnectableObservable(service)(s)
    observable
  }
}
开发者ID:joesan,项目名称:monix-samples,代码行数:44,代码来源:MyConnectableObservable.scala


示例3: TaskTest

//设置package包名称以及导入依赖的类
package objektwerks.monix

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalatest.{FunSuite, Matchers}

import scala.concurrent.duration._
import scala.util.{Failure, Success}

/** Exercises the basic ways of running a monix `Task`.
  *
  * Asynchronous results are awaited on the test thread: an assertion placed inside a
  * future/`runOnComplete` callback runs after the test body has already returned, so
  * a failure there would never be reported as a test failure.
  */
class TaskTest extends FunSuite with Matchers {
  // Upper bound for awaiting asynchronous results; the interval-based test needs
  // roughly two seconds before its third element is emitted.
  private val timeout = 5.seconds

  test("run async task") {
    val task = Task { 1 + 2 }
    val future = task.runAsync
    scala.concurrent.Await.result(future, timeout) shouldBe 3
  }

  test("run on complete task") {
    val task = Task { 1 + 2 }
    // Bridge the callback into a future so the outcome (success or failure) is
    // observed by the test thread; a failed task is rethrown by Await.result.
    val outcome = scala.concurrent.Promise[Int]()
    task.runOnComplete { result =>
      outcome.tryComplete(result)
      ()
    }
    scala.concurrent.Await.result(outcome.future, timeout) shouldBe 3
  }

  test("run async observable task") {
    val task = {
      Observable.interval(1.second)
        .filter(_ % 2 == 0)
        .map(_ * 2)
        .flatMap(x => Observable.fromIterable(Seq(x, x)))
        .take(3)
        .toListL
    }
    val future = task.runAsync
    // Emitted elements are 0, 0, 4 (ticks 0 and 2, doubled and duplicated, first three).
    scala.concurrent.Await.result(future, timeout).sum shouldBe 4
  }

  test("coeval task") {
    val task = Task.eval{ 1 + 2 }
    val evaluating = task.coeval
    evaluating.value match {
      case Left(_) => throw new IllegalArgumentException("coeval task failed")
      case Right(sum) => sum shouldBe 3
    }
  }
}
开发者ID:objektwerks,项目名称:typelevel,代码行数:48,代码来源:TaskTest.scala


示例4: SampleApp1

//设置package包名称以及导入依赖的类
package se.ta

import scala.concurrent.duration._
import scala.scalajs.js.JSApp

import japgolly.scalajs.react._
import japgolly.scalajs.react.vdom.prefix_<^._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import org.scalajs.dom.document

/** Scala.js entry point that renders a tick counter once per second, ten times. */
object SampleApp1 extends JSApp {

  /** Virtual DOM for a single tick value. */
  def viewTick(tick: Long) =
    <.div(
      <.p(s"Tick: $tick seconds")
    )

  /** React component whose props are the current tick. */
  val Tick =
    ReactComponentB[Long]("Tick")
      .render_P(tick => viewTick(tick))
      .build

  /** Subscribes to a one-second interval and re-renders the component into the
    * document body for each of the first ten ticks.
    */
  def main(): Unit = {

    println("Starting counting  10 ticks")

    // `1.second` instead of the postfix form `1 second`: postfix operators need
    // `scala.language.postfixOps`, which this file does not import.
    Observable.interval(1.second)
      .doOnNext(x => ReactDOM.render(Tick(x), document.body))
      .take(10)
      .dump("0")
      .subscribe()
  }
}
开发者ID:ThomasAlexandre,项目名称:scalajs-elm,代码行数:35,代码来源:SampleApp1.scala


示例5: SubjectSubscription

//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.util

import com.hypertino.hyperbus.transport.api.matchers.RequestMatcher
import monix.eval.Task
import monix.execution.Ack.Stop
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.{ConcurrentSubject, Subject}

import scala.util.Success

/** A subscription backed by a monix `Subject`, indexed through a `RequestMatcher`.
  *
  * Subclasses supply the subject, the request matcher and the `add`/`remove` hooks
  * that are invoked when a subscriber attaches or the subscription ends.
  *
  * @param scheduler scheduler used for the future callbacks in this class
  */
abstract class SubjectSubscription[T](implicit val scheduler: Scheduler) extends FuzzyMatcher {
  type eventType = T

  // FuzzyIndex properties
  def requestMatcher: RequestMatcher
  override def indexProperties: Seq[FuzzyIndexItemMetaInfo] = requestMatcher.indexProperties
  override def matches(other: Any): Boolean = requestMatcher.matches(other)

  // Subject properties
  protected val subject: Subject[eventType, eventType]

  /** Calls `remove()` and then completes the subject. */
  def cancel(): Unit = {
    remove()
    subject.onComplete()
  }

  /** Pushes `t` to the subject; when the subject answers `Stop`, `remove()` is called.
    *
    * @return a task wrapping the acknowledgement future of the subject
    */
  def publish(t: eventType): Task[Ack] = {
    Task.fromFuture(subject.onNext(t).andThen {
      case Success(Stop) => remove()
    })
  }

  // Alias needed because `cancel()` is shadowed inside the anonymous Cancelable below.
  private def cancel_1() = cancel()

  /** Observable view of the subject: subscribing calls `add()`, and cancelling the
    * returned cancelable cancels this whole subscription before the underlying one.
    */
  val observable: Observable[eventType] = new Observable[eventType] {
    override def unsafeSubscribeFn(subscriber: Subscriber[eventType]): Cancelable = {
      val original: Cancelable = subject.unsafeSubscribeFn(subscriber)
      add()
      new Cancelable {
        override def cancel(): Unit = {
          cancel_1()
          original.cancel()
        }
      }
    }
  }

  protected def remove(): Unit
  protected def add(): Unit
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:52,代码来源:SubjectSubscription.scala


示例6: PlainEndpoint

//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.transport.resolvers

import com.hypertino.hyperbus.model.RequestBase
import com.hypertino.hyperbus.transport.api.{NoTransportRouteException, ServiceEndpoint, ServiceResolver}
import monix.reactive.Observable

/** A service endpoint described only by a host name and an optional port. */
case class PlainEndpoint(
  hostname: String,
  port: Option[Int]
) extends ServiceEndpoint

/** Resolves services from a fixed, in-memory map of service name to endpoints.
  *
  * @param endpointsMap endpoints keyed by the authority part of the request's HRL
  */
class PlainResolver(endpointsMap: Map[String, Seq[PlainEndpoint]]) extends ServiceResolver {
  /** Convenience constructor for a single service with a single endpoint. */
  def this(serviceName: String, hostname: String, port: Option[Int]) = this(Map(serviceName -> Seq(PlainEndpoint(hostname, port))))

  /** Emits the endpoints registered for the message's authority, or fails with
    * `NoTransportRouteException` when the authority is not in the map.
    */
  override def serviceObservable(message: RequestBase): Observable[Seq[ServiceEndpoint]] = {
    endpointsMap.get(message.headers.hrl.authority) match {
      case Some(endpoints) => Observable.now(endpoints)
      case None => Observable.raiseError(new NoTransportRouteException(s"Can't resolve service for ${message.headers.hrl}"))
    }
  }
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:19,代码来源:PlainResolver.scala


示例7: Implicits

//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.body

import java.io.InputStream
import java.nio.ByteBuffer

import fr.hmil.roshttp.body.JSONBody._
import monix.reactive.Observable

/** Implicit conversions from plain values to the request body types of this package. */
object Implicits {
  // --- JSON values ---------------------------------------------------------

  implicit def stringToJSONString(value: String): JSONString =
    new JSONString(value)

  implicit def intToJSONNumber(value: Int): JSONNumber =
    new JSONNumber(value)

  implicit def floatToJSONNumber(value: Float): JSONNumber =
    new JSONNumber(value)

  implicit def doubleToJSONNumber(value: Double): JSONNumber =
    new JSONNumber(value)

  implicit def JSONObjectToJSONBody(obj: JSONObject): JSONBody =
    JSONBody(obj)

  // --- Binary bodies -------------------------------------------------------

  implicit def byteBufferToByteBufferBody(buffer: ByteBuffer): BodyPart =
    ByteBufferBody(buffer)

  /** Wraps an input stream as a streamed body: the stream is read through
    * `Observable.fromInputStream` and every chunk is wrapped in a `ByteBuffer`.
    */
  implicit def observableToStreamBody(is: InputStream): BodyPart = {
    val chunks = Observable.fromInputStream(is).map(bytes => ByteBuffer.wrap(bytes))
    StreamBody(chunks)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:20,代码来源:Implicits.scala


示例8: MultiPartBody

//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.body

import java.nio.ByteBuffer

import monix.execution.Scheduler
import monix.reactive.Observable

import scala.util.Random


/** A multipart request body assembled from named body parts.
  *
  * @param parts     body parts keyed by their form field name
  * @param subtype   multipart subtype used in the content type (default "form-data")
  * @param scheduler scheduler in implicit scope (part of the constructor contract)
  */
class MultiPartBody(parts: Map[String, BodyPart], subtype: String = "form-data")(implicit scheduler: Scheduler)
  extends BodyPart {

  /** Random boundary: four dashes followed by 24 lower-cased alphanumeric characters. */
  val boundary = "----" + Random.alphanumeric.take(24).mkString.toLowerCase

  override def contentType: String = s"multipart/$subtype; boundary=$boundary"

  /** Streams every part preceded by its encapsulation boundary and headers, followed
    * by the closing boundary. With no parts, only the closing boundary is emitted
    * (previously an empty `parts` map made `reduceLeft` throw).
    */
  override def content: Observable[ByteBuffer] = {
    // Prepend multipart encapsulation boundary and body part headers to
    // each body part.
    val encodedParts = parts.map({ case (name, part) =>
      ByteBuffer.wrap(
        ("\r\n--" + boundary + "\r\n" +
          "Content-Disposition: form-data; name=\"" + name + "\"\r\n" +
          s"Content-Type: ${part.contentType}\r\n" +
          "\r\n").getBytes("utf-8")
      ) +: part.content
    })

    // Join body parts; fall back to an empty stream when there is nothing to join.
    val joined: Observable[ByteBuffer] =
      encodedParts
        .reduceLeftOption((acc, elem) => acc ++ elem)
        .getOrElse(Observable.empty[ByteBuffer])

    // Append the closing boundary
    joined :+ ByteBuffer.wrap(s"\r\n--$boundary--\r\n".getBytes("utf-8"))
  }
}

/** Factory for [[MultiPartBody]]. */
object MultiPartBody {
  /** Builds a multipart body from `name -> part` pairs, using the default subtype. */
  def apply(parts: (String, BodyPart)*)(implicit scheduler: Scheduler): MultiPartBody = {
    val partsByName = parts.toMap
    new MultiPartBody(partsByName)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:41,代码来源:MultiPartBody.scala


示例9: StreamHttpResponse

//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response

import java.nio.ByteBuffer

import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.util.HeaderMap
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.concurrent.Future


/** An HTTP response whose body is exposed as a stream of byte buffers.
  *
  * @param statusCode status code of the response
  * @param headers    response headers
  * @param body       body chunks as an observable
  */
class StreamHttpResponse(
  val statusCode: Int,
  val headers: HeaderMap[String],
  val body: Observable[ByteBuffer]
) extends HttpResponse

/** Factory that wraps the response header and body stream without consuming the stream. */
object StreamHttpResponse extends HttpResponseFactory[StreamHttpResponse] {
  /** Returns an already-completed future holding the streaming response. */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[StreamHttpResponse] = {
    val response = new StreamHttpResponse(
      statusCode = header.statusCode,
      headers = header.headers,
      body = bodyStream)
    Future.successful(response)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:27,代码来源:StreamHttpResponse.scala


示例10: SimpleHttpResponse

//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response

import java.nio.ByteBuffer

import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.exceptions.ResponseException
import fr.hmil.roshttp.util.{HeaderMap, Utils}
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}


/** An HTTP response whose body has been fully read into a string.
  *
  * @param statusCode status code of the response
  * @param headers    response headers
  * @param body       the decoded response body
  */
class SimpleHttpResponse(
  val statusCode: Int,
  val headers: HeaderMap[String],
  val body: String
) extends HttpResponse

/** Factory that drains the body stream and decodes it into a [[SimpleHttpResponse]]. */
object SimpleHttpResponse extends HttpResponseFactory[SimpleHttpResponse] {
  /** Collects every chunk of `bodyStream`, then decodes them with the charset taken
    * from the `content-type` header.
    *
    * @return a future completed with the response, or failed with a
    *         `ResponseException` wrapping the stream error and the header
    */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[SimpleHttpResponse] = {

    // `null` is passed through when the header is absent, as the original code did.
    val charset = Utils.charsetFromContentType(header.headers.getOrElse("content-type", null))
    val buffers = mutable.Queue[ByteBuffer]()
    val promise = Promise[SimpleHttpResponse]()

    val streamCollector = bodyStream.
      foreach(elem => buffers.enqueue(elem)).
      map({_ =>
        val body = recomposeBody(buffers, charset)
        new SimpleHttpResponse(header.statusCode, header.headers, body)
      })

    // Extractor patterns instead of `Success[SimpleHttpResponse]` type patterns,
    // whose type argument is unchecked because of erasure.
    streamCollector.onComplete({
      case Success(response) =>
        promise.trySuccess(response)
      case Failure(cause) =>
        promise.tryFailure(new ResponseException(cause, header))
    })

    promise.future
  }

  /** Concatenates the queued chunks into one buffer and decodes it.
    *
    * The buffer is sized from the bytes actually remaining in the chunks, so a chunk
    * larger than the configured chunk size can no longer overflow it, and the byte
    * count matches what `put` copies even when a chunk's position is not zero.
    */
  private def recomposeBody(seq: mutable.Queue[ByteBuffer], charset: String): String = {
    // Must be computed before the copy: `put` consumes the chunks' remaining bytes.
    val totalBytes = seq.foldLeft(0)((count, chunk) => count + chunk.remaining)
    val buffer = ByteBuffer.allocate(totalBytes)
    seq.foreach(chunk => buffer.put(chunk))
    // The limit already equals totalBytes because the buffer was allocated to that size.
    Utils.getStringFromBuffer(buffer, charset)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:61,代码来源:SimpleHttpResponse.scala


示例11: StreamingPressureTest

//设置package包名称以及导入依赖的类
package fr.hmil.roshttp

import java.nio.ByteBuffer

import fr.hmil.roshttp.body.StreamBody
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import utest._

/** Stress tests that push a large stream through an HTTP request in each direction.
  *
  * Both tests talk to the server at `SERVER_URL` and are guarded by an environment
  * check, so outside the matching environment their bodies do nothing.
  */
object StreamingPressureTest extends TestSuite {

  private val SERVER_URL = "http://localhost:3000"

  // Number of 8192-byte buffers produced for the upload test.
  private val ONE_MILLION = 1000000

  val tests = this{

    // Send approx. 8 gigs of data to the server to check that there is no leak
    "Upload streams do not leak" - {
      if (!JsEnvUtils.isRealBrowser) {
        HttpRequest(s"$SERVER_URL/streams/in")
          // The iterator itself never ends; `take(ONE_MILLION)` bounds the upload.
          .post(StreamBody(Observable.fromIterator(new Iterator[ByteBuffer]() {
            override def hasNext: Boolean = true
            override def next(): ByteBuffer = ByteBuffer.allocateDirect(8192)
          })
            .take(ONE_MILLION)))
          .map(r => r.body ==> "Received 8192000000 bytes.")
          // NOTE(review): any Throwable, presumably including a failed check above,
          // is only printed here — confirm that swallowing it is intended.
          .recover({
            case e: Throwable =>
              e.printStackTrace()
          })
      }
    }

    // Receive approx. 8 gigs of data to ensure that there is no leak
    "Download streams do not leak" - {
      // Due to browser incompatibility and node memory leak, run this test only in the JVM
      if (JsEnvUtils.userAgent == "jvm") {
        HttpRequest(s"$SERVER_URL/streams/out")
          .stream()
          // Sum the `limit` of every received buffer to get the number of bytes received.
          .flatMap(_.body.map(_.limit.asInstanceOf[Long]).reduce((l, r) => l + r).runAsyncGetFirst)
          .map(_.get ==> 8192000000L)
      }
    }
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:47,代码来源:StreamingPressureTest.scala


示例12: RouteGuideMonixService

//设置package包名称以及导入依赖的类
package io.grpc.routeguide

import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.logging.Logger

import concurrency.AtomicRef
import monix.eval.Task
import monix.reactive.Observable

/** Monix-based implementation of the route guide service.
  *
  * @param features the known features, looked up by location in `findFeature`
  */
class RouteGuideMonixService(features: Seq[Feature]) extends RouteGuideGrpcMonix.RouteGuide {

  val logger: Logger = Logger.getLogger(classOf[RouteGuideMonixService].getName)

  // Notes received so far, grouped by the location they were sent for.
  private val routeNotes: AtomicRef[Map[Point, Seq[RouteNote]]] = new AtomicRef(Map.empty)


  /** Records each incoming note and answers with every note stored for its location
    * (the new note included, since it is stored first).
    */
  override def routeChat(notes: Observable[RouteNote]): Observable[RouteNote] =
    notes.flatMap { incoming =>
      addNote(incoming)
      val atSameLocation = getNotes(incoming.getLocation)
      Observable.fromIterable(atSameLocation)
    }

  /** Returns the feature at `point`, or a new `Feature` carrying only the location. */
  private def findFeature(point: Point): Feature = {
    val known = features.find { candidate =>
      candidate.getLocation.latitude == point.latitude && candidate.getLocation.longitude == point.longitude
    }
    known.getOrElse(new Feature(location = Some(point)))
  }

  /** Notes currently stored for `point`; empty when there are none. */
  private def getNotes(point: Point): Seq[RouteNote] =
    routeNotes.get.getOrElse(point, Seq.empty)

  /** Appends `note` to the notes stored for its location. */
  private def addNote(note: RouteNote): Unit = {
    routeNotes.updateAndGet { current =>
      val key = note.getLocation
      val appended = current.getOrElse(key, Seq.empty) :+ note
      current.updated(key, appended)
    }
    ()
  }
}
开发者ID:btlines,项目名称:grpcexample,代码行数:41,代码来源:RouteGuideMonixService.scala


示例13: ConsulServiceResolver

//设置package包名称以及导入依赖的类
package com.hypertino.transport.resolvers

import java.util

import com.hypertino.hyperbus.model.RequestBase
import com.hypertino.hyperbus.transport.api.{NoTransportRouteException, ServiceEndpoint, ServiceResolver}
import com.hypertino.hyperbus.transport.resolvers.PlainEndpoint
import com.orbitz.consul.Consul
import com.orbitz.consul.cache.{ConsulCache, ServiceHealthCache, ServiceHealthKey}
import com.orbitz.consul.model.health.ServiceHealth
import monix.execution.Scheduler
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject

/** Resolves service endpoints from the health information of a Consul agent.
  *
  * @param consul    Consul client used to obtain the health client
  * @param scheduler scheduler in implicit scope for the subject created per request
  */
class ConsulServiceResolver(consul: Consul)
                           (implicit val scheduler: Scheduler) extends ServiceResolver {

  /** Uses a Consul client built with the builder defaults. */
  def this()(implicit scheduler: Scheduler) = this(Consul.builder().build())

  /** Emits the endpoints of the service named in the message's HRL every time the
    * health cache notifies a change; fails with `NoTransportRouteException` when the
    * HRL carries no service name.
    */
  override def serviceObservable(message: RequestBase): Observable[Seq[ServiceEndpoint]] = {
    message.headers.hrl.service.map { serviceName =>
      val subject = ConcurrentSubject.publishToOne[Seq[ServiceEndpoint]]
      val healthClient = consul.healthClient

      // NOTE(review): a new cache is created and started on every call and is not
      // stopped anywhere in this class — confirm whether that is intended.
      val svHealth = ServiceHealthCache.newCache(healthClient, serviceName)
      val listener = new ConsulCache.Listener[ServiceHealthKey, ServiceHealth] {
        override def notify(newValues: util.Map[ServiceHealthKey, ServiceHealth]): Unit = {
          import scala.collection.JavaConverters._

          // Keep only instances whose checks all pass, and expose them as endpoints.
          val seq = newValues
            .asScala
            .filter {
              _._2
                .getChecks
                .asScala
                .forall(check => isPassing(check.getStatus))
            }
            .map { i =>
              PlainEndpoint(i._1.getHost, Some(i._1.getPort))
            }
            .toSeq
          subject.onNext(seq)
        }
      }
      svHealth.addListener(listener)
      svHealth.start()
      subject
    } getOrElse {
      Observable.raiseError(
        new NoTransportRouteException(s"Request doesn't specify service name: ${message.headers.hrl}")
      )
    }
  }

  /** A check counts as passing when its status is "passing" or "warning" (case-insensitive). */
  private def isPassing(s: String) = s != null && (s.equalsIgnoreCase("passing") || s.equalsIgnoreCase("warning"))
}
开发者ID:hypertino,项目名称:hyperbus-consul-resolver,代码行数:57,代码来源:ConsulServiceResolver.scala


示例14: scrap

//设置package包名称以及导入依赖的类
package com.airbnbData.service

//import org.http4s.client.Client
import com.airbnbData.model.command.PropertyAndAirbnbUserCommand
import play.api.libs.ws.{WSClient => Client}

import scalaz.Kleisli
import slick.jdbc.JdbcBackend.DatabaseDef
import monix.eval.Task
import monix.reactive.Observable


/** Contract for scraping listings and persisting them, expressed as `Kleisli`
  * computations over the HTTP client and the database.
  */
trait AirbnbScrapService {
  /** Effect type the operations run in. */
  type Box[A] = Task[A]
  /** Everything an operation needs: the HTTP client and the database. */
  type Dependencies = (Client, DatabaseDef)
  /** An operation that reads its dependencies and yields an `A` inside `Box`. */
  type Operation[A] = Kleisli[Box, Dependencies, A]

  /** Task-based variant.
    *
    * @param save      persists a batch of commands against the database
    * @param scrap     fetches the commands through the client
    * @param deleteAll database operation named after deleting all stored data
    */
  def scrap(
    save: Seq[PropertyAndAirbnbUserCommand] => Kleisli[Task, DatabaseDef, Int],
    scrap: () => Kleisli[Task, Client, Seq[Option[PropertyAndAirbnbUserCommand]]],
    deleteAll: () => Kleisli[Task, DatabaseDef, Int]
  ): Operation[String]

  /** Observable-based variant.
    *
    * @param save  persists a single command against the database
    * @param scrap streams the commands through the client
    */
  def scrap2(
    save: PropertyAndAirbnbUserCommand => Kleisli[Observable, DatabaseDef, Int],
    scrap: () => Kleisli[Observable, Client, PropertyAndAirbnbUserCommand]
  ): Operation[Unit]
}
开发者ID:Hxucaa,项目名称:scrappy,代码行数:29,代码来源:AirbnbScrapService.scala



注:本文中的monix.reactive.Observable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala KeyFactory类代码示例发布时间:2022-05-23
下一篇:
Scala ShardRegion类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap