本文整理汇总了Scala中akka.stream.scaladsl.RunnableGraph类的典型用法代码示例。如果您正苦于以下问题:Scala RunnableGraph类的具体用法?Scala RunnableGraph怎么用?Scala RunnableGraph使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RunnableGraph类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
/** Demo application with two independent pipelines in one closed graph:
  * a ticking source that publishes a fixed greeting to a Kafka topic, and a
  * Kafka consumer on the same topic partition whose record values are printed.
  */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("akka_streams_group")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Stages of the producing pipeline: tick -> ProducerRecord -> Kafka.
    val greetings = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val toRecord = Flow[String].map(text => new ProducerRecord[Array[Byte], String](kafkaTopic, text))
    val topicSink = Producer.plainSink(producerSettings)

    // Stages of the consuming pipeline: Kafka -> record value -> stdout.
    val topicSource = Consumer.plainSource(consumerSettings, subscription)
    val valueOf = Flow[ConsumerRecord[Array[Byte], String]].map(_.value())
    val stdout = Sink.foreach(println)

    greetings ~> toRecord ~> topicSink
    topicSource ~> valueOf ~> stdout

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: GraphBuilder
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
/** Builds a closed graph with three materialized sinks and prints their results.
  *
  * An endless source of the value 2 is broadcast to two branches; each branch
  * doubles the element and feeds a `Sink.head`. A separate source of 1 to 10
  * feeds a `Sink.seq`. The three materialized futures are returned as a tuple.
  */
object GraphBuilder extends App {
  implicit val system = ActorSystem("actor-system")
  implicit val materializer = ActorMaterializer()

  val topHeadSink = Sink.head[Int]
  val middleSink = Sink.head[Int]
  val bottomHeadSink = Sink.seq[Int] // despite the name, this collects every element
  val sharedDoubler = Flow[Int].map(_ * 2)

  val results = RunnableGraph.fromGraph(
    GraphDSL.create(topHeadSink, middleSink, bottomHeadSink)((top, mid, bottom) => (top, mid, bottom)) {
      implicit builder => (topHS, midHS, bottomHS) =>
        import GraphDSL.Implicits._

        val fanOut = builder.add(Broadcast[Int](2))
        val oneToTen = builder.add(Source(1 to 10))

        Source.repeat(2) ~> fanOut.in
        fanOut.out(0) ~> sharedDoubler ~> topHS.in
        fanOut.out(1) ~> sharedDoubler ~> midHS.in
        oneToTen ~> bottomHS.in

        ClosedShape
    }
  ).run()

  // Each future is awaited separately with the same 300 ms budget.
  val r1 = Await.result(results._1, 300.millis)
  val r2 = Await.result(results._2, 300.millis)
  val r3 = Await.result(results._3, 300.millis)

  println((r1, r2, r3))
  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:41,代码来源:GraphBuilder.scala
示例3: Alpha
//设置package包名称以及导入依赖的类
package alpha
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.RunnableGraph
import alpha.models.pipelines.NoOpPipeline
/** Entry point and helpers for starting trading pipelines. */
object Alpha {

  /** Runs `tradingPipeline` and hands its materialized value to `doWithMaterialized`.
    *
    * @param tradingPipeline    the closed graph to run
    * @param doWithMaterialized callback applied to the materialized value
    * @param materializer       materializer used to run the graph
    * @return whatever `doWithMaterialized` returns
    */
  def initializeTradingStrategyWithOp[Mat, A](tradingPipeline: RunnableGraph[Mat], doWithMaterialized: Mat => A)(implicit materializer: Materializer): A =
    doWithMaterialized(tradingPipeline.run())

  /** Runs `tradingPipeline` and discards its materialized value.
    *
    * The previous body passed `_: Mat => Unit`, which is placeholder syntax: it
    * built a function value that was then discarded, so the pipeline was never
    * run. An explicit no-op callback is passed instead.
    *
    * @param tradingPipeline the closed graph to run
    * @param materializer    materializer used to run the graph
    */
  def initializeTradingStrategy[Mat](tradingPipeline: RunnableGraph[Mat])(implicit materializer: Materializer): Unit =
    initializeTradingStrategyWithOp[Mat, Unit](tradingPipeline, _ => ())

  /** Starts an actor system and runs the no-op pipeline. */
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("alphaActorSystem")
    implicit val materializer = ActorMaterializer()
    println("Started pipeline: ")
    NoOpPipeline.pipeline.run()
    // Printed right after the graph is started; run() does not wait for completion.
    println("Ended")
  }
}
开发者ID:kszlim,项目名称:scala-alpha,代码行数:25,代码来源:Alpha.scala
示例4: constructTradingStrategy
//设置package包名称以及导入依赖的类
package alpha.pipeline
import akka.NotUsed
import akka.stream.scaladsl.RunnableGraph
import alpha.models.filters.NoOpFilter
/** A component exposing a runnable pipeline, plus a helper that assembles one
  * from its individual stages.
  */
trait Pipeline[Mat] {
  val pipeline: RunnableGraph[Mat]

  /** Chains the given stages into a closed graph, in this order:
    * data source, filter, strategy, risk management, execution sink.
    *
    * @param handler   provides the source via `sourceFromData()`
    * @param strategy  provides the flow via `handleMarketEvent()`
    * @param risk      provides the flow `riskManagement`
    * @param fulfiller provides the sink via `createExecutionSink()`
    * @param filter    provides the flow via `withFilters(Nil)`; defaults to `NoOpFilter()`
    * @return the assembled graph, not yet run
    */
  def constructTradingStrategy(handler: DataHandler, strategy: Strategy, risk: Risk, fulfiller: Fulfiller, filter: DataFilter = NoOpFilter()): RunnableGraph[NotUsed] = {
    val marketData = handler.sourceFromData()
    val filtered = marketData.via(filter.withFilters(Nil))
    val decided = filtered.via(strategy.handleMarketEvent())
    val riskChecked = decided.via(risk.riskManagement)
    riskChecked.to(fulfiller.createExecutionSink())
  }
}
开发者ID:kszlim,项目名称:scala-alpha,代码行数:17,代码来源:Pipeline.scala
示例5: NotificationsApp
//设置package包名称以及导入依赖的类
package it.wknd.reactive.backend
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import com.softwaremill.macwire.wire
import com.typesafe.config.ConfigFactory
import it.wknd.reactive.backend.flow.EventGraph
import it.wknd.reactive.backend.model.{HealthNotification, HeartRate, Step}
import it.wknd.reactive.backend.source.{HrActorSource, SourceProvider, StepActorSource}
import scala.concurrent.Future
/** Application wiring: starts two publisher actors, runs the event graph that
  * consumes from them, and binds the HTTP routes on localhost:2525.
  */
object NotificationsApp extends App {
  implicit val config = ConfigFactory.load()
  implicit val actorSystem = ActorSystem("hr-backend")
  implicit val ec = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()

  lazy val sourceProvider = wire[SourceProvider]

  // Each actor is wrapped as a Reactive Streams publisher for the graph below.
  val hrActor = actorSystem.actorOf(Props[HrActorSource])
  val hrPub = ActorPublisher[HeartRate](hrActor)
  val stepActor = actorSystem.actorOf(Props[StepActorSource])
  val stepPub = ActorPublisher[Step](stepActor)

  RunnableGraph
    .fromGraph(
      EventGraph(
        stepSource = Source.fromPublisher(stepPub),
        hrSource = Source.fromPublisher(hrPub),
        sink = Sink.actorSubscriber[HealthNotification](Props[NotifierActor])
      )
    )
    .run()

  // The same actor refs are handed to the HTTP routes.
  val bindingFuture: Future[ServerBinding] =
    Http().bindAndHandle(
      sourceProvider.routes(hrActor = hrActor, stepActor = stepActor),
      "localhost",
      2525
    )
}
开发者ID:VlasShatokhin,项目名称:it-wknd-streams,代码行数:42,代码来源:NotificationsApp.scala
示例6: WorkingWithGraphsApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.duration._
import scala.util.Random
/** Demo of a multi-junction graph: two ticking notification sources are
  * batched, counted per platform, merged, balanced across two flattening
  * branches, merged again and printed.
  */
object WorkingWithGraphsApplication extends App {
  implicit val actorSystem = ActorSystem("WorkingWithGraphs")
  implicit val actorMaterializer = ActorMaterializer()

  /** Platform-specific message that can be converted to a [[GenericMsg]]. */
  trait MobileMsg {
    // A def: a fresh random id is drawn on every access.
    def id = Random.nextInt(1000)
    def toGenMsg(origin: String) = GenericMsg(id, origin)
  }

  class AndroidMsg extends MobileMsg
  class IosMsg extends MobileMsg
  case class GenericMsg(id: Int, origin: String)

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Sources
    val androidTicks = Source.tick(2.seconds, 500.millis, new AndroidMsg)
    val iosTicks = Source.tick(700.millis, 600.millis, new IosMsg)

    // Flows: convert to GenericMsg and batch (5 elements or 5 seconds).
    val androidBatches = Flow[AndroidMsg].map(_.toGenMsg("ANDROID")).groupedWithin(5, 5.seconds).async
    val iosBatches = Flow[IosMsg].map(_.toGenMsg("IOS")).groupedWithin(5, 5.seconds).async
    // Kept as defs so that every use gets its own flow instance.
    // StatefulCounterFlow is defined elsewhere; the sinks below expect it to emit Int.
    def countBatches = Flow[Seq[GenericMsg]].via(new StatefulCounterFlow())
    def flattenBatch = Flow[Seq[GenericMsg]].mapConcat(_.toList)

    // Junctions
    val androidFanOut = builder.add(Broadcast[Seq[GenericMsg]](2))
    val iosFanOut = builder.add(Broadcast[Seq[GenericMsg]](2))
    val balancer = builder.add(Balance[Seq[GenericMsg]](2))
    val batchMerge = builder.add(Merge[Seq[GenericMsg]](2))
    val messageMerge = builder.add(Merge[GenericMsg](2))

    def countPrinter(label: String) = Sink.foreach[Int](count => println(s"$label: [$count]"))

    // Graph
    androidTicks ~> androidBatches ~> androidFanOut ~> countBatches ~> countPrinter("Android")
    androidFanOut ~> batchMerge
    iosFanOut ~> batchMerge
    iosTicks ~> iosBatches ~> iosFanOut ~> countBatches ~> countPrinter("Ios")
    batchMerge ~> balancer ~> flattenBatch.async ~> messageMerge
    balancer ~> flattenBatch.async ~> messageMerge
    messageMerge ~> Sink.foreach(println)

    ClosedShape
  })

  graph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:61,代码来源:WorkingWithGraphsApplication.scala
示例7: Sum1
//设置package包名称以及导入依赖的类
package com.stulsoft.akka.stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/** Demonstrates several equivalent ways of summing a source of integers, and
  * that stream blueprints are immutable (transformations return new values).
  */
object Sum1 extends App with LazyLogging {
  logger.info("Start")

  implicit val system = ActorSystem("QuickStart")
  implicit val materializer = ActorMaterializer()

