本文整理汇总了Scala中akka.stream.scaladsl.Flow类的典型用法代码示例。如果您正苦于以下问题:Scala Flow类的具体用法?Scala Flow怎么用?Scala Flow使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Flow类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
object ProcessingKafkaApplication extends App {
implicit val actorSystem = ActorSystem("SimpleStream")
implicit val actorMaterializer = ActorMaterializer()
val bootstrapServers = "localhost:9092"
val kafkaTopic = "akka_streams_topic"
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("akka_streams_group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
val kafkaSink = Producer.plainSink(producerSettings)
val printlnSink = Sink.foreach(println)
val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())
tickSource ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> printlnSink
ClosedShape
})
runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: Demo1
//设置package包名称以及导入依赖的类
package lew.bing.akka.http
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{Directive, RequestContext, Route, RouteResult}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.io.StdIn
object Demo1 {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("my-http")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route:Route =
path("hello"){
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h1>Say hello to akka-http</h1>"))
}
}
val map = Flow[RequestContext].map(route)
//???????????
val bindingFuture = Http().bindAndHandle(route,"localhost",9898)
println(s"Server online at http://localhost:9898/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
开发者ID:liuguobing634,项目名称:akka,代码行数:41,代码来源:Demo1.scala
示例3: CassandraSink
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.cassandra.scaladsl
import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}
import scala.concurrent.{ExecutionContext, Future}
import akka.stream.alpakka.cassandra.GuavaFutures._
object CassandraSink {
def apply[T](
parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement
)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
Flow[T]
.mapAsyncUnordered(parallelism)(t ? session.executeAsync(statementBinder(t, statement)).asScala())
.toMat(Sink.ignore)(Keep.right)
}
开发者ID:akka,项目名称:alpakka,代码行数:21,代码来源:CassandraSink.scala
示例4: IteratorProducerTest
//设置package包名称以及导入依赖的类
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class IteratorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[Int](env, publisherShutdownTimeout)
with WithActorSystem with TestNGSuiteLike {
implicit val system = _system
def this(system: ActorSystem) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis)
}
def this() {
this(ActorSystem(classOf[IteratorProducerTest].getSimpleName, AkkaSpec.testConf))
}
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
val iter: Iterator[Int] =
if (elements == 0)
Iterator from 0
else
(Iterator from 0).take(elements)
Flow(iter).toProducer(materializer).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
Flow(List.empty[Int].iterator).toProducer(materializer).getPublisher
override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:41,代码来源:IteratorProducerTest.scala
示例5: IterableProducerTest
//设置package包名称以及导入依赖的类
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import scala.collection.immutable
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[Int](env, publisherShutdownTimeout)
with WithActorSystem with TestNGSuiteLike {
implicit val system = _system
def this(system: ActorSystem) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis)
}
def this() {
this(ActorSystem(classOf[IterableProducerTest].getSimpleName, AkkaSpec.testConf))
}
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == 0)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements
Flow(iterable).toProducer(materializer).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
Flow[Int](Nil).toProducer(materializer).getPublisher
override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:42,代码来源:IterableProducerTest.scala
示例6: FlowFilterSpec
//设置package包名称以及导入依赖的类
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.{ StreamTestKit, ScriptedTest }
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ? random }
import akka.stream.scaladsl.Flow
import akka.stream.impl.ActorBasedFlowMaterializer
class FlowFilterSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 16)
"A Filter" must {
"filter" in {
def script = Script((1 to 50) map { _ ? val x = random.nextInt(); Seq(x) -> (if ((x & 1) == 0) Seq(x) else Seq()) }: _*)
(1 to 50) foreach (_ ? runScript(script, settings)(_.filter(_ % 2 == 0)))
}
"not blow up with high request counts" in {
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 1,
maximumInputBufferSize = 1,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 1))
val probe = StreamTestKit.consumerProbe[Int]
Flow(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
toProducer(gen).produceTo(probe)
val subscription = probe.expectSubscription()
for (_ ? 1 to 10000) {
subscription.requestMore(Int.MaxValue)
}
probe.expectNext(1)
probe.expectComplete()
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:48,代码来源:FlowFilterSpec.scala
示例7: ActorProducerTest
//设置package包名称以及导入依赖的类
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import org.reactivestreams.api.Producer
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class ActorProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[Int](env, publisherShutdownTimeout)
with WithActorSystem with TestNGSuiteLike {
implicit val system = _system
import system.dispatcher
def this(system: ActorSystem) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis)
}
def this() {
this(ActorSystem(classOf[ActorProducerTest].getSimpleName, AkkaSpec.testConf))
}
private val materializer = FlowMaterializer(MaterializerSettings())
private def createProducer(elements: Int): Producer[Int] = {
val iter = Iterator from 1000
val iter2 = if (elements > 0) iter take elements else iter
Flow(() ? if (iter2.hasNext) iter2.next() else throw Stop).toProducer(materializer)
}
def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher
override def createCompletedStatePublisher(): Publisher[Int] = {
val pub = createProducer(1)
Flow(pub).consume(materializer)
Thread.sleep(100)
pub.getPublisher
}
override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:45,代码来源:ActorProducerTest.scala
示例8: FlowMapSpec
//设置package包名称以及导入依赖的类
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.{ StreamTestKit, ScriptedTest }
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ? random }
import akka.stream.scaladsl.Flow
import akka.stream.impl.ActorBasedFlowMaterializer
class FlowMapSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 16)
val gen = FlowMaterializer(settings)
"A Map" must {
"map" in {
def script = Script((1 to 50) map { _ ? val x = random.nextInt(); Seq(x) -> Seq(x.toString) }: _*)
(1 to 50) foreach (_ ? runScript(script, settings)(_.map(_.toString)))
}
"not blow up with high request counts" in {
val probe = StreamTestKit.consumerProbe[Int]
Flow(List(1).iterator).
map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).
toProducer(gen).produceTo(probe)
val subscription = probe.expectSubscription()
for (_ ? 1 to 10000) {
subscription.requestMore(Int.MaxValue)
}
probe.expectNext(6)
probe.expectComplete()
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:45,代码来源:FlowMapSpec.scala
示例9: FlowFromGraph
//设置package包名称以及导入依赖的类
package sample.graphDSL
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}
object FlowFromGraph {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("FlowFromGraph")
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
val listOfFlows = List(processorFlow1, processorFlow2)
def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")
Flow.fromGraph(GraphDSL.create() { implicit b =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val broadcast: UniformFanOutShape[T, T] = b.add(Broadcast(indexFlows.size))
val merge: UniformFanInShape[T, T] = b.add(Merge(indexFlows.size))
indexFlows.foreach(broadcast ~> _ ~> merge)
FlowShape(broadcast.in, merge.out)
})
}
val compoundFlow = compoundFlowFrom(listOfFlows)
Source(1 to 10)
.via(compoundFlow)
.runWith(Sink.foreach(println(_)))
.onComplete(_ => system.terminate())
}
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala
示例10: HelloConsumerServiceImpl
//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl
import akka.stream.scaladsl.Flow
import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import sample.helloworld.api.HelloService
import sample.helloworld.api.model.GreetingMessage
import sample.helloworldconsumer.api.HelloConsumerService
import sample.helloworldconsumer.impl.repositories.MessageRepository
class HelloConsumerServiceImpl (registery: PersistentEntityRegistry ,helloService: HelloService ,msgRepository:MessageRepository) extends HelloConsumerService {
helloService.greetingsTopic
.subscribe
.atLeastOnce(
Flow[GreetingMessage].map{ msg =>
putGreetingMessage(msg)
Done
}
)
var lastObservedMessage: String = _
private def putGreetingMessage(greetingMessage: GreetingMessage) = {
println(s"obersrve new message ${greetingMessage.message}")
entityRef(greetingMessage.message.toString).ask(SaveNewMessage(greetingMessage.message))
lastObservedMessage = greetingMessage.message
}
override def findTopHundredWordCounts(): ServiceCall[NotUsed, Map[String, Int]] = ServiceCall {
//fetch top 100 message and perform word count
req => msgRepository.fetchAndCountWordsFromMessages(100)
}
override def foo():ServiceCall[NotUsed, String] = ServiceCall{
req => scala.concurrent.Future.successful(lastObservedMessage)
}
private def entityRef(id: String) = registery.refFor[MessageEntity](id)
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:42,代码来源:HelloConsumerServiceImpl.scala
示例11: PacketLineFormatter
//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.pcap
import akka.stream.scaladsl.Flow
import edu.uw.at.iroberts.wirefugue.pcap.PcapFileRaw.LinkType
import edu.uw.at.iroberts.wirefugue.protocol.overlay.{EtherType, Ethernet}
object PacketLineFormatter {
def apply() = Flow.fromFunction { p: Packet =>
val length: Long = p.data.length
val rcvdLength: Long = p.originalLength
val maybeFrame = p.network match {
case LinkType.ETHERNET =>
if (p.data.length >= Ethernet.minSize)
Some(Ethernet(p.data))
else
None
case _ => None
}
val maybePacket = maybeFrame.flatMap { frame =>
if (frame.etherType == EtherType.IPv4.id &&
frame.payload.length >= Datagram.headerLength)
Some(Datagram.parse(frame.payload))
else
None
}
s"[${p.timestamp}] $length bytes" + {
if (length < rcvdLength) s" (truncated from $rcvdLength)"
else ""
} +
maybeFrame.map("\n" + _).getOrElse("") +
maybePacket.map("\n" + _).getOrElse("")
}
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:37,代码来源:PacketLineFormatter.scala
示例12: printMessage
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.ws
import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.stream.scaladsl.Flow
trait WsUtils extends SLF4JLogging {
private[this] def printMessage(msg: Message): String = msg match {
case TextMessage.Strict(text) => text
case _: TextMessage.Streamed => "(streamed text message)"
case _: BinaryMessage => "(binary message)"
}
def logWsMessages(id: String)(handler: Flow[Message, Message, Any]) = {
Flow[Message]
.map { msg => log.debug(s"[$id] IN : ${printMessage(msg)}"); msg }
.via(handler)
.map { msg => log.debug(s"[$id] OUT: ${printMessage(msg)}"); msg }
}
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:22,代码来源:WsUtils.scala
示例13: SplitAfterSizeSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.testkit.TestKit
import akka.stream.ActorMaterializerSettings
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import org.scalatest.Matchers
import org.scalatest.FlatSpecLike
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.stream.scaladsl.Sink
import org.scalatest.time.{Millis, Seconds, Span}
import scala.concurrent.duration._
class SplitAfterSizeSpec(_system: ActorSystem)
extends TestKit(_system)
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
def this() = this(ActorSystem("SplitAfterSizeSpec"))
implicit val defaultPatience =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))
"SplitAfterSize" should "yield a single empty substream on no input" in {
Source
.empty[ByteString]
.via(
SplitAfterSize(10)(Flow[ByteString]).concatSubstreams
)
.runWith(Sink.seq)
.futureValue should be(Seq.empty)
}
it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in {
Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
.via(
SplitAfterSize(10)(Flow[ByteString]).prefixAndTail(10).map { case (prefix, tail) => prefix }.concatSubstreams
)
.runWith(Sink.seq)
.futureValue should be(
Seq(
Seq(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12)),
Seq(ByteString(13, 14))
)
)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:56,代码来源:SplitAfterSizeSpec.scala
示例14: CsvParsing
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv.scaladsl
import akka.NotUsed
import akka.stream.alpakka.csv.CsvParsingStage
import akka.stream.scaladsl.Flow
import akka.util.ByteString
object CsvParsing {
val Backslash: Byte = '\\'
val Comma: Byte = ','
val SemiColon: Byte = ';'
val Colon: Byte = ':'
val Tab: Byte = '\t'
val DoubleQuote: Byte = '"'
def lineScanner(delimiter: Byte = Comma,
quoteChar: Byte = DoubleQuote,
escapeChar: Byte = Backslash): Flow[ByteString, List[ByteString], NotUsed] =
Flow.fromGraph(new CsvParsingStage(delimiter, quoteChar, escapeChar))
}
开发者ID:akka,项目名称:alpakka,代码行数:23,代码来源:CsvParsing.scala
示例15: format
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv.scaladsl
import java.nio.charset.{Charset, StandardCharsets}
import akka.NotUsed
import akka.stream.alpakka.csv.{javadsl, CsvFormatter}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import scala.collection.immutable
def format[T <: immutable.Iterable[String]](
delimiter: Char = Comma,
quoteChar: Char = DoubleQuote,
escapeChar: Char = Backslash,
endOfLine: String = "\r\n",
quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
charset: Charset = StandardCharsets.UTF_8,
byteOrderMark: Option[ByteString] = None
): Flow[T, ByteString, NotUsed] = {
val formatter =
new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, quotingStyle, charset)
byteOrderMark.fold {
Flow[T].map(formatter.toCsv).named("CsvFormatting")
} { bom =>
Flow[T].map(formatter.toCsv).named("CsvFormatting").prepend(Source.single(bom))
}
}
}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:CsvFormatting.scala
示例16: FSSocket
//设置package包名称以及导入依赖的类
package esl
import akka.stream.scaladsl.{Flow, Sink}
import esl.domain.{CommandReply, FSMessage}
case class FSSocket[FS <: FSConnection](fsConnection: FS, commandReply: CommandReply) extends InfantFSSocket[FS]
trait InfantFSSocket[FS <: FSConnection] {
protected val fsConnection: FS
protected val commandReply: CommandReply
def attachSink(anotherSink: Sink[(FS, List[FSMessage]), _]): FSSocket[FS] = {
val flow = Flow[List[FSMessage]].map(f => fsConnection -> f)
.to(anotherSink)
fsConnection.attachSink(flow)
FSSocket(fsConnection, commandReply)
}
}
object InfantFSSocket {
def apply[FS <: FSConnection](fsConn: FS, cmdReply: CommandReply): InfantFSSocket[FS] = new InfantFSSocket[FS] {
override protected val fsConnection: FS = fsConn
override protected val commandReply: CommandReply = cmdReply
}
}
开发者ID:CallHandling,项目名称:freeswitch-scala-esl,代码行数:28,代码来源:InfantFSSocket.scala
示例17: Application
//设置package包名称以及导入依赖的类
package controllers
import javax.inject._
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, _}
import play.api.mvc._
@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {
val mediator = DistributedPubSub(system).mediator
def socket = WebSocket.accept[String, String] { request =>
new Chat(mediator, "chat").flow
}
}
class Chat(mediator: ActorRef, topic: String) {
def flow: Flow[String, String, NotUsed] =
Flow.fromSinkAndSource(
publishToMediatorSink,
sourceFrom)
private def publishToMediatorSink[In]: Sink[In, NotUsed] =
Flow[In].map(DistributedPubSubMediator.Publish(topic, _)) to
Sink.actorRef[DistributedPubSubMediator.Publish](mediator, ())
private def sourceFrom[Out]: Source[Out, ActorRef] =
Source
.actorRef[Out](5, OverflowStrategy.fail)
.mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }
}
开发者ID:jonasanso,项目名称:play-simple-chat,代码行数:40,代码来源:Application.scala
示例18: InputCustomer
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random
object InputCustomer {
def random(): InputCustomer = {
InputCustomer(s"FirstName${Random.nextInt(1000)} " +
s"LastName${Random.nextInt(1000)}")
}
}
case class InputCustomer(name: String)
case class OutputCustomer(firstName: String, lastName: String)
object CustomersExample extends App {
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val materializer = ActorMaterializer()
val inputCustomers = Source((1 to 100).map(_ => InputCustomer.random()))
val normalize = Flow[InputCustomer]
.map(c => c.name.split(" ").toList)
.collect {
case firstName :: lastName :: Nil => OutputCustomer(firstName, lastName)
}
val writeCustomers = Sink.foreach { println} // ?-conversion
inputCustomers.via(normalize).runWith(writeCustomers).andThen {
case _ => actorSystem.terminate()
}
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:40,代码来源:CustomersExample.scala
示例19: GraphBuilder
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
object GraphBuilder extends App {
implicit val system = ActorSystem("actor-system")
implicit val materializer = ActorMaterializer()
val topHeadSink = Sink.head[Int]
val middleSink = Sink.head[Int]
val bottomHeadSink = Sink.seq[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
val results = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, middleSink, bottomHeadSink)
((_, _, _)) { implicit builder =>
(topHS, midHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
val source = builder.add(Source(1 to 10))
Source.repeat(2) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> midHS.in
source ~> bottomHS.in
ClosedShape
}).run()
val r1 = Await.result(results._1, 300 millis)
val r2 = Await.result(results._2, 300 millis)
val r3 = Await.result(results._3, 300 millis)
println(r1, r2, r3)
system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:41,代码来源:GraphBuilder.scala
示例20: TestSessionExporterSolr
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionSolrDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object TestSessionExporterSolr {
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
val writer = new TestSessionSolrDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-sessions-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
testSessions
}
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
Thread.sleep(ContextConfig.sleepLength)
}
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporterSolr.scala
注:本文中的akka.stream.scaladsl.Flow类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论