本文整理汇总了Scala中akka.stream.Graph类的典型用法代码示例。如果您正苦于以下问题：Scala Graph类的具体用法？Scala Graph怎么用？Scala Graph使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Graph类的6个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: IdentityValidation
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation
import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.ZipWhileEitherAvailable
/**
  * Validation of the "Identity Query": records taken from the input topic are
  * paired with records taken from the output topic.
  *
  * @param inTopicHandler  handler providing the source for the input topic
  * @param outTopicHandler handler providing the source for the output topic
  * @param materializer    materializer forwarded to the base [[Validation]]
  */
class IdentityValidation(inTopicHandler: TopicHandler,
                         outTopicHandler: TopicHandler,
                         materializer: ActorMaterializer)
  extends Validation[SimpleRecord](inTopicHandler, outTopicHandler, materializer) {

  override val valueName = "SimpleRecords"
  override val queryName = "Identity Query"

  /**
    * Builds a source-shaped graph emitting pairs of optional records.
    *
    * Both topic sources are limited via `take`, converted via `toSimpleRecords`
    * and fed into the two inlets of a `ZipWhileEitherAvailable` stage; the
    * stage's single outlet becomes the outlet of the resulting source.
    * NOTE(review): presumably a side of the pair is `None` once its topic is
    * exhausted -- confirm against `ZipWhileEitherAvailable`.
    */
  def createSource(): Graph[SourceShape[(Option[SimpleRecord], Option[SimpleRecord])], NotUsed] =
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val pairing = b.add(ZipWhileEitherAvailable[SimpleRecord]())

      // Input topic feeds the first inlet, output topic the second.
      inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> pairing.in0
      outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toSimpleRecords ~> pairing.in1

      SourceShape(pairing.out)
    }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:33,代码来源:IdentityValidation.scala
示例2: Sink
//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream
import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
/**
  * Drop-in facade over Akka's `Sink` companion (imported here as `AkkaSink`).
  *
  * Adds a few collection-building sinks backed by `CollectionStage` and
  * otherwise forwards every factory method unchanged to Akka.
  */
object Sink {

  /** Sink that collects all elements into an immutable `Set`. */
  def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
  }

  /** Sink that collects all elements into an immutable `SortedSet` using `ordering`. */
  def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
  }

  /** Sink that collects key/value pairs into an immutable `Map`. */
  def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
    AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
  }

  /** Sink that collects all elements into a `List`. */
  def list[T]: AkkaSink[T, Future[List[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
  }

  // Akka's API -- every method below delegates directly to the Akka method of the same name.

  def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)

  def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)

  def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled

  def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head

  def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption

  def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]

  def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]

  def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]

  def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)

  def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore

  def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)

  // The function arrows below were mis-encoded as `?` in the extracted text
  // (originally the Unicode arrow); they are restored as `=>` so the object compiles.
  def combine[T, U](
    first: AkkaSink[U, _],
    second: AkkaSink[U, _],
    rest: AkkaSink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
    AkkaSink.combine[T, U](first, second, rest: _*)(strategy)

  def foreachParallel[T](parallelism: Int)(f: T => Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
    AkkaSink.foreachParallel[T](parallelism)(f)

  def fold[U, T](zero: U)(f: (U, T) => U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)

  def reduce[T](f: (T, T) => T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)

  def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)

  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
    AkkaSink.actorRef(ref, onCompleteMessage)

  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
    onFailureMessage: (Throwable) => Any = Status.Failure): AkkaSink[T, NotUsed] =
    AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)

  def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)

  def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala
示例3: Builder
//设置package包名称以及导入依赖的类
package de.sciss.fscape.stream
import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{Graph, Inlet, Outlet, Shape}
/** Factory for [[Builder]] instances backed by an Akka `GraphDSL.Builder`. */
object Builder {

  /**
    * Creates a builder from the implicitly available DSL builder and control.
    *
    * @param dsl  the Akka graph builder all operations are forwarded to
    * @param ctrl the control object exposed through `Builder.control`
    */
  def apply()(implicit dsl: GraphDSL.Builder[NotUsed], ctrl: Control): Builder =
    new Impl(ctrl)(dsl)

  /** Default implementation: a thin wrapper delegating to the Akka builder. */
  private final class Impl(val control: Control)(implicit underlying: GraphDSL.Builder[NotUsed])
    extends Builder {

    // Brings `~>` and the port-to-flow-ops conversions into scope for `connect` and `map`.
    import GraphDSL.Implicits._

    def dsl: GraphDSL.Builder[NotUsed] = underlying

    def add[S <: Shape](graph: Graph[S, _]): S = underlying.add(graph)

    def connect[A](out: Outlet[A], in: Inlet[A]): Unit =
      out ~> in

    def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B] = {
      val mapped = out.map(fun)
      mapped.outlet
    }
  }
}
/** Abstraction over an Akka `GraphDSL.Builder` paired with a [[Control]]. */
trait Builder {

  /** The control object associated with this builder. */
  def control: Control

  /** The underlying Akka graph DSL builder. */
  def dsl: GraphDSL.Builder[NotUsed]

  /** Adds `graph` to the graph under construction and returns its shape. */
  def add[S <: Shape](graph: Graph[S, _]): S

  /** Wires the outlet `out` to the inlet `in`. */
  def connect[A](out: Outlet[A], in: Inlet[A]): Unit

  /** Attaches a mapping step applying `fun` to `out` and returns the resulting outlet. */
  def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B]
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:37,代码来源:Builder.scala
示例4: AccountWebServerTest
//设置package包名称以及导入依赖的类
package com.knoldus.account
import akka.http.scaladsl.model.headers.{CustomHeader, Upgrade, UpgradeProtocol}
import akka.http.scaladsl.model.ws.{Message, UpgradeToWebSocket}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.stream.{FlowShape, Graph}
import org.scalatest.{Matchers, WordSpec}
/**
  * Route test for the account web server: checks that a WebSocket upgrade
  * request to `/register` is answered with `101 Switching Protocols`.
  */
class AccountWebServerTest extends WordSpec with Matchers with ScalatestRouteTest with WebServer {

  "User" should {
    "be able to create account" in {
      Get("/register?name=john") ~>
        Upgrade(List(UpgradeProtocol("websocket"))) ~>
        emulateHttpCore ~>
        route ~>
        check {
          status shouldEqual StatusCodes.SwitchingProtocols
        }
    }
  }

  /**
    * Stub `UpgradeToWebSocket` header whose `handleMessages` ignores the
    * handler flow and always answers with `SwitchingProtocols`.
    */
  private def upgradeToWebsocketHeaderMock: UpgradeToWebSocket =
    new CustomHeader() with UpgradeToWebSocket {
      override def name = "dummy"
      override def value = "dummy"
      override def renderInRequests = true
      override def renderInResponses = true
      override def requestedProtocols = Nil

      override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String]): HttpResponse =
        HttpResponse(StatusCodes.SwitchingProtocols)
    }

  /**
    * Appends the mock upgrade header to requests that carry an `Upgrade`
    * header advertising WebSocket; any other request is returned untouched.
    */
  private def emulateHttpCore(req: HttpRequest): HttpRequest = {
    val wantsWebSocket = req.header[Upgrade].exists(_.hasWebSocket)
    if (wantsWebSocket) req.copy(headers = req.headers :+ upgradeToWebsocketHeaderMock)
    else req
  }
}
开发者ID:knoldus,项目名称:akka-http-websocket-microservices.g8,代码行数:45,代码来源:AccountWebServerTest.scala
示例5: HierarchicalProcessingFlows
//设置package包名称以及导入依赖的类
package com.mooneyserver.akkastreams.hierarchical.flows
import akka.NotUsed
import akka.stream.scaladsl.{Flow, GraphDSL, UnzipWith, ZipWith}
import akka.stream.{FanInShape3, FanOutShape3, FlowShape, Graph}
import com.mooneyserver.akkastreams.hierarchical.model.{Child, Grandparent, Parent}
import GraphDSL.Implicits._
/**
  * Flow that splits a `Grandparent` into its three hierarchy levels, processes
  * each level on its own branch and zips the results back into one `Grandparent`.
  */
object HierarchicalProcessingFlows {

  /**
    * Flow-shaped graph: `UnzipWith(breakdownFamily)` fans out into three
    * branches (grandparent, parents, children), each branch applies its
    * `process*` function, and `ZipWith(recombineGrandParent)` fans them back in.
    */
  val handleGrandParent: Graph[FlowShape[Grandparent, Grandparent], NotUsed] =
    GraphDSL.create() { implicit builder =>
      val input: FanOutShape3[Grandparent, Grandparent, Seq[Parent], Seq[Child]] =
        builder.add(UnzipWith(breakdownFamily))
      val grandparentProc: FlowShape[Grandparent, Grandparent] =
        builder.add(Flow[Grandparent].map(processGrandParent))
      val parentProc: FlowShape[Seq[Parent], Seq[Parent]] =
        builder.add(Flow[Seq[Parent]].map(processParents))
      val childProc: FlowShape[Seq[Child], Seq[Child]] =
        builder.add(Flow[Seq[Child]].map(processChildren))
      val rebuildGrandparent: FanInShape3[Grandparent, Seq[Parent], Seq[Child], Grandparent] =
        builder.add(ZipWith[Grandparent, Seq[Parent], Seq[Child], Grandparent](recombineGrandParent))

      input.out0 ~> grandparentProc ~> rebuildGrandparent.in0
      input.out1 ~> parentProc ~> rebuildGrandparent.in1
      input.out2 ~> childProc ~> rebuildGrandparent.in2

      FlowShape(input.in, rebuildGrandparent.out)
    }

