本文整理汇总了 Scala 中 akka.stream.scaladsl.Source 类的典型用法代码示例。如果您正苦于以下问题：Scala Source 类的具体用法？Scala Source 怎么用？Scala Source 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 Source 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
/**
 * Demo application that runs two independent streams inside one closed graph:
 *
 *  - a ticker that publishes a fixed greeting to `kafkaTopic` every five seconds, and
 *  - a consumer assigned to partition 0 of the same topic that prints every record value.
 *
 * The graph is materialized once at start-up and keeps running until the JVM / actor system
 * is stopped externally; nothing in this object terminates it.
 */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  // Manual assignment to a single partition (no consumer-group rebalancing involved).
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Dot notation instead of `0 seconds`: postfix operators require the
    // scala.language.postfixOps feature import and are rejected by newer compilers without it.
    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    // Records are written without a key; only topic and value are set.
    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource ~> mapToProducerRecord ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: PartitionValidatedSpec
//设置package包名称以及导入依赖的类
package eu.svez.akka.stream.cats
import akka.NotUsed
import akka.stream.SinkShape
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.stream.testkit.TestSubscriber
import cats.implicits._
import eu.svez.akka.stream.cats.Stages._
/** Verifies that `PartitionValidated` sends valid and invalid elements to separate outlets. */
class PartitionValidatedSpec extends StageSpec {

  "PartitionValidated" should "partition a flow of Validation[E, A] in two flows of E and A" in new Test {
    val validatedElements = List(
      1.valid[String],
      2.valid[String],
      "BOOM!".invalid[Int],
      3.valid[String],
      "BOOM 2!".invalid[Int]
    )

    Source(validatedElements).runWith(testSink)

    // Signal demand on both sides before asserting, so neither outlet back-pressures the other.
    successProbe.request(3)
    failureProbe.request(2)

    List(1, 2, 3).foreach(expected => successProbe.expectNext(expected))
    List("BOOM!", "BOOM 2!").foreach(expected => failureProbe.expectNext(expected))

    successProbe.expectComplete()
    failureProbe.expectComplete()
  }

  /** Fixture: a sink that partitions its input and forwards each side to its own probe. */
  trait Test {
    val failureProbe = TestSubscriber.probe[String]()
    val successProbe = TestSubscriber.probe[Int]()

    val testSink = Sink.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val partitionStage = builder.add(PartitionValidated[String, Int]())

      partitionStage.invalid ~> Sink.fromSubscriber(failureProbe)
      partitionStage.valid ~> Sink.fromSubscriber(successProbe)

      SinkShape(partitionStage.in)
    })
  }
}
开发者ID:svezfaz,项目名称:akka-stream-fp,代码行数:51,代码来源:PartitionValidatedSpec.scala
示例3: MovieListPipeline
//设置package包名称以及导入依赖的类
package com.stacktrace.yo.scrapeline.imdb.pipelines
import java.nio.file.Paths
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList
import scala.concurrent.Future
/**
 * Pipeline that scrapes the movie budget listing, extracts each movie's name and detail URL,
 * and writes the movie names, one per line, to `movie.txt`.
 */
class MovieListPipeline(implicit val m: ActorMaterializer) {

  /** Single-element source holding the scraped listing page (the scrape call runs when this method is invoked). */
  def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = {
    val listingPage = ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all")
    Source.single(listingPage)
  }

  /** Turns one document into the stream of movies linked from its table rows. */
  def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] =
    Flow[Document].mapConcat { doc =>
      for {
        row <- (doc >> elementList("table tr"))
        link <- (row >> elementList("tr b a"))
      } yield MovieNameAndDetailUrl(link.text, "http://www.the-numbers.com/" + link.attr("href"))
    }

  /** Sink that writes each movie name followed by a newline; materializes the file IO result. */
  def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = {
    val nameLines = Flow[MovieNameAndDetailUrl].map(movie => ByteString(movie.name + "\n"))
    val movieFile = FileIO.toPath(Paths.get("movie.txt"))
    nameLines.toMat(movieFile)(Keep.right)
  }

  /** Wires source, parser and file sink together and starts the stream. */
  def buildAndRun: Future[IOResult] =
    getPipelineSource.via(getParseFlow).runWith(getPipeOut)
}
开发者ID:StackTraceYo,项目名称:scrapeline,代码行数:49,代码来源:MovieListPipeline.scala
示例4: GridfsSource
//设置package包名称以及导入依赖的类
import java.nio.ByteBuffer
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
import org.mongodb.scala.gridfs.GridFSDownloadStream
import scala.concurrent.{ExecutionContext, Future}
/**
 * Source stage that reads a GridFS download stream in chunks of at most `chunkSize` bytes.
 *
 * Each pull allocates a fresh buffer, starts an asynchronous read into it and immediately
 * pushes the pending `Future`; the element completes once the read has finished. The buffer is
 * handed downstream un-flipped, so consumers must `flip()` it before reading.
 *
 * @param stream    the GridFS download stream to read from
 * @param chunkSize capacity, in bytes, of the buffer allocated for every read
 * @param ec        execution context used to map the read result to the filled buffer
 */
case class GridfsSource(stream: GridFSDownloadStream, chunkSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[Future[ByteBuffer]]] {
  val out: Outlet[Future[ByteBuffer]] = Outlet("File Stream")

  override def shape: SourceShape[Future[ByteBuffer]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        val buffer = ByteBuffer.allocate(chunkSize)
        val loaded = stream.read(buffer).toFuture().map(_ => buffer)
        push(out, loaded)
      }

      override def onDownstreamFinish(): Unit = {
        // The driver's operations are cold: close() only takes effect once its result is
        // subscribed to. Previously the result was discarded, so the stream was never closed.
        // Closing is best effort; a failure of the returned future is not reported.
        stream.close().toFuture()
        // The default handler completes the stage, so no explicit complete(out) is needed.
        super.onDownstreamFinish()
      }
    })
  }
}

object GridfsSource {
  /** Buffer capacity used by the convenience factory below: 512 KiB. */
  private val DefaultChunkSize: Int = 512 * 1024

  /**
   * Builds a `ByteString` source over `stream` using 512 KiB reads.
   *
   * Reads are resolved one at a time and in order. The source ends at the first empty chunk,
   * which cancels the underlying stage and thereby closes the download stream.
   */
  def apply(stream: GridFSDownloadStream)(implicit ec: ExecutionContext): Source[ByteString, NotUsed] = {
    Source.fromGraph(GridfsSource(stream, DefaultChunkSize))
      .mapAsync(1)(fb => fb.map { buffer => buffer.flip(); ByteString(buffer) })
      .takeWhile(_.nonEmpty)
  }
}
开发者ID:zhijun,项目名称:GridFSSource,代码行数:42,代码来源:GridFSSource.scala
示例5: PrintMoreNumbers
//设置package包名称以及导入依赖的类
package sample.stream_actor_simple
import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
/**
 * Actor that owns a ticking stream printing a running count once per second.
 * Sending it "stop" shuts the stream down; once the stream has completed the actor
 * stops itself and terminates the actor system.
 */
class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
  private implicit val executionContext = context.system.dispatcher

  // Emits 0, 1, 2, ... as strings: a 1 every second, accumulated by scan.
  private val runningCount =
    Source.tick(0.seconds, 1.second, 1).scan(0)(_ + _).map(_.toString)

  private val (killSwitch: UniqueKillSwitch, done) =
    runningCount
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // Notify ourselves once the stream has completed successfully.
  done.foreach(_ => self ! "done")

  override def receive: Receive = {
    // Shutting down the kill switch completes the stream, which later delivers "done".
    case "stop" =>
      println("Stopping")
      killSwitch.shutdown()

    case "done" =>
      println("Done")
      context.stop(self)
      context.system.terminate()
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:33,代码来源:PrintMoreNumbers.scala
示例6: SimulateWindTurbines
//设置package包名称以及导入依赖的类
package sample.stream_actor
import akka.actor.ActorSystem
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import sample.WindTurbineSimulator
import scala.concurrent.duration._
/**
 * Starts `numberOfTurbines` wind turbine simulators, each wrapped in a backoff supervisor,
 * with the start-up rate limited by a throttle.
 */
object SimulateWindTurbines extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val endpoint = "ws://127.0.0.1:8080"
  val numberOfTurbines = 5

  /** Creates one simulator with a random id under a supervisor that restarts it with backoff on failure. */
  private def startSupervisedSimulator() = {
    val turbineId = java.util.UUID.randomUUID.toString
    val backoffOptions = Backoff.onFailure(
      WindTurbineSimulator.props(turbineId, endpoint),
      childName = turbineId,
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    )
    system.actorOf(BackoffSupervisor.props(backoffOptions), name = s"$turbineId-backoff-supervisor")
  }

  Source(1 to numberOfTurbines)
    .throttle(
      elements = 100, // tokens taken from the bucket per interval
      per = 1.second,
      maximumBurst = 100, // bucket capacity
      mode = ThrottleMode.shaping
    )
    .map(_ => startSupervisedSimulator())
    .runWith(Sink.ignore)
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:41,代码来源:SimulateWindTurbines.scala
示例7: FlowFromGraph
//设置package包名称以及导入依赖的类
package sample.graphDSL
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}
/**
 * Shows how to build one compound `Flow` out of several flows with the graph DSL:
 * every input element is broadcast to all flows and their outputs are merged again.
 */
object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val doubling: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val tripling: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val flowsToCombine = List(doubling, tripling)

    /** Fans every element out to each of `indexFlows` and merges the results; rejects an empty list. */
    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")
      val branchCount = indexFlows.size

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val broadcast: UniformFanOutShape[T, T] = b.add(Broadcast(branchCount))
        val merge: UniformFanInShape[T, T] = b.add(Merge(branchCount))

        for (branch <- indexFlows) broadcast ~> branch ~> merge

        FlowShape(broadcast.in, merge.out)
      })
    }

    val printed = Source(1 to 10)
      .via(compoundFlowFrom(flowsToCombine))
      .runWith(Sink.foreach(println(_)))

    // Shut the actor system down whether the stream succeeded or failed.
    printed.onComplete(_ => system.terminate())
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala
示例8: ReadSideTestDriver
//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl
import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}
/**
 * In-memory [[ReadSide]] implementation for tests.
 *
 * Registered processors are prepared immediately and remembered per event type, so a test can
 * push individual events through their handlers with [[feed]].
 */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {
// Prepared handlers (with the offset returned by prepare) keyed by the event type of the
// processor's first aggregate tag. Each entry is a future because preparation is asynchronous.
private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]
/**
 * Builds the processor's handler, runs `globalPrepare` followed by `prepare` for the first
 * aggregate tag, and stores the pending result under that tag's event type.
 *
 * Only `eventTags.head` is used; a processor without any tag makes this method throw.
 */
override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
// The by-name factory is evaluated exactly once here.
val processor = processorFactory
val eventTags = processor.aggregateTags
val handler = processor.buildHandler()
val future = for {
_ <- handler.globalPrepare()
offset <- handler.prepare(eventTags.head)
} yield {
handler -> offset
}
// Updates to the map are serialized on this driver instance.
synchronized {
val handlers = processors.getOrElse(eventTags.head.eventType, Nil)
processors += (eventTags.head.eventType -> (handlers :+ future))
}
}
/**
 * Sends one event through every handler registered for the event's type.
 *
 * Calls `sys.error` (i.e. throws synchronously) when no processor was registered for that
 * type. Otherwise waits for all handler preparations, runs a single-element stream through
 * each handler's flow, and completes with `Done` once all of those streams have finished.
 */
def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
// NOTE(review): this read is not inside `synchronized`, unlike the write in register —
// confirm tests never register and feed concurrently.
processors.get(event.aggregateTag.eventType) match {
case None => sys.error(s"No processor registered for Event ${event.aggregateTag.eventType.getCanonicalName}")
case Some(handlerFutures) =>
for {
handlers <- Future.sequence(handlerFutures)
_ <- Future.sequence(handlers.map {
// The type argument is erased at runtime, so this pattern effectively matches any
// ReadSideHandler; correctness relies on the map being keyed by event type.
case (handler: ReadSideHandler[Event], _) =>
Source.single(new EventStreamElement(entityId, event, offset))
.via(handler.handle())
.runWith(Sink.ignore)
})
} yield {
Done
}
}
}
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:52,代码来源:ReadSideTestDriver.scala
示例9: listFiles
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp
import akka.NotUsed
import akka.stream.alpakka.ftp.FtpCredentials.AnonFtpCredentials
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import java.net.InetAddress
/**
 * Base trait for SFTP specs: provides the connection settings and thin wrappers around the
 * `Sftp` scaladsl entry points so that test bodies do not have to pass settings explicitly.
 */
trait BaseSftpSpec extends SftpSupportImpl with BaseSpec {
// The //#create-settings markers below delimit a region; presumably it is extracted
// verbatim into documentation, so keep the code between them unchanged — TODO confirm.
//#create-settings
val settings = SftpSettings(
InetAddress.getByName("localhost"),
getPort,
AnonFtpCredentials,
strictHostKeyChecking = false,
knownHosts = None,
sftpIdentity = None
)
//#create-settings
/** Lists the files found under `basePath` using [[settings]]. */
protected def listFiles(basePath: String): Source[FtpFile, NotUsed] =
Sftp.ls(basePath, settings)
/** Source of the bytes of the remote file at `path`; materializes the IO result. */
protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
Sftp.fromPath(path, settings)
/** Sink writing incoming bytes to the remote file at `path`; `append` is forwarded to `Sftp.toPath`. */
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
Sftp.toPath(path, settings, append)
}
开发者ID:akka,项目名称:alpakka,代码行数:35,代码来源:BaseSftpSpec.scala
示例10: IronMqPullStageSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ironmq
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global
/** Checks that the IronMQ pull stage delivers all queued messages in the order they were pushed. */
class IronMqPullStageSpec extends UnitSpec with IronMqFixture with AkkaStreamFixture {

  "IronMqSourceStage" when {
    "there are messages" should {
      "consume all messages" in {
        val queue = givenQueue()
        val pushed = (1 to 100).map(i => PushMessage(s"test-$i"))
        ironMqClient.pushMessages(queue.name, pushed: _*).futureValue

        val pullStage = new IronMqPullStage(queue.name, IronMqSettings())
        val pulled = Source.fromGraph(pullStage).take(100).runWith(Sink.seq)
        val pulledBodies = pulled.map(received => received.map(_.message.body)).futureValue

        val pushedBodies = pushed.map(_.body)
        pulledBodies should contain theSameElementsInOrderAs pushedBodies
      }
    }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:25,代码来源:IronMqPullStageSpec.scala
示例11: MemoryBuffer
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
/**
 * Flow stage that accumulates all incoming `ByteString`s in memory and, once upstream has
 * finished, emits them as one `Chunk` wrapping a single-element source of the concatenated
 * bytes together with their total size.
 *
 * The stage fails with an `IllegalStateException` as soon as an incoming element would make
 * the buffered size exceed `maxSize`.
 *
 * @param maxSize maximum number of bytes that may be buffered
 */
private[alpakka] final class MemoryBuffer(maxSize: Int) extends GraphStage[FlowShape[ByteString, Chunk]] {
val in = Inlet[ByteString]("MemoryBuffer.in")
val out = Outlet[Chunk]("MemoryBuffer.out")
override val shape = FlowShape.of(in, out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// Everything received so far, concatenated.
var buffer = ByteString.empty
// Downstream demand: keep pulling upstream while it is open; emit once it has closed.
override def onPull(): Unit = if (isClosed(in)) emit() else pull(in)
override def onPush(): Unit = {
val elem = grab(in)
// Check the limit before appending so the buffer never grows beyond maxSize.
if (buffer.size + elem.size > maxSize) {
failStage(new IllegalStateException("Buffer size of " + maxSize + " bytes exceeded."))
} else {
buffer ++= elem
// Pull again right away; the outlet is not pushed until upstream has finished.
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
// Emit the chunk only if downstream has already signalled demand, then complete.
// NOTE(review): when there is no demand yet the stage completes without emitting —
// presumably nothing has been buffered in that case, since `in` is only pulled after
// demand; confirm.
if (isAvailable(out)) emit()
completeStage()
}
// Pushes the single chunk built from the current buffer and completes the stage afterwards.
def emit(): Unit = emit(out, Chunk(Source.single(buffer), buffer.size), () => completeStage())
setHandlers(in, out, this)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:39,代码来源:MemoryBuffer.scala
示例12: MemoryBufferSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures
/** Behavioural tests for `MemoryBuffer`: concatenation of all input and the size limit. */
class MemoryBufferSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("MemoryBufferSpec"))

  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  /** Three byte strings holding the bytes 1 to 14 (14 bytes in total). */
  private def fourteenBytesInThreeParts =
    Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))

  "MemoryBuffer" should "emit a chunk on its output containg the concatenation of all input values" in {
    val chunks = fourteenBytesInThreeParts
      .via(new MemoryBuffer(200))
      .runWith(Sink.seq)
      .futureValue

    chunks should have size (1)

    val onlyChunk = chunks.head
    onlyChunk.size should be(14)

    val chunkContent = onlyChunk.data.runWith(Sink.seq).futureValue
    chunkContent should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
  }

  it should "fail if more than maxSize bytes are fed into it" in {
    // The limit of 10 bytes is exceeded by the second element (5 + 7 bytes).
    val failure = fourteenBytesInThreeParts
      .via(new MemoryBuffer(10))
      .runWith(Sink.seq)
      .failed
      .futureValue

    failure shouldBe a[IllegalStateException]
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:49,代码来源:MemoryBufferSpec.scala
示例13: SplitAfterSizeSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.testkit.TestKit
import akka.stream.ActorMaterializerSettings
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import org.scalatest.Matchers
import org.scalatest.FlatSpecLike
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.stream.scaladsl.Sink
import org.scalatest.time.{Millis, Seconds, Span}
import scala.concurrent.duration._
/** Behavioural tests for `SplitAfterSize`: empty input and splitting after the size limit is reached. */
class SplitAfterSizeSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("SplitAfterSizeSpec"))

  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  "SplitAfterSize" should "yield a single empty substream on no input" in {
    val emitted = Source
      .empty[ByteString]
      .via(SplitAfterSize(10)(Flow[ByteString]).concatSubstreams)
      .runWith(Sink.seq)
      .futureValue

    emitted should be(Seq.empty)
  }

  it should "start a new stream after the element that makes it reach a maximum, but not split the element itself" in {
    val input = Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14))

    // Collect up to 10 leading elements of every substream so each substream's content can be compared.
    val substreamPrefixes = Source(input)
      .via(SplitAfterSize(10)(Flow[ByteString]).prefixAndTail(10).map(_._1).concatSubstreams)
      .runWith(Sink.seq)
      .futureValue

    substreamPrefixes should be(
      Seq(
        Seq(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12)),
        Seq(ByteString(13, 14))
      )
    )
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:56,代码来源:SplitAfterSizeSpec.scala
示例14: KinesisSource
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.kinesis.scaladsl
import akka.NotUsed
import akka.stream.alpakka.kinesis.KinesisSourceErrors.NoShardsError
import akka.stream.alpakka.kinesis.{KinesisSourceStage, ShardSettings}
import akka.stream.scaladsl.{Merge, Source}
import com.amazonaws.services.kinesis.AmazonKinesisAsync
import com.amazonaws.services.kinesis.model.Record
/** Factory methods for Kinesis record sources. */
object KinesisSource {

  /** Source of the records of the single shard described by `shardSettings`. */
  def basic(shardSettings: ShardSettings, amazonKinesisAsync: AmazonKinesisAsync): Source[Record, NotUsed] = {
    val shardStage = new KinesisSourceStage(shardSettings, amazonKinesisAsync)
    Source.fromGraph(shardStage)
  }

  /**
   * Merges the records of several shards into one source.
   *
   * An empty list yields a source that fails with `NoShardsError`; a single shard is returned
   * as-is; two or more shards are combined with a `Merge` stage.
   */
  def basicMerge(shardSettings: List[ShardSettings], amazonKinesisAsync: AmazonKinesisAsync): Source[Record, NotUsed] =
    shardSettings.map(basic(_, amazonKinesisAsync)) match {
      case Nil => Source.failed(NoShardsError)
      case single :: Nil => single
      case first :: second :: others => Source.combine(first, second, others: _*)(Merge(_))
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:27,代码来源:KinesisSource.scala
示例15: index
//设置package包名称以及导入依赖的类
package au.csiro.data61.magda.indexer.search
import au.csiro.data61.magda.model.misc._
import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.Materializer
import akka.actor.ActorSystem
import au.csiro.data61.magda.indexer.external.InterfaceConfig
import au.csiro.data61.magda.search.elasticsearch.{ ClientProvider, Indices }
import com.typesafe.config.Config
import java.time.Instant
import akka.stream.scaladsl.Source
import akka.NotUsed
import java.time.OffsetDateTime
import au.csiro.data61.magda.indexer.search.elasticsearch.ElasticSearchIndexer
/**
 * Abstraction over a search index that data sets can be written to.
 * All operations are asynchronous and report completion through a `Future`.
 */
trait SearchIndexer {
/**
 * Indexes the data sets emitted by `dataSetStream` on behalf of `source`.
 *
 * @return a summary with the success count and the recorded failures
 */
def index(source: InterfaceConfig, dataSetStream: Source[DataSet, NotUsed]): Future[SearchIndexer.IndexResult]
/** Triggers a snapshot; what is captured is defined by the implementation. */
def snapshot(): Future[Unit]
/** Future tied to the indexer's readiness — presumably completes once it can accept work; confirm against the implementation. */
def ready: Future[Unit]
/** Trims entries of `source` relative to `before` — presumably removes those older than it; confirm against the implementation. */
def trim(source: InterfaceConfig, before: OffsetDateTime): Future[Unit]
}
object SearchIndexer {
/**
 * Outcome of an indexing run.
 *
 * @param successes number of successes reported by the run
 * @param failures  one string per failure; the format is chosen by the implementation
 */
case class IndexResult(successes: Long, failures: Seq[String])
/** Creates the Elasticsearch-backed implementation. */
def apply(clientProvider: ClientProvider, indices: Indices)(implicit config: Config, system: ActorSystem, ec: ExecutionContext, materializer: Materializer) =
new ElasticSearchIndexer(clientProvider, indices)
}
开发者ID:TerriaJS,项目名称:magda,代码行数:31,代码来源:SearchIndexer.scala
示例16: TopicHandler
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement
/**
 * Bundles everything needed to validate one topic.
 *
 * @param topicName        name of the Kafka topic
 * @param numberOfMessages message count reported by `OffsetManagement` for partition 0
 * @param topicSource      source reading partition 0 of the topic from offset 0
 */
case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])

object TopicHandler {

  /** Creates a handler for partition 0 of `topicName`, using a fresh random consumer group id. */
  def create(topicName: String, system: ActorSystem): TopicHandler = {
    val groupId = java.util.UUID.randomUUID.toString

    val extraProperties = Seq(
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> Int.MaxValue.toString,
      ConsumerConfig.FETCH_MAX_BYTES_CONFIG -> Int.MaxValue.toString,
      ConsumerConfig.FETCH_MIN_BYTES_CONFIG -> "20485000"
    )

    val baseSettings: ConsumerSettings[String, String] =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092")
        .withGroupId(groupId)

    // Apply the properties in the listed order.
    val consumerSettings: ConsumerSettings[String, String] =
      extraProperties.foldLeft(baseSettings) { case (settings, (key, value)) =>
        settings.withProperty(key, value)
      }

    val partition = 0
    val source = createSource(consumerSettings, topicName, partition)
    val messageCount = OffsetManagement.getNumberOfMessages(topicName, partition)

    TopicHandler(topicName, messageCount, source)
  }

  /** Plain consumer source assigned to the given partition, starting at offset 0. */
  def createSource(consumerSettings: ConsumerSettings[String, String], topicName: String, partition: Int) = {
    val partitionFromStart = new TopicPartition(topicName, partition) -> 0L
    Consumer.plainSource(consumerSettings, Subscriptions.assignmentWithOffset(partitionFromStart))
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:42,代码来源:TopicHandler.scala
示例17: AccumulateWhileUnchangedTest
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.scalatest.FunSuite
import scala.collection.immutable
/** Tests `AccumulateWhileUnchanged` when records are grouped by the start of their time window. */
class AccumulateWhileUnchangedTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  val windowSize = 1000

  /** Start timestamp of the window that `timestamp` falls into. */
  def windowStart(timestamp: Long): Long = {
    timestamp - (timestamp % windowSize)
  }

  val numberOfElements = 3000

  /** Runs `records` through the stage under test and returns the probe attached to its output. */
  private def accumulatedWindows(records: immutable.Seq[SimpleRecord]) = {
    val probeSink = TestSink.probe[Seq[SimpleRecord]]
    Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .runWith(probeSink)
  }

  test("accumulation of two windows on element stream") {
    val firstWindowElements = immutable.Seq.range(1, 999, 10).map(t => SimpleRecord(1)(t))
    val secondWindowElements = immutable.Seq.range(1000, 1999, 10).map(t => SimpleRecord(1)(t))

    val probe = accumulatedWindows(firstWindowElements ++ secondWindowElements)

    probe.request(numberOfElements)
    probe.expectNext(firstWindowElements, secondWindowElements)
    probe.expectComplete()
  }

  test("accumulation on empty stream") {
    val probe = accumulatedWindows(List[SimpleRecord]())

    probe.request(numberOfElements)
    probe.expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:57,代码来源:AccumulateWhileUnchangedTest.scala
示例18: IgnoreLastElementsTest
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.FunSuite
import scala.collection.immutable
/** Tests that `IgnoreLastElements` drops the configured number of trailing elements. */
class IgnoreLastElementsTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  /** Runs `values` through the stage under test and returns the probe attached to its output. */
  private def probeAfterIgnoring(values: immutable.Seq[Int], ignoreCount: Int) =
    Source(values)
      .via(new IgnoreLastElements[Int](ignoreCount))
      .runWith(TestSink.probe[Int])

  test("happy list") {
    val start = 0
    val end = 10
    val values = immutable.Seq.range(start, end)
    val ignoreCount = 2

    val probe = probeAfterIgnoring(values, ignoreCount)

    // Request as many elements as were sent; only the non-ignored ones may arrive.
    probe.request(end)
    for (expected <- values.dropRight(ignoreCount)) probe.expectNext(expected)
    probe.expectComplete()
  }

  test("empty list") {
    val probe = probeAfterIgnoring(List[Int](), ignoreCount = 2)

    probe.request(1)
    probe.expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:50,代码来源:IgnoreLastElementsTest.scala
示例19: DeserializationDemo
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}
import com.martinseeler.dtf.stages.{ByteStringToDeltaStage, DeltaToTickStage}
/**
 * Reads `ticks_1MM.dtff`, decodes its bytes into deltas and then ticks, prints every tick
 * and terminates the actor system once the stream has finished (successfully or not).
 */
object DeserializationDemo extends App {

  implicit val system = ActorSystem("deserialization")
  implicit val mat = ActorMaterializer()

  val dtffSource = FileIO.fromPath(new File("ticks_1MM.dtff").toPath)

  private val allTicksPrinted =
    dtffSource
      .via(new ByteStringToDeltaStage())
      .via(new DeltaToTickStage())
      .runForeach(println)

  // The callback runs on the global execution context, which is passed explicitly.
  allTicksPrinted.onComplete(_ => system.terminate())(scala.concurrent.ExecutionContext.global)
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:24,代码来源:DeserializationDemo.scala
示例20: running
//设置package包名称以及导入依赖的类
package producers
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future
/**
 * Mixin for broker actors that publish string messages to Kafka.
 *
 * Relies on members that are not declared in this trait (`context`, `self`, `log`, `topicName`
 * and an implicit execution context) — presumably supplied by `ActorBroker`; confirm there.
 */
trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()

  // NOTE(review): this is initialized in the trait body and reads `config`; an implementor
  // that defines `config` as a plain body-level val would see it uninitialized here — confirm
  // how implementors provide `config`.
  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  /** Behaviour installed after a message has been sent: stops the actor on [[Stop]]. */
  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  /**
   * Publishes `message` to `topicName` through a one-element stream and then switches the
   * actor to the [[running]] behaviour. A failure of the stream is logged asynchronously.
   *
   * @param message the record value to publish; the record is written without a key
   */
  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")

    val record = new ProducerRecord[Array[Byte], String](topicName, message)

    // The record already exists, so emit it directly instead of wrapping it in a Future,
    // and keep only the sink's completion future.
    val completion: Future[Done] =
      Source.single(record).runWith(Producer.plainSink(producerSettings))

    // `failed.foreach` replaces the deprecated `Future.onFailure`.
    completion.failed.foreach { ex =>
      log.error("Stream failed due to error, restarting", ex)
      // NOTE(review): rethrowing inside a future callback only reaches the execution context's
      // failure reporter; it does not restart this actor despite the log message — confirm intent.
      throw ex
    }

    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }

  /** Message asking the actor to stop. */
  case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala
注：本文中的 akka.stream.scaladsl.Source 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论