本文整理汇总了Scala中akka.stream.stage.GraphStageLogic类的典型用法代码示例。如果您正苦于以下问题:Scala GraphStageLogic类的具体用法?Scala GraphStageLogic怎么用?Scala GraphStageLogic使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了GraphStageLogic类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GridfsSource
//设置package包名称以及导入依赖的类
import java.nio.ByteBuffer
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream
import scala.concurrent.{ExecutionContext, Future}
/**
 * Source stage that reads a GridFS download stream in fixed-size chunks.
 *
 * On every downstream pull a fresh `ByteBuffer` of `chunkSize` bytes is allocated, an asynchronous
 * `stream.read` into that buffer is started, and the resulting `Future` is pushed immediately.
 * The stage never completes by itself; it only stops when downstream cancels.
 *
 * @param stream    GridFS download stream to read from
 * @param chunkSize capacity, in bytes, of the buffer allocated for each pull
 * @param ec        execution context used to map the read result onto the buffer
 */
case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
  val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")

  override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // The default onDownstreamFinish already completes the stage, so only onPull is overridden.
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val buffer = ByteBuffer.allocate(chunkSize)
        val loaded = stream.read(buffer).toFuture().map(_ => buffer)
        push(out, loaded)
      }
    })

    // Release the download stream however the stage terminates (cancel, failure, abrupt stop).
    override def postStop(): Unit = {
      // NOTE(review): close() is assumed to return a lazy Observable like read() does, so it is
      // subscribed via toFuture(); a failure of the close itself is deliberately not propagated.
      stream.close().toFuture()
      super.postStop()
    }
  }
}
object GridfsSource {

  /**
   * Wraps a GridFS download stream as a `Source` of `ByteString` chunks.
   *
   * Buffers are read with a chunk size of 512 KiB, resolved one at a time, flipped for reading and
   * converted to `ByteString`. The source ends at the first empty chunk.
   *
   * @param stream GridFS download stream to read from
   * @param ec     execution context used for the asynchronous reads
   */
  def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
    val chunkSize = 512 * 1024
    val pendingBuffers = Source.fromGraph(GridfsSource(stream, chunkSize))
    val chunks = pendingBuffers.mapAsync(1) { pending =>
      pending.map { filled =>
        filled.flip()
        ByteString(filled)
      }
    }
    chunks.takeWhile(chunk => chunk.nonEmpty)
  }
}
开发者ID:zhijun,项目名称:GridFSSource,代码行数:42,代码来源:GridFSSource.scala
示例2: FtpGraphStageLogic
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp
package impl
import akka.stream.Shape
import akka.stream.stage.GraphStageLogic
import scala.util.control.NonFatal
/**
 * Base stage logic shared by FTP stages: it creates the client, connects in `preStart`
 * and disconnects in `postStop`.
 *
 * Subclasses supply `doPreStart` (work to do once connected) and the two materialization
 * callbacks `matSuccess` / `matFailure`.
 */
private[ftp] abstract class FtpGraphStageLogic[T, FtpClient, S <: RemoteFileSettings](
    val shape: Shape,
    val ftpLike: FtpLike[FtpClient, S],
    val connectionSettings: S,
    val ftpClient: () => FtpClient
) extends GraphStageLogic(shape) {

  // Explicit type: implicit definitions should not rely on inference.
  protected[this] implicit val client: FtpClient = ftpClient()

  // Connection handle; stays empty until preStart has connected successfully.
  protected[this] var handler: Option[ftpLike.Handler] = Option.empty[ftpLike.Handler]

  override def preStart(): Unit = {
    super.preStart()
    try {
      // `.get` rethrows the original connection failure as-is. The previous partial
      // `case NonFatal(t) => throw t` raised a MatchError for anything it did not match.
      handler = Some(ftpLike.connect(connectionSettings).get)
      doPreStart()
    } catch {
      case NonFatal(t) =>
        matFailure(t)
        failStage(t)
    }
  }

  override def postStop(): Unit = {
    matSuccess()
    disconnect()
    super.postStop()
  }

  /** Hook invoked from `preStart` after the connection has been established. */
  protected[this] def doPreStart(): Unit

  /** Disconnects the current handler, if one was obtained. */
  protected[this] def disconnect(): Unit =
    handler.foreach(ftpLike.disconnect)

  /** Called from `postStop`, before disconnecting. */
  protected[this] def matSuccess(): Boolean

  /** Called from `preStart` with the non-fatal error that is about to fail the stage. */
  protected[this] def matFailure(t: Throwable): Boolean
}
开发者ID:akka,项目名称:alpakka,代码行数:51,代码来源:FtpGraphStageLogic.scala
示例3: JmsSinkStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.jms
import javax.jms.{MessageProducer, TextMessage}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}
/**
 * Sink stage that sends every incoming `JmsTextMessage` as a JMS `TextMessage`.
 *
 * A session and producer are opened in `preStart`; the session is closed in `postStop`.
 *
 * @param settings sink settings; `timeToLive`, when defined, is applied to the producer
 */
final class JmsSinkStage(settings: JmsSinkSettings) extends GraphStage[SinkShape[JmsTextMessage]] {

  private val in = Inlet[JmsTextMessage]("JmsSink.in")

  override def shape: SinkShape[JmsTextMessage] = SinkShape.of(in)

  override protected def initialAttributes: Attributes =
    ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with JmsConnector {

      private var jmsProducer: MessageProducer = _

      override private[jms] def jmsSettings = settings

      override def preStart(): Unit = {
        jmsSession = openSession()
        jmsProducer = jmsSession.session.createProducer(jmsSession.destination)
        // foreach instead of nonEmpty + get: no unsafe Option.get
        settings.timeToLive.foreach(ttl => jmsProducer.setTimeToLive(ttl.toMillis))
        // A sink has no downstream to trigger demand, so request the first element here.
        pull(in)
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val elem: JmsTextMessage = grab(in)
            val textMessage: TextMessage = jmsSession.session.createTextMessage(elem.body)
            // Copy each property with the setter matching its runtime type.
            // Value types not listed here still fail with a MatchError, as before.
            elem.properties.foreach {
              case (key, v) =>
                v match {
                  case v: String => textMessage.setStringProperty(key, v)
                  case v: Int => textMessage.setIntProperty(key, v)
                  case v: Boolean => textMessage.setBooleanProperty(key, v)
                  case v: Byte => textMessage.setByteProperty(key, v)
                  case v: Short => textMessage.setShortProperty(key, v)
                  case v: Long => textMessage.setLongProperty(key, v)
                  case v: Float => textMessage.setFloatProperty(key, v)
                  case v: Double => textMessage.setDoubleProperty(key, v)
                }
            }
            jmsProducer.send(textMessage)
            pull(in)
          }
        }
      )

      // jmsSession may still be null if preStart failed before assigning it.
      override def postStop(): Unit =
        Option(jmsSession).foreach(_.closeSession())
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:JmsSinkStage.scala
示例4: SplitAfterSize
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.stream.scaladsl.SubFlow
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.util.ByteString
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStageLogic
import akka.stream.Attributes
import akka.stream.stage.OutHandler
import akka.stream.stage.InHandler
import akka.stream.scaladsl.RunnableGraph
import akka.stream.scaladsl.Flow
/**
 * Splits a `ByteString` flow into substreams, starting a new substream once at least
 * `minChunkSize` bytes have passed through the current one.
 */
private[alpakka] object SplitAfterSize {

  /**
   * @param minChunkSize number of bytes after which a new substream is started
   * @param in           flow whose output is split
   */
  def apply[I, M](minChunkSize: Long)(in: Flow[I, ByteString, M]): SubFlow[ByteString, M, in.Repr, in.Closed] =
    in.via(insertMarkers(minChunkSize)).splitWhen(_ == NewStream).collect { case bs: ByteString => bs }

  /** Marker element injected after the element that reaches the size threshold. */
  private case object NewStream

  /** Passes elements through, emitting a `NewStream` marker after each one that reaches the threshold. */
  private def insertMarkers(minChunkSize: Long) = new GraphStage[FlowShape[ByteString, Any]] {
    val in = Inlet[ByteString]("SplitAfterSize.in")
    val out = Outlet[Any]("SplitAfterSize.out")
    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with OutHandler with InHandler {
        // Bytes seen since the last marker was emitted.
        var count: Long = 0

        override def onPull(): Unit = pull(in)

        override def onPush(): Unit = {
          val chunk = grab(in)
          count += chunk.size
          if (count < minChunkSize) emit(out, chunk)
          else {
            count = 0
            emitMultiple(out, chunk :: NewStream :: Nil)
          }
        }

        setHandlers(in, out, this)
      }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:46,代码来源:SplitAfterSize.scala
示例5: MemoryBuffer
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
/**
 * Buffers all incoming bytes in memory and emits them as a single `Chunk` once upstream finishes.
 * The stage fails with `IllegalStateException` if the buffered size would exceed `maxSize`.
 *
 * @param maxSize maximum number of bytes that may be buffered
 */
private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
  val in = Inlet[ByteString]("MemoryBuffer.in")
  val out = Outlet[Chunk]("MemoryBuffer.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      var buffer = ByteString.empty

      override def onPull(): Unit =
        if (!isClosed(in)) pull(in) else emit()

      override def onPush(): Unit = {
        val incoming = grab(in)
        val wouldOverflow = buffer.size + incoming.size > maxSize
        if (wouldOverflow)
          failStage(new IllegalStateException("Buffer size of " + maxSize + " bytes exceeded."))
        else {
          buffer = buffer ++ incoming
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) emit()
        completeStage()
      }

      /** Emits the whole buffer as one chunk, then completes the stage. */
      def emit(): Unit = {
        val chunk = Chunk(Source.single(buffer), buffer.size)
        emit(out, chunk, () => completeStage())
      }

      setHandlers(in, out, this)
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:39,代码来源:MemoryBuffer.scala
示例6: CsvParsingStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv
import akka.event.Logging
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import scala.annotation.tailrec
import scala.util.control.NonFatal
/**
 * Flow stage that feeds incoming bytes to a `CsvParser` and emits one list of fields per CSV line.
 *
 * @param delimiter  field delimiter byte
 * @param quoteChar  quoting byte
 * @param escapeChar escape byte
 */
private[csv] class CsvParsingStage(delimiter: Byte, quoteChar: Byte, escapeChar: Byte)
    extends GraphStage[FlowShape[ByteString, List[ByteString]]] {

  private val in = Inlet[ByteString](Logging.simpleName(this) + ".in")
  private val out = Outlet[List[ByteString]](Logging.simpleName(this) + ".out")
  override val shape = FlowShape(in, out)

  override protected def initialAttributes: Attributes = Attributes.name("CsvParsing")

  override def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private[this] val buffer = new CsvParser(delimiter, quoteChar, escapeChar)

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        buffer.offer(grab(in))
        tryPollBuffer()
      }

      override def onPull(): Unit =
        tryPollBuffer()

      override def onUpstreamFinish(): Unit = {
        emitRemaining()
        completeStage()
      }

      // Push the next complete line if there is one; otherwise ask upstream for more bytes,
      // or flush whatever is left when upstream has already finished.
      private def tryPollBuffer() =
        try buffer.poll(requireLineEnd = true) match {
          case Some(csvLine) => push(out, csvLine)
          case _ =>
            if (isClosed(in)) {
              emitRemaining()
              completeStage()
            } else pull(in)
        } catch {
          case NonFatal(ex) => failStage(ex)
        }

      // Drain the parser without requiring a trailing line end.
      @tailrec private def emitRemaining(): Unit =
        buffer.poll(requireLineEnd = false) match {
          case Some(csvLine) =>
            emit(out, csvLine)
            emitRemaining()
          case _ =>
        }
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:CsvParsingStage.scala
示例7: CsvToMapStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv
import java.nio.charset.Charset
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import scala.collection.immutable
/**
 * Flow stage that turns each CSV row into a map keyed by column name.
 *
 * @param columnNames header names to use; when `None`, the first row is decoded and used as header
 * @param charset     charset used to decode the header row
 */
private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Known header names; filled from the first row when none were supplied.
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPull(): Unit = pull(in)

      override def onPush(): Unit = {
        val row = grab(in)
        headers match {
          case Some(names) =>
            push(out, names.zip(row).toMap)
          case None =>
            // The header row produces no output, so request the next row right away.
            headers = Some(row.map(_.decodeString(charset)))
            pull(in)
        }
      }
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:41,代码来源:CsvToMapStage.scala
示例8: IgnoreLastElements
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.mutable
/**
 * Flow stage that passes elements through but withholds the last `ignoreCount` elements of the stream.
 *
 * Elements are held in a queue; an element is only released once `ignoreCount` newer elements have
 * arrived after it. Whatever is still queued when upstream finishes is dropped.
 *
 * @param ignoreCount number of trailing elements to drop; must not be negative
 */
final class IgnoreLastElements[E](ignoreCount: Int)
    extends GraphStage[FlowShape[E, E]] {

  // A negative count could never satisfy the release condition below, so the stage would
  // buffer every element forever and emit nothing.
  require(ignoreCount >= 0, s"ignoreCount must not be negative, was $ignoreCount")

  val in = Inlet[E]("IgnoreLastElement.in")
  val out = Outlet[E]("IgnoreLastElement.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    val buffer: mutable.Queue[E] = new mutable.Queue[E]

    setHandlers(in, out, new InHandler with OutHandler {
      override def onPush(): Unit = {
        buffer.enqueue(grab(in))
        if (buffer.size == ignoreCount + 1) {
          push(out, buffer.dequeue())
        } else {
          // As long as the buffer is not full, nothing will be sent downstream.
          // As a result the downstream component will not call 'onPull' and we have
          // to manually pull upstream
          pull(in)
        }
      }

      override def onPull(): Unit = {
        pull(in)
      }

      // Completing here discards the elements still held in the buffer.
      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:47,代码来源:IgnoreLastElements.scala
示例9: AccumulateWhileUnchanged
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import scala.collection.immutable
/**
 * Flow stage that groups consecutive elements sharing the same extracted property.
 *
 * A group is emitted when an element with a different property arrives, and the final group is
 * emitted when upstream finishes.
 *
 * @param propertyExtractor function computing the property used to compare neighbouring elements
 */
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
    extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    // Property of the most recently received element, if any.
    private var currentState: Option[P] = None
    // Elements of the group currently being accumulated.
    private val buffer = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPull(): Unit = {
        pull(in)
      }

      override def onPush(): Unit = {
        val element = grab(in)
        val property = propertyExtractor(element)
        val propertyChanged = currentState.exists(_ != property)
        if (propertyChanged) {
          // Close the current group and start the next one with this element.
          val group = buffer.result()
          buffer.clear()
          buffer += element
          push(out, group)
        } else {
          buffer += element
          pull(in)
        }
        currentState = Some(property)
      }

      override def onUpstreamFinish(): Unit = {
        val remaining = buffer.result()
        if (remaining.nonEmpty) {
          emit(out, remaining)
        }
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffer.clear()
    }
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:59,代码来源:AccumulateWhileUnchanged.scala
示例10: ByteStringToDeltaStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
import scodec.DecodeResult
import scodec.bits.BitVector
import scala.annotation.tailrec
/**
 * Flow stage that decodes incoming bytes into `FactorizedDeltaTick`s.
 *
 * Bits left undecoded after one chunk are kept and prepended to the next chunk.
 */
class ByteStringToDeltaStage extends GraphStage[FlowShape[ByteString, FactorizedDeltaTick]] {

  val in = Inlet[ByteString]("ByteStringToDeltaStage.in")
  val out = Outlet[FactorizedDeltaTick]("ByteStringToDeltaStage.out")

  def shape: FlowShape[ByteString, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      val inHandler = new InHandler {

        /**
         * Decodes as many values as possible from `bits`.
         *
         * @return the decoded values and the bits that were left over
         */
        def decodeAllFromBits(bits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
          @tailrec
          def compute(results: Vector[NonNegativeFactorizedDeltaTick], remainingBits: BitVector): (Vector[NonNegativeFactorizedDeltaTick], BitVector) = {
            NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV.decode(remainingBits) match {
              case Successful(DecodeResult(value, BitVector.empty)) =>
                (results :+ value, BitVector.empty)
              // Keep decoding only while more than 25 bits remain.
              case Successful(DecodeResult(value, remainder)) if remainder.sizeGreaterThan(25) =>
                compute(results :+ value, remainder)
              case Successful(DecodeResult(value, remainder)) =>
                (results :+ value, remainder)
              case Failure(e) =>
                // NOTE(review): a decode failure is only printed and the undecoded bits are
                // discarded; confirm this is intended for values split across chunks.
                println("e = " + e)
                (results, BitVector.empty)
            }
          }
          compute(Vector.empty, bits)
        }

        // Bits left over from the previous chunk.
        private[this] var remainingBits = BitVector.empty

        def onPush(): Unit = {
          val bits = BitVector.view(grab(in).asByteBuffer)
          val (results, rest) = decodeAllFromBits(remainingBits ++ bits)
          remainingBits = rest
          // emitMultiple with no elements pushes nothing, which would leave downstream demand
          // unanswered and the inlet un-pulled. Ask for more input instead so the stream keeps moving.
          if (results.isEmpty) tryPull(in)
          else emitMultiple(out, results.map(_.withNegatives))
        }
      }

      setHandler(in, inHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:62,代码来源:ByteStringToDeltaStage.scala
示例11: DeltaToByteStringStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import com.martinseeler.dtf.NonNegativeFactorizedDeltaTick.nonNegFactorizedDeltaTickCodecV
import com.martinseeler.dtf.{FactorizedDeltaTick, NonNegativeFactorizedDeltaTick}
import scodec.Attempt.{Failure, Successful}
/**
 * Flow stage that encodes each `FactorizedDeltaTick` (in its non-negative form) into a `ByteString`.
 * The stage fails when the codec reports an encoding failure.
 */
class DeltaToByteStringStage extends GraphStage[FlowShape[FactorizedDeltaTick, ByteString]] {

  val in = Inlet[FactorizedDeltaTick]("DeltaToByteStringStage.in")
  val out = Outlet[ByteString]("DeltaToByteStringStage.out")

  def shape: FlowShape[FactorizedDeltaTick, ByteString] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      def onPush(): Unit = {
        val delta = grab(in)
        nonNegFactorizedDeltaTickCodecV.encode(delta.nonNegative) match {
          case Successful(x) => emit(out, ByteString(x.toByteBuffer))
          // failStage rather than fail(out): failing only the outlet leaves the inlet open,
          // so upstream would never be cancelled and the stage would never stop.
          case Failure(err) => failStage(new Exception(err.messageWithContext))
        }
      }

      setHandler(in, this)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:35,代码来源:DeltaToByteStringStage.scala
示例12: TickToDeltaStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}
/**
 * Flow stage that converts a stream of `Tick`s into `FactorizedDeltaTick`s.
 *
 * The first tick is emitted from its own factorized values; every later tick is emitted as the
 * delta between its factorized form and that of the previous tick.
 */
class TickToDeltaStage
    extends GraphStage[FlowShape[Tick, FactorizedDeltaTick]] {

  val in = Inlet[Tick]("FactorizeDeltaTickStage.in")
  val out = Outlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[Tick, FactorizedDeltaTick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      // Handles only the very first tick, then hands over to the delta handler.
      val firstTickInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val tick = grab(in)
          val factorizedTick: FactorizedTick = tick.factorize
          emit(out, FactorizedDeltaTick(factorizedTick.time, factorizedTick.bid, factorizedTick.ask))
          setHandler(in, withPrevInHandler(tick))
        }
      }

      /** Handler for all ticks after the first; remembers the previous tick itself. */
      def withPrevInHandler(initialTick: Tick): InHandler = new InHandler {
        private[this] var prevTick = initialTick

        def onPush(): Unit = {
          val tick = grab(in)
          emit(out, tick.factorize.deltaTo(prevTick.factorize))
          // Updating prevTick is sufficient. Re-installing a fresh handler for every element
          // (as before) was redundant and allocated one handler per tick.
          prevTick = tick
        }
      }

      setHandler(in, firstTickInHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:45,代码来源:TickToDeltaStage.scala
示例13: DeltaToTickStage
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf.stages
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.martinseeler.dtf.{FactorizedDeltaTick, FactorizedTick, Tick}
/**
 * Flow stage that rebuilds `Tick`s from a stream of `FactorizedDeltaTick`s.
 *
 * The first delta is applied to the factorized zero tick; each later delta is applied to the
 * previously reconstructed factorized tick. Every result is emitted in normalized form.
 */
class DeltaToTickStage
    extends GraphStage[FlowShape[FactorizedDeltaTick, Tick]] {

  val in = Inlet[FactorizedDeltaTick]("FactorizeDeltaTickStage.in")
  val out = Outlet[Tick]("FactorizeDeltaTickStage.out")

  def shape: FlowShape[FactorizedDeltaTick, Tick] = FlowShape(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      setHandler(out, this)

      // Handles the first delta only, then switches to the stateful handler.
      val firstDeltaInHandler: InHandler = new InHandler {
        def onPush(): Unit = {
          val firstDelta = grab(in)
          println("delta = " + firstDelta)
          val origin = Tick(0L, 0, 0).factorize
          val rebuilt: FactorizedTick = origin.withDelta(firstDelta)
          emit(out, rebuilt.normalize)
          setHandler(in, withPrevInHandler(rebuilt))
        }
      }

      /** Handler for all later deltas; tracks the last reconstructed factorized tick. */
      def withPrevInHandler(initialFactorizedTick: FactorizedTick): InHandler = new InHandler {
        private[this] var prevFactorizedTick = initialFactorizedTick

        def onPush(): Unit = {
          val nextDelta = grab(in)
          val rebuilt: FactorizedTick = prevFactorizedTick.withDelta(nextDelta)
          emit(out, rebuilt.normalize)
          prevFactorizedTick = rebuilt
        }
      }

      setHandler(in, firstDeltaInHandler)
    }
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:46,代码来源:DeltaToTickStage.scala
示例14: FileCipher
//设置package包名称以及导入依赖的类
package utils.streams
import javax.crypto._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import utils.Utils
/**
 * Flow stage that runs every incoming chunk through the given `Cipher`.
 *
 * Chunks for which the cipher produces no output yet are skipped, and the result of `doFinal`
 * is emitted when upstream finishes.
 *
 * @param cipher cipher used for `update` / `doFinal`
 */
class FileCipher(cipher: Cipher) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in = Inlet[ByteString]("FileCipher.in")
  val out = Outlet[ByteString]("FileCipher.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val bytes = grab(in)
        // An empty chunk should not happen; it is treated like "no output" below.
        // Cipher.update is documented to return null when a block cipher has not yet received
        // a full block, so the result is wrapped in Option instead of going straight to ByteString.
        val ciphered =
          if (bytes.isEmpty) ByteString.empty
          else Option(cipher.update(bytes.toArray)).fold(ByteString.empty)(arr => ByteString(arr))
        // Empty output is never pushed because it would interfere with chunked encoding;
        // more input is requested instead until a block is complete.
        if (ciphered.nonEmpty)
          push(out, ciphered)
        else
          pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        val bs = ByteString(cipher.doFinal())
        if (bs.nonEmpty)
          emit(out, bs) // flush the final block, if any
        complete(out)
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        pull(in)
      }
    })
  }
}
object FileCipher {

  /** Factory returning a new `FileCipher` stage for the given cipher. */
  def apply(cipher: Cipher): FileCipher = {
    val stage = new FileCipher(cipher)
    stage
  }
}
开发者ID:Cumulus-Cloud,项目名称:cumulus,代码行数:55,代码来源:FileCipher.scala
示例15: MapShape_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source, Tcp}
import GraphDSL.Implicits._
import akka.NotUsed
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
/** Demo application: a hand-written `Map` stage applied to an endless stream of integers. */
object MapShape_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  /**
   * Flow stage applying `f` to every element.
   *
   * @param f transformation applied to each incoming element
   */
  class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
    val in = Inlet[A]("Map.in")
    val out = Outlet[B]("Map.out")

    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // Downstream demand is forwarded upstream unchanged.
        setHandler(out, new OutHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPull(): Unit = pull(in)
        })

        // Each incoming element is transformed and pushed on.
        setHandler(in, new InHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPush(): Unit = {
            val transformed = f(grab(in))
            push(out, transformed)
          }
        })
      }
  }

  val flowGraph: Graph[FlowShape[Int, Int], NotUsed] = new Map[Int, Int](_ + 1)
  val result = Source
    .fromIterator(() => Iterator.from(5))
    .via(flowGraph)
    .to(Sink.foreach(println))

  result.run()
  // Let the stream print for a short while before shutting the actor system down.
  Thread.sleep(300)
  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:52,代码来源:MapShape_.scala
示例16: FilterStage_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.NotUsed
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
/** Demo application: a hand-written `Filter` stage applied to an endless stream of integers. */
object FilterStage_ extends App {

  /**
   * Flow stage that only lets through elements satisfying `f`.
   *
   * NOTE(review): the element is cast from `A` to `B` unchecked, so this is only sound when
   * `B` is a supertype of `A` (the demo uses `Int` for both).
   *
   * @param f predicate deciding whether an element is passed downstream
   */
  class Filter[A, B](f: A => Boolean) extends GraphStage[FlowShape[A, B]] {
    val in = Inlet[A]("Filter.in")
    val out = Outlet[B]("Filter.out")

    override val shape = FlowShape.of(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPush(): Unit = {
            val elem = grab(in)
            if (f(elem)) {
              push(out, elem.asInstanceOf[B])
            } else {
              // A rejected element produces no output, so the downstream demand is still
              // unanswered; request the next element or the stream stalls here.
              pull(in)
            }
          }
        })
        setHandler(out, new OutHandler {
          @scala.throws[Exception](classOf[Exception])
          override def onPull(): Unit = {
            pull(in)
          }
        })
      }
  }

  // Demo wiring: run the filter over an endless iterator and print what passes.
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val flowGraph: Graph[FlowShape[Int, Int], NotUsed] =
    new Filter[Int, Int]((a: Int) => a < 100)
  val result = Source.fromIterator(() => Iterator from 5)
    .via(flowGraph)
    .to(Sink.foreach(println))

  result.run()
  // The source is endless; terminating the system after a short wait ends the demo.
  Thread.sleep(300)
  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:53,代码来源:FilterStage_.scala
示例17: SinkShape_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source, Tcp}
import GraphDSL.Implicits._
import akka.NotUsed
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import akka_in_action.streams.SourceShape_.NumbersSource
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
/** Demo application: a hand-written sink that prints every element it receives. */
object SinkShape_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  /** Sink stage printing each received `Int` to stdout. */
  class StdoutSink extends GraphStage[SinkShape[Int]] {
    val in: Inlet[Int] = Inlet("StdoutSink")

    override val shape: SinkShape[Int] = SinkShape(in)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // A sink has no downstream to trigger demand, so request the first element on start.
        override def preStart(): Unit = pull(in)

        setHandler(in, new InHandler {
          @throws[Exception](classOf[Exception])
          override def onPush(): Unit = {
            println(s"grab: ${grab(in)}")
            pull(in)
          }
        })
      }
  }

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val sinkGraph: Graph[SinkShape[Int], NotUsed] = new StdoutSink
  val mySink: Sink[Int, NotUsed] = Sink.fromGraph(sinkGraph)

  // watchTermination exposes a Future that completes when the stream is done. The previous code
  // called system.terminate() right after the asynchronous run(), racing with the stream.
  val result = mySource.take(66).watchTermination()(Keep.right).to(mySink)

  result.run().onComplete(_ => system.terminate())
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:49,代码来源:SinkShape_.scala
示例18: SourceShape_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source, Tcp}
import GraphDSL.Implicits._
import akka.NotUsed
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
/** Demo application: a hand-written source of increasing integers, materialized twice. */
object SourceShape_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  /** Source stage emitting 1, 2, 3, ... with one counter per materialization. */
  class NumbersSource extends GraphStage[SourceShape[Int]] {
    val out: Outlet[Int] = Outlet("NumbersSource")

    override val shape: SourceShape[Int] = SourceShape(out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // State lives in the logic, so every run of the source starts counting from 1.
        private var counter = 1

        setHandler(out, new OutHandler {
          @throws[Exception](classOf[Exception])
          override def onPull(): Unit = {
            push(out, counter)
            println(s"push $counter")
            counter += 1
          }
        })
      }
  }

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val result1: Future[Int] = mySource.take(10).runReduce(_ + _)
  val result2: Future[Int] = mySource.take(100).runReduce(_ + _)

  val combined = for {
    r1 <- result1
    r2 <- result2
  } yield (r1, r2)

  // try/finally: the actor system is shut down even when the wait times out or a stream fails;
  // previously an exception from Await.result skipped system.terminate() altogether.
  val res =
    try Await.result(combined, 300.millis)
    finally system.terminate()
  println(res)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:55,代码来源:SourceShape_.scala
示例19: Digester
//设置package包名称以及导入依赖的类
package streams
import java.security.MessageDigest
import java.util.Base64
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import com.cryptoutility.protocol.crypto.Hex
/**
 * Flow stage that consumes all incoming bytes and emits a single hex-encoded digest
 * once upstream finishes.
 *
 * @param algorithm name passed to `MessageDigest.getInstance`
 */
class Digester(algorithm: String) extends GraphStage[FlowShape[ByteString, String]] {
  val in = Inlet[ByteString]("Digester.in")
  val out = Outlet[String]("Digester.out")

  // Retained as a public member for compatibility; it also makes an unknown algorithm fail at
  // construction time. Stream data is no longer fed into this instance (see createLogic).
  val digester = MessageDigest.getInstance(algorithm)

  def shape: FlowShape[ByteString, String] = FlowShape.of(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // MessageDigest is stateful and a GraphStage can be materialized more than once, so each
    // materialization gets its own instance instead of sharing the one held by the stage.
    private val streamDigest = MessageDigest.getInstance(algorithm)

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new InHandler {
      def onPush(): Unit = {
        val chunk = grab(in)
        streamDigest.update(chunk.toArray)
        // Nothing is emitted per chunk, so keep requesting input until upstream finishes.
        pull(in)
      }

      @throws[Exception](classOf[Exception])
      override def onUpstreamFinish(): Unit = {
        val digest = Hex(streamDigest.digest())
        emit(out, digest)
        completeStage()
      }
    })
  }
}
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:47,代码来源:Digester.scala
示例20: Encryptor
//设置package包名称以及导入依赖的类
package streams
import java.security.Key
import java.security.spec.AlgorithmParameterSpec
import javax.crypto.Cipher
import akka.stream.{Outlet, Inlet, Attributes, FlowShape}
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
import akka.util.ByteString
/**
 * Flow stage that runs every incoming chunk through `cipher.update` and emits the result of
 * `cipher.doFinal` when upstream finishes.
 *
 * @param cipher cipher used for `update` / `doFinal`
 */
class Encryptor(cipher: Cipher)
    extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in = Inlet[ByteString]("Encryptor.in")
  val out = Outlet[ByteString]("Encryptor.out")

  def shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

  def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val chunk = grab(in).toArray
        // Cipher.update is documented to return null when the input is too short to produce a
        // block; ByteString(null) would throw, so in that case nothing is emitted and more
        // input is requested to keep the stream moving.
        Option(cipher.update(chunk)) match {
          case Some(encrypted) => emit(out, ByteString(encrypted))
          case None => pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        val last = cipher.doFinal()
        emit(out, ByteString(last))
        completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }
}
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:41,代码来源:Encryptor.scala
注:本文中的akka.stream.stage.GraphStageLogic类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论