  /** Waits up to 15 seconds for `pending`, logs the value at debug level and returns it. */
  private def awaitAndLog(pending: Future[Int]): Int = {
    val value = Await.result(pending, 15.seconds)
    logger.debug("result is {}", value)
    value
  }

  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)

  // Joining the source with the sink yields a blueprint; nothing runs yet.
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)

  // Option 1: run the blueprint; the fold sink's future is the materialized value.
  var sum: Future[Int] = runnable.run()
  var result = awaitAndLog(sum)

  // Option 2: connect and run in one step.
  sum = source.runWith(sink)
  result = awaitAndLog(sum)

  // Note! Immutable
  val source2 = Source(1 to 10)
  source2.map(_ => 0) // result discarded on purpose: source2 itself is unchanged
  sum = source2.runWith(Sink.fold(0)(_ + _))
  result = awaitAndLog(sum) // 55

  val zeroes = source2.map(_ => 0)
  sum = zeroes.runWith(Sink.fold(0)(_ + _))
  result = awaitAndLog(sum) // 0

  system.terminate()
  logger.info("End")
}
开发者ID:ysden123,项目名称:poc,代码行数:49,代码来源:Sum1.scala
示例8: stop
//设置package包名称以及导入依赖的类
package co.blocke
package latekafka
import akka.actor.Actor
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import org.apache.kafka.common.serialization.Deserializer
/** Bundles the settings and the runnable graph that a consumer of this trait needs,
  * together with a mutable slot for a `LateKafka` instance.
  *
  * NOTE(review): `host`, `groupId`, `topic`, `deserializer` and `properties` look like
  * Kafka connection settings for `LateKafka` - confirm against the implementations.
  */
trait GraphHolder[V] {
val host: String
val groupId: String
val topic: String
val deserializer: Deserializer[V]
val properties: Map[String, String]
// The closed graph to run; it materializes to akka.NotUsed.
val flow: RunnableGraph[akka.NotUsed]
// Starts out as null; nothing in this trait assigns it.
var late: LateKafka[V] = null
// Calls late.stop() only if `late` has been assigned; otherwise does nothing.
def stop() = if (late != null) late.stop()
}
// NOTE: The LateKafka object must be created *inside* its own thread! So all the needed
// parameters are passed in (in GraphHolder) and it is created once the actor has started
// and called run().

/** Actor that runs the holder's graph as soon as it is constructed and stops
  * the holder when the actor stops. It ignores every message it receives.
  *
  * @param graph        holder providing the graph to run and the `stop()` hook
  * @param materializer materializer used to run the graph
  */
class FlowActor[V](graph: GraphHolder[V])(implicit materializer: ActorMaterializer) extends Actor {

  /** Runs the holder's graph. */
  def run(): akka.NotUsed = graph.flow.run()

  // Explicit `: Unit =` replaces procedure syntax, which is deprecated in
  // Scala 2.13 and rejected by Scala 3.
  override def postStop(): Unit = {
    graph.stop()
  }

  def receive: Actor.Receive = Actor.ignoringBehavior

  run() // just go, by default
}
开发者ID:gzoller,项目名称:LateKafka,代码行数:36,代码来源:FlowActor.scala
示例9: KafkaConsumerExample
//设置package包名称以及导入依赖的类
package com.benencahill.akka.streams
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
import scala.util.Success
/** Reads up to 100 records from a Kafka topic, unmarshals each value into a
  * `UserEvent`, prints it, and shuts the actor system down when the stream ends.
  */
object KafkaConsumerExample extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val context = system.dispatcher

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("testing-akka-streams")
    // The property key is AUTO_OFFSET_RESET_CONFIG. AUTO_OFFSET_RESET_DOC is the
    // human-readable description of the setting, so using it as a key meant the
    // "earliest" reset policy was never applied.
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val kafkaSource = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics("testing-akka-streams"))
    .map(record => record.value())

  // One element at a time; the record value is already a String.
  val deserialize = Flow[String].mapAsync(1) { data =>
    import UserEventJsonProtocol._
    Unmarshal(data).to[UserEvent]
  }

  val print = Sink.foreach[UserEvent](event => println(event))

  val blueprint: RunnableGraph[Future[Done]] = kafkaSource.take(100).via(deserialize).toMat(print)(Keep.right)

  val materialzied = blueprint.run()

  // Terminate on failure as well as on success; previously a failed stream left
  // the actor system (and therefore the JVM) running.
  materialzied.onComplete { outcome =>
    outcome.failed.foreach(error => System.err.println(s"Stream failed: $error"))
    system.terminate()
  }
}
开发者ID:benen,项目名称:akka-streams-intro,代码行数:44,代码来源:KafkaConsumerExample.scala
示例10: Demo3
//设置package包名称以及导入依赖的类
package lew.bing.akka.stream
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, ClosedShape}
/** Demo of a broadcast/merge diamond: every element of 1 to 10 passes through
  * numbered stages that print the element and add 10 to it.
  */
object Demo3 {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      // Stage labelled `k`: logs the element it sees, then emits it plus 10.
      def stage(k: Int) = Flow[Int].map { i =>
        println(s"??$k : $i")
        i + 10
      }

      val numbers = Source(1 to 10)
      val printer = Sink.foreach(println)
      val fanOut = builder.add(Broadcast[Int](2))
      val fanIn = builder.add(Merge[Int](2))

      numbers ~> stage(1) ~> fanOut ~> stage(2) ~> fanIn ~> stage(3) ~> printer
      fanOut ~> stage(4) ~> fanIn

      ClosedShape
    })

    g.run()
    // The graph's completion is not observed; the process exits after a fixed pause.
    Thread.sleep(3000)
    System.exit(0)
  }
}
开发者ID:liuguobing634,项目名称:akka,代码行数:38,代码来源:Demo3.scala
示例11: urlsStore
//设置package包名称以及导入依赖的类
package io.scalac.newspaper.crawler.fetching
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink}
import io.scalac.newspaper.crawler.failure.FailureHandler
import io.scalac.newspaper.crawler.fetching.FetchingFlow.{URLFetched, URLFetchingResult}
import io.scalac.newspaper.crawler.publishing.Publisher
import io.scalac.newspaper.crawler.urls.URLsStore
/** Assembles the fetching pipeline: URLs are fetched, then successful results
  * are published while everything else is sent to the failure handler actor.
  */
trait FetchingProcess extends FetchingFlow with Publisher {
  def urlsStore: URLsStore
  def failureHandler: ActorRef

  /** The closed graph: URLs -> fetch -> partition -> (publish | failure sink). */
  def process = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val urls = builder.add(urlsStore.getURLs)
    val fetcher = builder.add(fetchURLs)
    val splitter = builder.add(Partition(2, splitResponses))
    val publishSink = builder.add(publish)
    val failures = builder.add(Sink.actorRef(failureHandler, FailureHandler.Complete))
    val narrow = builder.add(urlFetchingResult2URLFetched)

    urls ~> fetcher ~> splitter
    splitter.out(0) ~> narrow ~> publishSink
    splitter.out(1) ~> failures

    ClosedShape
  })

  // Output port selector: 0 for successfully fetched URLs, 1 for anything else.
  private def splitResponses: URLFetchingResult => Int = { result =>
    result match {
      case _: URLFetched => 0
      case _ => 1
    }
  }

  // Narrows the element type. Only wired behind port 0, so only URLFetched
  // values reach it; any other value would fail with a MatchError.
  private def urlFetchingResult2URLFetched: Flow[URLFetchingResult, URLFetched, NotUsed] =
    Flow[URLFetchingResult].map { result =>
      result match {
        case fetched: URLFetched => fetched
      }
    }
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:43,代码来源:FetchingProcess.scala
示例12: BlueprintExample
//设置package包名称以及导入依赖的类
package code
import java.nio.file.FileSystems
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, RunnableGraph}
import akka.util.ByteString
import scala.concurrent.Future
/** Reads `current_inventory.csv` from the classpath and writes to
  * `no_inventory.csv` every row whose third column parses to 0.
  */
object BlueprintExample extends AkkaStreamsApp {
  val file = this.getClass.getClassLoader.getResource("current_inventory.csv")
  // Resolve through the URI: URL.getPath keeps percent-escapes (e.g. "%20") and
  // yields "/C:/..." on Windows, neither of which is a valid file-system path.
  val inPath = java.nio.file.Paths.get(file.toURI)
  val outPath = FileSystems.getDefault.getPath("no_inventory.csv")

  val fileSource = FileIO.fromPath(inPath)
  val fileSink = FileIO.toPath(outPath)

  /** Drops the first line (the header row) and splits each remaining line on commas. */
  val csvHandler: Flow[String, List[String], NotUsed] =
    Flow[String]
      .drop(1)
      .map(_.split(",").toList)
      .log("csvHandler")

  /** Blueprint of the whole pipeline; materializes to the sink's IO result. */
  val lowInventoryFlow: RunnableGraph[Future[IOResult]] =
    fileSource
      // allowTruncation lets a final line without a trailing "\n" through;
      // without it such a file fails the stream at end of input.
      .via(Framing.delimiter(ByteString("\n"), Integer.MAX_VALUE, allowTruncation = true))
      .map(_.utf8String)
      .log("Before CSV Handler")
      .via(csvHandler)
      // Throws on rows with fewer than three columns or a non-numeric third column.
      .filter(list => list(2).toInt == 0)
      .log("After filter")
      .map { list =>
        ByteString(list.mkString(",")) ++ ByteString("\n")
      }.toMat(fileSink)(Keep.right)

  override def akkaStreamsExample: Future[_] =
    lowInventoryFlow.run()

  runExample
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:42,代码来源:BlueprintExample.scala
示例13: GraphMaker
//设置package包名称以及导入依赖的类
package gatling.protocol.protoclient
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, Framing, GraphDSL, RunnableGraph, Sink, Source, Tcp}
import akka.stream.{ClosedShape, FlowShape}
import akka.util.ByteString
/** Factory for the client-side graph that sends strings over a TCP connection
  * and hands the received bytes to a subscriber actor.
  */
object GraphMaker {

  import GraphDSL.Implicits._

  implicit val sys = ActorSystem("mySystem")

  /** Renders `raw` as a Scala string literal (quoted, with escapes made visible). */
  def escape(raw: String): String = {
    import scala.reflect.runtime.universe._
    Literal(Constant(raw)).toString
  }

  /** Splits a byte stream into frames on "!" (max 1000 bytes, truncation allowed). */
  def bsframer: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].via(
      Framing.delimiter(ByteString("!"), maximumFrameLength = 1000, allowTruncation = true)
    )

  /** Pass-through flow that prints each chunk in escaped form. */
  def printer: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map { chunk =>
      println(escape(chunk.utf8String))
      chunk
    }

  /** Builds the graph: actor publisher -> append "!" -> TCP connection -> actor subscriber.
    *
    * @param ip     remote host
    * @param port   remote port
    * @param router actor passed to both the sender and the receiver props
    */
  def tcpFlow(ip: String, port: Int, router: ActorRef) =
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      val outbound = b.add(Source.actorPublisher[String](MessageSender.props(router)))
      val connection = b.add(Tcp().outgoingConnection(ip, port))
      val inbound = b.add(Sink.actorSubscriber(MessageReceiver.props(router)))
      val encode = b.add(Flow[String].map(text => ByteString(text + "!")))
      // Not wired in (left disabled):
      //val framer: FlowShape[ByteString, ByteString] = b.add(bsframer)
      //val separator = b.add(Flow[String].map(x => ByteString(x + "?")))

      outbound ~> encode ~> connection ~> inbound
      ClosedShape
    })
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:41,代码来源:GraphMaker.scala
示例14: createItemTypeGraph
//设置package包名称以及导入依赖的类
package com.github.j5ik2o.spetstore.adaptor.http
import java.util.UUID
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Keep, RunnableGraph, Sink, Source }
import com.github.j5ik2o.spetstore.domain.item.CategoryId
import com.github.j5ik2o.spetstore.usecase.{ CreateItemTypeApp, CustomerCreatedApp, ItemTypeCreatedApp, ItemTypeUseCase }
import scala.concurrent.Future
/** HTTP-adaptor support for creating item types: converts the JSON request
  * into the use-case command and the use-case result back into JSON.
  */
