• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Outlet类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.Outlet的典型用法代码示例。如果您正苦于以下问题:Scala Outlet类的具体用法?Scala Outlet怎么用?Scala Outlet使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Outlet类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: GridfsSource

//设置package包名称以及导入依赖的类
import java.nio.ByteBuffer

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream

import scala.concurrent.{ExecutionContext, Future}




case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
  val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")
  
  override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val buffer = ByteBuffer.allocate(chunkSize)
        val loaded = stream.read(buffer).toFuture().map(_ => buffer)
        push(out, loaded)
      }

      override def onDownstreamFinish(): Unit = {
        super.onDownstreamFinish()
        stream.close()
        complete(out)
      }
    })
  }
}

object GridfsSource {
  def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
    Source.fromGraph(GridfsSource(stream, 512 * 1024)).mapAsync(1)(fb => fb.map { buffer => buffer.flip(); ByteString(buffer) }).takeWhile(_.nonEmpty)
  }
} 
开发者ID:zhijun,项目名称:GridFSSource,代码行数:42,代码来源:GridFSSource.scala


示例2: MemoryBuffer

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString


private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
  val in = Inlet[ByteString]("MemoryBuffer.in")
  val out = Outlet[Chunk]("MemoryBuffer.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      var buffer = ByteString.empty
      override def onPull(): Unit = if (isClosed(in)) emit() else pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (buffer.size + elem.size > maxSize) {
          failStage(new IllegalStateException("Buffer size of " + maxSize + " bytes exceeded."))
        } else {
          buffer ++= elem
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) emit()
        completeStage()
      }

      def emit(): Unit = emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
      setHandlers(in, out, this)
    }

} 
开发者ID:akka,项目名称:alpakka,代码行数:39,代码来源:MemoryBuffer.scala


示例3: CsvParsingStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv

import akka.event.Logging
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.annotation.tailrec
import scala.util.control.NonFatal


private[csv] class CsvParsingStage(delimiter: Byte, quoteChar: Byte, escapeChar: Byte)
    extends GraphStage[FlowShape[ByteString, List[ByteString]]] {

  private val in = Inlet[ByteString](Logging.simpleName(this) + ".in")
  private val out = Outlet[List[ByteString]](Logging.simpleName(this) + ".out")
  override val shape = FlowShape(in, out)

  override protected def initialAttributes: Attributes = Attributes.name("CsvParsing")

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private[this] val buffer = new CsvParser(delimiter, quoteChar, escapeChar)

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        buffer.offer(grab(in))
        tryPollBuffer()
      }

      override def onPull(): Unit =
        tryPollBuffer()

      override def onUpstreamFinish(): Unit = {
        emitRemaining()
        completeStage()
      }

      private def tryPollBuffer() =
        try buffer.poll(requireLineEnd = true) match {
          case Some(csvLine) ? push(out, csvLine)
          case _ ?
            if (isClosed(in)) {
              emitRemaining()
              completeStage()
            } else pull(in)
        } catch {
          case NonFatal(ex) ? failStage(ex)
        }

      @tailrec private def emitRemaining(): Unit =
        buffer.poll(requireLineEnd = false) match {
          case Some(csvLine) ?
            emit(out, csvLine)
            emitRemaining()
          case _ ?
        }

    }
} 
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:CsvParsingStage.scala


示例4: CsvToMapStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv

import java.nio.charset.Charset

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.collection.immutable


private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (headers.isDefined) {
          val map = headers.get.zip(elem).toMap
          push(out, map)
        } else {
          headers = Some(elem.map(_.decodeString(charset)))
          pull(in)
        }
      }

      override def onPull(): Unit = pull(in)
    }
} 
开发者ID:akka,项目名称:alpakka,代码行数:41,代码来源:CsvToMapStage.scala


示例5: GeodeContinuousSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  override protected def initialAttributes: Attributes =
    Attributes
      .name("GeodeContinuousSource")
      .and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))

  val out = Outlet[V](s"geode.continuousSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val subPromise = Promise[Done]

    (new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {

      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
        subPromise.success(Done)
      }

      val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
        if (isAvailable(out)) {
          pushElement(out, element)
        } else
          enqueue(element)
        handleTerminaison()
      }

      //
      // This handler, will first forward initial (old) result, then new ones (continuous).
      //
      setHandler(
        out,
        new OutHandler {
          override def onPull() = {
            if (initialResultsIterator.hasNext)
              push(out, initialResultsIterator.next())
            else
              dequeue() foreach { e =>
                pushElement(out, e)
              }
            handleTerminaison()
          }
        }
      )

    }, subPromise.future)
  }
} 
开发者ID:akka,项目名称:alpakka,代码行数:60,代码来源:GeodeContinuousSourceStage.scala


示例6: GeodeFiniteSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  override protected def initialAttributes: Attributes =
    Attributes.name("GeodeFiniteSource").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))

  val out = Outlet[V]("geode.finiteSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val subPromise = Promise[Done]

    (new GeodeQueryGraphLogic[V](shape, cache, sql) {

      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
        subPromise.success(Done)
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull() =
            if (initialResultsIterator.hasNext)
              push(out, initialResultsIterator.next())
            else
              completeStage()
        }
      )

    }, subPromise.future)
  }
} 
开发者ID:akka,项目名称:alpakka,代码行数:43,代码来源:GeodeFiniteSourceStage.scala


示例7: IgnoreLastElements

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable


final class IgnoreLastElements[E](ignoreCount: Int)
  extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("IgnoreLastElement.in")
  val out = Outlet[E]("IgnoreLastElement.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    var isBuffered = false
    val buffer: mutable.Queue[E] = new mutable.Queue[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {

        buffer.enqueue(grab(in))
        if(buffer.size == ignoreCount + 1) {
          push(out, buffer.dequeue())
        } else {
          // As long as the buffer is not full, nothing will be sent downstream.
          // As a result the downstream component will not call 'onPull' and we have
          // to manually pull upstream
          pull(in)
        }
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })
  }
} 
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:47,代码来源:IgnoreLastElements.scala


示例8: AccumulateWhileUnchanged

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable


final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    private var currentState: Option[P] = None
    private val buffer = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)

        if (currentState.isEmpty || currentState.contains(nextState)) {
          buffer += nextElement
          pull(in)
        } else {
          val result = buffer.result()
          buffer.clear()
          buffer += nextElement
          push(out, result)
        }

        currentState = Some(nextState)
      }

      override def onPull(): Unit = {
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        val result = buffer.result()
        if (result.nonEmpty) {
          emit(out, result)
        }
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffer.clear()
    }
  }
} 
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:59,代码来源:AccumulateWhileUnchanged.scala


示例9: ByteStringToDeltaStage

//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
import scodec.DecodeResult
import scodec.bits.BitVector

import scala.annotation.tailrec

class ByteStringToDeltaStage extends GraphStage[FlowShape[ByteString, FactorizedDeltaTick]] {

  val in = Inlet[ByteString]("ByteStringToDeltaStage.in")
  val out = Outlet[FactorizedDeltaTick]("ByteStringToDeltaStage.out")

  def shape: FlowShape[ByteString, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)

      val inHandler = new InHandler {

        def decodeAllFromBits(bits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {

          @tailrec
          def compute(results: Vector[NonNegativeFactorizedDeltaTick], remainingBits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
            NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV.decode(remainingBits) match {
              case Successful(DecodeResult(value, BitVector.empty)) =>
                (results :+ value, BitVector.empty)
              case Successful(DecodeResult(value, remainder)) if remainder.sizeGreaterThan(25) =>
                compute(results :+ value, remainder)
              case Successful(DecodeResult(value, remainder)) =>
                (results :+ value, remainder)
              case Failure(e) =>
                println("e = " + e)
                (results, BitVector.empty)
            }
          }

          compute(Vector.empty, bits)
        }

        private[this] var remainingBits = BitVector.empty

        def onPush(): Unit = {
          val bits = BitVector.view(grab(in).asByteBuffer)
          val (results, rest) = decodeAllFromBits(remainingBits ++ bits)
          emitMultiple(out, results.map(_.withNegatives))
          remainingBits = rest
        }
      }

      setHandler(in, inHandler)
    }

} 
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:62,代码来源:ByteStringToDeltaStage.scala


示例10: DeltaToByteStringStage

//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}

class DeltaToByteStringStage extends GraphStage[FlowShape[FactorizedDeltaTick, ByteString]] {

  val in = Inlet[FactorizedDeltaTick]("DeltaToByteStringStage.in")
  val out = Outlet[ByteString]("DeltaToByteStringStage.out")

  def shape: FlowShape[FactorizedDeltaTick, ByteString] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)


      def onPush(): Unit = {
        val delta = grab(in)
        nonNegFactorizedDeltaTickCodecV.encode(delta.nonNegative) match {
          case Successful(x) => emit(out, ByteString(x.toByteBuffer))
          case Failure(err) => fail(out, new Exception(err.messageWithContext))
        }
      }
      setHandler(in, this)
    }

} 
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:35,代码来源:DeltaToByteStringStage.scala


示例11: TickToDeltaStage

//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}

class TickToDeltaStage
    extends GraphStage[FlowShape[Tick, FactorizedDeltaTick]] {

  val in = Inlet[Tick]("FactorizeDeltaTickStage.in")
  val out = Outlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[Tick, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
      setHandler(out, this)

      val firstTickInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val tick = grab(in)
          val factorizedTick: FactorizedTick = tick.factorize
          emit(out, FactorizedDeltaTick(factorizedTick.time, factorizedTick.bid, factorizedTick.ask))
          setHandler(in, withPrevInHandler(tick))
        }
      }

      def withPrevInHandler(initialTick: Tick): InHandler = new InHandler {

        private[this] var prevTick = initialTick

        def onPush(): Unit = {
          val tick = grab(in)
          emit(out, tick.factorize.deltaTo(prevTick.factorize))
          prevTick = tick
          setHandler(in, withPrevInHandler(tick))
        }
      }

      setHandler(in, firstTickInHandler)
    }
} 
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:45,代码来源:TickToDeltaStage.scala


示例12: DeltaToTickStage

//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}

class DeltaToTickStage
  extends GraphStage[FlowShape[FactorizedDeltaTick, Tick]] {

    val in = Inlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.in")
    val out = Outlet[Tick]("FactorizeDeltaTickStage.out")

    def shape: FlowShape[FactorizedDeltaTick, Tick] = FlowShape(in, out)

    def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with OutHandler {

        def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
        setHandler(out, this)

        val firstDeltaInHandler: InHandler = new InHandler {
          def onPush(): Unit = {
            val delta = grab(in)
            println("delta = " + delta)
            val newFactorizedTick: FactorizedTick = Tick(0L, 0, 0).factorize.withDelta(delta)
            emit(out, newFactorizedTick.normalize)
            setHandler(in, withPrevInHandler(newFactorizedTick))
          }
        }

        def withPrevInHandler(initialFactorizedTick: FactorizedTick): InHandler = new InHandler {

          private[this] var prevFactorizedTick = initialFactorizedTick

          def onPush(): Unit = {
            val delta = grab(in)
            val newFactorizedTick: FactorizedTick = prevFactorizedTick.withDelta(delta)
            emit(out, newFactorizedTick.normalize)
            prevFactorizedTick = newFactorizedTick
          }
        }

        setHandler(in, firstDeltaInHandler)
      }
  } 
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:46,代码来源:DeltaToTickStage.scala


示例13: FileCipher

//设置package包名称以及导入依赖的类
package utils.streams

import javax.crypto._

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import utils.Utils


class FileCipher(cipher: Cipher) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("FileCipher.in")
  val out = Outlet[ByteString]("FileCipher.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val bytes = grab(in)

          if (bytes.isEmpty)
            pull(in) // Should not happen, request more bytes
          else {
            val ciphered = ByteString(cipher.update(bytes.toArray))

            // En/Decryption (e.g. AES with CBC) will work with blocks, if the block is not completed
            // the cipher will return an empty ByteString. We won't send it because it mess up with
            // chucked encoding, so we pull to complete our block and push it when ready
            if(ciphered.nonEmpty)
              push(out, ciphered)
            else
              pull(in)
          }
        }

        override def onUpstreamFinish(): Unit = {
          val bs = ByteString(cipher.doFinal())
          if (bs.nonEmpty)
            emit(out, bs) // Complete if necessary
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

object FileCipher {
  def apply(cipher: Cipher): FileCipher = new FileCipher(cipher)
} 
开发者ID:Cumulus-Cloud,项目名称:cumulus,代码行数:55,代码来源:FileCipher.scala


示例14: Encryptor

//设置package包名称以及导入依赖的类
package streams

import java.security.Key
import java.security.spec.AlgorithmParameterSpec
import javax.crypto.Cipher

import akka.stream.{Outlet, Inlet, Attributes, FlowShape}
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
import akka.util.ByteString


class Encryptor(cipher: Cipher)
  extends GraphStage[FlowShape[ByteString, ByteString]]{

  val in = Inlet[ByteString]("Encryptor.in")
  val out = Outlet[ByteString]("Encryptor.out")

  def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val chunk = grab(in).toArray
        val encrypted = cipher.update(chunk)
        emit(out, ByteString(encrypted))

      }

      override def onUpstreamFinish(): Unit = {
        val last = cipher.doFinal()
        emit(out, ByteString(last))
        completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit =  pull(in)
    })
  }
} 
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:41,代码来源:Encryptor.scala


示例15: FollowTheLeaderStage

//设置package包名称以及导入依赖的类
package pl.gosub.akka.online.follow.the.leader

import akka.Done
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}

import scala.concurrent.{Future, Promise}

class FollowTheLeaderStage(private val ftl: FollowTheLeaderLogic) extends GraphStageWithMaterializedValue[FanInShape2[Double, Double, Double], Future[Done]]{

  // Stage syntax
  val dataIn: Inlet[Double] = Inlet("FollowTheLeaderStage.dataIn")
  val resultsIn: Inlet[Double] = Inlet("FollowTheLeaderStage.resultsIn")
  val predictionsOut: Outlet[Double] = Outlet("FollowTheLeaderStage.predictionsOut")

  override val shape: FanInShape2[Double, Double, Double] = new FanInShape2(dataIn, resultsIn, predictionsOut)

  @scala.throws[Exception](classOf[Exception])
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    // Completion notification
    val p: Promise[Done] = Promise()

    val logic = new GraphStageLogic(shape) {

      setHandler(resultsIn, new InHandler {
        @scala.throws[Exception](classOf[Exception])
        override def onPush(): Unit = {
          val nextResult = grab(resultsIn)
          read(dataIn)({ x =>
            if (isAvailable(predictionsOut)) push(predictionsOut, ftl.predict(x, nextResult))
          }, () => {})
        }
      })


      setHandler(dataIn, new InHandler {
        override def onPush(): Unit = {
          val x = grab(dataIn)
          read(resultsIn)({previousResult =>
            if (isAvailable(predictionsOut)) push(predictionsOut, ftl.predict(x, previousResult))
          }, () => {})
        }

        override def onUpstreamFinish(): Unit = {
          completeStage()
        }
      })

      setHandler(predictionsOut, new OutHandler {
        override def onPull(): Unit = {
          pull(dataIn)
        }
      })
    }

    (logic, p.future)
  }
} 
开发者ID:gosubpl,项目名称:akka-online,代码行数:59,代码来源:FollowTheLeaderStage.scala


示例16: RepeatNTimes

//设置package包名称以及导入依赖的类
package scaladays.akka.stream

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

final case class RepeatNTimes[T](n: Int) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("RepeatNTimes.in")
  val out = Outlet[T]("RepeatNTimes.out")

  override def shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    var element: T = null.asInstanceOf[T]
    var remainToBePushed: Int = 0
    var terminating: Boolean = false
    
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        element = grab(in)
        if (remainToBePushed > 0) pushElement()
      }

      override def onUpstreamFinish(): Unit = {
        terminating = true
        if (remainToBePushed == 0) completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (element != null) {
          pushElement()
          completeOrReset()
        } else {
          pull(in)
          remainToBePushed = n
        }
      }
    })

    private def completeOrReset(): Unit = {
      if (remainToBePushed == 0)
        if (terminating) completeStage()
        else element = null.asInstanceOf[T]
    }

    private def pushElement(): Unit = {
      push(out, element)
      remainToBePushed -= 1
    }
  }

} 
开发者ID:ktoso,项目名称:scaladays-berlin-akka-streams,代码行数:55,代码来源:RepeatNTimes.scala


示例17: FrameChunker

//设置package包名称以及导入依赖的类
package fr.xebia.streams.video

import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage._
import akka.util.ByteString
import fr.xebia.streams.transform.Implicits

class FrameChunker(val beginOfFrame: ByteString, val endOfFrame: ByteString) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("Chunker.in")
  val out = Outlet[ByteString]("Chunker.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var buffer = ByteString.empty

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (isClosed(in)) emitChunk()
        else pull(in)
      }
    })
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        buffer ++= elem
        emitChunk()
      }

      override def onUpstreamFinish(): Unit = {
        if (buffer.isEmpty) completeStage()
        // elements left in buffer, keep accepting downstream pulls
        // and push from buffer until buffer is emitted
      }
    })

    private def emitChunk(): Unit = {
      if (buffer.isEmpty) {
        if (isClosed(in)) completeStage()
        else pull(in)
      } else {
        import Implicits._
        buffer.sliceChunk(beginOfFrame, endOfFrame) match {
          case Some((chunk, remaining)) =>
            buffer = remaining
            push(out, chunk)

          case None =>
            pull(in)
        }
      }
    }

  }
} 
开发者ID:fagossa,项目名称:camera_stream,代码行数:55,代码来源:FrameChunker.scala


示例18: AccumulateWhileUnchanged

//设置package包名称以及导入依赖的类
package sample.blog.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable

//https://softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
//http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E ? P) extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in = Inlet[E]("in")
  val out = Outlet[immutable.Seq[E]]("out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) {
      private var currentState: Option[P] = None
      private val buffer = Vector.newBuilder[E]

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPush(): Unit = {
          val nextElement = grab(in)
          val nextState = propertyExtractor(nextElement)

          if (currentState.isEmpty || currentState.contains(nextState)) {
            buffer += nextElement
            pull(in)
          } else {
            val result = buffer.result
            buffer.clear
            buffer += nextElement
            push(out, result)
          }

          currentState = Some(nextState)
        }

        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          val result = buffer.result
          if (result.nonEmpty) {
            emit(out, result)
          }
          completeStage()
        }
      })

      override def postStop(): Unit = {
        buffer.clear
      }
    }
} 
开发者ID:haghard,项目名称:akka-pq,代码行数:55,代码来源:AccumulateWhileUnchanged.scala


示例19: Poll

//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream

import akka.stream.{Attributes, Inlet, Outlet}
import de.sciss.fscape.stream.impl.{NodeImpl, PollImpl, SinkShape2, StageImpl}

// XXX TODO --- we could use an `Outlet[String]`, that might be making perfect sense
object Poll {
  def apply(in: Outlet[BufLike], trig: OutI, label: String)(implicit b: Builder): Unit = {
    // println(s"Poll($in, $trig, $label)")
    val stage0  = new Stage(label = label)
    val stage   = b.add(stage0)
    b.connect(in  , stage.in0)
    b.connect(trig, stage.in1)
  }

  private final val name = "Poll"

  private type Shape = SinkShape2[BufLike, BufI]

  private final class Stage(label: String)(implicit ctrl: Control) extends StageImpl[Shape](name) {
    val shape = SinkShape2(
      in0 = Inlet[BufLike](s"$name.in"),
      in1 = InI(s"$name.trig")
    )

    def createLogic(attr: Attributes) = new Logic(label = label, shape = shape)
  }

  private final class Logic(label: String, shape: Shape)(implicit ctrl: Control)
    extends NodeImpl(name, shape)
      with PollImpl[BufLike] {

    override def toString = s"$name-L($label)"

    protected def trigger(buf: BufLike, off: Int): Unit = {
      val x0 = buf.buf(off)
      // XXX TODO --- make console selectable
      println(s"$label: $x0")
    }
  }
} 
开发者ID:Sciss,项目名称:FScape-next,代码行数:43,代码来源:Poll.scala


示例20: Builder

//设置package包名称以及导入依赖的类
package de.sciss.fscape.stream

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{Graph, Inlet, Outlet, Shape}

object Builder {
  def apply()(implicit dsl: GraphDSL.Builder[NotUsed], ctrl: Control): Builder = new Impl(ctrl)

  private final class Impl(val control: Control)(implicit b: GraphDSL.Builder[NotUsed]) extends Builder {
    def add[S <: Shape](graph: Graph[S, _]): S = b.add(graph)

    def dsl: GraphDSL.Builder[NotUsed] = b

    def connect[A](out: Outlet[A], in: Inlet[A]): Unit = {
      import GraphDSL.Implicits._
      out ~> in
    }

    def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B] = {
      import GraphDSL.Implicits._
      out.map(fun).outlet
    }
  }
}
trait Builder {
  def control: Control

  def dsl: GraphDSL.Builder[NotUsed]

  def add[S <: Shape](graph: Graph[S, _]): S

  def map[A, B](out: Outlet[A])(fun: A => B): Outlet[B]

  def connect[A](out: Outlet[A], in: Inlet[A]): Unit
} 
开发者ID:Sciss,项目名称:FScape-next,代码行数:37,代码来源:Builder.scala



注:本文中的akka.stream.Outlet类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala DefaultDB类代码示例发布时间:2022-05-23
下一篇:
Scala ActorPath类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap