Scala Source Class Code Examples


This article collects typical usage examples of the akka.stream.scaladsl.Source class in Scala. If you are wondering what Source is for, how to use it, or want to see it in real code, the curated class examples below may help.



The sections below present 20 code examples of the Source class, ordered by popularity.
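Before the examples, here is a minimal, self-contained sketch (not from any of the projects below) showing the most basic way to run a Source; it assumes the Akka 2.4/2.5-era ActorMaterializer API used throughout this article:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object MinimalSourceExample extends App {
  implicit val system = ActorSystem("minimal")
  implicit val materializer = ActorMaterializer()

  // A Source emits elements; attaching a Sink and running it materializes the stream.
  Source(1 to 5)
    .map(_ * 2)
    .runWith(Sink.foreach(println)) // prints 2, 4, 6, 8, 10
    .onComplete(_ => system.terminate())(system.dispatcher)
}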

Example 1: ProcessingKafkaApplication

// Package declaration and imports
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

object ProcessingKafkaApplication extends App {
  implicit val actorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource  ~> mapToProducerRecord   ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
} 
Developer: PacktPublishing | Project: Akka-Cookbook | Lines: 51 | Source: ProcessingKafkaApplication.scala


Example 2: PartitionValidatedSpec

// Package declaration and imports
package eu.svez.akka.stream.cats

import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import cats.implicits._
import eu.svez.akka.stream.cats.Stages._

class PartitionValidatedSpec extends StageSpec {

  "PartitionValidated" should "partition a flow of Validation[E, A] in two flows of E and A" in new Test {
    val src = Source(List(
      1.valid[String],
      2.valid[String],
      "BOOM!".invalid[Int],
      3.valid[String],
      "BOOM 2!".invalid[Int]
    ))

    src.runWith(testSink)

    successProbe.request(3)
    failureProbe.request(2)

    successProbe.expectNext(1)
    successProbe.expectNext(2)
    successProbe.expectNext(3)
    failureProbe.expectNext("BOOM!")
    failureProbe.expectNext("BOOM 2!")
    successProbe.expectComplete()
    failureProbe.expectComplete()
  }

  trait Test {
    val failureProbe = TestSubscriber.probe[String]()
    val successProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val valStage = builder.add(PartitionValidated[String, Int]())

      valStage.invalid ~> Sink.fromSubscriber(failureProbe)
      valStage.valid ~> Sink.fromSubscriber(successProbe)

      SinkShape(valStage.in)
    })
  }
} 
Developer: svezfaz | Project: akka-stream-fp | Lines: 51 | Source: PartitionValidatedSpec.scala
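As an aside, not part of the project above: a similar valid/invalid split can be sketched with akka-stream's built-in Partition stage instead of the custom PartitionValidated; a rough equivalent, assuming cats' Validated:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import cats.data.Validated
import cats.implicits._

object PartitionWithBuiltInStage extends App {
  implicit val system = ActorSystem("partition-validated")
  implicit val materializer = ActorMaterializer()

  val src = Source(List(1.valid[String], "BOOM!".invalid[Int], 2.valid[String]))

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    // Route Valid values to output 0 and Invalid values to output 1
    val partition = b.add(Partition[Validated[String, Int]](2, v => if (v.isValid) 0 else 1))
    src ~> partition.in
    partition.out(0).collect { case Validated.Valid(n) => n } ~> Sink.foreach[Int](n => println(s"valid: $n"))
    partition.out(1).collect { case Validated.Invalid(e) => e } ~> Sink.foreach[String](e => println(s"invalid: $e"))
    ClosedShape
  }).run()
}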


Example 3: MovieListPipeline

// Package declaration and imports
package com.stacktrace.yo.scrapeline.imdb.pipelines

import java.nio.file.Paths

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList

import scala.concurrent.Future

class MovieListPipeline(implicit val m: ActorMaterializer) {

  def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = Source.single(ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all"))

  def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] = {
    Flow[Document]
      .mapConcat(doc => {
        val table = doc >> elementList("table tr")
        val movieLinkTuples = table.flatMap(tr => {
          val name = tr >> elementList("tr b a")
          name.map(
            link => {
              MovieNameAndDetailUrl(link.text, "http://www.the-numbers.com/" + link.attr("href"))
            }
          )
        })
        movieLinkTuples
      })
  }

  def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = Flow[MovieNameAndDetailUrl]
    .map(s => ByteString(s.name + "\n"))
    .toMat(FileIO.toPath(Paths.get("movie.txt")))(Keep.right)

  def buildAndRun: Future[IOResult] = {
    getPipelineSource
      .via(getParseFlow)
      .runWith(getPipeOut)
  }

} 
Developer: StackTraceYo | Project: scrapeline | Lines: 49 | Source: MovieListPipeline.scala


Example 4: GridfsSource

// Package declaration and imports
import java.nio.ByteBuffer

import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream

import scala.concurrent.{ExecutionContext, Future}




// Custom GraphStage that reads one chunk per downstream pull from a GridFSDownloadStream
case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
  val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")
  
  override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val buffer = ByteBuffer.allocate(chunkSize)
        val loaded = stream.read(buffer).toFuture().map(_ => buffer)
        push(out, loaded)
      }

      override def onDownstreamFinish(): Unit = {
        super.onDownstreamFinish()
        stream.close()
        complete(out)
      }
    })
  }
}

object GridfsSource {
  def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
    Source.fromGraph(GridfsSource(stream, 512 * 1024))
      .mapAsync(1)(fb => fb.map { buffer => buffer.flip(); ByteString(buffer) }) // flip the buffer for reading
      .takeWhile(_.nonEmpty) // an empty chunk signals end of stream
  }
} 
Developer: zhijun | Project: GridFSSource | Lines: 42 | Source: GridFSSource.scala
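A hypothetical way to drive the companion apply above — the download stream and target path are placeholders, assuming a GridFSDownloadStream obtained from the MongoDB Scala driver:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.FileIO
import org.mongodb.scala.gridfs.GridFSDownloadStream

object GridfsSourceUsage {
  // downloadStream would come from e.g. GridFSBucket.openDownloadStream(...)
  def saveToFile(downloadStream: GridFSDownloadStream)(implicit system: ActorSystem): Unit = {
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext required by GridfsSource.apply

    GridfsSource(downloadStream) // emits ByteString chunks until the stream is exhausted
      .runWith(FileIO.toPath(Paths.get("downloaded.bin")))
      .foreach(result => println(s"Wrote ${result.count} bytes"))
  }
}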


Example 5: PrintMoreNumbers

// Package declaration and imports
package sample.stream_actor_simple

import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._

class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
  private implicit val executionContext = context.system.dispatcher

  private val (killSwitch: UniqueKillSwitch, done) =
    Source.tick(0.seconds, 1.second, 1)
      .scan(0)(_ + _)
      .map(_.toString)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  done.map(_ => self ! "done")

  override def receive: Receive = {
    //When the actor is stopped, it will also stop the stream
    case "stop" =>
      println("Stopping")
      killSwitch.shutdown()
    case "done" =>
      println("Done")
      context.stop(self)
      context.system.terminate()
  }
} 
Developer: pbernet | Project: akka_streams_tutorial | Lines: 33 | Source: PrintMoreNumbers.scala


Example 6: SimulateWindTurbines

// Package declaration and imports
package sample.stream_actor

import akka.actor.ActorSystem
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import sample.WindTurbineSimulator

import scala.concurrent.duration._


object SimulateWindTurbines extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val endpoint = "ws://127.0.0.1:8080"
  val numberOfTurbines = 5
  Source(1 to numberOfTurbines)
    .throttle(
      elements = 100, //number of elements to be taken from bucket
      per = 1.second,
      maximumBurst = 100,  //capacity of bucket
      mode = ThrottleMode.shaping
    )
    .map { _ =>
      val id = java.util.UUID.randomUUID.toString

      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          WindTurbineSimulator.props(id, endpoint),
          childName = id,
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.2
        ))

      system.actorOf(supervisor, name = s"$id-backoff-supervisor")
    }
    .runWith(Sink.ignore)
} 
Developer: pbernet | Project: akka_streams_tutorial | Lines: 41 | Source: SimulateWindTurbines.scala


Example 7: FlowFromGraph

// Package declaration and imports
package sample.graphDSL

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}


object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val listOfFlows = List(processorFlow1, processorFlow2)

    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val broadcast: UniformFanOutShape[T, T] = b.add(Broadcast(indexFlows.size))
        val merge: UniformFanInShape[T, T] = b.add(Merge(indexFlows.size))

        indexFlows.foreach(broadcast ~> _ ~> merge)

        FlowShape(broadcast.in, merge.out)
      })
    }

    val compoundFlow = compoundFlowFrom(listOfFlows)

    Source(1 to 10)
      .via(compoundFlow)
      .runWith(Sink.foreach(println(_)))
      .onComplete(_ => system.terminate())
  }
} 
Developer: pbernet | Project: akka_streams_tutorial | Lines: 43 | Source: FlowFromGraph.scala


Example 8: ReadSideTestDriver

// Package declaration and imports
package sample.helloworldconsumer.impl



import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}


class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val eventTags = processor.aggregateTags
    val handler = processor.buildHandler()
    val future = for {
      _ <- handler.globalPrepare()
      offset <- handler.prepare(eventTags.head)
    } yield {
      handler -> offset
    }
    synchronized {
      val handlers = processors.getOrElse(eventTags.head.eventType, Nil)
      processors += (eventTags.head.eventType -> (handlers :+ future))
    }
  }

  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    processors.get(event.aggregateTag.eventType) match {
      case None => sys.error(s"No processor registered for Event ${event.aggregateTag.eventType.getCanonicalName}")
      case Some(handlerFutures) =>
        for {
          handlers <- Future.sequence(handlerFutures)
          _ <- Future.sequence(handlers.map {
            case (handler: ReadSideHandler[Event], _) =>
              Source.single(new EventStreamElement(entityId, event, offset))
                .via(handler.handle())
                .runWith(Sink.ignore)
          })
        } yield {
          Done
        }
    }
  }
} 
Developer: knoldus | Project: lagom-scala-wordcount.g8 | Lines: 52 | Source: ReadSideTestDriver.scala


Example 9: listFiles

// Package declaration and imports
package akka.stream.alpakka.ftp

import akka.NotUsed
import akka.stream.alpakka.ftp.FtpCredentials.AnonFtpCredentials
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import java.net.InetAddress

trait BaseSftpSpec extends SftpSupportImpl with BaseSpec {

  //#create-settings
  val settings = SftpSettings(
    InetAddress.getByName("localhost"),
    getPort,
    AnonFtpCredentials,
    strictHostKeyChecking = false,
    knownHosts = None,
    sftpIdentity = None
  )
  //#create-settings

  protected def listFiles(basePath: String): Source[FtpFile, NotUsed] =
    Sftp.ls(basePath, settings)

  protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
    Sftp.fromPath(path, settings)

  protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
    Sftp.toPath(path, settings, append)
} 
Developer: akka | Project: alpakka | Lines: 35 | Source: BaseSftpSpec.scala


Example 10: IronMqPullStageSpec

// Package declaration and imports
package akka.stream.alpakka.ironmq

import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global

class IronMqPullStageSpec extends UnitSpec with IronMqFixture with AkkaStreamFixture {

  "IronMqSourceStage" when {
    "there are messages" should {
      "consume all messages" in {
        val queue = givenQueue()
        val messages = (1 to 100).map(i => PushMessage(s"test-$i"))
        ironMqClient.pushMessages(queue.name, messages: _*).futureValue

        val source = Source.fromGraph(new IronMqPullStage(queue.name, IronMqSettings()))
        val receivedMessages = source.take(100).runWith(Sink.seq).map(_.map(_.message.body)).futureValue
        val expectedMessages = messages.map(_.body)

        receivedMessages should contain theSameElementsInOrderAs expectedMessages
      }
    }
  }

} 
Developer: akka | Project: alpakka | Lines: 25 | Source: IronMqPullStageSpec.scala


Example 11: MemoryBuffer

// Package declaration and imports
package akka.stream.alpakka.s3.impl

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString


private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
  val in = Inlet[ByteString]("MemoryBuffer.in")
  val out = Outlet[Chunk]("MemoryBuffer.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      var buffer = ByteString.empty
      override def onPull(): Unit = if (isClosed(in)) emit() else pull(in)

      override def onPush(): Unit = {
        val elem = grab(in)
        if (buffer.size + elem.size > maxSize) {
          failStage(new IllegalStateException("Buffer size of " + maxSize + " bytes exceeded."))
        } else {
          buffer ++= elem
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out)) emit()
        completeStage()
      }

      def emit(): Unit = emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
      setHandlers(in, out, this)
    }

} 
Developer: akka | Project: alpakka | Lines: 39 | Source: MemoryBuffer.scala


Example 12: MemoryBufferSpec

// Package declaration and imports
package akka.stream.alpakka.s3.impl

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures

class MemoryBufferSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("MemoryBufferSpec"))

  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  "MemoryBuffer" should "emit a chunk on its output containg the concatenation of all input values" in {
    val result = Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
      .via(new MemoryBuffer(200))
      .runWith(Sink.seq)
      .futureValue

    result should have size (1)
    val chunk = result.head
    chunk.size should be(14)
    chunk.data.runWith(Sink.seq).futureValue should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
  }

  it should "fail if more than maxSize bytes are fed into it" in {
    whenReady(
      Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
        .via(new MemoryBuffer(10))
        .runWith(Sink.seq)
        .failed
    ) { e =>
      e shouldBe an[IllegalStateException]
    }
  }
} 
Developer: akka | Project: alpakka | Lines: 49 | Source: MemoryBufferSpec.scala


Example 13: SplitAfterSizeSpec

// Package declaration and imports
package akka.stream.alpakka.s3.impl

import akka.testkit.TestKit
import akka.stream.ActorMaterializerSettings
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import org.scalatest.Matchers
import org.scalatest.FlatSpecLike
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.stream.scaladsl.Sink
import org.scalatest.time.{Millis, Seconds, Span}
import scala.concurrent.duration._

class SplitAfterSizeSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("SplitAfterSizeSpec"))
  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  "SplitAfterSize" should "yield a single empty substream on no input" in {
    Source
      .empty[ByteString]
      .via(
        SplitAfterSize(10)(Flow[ByteString]).concatSubstreams
      )
      .runWith(Sink.seq)
      .futureValue should be(Seq.empty)
  }

  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in {
    Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
      .via(
        SplitAfterSize(10)(Flow[ByteString]).prefixAndTail(10).map { case (prefix, tail) => prefix }.concatSubstreams
      )
      .runWith(Sink.seq)
      .futureValue should be(
      Seq(
        Seq(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12)),
        Seq(ByteString(13, 14))
      )
    )
  }

} 
Developer: akka | Project: alpakka | Lines: 56 | Source: SplitAfterSizeSpec.scala


Example 14: KinesisSource

// Package declaration and imports
package akka.stream.alpakka.kinesis.scaladsl

import akka.NotUsed
import akka.stream.alpakka.kinesis.KinesisSourceErrors.NoShardsError
import akka.stream.alpakka.kinesis.{KinesisSourceStage, ShardSettings}
import akka.stream.scaladsl.{Merge, Source}
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.Record

object KinesisSource {

  def basic(shardSettings: ShardSettings, amazonKinesisAsync: AmazonKinesisAsync): Source[Record, NotUsed] =
    Source.fromGraph(new KinesisSourceStage(shardSettings, amazonKinesisAsync))

  def basicMerge(shardSettings: List[ShardSettings], amazonKinesisAsync: AmazonKinesisAsync): Source[Record, NotUsed] = {
    val create: ShardSettings => Source[Record, NotUsed] = basic(_, amazonKinesisAsync)
    shardSettings match {
      case Nil => Source.failed(NoShardsError)
      case first :: Nil => create(first)
      case first :: second :: Nil => Source.combine(create(first), create(second))(Merge(_))
      case first :: second :: rest =>
        Source.combine(create(first), create(second), rest.map(create(_)): _*)(Merge(_))
    }
  }

} 
Developer: akka | Project: alpakka | Lines: 27 | Source: KinesisSource.scala
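The pattern match above relies on Source.combine to fan several shard sources into one; the combinator itself is general-purpose, as this standalone sketch (unrelated to Kinesis) shows:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Merge, Sink, Source}

object CombineExample extends App {
  implicit val system = ActorSystem("combine")
  implicit val materializer = ActorMaterializer()

  val odds = Source(List(1, 3, 5))
  val evens = Source(List(2, 4, 6))

  // Source.combine takes at least two sources plus a fan-in strategy factory
  Source.combine(odds, evens)(Merge(_))
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())(system.dispatcher)
}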


Example 15: index

// Package declaration and imports
package au.csiro.data61.magda.indexer.search

import au.csiro.data61.magda.model.misc._

import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.Materializer
import akka.actor.ActorSystem
import au.csiro.data61.magda.indexer.external.InterfaceConfig
import au.csiro.data61.magda.search.elasticsearch.{ ClientProvider, Indices }
import com.typesafe.config.Config
import java.time.Instant
import akka.stream.scaladsl.Source
import akka.NotUsed
import java.time.OffsetDateTime
import au.csiro.data61.magda.indexer.search.elasticsearch.ElasticSearchIndexer

trait SearchIndexer {
  def index(source: InterfaceConfig, dataSetStream: Source[DataSet, NotUsed]): Future[SearchIndexer.IndexResult]
  def snapshot(): Future[Unit]
  def ready: Future[Unit]
  def trim(source: InterfaceConfig, before: OffsetDateTime): Future[Unit]

}

object SearchIndexer {
  case class IndexResult(successes: Long, failures: Seq[String])
  
  def apply(clientProvider: ClientProvider, indices: Indices)(implicit config: Config, system: ActorSystem, ec: ExecutionContext, materializer: Materializer) =
    new ElasticSearchIndexer(clientProvider, indices)
} 
Developer: TerriaJS | Project: magda | Lines: 31 | Source: SearchIndexer.scala


Example 16: TopicHandler

// Package declaration and imports
package org.hpi.esb.datavalidator.kafka

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement

case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])

object TopicHandler {

  def create(topicName: String, system: ActorSystem): TopicHandler = {

    val uuid = java.util.UUID.randomUUID.toString
    val consumerSettings: ConsumerSettings[String, String] = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092")
      .withGroupId(uuid)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Int.MaxValue.toString)
      .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Int.MaxValue.toString)
      .withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20485000")

    val partition = 0
    val topicSource = createSource(consumerSettings, topicName, partition)
    val numberOfMessages = OffsetManagement.getNumberOfMessages(topicName, partition)

    new TopicHandler(topicName, numberOfMessages, topicSource)
  }

  def createSource(consumerSettings: ConsumerSettings[String, String], topicName: String, partition: Int) = {

    val subscription = Subscriptions.assignmentWithOffset(
      new TopicPartition(topicName, partition) -> 0L
    )
    Consumer.plainSource(consumerSettings, subscription)
  }
} 
Developer: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 42 | Source: TopicHandler.scala
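A hypothetical caller of TopicHandler.create — the topic name and the printing sink are illustrative only:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.hpi.esb.datavalidator.kafka.TopicHandler

object TopicHandlerUsage extends App {
  implicit val system = ActorSystem("topic-handler-usage")
  implicit val materializer = ActorMaterializer()

  val handler = TopicHandler.create("my_topic", system) // "my_topic" is a placeholder

  // Consume exactly the number of messages currently in the topic, then complete
  handler.topicSource
    .take(handler.numberOfMessages)
    .map(_.value())
    .runWith(Sink.foreach(println))
}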


Example 17: AccumulateWhileUnchangedTest

// Package declaration and imports
package org.hpi.esb.datavalidator.validation.graphstage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.scalatest.FunSuite

import scala.collection.immutable


class AccumulateWhileUnchangedTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
  val windowSize = 1000
  def windowStart(timestamp: Long): Long = {
    timestamp - (timestamp % windowSize)
  }

  val numberOfElements = 3000

  test("accumulation of two windows on element stream") {

    val firstWindowElements = immutable.Seq.range(1, 999, 10).map(t => SimpleRecord(1)(t))
    val secondWindowElements = immutable.Seq.range(1000, 1999, 10).map(t => SimpleRecord(1)(t))
    val records = firstWindowElements ++ secondWindowElements

    val s = TestSink.probe[Seq[SimpleRecord]]

    val (_, sink) = Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .toMat(s)(Keep.both)
      .run()

    sink.request(numberOfElements)
    sink.expectNext(firstWindowElements, secondWindowElements)
    sink.expectComplete()
  }

  test("accumulation on empty stream") {

    val s = TestSink.probe[Seq[SimpleRecord]]

    val records = List[SimpleRecord]()

    val (_, sink) = Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .toMat(s)(Keep.both)
      .run()

    sink.request(numberOfElements)
    sink.expectComplete()
  }
} 
Developer: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 57 | Source: AccumulateWhileUnchangedTest.scala


Example 18: IgnoreLastElementsTest

// Package declaration and imports
package org.hpi.esb.datavalidator.validation.graphstage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.FunSuite

import scala.collection.immutable

class IgnoreLastElementsTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  test("happy list") {

    val start = 0
    val end = 10
    val values = immutable.Seq.range(start, end)
    val ignoreCount = 2

    val s = TestSink.probe[Int]

    val (_, sink) = Source(values)
      .via(new IgnoreLastElements[Int](ignoreCount))
      .toMat(s)(Keep.both)
      .run()

    val numberOfElements = end
    sink.request(numberOfElements)
    values.dropRight(ignoreCount).foreach(v => sink.expectNext(v))
    sink.expectComplete()
  }

  test("empty list") {

    val s = TestSink.probe[Int]

    val (_, sink) = Source(List[Int]())
      .via(new IgnoreLastElements[Int](ignoreCount = 2))
      .toMat(s)(Keep.both)
      .run()

    val numberOfElements = 1
    sink.request(numberOfElements)
    sink.expectComplete()
  }
} 
Developer: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 50 | Source: IgnoreLastElementsTest.scala


Example 19: DeserializationDemo

// Package declaration and imports
package com.martinseeler.dtf

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import com.martinseeler.dtf.stages.{ByteStringToDeltaStage, DeltaToTickStage}

object DeserializationDemo extends App {

  implicit val system = ActorSystem("deserialization")
  implicit val mat = ActorMaterializer()

  val dtffSource = FileIO.fromPath(new File("ticks_1MM.dtff").toPath)

  dtffSource
    .via(new ByteStringToDeltaStage())
    .via(new DeltaToTickStage())
    .runForeach(println)
    .onComplete(_ => system.terminate())(
        scala.concurrent.ExecutionContext.global)
} 
Developer: MartinSeeler | Project: dense-tick-file-format | Lines: 24 | Source: DeserializationDemo.scala


Example 20: running

// Package declaration and imports
package producers

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.Future

trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()
  import context.dispatcher // ExecutionContext needed by Future(...) and onFailure below

  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")
    val kafkaSink = Producer.plainSink(producerSettings)

    val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
    val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
      .toMat(kafkaSink)(Keep.both)
      .run()
    future.onFailure {
      case ex =>
        log.error("Stream failed due to error, restarting", ex)
        throw ex
    }
    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }


  case object Stop
} 
Developer: jguido | Project: reactive-kafka-registration | Lines: 48 | Source: Producerable.scala



Note: the akka.stream.scaladsl.Source class examples in this article were collected from GitHub/MSDocs and similar source-code and documentation platforms, and the snippets were selected from open-source projects contributed by their respective authors. Copyright of the source code remains with the original authors; consult each project's license before using or redistributing it. Do not reproduce without permission.