  /** Splits a grandparent into itself, its parents, and all children of those parents. */
  def breakdownFamily(grandparent: Grandparent): Tuple3[Grandparent, Seq[Parent], Seq[Child]] =
    (grandparent, grandparent.parents, grandparent.parents.flatMap(_.children))

  /** Returns a copy of `gp` with "-Modified" appended to its name. */
  def processGrandParent(gp: Grandparent): Grandparent =
    gp.copy(name = s"${gp.name}-Modified")

  /** Returns copies of all parents with "-Modified" appended to each name. */
  def processParents(p: Seq[Parent]): Seq[Parent] =
    for {parent <- p} yield parent.copy(name = s"${parent.name}-Modified")

  /** Returns copies of all children with "-Modified" appended to each name. */
  def processChildren(c: Seq[Child]): Seq[Child] =
    for {child <- c} yield child.copy(name = s"${child.name}-Modified")

  /**
    * Reassembles the hierarchy: each parent receives the children whose name
    * starts with the part of the parent's name before the first "-Modified".
    *
    * @param gp the processed grandparent
    * @param p  the processed parents
    * @param c  the processed children of all parents
    * @return `gp` with its parents replaced by the re-linked parents
    */
  def recombineGrandParent(gp: Grandparent, p: Seq[Parent], c: Seq[Child]): Grandparent = {
    val parents = p.map { parent =>
      // `split` yields an empty array when the name consists solely of the
      // marker (e.g. "-Modified"), so indexing with (0) would throw; fall back
      // to an empty prefix. Computed once per parent rather than once per child.
      val prefix = parent.name.split("-Modified").headOption.getOrElse("")
      val chillins = c.filter(_.name.startsWith(prefix))
      parent.copy(children = chillins)
    }
    gp.copy(parents = parents)
  }
}
开发者ID:irishshagua,项目名称:akka-stream-playground,代码行数:47,代码来源:HierarchicalProcessingFlows.scala
示例6: AlsoToErrorHandling
//设置package包名称以及导入依赖的类
package sandbox
import com.typesafe.config.ConfigFactory
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.stream.scaladsl._
import akka.stream.Graph
import akka.stream.SinkShape
import akka.stream.FlowShape
import scala.annotation.unchecked.uncheckedVariance
/**
  * Sandbox showing what happens when the side sink attached via `alsoTo`
  * throws: the "Kafka" sink raises an exception for element 10 while the main
  * stream continues towards the "HTTP response" sink.
  */
object AlsoToErrorHandling extends App {
  val config = ConfigFactory.empty
  implicit val system = ActorSystem("AlsoToErrorHandling",config)
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  try {
    // Emits 0..15 and logs each element as it passes.
    val source =
      Source(0 to 15).via(Flow[Int].alsoTo(Sink.foreach(n => println(s"HttpRequest: $n"))))

    // Side sink that throws on element 10.
    val kafkaSink =
      Flow[Int]
        .map(n => if(n == 10) throw new Exception() else n)
        .to(Sink.foreach(n => println(s"Published to Kafka: $n")))

    // val flow = Flow[Int].via(safeAlsoToGraph(kafkaSink))
    val flow = Flow[Int].alsoTo(kafkaSink)

    val httpSink = Sink.foreach((n:Int) => println(s"HttpResponse: $n"))

    val res = waitFor(source.via(flow).toMat(httpSink)(Keep.right).run())

    println(res)
  }
  finally {
    // Always shut the actor system down, even if the stream failed.
    waitFor(system.terminate())
  }

  /**
    * Variant of `alsoTo` built on a two-way `Broadcast` with `eagerCancel`
    * enabled: output 1 feeds `that`, output 0 is the flow's outlet.
    *
    * The lambda arrows were mis-encoded as `?` in the extracted text and are
    * restored as `=>` here.
    *
    * @param that the side sink receiving a copy of every element
    * @return a flow-shaped graph materializing to the value of `that`
    */
  def safeAlsoToGraph[Out,M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
    GraphDSL.create(that) { implicit b => r =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
      bcast.out(1) ~> r
      FlowShape(bcast.in, bcast.out(0))
    }

  /** Blocks until `f` completes or `d` elapses (default 10 seconds) and returns its result. */
  def waitFor[T](f: Future[T], d: Duration = 10.seconds): T = Await.result(f,d)
}
开发者ID:jthompson-hiya,项目名称:akka-streams-sandbox,代码行数:57,代码来源:AlsoToErrorHandling.scala
注：本文中的akka.stream.Graph类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论