• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala GraphDSL类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.scaladsl.GraphDSL的典型用法代码示例。如果您正苦于以下问题:Scala GraphDSL类的具体用法?Scala GraphDSL怎么用?Scala GraphDSL使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了GraphDSL类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ProcessingKafkaApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

/**
  * Demo application that runs two independent streams inside one closed graph:
  *
  *  - a ticking source whose greeting is wrapped in a `ProducerRecord` and
  *    written to `kafkaTopic`;
  *  - a Kafka consumer on the same topic whose record values are printed.
  */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  // The consumer is bound to one explicit topic partition.
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  // Keys are raw bytes, values are strings; start from the earliest offset when none is stored.
  val consumerSettings =
    ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("akka_streams_group")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings =
    ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Sources and sinks.
    val greetings = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val fromKafka = Consumer.plainSource(consumerSettings, subscription)
    val toKafka = Producer.plainSink(producerSettings)
    val stdout = Sink.foreach(println)

    // Conversions between plain strings and Kafka records.
    val asProducerRecord =
      Flow[String].map(new ProducerRecord[Array[Byte], String](kafkaTopic, _))
    val recordValue =
      Flow[ConsumerRecord[Array[Byte], String]].map(_.value())

    greetings ~> asProducerRecord ~> toKafka
    fromKafka ~> recordValue ~> stdout

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala


示例2: PartitionValidatedSpec

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream.cats

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import cats.implicits._
import eu.svez.akka.stream.cats.Stages._

/**
  * Checks that `PartitionValidated` sends invalid values to one outlet and
  * valid values to the other, in the order they were emitted.
  */
class PartitionValidatedSpec extends StageSpec {

  "PartitionValidated" should "partition a flow of Validation[E, A] in two flows of E and A" in new Test {
    Source(List(
      1.valid[String],
      2.valid[String],
      "BOOM!".invalid[Int],
      3.valid[String],
      "BOOM 2!".invalid[Int]
    )).runWith(testSink)

    // Signal demand on both sides before asserting.
    successProbe.request(3)
    failureProbe.request(2)

    Seq(1, 2, 3).foreach(n => successProbe.expectNext(n))
    Seq("BOOM!", "BOOM 2!").foreach(msg => failureProbe.expectNext(msg))

    successProbe.expectComplete()
    failureProbe.expectComplete()
  }

  /** Fixture: a sink that feeds the two outlets of the stage into separate probes. */
  trait Test {
    val failureProbe = TestSubscriber.probe[String]()
    val successProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val stage = builder.add(PartitionValidated[String, Int]())

      stage.invalid ~> Sink.fromSubscriber(failureProbe)
      stage.valid ~> Sink.fromSubscriber(successProbe)

      SinkShape(stage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:51,代码来源:PartitionValidatedSpec.scala


示例3: Stages

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream

import akka.NotUsed
import akka.stream.FanOutShape2
import akka.stream.scaladsl.{Flow, GraphDSL, Partition}

import scala.util.{Failure, Success, Try}

/**
  * Fan-out stages that split a stream of `Either` or `Try` values into two
  * typed streams, plus syntax helpers that give the outlets readable names.
  */
object Stages {

  /** Splits `Either[A, B]` elements: `Left` payloads on outlet 0, `Right` payloads on outlet 1. */
  object PartitionEither {
    def apply[A, B]() = GraphDSL.create[FanOutShape2[Either[A, B], A, B]]() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      // `collect` unwraps the payload without the partial `.left.get` / `.right.get` calls.
      val left = builder.add(Flow[Either[A, B]].collect { case Left(a) => a })
      val right = builder.add(Flow[Either[A, B]].collect { case Right(b) => b })
      // Port index chosen per element: 0 for Left, 1 for Right.
      val partition = builder.add(Partition[Either[A, B]](2, _.fold(_ => 0, _ => 1)))

      // Wiring order matters: the first connection takes partition port 0, the second port 1.
      partition ~> left
      partition ~> right

      new FanOutShape2[Either[A, B], A, B](partition.in, left.out, right.out)
    }
  }

  /** Names the outlets of a [[PartitionEither]] shape. */
  implicit class EitherShape[A, B](val shape: FanOutShape2[Either[A, B], A, B]) extends AnyVal {
    def left = shape.out0
    def right = shape.out1
  }

  /** Splits `Try[T]` elements: failure causes on outlet 0, successful values on outlet 1. */
  object PartitionTry {
    def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val success = builder.add(Flow[Try[T]].collect { case Success(a) => a })
      val failure = builder.add(Flow[Try[T]].collect { case Failure(t) => t })
      // Port index chosen per element: 0 for Failure, 1 for Success.
      val partition = builder.add(Partition[Try[T]](2, t => if (t.isSuccess) 1 else 0))

      // Wiring order matters: the first connection takes partition port 0, the second port 1.
      partition ~> failure
      partition ~> success

      new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
    }
  }

  /** Names the outlets of a [[PartitionTry]] shape. */
  implicit class TryShape[T](val shape: FanOutShape2[Try[T], Throwable, T]) extends AnyVal {
    def failure = shape.out0
    def success = shape.out1
  }

}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:52,代码来源:Stages.scala


示例4: PartitionEitherSpec

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import Stages._

/**
  * Checks that `PartitionEither` sends `Left` payloads to one outlet and
  * `Right` payloads to the other, in the order they were emitted.
  */
class PartitionEitherSpec extends StageSpec {

  "PartitionEither" should "partition a flow of Either[A, B] into two flows of A and B" in new Test {
    Source(List(
      Right(1),
      Right(2),
      Left("One"),
      Right(3),
      Left("Two")
    )).runWith(testSink)

    // Signal demand on both sides before asserting.
    rightProbe.request(4)
    leftProbe.request(3)

    Seq(1, 2, 3).foreach(n => rightProbe.expectNext(n))
    Seq("One", "Two").foreach(s => leftProbe.expectNext(s))

    rightProbe.expectComplete()
    leftProbe.expectComplete()
  }

  /** Fixture: a sink that feeds the two outlets of the stage into separate probes. */
  trait Test {
    val leftProbe = TestSubscriber.probe[String]()
    val rightProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val stage = builder.add(PartitionEither[String, Int]())

      stage.left ~> Sink.fromSubscriber(leftProbe)
      stage.right ~> Sink.fromSubscriber(rightProbe)

      SinkShape(stage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:50,代码来源:PartitionEitherSpec.scala


示例5: FlowFromGraph

//设置package包名称以及导入依赖的类
package sample.graphDSL

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}


/**
  * Demonstrates building a single `Flow` out of several flows by broadcasting
  * every element to each of them and merging their outputs.
  */
object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    /**
      * Combines `indexFlows` into one flow: each input element is broadcast to
      * every flow and all results are merged into the single output.
      *
      * @param indexFlows the flows to run side by side; must not be empty
      */
    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val fanOut = b.add(Broadcast[T](indexFlows.size))
        val fanIn = b.add(Merge[T](indexFlows.size))

        for (flow <- indexFlows) fanOut ~> flow ~> fanIn

        FlowShape(fanOut.in, fanIn.out)
      })
    }

    val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val tripler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val combined = compoundFlowFrom(List(doubler, tripler))

    // Print every result and shut the actor system down once the stream ends.
    Source(1 to 10)
      .via(combined)
      .runWith(Sink.foreach(println(_)))
      .onComplete(_ => system.terminate())
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala


示例6: stringPersister

//设置package包名称以及导入依赖的类
package services

import javax.inject.{Inject, Singleton}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits._

import scala.concurrent.Future
import scala.util.{Failure, Success}


  /**
    * Builds a pass-through flow that also hands every element to `pf`.
    *
    * Each element is broadcast to two outputs: one is the flow's own output,
    * the other ends in a sink that calls `pf` and logs the outcome. The sink
    * only registers a callback on the returned future; it does not wait for
    * the persistence call to finish.
    *
    * @param pf asynchronous persistence function invoked once per element
    * @return a flow that emits its input unchanged
    */
  def stringPersister(pf: String => Future[Unit]): Flow[String, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val persistenceSink = Sink.foreach[String] { content =>
        pf(content).onComplete {
          case Success(_) => Logger.debug(s"Persisted content: '$content'")
          // The closing quote was missing from this message in the original.
          case Failure(t) => Logger.error(s"Failed to persist content: '$content'", t)
        }
      }

      val bcast = builder.add(Broadcast[String](2))
      bcast.out(1) ~> persistenceSink

      FlowShape(bcast.in, bcast.out(0))
    })
} 
开发者ID:snackCake,项目名称:TweetStreamChallenge,代码行数:33,代码来源:PersistenceService.scala


示例7: StatisticsValidation

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.{SimpleRecord, Statistics}
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.{AccumulateWhileUnchanged, IgnoreLastElements, ZipWhileEitherAvailable}

/**
  * Validation that compares statistics computed per time window from the
  * input topic with the statistics read from the output topic.
  *
  * @param windowSize width of a window, in the same unit as record timestamps
  */
class StatisticsValidation(inTopicHandler: TopicHandler,
                           outTopicHandler: TopicHandler, windowSize: Long,
                           materializer: ActorMaterializer)
  extends Validation[Statistics](inTopicHandler, outTopicHandler, materializer) {

  override val valueName = "Statistics"
  override val queryName = "Statistics Query"

  /** Groups consecutive records whose timestamps fall into the same window. */
  val collectByWindow =
    new AccumulateWhileUnchanged[SimpleRecord, Long](record => windowStart(record.timestamp))

  /** Folds each group of records into one `Statistics` value. */
  val calculateStatistics = Flow[Seq[SimpleRecord]].map { records =>
    records.foldLeft(new Statistics()()) { (acc, rec) =>
      acc.getUpdatedWithValue(rec.timestamp, rec.value)
    }
  }

  /**
    * Builds a source of (expected, actual) statistics pairs: expected values
    * are computed from the input topic, actual values come from the output topic.
    */
  def createSource(): Graph[SourceShape[(Option[Statistics], Option[Statistics])], NotUsed] =
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val zip = builder.add(ZipWhileEitherAvailable[Statistics]())
      // Drops trailing pairs; the count passed here is 1.
      val ignoreLast = builder.add(
        new IgnoreLastElements[(Option[Statistics], Option[Statistics])](ignoreCount = 1))

      inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> collectByWindow ~> calculateStatistics ~> zip.in0
      outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toStatistics ~> zip.in1
      zip.out ~> ignoreLast

      SourceShape(ignoreLast.out)
    }

  /** Rounds `timestamp` down to the start of its window. */
  def windowStart(timestamp: Long): Long =
    timestamp - (timestamp % windowSize)
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:45,代码来源:StatisticsValidation.scala


示例8: IdentityValidation

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.ZipWhileEitherAvailable

/**
  * Validation for the identity query: records from the input topic are paired
  * one-to-one with records from the output topic.
  */
class IdentityValidation(inTopicHandler: TopicHandler,
                         outTopicHandler: TopicHandler,
                         materializer: ActorMaterializer)
  extends Validation[SimpleRecord](inTopicHandler, outTopicHandler, materializer) {

  override val valueName = "SimpleRecords"
  override val queryName = "Identity Query"

  /** Builds a source of (input record, output record) pairs. */
  def createSource(): Graph[SourceShape[(Option[SimpleRecord], Option[SimpleRecord])], NotUsed] =
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val pairUp = builder.add(ZipWhileEitherAvailable[SimpleRecord]())

      // Both sides are limited to their message count and parsed the same way.
      inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> pairUp.in0
      outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toSimpleRecords ~> pairUp.in1

      SourceShape(pairUp.out)
    }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:33,代码来源:IdentityValidation.scala


示例9: PairsShape_

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}
import GraphDSL.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
  * Builds a source of (odd, even) integer pairs with the graph DSL and prints
  * the first pair.
  */
object PairsShape_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val pairs = Source.fromGraph(GraphDSL.create() { implicit builder =>
    val zip = builder.add(Zip[Int, Int]())

    // A fresh, unbounded counter starting at 1 for every use.
    def ints = Source.fromIterator(() => Iterator from(1))

    ints.filter(_ % 2 != 0) ~> zip.in0
    ints.filter(_ % 2 == 0) ~> zip.in1

    SourceShape(zip.out)
  })

  val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)

  // Terminate the actor system even when the await fails (e.g. on timeout),
  // so a failed run does not leave the system running.
  val r =
    try Await.result(firstPair, 300.millis)
    finally system.terminate()

  println(r)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:32,代码来源:PairsShape_.scala


示例10: GraphBuilder

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

/**
  * Runs one closed graph with three sinks and prints their materialized
  * results: two `Sink.head` values fed through a shared doubling flow, and a
  * `Sink.seq` collecting the numbers 1 to 10.
  */
object GraphBuilder extends App {
  implicit val system = ActorSystem("actor-system")
  implicit val materializer = ActorMaterializer()

  val topHeadSink = Sink.head[Int]
  val middleSink = Sink.head[Int]
  val bottomHeadSink = Sink.seq[Int]
  val sharedDoubler = Flow[Int].map(_ * 2)

  // The three sinks' materialized values are combined into a tuple.
  val results = RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, middleSink, bottomHeadSink)((_, _, _)) { implicit builder =>
    (topHS, midHS, bottomHS) =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Int](2))
      val source = builder.add(Source(1 to 10))

      Source.repeat(2) ~> broadcast.in
      broadcast.out(0) ~> sharedDoubler ~> topHS.in
      broadcast.out(1) ~> sharedDoubler ~> midHS.in
      source ~> bottomHS.in

      ClosedShape
  }).run()

  // Terminate the actor system even when one of the awaits fails (e.g. on
  // timeout), so a failed run does not leave the system running.
  val (r1, r2, r3) =
    try {
      (
        Await.result(results._1, 300.millis),
        Await.result(results._2, 300.millis),
        Await.result(results._3, 300.millis)
      )
    } finally {
      system.terminate()
    }

  // Explicit tuple: same output as before without relying on argument auto-tupling.
  println((r1, r2, r3))
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:41,代码来源:GraphBuilder.scala


示例11: MaterializedValue_

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import GraphDSL.Implicits._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
  * Shows how a graph can expose its own materialized value (the future result
  * of a fold) as a stream element via `builder.materializedValue`.
  */
object MaterializedValue_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  /**
    * Waits up to 200 ms for `result` and prints it. If the wait fails (e.g. on
    * timeout) the actor system is terminated before the error is rethrown, so
    * a failed run does not leave the system running.
    */
  private def awaitAndPrint(result: Future[Int]): Unit = {
    val value =
      try Await.result(result, 200.millis)
      catch {
        case scala.util.control.NonFatal(e) =>
          system.terminate()
          throw e
      }
    println(value)
  }

  // A flow whose output is the fold result of everything it consumed.
  val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
    })
  val r = Source(1 to 10)
    .via(foldFlow)
    .runWith(Sink.head)
  awaitAndPrint(r)

  // A source that folds 1 to 10 internally and emits the fold result.
  val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      Source(1 to 10) ~> fold
      SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
    })
  val r2 = cyclicFold
    .via(foldFlow)
    .runWith(Sink.head)
  awaitAndPrint(r2)

  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:37,代码来源:MaterializedValue_.scala


示例12: HoeffdingTreeWithAlpakka

//设置package包名称以及导入依赖的类
import java.nio.file.FileSystems

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import akka.stream.scaladsl.{GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.alpakka.file.scaladsl
import org.apache.spark.streamdm.core.ExampleParser
import org.apache.spark.streamdm.core.specification.SpecificationParser
import pl.gosub.akka.online.{HoeffdingTreeProcessor, LearnerQuery}

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.duration._


/**
  * Tails a data file, parses every line into a `LearnerQuery` and feeds it to a
  * `HoeffdingTreeProcessor`, printing whatever the processor emits.
  */
object HoeffdingTreeWithAlpakka extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val specParser = new SpecificationParser

  val arffPath = this.getClass.getResource("/elecNormNew.arff").getPath // add for Windows .replaceFirst("^/(.:/)", "$1")

  val exampleSpec = specParser.fromArff(arffPath)

  val fsPath = this.getClass.getResource("/elecNormData.txt").getPath // add for Windows .replaceFirst("^/(.:/)", "$1")
  println(fsPath)


  val fs = FileSystems.getDefault
  // Follows the file like `tail -f`, polling every 250 ms.
  val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
    path = fs.getPath(fsPath),
    maxLineSize = 8192,
    pollingInterval = 250.millis
  )

//  if the lines below do not work, please make sure that you got the linefeed character right wrt your operating system (LF vs CRLF)
//  lines.map(line => LearnerQuery(line.split(";").apply(0), ExampleParser.fromArff(line.split(";").apply(1), exampleSpec)))
//    .runForeach(line => System.out.println(line))

  val masterControlProgram = RunnableGraph.fromGraph(GraphDSL.create(Sink.foreach(print)) { implicit builder =>
    outMatches =>
      import GraphDSL.Implicits._

      // Each line has the form "<tag>;<arff example>"; split it once per line.
      val taggedInput = lines.map { line =>
        val fields = line.split(";")
        LearnerQuery(fields(0), ExampleParser.fromArff(fields(1), exampleSpec))
      }

      // One processor instance per materialization, kept across elements.
      taggedInput.statefulMapConcat(() => {
        val proc = new HoeffdingTreeProcessor(exampleSpec)
        proc.process(_)
      }) ~> outMatches
      ClosedShape
  }).run()

  // Run the shutdown callback on the actor system's own dispatcher rather than
  // pulling in the global execution context for a single callback.
  masterControlProgram.onComplete(_ => system.terminate())(system.dispatcher)
  Await.ready(system.whenTerminated, Duration.Inf)

}
开发者ID:gosubpl,项目名称:akka-online,代码行数:60,代码来源:HoeffdingTreeWithAlpakka.scala


示例13: PartitionValidatedNelSpec

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream.cats

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import cats.data.NonEmptyList
import cats.implicits._
import eu.svez.akka.stream.cats.Stages._

/**
  * Checks that `PartitionValidatedNel` sends valid values to one outlet and the
  * individual errors of each invalid value to the other, in emission order.
  */
class PartitionValidatedNelSpec extends StageSpec {

  "PartitionValidatedNel" should "partition a flow of Validation[E, A] in two flows of E and A" in new Test {
    Source(List(
      1.valid[NonEmptyList[String]],
      2.valid[NonEmptyList[String]],
      NonEmptyList.of("BOOM!", "KABOOM!").invalid[Int],
      3.valid[NonEmptyList[String]],
      NonEmptyList.of("BOOM 2!").invalid[Int]
    )).runWith(testSink)

    // Signal demand on both sides before asserting.
    successProbe.request(3)
    failureProbe.request(3)

    Seq(1, 2, 3).foreach(n => successProbe.expectNext(n))
    // The two-element error list arrives as two separate elements.
    Seq("BOOM!", "KABOOM!", "BOOM 2!").foreach(msg => failureProbe.expectNext(msg))

    successProbe.expectComplete()
    failureProbe.expectComplete()
  }

  /** Fixture: a sink that feeds the two outlets of the stage into separate probes. */
  trait Test {
    val failureProbe = TestSubscriber.probe[String]()
    val successProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val stage = builder.add(PartitionValidatedNel[String, Int]())

      stage.invalid ~> Sink.fromSubscriber(failureProbe)
      stage.valid ~> Sink.fromSubscriber(successProbe)

      SinkShape(stage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:53,代码来源:PartitionValidatedNelSpec.scala


示例14: PartitionIorSpec

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream.cats

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import cats.data.Ior
import eu.svez.akka.stream.cats.Stages._

/**
  * Checks that `PartitionIor` sends left values to one outlet and right values
  * to the other, with an `Ior.Both` contributing one element to each side.
  */
class PartitionIorSpec extends StageSpec {

  "PartitionIor" should "partition a flow of Ior[A, B] into two flows of A and B" in new Test {
    Source(List(
      Ior.Right(1),
      Ior.Right(2),
      Ior.Left("One"),
      Ior.Right(3),
      Ior.Left("Two"),
      Ior.Both("Three", 4)
    )).runWith(testSink)

    // Signal demand on both sides before asserting.
    rightProbe.request(4)
    leftProbe.request(3)

    Seq(1, 2, 3, 4).foreach(n => rightProbe.expectNext(n))
    Seq("One", "Two", "Three").foreach(s => leftProbe.expectNext(s))

    rightProbe.expectComplete()
    leftProbe.expectComplete()
  }

  /** Fixture: a sink that feeds the two outlets of the stage into separate probes. */
  trait Test {
    val leftProbe = TestSubscriber.probe[String]()
    val rightProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val stage = builder.add(PartitionIor[String, Int]())

      stage.left ~> Sink.fromSubscriber(leftProbe)
      stage.right ~> Sink.fromSubscriber(rightProbe)

      SinkShape(stage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:54,代码来源:PartitionIorSpec.scala


示例15: PartitionTrySpec

//设置package包名称以及导入依赖的类
package eu.svez.akka.stream

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import eu.svez.akka.stream.Stages._

import scala.util.{Failure, Success}

/**
  * Checks that `PartitionTry` sends successful values to one outlet and the
  * causes of failures to the other, in the order they were emitted.
  */
class PartitionTrySpec extends StageSpec {

  "PartitionTry" should "partition a flow of Try[T] into two flows of Throwable and T" in new Test {
    Source(List(
      Success(1),
      Success(2),
      Failure(new IllegalArgumentException("error 1")),
      Success(3),
      Failure(new ArrayIndexOutOfBoundsException("error 2"))
    )).runWith(testSink)

    // Signal demand on both sides before asserting.
    successProbe.request(4)
    failureProbe.request(3)

    Seq(1, 2, 3).foreach(n => successProbe.expectNext(n))

    // Failures keep both their concrete type and their message.
    val firstError = failureProbe.expectNext()
    firstError shouldBe an[IllegalArgumentException]
    firstError should have message "error 1"

    val secondError = failureProbe.expectNext()
    secondError shouldBe an[ArrayIndexOutOfBoundsException]
    secondError should have message "error 2"

    successProbe.expectComplete()
    failureProbe.expectComplete()
  }

  /** Fixture: a sink that feeds the two outlets of the stage into separate probes. */
  trait Test {
    val failureProbe = TestSubscriber.probe[Throwable]()
    val successProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val stage = builder.add(PartitionTry[Int]())

      stage.failure ~> Sink.fromSubscriber(failureProbe)
      stage.success ~> Sink.fromSubscriber(successProbe)

      SinkShape(stage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:59,代码来源:PartitionTrySpec.scala


示例16: TimedFlowOps

//设置package包名称以及导入依赖的类
package com.flipkart.connekt.busybees.streams.flows.profilers

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{BidiFlow, Flow, GraphDSL}
import akka.stream.{BidiShape, FlowShape}
import com.flipkart.connekt.busybees.models.RequestTracker
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import com.flipkart.connekt.commons.metrics.Instrumented
import com.flipkart.connekt.commons.utils.StringUtils._

import scala.collection.JavaConverters._
import scala.util.Try

/**
  * Syntax for timing a dispatch flow: `flow.timedAs(name)` wraps the flow so
  * that the time between a request entering and its response leaving is
  * logged and recorded in a timer.
  */
object TimedFlowOps {

  implicit class TimedFlow[I, O, T <: RequestTracker, M](dispatchFlow: Flow[(I, T), (Try[O], T), M]) extends Instrumented {

    /** Start timestamps (epoch millis) of in-flight requests, keyed by their tracker. */
    val startTimes = new ConcurrentHashMap[T, Long]().asScala

    /**
      * Bidirectional stage placed around the dispatch flow: the outbound side
      * stamps the start time, the inbound side reports the elapsed time.
      */
    private def profilingShape(apiName: String) = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>

      val out = b.add(Flow[(I, T)].map {
        case (request, requestTracker) =>
          startTimes.put(requestTracker, System.currentTimeMillis())
          (request, requestTracker)
      })

      val in = b.add(Flow[(Try[O], T)].map {
        case (response, httpRequestTracker) =>
          // `remove` looks the entry up and deletes it in one step, instead of a
          // separate get followed by remove. Trackers without a recorded start
          // are passed through without being timed.
          startTimes.remove(httpRequestTracker).foreach { start =>
            val duration = System.currentTimeMillis() - start
            ConnektLogger(LogFile.PROCESSORS).trace(s"TimedFlowOps/$apiName MessageId: ${httpRequestTracker.messageId} took : $duration ms")
            registry
              .timer(getMetricName(apiName + Option(httpRequestTracker.provider).map("." + _).orEmpty))
              .update(duration, TimeUnit.MILLISECONDS)
          }

          (response, httpRequestTracker)
      })

      BidiShape.fromFlows(out, in)
    })

    /**
      * Wraps the dispatch flow with timing.
      *
      * @param apiName name used in the log line and as the metric name prefix
      */
    def timedAs(apiName: String) = Flow.fromGraph(GraphDSL.create() { implicit b =>
      val s = b.add(profilingShape(apiName))
      val p = b.add(dispatchFlow)

      s.out1 ~> p ~> s.in2

      FlowShape(s.in1, s.out2)
    })
  }

}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:56,代码来源:TimedFlowOps.scala


示例17: Builder

//设置package包名称以及导入依赖的类
package de.sciss.fscape.stream

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{Graph, Inlet, Outlet, Shape}

/** Factory for [[Builder]] instances backed by an Akka `GraphDSL.Builder`. */
object Builder {
  /** Creates a builder from the implicit DSL builder and control in scope. */
  def apply()(implicit dsl: GraphDSL.Builder[NotUsed], ctrl: Control): Builder = new Impl(ctrl)

  /** Default implementation: every operation is delegated to the wrapped DSL builder `b`. */
  private final class Impl(val control: Control)(implicit b: GraphDSL.Builder[NotUsed]) extends Builder {
    import GraphDSL.Implicits._

    def dsl: GraphDSL.Builder[NotUsed] = b

    def add[S <: Shape](graph: Graph[S, _]): S = b.add(graph)

    def connect[A](out: Outlet[A], in: Inlet[A]): Unit = out ~> in

    def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B] = out.map(fun).outlet
  }
}
/** Abstraction over the graph DSL builder used while assembling a stream graph. */
trait Builder {
  /** The control instance associated with this builder. */
  def control: Control

  /** The underlying Akka graph DSL builder. */
  def dsl: GraphDSL.Builder[NotUsed]

  /** Adds `graph` to the graph under construction and returns its shape. */
  def add[S <: Shape](graph: Graph[S, _]): S

  /** Connects outlet `out` to inlet `in`. */
  def connect[A](out: Outlet[A], in: Inlet[A]): Unit

  /** Applies `fun` to the elements of `out` and returns the resulting outlet. */
  def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B]
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:37,代码来源:Builder.scala


示例18: InI

//设置package包名称以及导入依赖的类
package de.sciss.fscape

import akka.NotUsed
import akka.stream.scaladsl.{FlowOps, GraphDSL}
import akka.stream.{Inlet, Outlet}

/** Short type aliases and constructors for the inlets and outlets used by the stream layer. */
package object stream {
  type Signal[A] = FlowOps[A, NotUsed]

//  // to-do: `unfold` is unnecessarily inefficient because of producing `Option[Int]`.
//  implicit def constIntSignal   (i: Int   ): Signal[Int]    = Source.repeat(i) // or better `single`?
//  implicit def constDoubleSignal(d: Double): Signal[Double] = Source.repeat(d) // or better `single`?

  /** A buffer whose element type is fixed to `A`. */
  type BufElem[A] = BufLike { type Elem = A }

  // Inlet aliases, one per buffer type.
  type InI = Inlet[BufI]
  type InD = Inlet[BufD]
  type InL = Inlet[BufL]
  type InA = Inlet[BufLike]

  // Named inlet constructors.
  @inline def InI(name: String): InI = Inlet[BufI](name)
  @inline def InD(name: String): InD = Inlet[BufD](name)
  @inline def InL(name: String): InL = Inlet[BufL](name)
  @inline def InA(name: String): InA = Inlet[BufLike](name)

  // Outlet aliases, one per buffer type.
  type OutI = Outlet[BufI]
  type OutD = Outlet[BufD]
  type OutL = Outlet[BufL]
  type OutA = Outlet[BufLike]
  type OutElem[A] = Outlet[BufElem[A]]

  // Named outlet constructors.
  @inline def OutI(name: String): OutI = Outlet[BufI](name)
  @inline def OutD(name: String): OutD = Outlet[BufD](name)
  @inline def OutL(name: String): OutL = Outlet[BufL](name)

  /** The graph DSL builder type used throughout the stream layer. */
  type GBuilder = GraphDSL.Builder[NotUsed]

  /** A `StreamIn` whose element and buffer types are fixed. */
  type StreamInElem[A1, Buf1 >: Null <: BufElem[A1]] = StreamIn { type A = A1; type Buf = Buf1 }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:47,代码来源:package.scala


示例19: DebugSink

//设置package包名称以及导入依赖的类
package de.sciss.fscape.graph

import akka.stream.scaladsl.{GraphDSL, Sink}
import de.sciss.fscape.UGenSource.unwrap
import de.sciss.fscape.stream.StreamIn
import de.sciss.fscape.{GE, UGen, UGenGraph, UGenIn, UGenSource, stream}

import scala.collection.immutable.{IndexedSeq => Vec}


/**
  * A zero-output element that consumes its input signal and discards it.
  *
  * @param in the signal to consume
  */
case class DebugSink(in: GE) extends UGenSource.ZeroOut {
  protected def makeUGens(implicit b: UGenGraph.Builder): Unit =
    unwrap(this, Vector(in.expand))

  protected def makeUGen(args: Vec[UGenIn])(implicit b: UGenGraph.Builder): Unit =
    UGen.ZeroOut(this, inputs = args)

  /** Connects the single stream input to a sink that ignores every element. */
  private[fscape] def makeStream(args: Vec[StreamIn])(implicit b: stream.Builder): Unit = {
    import GraphDSL.Implicits._
    // The `~>` operator needs the underlying DSL builder in implicit scope.
    implicit val dsl = b.dsl
    // Exactly one input is expected; the pattern fails otherwise.
    val Vec(input) = args
    input.toAny ~> Sink.ignore
  }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:26,代码来源:DebugSink.scala


示例20: WorkingWithGraphsApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.duration._
import scala.util.Random


/**
  * Demo graph with two ticking notification sources (Android and iOS). Each
  * source is batched, counted per platform, merged with the other, balanced
  * over two parallel mappers and finally printed message by message.
  */
object WorkingWithGraphsApplication extends App {

  implicit val actorSystem = ActorSystem("WorkingWithGraphs")
  implicit val actorMaterializer = ActorMaterializer()

  /** A platform-specific message that can be converted to a [[GenericMsg]]. */
  trait MobileMsg {
    // A `def`: a new random id is produced on every access.
    def id = Random.nextInt(1000)
    def toGenMsg(origin: String) = GenericMsg(id, origin)
  }
  class AndroidMsg extends MobileMsg
  class IosMsg extends MobileMsg
  case class GenericMsg(id: Int, origin: String)

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Sources
    val androidTicks = Source.tick(2.seconds, 500.millis, new AndroidMsg)
    val iosTicks = Source.tick(700.millis, 600.millis, new IosMsg)

    // Flows: convert to generic messages and batch up to 5 messages or 5 seconds.
    val batchAndroid = Flow[AndroidMsg].map(_.toGenMsg("ANDROID")).groupedWithin(5, 5.seconds).async
    val batchIos = Flow[IosMsg].map(_.toGenMsg("IOS")).groupedWithin(5, 5.seconds).async
    def counter = Flow[Seq[GenericMsg]].via(new StatefulCounterFlow())
    def mapper = Flow[Seq[GenericMsg]].mapConcat(_.toList)

    // Junctions
    val aBroadcast = builder.add(Broadcast[Seq[GenericMsg]](2))
    val iBroadcast = builder.add(Broadcast[Seq[GenericMsg]](2))
    val balancer = builder.add(Balance[Seq[GenericMsg]](2))
    val notificationMerge = builder.add(Merge[Seq[GenericMsg]](2))
    val genericNotificationMerge = builder.add(Merge[GenericMsg](2))

    def counterSink(s: String) = Sink.foreach[Int](x => println(s"$s: [$x]"))

    // Graph (the connection order below fixes which junction ports are used)
    androidTicks ~> batchAndroid ~> aBroadcast ~> counter ~> counterSink("Android")
                                    aBroadcast ~> notificationMerge
                                    iBroadcast ~> notificationMerge
    iosTicks     ~> batchIos     ~> iBroadcast ~> counter ~> counterSink("Ios")

    notificationMerge ~> balancer ~> mapper.async ~> genericNotificationMerge
                         balancer ~> mapper.async ~> genericNotificationMerge

    genericNotificationMerge ~> Sink.foreach(println)

    ClosedShape
  })

  graph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:61,代码来源:WorkingWithGraphsApplication.scala



注:本文中的akka.stream.scaladsl.GraphDSL类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala CategoricalSplit类代码示例发布时间:2022-05-23
下一篇:
Scala InternalNode类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap