本文整理汇总了Scala中akka.stream.FlowShape类的典型用法代码示例。如果您正苦于以下问题:Scala FlowShape类的具体用法?Scala FlowShape怎么用?Scala FlowShape使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了FlowShape类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: FlowFromGraph
//设置package包名称以及导入依赖的类
package sample.graphDSL
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}
/**
  * Demo application: builds one compound flow that broadcasts every element to a list of
  * flows and merges their outputs, then runs the numbers 1 to 10 through it and prints them.
  */
object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val listOfFlows = List(processorFlow1, processorFlow2)

    /** Wires all `indexFlows` in parallel between a Broadcast and a Merge of matching width. */
    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

      Flow.fromGraph(GraphDSL.create() { implicit builder =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val branchCount = indexFlows.size
        val fanOut: UniformFanOutShape[T, T] = builder.add(Broadcast(branchCount))
        val fanIn: UniformFanInShape[T, T] = builder.add(Merge(branchCount))

        // One branch per flow: fan-out port -> flow -> fan-in port.
        for (branch <- indexFlows) fanOut ~> branch ~> fanIn

        FlowShape(fanOut.in, fanIn.out)
      })
    }

    val compoundFlow = compoundFlowFrom(listOfFlows)

    val done = Source(1 to 10)
      .via(compoundFlow)
      .runWith(Sink.foreach(println(_)))

    // Shut the actor system down once the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala
示例2: every
//设置package包名称以及导入依赖的类
package frontend
import akka.stream.{FlowShape, Attributes}
import akka.stream.scaladsl._
import scala.concurrent.duration.{Duration, FiniteDuration}
/** Reusable flow building blocks that pace a stream with `Source.tick`. */
trait StreamSupport {

  /**
    * Flow that pairs every incoming element with one tick and emits the element, so an
    * element can only pass once a tick is available. The first zipped element is dropped.
    *
    * @param interval time between ticks
    */
  def every[T](interval: FiniteDuration): Flow[T, T, akka.NotUsed] =
    Flow.fromGraph(
      GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        // Input buffer of exactly 1 so the zip stage does not prefetch elements ahead of the ticks.
        val zip = b.add(ZipWith[T, Unit, T](Keep.left).withAttributes(Attributes.inputBuffer(1, 1)))
        val dropOne = b.add(Flow[T].drop(1))
        Source.tick(Duration.Zero, interval, ()) ~> zip.in1
        zip.out ~> dropOne.in
        FlowShape(zip.in0, dropOne.out)
      }
    )

  /**
    * Flow that lets one `JsArray` through per tick. While waiting for a tick, `conflate`
    * keeps the array it already holds and discards newer ones. `scan` starts from an empty
    * `JsArray` and then always carries the latest array.
    *
    * @param duration initial delay and interval of the tick source
    */
  def responseWindow(duration: FiniteDuration): Flow[play.api.libs.json.JsArray, play.api.libs.json.JsArray, akka.NotUsed] =
    (Flow[play.api.libs.json.JsArray].conflate((array, _) => array)
      .zipWith(Source.tick(duration, duration, ()))(Keep.left))
      .scan(play.api.libs.json.JsArray(Seq.empty[play.api.libs.json.JsValue]))((_, stats) => stats)
      .withAttributes(Attributes.inputBuffer(1, 1))
}
开发者ID:haghard,项目名称:scenter-frontend,代码行数:27,代码来源:StreamSupport.scala
示例3: MemoryBuffer
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
/**
  * Collects all incoming bytes in memory and, once upstream has finished, emits them as a
  * single `Chunk`. The stage fails if the accumulated size would exceed `maxSize`.
  *
  * @param maxSize maximum number of bytes that may be buffered
  */
private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
  val in = Inlet[ByteString]("MemoryBuffer.in")
  val out = Outlet[Chunk]("MemoryBuffer.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      var buffer = ByteString.empty

      // Downstream demand is forwarded upstream until upstream is done; then the buffer is flushed.
      override def onPull(): Unit =
        if (!isClosed(in)) pull(in)
        else emit()

      override def onPush(): Unit = {
        val incoming = grab(in)
        val fits = buffer.size + incoming.size <= maxSize
        if (fits) {
          buffer ++= incoming
          pull(in)
        } else {
          failStage(new IllegalStateException(s"Buffer size of $maxSize bytes exceeded."))
        }
      }

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) emit()
        completeStage()
      }

      /** Emits the whole buffer as one chunk and completes the stage afterwards. */
      def emit(): Unit = {
        val chunk = Chunk(Source.single(buffer), buffer.size)
        emit(out, chunk, () => completeStage())
      }

      setHandlers(in, out, this)
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:39,代码来源:MemoryBuffer.scala
示例4: CsvParsingStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv
import akka.event.Logging
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import scala.annotation.tailrec
import scala.util.control.NonFatal
/**
  * Graph stage that offers incoming bytes to a `CsvParser` and emits every line the parser
  * returns as a list of field values.
  *
  * @param delimiter  field delimiter byte handed to the parser
  * @param quoteChar  quote byte handed to the parser
  * @param escapeChar escape byte handed to the parser
  */
private[csv] class CsvParsingStage(delimiter: Byte, quoteChar: Byte, escapeChar: Byte)
    extends GraphStage[FlowShape[ByteString, List[ByteString]]] {

  private val in = Inlet[ByteString](Logging.simpleName(this) + ".in")
  private val out = Outlet[List[ByteString]](Logging.simpleName(this) + ".out")
  override val shape = FlowShape(in, out)

  override protected def initialAttributes: Attributes = Attributes.name("CsvParsing")

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private[this] val buffer = new CsvParser(delimiter, quoteChar, escapeChar)

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        buffer.offer(grab(in))
        tryPollBuffer()
      }

      override def onPull(): Unit =
        tryPollBuffer()

      // At end of input, flush whatever the parser still holds, even without a line end.
      override def onUpstreamFinish(): Unit = {
        emitRemaining()
        completeStage()
      }

      /** Pushes the next complete line if there is one; otherwise asks upstream for more bytes. */
      private def tryPollBuffer() =
        try buffer.poll(requireLineEnd = true) match {
          case Some(csvLine) => push(out, csvLine)
          case _ =>
            if (isClosed(in)) {
              emitRemaining()
              completeStage()
            } else pull(in)
        } catch {
          // Non-fatal errors thrown while polling fail the stage instead of escaping the handler.
          case NonFatal(ex) => failStage(ex)
        }

      /** Emits every line the parser can still produce without requiring a trailing line end. */
      @tailrec private def emitRemaining(): Unit =
        buffer.poll(requireLineEnd = false) match {
          case Some(csvLine) =>
            emit(out, csvLine)
            emitRemaining()
          case _ =>
        }
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:CsvParsingStage.scala
示例5: CsvToMapStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv
import java.nio.charset.Charset
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import scala.collection.immutable
/**
  * Turns CSV rows into maps keyed by column name.
  *
  * @param columnNames column names to use; when `None`, the first row received is decoded
  *                    with `charset` and used as the header, and is not emitted itself
  * @param charset     character set used to decode the header row
  */
private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val row = grab(in)
        headers match {
          case Some(names) =>
            push(out, names.zip(row).toMap)
          case None =>
            // The first row becomes the header; nothing is emitted, so request the next row.
            headers = Some(row.map(_.decodeString(charset)))
            pull(in)
        }
      }

      override def onPull(): Unit = pull(in)
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:41,代码来源:CsvToMapStage.scala
示例6: IgnoreLastElements
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable
/**
  * Passes elements through with a delay of `ignoreCount` elements. Elements still held in
  * the buffer when upstream finishes are never emitted.
  *
  * @param ignoreCount number of trailing elements to hold back
  */
