本文整理汇总了Scala中akka.stream.ClosedShape类的典型用法代码示例。如果您正苦于以下问题:Scala ClosedShape类的具体用法?Scala ClosedShape怎么用?Scala ClosedShape使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ClosedShape类的13个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
object ProcessingKafkaApplication extends App {
implicit val actorSystem = ActorSystem("SimpleStream")
implicit val actorMaterializer = ActorMaterializer()
val bootstrapServers = "localhost:9092"
val kafkaTopic = "akka_streams_topic"
val partition = 0
val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))
val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("akka_streams_group")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val tickSource = Source.tick(0 seconds, 5 seconds, "Hello from Akka Streams using Kafka!")
val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
val kafkaSink = Producer.plainSink(producerSettings)
val printlnSink = Sink.foreach(println)
val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())
tickSource ~> mapToProducerRecord ~> kafkaSink
kafkaSource ~> mapFromConsumerRecord ~> printlnSink
ClosedShape
})
runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: StreamImporter
//设置package包名称以及导入依赖的类
package controllers.admin.gazetteers
import akka.stream.{ ActorAttributes, ClosedShape, Materializer, Supervision }
import akka.stream.scaladsl._
import akka.util.ByteString
import java.io.InputStream
import models.place.{ GazetteerRecord, PlaceService }
import play.api.Logger
import play.api.libs.json.Json
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._
class StreamImporter(implicit materializer: Materializer) {
private val BATCH_SIZE = 100
private val decider: Supervision.Decider = {
case t: Throwable =>
t.printStackTrace()
Supervision.Stop
}
def importPlaces(is: InputStream, crosswalk: String => Option[GazetteerRecord])(implicit places: PlaceService, ctx: ExecutionContext) = {
val source = StreamConverters.fromInputStream(() => is, 1024)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = Int.MaxValue, allowTruncation = false))
.map(_.utf8String)
val parser = Flow.fromFunction[String, Option[GazetteerRecord]](crosswalk)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.grouped(BATCH_SIZE)
val importer = Sink.foreach[Seq[Option[GazetteerRecord]]] { records =>
val toImport = records.flatten
Await.result(places.importRecords(toImport), 60.minutes)
}
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
source ~> parser ~> importer
ClosedShape
}).withAttributes(ActorAttributes.supervisionStrategy(decider))
graph.run()
}
}
开发者ID:pelagios,项目名称:recogito2,代码行数:51,代码来源:StreamImporter.scala
示例3: MergeGraph
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, UniformFanInShape, ClosedShape}
import akka.stream.scaladsl._
import scala.concurrent.duration._
import scala.concurrent.{Future, Await}
object MergeGraph extends App {
implicit val system = ActorSystem("actor-system")
implicit val materilizer = ActorMaterializer()
val pickMaxOfThree = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
zip1.out ~> zip2.in0
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}
val resultSink = Sink.head[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b => sink =>
import GraphDSL.Implicits._
val pm3 = b.add(pickMaxOfThree)
Source.single(1) ~> pm3.in(0)
Source.single(2) ~> pm3.in(1)
Source.single(3) ~> pm3.in(2)
pm3.out ~> sink.in
ClosedShape
})
val max: Future[Int] = g.run()
val r = Await.result(max, 300 millis)
println(r)
system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:44,代码来源:MergeGraph.scala
示例4: GraphBuilder
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
object GraphBuilder extends App {
implicit val system = ActorSystem("actor-system")
implicit val materializer = ActorMaterializer()
val topHeadSink = Sink.head[Int]
val middleSink = Sink.head[Int]
val bottomHeadSink = Sink.seq[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
val results = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, middleSink, bottomHeadSink)
((_, _, _)) { implicit builder =>
(topHS, midHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
val source = builder.add(Source(1 to 10))
Source.repeat(2) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> midHS.in
source ~> bottomHS.in
ClosedShape
}).run()
val r1 = Await.result(results._1, 300 millis)
val r2 = Await.result(results._2, 300 millis)
val r3 = Await.result(results._3, 300 millis)
println(r1, r2, r3)
system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:41,代码来源:GraphBuilder.scala
示例5: HoeffdingTreeWithAlpakka
//设置package包名称以及导入依赖的类
import java.nio.file.FileSystems
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import akka.stream.scaladsl.{GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.alpakka.file.scaladsl
import org.apache.spark.streamdm.core.ExampleParser
import org.apache.spark.streamdm.core.specification.SpecificationParser
import pl.gosub.akka.online.{HoeffdingTreeProcessor, LearnerQuery}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
object HoeffdingTreeWithAlpakka extends App {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val specParser = new SpecificationParser
val arffPath = this.getClass.getResource("/elecNormNew.arff").getPath // add for Windows .replaceFirst("^/(.:/)", "$1")
val exampleSpec = specParser.fromArff(arffPath)
val fsPath = this.getClass.getResource("/elecNormData.txt").getPath // add for Windows .replaceFirst("^/(.:/)", "$1")
println(fsPath)
val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
path = fs.getPath(fsPath),
maxLineSize = 8192,
pollingInterval = 250.millis
)
// if the lines below do not work, please make sure that you got the linefeed character right wrt your operating system (LF vs CRLF)
// lines.map(line => LearnerQuery(line.split(";").apply(0), ExampleParser.fromArff(line.split(";").apply(1), exampleSpec)))
// .runForeach(line => System.out.println(line))
val masterControlProgram = RunnableGraph.fromGraph(GraphDSL.create(Sink.foreach(print)) { implicit builder =>
outMatches =>
import GraphDSL.Implicits._
val taggedInput = lines.map(line => LearnerQuery(line.split(";").apply(0), ExampleParser.fromArff(line.split(";").apply(1), exampleSpec)))
taggedInput.statefulMapConcat(() => {
val proc = new HoeffdingTreeProcessor(exampleSpec)
proc.process(_)
}) ~> outMatches
ClosedShape
}).run()
import scala.concurrent.ExecutionContext.Implicits.global
masterControlProgram.onComplete(_ => system.terminate())
Await.ready(system.whenTerminated, Duration.Inf)
}
开发者ID:gosubpl,项目名称:akka-online,代码行数:60,代码来源:HoeffdingTreeWithAlpakka.scala
示例6: ZipInTest
//设置package包名称以及导入依赖的类
package app
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source, ZipWith}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object ZipInTest {
def doZipIn()(implicit actorSystem:ActorSystem) = {
implicit val materializer = ActorMaterializer()
// can be simplified with val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
// note, zipWith only takes 2 inputs, so we have to combine 2 of them
val pickMaxOfThree = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _)) // Note , it's [In1, In2, Out]
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _)) // Note , it's [In1, In2, Out]
zip1.out ~> zip2.in0
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}
val resultSink = Sink.foreach(println)
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b => sink =>
import GraphDSL.Implicits._
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)
Source.single(3) ~> pm3.in(0)
Source.single(4) ~> pm3.in(1)
Source.single(1) ~> pm3.in(2)
pm3.out ~> sink.in
ClosedShape
})
g.run()
}
}
开发者ID:rcongiu,项目名称:akka-streams-playground,代码行数:42,代码来源:ZipInTest.scala
示例7: OnComplete
//设置package包名称以及导入依赖的类
package de.sciss.fscape
package lucre.stream
import akka.stream.{Attributes, ClosedShape}
import de.sciss.fscape.lucre.UGenGraphBuilder.Input
import de.sciss.fscape.stream.impl.{NodeImpl, StageImpl}
import de.sciss.fscape.stream.{Builder, Control}
object OnComplete {
def apply(ref: Input.Action.Value)(implicit b: Builder): Unit = {
val stage0 = new Stage(ref)
b.add(stage0)
}
private final val name = "OnComplete"
private type Shape = ClosedShape
private final class Stage(ref: Input.Action.Value)(implicit ctrl: Control) extends StageImpl[Shape](name) {
val shape = ClosedShape
def createLogic(attr: Attributes) = new Logic(shape, ref)
}
private final class Logic(shape: Shape, ref: Input.Action.Value)(implicit ctrl: Control)
extends NodeImpl(name, shape) {
override def preStart(): Unit = {
super.preStart()
import ctrl.config.executionContext
ctrl.status.onComplete(ref.execute(_))
}
}
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:35,代码来源:OnComplete.scala
示例8: Demo3
//设置package包名称以及导入依赖的类
package lew.bing.akka.stream
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, ClosedShape}
object Demo3 {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val g = RunnableGraph.fromGraph(GraphDSL.create(){implicit builder:GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach(println)
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = (k:Int) => Flow[Int].map(i => {
println(s"??$k : $i")
i + 10
})
in ~> f1(1) ~> bcast ~> f2(2) ~> merge ~> f3(3) ~> out
bcast ~> f4(4) ~> merge
ClosedShape
})
g.run()
Thread.sleep(3000)
System.exit(0)
}
}
开发者ID:liuguobing634,项目名称:akka,代码行数:38,代码来源:Demo3.scala
示例9: urlsStore
//设置package包名称以及导入依赖的类
package io.scalac.newspaper.crawler.fetching
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink}
import io.scalac.newspaper.crawler.failure.FailureHandler
import io.scalac.newspaper.crawler.fetching.FetchingFlow.{URLFetched, URLFetchingResult}
import io.scalac.newspaper.crawler.publishing.Publisher
import io.scalac.newspaper.crawler.urls.URLsStore
trait FetchingProcess extends FetchingFlow with Publisher {
def urlsStore: URLsStore
def failureHandler: ActorRef
def process = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val inputURLs = builder.add(urlsStore.getURLs)
val fetchingFlow = builder.add(fetchURLs)
val responsesSplitter = builder.add(Partition(2, splitResponses))
val publisher = builder.add(publish)
val failureSink = builder.add(Sink.actorRef(failureHandler, FailureHandler.Complete))
val toURLFetched = builder.add(urlFetchingResult2URLFetched)
inputURLs ~> fetchingFlow ~> responsesSplitter
responsesSplitter.out(0) ~> toURLFetched ~> publisher
responsesSplitter.out(1) ~> failureSink
ClosedShape
})
private def splitResponses: URLFetchingResult => Int = {
case _: URLFetched => 0
case _ => 1
}
private def urlFetchingResult2URLFetched: Flow[URLFetchingResult, URLFetched, NotUsed] =
Flow[URLFetchingResult].map {
case uf: URLFetched => uf
}
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:43,代码来源:FetchingProcess.scala
示例10: SimpleGraphOutputExample
//设置package包名称以及导入依赖的类
package code
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import scala.concurrent.Future
object SimpleGraphOutputExample extends AkkaStreamsApp {
val foldSink = Sink.fold[Int,Int](0)(_+_)
val g = RunnableGraph.fromGraph(GraphDSL.create(foldSink) {
implicit builder => sink =>
import GraphDSL.Implicits._
val in = Source(1 to 5)
val f1 = Flow[Int].map(_*2).log("f1")
val f2 = Flow[Int].map(_ * 1).log("f2")
val f3 = Flow[Int].map(_*2).log("f3")
val f4 = Flow[Int].map(_+1).log("f4")
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
in ~> f1 ~> bcast ~> f2 ~> merge ~> f4 ~> sink
bcast ~> f3 ~> merge
ClosedShape
})
override def akkaStreamsExample: Future[_] =
g.run
runExample
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:34,代码来源:SimpleGraphOutputExample.scala
示例11: EventGraphExample
//设置package包名称以及导入依赖的类
package code
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import code.EdgeServices._
import scala.concurrent.Future
object EventGraphExample extends AkkaStreamsApp with EdgeServices {
val g = RunnableGraph.fromGraph(GraphDSL.create(redshiftSink) {
implicit builder => sink =>
import GraphDSL.Implicits._
val eventsSource = s3EventSource
val weather = Flow[Event].mapAsync(4)(e => fetchWeatherInfo(e.date)).log("weather")
val imageInfo = Flow[Event].mapAsync(4)(e => fetchImageInfo(e.imageUrl)).log("image-info")
val bcast = builder.add(Broadcast[Event](3))
val zip = builder.add(ZipWith[Event,WeatherData,ImageInfo,Event]{(e, w, i) =>
e.copy(weather = Some(w), imageInfo = Some(i))
})
eventsSource ~> bcast ~> zip.in0
bcast ~> weather ~> zip.in1
bcast ~> imageInfo ~> zip.in2
zip.out ~> sink
ClosedShape
})
override def akkaStreamsExample: Future[_] =
g.run
runExample
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:36,代码来源:EventGraphExample.scala
示例12: SimpleGraphExample
//设置package包名称以及导入依赖的类
package code
import akka.Done
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import scala.concurrent.Future
object SimpleGraphExample extends AkkaStreamsApp {
val out: Sink[Int, Future[Done]] =
Flow[Int]
.log("receiving")
.toMat(Sink.foreach[Int](e => log.debug("Received: {}", e)))(Keep.right)
val g = RunnableGraph.fromGraph(GraphDSL.create(out) {
implicit builder => sink =>
import GraphDSL.Implicits._
val in = Source(1 to 5)
val f1 = Flow[Int].map(_*2).log("f1")
val f2 = Flow[Int].map(_ * 1).log("f2")
val f3 = Flow[Int].map(_*2).log("f3")
val f4 = Flow[Int].map(_+1).log("f4")
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
in ~> f1 ~> bcast ~> f2 ~> merge ~> f4 ~> sink
bcast ~> f3 ~> merge
ClosedShape
})
override def akkaStreamsExample: Future[_] =
g.run
runExample
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:40,代码来源:SimpleGraphExample.scala
示例13: GraphMaker
//设置package包名称以及导入依赖的类
package gatling.protocol.protoclient
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, Framing, GraphDSL, RunnableGraph, Sink, Source, Tcp}
import akka.stream.{ClosedShape, FlowShape}
import akka.util.ByteString
object GraphMaker {
import GraphDSL.Implicits._
implicit val sys = ActorSystem("mySystem")
def escape(raw: String): String = {
import scala.reflect.runtime.universe._
Literal(Constant(raw)).toString
}
def bsframer: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].via(Framing.delimiter(ByteString("!"), maximumFrameLength = 1000, allowTruncation = true))
def printer: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(x => {
println(escape(x.utf8String));
x
})
def tcpFlow(ip: String, port: Int, router: ActorRef) = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
val source = b.add(Source.actorPublisher[String](MessageSender.props(router)))
val tcp = b.add(Tcp().outgoingConnection(ip, port))
val sink = b.add(Sink.actorSubscriber(MessageReceiver.props(router)))
val mapToByteStr = b.add(Flow[String].map(x => ByteString(x + "!")))
//val framer: FlowShape[ByteString, ByteString] = b.add(bsframer)
//val separator = b.add(Flow[String].map(x => ByteString(x + "?")))
source ~> mapToByteStr ~> tcp ~> sink
ClosedShape
}
)
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:41,代码来源:GraphMaker.scala
注:本文中的akka.stream.ClosedShape类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论