• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala SourceShape类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 akka.stream.SourceShape 的典型用法代码示例。如果您正苦于以下问题:Scala SourceShape 类的具体用法?Scala SourceShape 怎么用?Scala SourceShape 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 SourceShape 类的 14 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。

示例1: GridfsSource

//设置package包名称以及导入依赖的类
import java.nio.ByteBuffer

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream

import scala.concurrent.{ExecutionContext, Future}




/**
 * Graph stage that reads a GridFS download stream in fixed-size chunks.
 *
 * Every downstream pull allocates a fresh `ByteBuffer` of `chunkSize` bytes, starts an
 * asynchronous `stream.read` into it and immediately pushes the resulting `Future`; that
 * future yields the buffer once the read has finished. The stage never completes on its
 * own, so the consumer decides when to stop (the companion `apply` stops at the first
 * empty chunk).
 *
 * @param stream    GridFS download stream to read from
 * @param chunkSize capacity, in bytes, of the buffer allocated for each read
 * @param ec        execution context used to map the read result onto its buffer
 */
case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
  val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")

  override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val buffer = ByteBuffer.allocate(chunkSize)
        // The read result itself is discarded; downstream inspects the buffer position instead.
        push(out, stream.read(buffer).toFuture().map(_ => buffer))
      }
    })

    // Closing in postStop covers every way the stage can end (downstream cancellation,
    // failure, shutdown), not only the downstream-finish path.
    override def postStop(): Unit = {
      // NOTE(review): assumes close(), like read() above, returns a lazy observable that does
      // nothing until subscribed; toFuture() subscribes so the close is actually executed.
      // A failure of the close is not reported anywhere - confirm whether it should be logged.
      stream.close().toFuture()
      super.postStop()
    }
  }
}

/** Factory producing a byte-oriented source on top of the [[GridfsSource]] stage. */
object GridfsSource {

  /**
   * Wraps `stream` in a `Source` of `ByteString` chunks.
   *
   * Reads are issued with 512 KiB buffers and resolved one at a time; each filled buffer is
   * flipped and converted to a `ByteString`. The source ends at the first empty chunk.
   *
   * @param stream GridFS download stream to read from
   * @param ec     execution context used for the asynchronous mapping steps
   */
  def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
    val pendingChunks = Source.fromGraph(GridfsSource(stream, 512 * 1024))

    pendingChunks
      .mapAsync(1) { pending =>
        pending.map { filled =>
          // Switch the buffer from writing to reading before copying its content out.
          filled.flip()
          ByteString(filled)
        }
      }
      .takeWhile(chunk => chunk.nonEmpty)
  }
}
开发者ID:zhijun,项目名称:GridFSSource,代码行数:42,代码来源:GridFSSource.scala


示例2: GeodeQueryGraphLogic

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.stream.SourceShape
import akka.stream.stage.StageLogging
import org.apache.geode.cache.client.ClientCache
import org.apache.geode.cache.query.SelectResults

import scala.util.Try

/**
 * Source stage logic whose elements come from executing a fixed query string.
 *
 * @param shape       source shape handed on to the parent stage logic
 * @param clientCache Geode client cache handed on to the parent stage logic
 * @param query       query text run by `executeQuery`
 * @tparam V element type the query results are cast to
 */