final class IgnoreLastElements[E](ignoreCount: Int)
    extends GraphStage[FlowShape[E, E]] {

  val in = Inlet[E]("IgnoreLastElement.in")
  val out = Outlet[E]("IgnoreLastElement.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    var isBuffered = false
    val buffer: mutable.Queue[E] = new mutable.Queue[E]

    private val handler = new InHandler with OutHandler {

      override def onPush(): Unit = {
        buffer.enqueue(grab(in))
        val full = buffer.size == ignoreCount + 1
        if (full) {
          push(out, buffer.dequeue())
        } else {
          // Nothing goes downstream while the buffer is filling, so downstream will not
          // pull again; keep the stream moving by pulling upstream ourselves.
          pull(in)
        }
      }

      override def onPull(): Unit = pull(in)

      // The buffered tail is dropped by completing straight away.
      override def onUpstreamFinish(): Unit = completeStage()
    }

    setHandlers(in, out, handler)
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:47,代码来源:IgnoreLastElements.scala
示例7: AccumulateWhileUnchanged
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import scala.collection.immutable
/**
  * Groups consecutive elements that share the same extracted property and emits each group
  * as one sequence when the property changes or upstream finishes.
  *
  * @param propertyExtractor computes the property that is compared between neighbouring elements
  */
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
    extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    private var currentState: Option[P] = None
    private val buffer = Vector.newBuilder[E]

    private val handler = new InHandler with OutHandler {

      override def onPush(): Unit = {
        val element = grab(in)
        val property = propertyExtractor(element)
        // True for the very first element and for any element continuing the current run.
        val continuesRun = currentState.forall(_ == property)

        if (continuesRun) {
          buffer += element
          pull(in)
        } else {
          val finishedRun = buffer.result()
          buffer.clear()
          buffer += element
          push(out, finishedRun)
        }

        currentState = Some(property)
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        val lastRun = buffer.result()
        if (lastRun.nonEmpty) emit(out, lastRun)
        completeStage()
      }
    }

    setHandlers(in, out, handler)

    override def postStop(): Unit = buffer.clear()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:59,代码来源:AccumulateWhileUnchanged.scala
示例8: ByteStringToDeltaStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
import scodec.DecodeResult
import scodec.bits.BitVector
import scala.annotation.tailrec
/**
  * Decodes a byte stream into [[FactorizedDeltaTick]]s. Bits left undecoded at the end of
  * one chunk are kept and prepended to the next chunk.
  */
class ByteStringToDeltaStage extends GraphStage[FlowShape[ByteString, FactorizedDeltaTick]] {

  val in = Inlet[ByteString]("ByteStringToDeltaStage.in")
  val out = Outlet[FactorizedDeltaTick]("ByteStringToDeltaStage.out")

  def shape: FlowShape[ByteString, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      val inHandler = new InHandler {

        /**
          * Decodes as many ticks as possible from `bits`.
          *
          * @return the decoded ticks and the bits that were left undecoded
          */
        def decodeAllFromBits(bits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
          @tailrec
          def compute(results: Vector[NonNegativeFactorizedDeltaTick], remainingBits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
            NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV.decode(remainingBits) match {
              case Successful(DecodeResult(value, BitVector.empty)) =>
                (results :+ value, BitVector.empty)
              // Keep decoding only while more than 25 bits remain.
              // NOTE(review): 25 looks like a minimum record size -- confirm against the codec.
              case Successful(DecodeResult(value, remainder)) if remainder.sizeGreaterThan(25) =>
                compute(results :+ value, remainder)
              case Successful(DecodeResult(value, remainder)) =>
                (results :+ value, remainder)
              case Failure(e) =>
                // NOTE(review): a failure discards all remaining bits; if it can be caused by a
                // record split across chunks, this loses data -- confirm.
                println("e = " + e)
                (results, BitVector.empty)
            }
          }
          compute(Vector.empty, bits)
        }

        private[this] var remainingBits = BitVector.empty

        def onPush(): Unit = {
          val bits = BitVector.view(grab(in).asByteBuffer)
          val (results, rest) = decodeAllFromBits(remainingBits ++ bits)
          remainingBits = rest
          if (results.nonEmpty) {
            emitMultiple(out, results.map(_.withNegatives))
          } else if (!hasBeenPulled(in)) {
            // Nothing was decoded, so downstream demand is still unanswered. Without pulling
            // here nobody would ever request more input and the stream would stall.
            tryPull(in)
          }
        }
      }

      setHandler(in, inHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:62,代码来源:ByteStringToDeltaStage.scala
示例9: DeltaToByteStringStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
/**
  * Encodes each [[FactorizedDeltaTick]] with `nonNegFactorizedDeltaTickCodecV` and emits the
  * encoded bytes. An encoding failure fails the whole stage.
  */
class DeltaToByteStringStage extends GraphStage[FlowShape[FactorizedDeltaTick, ByteString]] {

  val in = Inlet[FactorizedDeltaTick]("DeltaToByteStringStage.in")
  val out = Outlet[ByteString]("DeltaToByteStringStage.out")

  def shape: FlowShape[FactorizedDeltaTick, ByteString] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      def onPush(): Unit = {
        val delta = grab(in)
        nonNegFactorizedDeltaTickCodecV.encode(delta.nonNegative) match {
          case Successful(x) => emit(out, ByteString(x.toByteBuffer))
          // failStage rather than fail(out, ...): failing only the outlet would leave the
          // inlet open and never pulled, so upstream would never be cancelled.
          case Failure(err) => failStage(new Exception(err.messageWithContext))
        }
      }

      setHandler(in, this)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:35,代码来源:DeltaToByteStringStage.scala
示例10: TickToDeltaStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}
/**
  * Converts a stream of [[Tick]]s into [[FactorizedDeltaTick]]s. The first tick is emitted
  * with its own factorized values; every later tick is emitted as the delta to the tick
  * before it.
  */
class TickToDeltaStage
    extends GraphStage[FlowShape[Tick, FactorizedDeltaTick]] {

  val in = Inlet[Tick]("FactorizeDeltaTickStage.in")
  val out = Outlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[Tick, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      /** Handles only the first tick, then hands over to the delta-computing handler. */
      val firstTickInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val tick = grab(in)
          val factorizedTick: FactorizedTick = tick.factorize
          emit(out, FactorizedDeltaTick(factorizedTick.time, factorizedTick.bid, factorizedTick.ask))
          setHandler(in, withPrevInHandler(tick))
        }
      }

      /** Handler that remembers the previous tick and emits the delta to it. */
      def withPrevInHandler(initialTick: Tick): InHandler = new InHandler {
        private[this] var prevTick = initialTick

        def onPush(): Unit = {
          val tick = grab(in)
          emit(out, tick.factorize.deltaTo(prevTick.factorize))
          // The handler tracks the previous tick in its own state, so it does not need to
          // install a new handler for every element.
          prevTick = tick
        }
      }

      setHandler(in, firstTickInHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:45,代码来源:TickToDeltaStage.scala
示例11: DeltaToTickStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}
/**
  * Rebuilds [[Tick]]s from a stream of [[FactorizedDeltaTick]]s. The first delta is applied
  * to the factorized form of `Tick(0L, 0, 0)`; each later delta is applied to the previously
  * rebuilt factorized tick.
  */
class DeltaToTickStage
    extends GraphStage[FlowShape[FactorizedDeltaTick, Tick]] {

  val in = Inlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.in")
  val out = Outlet[Tick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[FactorizedDeltaTick, Tick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      /** Handles only the first delta, then switches to the stateful handler. */
      val firstDeltaInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val firstDelta = grab(in)
          println("delta = " + firstDelta)
          val origin = Tick(0L, 0, 0).factorize
          val rebuilt: FactorizedTick = origin.withDelta(firstDelta)
          emit(out, rebuilt.normalize)
          setHandler(in, withPrevInHandler(rebuilt))
        }
      }

      /** Handler that applies each delta to the most recently rebuilt factorized tick. */
      def withPrevInHandler(initialFactorizedTick: FactorizedTick): InHandler = new InHandler {
        private[this] var previous = initialFactorizedTick

        def onPush(): Unit = {
          val rebuilt: FactorizedTick = previous.withDelta(grab(in))
          emit(out, rebuilt.normalize)
          previous = rebuilt
        }
      }

      setHandler(in, firstDeltaInHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:46,代码来源:DeltaToTickStage.scala
示例12: FileCipher
//设置package包名称以及导入依赖的类
package utils.streams
import javax.crypto._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import utils.Utils
/**
  * Runs every incoming chunk through `cipher.update` and emits the result; the output of
  * `cipher.doFinal()` is emitted once upstream finishes.
  *
  * @param cipher the cipher to feed; it is used as passed in and is mutated by the stage
  */
class FileCipher(cipher: Cipher) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("FileCipher.in")
  val out = Outlet[ByteString]("FileCipher.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val bytes = grab(in)
        if (bytes.isEmpty)
          pull(in) // Should not happen, request more bytes
        else {
          // Cipher.update may return null when a block cipher has not yet received a full
          // block. Wrapping the result in Option avoids a NullPointerException in ByteString(...).
          val ciphered = Option(cipher.update(bytes.toArray))
            .map(output => ByteString(output))
            .getOrElse(ByteString.empty)

          // Block ciphers (e.g. AES with CBC) produce no output until a block is complete.
          // An empty ByteString is not sent because it would interfere with chunked
          // encoding; pull again and push once the block is ready.
          if (ciphered.nonEmpty)
            push(out, ciphered)
          else
            pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        val bs = Option(cipher.doFinal())
          .map(output => ByteString(output))
          .getOrElse(ByteString.empty)
        if (bs.nonEmpty)
          emit(out, bs) // Flush the final block, if any
        complete(out)
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        pull(in)
      }
    })
  }
}

object FileCipher {
  /** Creates a [[FileCipher]] stage around the given cipher. */
  def apply(cipher: Cipher): FileCipher = new FileCipher(cipher)
}
开发者ID:Cumulus-Cloud,项目名称:cumulus,代码行数:55,代码来源:FileCipher.scala
示例13: PairUpWithToString_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{FlowShape, ActorMaterializer}
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._
/**
  * Demo: a flow that pairs every Int with its String form, run on a single element and
  * printed to stdout.
  */
object PairUpWithToString_ extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val pairUpWithToString =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(Zip[Int, String]())

      // Branch 0 passes the number through, branch 1 renders it as text; zip recombines them.
      broadcast.out(0).map(identity) ~> zip.in0
      broadcast.out(1).map(_.toString) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })

  // runWith returns both materialized values; the second one is the future from Sink.head.
  val pair = pairUpWithToString.runWith(Source(List(1)), Sink.head)
  val result = Await.result(pair._2, 300.millis)
  println(result)

  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:33,代码来源:PairUpWithToString_.scala
示例14: MaterializedValue_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import GraphDSL.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
  * Demo of `builder.materializedValue`: exposes the `Future` materialized by a fold sink as
  * the output of a flow and of a source, then prints the awaited results.
  */
object MaterializedValue_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // Flow whose input feeds a fold sink and whose output is the fold's eventual result.
  val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit b => foldSink =>
      val foldedResult = b.materializedValue.mapAsync(4)(identity).outlet
      FlowShape(foldSink.in, foldedResult)
    })

  val r = Source(1 to 10)
    .via(foldFlow)
    .runWith(Sink.head)
  println(Await.result(r, 200.millis))

  // Source that folds 1 to 10 internally and emits the fold's eventual result.
  val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit b => foldSink =>
      Source(1 to 10) ~> foldSink
      val foldedResult = b.materializedValue.mapAsync(4)(identity).outlet
      SourceShape(foldedResult)
    })

  val r2 = cyclicFold
    .via(foldFlow)
    .runWith(Sink.head)
  println(Await.result(r2, 200.millis))

  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:37,代码来源:MaterializedValue_.scala
示例15: Encryptor
//设置package包名称以及导入依赖的类
package streams
import java.security.Key
import java.security.spec.AlgorithmParameterSpec
import javax.crypto.Cipher
import akka.stream.{Outlet, Inlet, Attributes, FlowShape}
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
import akka.util.ByteString
/**
  * Runs every incoming chunk through `cipher.update` and emits the produced bytes; the
  * output of `cipher.doFinal()` is emitted once upstream finishes.
  *
  * @param cipher the cipher to feed; it is used as passed in and is mutated by the stage
  */
class Encryptor(cipher: Cipher)
    extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in = Inlet[ByteString]("Encryptor.in")
  val out = Outlet[ByteString]("Encryptor.out")

  def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    /** Wraps cipher output, treating the `null` that `Cipher.update` may return as no output. */
    private def toByteString(output: Array[Byte]): ByteString =
      Option(output).map(bytes => ByteString(bytes)).getOrElse(ByteString.empty)

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val chunk = grab(in).toArray
        val encrypted = toByteString(cipher.update(chunk))
        // A block cipher may produce nothing until a block is complete. In that case ask
        // for more input instead of sending an empty chunk downstream.
        if (encrypted.nonEmpty) emit(out, encrypted)
        else pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        val last = toByteString(cipher.doFinal())
        if (last.nonEmpty) emit(out, last)
        completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:41,代码来源:Encryptor.scala
示例16: RepeatNTimes
//设置package包名称以及导入依赖的类
package scaladays.akka.stream
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
/**
  * Emits every incoming element `n` times in a row before requesting the next one.
  * With `n <= 0` elements are consumed but nothing is emitted.
  *
  * @param n number of times each element is emitted
  */
final case class RepeatNTimes[T](n: Int) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("RepeatNTimes.in")
  val out = Outlet[T]("RepeatNTimes.out")

  override def shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Element currently being repeated, and how many more times it has to be pushed.
    private var current: Option[T] = None
    private var remaining: Int = 0

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        current = Some(grab(in))
        remaining = n
        // onPush only happens after onPull forwarded demand, so out is available here.
        if (remaining > 0) pushCurrent()
        else pull(in) // nothing to emit for this element; keep the demand flowing
      }

      override def onUpstreamFinish(): Unit =
        // Keep the stage alive only while repeats of the last element are still owed.
        if (remaining <= 0) completeStage()
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit =
        if (remaining > 0) pushCurrent()
        else if (isClosed(in)) completeStage()
        else pull(in)
    })

    /** Pushes the current element once and completes if that was the final owed repeat. */
    private def pushCurrent(): Unit = {
      current.foreach(push(out, _))
      remaining -= 1
      if (remaining == 0) {
        current = None
        if (isClosed(in)) completeStage()
      }
    }
  }
}
开发者ID:ktoso,项目名称:scaladays-berlin-akka-streams,代码行数:55,代码来源:RepeatNTimes.scala
示例17: FrameChunker
//设置package包名称以及导入依赖的类
package fr.xebia.streams.video
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage._
import akka.util.ByteString
import fr.xebia.streams.transform.Implicits
/**
  * Re-chunks a byte stream into frames. Incoming bytes are appended to a buffer and
  * `sliceChunk(beginOfFrame, endOfFrame)` is used to cut the next frame out of it.
  * Bytes that do not form a frame by the time upstream has finished are discarded.
  *
  * @param beginOfFrame marker passed to `sliceChunk` as the frame start
  * @param endOfFrame   marker passed to `sliceChunk` as the frame end
  */
class FrameChunker(val beginOfFrame: ByteString, val endOfFrame: ByteString) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("Chunker.in")
  val out = Outlet[ByteString]("Chunker.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var buffer = ByteString.empty

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (isClosed(in)) emitChunk()
        else pull(in)
      }
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        buffer ++= grab(in)
        emitChunk()
      }

      override def onUpstreamFinish(): Unit = {
        if (buffer.isEmpty) completeStage()
        // Downstream is already waiting and no further onPull will arrive, so answer the
        // outstanding demand now; otherwise the stage would hang.
        else if (isAvailable(out)) emitChunk()
        // Otherwise keep accepting downstream pulls and serve them from the buffer.
      }
    })

    /** Pushes the next frame if the buffer holds one; otherwise requests more input or completes. */
    private def emitChunk(): Unit = {
      if (buffer.isEmpty) {
        requestMoreOrComplete()
      } else {
        import Implicits._
        buffer.sliceChunk(beginOfFrame, endOfFrame) match {
          case Some((chunk, remaining)) =>
            buffer = remaining
            push(out, chunk)
          case None =>
            // Pulling a closed inlet throws, so complete instead when upstream is done.
            requestMoreOrComplete()
        }
      }
    }

    /** Pulls upstream while it is open; completes the stage once it has been closed. */
    private def requestMoreOrComplete(): Unit =
      if (isClosed(in)) completeStage()
      else pull(in)
  }
}
开发者ID:fagossa,项目名称:camera_stream,代码行数:55,代码来源:FrameChunker.scala
示例18: AccumulateWhileUnchanged
//设置package包名称以及导入依赖的类
package sample.blog.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import scala.collection.immutable
//https://softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
//http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/
/**
  * Groups consecutive elements that share the same extracted property and emits each group
  * as one sequence when the property changes or upstream finishes.
  *
  * @param propertyExtractor computes the property that is compared between neighbouring elements
  */
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P) extends GraphStage[FlowShape[E, immutable.Seq[E]]] {
  val in = Inlet[E]("in")
  val out = Outlet[immutable.Seq[E]]("out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) {
      // Property of the most recently received element; None until the first element arrives.
      private var currentState: Option[P] = None
      private val buffer = Vector.newBuilder[E]

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPush(): Unit = {
          val nextElement = grab(in)
          val nextState = propertyExtractor(nextElement)

          if (currentState.isEmpty || currentState.contains(nextState)) {
            // Same run (or very first element): keep collecting and ask for more.
            buffer += nextElement
            pull(in)
          } else {
            // Property changed: emit the finished run and start a new one with this element.
            val result = buffer.result()
            buffer.clear()
            buffer += nextElement
            push(out, result)
          }
          currentState = Some(nextState)
        }

        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          val result = buffer.result()
          if (result.nonEmpty) {
            emit(out, result)
          }
          completeStage()
        }
      })

      override def postStop(): Unit = {
        buffer.clear()
      }
    }
}
开发者ID:haghard,项目名称:akka-pq,代码行数:55,代码来源:AccumulateWhileUnchanged.scala
示例19: TimedFlowOps
//设置package包名称以及导入依赖的类
package com.flipkart.connekt.busybees.streams.flows.profilers
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{BidiFlow, Flow, GraphDSL}
import akka.stream.{BidiShape, FlowShape}
import com.flipkart.connekt.busybees.models.RequestTracker
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import com.flipkart.connekt.commons.metrics.Instrumented
import com.flipkart.connekt.commons.utils.StringUtils._
import scala.collection.JavaConverters._
import scala.util.Try
/** Adds `timedAs` to dispatch flows so the time each request spends in the flow is recorded. */
object TimedFlowOps {

  implicit class TimedFlow[I, O, T <: RequestTracker, M](dispatchFlow: Flow[(I, T), (Try[O], T), M]) extends Instrumented {

    // Start timestamp (millis) per tracker, written on the way in and removed on the way out.
    val startTimes = new ConcurrentHashMap[T, Long]().asScala

    /** Bidi flow that stamps requests on the way out and times responses on the way back. */
    private def profilingShape(apiName: String) = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>

      val out = b.add(Flow[(I, T)].map {
        case (payload, tracker) =>
          startTimes.put(tracker, System.currentTimeMillis())
          (payload, tracker)
      })

      val in = b.add(Flow[(Try[O], T)].map {
        case (outcome, tracker) =>
          // Only responses whose tracker was stamped earlier are timed.
          startTimes.get(tracker).foreach { startedAt =>
            startTimes.remove(tracker)
            val elapsed = System.currentTimeMillis() - startedAt
            ConnektLogger(LogFile.PROCESSORS).trace(s"TimedFlowOps/$apiName MessageId: ${tracker.messageId} took : $elapsed ms")
            val metricSuffix = Option(tracker.provider).map("." + _).orEmpty
            registry.timer(getMetricName(apiName + metricSuffix)).update(elapsed, TimeUnit.MILLISECONDS)
          }
          (outcome, tracker)
      })

      BidiShape.fromFlows(out, in)
    })

    /**
      * Wraps the dispatch flow with the profiling stages.
      *
      * @param apiName name used in the trace log line and as the metric name prefix
      */
    def timedAs(apiName: String) = Flow.fromGraph(GraphDSL.create() { implicit b =>
      val profiler = b.add(profilingShape(apiName))
      val dispatcher = b.add(dispatchFlow)

      profiler.out1 ~> dispatcher ~> profiler.in2

      FlowShape(profiler.in1, profiler.out2)
    })
  }
}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:56,代码来源:TimedFlowOps.scala
示例20: Timer
//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream
import akka.stream.{Attributes, FlowShape}
import de.sciss.fscape.stream.impl.{FilterChunkImpl, FilterIn1LImpl, StageImpl, NodeImpl}
/**
  * Stage that outputs a running frame counter which restarts at zero whenever the trigger
  * input goes from non-positive to positive.
  */
object Timer {
  /**
    * Adds a timer stage to the builder and connects `trig` to it.
    *
    * @param trig trigger input
    * @return the outlet carrying the counter values
    */
  def apply(trig: OutI)(implicit b: Builder): OutL = {
    val stage = b.add(new Stage)
    b.connect(trig, stage.in)
    stage.out
  }

  private final val name = "Timer"

  private type Shape = FlowShape[BufI, BufL]

  private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {
    val shape = new FlowShape(
      in = InI (s"$name.trig"),
      out = OutL(s"$name.out" )
    )

    def createLogic(attr: Attributes) = new Logic(shape)
  }

  private final class Logic(shape: Shape)(implicit ctrl: Control)
    extends NodeImpl(name, shape)
      with FilterIn1LImpl[BufI]
      with FilterChunkImpl[BufI, BufL, Shape] {

    // Trigger level seen on the last processed frame, and the counter value for the next frame.
    private[this] var high = false
    private[this] var count = 0L

    protected def processChunk(inOff: Int, outOff: Int, len: Int): Unit = {
      val trigBuf = bufIn0.buf
      val outBuf = bufOut0.buf
      var wasHigh = high
      var elapsed = count
      var i = 0
      while (i < len) {
        val isHigh = trigBuf(inOff + i) > 0
        // Rising edge: restart the counter before writing this frame.
        if (isHigh && !wasHigh) elapsed = 0L
        outBuf(outOff + i) = elapsed
        elapsed += 1
        wasHigh = isHigh
        i += 1
      }
      // Carry the state over to the next chunk.
      high = wasHigh
      count = elapsed
    }
  }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:62,代码来源:Timer.scala
注:本文中的akka.stream.FlowShape类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论