本文整理汇总了 Scala 中 monix.execution.Scheduler 类的典型用法代码示例。如果您正苦于以下问题：Scala Scheduler 类的具体用法？Scala Scheduler 怎么用？Scala Scheduler 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Scheduler类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: QQAsyncTestSuite
//设置package包名称以及导入依赖的类
package qq
import java.util.concurrent.TimeUnit
import monix.execution.schedulers.ExecutionModel
import monix.execution.{Cancelable, Scheduler}
import org.scalatest.AsyncFreeSpec
import scala.concurrent.ExecutionContext
/**
  * Base class for asynchronous QQ test suites.
  *
  * Publishes an implicit monix `Scheduler` that forwards work to the
  * `ExecutionContext` inherited from the parent suite.
  */
abstract class QQAsyncTestSuite extends AsyncFreeSpec with QQTestSuite with AsyncTestUtil {

  /** Scheduler built on top of the parent suite's execution context. */
  implicit val schedulerVal: Scheduler = scheduler(super.executionContext)

  /**
    * Wraps `executionContext` in a minimal `Scheduler`.
    *
    * One-shot tasks are submitted immediately (the requested delay is not applied);
    * periodic scheduling and `withExecutionModel` are left unimplemented (`???`).
    */
  def scheduler(implicit executionContext: ExecutionContext): Scheduler =
    new Scheduler {
      override def executionModel: ExecutionModel = ExecutionModel.SynchronousExecution

      override def withExecutionModel(em: ExecutionModel): Scheduler = ???

      override def currentTimeMillis(): Long = System.currentTimeMillis()

      override def execute(runnable: Runnable): Unit = executionContext.execute(runnable)

      override def reportFailure(t: Throwable): Unit = executionContext.reportFailure(t)

      // `initialDelay` and `unit` are ignored: the task is handed to the context right away.
      override def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
        executionContext.execute(r)
        Cancelable.empty
      }

      override def scheduleWithFixedDelay(initialDelay: Long, delay: Long, unit: TimeUnit, r: Runnable): Cancelable =
        ???

      override def scheduleAtFixedRate(initialDelay: Long, period: Long, unit: TimeUnit, r: Runnable): Cancelable =
        ???
    }
}
开发者ID:edmundnoble,项目名称:slate,代码行数:35,代码来源:QQAsyncTestSuite.scala
示例2: ReactiveReact
//设置package包名称以及导入依赖的类
package slate
package views
import cats.Monoid
import japgolly.scalajs.react.{CallbackTo, Children}
import japgolly.scalajs.react.component.ScalaBuilder
import monix.execution.{CancelableFuture, Scheduler}
import monix.reactive.Observable
/** Builder extensions that feed values emitted by an `Observable` into a component's state. */
object ReactiveReact {

  /** Pairs a piece of reactive state with the future of the subscription producing it. */
  case class ReactiveState[ST](reactivePart: ST, cancelableFuture: CancelableFuture[Unit])

  // todo: cancel on willUnmount
  implicit class ReactiveOps[P, ST, B](val builder: ScalaBuilder.Step4[P, Children.None, ST, B]) {

    /**
      * On mount, subscribes to the observable derived from the props and folds every
      * emitted element into the state with `setReactivePart`.
      *
      * The future returned by the subscription is discarded.
      */
    @inline final def reactiveReplaceL[R]
    (getReactivePart: P => Observable[R], setReactivePart: (ST, R) => ST)
    (implicit sch: Scheduler): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { $ =>
        CallbackTo.lift { () =>
          val source = getReactivePart($.props)
          val _ = source.foreach { r =>
            $.modState(current => setReactivePart(current, r)).runNow()
          }
        }
      }

    /** Variant for props that are themselves an `Observable[ST]`: each element replaces the state. */
    @inline final def reactiveReplace
    (implicit sch: Scheduler,
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      reactiveReplaceL[ST](propsAreReactiveEv, (_, latest) => latest)

    /**
      * Variant for props that are an `Observable[ST]` with a `Monoid[ST]`: on mount, folds the
      * stream with the monoid and sets the state to the folded value.
      */
    @inline final def reactiveMonoio
    (implicit sch: Scheduler, ST: Monoid[ST],
     propsAreReactiveEv: P =:= Observable[ST]
    ): ScalaBuilder.Step4[P, Children.None, ST, B] =
      builder.componentWillMount { $ =>
        CallbackTo.lift { () =>
          // Apply the type-equality evidence explicitly to view the props as an observable.
          val source = propsAreReactiveEv($.props)
          val _ = source.foldLeftF(ST.empty)(ST.combine).foreach { r =>
            $.setState(r).runNow()
          }
        }
      }
  }
}
开发者ID:edmundnoble,项目名称:slate,代码行数:48,代码来源:ReactiveReact.scala
示例3: BlockingIO
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder.util
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise, blocking}
import scala.util.control.NonFatal
import monix.execution.Scheduler
// https://github.com/alexandru/scala-best-practices/blob/master/sections/4-concurrency-parallelism.md
/** Runs blocking computations on a dedicated scheduler and exposes the result as a `Future`. */
object BlockingIO {
  private val ioThreadPool = Scheduler.io(name = "io-thread")

  /**
    * Evaluates `t` on the I/O scheduler, wrapped in `blocking`.
    *
    * @param t the by-name computation to run
    * @return a future completed with the value of `t`, or failed with any non-fatal
    *         exception it throws
    */
  def future[T](t: => T): Future[T] = {
    val promise = Promise[T]()
    ioThreadPool.execute(new Runnable {
      override def run(): Unit =
        try promise.success(blocking(t))
        catch {
          // Only non-fatal errors are forwarded to the promise.
          case NonFatal(ex) => promise.failure(ex)
        }
    })
    promise.future
  }
}
开发者ID:walfie,项目名称:gbf-raidfinder,代码行数:29,代码来源:BlockingIO.scala
示例4: getObservable
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder.util
import akka.agent.Agent
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive._
import monix.reactive.observables.GroupedObservable
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.PublishSubject
import scala.concurrent.{ExecutionContext, Future}
/** Lookup of an `Observable[V]` by a key of type `K`. */
trait ObservablesPartitioner[K, V] {
/** Returns the observable associated with `key`. */
def getObservable(key: K): Observable[V]
}
object CachedObservablesPartitioner {

  /**
    * Creates a partitioner and subscribes it to `observable` grouped by `keySelector`.
    *
    * @param observable      the ungrouped input stream
    * @param cacheSizePerKey passed through to the partitioner's constructor
    * @param keySelector     extracts the grouping key from each input element
    * @param mappingFunction passed through to the partitioner's constructor
    * @return the partitioner together with the `Cancelable` of its subscription
    */
  def fromUngroupedObservable[K, InputV, OutputV](
    observable: Observable[InputV],
    cacheSizePerKey: Int,
    keySelector: InputV => K,
    mappingFunction: InputV => OutputV
  )(implicit scheduler: Scheduler): (CachedObservablesPartitioner[K, InputV, OutputV], Cancelable) = {
    val partitioner = new CachedObservablesPartitioner[K, InputV, OutputV](cacheSizePerKey, mappingFunction)
    val grouped = observable.groupBy(keySelector)
    (partitioner, grouped.subscribe(partitioner))
  }
}
/**
  * Observer of grouped observables that serves per-key observables on request.
  *
  * NOTE(review): no `onNext` implementation is visible in this excerpt, so nothing shown
  * here populates `observablesByKey` or emits on `incomingKeys` — confirm against the
  * full source.
  */
class CachedObservablesPartitioner[K, InputV, OutputV](
  cacheSizePerKey: Int, mappingFunction: InputV => OutputV
)(implicit ec: ExecutionContext)
  extends Observer[GroupedObservable[K, InputV]] with ObservablesPartitioner[K, OutputV] {

  private val observablesByKey = Agent[Map[K, Observable[OutputV]]](Map.empty)
  private val incomingKeys = PublishSubject[K]()

  /** Propagates completion to the key subject. */
  def onComplete(): Unit =
    incomingKeys.onComplete()

  /** Prints the error to stderr, then propagates it to the key subject. */
  def onError(e: Throwable): Unit = {
    System.err.println(e) // TODO: Better logging?
    incomingKeys.onError(e)
  }

  /**
    * Returns the observable registered for `key`. If none is registered yet, returns an
    * observable that waits for `key` to appear on the key subject and then looks it up again.
    */
  def getObservable(key: K): Observable[OutputV] =
    observablesByKey.get.get(key) match {
      case Some(known) => known
      case None        => incomingKeys.findF(_ == key).flatMap(_ => getObservable(key))
    }
}
开发者ID:walfie,项目名称:gbf-raidfinder,代码行数:53,代码来源:ObservablesPartitioner.scala
示例5: get
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder
import akka.agent.Agent
import java.util.Date
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive._
import monix.reactive.subjects.ConcurrentSubject
import scala.concurrent.{ExecutionContext, Future}
import walfie.gbf.raidfinder.domain._
/** Read access to the currently known raid bosses and to a stream of bosses seen for the first time. */
trait KnownBossesMap {
  /** Returns the current map of known bosses, keyed by boss name. */
  def get(): Map[BossName, RaidBoss]

  /** Returns the observable on which newly discovered bosses are published. */
  def newBossObservable(): Observable[RaidBoss]
}
/**
  * Observer of `RaidInfo` that keeps a map of the bosses seen so far and publishes
  * every boss whose name was not yet in the map.
  *
  * @param initialBosses bosses used to seed the map, keyed by their name
  */
class KnownBossesObserver(
  initialBosses: Seq[RaidBoss]
)(implicit scheduler: Scheduler) extends Observer[RaidInfo] with KnownBossesMap {

  private val agent = Agent[Map[BossName, RaidBoss]](
    initialBosses.map(boss => boss.name -> boss)(scala.collection.breakOut)
  )

  // TODO: Write test for this
  private val subject = ConcurrentSubject.publish[RaidBoss]

  val newBossObservable: Observable[RaidBoss] = subject

  /** Completion of the upstream is ignored. */
  def onComplete(): Unit = ()

  /** Upstream errors are ignored. */
  def onError(e: Throwable): Unit = ()

  /**
    * Publishes the boss if its name is not in the map yet, then stores (or overwrites)
    * the entry and continues once the update has been applied.
    */
  def onNext(elem: RaidInfo): Future[Ack] = {
    val bossName = elem.tweet.bossName
    val boss = elem.boss

    val alreadyKnown = agent.get.isDefinedAt(bossName)
    if (!alreadyKnown) subject.onNext(boss)

    val updated = agent.alter(known => known.updated(bossName, boss))
    updated.flatMap(_ => Ack.Continue)
  }

  def get(): Map[BossName, RaidBoss] = agent.get()

  /**
    * Keeps only the bosses last seen after `minDate` or, when `levelThreshold` is defined,
    * whose level is at least the threshold.
    *
    * @return a future of the map after the purge
    */
  def purgeOldBosses(
    minDate: Date,
    levelThreshold: Option[Int]
  ): Future[Map[BossName, RaidBoss]] =
    agent.alter { bosses =>
      bosses.filter { case (_, boss) =>
        boss.lastSeen.after(minDate) || levelThreshold.exists(boss.level >= _)
      }
    }
}
开发者ID:walfie,项目名称:gbf-raidfinder,代码行数:48,代码来源:KnownBossesMap.scala
示例6: observable
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder
import monix.execution.Scheduler
import monix.reactive._
import monix.reactive.subjects.ConcurrentSubject
import twitter4j._
// TODO: Write tests
/** Source of Twitter statuses exposed as an `Observable`. */
trait TwitterStreamer {
/** The stream of statuses. */
def observable: Observable[Status]
}
object TwitterStreamer {
  // English tweets can be matched with the single phrase "I need backup!Battle ID:".
  // The equivalent Japanese phrase (garbled to "??????" in this copy of the source) cannot
  // be used as a filter term because terms must be space-separated, and the only
  // space-separated token in those tweets is the boss level.
  val DefaultFilterTerms = (15 to 150 by 5).map(level => "Lv" + level) :+ "I need backup!Battle ID:"

  /**
    * Builds a `Twitter4jStreamer`.
    *
    * @param twitterStream the twitter4j stream to listen on; defaults to the factory singleton
    * @param filterTerms   the filter terms; defaults to [[DefaultFilterTerms]]
    */
  def apply(
    twitterStream: twitter4j.TwitterStream = TwitterStreamFactory.getSingleton,
    filterTerms: Seq[String] = DefaultFilterTerms
  )(implicit scheduler: Scheduler): Twitter4jStreamer = {
    val streamer = new Twitter4jStreamer(twitterStream, filterTerms)
    streamer
  }
}
/**
  * `TwitterStreamer` backed by a twitter4j stream.
  *
  * On construction it registers a listener on `twitterStream` and starts filtering by
  * `filterTerms`; every received status is pushed to [[observable]].
  */
class Twitter4jStreamer(
  twitterStream: twitter4j.TwitterStream,
  filterTerms: Seq[String]
)(implicit scheduler: Scheduler) extends TwitterStreamer {

  private val statusSubject = ConcurrentSubject.publish[Status]

  val observable: Observable[Status] = statusSubject

  private val statusListener = new StatusAdapter() {
    override def onStatus(status: Status): Unit =
      statusSubject.onNext(status)

    // Exceptions are only printed; the stream is not failed.
    override def onException(e: Exception): Unit =
      println(e) // TODO: Better error handling
  }

  twitterStream.addListener(statusListener)
  twitterStream.filter(new FilterQuery(filterTerms: _*))
}
开发者ID:walfie,项目名称:gbf-raidfinder,代码行数:41,代码来源:TwitterStreamer.scala
示例7: WebsocketController
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder.server.controller
import akka.actor._
import akka.stream.scaladsl.Flow
import akka.stream.{Materializer, OverflowStrategy}
import monix.execution.Scheduler
import play.api.http.websocket.Message
import play.api.libs.streams._
import play.api.mvc._
import play.api.mvc.WebSocket.MessageFlowTransformer
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import walfie.gbf.raidfinder.domain._
import walfie.gbf.raidfinder.protocol._
import walfie.gbf.raidfinder.RaidFinder
import walfie.gbf.raidfinder.server.actor.WebsocketRaidsHandler
import walfie.gbf.raidfinder.server.util.MessageFlowTransformerUtil
import walfie.gbf.raidfinder.server.{BossNameTranslator, MetricsCollector}
// Play controller for the raid finder websocket endpoint.
// NOTE(review): this excerpt appears to be truncated in the middle of the class. The lines
// after `defaultTransformer` reference `props`, `transformer`, `requestedProtocols` and
// `result`, none of which are defined in the visible code, and the braces do not balance.
// Consult the full source before changing anything here.
class WebsocketController(
raidFinder: RaidFinder[BinaryProtobuf],
translator: BossNameTranslator,
keepAliveInterval: FiniteDuration,
metricsCollector: MetricsCollector
)(implicit system: ActorSystem, materializer: Materializer, scheduler: Scheduler) extends Controller {
private val jsonTransformer = MessageFlowTransformerUtil.protobufJsonMessageFlowTransformer
private val binaryTransformer = MessageFlowTransformerUtil.protobufBinaryMessageFlowTransformer
// JSON is the transformer bound to the "default" name.
private val defaultTransformer = jsonTransformer
// (the enclosing method and match expression start in the part missing from this excerpt)
val flow = ActorFlow.actorRef(props = props)
transformer.transform(flow)
}
// This branch yields a 400 Bad Request listing the requested subprotocols.
case None => Left {
val unsupportedProtocols = requestedProtocols.mkString("[", ", ", "]")
Results.BadRequest("Unsupported websocket subprotocols " + unsupportedProtocols)
}
}
Future.successful(result)
}
}
开发者ID:walfie,项目名称:gbf-raidfinder,代码行数:43,代码来源:WebsocketController.scala
示例8: WebsocketController
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder.server.controller
import akka.actor._
import akka.stream.scaladsl.Flow
import akka.stream.{Materializer, OverflowStrategy}
import monix.execution.Scheduler
import play.api.http.websocket.Message
import play.api.libs.streams._
import play.api.mvc._
import play.api.mvc.WebSocket.MessageFlowTransformer
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import walfie.gbf.raidfinder.domain._
import walfie.gbf.raidfinder.protocol._
import walfie.gbf.raidfinder.RaidFinder
import walfie.gbf.raidfinder.server.actor.WebsocketRaidsHandler
import walfie.gbf.raidfinder.server.util.MessageFlowTransformerUtil
import walfie.gbf.raidfinder.server.{BossNameTranslator, MetricsCollector}
// Play controller for the raid finder websocket endpoint.
// NOTE(review): this excerpt appears to be truncated in the middle of the class. The lines
// after `defaultTransformer` reference `props`, `transformer`, `requestedProtocols` and
// `result`, none of which are defined in the visible code, and the braces do not balance.
// Consult the full source before changing anything here.
class WebsocketController(
raidFinder: RaidFinder[ResponseMessage],
translator: BossNameTranslator,
keepAliveInterval: FiniteDuration,
metricsCollector: MetricsCollector
)(implicit system: ActorSystem, materializer: Materializer, scheduler: Scheduler) extends Controller {
private val jsonTransformer = MessageFlowTransformerUtil.protobufJsonMessageFlowTransformer
private val binaryTransformer = MessageFlowTransformerUtil.protobufBinaryMessageFlowTransformer
// JSON is the transformer bound to the "default" name.
private val defaultTransformer = jsonTransformer
// (the enclosing method and match expression start in the part missing from this excerpt)
val flow = ActorFlow.actorRef(props = props)
transformer.transform(flow)
}
// This branch yields a 400 Bad Request listing the requested subprotocols.
case None => Left {
val unsupportedProtocols = requestedProtocols.mkString("[", ", ", "]")
Results.BadRequest("Unsupported websocket subprotocols " + unsupportedProtocols)
}
}
Future.successful(result)
}
}
开发者ID:Einhalkzt,项目名称:GBF-RAid-finder,代码行数:43,代码来源:WebsocketController.scala
示例9: SupervisorActor
//设置package包名称以及导入依赖的类
package my.samples.core
import akka.actor.{ Actor, Props }
import com.typesafe.scalalogging.LazyLogging
import monix.execution.Scheduler
import monix.execution.cancelables.CompositeCancelable
import monix.reactive.observables.ConnectableObservable
import my.samples.observables.{ MyConnectableObservable, MyObservable }
import my.samples.observers.MyObserver
import my.samples.models.MyMessages.{ Destroy, Init, Tick }
import my.samples.services.ZombieConnectorService
/**
  * Actor that wires observables to observers, keeps their subscriptions in one composite
  * cancelable, and forwards received `Tick` messages to the global channel.
  */
class SupervisorActor(globalChannel: GlobalOutputChannel)(implicit s: Scheduler) extends Actor with LazyLogging {

  private[this] val subscriptions = CompositeCancelable()

  /** Logs the start and sends `Init` to itself. */
  override def preStart: Unit = {
    logger.info(s"starting Supervisor Actor [$self]")
    self ! Init
  }

  /** Cancels every subscription and logs the resulting cancellation state. */
  override def postStop: Unit = {
    subscriptions.cancel()
    logger.info(s"cancelling all subscriptions :: isCancelled ${subscriptions.isCanceled}")
  }

  /**
    * Creates the observables and observers, subscribes them to each other and returns
    * the connectable observables, which still have to be connected by the caller.
    */
  private def init(): Seq[ConnectableObservable[Long]] = {
    // Sources
    val plainSource = MyObservable.apply
    val connectableSource = MyConnectableObservable.apply(ZombieConnectorService.apply)

    // Sinks (observers that carry a Scheduler)
    val plainSink = MyObserver.apply(self, "hot-subscriber")
    val connectableSink = MyObserver.apply(self, "cold-subscriber")

    // Subscribe each sink to its source and remember the subscription
    subscriptions += plainSource.unsafeSubscribeFn(plainSink)
    subscriptions += connectableSource.unsafeSubscribeFn(connectableSink)

    Seq(connectableSource)
  }

  override def receive: Receive = {
    case Init =>
      for (connectable <- init()) subscriptions += connectable.connect()

    case tick: Tick =>
      // TODO: is this a good practice? exposing the internals of the GlobalChannel ???
      globalChannel.publishChannel.onNext(tick)

    case Destroy =>
      subscriptions.cancel()
  }
}
/** Factory for `SupervisorActor` props, using monix's global scheduler. */
object SupervisorActor {
  // Implicit definitions get an explicit type so that implicit resolution does not depend
  // on type inference or definition order.
  implicit val s: Scheduler = monix.execution.Scheduler.Implicits.global

  /**
    * Builds the `Props` for a `SupervisorActor`.
    *
    * @param globalChannel the channel the actor publishes ticks to
    */
  def props(globalChannel: GlobalOutputChannel): Props = Props(new SupervisorActor(globalChannel))
}
开发者ID:joesan,项目名称:monix-samples,代码行数:58,代码来源:SupervisorActor.scala
示例10: MyConnectableObservable
//设置package包名称以及导入依赖的类
package my.samples.observables
import com.typesafe.scalalogging.LazyLogging
import monix.execution.{ Cancelable, Scheduler }
import monix.execution.cancelables.{ BooleanCancelable, SingleAssignmentCancelable }
import monix.reactive.Observable
import monix.reactive.observables.ConnectableObservable
import monix.reactive.observers.Subscriber
import my.samples.services.ZombieConnectorService
import scala.concurrent.duration._
/**
  * Connectable observable over a `ZombieConnectorService`.
  *
  * `connect()` opens the service connection; subscribers receive a one-second interval stream.
  */
class MyConnectableObservable(service: ZombieConnectorService)(implicit s: Scheduler)
  extends ConnectableObservable[Long] with LazyLogging {

  private[this] val connection = SingleAssignmentCancelable()
  private val serviceName = service.getClass.getName

  /**
    * Connects to the service and registers a cancelable that disconnects from it.
    *
    * @return the cancelable holding the disconnect callback
    */
  override def connect(): Cancelable = {
    logger.info(s"connecting to the service $serviceName")
    service.connect()
    // Cancelling the connection disconnects from the service.
    val disconnect = BooleanCancelable(() => service.disconnect())
    connection := disconnect
    connection
  }

  /** Logs the shutdown and cancels the connection. */
  def close() = {
    logger.info(s"shutting down connection to service $serviceName")
    connection.cancel()
  }

  /** Subscribes `subscriber` to a fresh `Observable.interval(1.second)`. */
  override def unsafeSubscribeFn(subscriber: Subscriber[Long]): Cancelable = {
    val ticks = Observable.interval(1.second)
    ticks.subscribe(subscriber)
  }
}
object MyConnectableObservable {
  /** Creates a `MyConnectableObservable` for `service` using the implicit scheduler. */
  def apply(service: ZombieConnectorService)(implicit s: Scheduler): MyConnectableObservable =
    new MyConnectableObservable(service)(s)
}
开发者ID:joesan,项目名称:monix-samples,代码行数:44,代码来源:MyConnectableObservable.scala
示例11: MyObserver
//设置package包名称以及导入依赖的类
package my.samples.observers
import akka.actor.ActorRef
import monix.execution.Ack.Continue
import monix.execution.{ Ack, Scheduler }
import monix.reactive.observers.Subscriber
import my.samples.models.MyMessages
import my.samples.models.MyMessages.Tick
import org.slf4j.LoggerFactory
import scala.concurrent.Future
/**
  * Subscriber that logs every element and forwards it to an actor as a `Tick`.
  *
  * @param actorRef   the actor that receives the ticks
  * @param sourceName label of the source, included in log lines and in each `Tick`
  */
class MyObserver(actorRef: ActorRef, sourceName: String)(implicit s: Scheduler) extends Subscriber[Long] {

  // Resolved once per instance instead of on every log call.
  private[this] val logger = LoggerFactory.getLogger(this.getClass)

  override implicit def scheduler: Scheduler = s

  /** Logs the failure; the throwable is passed along so the stack trace is not lost. */
  override def onError(ex: Throwable): Unit =
    logger.error(s"error happened when processing the stream: error message << ${ex.getMessage} >>", ex)

  override def onComplete(): Unit =
    logger.info("stream completed")

  /** Logs the element, sends it to the actor and asks the upstream to continue. */
  override def onNext(elem: Long): Future[Ack] = {
    logger.info(s"message received from source $sourceName --> $elem")
    actorRef ! Tick(sourceName, elem)
    Continue
  }
}
object MyObserver {
  /** Creates a `MyObserver` for `actorRef` labelled `sourceName`, passing the scheduler explicitly. */
  def apply(actorRef: ActorRef, sourceName: String)(implicit s: Scheduler) =
    new MyObserver(actorRef, sourceName)(s)
}
开发者ID:joesan,项目名称:monix-samples,代码行数:36,代码来源:MyObserver.scala
示例12: apply
//设置package包名称以及导入依赖的类
package service.reflection
import javax.inject.Singleton
import com.google.inject.ImplementedBy
import monix.execution.Scheduler
import org.scalatest.Suite
/** Runs an already-compiled test suite class against solution source code. */
@ImplementedBy(classOf[ScalaRuntimeRunner])
trait RuntimeSuiteExecutor {
/**
  * @param suiteClass    the ScalaTest suite class to run
  * @param solutionTrait the trait class the solution is expected to implement
  * @param solution      the solution source code
  * @param channel       callback that receives output strings
  */
def apply(suiteClass: Class[Suite], solutionTrait: Class[AnyRef], solution: String)
(channel: String => Unit)
(implicit s: Scheduler): Unit
}
/**
  * `RuntimeSuiteExecutor` that evaluates the solution source through the toolbox and
  * passes the resulting instance to the suite's constructor.
  */
@Singleton
class ScalaRuntimeRunner extends RuntimeSuiteExecutor with SuiteExecution with SuiteToolbox {

  /** Instantiates the solution, builds the suite around it and executes the suite. */
  def apply(suiteClass: Class[Suite], solutionTrait: Class[AnyRef], solution: String)
           (channel: String => Unit)
           (implicit s: Scheduler): Unit = {
    val instance = createSolutionInstance(solution, solutionTrait)
    val suiteConstructor = suiteClass.getConstructor(solutionTrait)
    executionTestSuite(suiteConstructor.newInstance(instance), channel)
  }

  /**
    * Rewrites the first class definition in `solution` so that it extends `solutionTrait`,
    * then evaluates the code and returns the new instance.
    */
  def createSolutionInstance(solution: String, solutionTrait: Class[AnyRef]): AnyRef = {
    val classHeader = s"class $userClass extends ${solutionTrait.getSimpleName} "
    val patchedSolution = classDefPattern.replaceFirstIn(solution, classHeader)
    val dynamicCode = s"import ${solutionTrait.getName}; $patchedSolution; new $userClass"
    val tree = tb.parse(dynamicCode)
    tb.eval(tree).asInstanceOf[AnyRef]
  }
}
开发者ID:DmytroOrlov,项目名称:devgym,代码行数:33,代码来源:ScalaRuntimeRunner.scala
示例13: apply
//设置package包名称以及导入依赖的类
package service.reflection
import javax.inject.Singleton
import com.google.inject.ImplementedBy
import monix.execution.Scheduler
/** Runs a test suite given as source code against solution source code. */
@ImplementedBy(classOf[ScalaDynamicRunner])
trait DynamicSuiteExecutor {
/**
  * @param solution      the solution source code
  * @param suite         the suite source code
  * @param solutionTrait the name of the trait the solution is expected to implement
  * @param channel       callback that receives output strings
  */
def apply(solution: String, suite: String, solutionTrait: String)
(channel: String => Unit)
(implicit s: Scheduler): Unit
}
/** `DynamicSuiteExecutor` that patches the solution's class header and executes it dynamically. */
@Singleton
class ScalaDynamicRunner extends DynamicSuiteExecutor with DynamicExecution {

  /**
    * Fails with a `RuntimeException` when `solution` contains no class definition;
    * otherwise makes the first class definition extend `solutionTrait` and runs the suite.
    */
  def apply(solution: String, suite: String, solutionTrait: String)
           (channel: String => Unit)
           (implicit s: Scheduler): Unit = {
    //TODO: replace exception message to messageKey
    if (classDefPattern.findFirstIn(solution).isEmpty)
      throw new RuntimeException(s"There is no class definition in solution code: $solution")

    //TODO: the user may already extend solutionTrait, in which case a redundant `extends`
    // is added (presumably breaking validation — confirm)
    val classHeader = s"class $userClass extends $solutionTrait "
    val patchedSolution = classDefPattern.replaceFirstIn(solution, classHeader)
    executeDynamic(suite, patchedSolution, channel)
  }
}
开发者ID:DmytroOrlov,项目名称:devgym,代码行数:31,代码来源:ScalaDynamicRunner.scala
示例14: SubjectSubscription
//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.util
import com.hypertino.hyperbus.transport.api.matchers.RequestMatcher
import monix.eval.Task
import monix.execution.Ack.Stop
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.{ConcurrentSubject, Subject}
import scala.util.Success
/**
  * A subscription backed by a monix `Subject` that also acts as a `FuzzyMatcher`
  * by delegating to its `requestMatcher`.
  *
  * Subclasses supply the subject and the `add()` / `remove()` hooks.
  */
abstract class SubjectSubscription[T](implicit val scheduler: Scheduler) extends FuzzyMatcher {
  type eventType = T

  // FuzzyIndex properties
  def requestMatcher: RequestMatcher
  override def indexProperties: Seq[FuzzyIndexItemMetaInfo] = requestMatcher.indexProperties
  override def matches(other: Any): Boolean = requestMatcher.matches(other)

  // Subject properties
  protected val subject: Subject[eventType, eventType]

  /** Calls `remove()` and completes the subject. */
  def cancel(): Unit = {
    remove()
    subject.onComplete()
  }

  /**
    * Pushes `t` to the subject.
    *
    * @return a task carrying the subject's acknowledgement; when the subject answers
    *         `Stop`, `remove()` is called as a side effect
    */
  def publish(t: eventType): Task[Ack] = {
    Task.fromFuture(subject.onNext(t).andThen {
      case Success(Stop) => remove()
    })
  }

  // Alias so the anonymous Cancelable below can reach the outer `cancel()`,
  // which its own `cancel()` shadows.
  private def cancel_1() = cancel()

  /** Observable view of the subject; subscribing calls `add()`, cancelling calls the outer `cancel()`. */
  val observable: Observable[eventType] = new Observable[eventType] {
    override def unsafeSubscribeFn(subscriber: Subscriber[eventType]): Cancelable = {
      val original: Cancelable = subject.unsafeSubscribeFn(subscriber)
      add()
      new Cancelable {
        override def cancel(): Unit = {
          cancel_1()
          original.cancel()
        }
      }
    }
  }

  protected def remove(): Unit
  protected def add(): Unit
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:52,代码来源:SubjectSubscription.scala
示例15: SchedulerInjector
//设置package包名称以及导入依赖的类
package com.hypertino.hyperbus.util
import monix.execution.Scheduler
import scaldi.{Injectable, Injector}
/** Resolves a `Scheduler` from the scaldi injector, optionally by identifier. */
class SchedulerInjector(implicit val inj: Injector) extends Injectable {

  /**
    * @param name optional identifier of the scheduler binding
    * @return the unnamed `Scheduler` binding when `name` is `None`; otherwise the binding
    *         identified by the name, with the unnamed binding as the default
    */
  def scheduler(name: Option[String]): Scheduler = name match {
    case None => inject[Scheduler]
    case Some(s) => inject[Scheduler] (identified by s and by default inject[Scheduler])
  }
}
object SchedulerInjector {
  /** Shortcut: builds a `SchedulerInjector` from the implicit injector and resolves `name`. */
  def apply(name: Option[String])(implicit inj: Injector) : Scheduler = {
    val injector = new SchedulerInjector()
    injector.scheduler(name)
  }
}
开发者ID:hypertino,项目名称:hyperbus,代码行数:19,代码来源:SchedulerInjector.scala
示例16: HttpDriver
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp
import fr.hmil.roshttp.node.Modules.{HttpModule, HttpsModule}
import fr.hmil.roshttp.response.{HttpResponse, HttpResponseFactory}
import monix.execution.Scheduler
import scala.concurrent.Future
/** Driver facade that picks a backend on first use and delegates every request to it. */
private object HttpDriver extends DriverTrait {

  // Backend chosen so far; empty until the first request.
  private var _driver: Option[DriverTrait] = None

  /** Sends `req` through the cached backend, choosing one first if none is cached. */
  def send[T <: HttpResponse](req: HttpRequest, factory: HttpResponseFactory[T])(implicit scheduler: Scheduler):
      Future[T] = {
    val backend = _driver match {
      case Some(cached) => cached
      case None         => chooseBackend()
    }
    backend.send(req, factory)
  }

  /** Selects `NodeDriver` when both node modules are available, `BrowserDriver` otherwise, and caches it. */
  private def chooseBackend(): DriverTrait = {
    val chosen: DriverTrait =
      if (HttpModule.isAvailable && HttpsModule.isAvailable) NodeDriver
      else BrowserDriver
    _driver = Some(chosen)
    chosen
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:27,代码来源:HttpDriver.scala
示例17: MultiPartBody
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.body
import java.nio.ByteBuffer
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.util.Random
/**
  * A multipart body assembled from named parts.
  *
  * @param parts   the body parts keyed by their form-field name
  * @param subtype the multipart subtype used in the content type; defaults to "form-data"
  */
class MultiPartBody(parts: Map[String, BodyPart], subtype: String = "form-data")(implicit scheduler: Scheduler)
  extends BodyPart {

  /** Boundary string: four dashes followed by 24 random lower-cased alphanumeric characters. */
  val boundary = "----" + Random.alphanumeric.take(24).mkString.toLowerCase

  override def contentType: String = s"multipart/$subtype; boundary=$boundary"

  /**
    * Streams every part prefixed by its encapsulation boundary and headers, followed by
    * the closing boundary.
    *
    * With no parts, only the closing boundary is emitted instead of failing on an
    * empty reduction.
    */
  override def content: Observable[ByteBuffer] = {
    // Prepend multipart encapsulation boundary and body part headers to each body part.
    val encodedParts = parts.map { case (name, part) =>
      ByteBuffer.wrap(
        ("\r\n--" + boundary + "\r\n" +
          "Content-Disposition: form-data; name=\"" + name + "\"\r\n" +
          s"Content-Type: ${part.contentType}\r\n" +
          "\r\n").getBytes("utf-8")
      ) +: part.content
    }

    // Join body parts; an empty map yields an empty stream rather than an exception.
    val joined = encodedParts
      .reduceLeftOption((acc, elem) => acc ++ elem)
      .getOrElse(Observable.empty[ByteBuffer])

    // Append the closing boundary
    joined :+ ByteBuffer.wrap(s"\r\n--$boundary--\r\n".getBytes("utf-8"))
  }
}
object MultiPartBody {
  /** Builds a `MultiPartBody` from name/part pairs, using the default subtype. */
  def apply(parts: (String, BodyPart)*)(implicit scheduler: Scheduler): MultiPartBody = {
    val partsByName = Map(parts: _*)
    new MultiPartBody(partsByName)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:41,代码来源:MultiPartBody.scala
示例18: StreamHttpResponse
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response
import java.nio.ByteBuffer
import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.util.HeaderMap
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.concurrent.Future
/**
  * HTTP response whose body is exposed as a stream of byte buffers.
  *
  * @param statusCode the response status code
  * @param headers    the response headers
  * @param body       the body as an observable of byte buffers
  */
class StreamHttpResponse(
val statusCode: Int,
val headers: HeaderMap[String],
val body: Observable[ByteBuffer])
extends HttpResponse
/** Factory that wraps the header and the untouched body stream in a `StreamHttpResponse`. */
object StreamHttpResponse extends HttpResponseFactory[StreamHttpResponse] {

  /** Returns an already-completed future; `config` is not used and the stream is not consumed here. */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[StreamHttpResponse] = {
    val response = new StreamHttpResponse(header.statusCode, header.headers, bodyStream)
    Future.successful(response)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:27,代码来源:StreamHttpResponse.scala
示例19: SimpleHttpResponse
//设置package包名称以及导入依赖的类
package fr.hmil.roshttp.response
import java.nio.ByteBuffer
import fr.hmil.roshttp.BackendConfig
import fr.hmil.roshttp.exceptions.ResponseException
import fr.hmil.roshttp.util.{HeaderMap, Utils}
import monix.execution.Scheduler
import monix.reactive.Observable
import scala.collection.mutable
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
/**
  * HTTP response whose body is held as a single string.
  *
  * @param statusCode the response status code
  * @param headers    the response headers
  * @param body       the body text
  */
class SimpleHttpResponse(
val statusCode: Int,
val headers: HeaderMap[String],
val body: String)
extends HttpResponse
/** Factory that drains the body stream and decodes it into a `SimpleHttpResponse`. */
object SimpleHttpResponse extends HttpResponseFactory[SimpleHttpResponse] {

  /**
    * Collects every chunk of `bodyStream`, then recomposes them into a string.
    *
    * @return a future completed with the response, or failed with a `ResponseException`
    *         wrapping the stream's error and the header
    */
  override def apply(
      header: HttpResponseHeader,
      bodyStream: Observable[ByteBuffer],
      config: BackendConfig)
      (implicit scheduler: Scheduler): Future[SimpleHttpResponse] = {

    // A missing content-type header is passed to the charset lookup as null.
    val charset = Utils.charsetFromContentType(header.headers.getOrElse("content-type", null))
    val chunks = mutable.Queue[ByteBuffer]()
    val result = Promise[SimpleHttpResponse]()

    val collected = bodyStream
      .foreach(chunk => chunks.enqueue(chunk))
      .map { _ =>
        val text = recomposeBody(chunks, config.maxChunkSize, charset)
        new SimpleHttpResponse(header.statusCode, header.headers, text)
      }

    collected.onComplete {
      case Success(response) =>
        result.trySuccess(response)
      case Failure(cause) =>
        result.tryFailure(new ResponseException(cause, header))
    }

    result.future
  }

  /**
    * Copies all chunks into one buffer sized for `seq.length * maxChunkSize` bytes, limits
    * it to the sum of the chunks' limits and decodes it with `charset`.
    */
  private def recomposeBody(seq: mutable.Queue[ByteBuffer], maxChunkSize: Int, charset: String): String = {
    // Allocate maximum expected body length
    val target = ByteBuffer.allocate(seq.length * maxChunkSize)
    var totalBytes = 0
    seq.foreach { chunk =>
      target.put(chunk)
      totalBytes += chunk.limit
    }
    target.limit(totalBytes)
    Utils.getStringFromBuffer(target, charset)
  }
}
开发者ID:hmil,项目名称:RosHTTP,代码行数:61,代码来源:SimpleHttpResponse.scala
示例20: Interact
//设置package包名称以及导入依赖的类
package fakeio
import cats.Id
import monix.eval.Task
import monix.execution.Scheduler
/** User interaction (output a message, ask for a line of input) abstracted over an effect `F`. */
abstract class Interact[F[_]] {
/** Outputs `msg`. */
def tell(msg: String): F[Unit]
/** Shows `prompt` and yields the answer. */
def ask(prompt: String): F[String]
}
/** `Interact` instances: asynchronous (`Task`), synchronous (`Id`) and an in-memory fake. */
object Interact {

  // Reads one line from stdin after showing `prompt`, then prints an empty line.
  private def promptLine(prompt: String): String = {
    val answer = io.StdIn.readLine(prompt)
    println("")
    answer
  }

  /** Console interaction deferred inside `Task`. */
  implicit def async(implicit scheduler: Scheduler): Interact[Task] =
    new Interact[Task] {
      override def tell(msg: String): Task[Unit] = Task(println(msg))
      override def ask(prompt: String): Task[String] = Task(promptLine(prompt))
    }

  /** Console interaction performed immediately. */
  implicit val sync: Interact[Id] =
    new Interact[Id] {
      override def tell(msg: String): Id[Unit] = println(msg)
      override def ask(prompt: String): Id[String] = promptLine(prompt)
    }

  /**
    * In-memory interaction: answers are taken from `_stdin` in order, and every message
    * or prompt is prepended to a recorded stdout list.
    *
    * `ask` fails when the remaining input list is empty, because it reads the list's head.
    */
  def fake(_stdin: List[String]): Interact[FakeIo] =
    new Interact[FakeIo] {
      private var stdin = _stdin
      private var stdout: List[String] = List.empty

      override def tell(msg: String): FakeIo[Unit] = {
        stdout ::= msg
        FakeIo(stdin, stdout, ())
      }

      override def ask(prompt: String): FakeIo[String] = {
        val answer = stdin.head
        stdin = stdin.tail
        stdout ::= prompt
        FakeIo(stdin, stdout, answer)
      }
    }
}
开发者ID:yawaramin,项目名称:fake-io,代码行数:55,代码来源:Interact.scala
注：本文中的 monix.execution.Scheduler 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论