• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ThrottleMode类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.ThrottleMode的典型用法代码示例。如果您正苦于以下问题:Scala ThrottleMode类的具体用法?Scala ThrottleMode怎么用?Scala ThrottleMode使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ThrottleMode类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: SimulateWindTurbines

//设置package包名称以及导入依赖的类
package sample.stream_actor

import akka.actor.ActorSystem
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import sample.WindTurbineSimulator

import scala.concurrent.duration._


object SimulateWindTurbines extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val endpoint = "ws://127.0.0.1:8080"
  val numberOfTurbines = 5
  Source(1 to numberOfTurbines)
    .throttle(
      elements = 100, //number of elements to be taken from bucket
      per = 1.second,
      maximumBurst = 100,  //capacity of bucket
      mode = ThrottleMode.shaping
    )
    .map { _ =>
      val id = java.util.UUID.randomUUID.toString

      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          WindTurbineSimulator.props(id, endpoint),
          childName = id,
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.2
        ))

      system.actorOf(supervisor, name = s"$id-backoff-supervisor")
    }
    .runWith(Sink.ignore)
} 
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:41,代码来源:SimulateWindTurbines.scala


示例2: PerpetualStreamTest

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils

import akka.Done
import akka.actor.{ActorSystem, Props}
import akka.stream.scaladsl.{Concat, Keep, Sink, Source}
import akka.stream.{KillSwitches, ThrottleMode}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._

class PerpetualStreamTest extends TestKit(ActorSystem()) with FlatSpecLike with Matchers with Eventually with BeforeAndAfterAll {
  implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(100, Millis))

  "PerpetualStream" should "work" in {
    val list = ListBuffer.empty[Int]
    val stream = system.actorOf(Props(new NumberGeneratingPerpetualStream(list)))
    val probe = new TestProbe(system)
    probe.watch(stream)

    eventually(require(list.length >= 20, "Must have emitted at least 20 numbers"))
    list.take(20) should be((-10 until 10).toList)

    GracefulShutdownExtension(system).triggerGracefulShutdown()
    probe.expectTerminated(stream)
  }

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  class NumberGeneratingPerpetualStream(list: ListBuffer[Int]) extends PerpetualStream {
    var count = 0

    override def stream = {
      val currentCount = count
      count = count + 1

      val source = currentCount match {
        case 0 => Source.combine(Source(-10 until 0), Source.failed[Int](new RuntimeException("ERROR")))(Concat(_))
        case 1 => Source(0 until 10)
        case _ => Source(10 to 100000).throttle(1000, 100.millis, 100, ThrottleMode.shaping)
      }

      source
        .via(watcher)
        .viaMat(KillSwitches.single)(Keep.right)
        .map { i => list.append(i); i }
        .to(Sink.ignore)
        .mapMaterializedValue(ks => () => {
          ks.shutdown()
          Future.successful(Done)
        })
    }
  }
} 
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:59,代码来源:PerpetualStreamTest.scala


示例3: StreamActor

//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.duration.Duration
import scala.io.Source.fromFile

class StreamActor extends Actor {
  override def receive: Receive = {
    case StreamBook(filename) =>
      val materializer = ActorMaterializer.create(context)  // Materializing and running a stream always requires a Materializer to be in implicit scope.
      val sink = Source
        .actorRef(1000, OverflowStrategy.dropNew)           // If the buffer is full when a new element arrives, drops the new element.
        .throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping)   // throttle - to slow down the stream to 1 element per second.
        .to(Sink.actorRef(sender, NotUsed))                 // Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.
        .run()(materializer)
      val lines = fromFile(filename).getLines
      lines.foreach(line => sink ! StreamLine(line))
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}

case class StreamBook(fileName: String)
case class StreamLine(line: String) 
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala


示例4: backPressureRequestDetailsUsing

//设置package包名称以及导入依赖的类
package services

import akka.Done
import akka.stream.scaladsl.Source
import akka.stream.{Materializer, ThrottleMode}
import models.pokemon.PokemonName

import scala.concurrent.{ExecutionContext, Future}

trait BackPressureCalls {

  import scala.concurrent.duration._

  def backPressureRequestDetailsUsing(maxPerSecond: Int)
                                     (elements: List[PokemonName])
                                     (runForEach: PokemonName => Unit)
                                     (implicit ex: ExecutionContext, mat: Materializer): Future[Done] = {
    Source(elements)
      .throttle(maxPerSecond, 1.second, 1, ThrottleMode.Shaping)
      .runForeach(runForEach(_))
  }

} 
开发者ID:fagossa,项目名称:poc_rest,代码行数:24,代码来源:BackPressureCalls.scala


示例5: throttolableConsumerFlow

//设置package包名称以及导入依赖的类
package com.github.everpeace

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration.Duration

package object reactive_kafka {

  def throttolableConsumerFlow(implicit system: ActorSystem, mat: ActorMaterializer): Source[Done, Consumer.Control] = {
    val c = system.settings.config.getConfig("throttolable-consumer")
    implicit val ec = system.dispatcher

    val bootstrapServers = c getString "bootstrap-servers"
    val topic = c getString "topic"
    val autoRestConfig = c getString "auto-offset-reset"
    val groupId = c getString "group-id"
    val throttle = c getInt "throttle"
    val throttlePer = Duration.fromNanos((c getDuration "throttle-per").toNanos)
    val throttleBurst = c getInt "throttle-burst"
    val logPer = c getInt "log-per"
    val offsetCommitBatchSize = c getInt "offset-commit-batch-size"
    val offsetCommitParallelism = c getInt "offset-commit-parallelism"

    val consumerSettings =
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(groupId)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoRestConfig)

    val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(topic)))

    val throttled = if (throttle > 0)
      source.throttle(throttle, throttlePer, throttleBurst, ThrottleMode.shaping)
    else source

    throttled.statefulMapConcat(() => {
      var counter = 0
      msg => {
        if (counter % logPer == 0) {
          system.log.info(s"FakeConsumer consume: $msg")
          counter = 0
        }
        counter += 1
        msg :: Nil
      }
    }).batch(max = offsetCommitBatchSize, m => CommittableOffsetBatch.empty.updated(m.committableOffset))((batch, m) => batch.updated(m.committableOffset))
      .mapAsync(offsetCommitParallelism) { batch =>
      batch.commitScaladsl()
    }
  }
} 
开发者ID:everpeace,项目名称:throttolable-perf-consumer,代码行数:60,代码来源:package.scala


示例6: Demo2

//设置package包名称以及导入依赖的类
package lew.bing.akka.stream


import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ThrottleMode}

import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}


object Demo2 {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()
    val source:Source[Int,Promise[Option[Int]]] = Source.maybe[Int]
    //????
//    val flow: Flow[Int, Int, Cancellable] = Throttler.RateInt(1)
    val cancellable = new Cancellable {
      private var cancelled = false

      override def cancel(): Boolean = {
        cancelled = true
        system.terminate()
        isCancelled
      }

      override def isCancelled: Boolean = cancelled
    }
    val flow:Flow[Int, Int, Cancellable] = Flow.fromProcessorMat(() => {
      (null, cancellable)
    }).map(o => 1).throttle(1, 1.second, 1, ThrottleMode.Shaping)

    val sink:Sink[Int,Future[Int]] = Sink.head[Int]
    val r1:RunnableGraph[Promise[Option[Int]]] = source.via(flow).to(sink)
    val r2:RunnableGraph[Cancellable] = source.viaMat(flow)(Keep.right).to(sink)
    val r3 = source.via(flow).toMat(sink)(Keep.right)

    val r4 = source.via(flow).runWith(sink)
    val r5 = flow.to(sink).runWith(source)
    val r6 = flow.runWith(source,sink)



  }

} 
开发者ID:liuguobing634,项目名称:akka,代码行数:49,代码来源:Demo2.scala


示例7: ExplicitBuffer

//设置package包名称以及导入依赖的类
package code

import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl._

import scala.concurrent.Future
import scala.concurrent.duration._

object ExplicitBuffer extends AkkaStreamsApp {
  override def akkaStreamsExample: Future[_] =
    Source(1 to 100000000)
      .log("passing")
      .buffer(5, OverflowStrategy.backpressure)
      .throttle(1, 1 second, 1, ThrottleMode.shaping)
      .take(20)
      .runForeach(e => log.debug("received: {}", e))
  
  runExample
} 
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:20,代码来源:ExplicitBuffer.scala



注:本文中的akka.stream.ThrottleMode类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala AmazonS3类代码示例发布时间:2022-05-23
下一篇:
Scala ActorRefRoutee类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap