• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Graph类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.Graph的典型用法代码示例。如果您正苦于以下问题:Scala Graph类的具体用法?Scala Graph怎么用?Scala Graph使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Graph类的6个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: IdentityValidation

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.ZipWhileEitherAvailable

class IdentityValidation(inTopicHandler: TopicHandler,
                         outTopicHandler: TopicHandler,
                         materializer: ActorMaterializer)
  extends Validation[SimpleRecord](inTopicHandler, outTopicHandler, materializer) {

  override val valueName = "SimpleRecords"
  override val queryName = "Identity Query"

  def createSource(): Graph[SourceShape[(Option[SimpleRecord], Option[SimpleRecord])], NotUsed] = {

    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val zip = builder.add(ZipWhileEitherAvailable[SimpleRecord]())

      inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> zip.in0
      outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toSimpleRecords ~> zip.in1

      SourceShape(zip.out)
    }
  }
} 
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:33,代码来源:IdentityValidation.scala


示例2: Sink

//设置package包名称以及导入依赖的类
package mesosphere.marathon.stream

import akka.actor.{ ActorRef, Props, Status }
import akka.{ Done, NotUsed }
import akka.stream.{ Graph, SinkShape, UniformFanOutShape }
import akka.stream.scaladsl.{ SinkQueueWithCancel, Sink => AkkaSink }
import org.reactivestreams.{ Publisher, Subscriber }

import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try


object Sink {
  def set[T]: AkkaSink[T, Future[immutable.Set[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.Set[T]](immutable.Set.newBuilder[T]))
  }

  def sortedSet[T](implicit ordering: Ordering[T]): AkkaSink[T, Future[immutable.SortedSet[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, immutable.SortedSet[T]](immutable.SortedSet.newBuilder[T]))
  }

  def map[K, V]: AkkaSink[(K, V), Future[immutable.Map[K, V]]] = {
    AkkaSink.fromGraph(new CollectionStage[(K, V), immutable.Map[K, V]](immutable.Map.newBuilder[K, V]))
  }

  def list[T]: AkkaSink[T, Future[List[T]]] = {
    AkkaSink.fromGraph(new CollectionStage[T, List[T]](List.newBuilder[T]))
  }

  // Akka's API
  def fromGraph[T, M](g: Graph[SinkShape[T], M]): AkkaSink[T, M] = AkkaSink.fromGraph(g)
  def fromSubscriber[T](subscriber: Subscriber[T]): AkkaSink[T, NotUsed] = AkkaSink.fromSubscriber(subscriber)
  def cancelled[T]: AkkaSink[T, NotUsed] = AkkaSink.cancelled
  def head[T]: AkkaSink[T, Future[T]] = AkkaSink.head
  def headOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.headOption
  def last[T]: AkkaSink[T, Future[T]] = AkkaSink.last[T]
  def lastOption[T]: AkkaSink[T, Future[Option[T]]] = AkkaSink.lastOption[T]
  def seq[T]: AkkaSink[T, Future[Seq[T]]] = AkkaSink.seq[T]
  def asPublisher[T](fanout: Boolean): AkkaSink[T, Publisher[T]] = AkkaSink.asPublisher[T](fanout)
  def ignore: AkkaSink[Any, Future[Done]] = AkkaSink.ignore
  def foreach[T](f: T => Unit): AkkaSink[T, Future[Done]] = AkkaSink.foreach[T](f)
  def combine[T, U](
    first: AkkaSink[U, _],
    second: AkkaSink[U, _],
    rest: AkkaSink[U, _]*)(strategy: Int ? Graph[UniformFanOutShape[T, U], NotUsed]): AkkaSink[T, NotUsed] =
    AkkaSink.combine[T, U](first, second, rest: _*)(strategy)
  def foreachParallel[T](parallelism: Int)(f: T ? Unit)(implicit ec: ExecutionContext): AkkaSink[T, Future[Done]] =
    AkkaSink.foreachParallel[T](parallelism)(f)
  def fold[U, T](zero: U)(f: (U, T) ? U): AkkaSink[T, Future[U]] = AkkaSink.fold[U, T](zero)(f)
  def reduce[T](f: (T, T) ? T): AkkaSink[T, Future[T]] = AkkaSink.reduce(f)
  def onComplete[T](callback: Try[Done] => Unit): AkkaSink[T, NotUsed] = AkkaSink.onComplete(callback)
  def actorRef[T](ref: ActorRef, onCompleteMessage: Any): AkkaSink[T, NotUsed] =
    AkkaSink.actorRef(ref, onCompleteMessage)
  def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
    onFailureMessage: (Throwable) ? Any = Status.Failure): AkkaSink[T, NotUsed] =
    AkkaSink.actorRefWithAck(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage)
  def actorSubscriber[T](props: Props): AkkaSink[T, ActorRef] = AkkaSink.actorSubscriber(props)
  def queue[T](): AkkaSink[T, SinkQueueWithCancel[T]] = AkkaSink.queue[T]()
} 
开发者ID:xiaozai512,项目名称:marathon,代码行数:62,代码来源:Sink.scala


