本文整理汇总了Scala中akka.stream.SourceShape类的典型用法代码示例。如果您正苦于以下问题:Scala SourceShape类的具体用法?Scala SourceShape怎么用?Scala SourceShape使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SourceShape类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GridfsSource
//设置package包名称以及导入依赖的类
import java.nio.ByteBuffer
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream
import scala.concurrent.{ExecutionContext, Future}
case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")
override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val buffer = ByteBuffer.allocate(chunkSize)
val loaded = stream.read(buffer).toFuture().map(_ => buffer)
push(out, loaded)
}
override def onDownstreamFinish(): Unit = {
super.onDownstreamFinish()
stream.close()
complete(out)
}
})
}
}
object GridfsSource {
def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
Source.fromGraph(GridfsSource(stream, 512 * 1024)).mapAsync(1)(fb => fb.map { buffer => buffer.flip(); ByteString(buffer) }).takeWhile(_.nonEmpty)
}
}
开发者ID:zhijun,项目名称:GridFSSource,代码行数:42,代码来源:GridFSSource.scala
示例2: GeodeQueryGraphLogic
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage
import akka.stream.SourceShape
import akka.stream.stage.StageLogging
import org.apache.geode.cache.client.ClientCache
import org.apache.geode.cache.query.SelectResults
import scala.util.Try
abstract class GeodeQueryGraphLogic[V](val shape: SourceShape[V], val clientCache: ClientCache, val query: String)
extends GeodeSourceStageLogic[V](shape, clientCache)
with StageLogging {
override def executeQuery() = Try {
qs.newQuery(query)
.execute()
.asInstanceOf[SelectResults[V]]
.iterator()
}
}
开发者ID:akka,项目名称:alpakka,代码行数:22,代码来源:GeodeQueryGraphLogic.scala
示例3: GeodeContinuousSourceStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage
import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache
import scala.concurrent.{Future, Promise}
class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
override protected def initialAttributes: Attributes =
Attributes
.name("GeodeContinuousSource")
.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
val out = Outlet[V](s"geode.continuousSource")
override def shape: SourceShape[V] = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val subPromise = Promise[Done]
(new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {
override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
subPromise.success(Done)
}
val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
if (isAvailable(out)) {
pushElement(out, element)
} else
enqueue(element)
handleTerminaison()
}
//
// This handler, will first forward initial (old) result, then new ones (continuous).
//
setHandler(
out,
new OutHandler {
override def onPull() = {
if (initialResultsIterator.hasNext)
push(out, initialResultsIterator.next())
else
dequeue() foreach { e =>
pushElement(out, e)
}
handleTerminaison()
}
}
)
}, subPromise.future)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:60,代码来源:GeodeContinuousSourceStage.scala
示例4: GeodeFiniteSourceStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage
import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache
import scala.concurrent.{Future, Promise}
class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
override protected def initialAttributes: Attributes =
Attributes.name("GeodeFiniteSource").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
val out = Outlet[V]("geode.finiteSource")
override def shape: SourceShape[V] = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val subPromise = Promise[Done]
(new GeodeQueryGraphLogic[V](shape, cache, sql) {
override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
subPromise.success(Done)
}
setHandler(
out,
new OutHandler {
override def onPull() =
if (initialResultsIterator.hasNext)
push(out, initialResultsIterator.next())
else
completeStage()
}
)
}, subPromise.future)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:43,代码来源:GeodeFiniteSourceStage.scala
示例5: IdentityValidation
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation
import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.ZipWhileEitherAvailable
class IdentityValidation(inTopicHandler: TopicHandler,
outTopicHandler: TopicHandler,
materializer: ActorMaterializer)
extends Validation[SimpleRecord](inTopicHandler, outTopicHandler, materializer) {
override val valueName = "SimpleRecords"
override val queryName = "Identity Query"
def createSource(): Graph[SourceShape[(Option[SimpleRecord], Option[SimpleRecord])], NotUsed] = {
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWhileEitherAvailable[SimpleRecord]())
inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> zip.in0
outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toSimpleRecords ~> zip.in1
SourceShape(zip.out)
}
}
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:33,代码来源:IdentityValidation.scala
示例6: PairsShape_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}
import GraphDSL.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object PairsShape_ extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val pairs = Source.fromGraph(GraphDSL.create() { implicit builder =>
val zip = builder.add(Zip[Int, Int]())
def ints = Source.fromIterator(() => Iterator from(1))
ints.filter(_ % 2 != 0) ~> zip.in0
ints.filter(_ % 2 == 0) ~> zip.in1
SourceShape(zip.out)
})
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)
val r = Await.result(firstPair, 300 millis)
println(r)
system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:32,代码来源:PairsShape_.scala
示例7: MaterializedValue_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import GraphDSL.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object MaterializedValue_ extends App {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(
GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
val r = Source(1 to 10)
.via(foldFlow)
.runWith(Sink.head)
println(Await.result(r, 200 millis))
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(
GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
Source(1 to 10) ~> fold
SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
})
val r2 = cyclicFold
.via(foldFlow)
.runWith(Sink.head)
println(Await.result(r2, 200 millis))
system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:37,代码来源:MaterializedValue_.scala
示例8: DebugGen
//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream
import akka.stream.{Attributes, SourceShape}
import de.sciss.fscape.stream.impl.{GenChunkImpl, GenIn0DImpl, StageImpl, NodeImpl}
import scala.util.Random
object DebugGen {
def apply()(implicit b: Builder): OutD = {
val stage0 = new Stage
val stage = b.add(stage0)
stage.out
}
private final val name = "DebugGen"
private type Shape = SourceShape[BufD]
private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {
val shape = new SourceShape(
out = OutD(s"$name.out")
)
def createLogic(attr: Attributes) = new Logic(shape)
}
private final class Logic(shape: Shape)(implicit ctrl: Control)
extends NodeImpl(name, shape)
with GenChunkImpl[BufD, BufD, Shape]
with GenIn0DImpl {
private[this] val rnd: Random = ctrl.mkRandom()
protected def processChunk(inOff: Int, outOff: Int, chunk: Int): Unit = {
val buf = bufOut0.buf
var off = outOff
val stop = off + chunk
while (off < stop) {
buf(off) = rnd.nextDouble() * 2 - 1
off += 1
}
}
}
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:47,代码来源:DebugGen.scala
示例9: WhiteNoise
//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream
import akka.stream.{Attributes, SourceShape}
import de.sciss.fscape.stream.impl.{GenChunkImpl, GenIn0DImpl, StageImpl, NodeImpl}
import scala.util.Random
object WhiteNoise {
def apply()(implicit b: Builder): OutD = {
val stage0 = new Stage
val stage = b.add(stage0)
stage.out
}
private final val name = "WhiteNoise"
private type Shape = SourceShape[BufD]
private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {
val shape = new SourceShape(
out = OutD(s"$name.out")
)
def createLogic(attr: Attributes) = new Logic(shape)
}
private final class Logic(shape: Shape)(implicit ctrl: Control)
extends NodeImpl(name, shape)
with GenChunkImpl[BufD, BufD, Shape]
with GenIn0DImpl {
private[this] val rnd: Random = ctrl.mkRandom()
protected def processChunk(inOff: Int, outOff: Int, chunk: Int): Unit = {
val buf = bufOut0.buf
var off = outOff
val stop = off + chunk
while (off < stop) {
buf(off) = rnd.nextDouble() * 2 - 1
off += 1
}
}
}
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:47,代码来源:WhiteNoise.scala
示例10: HelloAkkaStreamsSource
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage._
class HelloAkkaStreamsSource extends GraphStage[SourceShape[String]] {
val out: Outlet[String] = Outlet("SystemInputSource")
override val shape: SourceShape[String] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull() = {
val line = "Hello World Akka Streams!"
push(out, line)
}
})
}
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:22,代码来源:HelloAkkaStreamsSource.scala
示例11: ZMQSubSocket
//设置package包名称以及导入依赖的类
package com.mintbeans.rzmq
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.util.ByteString
import com.mintbeans.rzmq.ZMQErrors._
import com.mintbeans.rzmq.ZMQMessages._
import com.typesafe.scalalogging.LazyLogging
import org.zeromq.{ ZContext, ZMQ, ZMsg }
import scala.collection.JavaConverters._
private[rzmq] class ZMQSubSocket(endpoint: String, topic: String) extends GraphStage[SourceShape[ZMQMessage]] with LazyLogging {
val out: Outlet[ZMQMessage] = Outlet("ZMQSubSocket")
override val shape: SourceShape[ZMQMessage] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
logger.info("Initializing ZMQ context.")
val context = new ZContext(1)
logger.info(s"Connecting SUB socket to ${endpoint}")
val socket = {
val s = context.createSocket(ZMQ.SUB)
s.subscribe(topic.getBytes(ZMQ.CHARSET))
s.connect(endpoint)
s
}
setHandler(out, outHandler())
override def postStop() = {
logger.info("Closing socket.")
context.destroySocket(socket)
logger.info("Closing ZMQ context.")
context.close()
super.postStop()
}
def outHandler() = new OutHandler {
override def onPull(): Unit = {
logger.debug("Reading message...")
Option(ZMsg.recvMsg(socket)).map { zMsg =>
zMsg.asScala.toList.map(zFrame => ByteString(zFrame.getData))
}.map {
case topic :: payload :: Nil => {
logger.debug(s"Received topic (${topic}) message: ${payload}")
push(out, ZMQMessage(payload))
}
case _ => fail(out, new MessageFormatException("Invalid message format"))
}
}
override def onDownstreamFinish(): Unit = {
logger.info("Downstream finish.")
super.onDownstreamFinish()
}
}
}
}
开发者ID:sbilinski,项目名称:reactive-zeromq,代码行数:62,代码来源:ZMQSubSocket.scala
示例12: ZMQPullSocket
//设置package包名称以及导入依赖的类
package com.mintbeans.rzmq
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.util.ByteString
import com.mintbeans.rzmq.ZMQErrors._
import com.mintbeans.rzmq.ZMQMessages._
import com.typesafe.scalalogging.LazyLogging
import org.zeromq.{ ZContext, ZMQ, ZMsg }
import scala.collection.JavaConverters._
private[rzmq] class ZMQPullSocket(endpoint: String) extends GraphStage[SourceShape[ZMQMessage]] with LazyLogging {
val out: Outlet[ZMQMessage] = Outlet("ZMQPullSocket")
override val shape: SourceShape[ZMQMessage] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
logger.info("Initializing ZMQ context.")
val context = new ZContext(1)
logger.info(s"Binding PULL socket to ${endpoint}")
val socket = {
val s = context.createSocket(ZMQ.PULL)
s.bind(endpoint)
s
}
setHandler(out, outHandler())
override def postStop() = {
logger.info("Closing socket.")
context.destroySocket(socket)
logger.info("Closing ZMQ context.")
context.close()
super.postStop()
}
def outHandler() = new OutHandler {
override def onPull(): Unit = {
logger.debug("Reading message...")
Option(ZMsg.recvMsg(socket)).map { zMsg =>
zMsg.asScala.toList.map(zFrame => ByteString(zFrame.getData))
}.map {
case payload :: Nil => {
logger.debug(s"Received message: ${payload}")
push(out, ZMQMessage(payload))
}
case _ => fail(out, new MessageFormatException("Invalid message format"))
}
}
override def onDownstreamFinish(): Unit = {
logger.info("Downstream finish.")
super.onDownstreamFinish()
}
}
}
}
开发者ID:sbilinski,项目名称:reactive-zeromq,代码行数:61,代码来源:ZMQPullSocket.scala
示例13: StreamingSpec
//设置package包名称以及导入依赖的类
package com.beachape.sparkka
import akka.actor.ActorSystem
import akka.stream.{ SourceShape, ActorMaterializer }
import akka.stream.scaladsl._
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, FunSpec }
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
class StreamingSpec extends FunSpec with ScalaFutures with Matchers with Eventually {
override implicit val patienceConfig: PatienceConfig = PatienceConfig(Span(30, Seconds), Span(150, Millis))
describe("streamConnection") {
it("should allow both the original flow and the connected InputDStream to receive all expected values") {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ssc = LocalContext.ssc
// InputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = Source(1 to 10)
val bCast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val add1 = Flow[Int].map(_ + 1)
val times3 = Flow[Int].map(_ * 3)
source ~> bCast ~> add1 ~> merge
bCast ~> times3 ~> feedDInput ~> merge
SourceShape(merge.out)
})
val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)
val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
rdd.foreach { i =>
sharedVar += i
}
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
}
}
}
开发者ID:lloydmeta,项目名称:sparkka-streams,代码行数:57,代码来源:StreamingSpec.scala
示例14: SessionStream
//设置package包名称以及导入依赖的类
package com.auginte.eventsourced
import akka.actor.ActorRef
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Merge, Source}
case class SessionStream(storage: Storage, project: Project, realTimeMessages: ActorRef, uuid: UUID = SessionStream.newUuid) {
lazy val stream = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2)) // Blocking operation
storage.readAll(project) ~> merge
RealTimeMessages.source(realTimeMessages) ~> merge
SourceShape(merge.out)
}
}
object SessionStream {
def newUuid: UUID = java.util.UUID.randomUUID.toString
}
开发者ID:Auginte,项目名称:event-sourced-notepad,代码行数:23,代码来源:SessionStream.scala
注:本文中的akka.stream.SourceShape类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论