trait ItemTypeSupport {
  val itemTypeUseCase: ItemTypeUseCase

  /** JSON request -> use-case command; the category id string is parsed as a UUID. */
  protected val convertToCreateItemType: Flow[CreateItemTypeJson, CreateItemTypeApp, NotUsed] =
    Flow[CreateItemTypeJson].map { request =>
      val categoryId = CategoryId(UUID.fromString(request.categoryId))
      CreateItemTypeApp(categoryId, request.name, request.description)
    }

  /** Use-case result -> JSON response carrying the entity id as a string. */
  protected val convertToItemTypeCreated: Flow[ItemTypeCreatedApp, ItemTypeCreatedJson, NotUsed] =
    Flow[ItemTypeCreatedApp].map(created => ItemTypeCreatedJson(created.entityId.value.toString))

  /** Blueprint that pushes a single request through the use case and
    * materializes to the first response.
    */
  def createItemTypeGraph(json: CreateItemTypeJson): RunnableGraph[Future[ItemTypeCreatedJson]] = {
    val responses = Source.single(json)
      .via(convertToCreateItemType)
      .via(itemTypeUseCase.createItemType)
      .via(convertToItemTypeCreated)
    responses.toMat(Sink.head[ItemTypeCreatedJson])(Keep.right)
  }
}
开发者ID:j5ik2o,项目名称:spetstore-cqrs-es-akka,代码行数:37,代码来源:ItemTypeSupport.scala
示例15: createCustomerGraph
//设置package包名称以及导入依赖的类
package com.github.j5ik2o.spetstore.adaptor.http
import java.util.UUID
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Keep, RunnableGraph, Sink, Source }
import com.github.j5ik2o.spetstore.domain.basic.Pref
import com.github.j5ik2o.spetstore.domain.item.CategoryId
import com.github.j5ik2o.spetstore.usecase.{ CreateCustomerApp, CustomerCreatedApp, CustomerUseCase }
import scala.concurrent.Future
/** HTTP-adaptor support for creating customers: converts the JSON request
  * into the use-case command and the use-case result back into JSON.
  */
trait CustomerSupport {
  val customerUseCase: CustomerUseCase

  /** JSON request -> use-case command. The prefecture is wrapped in `Pref` and
    * the optional favourite category id is parsed as a UUID.
    */
  protected val convertToCreateCustomer: Flow[CreateCustomerJson, CreateCustomerApp, NotUsed] =
    Flow[CreateCustomerJson].map { json =>
      CreateCustomerApp(
        json.name,
        json.sexType,
        json.zipCode,
        Pref(json.pref),
        json.cityName,
        json.addressName,
        json.buildingName,
        json.email,
        json.phone,
        json.loginName,
        json.password,
        json.favoriteCategoryId.map(id => CategoryId(UUID.fromString(id)))
      )
    }

  /** Use-case result -> JSON response carrying the entity id as a string. */
  protected val convertToCustomerCreated: Flow[CustomerCreatedApp, CustomerCreatedJson, NotUsed] =
    Flow[CustomerCreatedApp].map(created => CustomerCreatedJson(created.entityId.value.toString))

  /** Blueprint that pushes a single request through the use case and
    * materializes to the first response.
    */
  def createCustomerGraph(createCustomerJson: CreateCustomerJson): RunnableGraph[Future[CustomerCreatedJson]] = {
    val responses = Source.single(createCustomerJson)
      .via(convertToCreateCustomer)
      .via(customerUseCase.createCustomer)
      .via(convertToCustomerCreated)
    responses.toMat(Sink.head[CustomerCreatedJson])(Keep.right)
  }
}
开发者ID:j5ik2o,项目名称:spetstore-cqrs-es-akka,代码行数:47,代码来源:CustomerSupport.scala
注:本文中的akka.stream.scaladsl.RunnableGraph类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论