示例3: Builder

//设置package包名称以及导入依赖的类
package de.sciss.fscape.stream

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{Graph, Inlet, Outlet, Shape}

object Builder {
  def apply()(implicit dsl: GraphDSL.Builder[NotUsed], ctrl: Control): Builder = new Impl(ctrl)

  private final class Impl(val control: Control)(implicit b: GraphDSL.Builder[NotUsed]) extends Builder {
    def add[S <: Shape](graph: Graph[S, _]): S = b.add(graph)

    def dsl: GraphDSL.Builder[NotUsed] = b

    def connect[A](out: Outlet[A], in: Inlet[A]): Unit = {
      import GraphDSL.Implicits._
      out ~> in
    }

    def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B] = {
      import GraphDSL.Implicits._
      out.map(fun).outlet
    }
  }
}
trait Builder {
  def control: Control

  def dsl: GraphDSL.Builder[NotUsed]

  def add[S <: Shape](graph: Graph[S, _]): S

  def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B]

  def connect[A](out: Outlet[A], in: Inlet[A]): Unit
} 
开发者ID:Sciss,项目名称:FScape-next,代码行数:37,代码来源:Builder.scala


示例4: AccountWebServerTest

//设置package包名称以及导入依赖的类
package com.knoldus.account

import akka.http.scaladsl.model.headers.{CustomHeader, Upgrade, UpgradeProtocol}
import akka.http.scaladsl.model.ws.{Message, UpgradeToWebSocket}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.stream.{FlowShape, Graph}
import org.scalatest.{Matchers, WordSpec}


class AccountWebServerTest extends WordSpec with Matchers with ScalatestRouteTest with WebServer {

  "User" should {
    "be able to create account" in {
      Get("/register?name=john") ~>
        Upgrade(List(UpgradeProtocol("websocket"))) ~> emulateHttpCore ~>
        route ~> check {
        status shouldEqual StatusCodes.SwitchingProtocols
      }
    }
  }

  private def emulateHttpCore(req: HttpRequest): HttpRequest =
    req.header[Upgrade] match {
      case Some(upgrade) if upgrade.hasWebSocket => req.copy(headers = req.headers :+ upgradeToWebsocketHeaderMock)
      case _ => req
    }

  private def upgradeToWebsocketHeaderMock: UpgradeToWebSocket =
    new CustomHeader() with UpgradeToWebSocket {
      override def requestedProtocols = Nil

      override def name = "dummy"

      override def value = "dummy"

      override def renderInRequests = true

      override def renderInResponses = true

      override def handleMessages(handlerFlow: Graph[FlowShape[Message, Message], Any], subprotocol: Option[String]): HttpResponse =
        HttpResponse(StatusCodes.SwitchingProtocols)
    }
} 
开发者ID:knoldus,项目名称:akka-http-websocket-microservices.g8,代码行数:45,代码来源:AccountWebServerTest.scala


示例5: HierarchicalProcessingFlows

//设置package包名称以及导入依赖的类
package com.mooneyserver.akkastreams.hierarchical.flows

import akka.NotUsed
import akka.stream.scaladsl.{Flow, GraphDSL, UnzipWith, ZipWith}
import akka.stream.{FanInShape3, FanOutShape3, FlowShape, Graph}
import com.mooneyserver.akkastreams.hierarchical.model.{Child, Grandparent, Parent}
import GraphDSL.Implicits._

object HierarchicalProcessingFlows {

  val handleGrandParent: Graph[FlowShape[Grandparent, Grandparent], NotUsed] =
    GraphDSL.create() { implicit builder =>
      val Input: FanOutShape3[Grandparent, Grandparent, Seq[Parent], Seq[Child]] = builder.add(UnzipWith(breakdownFamily))
      val GrandParent_Proc: FlowShape[Grandparent, Grandparent] = builder.add(Flow[Grandparent].map(processGrandParent))
      val Parent_Proc: FlowShape[Seq[Parent], Seq[Parent]] = builder.add(Flow[Seq[Parent]].map(processParents))
      val Child_Proc: FlowShape[Seq[Child], Seq[Child]] = builder.add(Flow[Seq[Child]].map(processChildren))
      val Rebuild_Grandparent: FanInShape3[Grandparent, Seq[Parent], Seq[Child], Grandparent] = builder.add(ZipWith[Grandparent, Seq[Parent], Seq[Child], Grandparent](recombineGrandParent))

      Input.out0 ~> GrandParent_Proc ~> Rebuild_Grandparent.in0
      Input.out1 ~> Parent_Proc ~> Rebuild_Grandparent.in1
      Input.out2 ~> Child_Proc ~> Rebuild_Grandparent.in2

      FlowShape(Input.in, Rebuild_Grandparent.out)
    }

  def breakdownFamily(grandparent: Grandparent): Tuple3[Grandparent, Seq[Parent], Seq[Child]] =
    (grandparent, grandparent.parents, grandparent.parents.flatMap(_.children))

  def processGrandParent(gp: Grandparent): Grandparent =
    gp.copy(name = s"${gp.name}-Modified")

  def processParents(p: Seq[Parent]): Seq[Parent] =
    for {parent <- p} yield parent.copy(name = s"${parent.name}-Modified")

  def processChildren(c: Seq[Child]): Seq[Child] =
    for {child <- c} yield child.copy(name = s"${child.name}-Modified")

  def recombineGrandParent(gp: Grandparent, p: Seq[Parent], c: Seq[Child]): Grandparent = {
    val parents = p map { parent =>
      val chillins = c filter { _.name.startsWith(parent.name.split("-Modified")(0))}
      parent.copy(children = chillins)
    }

    gp.copy(parents = parents)
  }
} 
开发者ID:irishshagua,项目名称:akka-stream-playground,代码行数:47,代码来源:HierarchicalProcessingFlows.scala


示例6: AlsoToErrorHandling

//设置package包名称以及导入依赖的类
package sandbox

import com.typesafe.config.ConfigFactory
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.stream.scaladsl._
import akka.stream.Graph
import akka.stream.SinkShape
import akka.stream.FlowShape
import scala.annotation.unchecked.uncheckedVariance

object AlsoToErrorHandling extends App {
  val config = ConfigFactory.empty

  implicit val system = ActorSystem("AlsoToErrorHandling",config)
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  try {
    val source =
      Source(0 to 15).via(Flow[Int].alsoTo(Sink.foreach(n => println(s"HttpRequest: $n"))))

    val kafkaSink =
      Flow[Int]
        .map(n => if(n == 10) throw new Exception() else n)
        .to(Sink.foreach(n => println(s"Published to Kafka: $n")))

//    val flow = Flow[Int].via(safeAlsoToGraph(kafkaSink))

    val flow = Flow[Int].alsoTo(kafkaSink)

    val httpSink = Sink.foreach((n:Int) => println(s"HttpResponse: $n"))

    val res = waitFor(source.via(flow).toMat(httpSink)(Keep.right).run)

    println(res)

  }
  finally {
    waitFor(system.terminate)
  }

  def safeAlsoToGraph[Out,M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
    GraphDSL.create(that) { implicit b ? r ?
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Out](2,true))
      bcast.out(1) ~> r
      FlowShape(bcast.in, bcast.out(0))
    }

  def waitFor[T](f: Future[T], d: Duration = 10.seconds): T = Await.result(f,d)

} 
开发者ID:jthompson-hiya,项目名称:akka-streams-sandbox,代码行数:57,代码来源:AlsoToErrorHandling.scala



注:本文中的akka.stream.Graph类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala JsonObject类代码示例发布时间:2022-05-23
下一篇:
Scala Specs2RouteTest类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap