• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala OverflowStrategy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.OverflowStrategy的典型用法代码示例。如果您正苦于以下问题:Scala OverflowStrategy类的具体用法?Scala OverflowStrategy怎么用?Scala OverflowStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了OverflowStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: WebSocketHandler

//设置package包名称以及导入依赖的类
package websocket

import akka.NotUsed
import akka.actor._
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import core.entities.AuthTokenContext
import websocket.ClientEndpoint._
import websocket.WebSocketHandler._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

/**
  * Creates the WebSocket flow that connects one authenticated client to its `ClientEndpoint` actor.
  *
  * @param processingRouter      actor reference handed to every `ClientEndpoint` that is created
  * @param connectedClientsStore actor reference handed to every `ClientEndpoint` that is created
  * @param processingTimeout     timeout handed to every `ClientEndpoint` that is created
  */
class WebSocketHandler(processingRouter: ActorRef, connectedClientsStore: ActorRef, processingTimeout: FiniteDuration)
                      (implicit system: ActorSystem, mat: ActorMaterializer, apiDispatcher: ExecutionContext) {

  /**
    * Spawns a `ClientEndpoint` actor for `ctx` and returns a flow whose inbound side forwards strict text
    * frames to that actor and whose outbound side emits whatever the actor sends to the attached socket.
    *
    * @param ctx authentication context of the connecting client
    * @return the flow to hand to the WebSocket upgrade
    */
  def clientEndpoint(ctx: AuthTokenContext): Flow[Message, Message, NotUsed] = {
    val clientEndpoint =
      system.actorOf(ClientEndpoint.props(ctx, processingRouter, connectedClientsStore, processingTimeout),
                     ClientEndpoint.name(ctx))

    val incomingMessages: Sink[Message, NotUsed] = Flow[Message]
      .map {
        case TextMessage.Strict(jsContent) => Some(IncomingMessage(jsContent))

        // Streamed and binary frames are not supported; their payload is drained and the frame is skipped.
        case ts: TextMessage.Streamed =>
          ts.textStream.runWith(Sink.ignore)
          None

        case br: BinaryMessage =>
          br.dataStream.runWith(Sink.ignore)
          None
      }
      .collect {
        case Some(message: IncomingMessage) => message
      }
      // PoisonPill is delivered to the endpoint actor when the inbound side completes.
      .to(Sink.actorRef[IncomingMessage](clientEndpoint, PoisonPill))

    // Source.actorRef cannot back-pressure the sending actor and rejects OverflowStrategy.backpressure
    // with an IllegalArgumentException, so the stream is failed instead when the buffer overflows.
    val outgoingMessages: Source[Message, NotUsed] = Source
      .actorRef[OutgoingMessage](BUFFER_SIZE, OverflowStrategy.fail)
      .mapMaterializedValue { socket =>
        clientEndpoint ! Attach(socket)
        NotUsed
      }
      .map {
        case OutgoingMessage(jsContent) => TextMessage(jsContent)
      }

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
  }
}

object WebSocketHandler {
  /** Number of outgoing messages buffered per client before the overflow strategy applies. */
  private val BUFFER_SIZE: Int = 10
}
开发者ID:lymr,项目名称:fun-chat,代码行数:57,代码来源:WebSocketHandler.scala


示例2: Flows

//设置package包名称以及导入依赖的类
package com.stulsoft.akka.stream

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.scalalogging.LazyLogging


/**
  * Shows several ways of wiring sources, flows and sinks together.
  *
  * The graphs are only assembled here; none of them is run.
  */
object Flows extends App with LazyLogging {
  logger.info("start")

  // Source, Flow and Sink spelled out one stage at a time
  Source(1 to 6)
    .via(Flow[Int].map(n => n * 2))
    .to(Sink.foreach(n => println(n)))

  // Transformations attached directly to a Source
  val source = Source(1 to 6).map(n => n * 2)
  source.to(Sink.foreach(n => println(n)))

  // A Sink that carries its own upstream transformation
  val sink: Sink[Int, NotUsed] =
    Flow[Int]
      .map(n => n * 2)
      .to(Sink.foreach(n => println(n)))
  Source(1 to 6).to(sink)

  // A Sink that also copies every element to a second, inline sink
  val otherSink: Sink[Int, NotUsed] =
    Flow[Int]
      .alsoTo(Sink.foreach(n => println(n)))
      .to(Sink.ignore)
  Source(1 to 6).to(otherSink)

  val flowDoublingElements = Flow[Int].map(n => n * 2)
  val flowFilteringOutOddElements = Flow[Int].filter(n => n % 2 == 0)
  val flowBatchingElements = Flow[Int].grouped(10)
  // A full buffer back-pressures the upstream source
  val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure)
  logger.info("end")
}
开发者ID:ysden123,项目名称:poc,代码行数:34,代码来源:Flows.scala


示例3: Application

//设置package包名称以及导入依赖的类
package controllers

import javax.inject._

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, _}
import play.api.mvc._

/** Play controller exposing the chat WebSocket endpoint. */
@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {

  // Distributed pub-sub mediator shared by every connection handled by this controller
  val mediator = DistributedPubSub(system).mediator

  /** Accepts a text WebSocket and connects it to the "chat" topic; the request itself is not inspected. */
  def socket = WebSocket.accept[String, String] { _ =>
    val chat = new Chat(mediator, "chat")
    chat.flow
  }
}

/**
  * Bridges a stream of messages with one topic of a distributed pub-sub mediator.
  *
  * @param mediator the pub-sub mediator messages are published to and subscribed from
  * @param topic    name of the topic used for both publishing and subscribing
  */
class Chat(mediator: ActorRef, topic: String) {

  /** Flow whose input is published to the topic and whose output is what the topic delivers. */
  def flow: Flow[String, String, NotUsed] =
    Flow.fromSinkAndSource(publishToMediatorSink, sourceFrom)

  // Wraps every element in a Publish for `topic`; `()` is sent to the mediator when the stream completes.
  private def publishToMediatorSink[In]: Sink[In, NotUsed] = {
    val forwardToMediator = Sink.actorRef[DistributedPubSubMediator.Publish](mediator, ())
    Flow[In]
      .map(msg => DistributedPubSubMediator.Publish(topic, msg))
      .to(forwardToMediator)
  }

  // Actor-backed source (buffer of 5, failing on overflow) that is subscribed to `topic` on materialization.
  private def sourceFrom[Out]: Source[Out, ActorRef] =
    Source.actorRef[Out](5, OverflowStrategy.fail).mapMaterializedValue { subscriber =>
      mediator ! DistributedPubSubMediator.Subscribe(topic, subscriber)
      subscriber
    }

}
开发者ID:jonasanso,项目名称:play-simple-chat,代码行数:40,代码来源:Application.scala


示例4: TestSessionExporterSolr

//设置package包名称以及导入依赖的类
package org.efset

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionSolrDataWriterComponent

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** Streams test sessions from the Cassandra reader to the Solr writer in batches. */
object TestSessionExporterSolr {

  /**
    * Reads every record produced for `query`, applies the optional date filter, groups the remaining
    * records into batches and passes each batch to the writer.
    *
    * The stream is completed once the reader is exhausted (or has failed) and the actor system is
    * terminated when the stream has finished, so the final partial batch is flushed and no threads
    * are left behind.
    *
    * @param query           query handed to the Cassandra reader component
    * @param maybeDateFilter when defined, only records that have a `create_date` field after this date
    *                        are exported; when empty, every record is exported
    */
  def apply(query: String, maybeDateFilter: Option[Date]) = {
    val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
    val writer = new TestSessionSolrDataWriterComponent().dataWriter

    implicit val system = ActorSystem("test-sessions-export")
    implicit val materializer = ActorMaterializer()
    implicit val ec = ExecutionContext.global

    // True only when the record has a `create_date` field holding a date after `endDate`.
    def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
      val maybeField = thing.fields.find(f => f.name == "create_date")

      maybeField match {
        case Some(ModelDate(_, dt)) => dt.after(endDate)
        case _ => false
      }
    }

    val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))

    val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
    val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
    val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
    val sink = Sink.ignore

    val total = new AtomicInteger()

    val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
      println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
      testSessions
    }

    // Keep both materialized values: the actor that feeds the stream and the future that completes
    // when the stream has finished.
    val (flow, done) = source.via(filter).via(group).via(showGroup).via(commit)
      .toMat(sink)(akka.stream.scaladsl.Keep.both)
      .run()

    // Release the actor system as soon as the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())

    try {
      reader.read().foreach { thing =>
        flow ! thing
        Thread.sleep(ContextConfig.sleepLength)
      }
    } finally {
      // Status.Success completes the actor-backed source; elements already buffered are emitted first.
      flow ! akka.actor.Status.Success(())
    }

    println("Finished...")
  }
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporterSolr.scala


示例5: TestSessionExporter

//设置package包名称以及导入依赖的类
package org.efset

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionElasticsearchDataWriterComponent

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** Streams test sessions from the Cassandra reader to the Elasticsearch writer in batches. */
object TestSessionExporter {

  /**
    * Reads every record produced for `query`, applies the optional date filter, groups the remaining
    * records into batches and passes each batch to the writer.
    *
    * The stream is completed once the reader is exhausted (or has failed) and the actor system is
    * terminated when the stream has finished, so the final partial batch is flushed and no threads
    * are left behind.
    *
    * @param query           query handed to the Cassandra reader component
    * @param maybeDateFilter when defined, only records that have a `create_date` field after this date
    *                        are exported; when empty, every record is exported
    */
  def apply(query: String, maybeDateFilter: Option[Date]) = {
    val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
    val writer = new TestSessionElasticsearchDataWriterComponent().dataWriter

    implicit val system = ActorSystem("test-sessions-export")
    implicit val materializer = ActorMaterializer()
    implicit val ec = ExecutionContext.global

    // True only when the record has a `create_date` field holding a date after `endDate`.
    def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
      val maybeField = thing.fields.find(f => f.name == "create_date")

      maybeField match {
        case Some(ModelDate(_, dt)) => dt.after(endDate)
        case _ => false
      }
    }

    val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))

    val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
    val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
    val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
    val sink = Sink.ignore

    val total = new AtomicInteger()

    val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
      println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
      testSessions
    }

    // Keep both materialized values: the actor that feeds the stream and the future that completes
    // when the stream has finished.
    val (flow, done) = source.via(filter).via(group).via(showGroup).via(commit)
      .toMat(sink)(akka.stream.scaladsl.Keep.both)
      .run()

    // Release the actor system as soon as the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())

    try {
      reader.read().foreach { thing =>
        flow ! thing
        Thread.sleep(ContextConfig.sleepLength)
      }
    } finally {
      // Status.Success completes the actor-backed source; elements already buffered are emitted first.
      flow ! akka.actor.Status.Success(())
    }

    println("Finished...")
  }
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporter.scala


示例6: TestGroupExporterSolr

//设置package包名称以及导入依赖的类
package org.efset

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupSolrDataWriterComponent

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** Streams test groups from the Cassandra reader to the Solr writer in batches. */
object TestGroupExporterSolr {

  /**
    * Reads every record produced for `query`, applies the optional date filter, groups the remaining
    * records into batches and passes each batch to the writer.
    *
    * The stream is completed once the reader is exhausted (or has failed) and the actor system is
    * terminated when the stream has finished, so the final partial batch is flushed and no threads
    * are left behind.
    *
    * @param query           query handed to the Cassandra reader component
    * @param maybeDateFilter when defined, only records that have a `create_date` field after this date
    *                        are exported; when empty, every record is exported
    */
  def apply(query: String, maybeDateFilter: Option[Date]) = {
    val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
    val writer = new TestGroupSolrDataWriterComponent().dataWriter

    implicit val system = ActorSystem("test-groups-export")
    implicit val materializer = ActorMaterializer()
    implicit val ec = ExecutionContext.global

    // True only when the record has a `create_date` field holding a date after `endDate`.
    def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
      val maybeField = thing.fields.find(f => f.name == "create_date")

      maybeField match {
        case Some(ModelDate(_, dt)) => dt.after(endDate)
        case _ => false
      }
    }

    val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))

    val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
    val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
    val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
    val sink = Sink.ignore

    val total = new AtomicInteger()

    val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
      println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
      testGroups
    }

    // Keep both materialized values: the actor that feeds the stream and the future that completes
    // when the stream has finished.
    val (flow, done) = source.via(filter).via(group).via(showGroup).via(commit)
      .toMat(sink)(akka.stream.scaladsl.Keep.both)
      .run()

    // Release the actor system as soon as the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())

    try {
      reader.read().foreach { thing =>
        flow ! thing
        Thread.sleep(ContextConfig.sleepLength)
      }
    } finally {
      // Status.Success completes the actor-backed source; elements already buffered are emitted first.
      flow ! akka.actor.Status.Success(())
    }

    println("Finished...")
  }
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporterSolr.scala


示例7: TestGroupExporter

//设置package包名称以及导入依赖的类
package org.efset

import java.util.Date
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupElasticsearchDataWriterComponent

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/** Streams test groups from the Cassandra reader to the Elasticsearch writer in batches. */
object TestGroupExporter {

  /**
    * Reads every record produced for `query`, applies the optional date filter, groups the remaining
    * records into batches and passes each batch to the writer.
    *
    * The stream is completed once the reader is exhausted (or has failed) and the actor system is
    * terminated when the stream has finished, so the final partial batch is flushed and no threads
    * are left behind.
    *
    * @param query           query handed to the Cassandra reader component
    * @param maybeDateFilter when defined, only records that have a `create_date` field after this date
    *                        are exported; when empty, every record is exported
    */
  def apply(query: String, maybeDateFilter: Option[Date]) = {
    val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
    val writer = new TestGroupElasticsearchDataWriterComponent().dataWriter

    implicit val system = ActorSystem("test-groups-export")
    implicit val materializer = ActorMaterializer()
    implicit val ec = ExecutionContext.global

    // True only when the record has a `create_date` field holding a date after `endDate`.
    def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
      val maybeField = thing.fields.find(f => f.name == "create_date")

      maybeField match {
        case Some(ModelDate(_, dt)) => dt.after(endDate)
        case _ => false
      }
    }

    val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))

    val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
    val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
    val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
    val sink = Sink.ignore

    val total = new AtomicInteger()

    val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
      println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
      testGroups
    }

    // Keep both materialized values: the actor that feeds the stream and the future that completes
    // when the stream has finished.
    val (flow, done) = source.via(filter).via(group).via(showGroup).via(commit)
      .toMat(sink)(akka.stream.scaladsl.Keep.both)
      .run()

    // Release the actor system as soon as the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())

    try {
      reader.read().foreach { thing =>
        flow ! thing
        Thread.sleep(ContextConfig.sleepLength)
      }
    } finally {
      // Status.Success completes the actor-backed source; elements already buffered are emitted first.
      flow ! akka.actor.Status.Success(())
    }

    println("Finished...")
  }
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporter.scala


示例8: KafkaWriter

//设置package包名称以及导入依赖的类
package wiii

import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}


object KafkaWriter {
    /** Props for a [[KafkaWriter]] that publishes to `topic`. */
    def props(topic: String)(implicit mat: ActorMaterializer) = Props(new KafkaWriter(topic))
}

/**
  * Actor that, on start, opens a filtered Twitter stream and pipes every received status into a Kafka
  * producer actor for `topicName`. It handles no messages itself.
  *
  * @param topicName Kafka topic the tweets are written to
  */
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
    // Handles kept so that postStop can release what initWriter opened.
    private var twitterStream: Option[TwitterStream] = None
    private var tweetFeed: Option[ActorRef] = None

    override def preStart(): Unit = initWriter()

    // Without this the Twitter stream keeps running after the actor stops, and every restart
    // would open an additional one.
    override def postStop(): Unit = {
        twitterStream.foreach(_.shutdown())
        twitterStream = None
        // Fully qualified: the bare name `Status` refers to twitter4j.Status in this file.
        tweetFeed.foreach(_ ! akka.actor.Status.Success(()))
        tweetFeed = None
    }

    override def receive: Receive = {
        case _ =>
    }

    /** Wires Twitter stream -> actor-backed source -> publisher -> Kafka producer subscriber. */
    def initWriter(): Unit = {
        val subscriberProps = new ReactiveKafka().producerActorProps(ProducerProperties(
            bootstrapServers = "localhost:9092",
            topic = topicName,
            valueSerializer = TweetSerializer
        ))
        val subscriber = context.actorOf(subscriberProps)
        val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
        tweetFeed = Some(actorRef)

        val factory = new TwitterStreamFactory()
        val stream = factory.getInstance()
        // Remember the stream before it is started so it is shut down even if starting it fails.
        twitterStream = Some(stream)
        stream.addListener(new StatusForwarder(actorRef))
        stream.filter(new FilterQuery("espn"))

        Source.fromPublisher(publisher).map(s => ProducerMessage(Tweet(s.getUser.getName, s.getText)))
        .runWith(Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](subscriber)))
    }
}

/**
  * Twitter listener that forwards every received status to an actor and ignores all other callbacks.
  *
  * @param publisher actor each status is sent to
  */
class StatusForwarder(publisher: ActorRef) extends StatusListener {
    def onStatus(status: Status): Unit = {
        publisher ! status
    }

    // The remaining callbacks are intentionally no-ops for now.
    def onStallWarning(warning: StallWarning): Unit = ()
    def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
    def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
    def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
    def onException(ex: Exception): Unit = ()
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala


示例9: ProducerStream

//设置package包名称以及导入依赖的类
package com.omearac.producers

import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}



/** Building blocks (source, flow and sink) for a stream that publishes JSON messages to Kafka. */
trait ProducerStream extends AkkaStreams with EventSourcing {
    implicit val system: ActorSystem
    def self: ActorRef

    /** Queue-backed source of `msgType` elements that back-pressures offers once its buffer is full. */
    def createStreamSource[msgType] =
        Source.queue[msgType](Int.MaxValue, OverflowStrategy.backpressure)

    /**
      * Kafka sink with byte-array keys and string values.
      *
      * @param producerProperties must contain the "bootstrap-servers" entry
      */
    def createStreamSink(producerProperties: Map[String, String]) = {
        val bootstrapServers = producerProperties("bootstrap-servers")
        val settings =
            ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
                .withBootstrapServers(bootstrapServers)

        Producer.plainSink(settings)
    }

    /**
      * Flow converting each message to JSON and wrapping it in a record for a randomly chosen partition.
      *
      * @param producerProperties must contain the "num.partitions" and "publish-topic" entries
      */
    def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
        val partitionCount = producerProperties("num.partitions").toInt
        val topicToPublish = producerProperties("publish-topic")
        val partitionPicker = new scala.util.Random

        Flow[msgType].map { msg =>
            // Uniformly chosen partition index in [0, partitionCount)
            val partition = partitionPicker.nextInt(partitionCount)
            val stringJSONMessage = Conversion[msgType].convertToJson(msg)
            // The record key is left null
            new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
        }
    }
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala


示例10: RemoteBot

//设置package包名称以及导入依赖的类
package im.actor.botkit

import java.net.URLEncoder

import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.util.Timeout
import im.actor.bots.BotMessages
import im.actor.concurrent.ActorFutures
import upickle.default._

import scala.concurrent.duration._

/** Constants and internal messages shared by [[RemoteBot]] instances. */
object RemoteBot {
  /** WebSocket endpoint of the public API. */
  val DefaultEndpoint: String = "wss://api.actor.im"

  // Sent to the bot itself when the incoming WebSocket stream completes.
  private object StreamComplete
}

/**
  * Bot that talks to the server over a WebSocket and re-establishes the connection whenever the
  * stream completes or fails.
  *
  * @param token    bot token, URL-encoded into the WebSocket address
  * @param endpoint base address of the server, e.g. `RemoteBot.DefaultEndpoint`
  */
abstract class RemoteBot(token: String, endpoint: String) extends BotBase with ActorFutures {

  import BotMessages._
  import RemoteBot._

  override protected implicit val timeout: Timeout = Timeout(30.seconds)
  private implicit val mat = ActorMaterializer()

  initFlow()

  /** Hook for messages not handled internally; does nothing by default. */
  def onReceive(message: Object): Unit = {}

  def receive: Receive = internalReceive orElse {
    case message =>
      onReceive(message.asInstanceOf[Object])
  }

  override protected def onStreamFailure(cause: Throwable): Unit = {
    log.error(cause, "Stream failure")
    initFlow()
  }

  // Regular bot behaviour first, then reconnect handling for a completed incoming stream.
  private final def internalReceive: Receive = workingBehavior.orElse({
    case StreamComplete =>
      log.warning("Disconnected, reinitiating flow")
      initFlow()
  })

  // Opens the WebSocket, routes decoded incoming messages to this actor and registers an
  // actor-backed source whose elements are serialized and written to the socket.
  private def initFlow(): Unit = {
    val (wsSource, wsSink) = WebsocketClient.sourceAndSink(s"$endpoint/v1/bots/${URLEncoder.encode(token, "UTF-8")}")

    wsSource.map(read[BotMessageOut]).to(Sink.actorRef(self, StreamComplete)).run()

    val rqSource = Source.actorRef(bufferSize = 100, overflowStrategy = OverflowStrategy.fail)
      .map(write[BotRequest])
      .to(wsSink)
      .run()

    setRqSource(rqSource)
  }
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:61,代码来源:RemoteBot.scala


示例11: Configuration

//设置package包名称以及导入依赖的类
package net.flatmap.vscode.languageserver
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import cats.data.Xor
import io.circe.{Decoder, Json}

/**
  * Exposes configuration changes received by the language server as a stream of decoded values.
  *
  * @tparam T type the raw JSON settings are decoded into
  */
trait Configuration[T] extends LanguageServer {
  // Every queue created by a materialized `configChanges` source; queues are never removed.
  private var configQueues = Set.empty[SourceQueueWithComplete[Json]]

  /**
    * Source of successfully decoded settings; settings that fail to decode are skipped.
    * The underlying queue holds up to 1024 entries and uses the `dropTail` overflow strategy.
    */
  def configChanges(implicit decoder: Decoder[T]): Source[T,NotUsed] = {
    val rawSettings =
      Source.queue[Json](1024, OverflowStrategy.dropTail).mapMaterializedValue { queue =>
        configQueues += queue
        NotUsed
      }

    rawSettings
      .map(json => decoder.decodeJson(json))
      .collect { case Xor.Right(config) => config }
  }

  /** Offers the new settings to every registered queue. */
  override def didChangeConfiguration(settings: Json): Unit =
    for (queue <- configQueues) queue.offer(settings)
}
开发者ID:flatmap,项目名称:vscode-languageserver-scala,代码行数:24,代码来源:Configuration.scala


示例12: HttpRequestQueue

//设置package包名称以及导入依赖的类
package rest.client

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import rest.client.HttpRequestQueue._

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
  * Sends HTTP requests through a bounded queue placed in front of a host connection pool.
  *
  * NOTE(review): `queue` is built while the trait is initialized and reads `QueueSize`,
  * `poolClientFlow` and `mat`; implementers presumably need to make sure those members are already
  * initialized at that point - confirm against the implementing classes.
  */
trait HttpRequestQueue {

  implicit val system: ActorSystem
  implicit val mat: ActorMaterializer
  implicit val ec: ExecutionContext

  /** Capacity of the request queue. */
  val QueueSize: Int

  /** Connection pool flow that pairs every response (or failure) with the promise passed in. */
  val poolClientFlow: Flow[(HttpRequest, Promise[HttpResponse]),
                           (Try[HttpResponse], Promise[HttpResponse]),
                           Http.HostConnectionPool]

  private val queue = {
    // Settles the promise travelling with each response.
    val settlePromise = Sink.foreach[(Try[HttpResponse], Promise[HttpResponse])] {
      case (Success(httpResponse), promisedResponse) => promisedResponse.success(httpResponse)
      case (Failure(throwable), promisedResponse)    => promisedResponse.failure(throwable)
    }

    Source
      .queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.backpressure)
      .via(poolClientFlow)
      .toMat(settlePromise)(Keep.left)
      .run()
  }

  /**
    * Enqueues `request` and returns the eventual response.
    *
    * @return a future holding the response, or failed when the request could not be enqueued or
    *         the pool reported a failure for it
    */
  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    val offered = queue.offer(request -> responsePromise)

    offered.flatMap { offerResult =>
      val settled = offerResult match {
        case QueueOfferResult.Enqueued    => responsePromise
        case QueueOfferResult.Dropped     => responsePromise.failure(new RuntimeException(QUEUE_OVERFLOW))
        case QueueOfferResult.QueueClosed => responsePromise.failure(new RuntimeException(QUEUE_CLOSED))
        case QueueOfferResult.Failure(ex) => responsePromise.failure(ex)
      }
      settled.future
    }
  }
}

object HttpRequestQueue {
  // Failure messages used when a request cannot be enqueued.
  private val QUEUE_OVERFLOW: String = "Queue overflowed. Try again later."
  private val QUEUE_CLOSED: String   = "Queue was closed (pool shut down) while running the request. Try again later."
}
开发者ID:lymr,项目名称:fun-chat,代码行数:53,代码来源:HttpRequestQueue.scala


示例13: HelloWorldBackpressured

//设置package包名称以及导入依赖的类
package eu.svez.backpressuredemo.A_local

import akka.NotUsed
import akka.pattern.after
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import eu.svez.backpressuredemo.StreamDemo
import kamon.Kamon

import scala.concurrent.Future
import scala.concurrent.duration._

/**
  * Demo: an endless source and a sink, each limited by its own adjustable rate, with a
  * back-pressuring buffer of 100 elements in between.
  */
object HelloWorldBackpressured extends StreamDemo {

  // Both sides start at a rate of 5.
  sourceRate.send(5)
  sinkRate.send(5)

  /**
    * Flow that delays every element by `1.second / rate`.
    *
    * @param rate evaluated anew for every element
    */
  def valve[T](rate: => Int): Flow[T, T, NotUsed] =
    Flow[T].mapAsync(1) { elem =>
      after(1.second / rate, system.scheduler)(Future.successful(elem))
    }

  /**
    * Pass-through flow that increments the Kamon counter called `name` for every element.
    */
  def meter[T](name: String): Flow[T, T, NotUsed] = {
    val msgCounter = Kamon.metrics.counter(name)

    Flow[T].map { elem =>
      msgCounter.increment()
      elem
    }
  }

  Source
    .repeat("Hello world!")
    .via(valve(sourceRate.get))
    .via(meter("source"))
    .buffer(100, OverflowStrategy.backpressure)
    .via(valve(sinkRate.get))
    .via(meter("sink"))
    .runWith(Sink.ignore)

  readRatesFromStdIn()
}
开发者ID:svezfaz,项目名称:akka-backpressure-scala-central-talk,代码行数:45,代码来源:HelloWorldBackpressured.scala


示例14: IntegratingWithActorsApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.{ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.pattern.ask
import akka.util.Timeout
import com.packt.chapter8.SinkActor.{AckSinkActor, CompletedSinkActor, InitSinkActor}

import scala.concurrent.duration._

/**
  * Demo combining streams and actors: a queue-backed source, an ask-based transformation stage
  * and an acknowledging actor sink.
  */
object IntegratingWithActorsApplication extends App {

  implicit val actorSystem = ActorSystem("IntegratingWithActors")
  implicit val actorMaterializer = ActorMaterializer()

  // Timeout applied to every ask sent to the string cleaner.
  implicit val askTimeout = Timeout(5.seconds)
  val stringCleaner = actorSystem.actorOf(Props[StringCleanerActor])
  val sinkActor = actorSystem.actorOf(Props[SinkActor])

  val source = Source.queue[String](100, OverflowStrategy.backpressure)
  val sink =
    Sink.actorRefWithAck[String](sinkActor, InitSinkActor, AckSinkActor, CompletedSinkActor)

  // Up to five asks are in flight at once; replies are passed on as strings.
  val queue = source
    .mapAsync(parallelism = 5) { elem =>
      val reply = stringCleaner ? elem
      reply.mapTo[String]
    }
    .to(sink)
    .run()

  // The source actor receives the queue it can offer elements to.
  actorSystem.actorOf(SourceActor.props(queue))
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:30,代码来源:IntegratingWithActorsApplication.scala


示例15: EventDao

//设置package包名称以及导入依赖的类
package dao

import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import org.reactivestreams.Publisher
import play.api.libs.json.{Format, JsSuccess, Json}

import scala.concurrent.Future

/**
  * Stores events by sending their JSON text through an event store flow and publishes the events
  * that come back out of it.
  *
  * @tparam Event event type; the implicit `Format` is used to write and read its JSON form
  */
abstract class EventDao[Event](
    implicit val mat: Materializer,
    format: Format[Event]) {

  /** Offers `str` to the input queue and returns the result of the offer. */
  def store(str: Event): Future[QueueOfferResult] = {
    in.offer(str)
  }

  // `in` accepts events to store; `out` publishes (with fan-out) every event that could be parsed
  // back from the event store's output.
  val (
    in: SourceQueueWithComplete[Event],
    out: Publisher[Event]
  ) = {
    val serialized =
      Source
        .queue[Event](3, OverflowStrategy.backpressure)
        .map(event => Json.stringify(Json.toJson(event)))

    val parsedBack =
      serialized
        .via(eventStore)
        .map(raw => Json.fromJson[Event](Json.parse(raw)))
        .collect { case JsSuccess(event, _) => event }

    parsedBack
      .toMat(Sink.asPublisher(fanout = true))(Keep.both)
      .run()
  }

  /** Flow through which the JSON text of every event is sent. */
  protected def eventStore: Flow[String, String, NotUsed]
}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:35,代码来源:EventDao.scala


示例16: StreamActor

//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.duration.Duration
import scala.io.Source.fromFile

/**
  * Actor that streams the lines of a file back to the requester at one line per second.
  */
class StreamActor extends Actor {
  // A single materializer for the whole actor instead of a new one per message; the per-message
  // materializers were never shut down.
  private val materializer = ActorMaterializer.create(context)

  override def receive: Receive = {
    case StreamBook(filename) =>
      val sink = Source
        .actorRef(1000, OverflowStrategy.dropNew)           // If the buffer is full when a new element arrives, drops the new element.
        .throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping)   // throttle - to slow down the stream to 1 element per second.
        .to(Sink.actorRef(sender(), NotUsed))               // Every element is sent to the actor that requested the book.
        .run()(materializer)

      // Close the file once all lines have been handed to the stream, even if reading fails.
      val book = fromFile(filename)
      try book.getLines().foreach(line => sink ! StreamLine(line))
      finally book.close()
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(10, Duration(60, "seconds")) {
      case _: Exception => Restart
    }
}

/** Request to stream the file called `fileName` back to the sender. */
case class StreamBook(fileName: String)
/** One line of a streamed file. */
case class StreamLine(line: String)
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala


示例17: WsFlow

//设置package包名称以及导入依赖的类
package name.denyago.yasc.http.websocket

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import name.denyago.yasc.chat.ConnectedUser
import name.denyago.yasc.chat.events.{ConnectionEstablished, MessagePosted, MessageReceived}


/** Provides the WebSocket flow that connects a client to the chat room. */
trait WsFlow {
  implicit val webSocketSys = ActorSystem("websocket-system")
  implicit val webSocketMat = ActorMaterializer()

  /** Chat room actor handed to every `ConnectedUser` that is created. */
  def chatRoom: ActorRef

  /**
    * Creates a `ConnectedUser` actor and returns a flow whose inbound side forwards strict text
    * frames to it and whose outbound side emits the messages posted to the connection.
    */
  def wsFlow: Flow[Message, Message, NotUsed] = {
    val userActor = webSocketSys.actorOf(Props(new ConnectedUser(chatRoom)))

    val incomingMessages: Sink[Message, NotUsed] =
      Flow[Message]
        .map {
          case TextMessage.Strict(text) => Some(MessageReceived(text))

          // Any other frame used to raise a MatchError and fail the stream. Such frames are now
          // skipped, and their payload is drained so it does not stall the connection.
          case streamed: TextMessage =>
            streamed.textStream.runWith(Sink.ignore)
            None

          case binary: akka.http.scaladsl.model.ws.BinaryMessage =>
            binary.dataStream.runWith(Sink.ignore)
            None
        }
        .collect {
          case Some(received) => received
        }
        // PoisonPill is delivered to the user actor when the inbound side completes.
        .to(Sink.actorRef[MessageReceived](userActor, PoisonPill))

    val outgoingMessages: Source[Message, NotUsed] =
      Source.actorRef[MessagePosted](10, OverflowStrategy.fail)
          .mapMaterializedValue { outgoingActor =>
            userActor ! ConnectionEstablished(outgoingActor)
            NotUsed
          }
        .map(
          (outgoingMessage: MessagePosted) => TextMessage(outgoingMessage.text))

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
  }
}
开发者ID:denyago,项目名称:yet-another-simple-chat,代码行数:38,代码来源:WsFlow.scala


示例18: FilteringProcess

//设置package包名称以及导入依赖的类
package com.realizationtime.btdogg.filtering

import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import com.realizationtime.btdogg.BtDoggConfiguration.standardBufferSize
import com.realizationtime.btdogg.TKey
import com.realizationtime.btdogg.hashessource.HashesSource.SpottedHash
import com.realizationtime.btdogg.persist.MongoPersist
import redis.RedisClient

import scala.concurrent.ExecutionContext

/**
  * Chain of filters applied to spotted hashes.
  *
  * @param entryFilterDB        Redis client used by the entry filter
  * @param hashesBeingScrapedDB Redis client used by the "being scraped" filter and the Mongo filter
  * @param mongoPersist         persistence handle used by the Mongo filter
  */
class FilteringProcess(val entryFilterDB: RedisClient,
                       val hashesBeingScrapedDB: RedisClient,
                       val mongoPersist: MongoPersist)
                      (implicit private val ec: ExecutionContext) {

  /** Actor-backed source of spotted hashes (new elements are dropped on overflow) after all filters. */
  val onlyNewHashes: Source[TKey, ActorRef] = {
    val spottedHashes =
      Source.actorRef[SpottedHash](bufferSize = standardBufferSize, overflowStrategy = OverflowStrategy.dropNew)

    spottedHashes
      .via(new EntryFilter(entryFilterDB).flow)
      .via(new HashesBeingScraped(hashesBeingScrapedDB).flow)
      .via(new MongoFilter(mongoPersist, hashesBeingScrapedDB).flow)
  }

}

object FilteringProcess {

  /** The two possible results: `NEW` and `ALREADY_EXISTED`. */
  object Result extends Enumeration {
    type Result = Value
    val NEW = Value
    val ALREADY_EXISTED = Value
  }

}
开发者ID:bwrega,项目名称:btdogg,代码行数:34,代码来源:FilteringProcess.scala


示例19: DistributedSourceWorkerSpec

//设置package包名称以及导入依赖的类
package aecor.tests

import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, FunSuiteLike, Matchers }

import scala.concurrent.duration._

object DistributedSourceWorkerSpec {
  /**
    * Test configuration: in-memory journal, no snapshot store and a single seed node, layered over
    * the default configuration.
    */
  // NOTE(review): the seed-node address had been mangled into "akka://[email protected]:51000" by
  // e-mail obfuscation. The system name "test" matches `cluster.system-name`; the host 127.0.0.1
  // is an assumption - confirm against the original project.
  def conf: Config = ConfigFactory.parseString("""
        cluster.system-name=test
        akka.persistence.journal.plugin=akka.persistence.journal.inmem
        akka.persistence.snapshot-store.plugin=akka.persistence.no-snapshot-store
        aecor.akka-runtime.idle-timeout = 1s
        cluster.seed-nodes = ["akka://test@127.0.0.1:51000"]
     """).withFallback(ConfigFactory.load())
}

/**
  * Test scaffold running on an actor system called "test".
  *
  * NOTE(review): the actor system is configured from `ShardedRuntimeSpec.conf`, not from this
  * spec's own companion `conf` - confirm that this is intended.
  */
class DistributedSourceWorkerSpec
    extends TestKit(ActorSystem("test", ShardedRuntimeSpec.conf))
    with FunSuiteLike
    with Matchers
    with ScalaFutures
    with BeforeAndAfterAll {

  // Futures are awaited for up to 15 seconds, polled every 150 milliseconds.
  override implicit val patienceConfig = PatienceConfig(15.seconds, 150.millis)

  // Shut the actor system down once all tests have run.
  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  // Despite its name this is a queue-backed source with a buffer of 10 elements.
  val sink = Source.queue(10, OverflowStrategy.backpressure)

}
开发者ID:notxcain,项目名称:aecor,代码行数:38,代码来源:DistributedSourceWorkerSpec.scala


示例20: EventManager

//设置package包名称以及导入依赖的类
package me.mmcoulombe.aad

import akka.actor._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import me.mmcoulombe.add.Operation

// Common parent of the control messages exchanged with the event actor inside EventManager.
trait ActorEventControl
// Sent to the event actor with the actor backing the outgoing source once that source is materialized.
case class Connected(subscriber: ActorRef) extends ActorEventControl
// Wraps every text element arriving on the incoming side of the flow.
case class ReceivedEvent(event: String) extends ActorEventControl
// Asks the event actor to dispatch `event` to all subscribers; sent by EventManager.sendMessage.
case class SendEvent(event: Operation) extends ActorEventControl
// Sent to the event actor when the incoming side of the flow completes.
case object Disconnected extends ActorEventControl

/**
  * Fans operations out to every connected WebSocket subscriber.
  *
  * An internal actor keeps the set of subscribers; `flow` connects one WebSocket to that actor.
  */
class EventManager(implicit system: ActorSystem) {

  private val eventActor =
    system.actorOf(Props(new Actor {
      var subscribers = Set.empty[ActorRef]
      override def receive: Receive = {
        case Connected(subscriber) =>
          println("WS Event Manager: Connected")
          context.watch(subscriber)
          subscribers += subscriber

        // A watched subscriber has stopped. Without this case the Terminated message would be
        // unhandled, which raises a DeathPactException in the watching actor; the stopped
        // subscriber would also stay in the set forever.
        case Terminated(subscriber) =>
          subscribers -= subscriber

        case Disconnected => println("WS Event manager: Disconnected")

        case ReceivedEvent(event) => println(s"Received event : $event")

        case SendEvent(event) => {
          println(s"SendEvent : $event")
          dispatch(event)
        }

        case msg: Operation => dispatch(msg)
      }

      // Sends `op` to every currently registered subscriber.
      def dispatch(op: Operation): Unit = {
        println(s"dispatch ${op.op}")
        subscribers.foreach(_ ! op)
      }

    }))

  /** Asks the event actor to send `event` to every connected subscriber. */
  def sendMessage(event: Operation): Unit = {
    println("sendMessage")
    eventActor ! SendEvent(event)
  }

  private val in = Flow[String] // heartbeat otherwise the socket will be close after x seconds in idle
      .map(ReceivedEvent)
      .to(Sink.actorRef[ActorEventControl](eventActor, Disconnected))

  // Actor-backed source with a buffer of one element that fails on overflow; the backing actor is
  // registered with the event actor as a subscriber on materialization.
  private val out =
    Source.actorRef[Operation](1, OverflowStrategy.fail)
      .mapMaterializedValue(eventActor ! Connected(_))

  /** Flow to hand to the WebSocket: incoming text goes to the event actor, operations come out. */
  val flow = Flow.fromSinkAndSource(in, out)
}
开发者ID:mmcoulombe,项目名称:poc-aad,代码行数:58,代码来源:EventManager.scala



注:本文中的akka.stream.OverflowStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Boolean类代码示例发布时间:2022-05-23
下一篇:
Scala FileInputFormat类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap