Scala FlowShape类代码示例

本文整理汇总了Scala中akka.stream.FlowShape的典型用法代码示例。如果您正苦于以下问题:Scala FlowShape类的具体用法?Scala FlowShape怎么用?Scala FlowShape使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。


示例1: FlowFromGraph

package sample.graphDSL

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}

object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val listOfFlows = List(processorFlow1, processorFlow2)

    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val broadcast: UniformFanOutShape[T, T] = b.add(Broadcast(indexFlows.size))
        val merge: UniformFanInShape[T, T] = b.add(Merge(indexFlows.size))

        indexFlows.foreach(broadcast ~> _ ~> merge)

        FlowShape(broadcast.in, merge.out)

    val compoundFlow = compoundFlowFrom(listOfFlows)

    Source(1 to 10)
      .onComplete(_ => system.terminate())

示例2: every

package frontend

import akka.stream.{FlowShape, Attributes}
import akka.stream.scaladsl._
import scala.concurrent.duration.{Duration, FiniteDuration}

trait StreamSupport {

  def every[T](interval: FiniteDuration): Flow[T, T, akka.NotUsed] =
      GraphDSL.create() { implicit b ?
        import GraphDSL.Implicits._
        val zip = b.add(ZipWith[T, Unit, T](Keep.left).withAttributes(Attributes.inputBuffer(1, 1)))
        val dropOne = b.add(Flow[T].drop(1))
        Source.tick(Duration.Zero, interval, ()) ~> zip.in1
        zip.out ~> dropOne.in
        FlowShape(zip.in0, dropOne.outlet)

  def responseWindow(duration: FiniteDuration): Flow[play.api.libs.json.JsArray, play.api.libs.json.JsArray, akka.NotUsed] =
    (Flow[play.api.libs.json.JsArray].conflate((array, _) ? array)
      .zipWith(Source.tick(duration, duration, ()))(Keep.left))
      .scan(play.api.libs.json.JsArray(Seq.empty[play.api.libs.json.JsValue]))((_, stats) => stats)
      .withAttributes(Attributes.inputBuffer(1, 1))

示例3: MemoryBuffer

package akka.stream.alpakka.s3.impl

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
  val in = Inlet[ByteString]("MemoryBuffer.in")
  val out = Outlet[Chunk]("MemoryBuffer.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      var buffer = ByteString.empty
      override def onPull(): Unit = if (isClosed(in)) emit() else pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (buffer.size + elem.size > maxSize) {
          failStage(new IllegalStateException("Buffer size of " + maxSize + " bytes exceeded."))
        } else {
          buffer ++= elem

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) emit()

      def emit(): Unit = emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
      setHandlers(in, out, this)


示例4: CsvParsingStage

package akka.stream.alpakka.csv

import akka.event.Logging
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.annotation.tailrec
import scala.util.control.NonFatal

private[csv] class CsvParsingStage(delimiter: Byte, quoteChar: Byte, escapeChar: Byte)
    extends GraphStage[FlowShape[ByteString, List[ByteString]]] {

  private val in = Inlet[ByteString](Logging.simpleName(this) + ".in")
  private val out = Outlet[List[ByteString]](Logging.simpleName(this) + ".out")
  override val shape = FlowShape(in, out)

  override protected def initialAttributes: Attributes = Attributes.name("CsvParsing")

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private[this] val buffer = new CsvParser(delimiter, quoteChar, escapeChar)

      setHandlers(in, out, this)

      override def onPush(): Unit = {

      override def onPull(): Unit =

      override def onUpstreamFinish(): Unit = {

      private def tryPollBuffer() =
        try buffer.poll(requireLineEnd = true) match {
          case Some(csvLine) ? push(out, csvLine)
          case _ ?
            if (isClosed(in)) {
            } else pull(in)
        } catch {
          case NonFatal(ex) ? failStage(ex)

      @tailrec private def emitRemaining(): Unit =
        buffer.poll(requireLineEnd = false) match {
          case Some(csvLine) ?
            emit(out, csvLine)
          case _ ?


示例5: CsvToMapStage

package akka.stream.alpakka.csv

import java.nio.charset.Charset

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.collection.immutable

private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (headers.isDefined) {
          val map = headers.get.zip(elem).toMap
          push(out, map)
        } else {
          headers = Some(elem.map(_.decodeString(charset)))

      override def onPull(): Unit = pull(in)

示例6: IgnoreLastElements

package org.hpi.esb.datavalidator.validation.graphstage

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable

final class IgnoreLastElements[E](ignoreCount: Int)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("IgnoreLastElement.in")
  val out = Outlet[E]("IgnoreLastElement.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    var isBuffered = false
    val buffer: mutable.Queue[E] = new mutable.Queue[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {

        if(buffer.size == ignoreCount + 1) {
          push(out, buffer.dequeue())
        } else {
          // As long as the buffer is not full, nothing will be sent downstream.
          // As a result the downstream component will not call 'onPull' and we have
          // to manually pull upstream

      override def onPull(): Unit = {

      override def onUpstreamFinish(): Unit = {

示例7: AccumulateWhileUnchanged

package org.hpi.esb.datavalidator.validation.graphstage

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable

final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    private var currentState: Option[P] = None
    private val buffer = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (currentState.isEmpty || currentState.contains(nextState)) {
          buffer += nextElement
        } else {
          val result = buffer.result()
          buffer += nextElement
          push(out, result)

        currentState = Some(nextState)

      override def onPull(): Unit = {

      override def onUpstreamFinish(): Unit = {
        val result = buffer.result()
        if (result.nonEmpty) {
          emit(out, result)

    override def postStop(): Unit = {

示例8: ByteStringToDeltaStage

package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
import scodec.DecodeResult
import scodec.bits.BitVector

import scala.annotation.tailrec

class ByteStringToDeltaStage extends GraphStage[FlowShape[ByteString, FactorizedDeltaTick]] {

  val in = Inlet[ByteString]("ByteStringToDeltaStage.in")
  val out = Outlet[FactorizedDeltaTick]("ByteStringToDeltaStage.out")

  def shape: FlowShape[ByteString, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)

      val inHandler = new InHandler {

        def decodeAllFromBits(bits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {

          def compute(results: Vector[NonNegativeFactorizedDeltaTick], remainingBits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
            NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV.decode(remainingBits) match {
              case Successful(DecodeResult(value, BitVector.empty)) =>
                (results :+ value, BitVector.empty)
              case Successful(DecodeResult(value, remainder)) if remainder.sizeGreaterThan(25) =>
                compute(results :+ value, remainder)
              case Successful(DecodeResult(value, remainder)) =>
                (results :+ value, remainder)
              case Failure(e) =>
                println("e = " + e)
                (results, BitVector.empty)

          compute(Vector.empty, bits)

        private[this] var remainingBits = BitVector.empty

        def onPush(): Unit = {
          val bits = BitVector.view(grab(in).asByteBuffer)
          val (results, rest) = decodeAllFromBits(remainingBits ++ bits)
          emitMultiple(out, results.map(_.withNegatives))
          remainingBits = rest

      setHandler(in, inHandler)


示例9: DeltaToByteStringStage

package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}

class DeltaToByteStringStage extends GraphStage[FlowShape[FactorizedDeltaTick, ByteString]] {

  val in = Inlet[FactorizedDeltaTick]("DeltaToByteStringStage.in")
  val out = Outlet[ByteString]("DeltaToByteStringStage.out")

  def shape: FlowShape[FactorizedDeltaTick, ByteString] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)

      def onPush(): Unit = {
        val delta = grab(in)
        nonNegFactorizedDeltaTickCodecV.encode(delta.nonNegative) match {
          case Successful(x) => emit(out, ByteString(x.toByteBuffer))
          case Failure(err) => fail(out, new Exception(err.messageWithContext))
      setHandler(in, this)


示例10: TickToDeltaStage

package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}

class TickToDeltaStage
    extends GraphStage[FlowShape[Tick, FactorizedDeltaTick]] {

  val in = Inlet[Tick]("FactorizeDeltaTickStage.in")
  val out = Outlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[Tick, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)

      val firstTickInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val tick = grab(in)
          val factorizedTick: FactorizedTick = tick.factorize
          emit(out, FactorizedDeltaTick(factorizedTick.time, factorizedTick.bid, factorizedTick.ask))
          setHandler(in, withPrevInHandler(tick))

      def withPrevInHandler(initialTick: Tick): InHandler = new InHandler {

        private[this] var prevTick = initialTick

        def onPush(): Unit = {
          val tick = grab(in)
          emit(out, tick.factorize.deltaTo(prevTick.factorize))
          prevTick = tick
          setHandler(in, withPrevInHandler(tick))

      setHandler(in, firstTickInHandler)

示例11: DeltaToTickStage

package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}

class DeltaToTickStage
  extends GraphStage[FlowShape[FactorizedDeltaTick, Tick]] {

    val in = Inlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.in")
    val out = Outlet[Tick]("FactorizeDeltaTickStage.out")

    def shape: FlowShape[FactorizedDeltaTick, Tick] = FlowShape(in, out)

    def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with OutHandler {

        def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
        setHandler(out, this)

        val firstDeltaInHandler: InHandler = new InHandler {
          def onPush(): Unit = {
            val delta = grab(in)
            println("delta = " + delta)
            val newFactorizedTick: FactorizedTick = Tick(0L, 0, 0).factorize.withDelta(delta)
            emit(out, newFactorizedTick.normalize)
            setHandler(in, withPrevInHandler(newFactorizedTick))

        def withPrevInHandler(initialFactorizedTick: FactorizedTick): InHandler = new InHandler {

          private[this] var prevFactorizedTick = initialFactorizedTick

          def onPush(): Unit = {
            val delta = grab(in)
            val newFactorizedTick: FactorizedTick = prevFactorizedTick.withDelta(delta)
            emit(out, newFactorizedTick.normalize)
            prevFactorizedTick = newFactorizedTick

        setHandler(in, firstDeltaInHandler)

示例12: FileCipher

package utils.streams

import javax.crypto._

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import utils.Utils

class FileCipher(cipher: Cipher) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("FileCipher.in")
  val out = Outlet[ByteString]("FileCipher.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val bytes = grab(in)

          if (bytes.isEmpty)
            pull(in) // Should not happen, request more bytes
          else {
            val ciphered = ByteString(cipher.update(bytes.toArray))

            // En/Decryption (e.g. AES with CBC) will work with blocks, if the block is not completed
            // the cipher will return an empty ByteString. We won't send it because it mess up with
            // chucked encoding, so we pull to complete our block and push it when ready
              push(out, ciphered)

        override def onUpstreamFinish(): Unit = {
          val bs = ByteString(cipher.doFinal())
          if (bs.nonEmpty)
            emit(out, bs) // Complete if necessary

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {

object FileCipher {
  def apply(cipher: Cipher): FileCipher = new FileCipher(cipher)

示例13: PairUpWithToString_

package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{FlowShape, ActorMaterializer}
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration._

object PairUpWithToString_ extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val pairUpWithToString =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[Int](2))
      val zip = b.add(Zip[Int, String]())

      broadcast.out(0).map(identity) ~> zip.in0
      broadcast.out(1).map(_.toString) ~> zip.in1

      FlowShape(broadcast.in, zip.out)

  val pair = pairUpWithToString.runWith(Source(List(1)), Sink.head)
  val result = Await.result(pair._2, 300 millis)


示例14: MaterializedValue_

package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import GraphDSL.Implicits._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MaterializedValue_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
  val r = Source(1 to 10)
  println(Await.result(r, 200 millis))

  val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      Source(1 to 10) ~> fold
  val r2 = cyclicFold
  println(Await.result(r2, 200 millis))


示例15: Encryptor

package streams

import java.security.Key
import java.security.spec.AlgorithmParameterSpec
import javax.crypto.Cipher

import akka.stream.{Outlet, Inlet, Attributes, FlowShape}
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
import akka.util.ByteString

class Encryptor(cipher: Cipher)
  extends GraphStage[FlowShape[ByteString, ByteString]]{

  val in = Inlet[ByteString]("Encryptor.in")
  val out = Outlet[ByteString]("Encryptor.out")

  def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val chunk = grab(in).toArray
        val encrypted = cipher.update(chunk)
        emit(out, ByteString(encrypted))


      override def onUpstreamFinish(): Unit = {
        val last = cipher.doFinal()
        emit(out, ByteString(last))

    setHandler(out, new OutHandler {
      override def onPull(): Unit =  pull(in)

示例16: RepeatNTimes

package scaladays.akka.stream

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

final case class RepeatNTimes[T](n: Int) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("RepeatNTimes.in")
  val out = Outlet[T]("RepeatNTimes.out")

  override def shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    var element: T = null.asInstanceOf[T]
    var remainToBePushed: Int = 0
    var terminating: Boolean = false
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        element = grab(in)
        if (remainToBePushed > 0) pushElement()

      override def onUpstreamFinish(): Unit = {
        terminating = true
        if (remainToBePushed == 0) completeStage()

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (element != null) {
        } else {
          remainToBePushed = n

    private def completeOrReset(): Unit = {
      if (remainToBePushed == 0)
        if (terminating) completeStage()
        else element = null.asInstanceOf[T]

    private def pushElement(): Unit = {
      push(out, element)
      remainToBePushed -= 1


示例17: FrameChunker

package fr.xebia.streams.video

import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage._
import akka.util.ByteString
import fr.xebia.streams.transform.Implicits

class FrameChunker(val beginOfFrame: ByteString, val endOfFrame: ByteString) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("Chunker.in")
  val out = Outlet[ByteString]("Chunker.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var buffer = ByteString.empty

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (isClosed(in)) emitChunk()
        else pull(in)
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        buffer ++= elem

      override def onUpstreamFinish(): Unit = {
        if (buffer.isEmpty) completeStage()
        // elements left in buffer, keep accepting downstream pulls
        // and push from buffer until buffer is emitted

    private def emitChunk(): Unit = {
      if (buffer.isEmpty) {
        if (isClosed(in)) completeStage()
        else pull(in)
      } else {
        import Implicits._
        buffer.sliceChunk(beginOfFrame, endOfFrame) match {
          case Some((chunk, remaining)) =>
            buffer = remaining
            push(out, chunk)

          case None =>


示例18: AccumulateWhileUnchanged

package sample.blog.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable

final class AccumulateWhileUnchanged[E, P](propertyExtractor: E ? P) extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in = Inlet[E]("in")
  val out = Outlet[immutable.Seq[E]]("out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) {
      private var currentState: Option[P] = None
      private val buffer = Vector.newBuilder[E]

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPush(): Unit = {
          val nextElement = grab(in)
          val nextState = propertyExtractor(nextElement)

          if (currentState.isEmpty || currentState.contains(nextState)) {
            buffer += nextElement
          } else {
            val result = buffer.result
            buffer += nextElement
            push(out, result)

          currentState = Some(nextState)

        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          val result = buffer.result
          if (result.nonEmpty) {
            emit(out, result)

      override def postStop(): Unit = {

示例19: TimedFlowOps

package com.flipkart.connekt.busybees.streams.flows.profilers

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{BidiFlow, Flow, GraphDSL}
import akka.stream.{BidiShape, FlowShape}
import com.flipkart.connekt.busybees.models.RequestTracker
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import com.flipkart.connekt.commons.metrics.Instrumented
import com.flipkart.connekt.commons.utils.StringUtils._

import scala.collection.JavaConverters._
import scala.util.Try

object TimedFlowOps {

  implicit class TimedFlow[I, O, T <: RequestTracker, M](dispatchFlow: Flow[(I, T), (Try[O], T), M]) extends Instrumented {

    val startTimes = new ConcurrentHashMap[T, Long]().asScala

    private def profilingShape(apiName: String) = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>

      val out = b.add(Flow[(I, T)].map {
        case (request, requestTracker) =>
          startTimes.put(requestTracker, System.currentTimeMillis())
          (request, requestTracker)

      val in = b.add(Flow[(Try[O], T)].map {
        case (response, httpRequestTracker) =>
          startTimes.get(httpRequestTracker).map(start => {
            val duration = System.currentTimeMillis() - start
            ConnektLogger(LogFile.PROCESSORS).trace(s"TimedFlowOps/$apiName MessageId: ${httpRequestTracker.messageId} took : $duration ms")
          }).foreach(registry.timer(getMetricName(apiName + Option(httpRequestTracker.provider).map("." + _).orEmpty)).update(_, TimeUnit.MILLISECONDS))

          (response, httpRequestTracker)

      BidiShape.fromFlows(out, in)

    def timedAs(apiName: String) = Flow.fromGraph(GraphDSL.create() { implicit b =>
      val s = b.add(profilingShape(apiName))
      val p = b.add(dispatchFlow)

      s.out1 ~> p ~> s.in2

      FlowShape(s.in1, s.out2)


示例20: Timer

package de.sciss.fscape
package stream

import akka.stream.{Attributes, FlowShape}
import de.sciss.fscape.stream.impl.{FilterChunkImpl, FilterIn1LImpl, StageImpl, NodeImpl}

object Timer {
  def apply(trig: OutI)(implicit b: Builder): OutL = {
    val stage0  = new Stage
    val stage   = b.add(stage0)
    b.connect(trig, stage.in)

  private final val name = "Timer"

  private type Shape = FlowShape[BufI, BufL]

  private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {
    val shape = new FlowShape(
      in  = InI (s"$name.trig"),
      out = OutL(s"$name.out" )

    def createLogic(attr: Attributes) = new Logic(shape)

  private final class Logic(shape: Shape)(implicit ctrl: Control)
    extends NodeImpl(name, shape)
      with FilterIn1LImpl[BufI]
      with FilterChunkImpl[BufI, BufL, Shape] {

    private[this] var high      = false
    private[this] var count     = 0L

    protected def processChunk(inOff: Int, outOff: Int, len: Int): Unit = {
      val b0      = bufIn0.buf
      val out     = bufOut0.buf
      var h0      = high
      var h1      = false
      var c0      = count
      var inOffI  = inOff
      var outOffI = outOff
      val stop0   = inOff + len
      while (inOffI < stop0) {
        h1 = b0(inOffI) > 0
        if (h1 && !h0) {
          // println(s"RESET FROM $c0")
          c0 = 0L
        out(outOffI) = c0
        inOffI  += 1
        outOffI += 1
        c0      += 1
        h0       = h1
      high  = h0
      count = c0