abstract class GeodeQueryGraphLogic[V](val shape: SourceShape[V], val clientCache: ClientCache, val query: String)
    extends GeodeSourceStageLogic[V](shape, clientCache)
    with StageLogging {

  /**
   * Builds a query from `query` via `qs`, executes it and exposes the results as an iterator.
   * Non-fatal exceptions thrown along the way are captured in the returned `Try`.
   */
  override def executeQuery() = Try {
    val prepared = qs.newQuery(query)
    // Unchecked cast: the element type of the result set is not verified here.
    val results = prepared.execute().asInstanceOf[SelectResults[V]]
    results.iterator()
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:22,代码来源:GeodeQueryGraphLogic.scala


示例3: GeodeContinuousSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

/**
 * Source stage backed by a `GeodeCQueryGraphLogic` built from the given cache, name and query.
 *
 * The materialized `Future[Done]` is completed when the logic's `onConnect` callback is invoked.
 *
 * @param cache Geode client cache passed to the stage logic
 * @param name  symbol passed to the stage logic (presumably the continuous query's name - confirm)
 * @param sql   query text passed to the stage logic
 * @tparam V type of the emitted elements
 */
class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  // Names the stage and assigns it the dispatcher "akka.stream.default-blocking-io-dispatcher".
  override protected def initialAttributes: Attributes =
    Attributes
      .name("GeodeContinuousSource")
      .and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))

  val out = Outlet[V](s"geode.continuousSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  /**
   * Creates the stage logic together with the future that signals a successful connection.
   *
   * @param inheritedAttributes attributes supplied by the materializer (not used here)
   * @return the stage logic and the future completed by `onConnect`
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    // Completed from the onConnect callback; its future is the materialized value.
    val subPromise = Promise[Done]

    (new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {

      // NOTE(review): success() throws if the promise is already completed - confirm that
      // onConnect is invoked at most once.
      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
        subPromise.success(Done)
      }

      // Element callback: push straight away when the outlet has demand, otherwise buffer the
      // element via enqueue; in both cases finish with handleTerminaison() (defined in the
      // parent logic, not visible here).
      val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
        if (isAvailable(out)) {
          pushElement(out, element)
        } else
          enqueue(element)
        handleTerminaison()
      }

      //
      // This handler first forwards the initial (old) results, then the new (continuous) ones.
      //
      setHandler(
        out,
        new OutHandler {
          override def onPull() = {
            // Initial results take priority; once they are exhausted, at most one buffered
            // element is pushed per pull.
            if (initialResultsIterator.hasNext)
              push(out, initialResultsIterator.next())
            else
              dequeue() foreach { e =>
                pushElement(out, e)
              }
            handleTerminaison()
          }
        }
      )

    }, subPromise.future)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:60,代码来源:GeodeContinuousSourceStage.scala


示例4: GeodeFiniteSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

/**
 * Source stage that emits the results of a single query and then completes.
 *
 * The materialized `Future[Done]` is completed when the logic's `onConnect` callback is invoked.
 *
 * @param cache Geode client cache passed to the stage logic
 * @param sql   query text passed to the stage logic
 * @tparam V type of the emitted elements
 */
class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  // Names the stage and assigns it the dispatcher "akka.stream.default-blocking-io-dispatcher".
  override protected def initialAttributes: Attributes =
    Attributes
      .name("GeodeFiniteSource")
      .and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))

  val out = Outlet[V]("geode.finiteSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  /**
   * Creates the stage logic together with the future that signals a successful connection.
   *
   * @param inheritedAttributes attributes supplied by the materializer (not used here)
   * @return the stage logic and the future completed by `onConnect`
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val connectPromise = Promise[Done]()

    val logic: GraphStageLogic = new GeodeQueryGraphLogic[V](shape, cache, sql) {

      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { _ =>
        connectPromise.success(Done)
      }

      setHandler(
        out,
        new OutHandler {
          // Emit the next query result, or finish the stage once the results are exhausted.
          override def onPull() = {
            if (!initialResultsIterator.hasNext) completeStage()
            else push(out, initialResultsIterator.next())
          }
        }
      )

    }

    (logic, connectPromise.future)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:43,代码来源:GeodeFiniteSourceStage.scala


示例5: IdentityValidation

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation

import akka.NotUsed
import akka.stream.scaladsl.GraphDSL
import akka.stream.{ActorMaterializer, Graph, SourceShape}
import org.hpi.esb.datavalidator.configuration.Config
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.hpi.esb.datavalidator.kafka.TopicHandler
import org.hpi.esb.datavalidator.validation.graphstage.ZipWhileEitherAvailable

/**
 * Validation that pairs the records of the input topic with the records of the output topic.
 *
 * @param inTopicHandler  handler providing the source of the input topic
 * @param outTopicHandler handler providing the source of the output topic
 * @param materializer    materializer handed on to the parent `Validation`
 */
class IdentityValidation(inTopicHandler: TopicHandler,
                         outTopicHandler: TopicHandler,
                         materializer: ActorMaterializer)
  extends Validation[SimpleRecord](inTopicHandler, outTopicHandler, materializer) {

  override val valueName = "SimpleRecords"
  override val queryName = "Identity Query"

  /**
   * Builds a source graph emitting pairs of optional records: the first component comes from
   * the input topic, the second from the output topic. Both topic streams are limited with
   * `take` and converted with `toSimpleRecords` before reaching the zip stage.
   */
  def createSource(): Graph[SourceShape[(Option[SimpleRecord], Option[SimpleRecord])], NotUsed] =
    GraphDSL.create() { implicit graph =>
      import GraphDSL.Implicits._

      val pairing = graph.add(ZipWhileEitherAvailable[SimpleRecord]())

      // Input topic feeds the first zip inlet, output topic the second.
      inTopicHandler.topicSource ~> take(inNumberOfMessages) ~> toSimpleRecords ~> pairing.in0
      outTopicHandler.topicSource ~> take(outNumberOfMessages) ~> toSimpleRecords ~> pairing.in1

      SourceShape(pairing.out)
    }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:33,代码来源:IdentityValidation.scala


示例6: PairsShape_

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}
import GraphDSL.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
 * Demo: builds a `Source` of (odd, even) integer pairs with the graph DSL, runs it to obtain
 * the first pair and prints that pair.
 */
object PairsShape_ extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val pairs = Source.fromGraph(GraphDSL.create() { implicit builder =>
    val zip = builder.add(Zip[Int, Int]())

    // A `def`, so each use below creates its own source over a fresh iterator.
    def ints = Source.fromIterator(() => Iterator.from(1))

    ints.filter(_ % 2 != 0) ~> zip.in0
    ints.filter(_ % 2 == 0) ~> zip.in1

    SourceShape(zip.out)
  })

  val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)

  // Method-call syntax (`300.millis`) avoids the postfix-operator language feature.
  // `finally` shuts the actor system down even when the await times out or the stream fails,
  // so the JVM is not kept alive by the system's threads.
  val r: (Int, Int) =
    try Await.result(firstPair, 300.millis)
    finally system.terminate()
  println(r)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:32,代码来源:PairsShape_.scala


示例7: MaterializedValue_

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, SourceShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import GraphDSL.Implicits._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

/**
 * Demo of `builder.materializedValue`: exposes the materialized `Future` of a fold sink as a
 * stream element, first inside a `Flow` and then inside a `Source`, and prints both results.
 */
object MaterializedValue_ extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Waits up to 200 ms for `result` and prints it. If waiting fails (timeout or failed
   * stream) the actor system is terminated before the error is rethrown, so the JVM is not
   * kept alive by the system's threads.
   */
  private def awaitAndPrint(result: Future[Int]): Unit =
    try println(Await.result(result, 200.millis)) // method-call syntax, no postfix operator
    catch {
      case scala.util.control.NonFatal(e) =>
        system.terminate()
        throw e
    }

  // Flow whose inlet is the fold sink and whose outlet emits the fold's materialized result.
  val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
    })
  val r = Source(1 to 10)
    .via(foldFlow)
    .runWith(Sink.head)
  awaitAndPrint(r)

  // Source that feeds 1 to 10 into the fold sink and emits the fold's materialized result.
  val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(
    GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
      Source(1 to 10) ~> fold
      SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
    })
  val r2 = cyclicFold
    .via(foldFlow)
    .runWith(Sink.head)
  awaitAndPrint(r2)

  system.terminate()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:37,代码来源:MaterializedValue_.scala


示例8: DebugGen

//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream

import akka.stream.{Attributes, SourceShape}
import de.sciss.fscape.stream.impl.{GenChunkImpl, GenIn0DImpl, StageImpl, NodeImpl}

import scala.util.Random

/** Generator stage whose output chunks are filled with random values in the range [-1, 1). */
object DebugGen {
  private final val name = "DebugGen"

  private type Shape = SourceShape[BufD]

  /** Adds a new generator stage to the builder and returns its outlet. */
  def apply()(implicit b: Builder): OutD =
    b.add(new Stage).out

  private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {

    val shape = new SourceShape(OutD(s"$name.out"))

    def createLogic(attr: Attributes) = new Logic(shape)
  }

  private final class Logic(shape: Shape)(implicit ctrl: Control)
    extends NodeImpl(name, shape)
      with GenChunkImpl[BufD, BufD, Shape]
      with GenIn0DImpl {

    // Random number generator obtained from the control object.
    private[this] val rnd: Random = ctrl.mkRandom()

    /**
     * Fills `chunk` consecutive slots of the output buffer, starting at `outOff`, with
     * `rnd.nextDouble() * 2 - 1`. `inOff` is not used.
     */
    protected def processChunk(inOff: Int, outOff: Int, chunk: Int): Unit = {
      val samples = bufOut0.buf
      val end     = outOff + chunk
      var i       = outOff
      while (i < end) {
        samples(i) = rnd.nextDouble() * 2 - 1
        i += 1
      }
    }
  }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:47,代码来源:DebugGen.scala


示例9: WhiteNoise

//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream

import akka.stream.{Attributes, SourceShape}
import de.sciss.fscape.stream.impl.{GenChunkImpl, GenIn0DImpl, StageImpl, NodeImpl}

import scala.util.Random

/** White-noise generator stage: every output sample is a random value in the range [-1, 1). */
object WhiteNoise {
  private final val name = "WhiteNoise"

  private type Shape = SourceShape[BufD]

  /** Registers a new noise stage with the builder and returns the stage's outlet. */
  def apply()(implicit b: Builder): OutD = {
    val added = b.add(new Stage)
    added.out
  }

  private final class Stage(implicit ctrl: Control) extends StageImpl[Shape](name) {

    val shape = new SourceShape(OutD(s"$name.out"))

    def createLogic(attr: Attributes) = new Logic(shape)
  }

  private final class Logic(shape: Shape)(implicit ctrl: Control)
    extends NodeImpl(name, shape)
      with GenChunkImpl[BufD, BufD, Shape]
      with GenIn0DImpl {

    // Random number generator obtained from the control object.
    private[this] val rnd: Random = ctrl.mkRandom()

    /**
     * Writes `chunk` noise samples into the output buffer, beginning at index `outOff`.
     * Each sample is `rnd.nextDouble() * 2 - 1`; `inOff` is not used.
     */
    protected def processChunk(inOff: Int, outOff: Int, chunk: Int): Unit = {
      val target    = bufOut0.buf
      val limit     = outOff + chunk
      var position  = outOff
      while (position < limit) {
        target(position) = rnd.nextDouble() * 2 - 1
        position += 1
      }
    }
  }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:47,代码来源:WhiteNoise.scala


示例10: HelloAkkaStreamsSource

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage._

/** Source stage that answers every downstream pull with the same greeting string. */
class HelloAkkaStreamsSource extends GraphStage[SourceShape[String]] {

  val out: Outlet[String] = Outlet("SystemInputSource")
  override val shape: SourceShape[String] = SourceShape(out)

  /** Creates logic whose only handler pushes the greeting whenever downstream pulls. */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      private val greeter = new OutHandler {
        override def onPull(): Unit = push(out, "Hello World Akka Streams!")
      }

      setHandler(out, greeter)
    }
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:22,代码来源:HelloAkkaStreamsSource.scala


示例11: ZMQSubSocket

//设置package包名称以及导入依赖的类
package com.mintbeans.rzmq

import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.util.ByteString
import com.mintbeans.rzmq.ZMQErrors._
import com.mintbeans.rzmq.ZMQMessages._
import com.typesafe.scalalogging.LazyLogging
import org.zeromq.{ ZContext, ZMQ, ZMsg }

import scala.collection.JavaConverters._

/**
 * Source stage that connects a ZeroMQ SUB socket to `endpoint`, subscribes to `topic` and
 * emits the payload of every two-frame (topic, payload) message it receives.
 *
 * @param endpoint address the SUB socket connects to
 * @param topic    subscription prefix registered on the socket
 */
private[rzmq] class ZMQSubSocket(endpoint: String, topic: String) extends GraphStage[SourceShape[ZMQMessage]] with LazyLogging {

  val out: Outlet[ZMQMessage] = Outlet("ZMQSubSocket")
  override val shape: SourceShape[ZMQMessage] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    logger.info("Initializing ZMQ context.")
    val context = new ZContext(1)
    logger.info(s"Connecting SUB socket to ${endpoint}")
    val socket = {
      val s = context.createSocket(ZMQ.SUB)
      s.subscribe(topic.getBytes(ZMQ.CHARSET))
      s.connect(endpoint)
      s
    }

    setHandler(out, outHandler())

    // Releases the socket and the context when the stage stops.
    override def postStop() = {
      logger.info("Closing socket.")
      context.destroySocket(socket)
      logger.info("Closing ZMQ context.")
      context.close()

      super.postStop()
    }

    /**
     * Handler that receives one message per pull. A (topic, payload) message is pushed as
     * `ZMQMessage(payload)`; any other frame layout fails the outlet with a
     * `MessageFormatException`.
     */
    def outHandler() = new OutHandler {
      override def onPull(): Unit = {
        logger.debug("Reading message...")
        val frames = Option(ZMsg.recvMsg(socket)).map { zMsg =>
          zMsg.asScala.toList.map(zFrame => ByteString(zFrame.getData))
        }
        frames match {
          // `receivedTopic` avoids shadowing the constructor parameter `topic`.
          case Some(receivedTopic :: payload :: Nil) =>
            logger.debug(s"Received topic (${receivedTopic}) message: ${payload}")
            push(out, ZMQMessage(payload))
          case Some(_) =>
            fail(out, new MessageFormatException("Invalid message format"))
          case None =>
            // recvMsg returned null, so there is nothing to push. Leaving the pull unanswered
            // would stall the stream forever (no further onPull is delivered), therefore the
            // stage is completed instead.
            logger.warn("No message received from socket; completing stream.")
            completeStage()
        }
      }

      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finish.")
        super.onDownstreamFinish()
      }
    }

  }
}
开发者ID:sbilinski,项目名称:reactive-zeromq,代码行数:62,代码来源:ZMQSubSocket.scala


示例12: ZMQPullSocket

//设置package包名称以及导入依赖的类
package com.mintbeans.rzmq

import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.util.ByteString
import com.mintbeans.rzmq.ZMQErrors._
import com.mintbeans.rzmq.ZMQMessages._
import com.typesafe.scalalogging.LazyLogging
import org.zeromq.{ ZContext, ZMQ, ZMsg }

import scala.collection.JavaConverters._

/**
 * Source stage that binds a ZeroMQ PULL socket to `endpoint` and emits every single-frame
 * message it receives.
 *
 * @param endpoint address the PULL socket binds to
 */
private[rzmq] class ZMQPullSocket(endpoint: String) extends GraphStage[SourceShape[ZMQMessage]] with LazyLogging {

  val out: Outlet[ZMQMessage] = Outlet("ZMQPullSocket")
  override val shape: SourceShape[ZMQMessage] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    logger.info("Initializing ZMQ context.")
    val context = new ZContext(1)
    logger.info(s"Binding PULL socket to ${endpoint}")
    val socket = {
      val s = context.createSocket(ZMQ.PULL)
      s.bind(endpoint)
      s
    }

    setHandler(out, outHandler())

    // Releases the socket and the context when the stage stops.
    override def postStop() = {
      logger.info("Closing socket.")
      context.destroySocket(socket)
      logger.info("Closing ZMQ context.")
      context.close()

      super.postStop()
    }

    /**
     * Handler that receives one message per pull. A single-frame message is pushed as
     * `ZMQMessage(payload)`; any other frame layout fails the outlet with a
     * `MessageFormatException`.
     */
    def outHandler() = new OutHandler {
      override def onPull(): Unit = {
        logger.debug("Reading message...")
        val frames = Option(ZMsg.recvMsg(socket)).map { zMsg =>
          zMsg.asScala.toList.map(zFrame => ByteString(zFrame.getData))
        }
        frames match {
          case Some(payload :: Nil) =>
            logger.debug(s"Received message: ${payload}")
            push(out, ZMQMessage(payload))
          case Some(_) =>
            fail(out, new MessageFormatException("Invalid message format"))
          case None =>
            // recvMsg returned null, so there is nothing to push. Leaving the pull unanswered
            // would stall the stream forever (no further onPull is delivered), therefore the
            // stage is completed instead.
            logger.warn("No message received from socket; completing stream.")
            completeStage()
        }
      }

      override def onDownstreamFinish(): Unit = {
        logger.info("Downstream finish.")
        super.onDownstreamFinish()
      }
    }

  }
}
开发者ID:sbilinski,项目名称:reactive-zeromq,代码行数:61,代码来源:ZMQPullSocket.scala


示例13: StreamingSpec

//设置package包名称以及导入依赖的类
package com.beachape.sparkka

import akka.actor.ActorSystem
import akka.stream.{ SourceShape, ActorMaterializer }
import akka.stream.scaladsl._
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.{ Matchers, FunSpec }
import org.scalatest.concurrent.{ Eventually, ScalaFutures }


/**
 * Checks that a stream wired through `Streaming.connection` delivers its elements both to the
 * Akka stream itself and to the connected Spark `InputDStream`.
 */
class StreamingSpec extends FunSpec with ScalaFutures with Matchers with Eventually {

  override implicit val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(30, Seconds), interval = Span(150, Millis))

  describe("streamConnection") {

    it("should allow both the original flow and the connected InputDStream to receive all expected values") {
      implicit val actorSystem = ActorSystem()
      implicit val materializer = ActorMaterializer()
      implicit val ssc = LocalContext.ssc

      // The InputDStream side is used for the Spark part, the flow side is wired into the graph.
      val (inputDStream, feedDInput) = Streaming.connection[Int]()

      val mergedSource = Source.fromGraph(GraphDSL.create() { implicit graph =>

        import GraphDSL.Implicits._

        val numbers = Source(1 to 10)
        val plusOne = Flow[Int].map(_ + 1)
        val tripled = Flow[Int].map(_ * 3)

        val fanOut = graph.add(Broadcast[Int](2))
        val fanIn = graph.add(Merge[Int](2))

        // Branch 1: n + 1. Branch 2: n * 3, additionally fed through the Spark connection.
        numbers ~> fanOut ~> plusOne ~> fanIn
        fanOut ~> tripled ~> feedDInput ~> fanIn

        SourceShape(fanIn.out)
      })

      // (2 + ... + 11) + (3 + 6 + ... + 30) = 65 + 165 = 230
      val streamTotal = mergedSource.runWith(Sink.fold(0)(_ + _))
      whenReady(streamTotal) { total =>
        total shouldBe 230
      }

      // Only the tripled branch reaches Spark: 3 + 6 + ... + 30 = 165
      val sparkTotal = ssc.sparkContext.accumulator(0)
      inputDStream.foreachRDD { batch =>
        batch.foreach(value => sparkTotal += value)
      }
      ssc.start()
      eventually {
        sparkTotal.value shouldBe 165
      }
    }

  }

}
开发者ID:lloydmeta,项目名称:sparkka-streams,代码行数:57,代码来源:StreamingSpec.scala


示例14: SessionStream

//设置package包名称以及导入依赖的类
package com.auginte.eventsourced

import akka.actor.ActorRef
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Merge, Source}


/**
 * Describes the message stream of one session: everything read from storage for `project`,
 * merged with the messages coming from the `realTimeMessages` actor.
 *
 * @param storage          storage the project's existing messages are read from
 * @param project          project whose messages are streamed
 * @param realTimeMessages actor handed to `RealTimeMessages.source`
 * @param uuid             session identifier; defaults to a freshly generated one
 */
case class SessionStream(storage: Storage, project: Project, realTimeMessages: ActorRef, uuid: UUID = SessionStream.newUuid) {

  /** Source-shaped graph merging the stored messages with the real-time ones. */
  lazy val stream = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val combined = builder.add(Merge[String](2))

    val stored = storage.readAll(project)
    stored ~> combined

    val live = RealTimeMessages.source(realTimeMessages)
    live ~> combined

    SourceShape(combined.out)
  }
}

/** Companion supplying the default session identifier for [[SessionStream]]. */
object SessionStream {

  /** Generates a new random identifier: the string form of a random `java.util.UUID`. */
  def newUuid: UUID = {
    val generated = java.util.UUID.randomUUID()
    generated.toString
  }
}
开发者ID:Auginte,项目名称:event-sourced-notepad,代码行数:23,代码来源:SessionStream.scala



注:本文中的akka.stream.SourceShape类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala AllowedHostsFilter类代码示例发布时间:2022-05-23
下一篇:
Scala RequestBuilding类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap