本文整理汇总了Scala中akka.stream.OverflowStrategy类的典型用法代码示例。如果您正苦于以下问题:Scala OverflowStrategy类的具体用法?Scala OverflowStrategy怎么用?Scala OverflowStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了OverflowStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: WebSocketHandler
//设置package包名称以及导入依赖的类
package websocket
import akka.NotUsed
import akka.actor._
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import core.entities.AuthTokenContext
import websocket.ClientEndpoint._
import websocket.WebSocketHandler._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
class WebSocketHandler(processingRouter: ActorRef, connectedClientsStore: ActorRef, processingTimeout: FiniteDuration)
(implicit system: ActorSystem, mat: ActorMaterializer, apiDispatcher: ExecutionContext) {
def clientEndpoint(ctx: AuthTokenContext): Flow[Message, Message, NotUsed] = {
val clientEndpoint =
system.actorOf(ClientEndpoint.props(ctx, processingRouter, connectedClientsStore, processingTimeout),
ClientEndpoint.name(ctx))
val incomingMessages: Sink[Message, NotUsed] = Flow[Message]
.map {
case TextMessage.Strict(jsContent) => Some(IncomingMessage(jsContent))
case ts: TextMessage.Streamed =>
ts.textStream.runWith(Sink.ignore)
None
case br: BinaryMessage =>
br.dataStream.runWith(Sink.ignore)
None
}
.collect {
case Some(message: IncomingMessage) => message
}
.to(Sink.actorRef[IncomingMessage](clientEndpoint, PoisonPill))
val outgoingMessages: Source[Message, NotUsed] = Source
.actorRef[OutgoingMessage](BUFFER_SIZE, OverflowStrategy.backpressure)
.mapMaterializedValue { socket =>
clientEndpoint ! Attach(socket)
NotUsed
}
.map {
case OutgoingMessage(jsContent) => TextMessage(jsContent)
}
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
}
object WebSocketHandler {
private val BUFFER_SIZE: Int = 10
}
开发者ID:lymr,项目名称:fun-chat,代码行数:57,代码来源:WebSocketHandler.scala
示例2: Flows
//设置package包名称以及导入依赖的类
package com.stulsoft.akka.stream
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.typesafe.scalalogging.LazyLogging
object Flows extends App with LazyLogging {
logger.info("start")
// Explicitly creating and wiring up a Source, Sink and Flow
Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_)))
// Starting from a Source
val source = Source(1 to 6).map(_ * 2)
source.to(Sink.foreach(println(_)))
// Starting from a Sink
val sink: Sink[Int, NotUsed] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
Source(1 to 6).to(sink)
// Broadcast to a sink inline
val otherSink: Sink[Int, NotUsed] =
Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
Source(1 to 6).to(otherSink)
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // back-pressures the source if the buffer is full
logger.info("end")
}
开发者ID:ysden123,项目名称:poc,代码行数:34,代码来源:Flows.scala
示例3: Application
//设置package包名称以及导入依赖的类
package controllers
import javax.inject._
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, _}
import play.api.mvc._
@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {
val mediator = DistributedPubSub(system).mediator
def socket = WebSocket.accept[String, String] { request =>
new Chat(mediator, "chat").flow
}
}
class Chat(mediator: ActorRef, topic: String) {
def flow: Flow[String, String, NotUsed] =
Flow.fromSinkAndSource(
publishToMediatorSink,
sourceFrom)
private def publishToMediatorSink[In]: Sink[In, NotUsed] =
Flow[In].map(DistributedPubSubMediator.Publish(topic, _)) to
Sink.actorRef[DistributedPubSubMediator.Publish](mediator, ())
private def sourceFrom[Out]: Source[Out, ActorRef] =
Source
.actorRef[Out](5, OverflowStrategy.fail)
.mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }
}
开发者ID:jonasanso,项目名称:play-simple-chat,代码行数:40,代码来源:Application.scala
示例4: TestSessionExporterSolr
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionSolrDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object TestSessionExporterSolr {
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
val writer = new TestSessionSolrDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-sessions-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
testSessions
}
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
Thread.sleep(ContextConfig.sleepLength)
}
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporterSolr.scala
示例5: TestSessionExporter
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionElasticsearchDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object TestSessionExporter {
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
val writer = new TestSessionElasticsearchDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-sessions-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
testSessions
}
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
Thread.sleep(ContextConfig.sleepLength)
}
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporter.scala
示例6: TestGroupExporterSolr
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupSolrDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object TestGroupExporterSolr {
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
val writer = new TestGroupSolrDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-groups-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
testGroups
}
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
Thread.sleep(ContextConfig.sleepLength)
}
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporterSolr.scala
示例7: TestGroupExporter
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupElasticsearchDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object TestGroupExporter {
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
val writer = new TestGroupElasticsearchDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-groups-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
testGroups
}
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
Thread.sleep(ContextConfig.sleepLength)
}
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporter.scala
示例8: KafkaWriter
//设置package包名称以及导入依赖的类
package wiii
import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}
object KafkaWriter {
def props(topic: String)(implicit mat: ActorMaterializer) = Props(new KafkaWriter(topic))
}
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
override def preStart(): Unit = initWriter()
override def receive: Receive = {
case _ =>
}
def initWriter(): Unit = {
val subscriberProps = new ReactiveKafka().producerActorProps(ProducerProperties(
bootstrapServers = "localhost:9092",
topic = topicName,
valueSerializer = TweetSerializer
))
val subscriber = context.actorOf(subscriberProps)
val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
val factory = new TwitterStreamFactory()
val twitterStream = factory.getInstance()
twitterStream.addListener(new StatusForwarder(actorRef))
twitterStream.filter(new FilterQuery("espn"))
Source.fromPublisher(publisher).map(s => ProducerMessage(Tweet(s.getUser.getName, s.getText)))
.runWith(Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](subscriber)))
}
}
class StatusForwarder(publisher: ActorRef) extends StatusListener {
def onStatus(status: Status): Unit = publisher ! status
//\\ nop all the others for now //\\
def onStallWarning(warning: StallWarning): Unit = {}
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
def onException(ex: Exception): Unit = {}
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala
示例9: self
//设置package包名称以及导入依赖的类
package com.omearac.producers
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Source}
import com.omearac.shared.JsonMessageConversion.Conversion
import com.omearac.shared.{AkkaStreams, EventSourcing}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
trait ProducerStream extends AkkaStreams with EventSourcing {
implicit val system: ActorSystem
def self: ActorRef
def createStreamSource[msgType] = {
Source.queue[msgType](Int.MaxValue,OverflowStrategy.backpressure)
}
def createStreamSink(producerProperties: Map[String, String]) = {
val kafkaMBAddress = producerProperties("bootstrap-servers")
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer).withBootstrapServers(kafkaMBAddress)
Producer.plainSink(producerSettings)
}
def createStreamFlow[msgType: Conversion](producerProperties: Map[String, String]) = {
val numberOfPartitions = producerProperties("num.partitions").toInt -1
val topicToPublish = producerProperties("publish-topic")
val rand = new scala.util.Random
val range = 0 to numberOfPartitions
Flow[msgType].map { msg =>
val partition = range(rand.nextInt(range.length))
val stringJSONMessage = Conversion[msgType].convertToJson(msg)
new ProducerRecord[Array[Byte], String](topicToPublish, partition, null, stringJSONMessage)
}
}
}
开发者ID:omearac,项目名称:reactive-kafka-microservice-template,代码行数:43,代码来源:ProducerStream.scala
示例10: RemoteBot
//设置package包名称以及导入依赖的类
package im.actor.botkit
import java.net.URLEncoder
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.util.Timeout
import im.actor.bots.BotMessages
import im.actor.concurrent.ActorFutures
import upickle.default._
import scala.concurrent.duration._
object RemoteBot {
val DefaultEndpoint = "wss://api.actor.im"
private object StreamComplete
}
abstract class RemoteBot(token: String, endpoint: String) extends BotBase with ActorFutures {
import BotMessages._
import RemoteBot._
override protected implicit val timeout: Timeout = Timeout(30.seconds)
private implicit val mat = ActorMaterializer()
initFlow()
def onReceive(message: Object): Unit = {}
def receive: Receive = internalReceive orElse {
case message ?
onReceive(message.asInstanceOf[Object])
}
override protected def onStreamFailure(cause: Throwable): Unit = {
log.error(cause, "Stream failure")
initFlow()
}
private final def internalReceive: Receive = workingBehavior.orElse({
case StreamComplete ?
log.warning("Disconnected, reinitiating flow")
initFlow()
})
private def initFlow(): Unit = {
val (wsSource, wsSink) = WebsocketClient.sourceAndSink(s"$endpoint/v1/bots/${URLEncoder.encode(token, "UTF-8")}")
wsSource.map(read[BotMessageOut]).to(Sink.actorRef(self, StreamComplete)).run()
val rqSource = Source.actorRef(bufferSize = 100, overflowStrategy = OverflowStrategy.fail)
.map(write[BotRequest])
.to(wsSink)
.run()
setRqSource(rqSource)
}
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:61,代码来源:RemoteBot.scala
示例11: configChanges
//设置package包名称以及导入依赖的类
package net.flatmap.vscode.languageserver
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import cats.data.Xor
import io.circe.{Decoder, Json}
trait Configuration[T] extends LanguageServer {
private var configQueues = Set.empty[SourceQueueWithComplete[Json]]
def configChanges(implicit decoder: Decoder[T]): Source[T,NotUsed] =
Source.queue[Json](1024,OverflowStrategy.dropTail).mapMaterializedValue {
case queue =>
configQueues += queue
NotUsed
}.map(decoder.decodeJson).collect {
case Xor.Right(t) => t
}
override def didChangeConfiguration(settings: Json): Unit = {
configQueues.foreach(_.offer(settings))
}
}
开发者ID:flatmap,项目名称:vscode-languageserver-scala,代码行数:24,代码来源:Configuration.scala
示例12: queueRequest
//设置package包名称以及导入依赖的类
package rest.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import rest.client.HttpRequestQueue._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
trait HttpRequestQueue {
implicit val system: ActorSystem
implicit val mat: ActorMaterializer
implicit val ec: ExecutionContext
val QueueSize: Int
val poolClientFlow: Flow[(HttpRequest, Promise[HttpResponse]),
(Try[HttpResponse], Promise[HttpResponse]),
Http.HostConnectionPool]
private val queue =
Source
.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.backpressure)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(httpResponse), promisedResponse)) => promisedResponse.success(httpResponse)
case ((Failure(throwable), promisedResponse)) => promisedResponse.failure(throwable)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue
.offer(request -> responsePromise)
.map {
case QueueOfferResult.Enqueued => responsePromise
case QueueOfferResult.Dropped => responsePromise.failure(new RuntimeException(QUEUE_OVERFLOW))
case QueueOfferResult.QueueClosed => responsePromise.failure(new RuntimeException(QUEUE_CLOSED))
case QueueOfferResult.Failure(ex) => responsePromise.failure(ex)
}
.flatMap(_.future)
}
}
object HttpRequestQueue {
private val QUEUE_OVERFLOW: String = "Queue overflowed. Try again later."
private val QUEUE_CLOSED: String = "Queue was closed (pool shut down) while running the request. Try again later."
}
开发者ID:lymr,项目名称:fun-chat,代码行数:53,代码来源:HttpRequestQueue.scala
示例13: HelloWorldBackpressured
//设置package包名称以及导入依赖的类
package eu.svez.backpressuredemo.A_local
import akka.NotUsed
import akka.pattern.after
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import eu.svez.backpressuredemo.StreamDemo
import kamon.Kamon
import scala.concurrent.Future
import scala.concurrent.duration._
object HelloWorldBackpressured extends StreamDemo {
sourceRate.send(5)
sinkRate.send(5)
def valve[T](rate: => Int): Flow[T, T, NotUsed] =
Flow[T].mapAsync(1) { x =>
after(1.second / rate, system.scheduler)(Future.successful(x))
}
def meter[T](name: String): Flow[T, T, NotUsed] = {
val msgCounter = Kamon.metrics.counter(name)
Flow[T].map { x =>
msgCounter.increment()
x
}
}
Source.repeat("Hello world!")
.via(valve(sourceRate.get))
.via(meter("source"))
.buffer(100, OverflowStrategy.backpressure)
.via(valve(sinkRate.get))
.via(meter("sink"))
.runWith(Sink.ignore)
readRatesFromStdIn()
}
开发者ID:svezfaz,项目名称:akka-backpressure-scala-central-talk,代码行数:45,代码来源:HelloWorldBackpressured.scala
示例14: IntegratingWithActorsApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.{ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.pattern.ask
import akka.util.Timeout
import com.packt.chapter8.SinkActor.{AckSinkActor, CompletedSinkActor, InitSinkActor}
import scala.concurrent.duration._
object IntegratingWithActorsApplication extends App {
implicit val actorSystem = ActorSystem("IntegratingWithActors")
implicit val actorMaterializer = ActorMaterializer()
implicit val askTimeout = Timeout(5 seconds)
val stringCleaner = actorSystem.actorOf(Props[StringCleanerActor])
val sinkActor = actorSystem.actorOf(Props[SinkActor])
val source = Source.queue[String](100, OverflowStrategy.backpressure)
val sink = Sink.actorRefWithAck[String](sinkActor, InitSinkActor, AckSinkActor, CompletedSinkActor)
val queue = source
.mapAsync(parallelism = 5)(elem => (stringCleaner ? elem).mapTo[String])
.to(sink)
.run()
actorSystem.actorOf(SourceActor.props(queue))
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:30,代码来源:IntegratingWithActorsApplication.scala
示例15: EventDao
//设置package包名称以及导入依赖的类
package dao
import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import org.reactivestreams.Publisher
import play.api.libs.json.{Format, JsSuccess, Json}
import scala.concurrent.Future
abstract class EventDao[Event](
implicit val mat: Materializer,
format: Format[Event]) {
def store(str: Event): Future[QueueOfferResult] =
in.offer(str)
val (
in: SourceQueueWithComplete[Event],
out: Publisher[Event]
) =
Source
.queue[Event](3, OverflowStrategy.backpressure)
.map(Json.toJson(_))
.map(Json.stringify)
.via(eventStore)
.map(Json.parse)
.map(Json.fromJson[Event])
.collect { case JsSuccess(event, _) => event }
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
.run()
protected def eventStore: Flow[String, String, NotUsed]
}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:35,代码来源:EventDao.scala
示例16: StreamActor
//设置package包名称以及导入依赖的类
import akka.NotUsed
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import scala.concurrent.duration.Duration
import scala.io.Source.fromFile
class StreamActor extends Actor {
override def receive: Receive = {
case StreamBook(filename) =>
val materializer = ActorMaterializer.create(context) // Materializing and running a stream always requires a Materializer to be in implicit scope.
val sink = Source
.actorRef(1000, OverflowStrategy.dropNew) // If the buffer is full when a new element arrives, drops the new element.
.throttle(1, Duration(1, "seconds"), 1, ThrottleMode.shaping) // throttle - to slow down the stream to 1 element per second.
.to(Sink.actorRef(sender, NotUsed)) // Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.
.run()(materializer)
val lines = fromFile(filename).getLines
lines.foreach(line => sink ! StreamLine(line))
}
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(10, Duration(60, "seconds")) {
case _: Exception => Restart
}
}
case class StreamBook(fileName: String)
case class StreamLine(line: String)
开发者ID:Wwarrior1,项目名称:DistributedSystemsLab5Akka,代码行数:31,代码来源:StreamActor.scala
示例17: chatRoom
//设置package包名称以及导入依赖的类
package name.denyago.yasc.http.websocket
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import name.denyago.yasc.chat.ConnectedUser
import name.denyago.yasc.chat.events.{ConnectionEstablished, MessagePosted, MessageReceived}
trait WsFlow {
implicit val webSocketSys = ActorSystem("websocket-system")
implicit val webSocketMat = ActorMaterializer()
def chatRoom: ActorRef
def wsFlow: Flow[Message, Message, NotUsed] = {
val userActor = webSocketSys.actorOf(Props(new ConnectedUser(chatRoom)))
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message].map {
case TextMessage.Strict(text) => MessageReceived(text)
}.to(Sink.actorRef[MessageReceived](userActor, PoisonPill))
val outgoingMessages: Source[Message, NotUsed] =
Source.actorRef[MessagePosted](10, OverflowStrategy.fail)
.mapMaterializedValue { outgoingActor =>
userActor ! ConnectionEstablished(outgoingActor)
NotUsed
}
.map(
(outgoingMessage: MessagePosted) => TextMessage(outgoingMessage.text))
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}
}
开发者ID:denyago,项目名称:yet-another-simple-chat,代码行数:38,代码来源:WsFlow.scala
示例18: FilteringProcess
//设置package包名称以及导入依赖的类
package com.realizationtime.btdogg.filtering
import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import com.realizationtime.btdogg.BtDoggConfiguration.standardBufferSize
import com.realizationtime.btdogg.TKey
import com.realizationtime.btdogg.hashessource.HashesSource.SpottedHash
import com.realizationtime.btdogg.persist.MongoPersist
import redis.RedisClient
import scala.concurrent.ExecutionContext
class FilteringProcess(val entryFilterDB: RedisClient,
val hashesBeingScrapedDB: RedisClient,
val mongoPersist: MongoPersist)
(implicit private val ec: ExecutionContext) {
val onlyNewHashes: Source[TKey, ActorRef] = Source.actorRef[SpottedHash](bufferSize = standardBufferSize, OverflowStrategy.dropNew)
.via(new EntryFilter(entryFilterDB).flow)
.via(new HashesBeingScraped(hashesBeingScrapedDB).flow)
.via(new MongoFilter(mongoPersist, hashesBeingScrapedDB).flow)
}
object FilteringProcess {
object Result extends Enumeration {
type Result = Value
val NEW, ALREADY_EXISTED = Value
}
}
开发者ID:bwrega,项目名称:btdogg,代码行数:34,代码来源:FilteringProcess.scala
示例19: DistributedSourceWorkerSpec
//设置package包名称以及导入依赖的类
package aecor.tests
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.testkit.TestKit
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, FunSuiteLike, Matchers }
import scala.concurrent.duration._
object DistributedSourceWorkerSpec {
def conf: Config = ConfigFactory.parseString(s"""
cluster.system-name=test
akka.persistence.journal.plugin=akka.persistence.journal.inmem
akka.persistence.snapshot-store.plugin=akka.persistence.no-snapshot-store
aecor.akka-runtime.idle-timeout = 1s
cluster.seed-nodes = ["akka://[email protected]:51000"]
""").withFallback(ConfigFactory.load())
}
class DistributedSourceWorkerSpec
extends TestKit(ActorSystem("test", ShardedRuntimeSpec.conf))
with FunSuiteLike
with Matchers
with ScalaFutures
with BeforeAndAfterAll {
override implicit val patienceConfig = PatienceConfig(15.seconds, 150.millis)
override def afterAll: Unit =
TestKit.shutdownActorSystem(system)
val sink = Source.queue(10, OverflowStrategy.backpressure)
}
开发者ID:notxcain,项目名称:aecor,代码行数:38,代码来源:DistributedSourceWorkerSpec.scala
示例20: Connected
//设置package包名称以及导入依赖的类
package me.mmcoulombe.aad
import akka.actor._
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import me.mmcoulombe.add.Operation
trait ActorEventControl
case class Connected(subscriber: ActorRef) extends ActorEventControl
case class ReceivedEvent(event: String) extends ActorEventControl
case class SendEvent(event: Operation) extends ActorEventControl
case object Disconnected extends ActorEventControl
class EventManager(implicit system: ActorSystem) {
private val eventActor =
system.actorOf(Props(new Actor {
var subscribers = Set.empty[ActorRef]
override def receive: Receive = {
case Connected(subscriber) =>
println("WS Event Manager: Connected")
context.watch(subscriber)
subscribers += subscriber
case Disconnected => println("WS Event manager: Disconnected")
case ReceivedEvent(event) => println(s"Received event : $event")
case SendEvent(event) => {
println(s"SendEvent : $event")
dispatch(event)
}
case msg: Operation => dispatch(msg)
}
def dispatch(op: Operation): Unit = {
println(s"dispatch ${op.op}")
subscribers.foreach(_ ! op)
}
}))
def sendMessage(event: Operation): Unit = {
println("sendMessage")
eventActor ! SendEvent(event)
}
private val in = Flow[String] // heartbeat otherwise the socket will be close after x seconds in idle
.map(ReceivedEvent)
.to(Sink.actorRef[ActorEventControl](eventActor, Disconnected))
private val out =
Source.actorRef[Operation](1, OverflowStrategy.fail)
.mapMaterializedValue(eventActor ! Connected(_))
val flow = Flow.fromSinkAndSource(in, out)
}
开发者ID:mmcoulombe,项目名称:poc-aad,代码行数:58,代码来源:EventManager.scala
注:本文中的akka.stream.OverflowStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论