本文整理汇总了Scala中akka.stream.scaladsl.Sink类的典型用法代码示例。如果您正苦于以下问题：Scala Sink类的具体用法？Scala Sink怎么用？Scala Sink使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Sink类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
/**
 * Demo application that materializes one closed graph containing two independent streams:
 *
 *  - a tick source that emits a fixed greeting (initial delay 0, then every 5 seconds),
 *    wraps it in a `ProducerRecord` and publishes it to `kafkaTopic`;
 *  - a Kafka consumer assigned to partition 0 of the same topic whose record values
 *    are printed to standard output.
 */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Method-call syntax (`0.seconds`) instead of postfix notation (`0 seconds`):
    // postfix operators need `scala.language.postfixOps`, which this file does not import.
    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    // Producing side: tick -> ProducerRecord -> Kafka.
    tickSource ~> mapToProducerRecord ~> kafkaSink
    // Consuming side: Kafka -> record value -> stdout.
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: Pusher
//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.pusher
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import reactivehub.akka.stream.apns.Environment._
import reactivehub.akka.stream.apns.TlsUtil._
import reactivehub.akka.stream.apns._
import reactivehub.akka.stream.apns.marshallers.SprayJsonSupport
/**
 * Command-line entry point that reads push requests from Kafka, converts them to
 * APNs notifications and sends them through an APNs connection flow.
 */
object Pusher extends SprayJsonSupport {
  val kafka = "192.168.99.100:9092"
  val clientId = "pusher1"
  val consumerGroup = "pusher"
  val topics = Set("notifications")

  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  /**
   * Builds and runs the stream. When the value materialized by the APNs flow completes,
   * the Netty event loop group and the actor system are shut down.
   */
  def main(args: Array[String]): Unit = {
    val eventLoopGroup = new NioEventLoopGroup()
    val apnsFlow = ApnsExt(system).connection[Long](Development, sslContext, eventLoopGroup)

    // Kafka record -> (key, notification); only device tokens shorter than 100 bytes pass.
    val notifications = Consumer
      .atMostOnceSource(consumerSettings)
      .map(record => (record.key, toNotification(record.value)))
      .filter { case (_, notification) => notification.deviceToken.bytes.length < 100 }

    // Keep.right retains the APNs flow's materialized value.
    val apnsCompletion = notifications
      .viaMat(apnsFlow)(Keep.right)
      .log("pusher", _.toString())
      .to(Sink.ignore)
      .run()

    apnsCompletion.onComplete { _ =>
      eventLoopGroup.shutdownGracefully()
      system.terminate()
    }
  }

  /** TLS context loaded from the PKCS#12 resource `/cert.p12`. */
  private def sslContext: SslContext =
    loadPkcs12FromResource("/cert.p12", "password")

  /** Kafka consumer settings for the configured broker, client id, group and topics. */
  private def consumerSettings: ConsumerSettings[Long, PushData] = {
    val base = ConsumerSettings(system, ScalaLongDeserializer, PushDataDeserializer, topics)
    base
      .withBootstrapServers(kafka)
      .withClientId(clientId)
      .withGroupId(consumerGroup)
      .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  /** Converts push data to a notification, applying alert and badge only when present. */
  private def toNotification(pushData: PushData): Notification = {
    val withAlert = pushData.alert.foldLeft(Payload.Builder())((b, alert) => b.withAlert(alert))
    val withBadge = pushData.badge.foldLeft(withAlert)((b, badge) => b.withBadge(badge))
    Notification(DeviceToken(pushData.token), withBadge.result)
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:60,代码来源:Pusher.scala
示例3: HTableStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase.javadsl
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import scala.collection.immutable
import scala.concurrent.Future
/** Factory methods for HBase table settings and the Java-DSL sink/flow built on them. */
object HTableStage {

  /**
   * Creates table settings from Java-friendly arguments.
   *
   * @param conf           Hadoop configuration
   * @param tableName      target table
   * @param columnFamilies column families, copied into an immutable Scala sequence
   * @param converter      Java function turning an element into a `Put`
   */
  def table[T](conf: Configuration,
               tableName: TableName,
               columnFamilies: java.util.List[String],
               converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    val families: immutable.Seq[String] = columnFamilies.asScala.toList
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }

  /** Sink that passes elements through the HBase flow and discards its output, keeping the completion future. */
  def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] = {
    val writeAndDiscard = Flow[A].via(flow(config)).toMat(Sink.ignore)(Keep.right)
    writeAndDiscard.asJava
  }

  /** Flow backed by an `HBaseFlowStage` created from the given settings. */
  def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] = {
    val stage = new HBaseFlowStage[A](settings)
    Flow.fromGraph(stage).asJava
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:HTableStage.scala
示例4: PlayerServiceImpl
//设置package包名称以及导入依赖的类
package com.chriswk.gameranker.player.impl
import java.util.UUID
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.chriswk.gameranker.player.api
import com.chriswk.gameranker.player.api.PlayerService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import scala.concurrent.ExecutionContext
/**
 * Player service implementation backed by persistent entities; the list of players is
 * derived from the persistence ids currently known to the Cassandra read journal.
 */
class PlayerServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends PlayerService {

  private val currentIdsQuery =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  /** Creates a player under a freshly generated UUID and returns its API representation. */
  override def createPlayer = ServiceCall { request =>
    val newId = UUID.randomUUID()
    for (_ <- refFor(newId).ask(CreatePlayer(request.name)))
      yield api.Player(newId, request.name)
  }

  /** Looks a player up by id; throws `NotFound` when the entity replies with no player. */
  override def getPlayer(playerId: UUID) = ServiceCall { _ =>
    refFor(playerId).ask(GetPlayer).map { maybePlayer =>
      maybePlayer
        .map(player => api.Player(playerId, player.name))
        .getOrElse(throw NotFound(s"Player with id $playerId"))
    }
  }

  private def refFor(playerId: UUID) = registry.refFor[PlayerEntity](playerId.toString)

  /** Collects every existing player by querying each `PlayerEntity|…` persistence id (4 lookups in parallel). */
  override def getPlayers = ServiceCall { _ =>
    val entityIds = currentIdsQuery
      .currentPersistenceIds()
      .filter(_.startsWith("PlayerEntity|"))
      // Everything after the first '|' is the entity id.
      .map(_.split("\\|", 2).last)

    entityIds
      .mapAsync(4) { entityId =>
        registry
          .refFor[PlayerEntity](entityId)
          .ask(GetPlayer)
          .map(_.map(player => api.Player(UUID.fromString(entityId), player.name)))
      }
      // Drop ids whose entity returned no player.
      .mapConcat(_.toList)
      .runWith(Sink.seq)
  }
}
开发者ID:chriswk,项目名称:gameranker,代码行数:53,代码来源:PlayerServiceImpl.scala
示例5: MovieListPipeline
//设置package包名称以及导入依赖的类
package com.stacktrace.yo.scrapeline.imdb.pipelines
import java.nio.file.Paths
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList
import scala.concurrent.Future
/**
 * Pipeline that scrapes the movie budget list, extracts movie names with their detail
 * URLs and writes the names, one per line, to `movie.txt`.
 */
class MovieListPipeline(implicit val m: ActorMaterializer) {

  /** Single-element source holding the scraped budget list page. */
  def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = {
    val budgetPage = ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all")
    Source.single(budgetPage)
  }

  /** Flow that turns a document into one element per `tr b a` link found in its table rows. */
  def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] =
    Flow[Document].mapConcat { doc =>
      for {
        row <- doc >> elementList("table tr")
        link <- row >> elementList("tr b a")
      } yield MovieNameAndDetailUrl(link.text, s"http://www.the-numbers.com/${link.attr("href")}")
    }

  /** Sink that writes each movie name followed by a newline to `movie.txt`. */
  def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = {
    val fileSink = FileIO.toPath(Paths.get("movie.txt"))
    Flow[MovieNameAndDetailUrl]
      .map(movie => ByteString(s"${movie.name}\n"))
      .toMat(fileSink)(Keep.right)
  }

  /** Connects source, parse flow and file sink, runs the stream and returns the I/O result. */
  def buildAndRun: Future[IOResult] =
    getPipelineSource.via(getParseFlow).runWith(getPipeOut)
}
开发者ID:StackTraceYo,项目名称:scrapeline,代码行数:49,代码来源:MovieListPipeline.scala
示例6: offerBlocking
//设置package包名称以及导入依赖的类
package sample.stream
import java.lang.Exception
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source, SourceQueue, SourceQueueWithComplete}
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.util.Random
/**
 * Offers an element to the queue and blocks the calling thread until the offer's future
 * has completed or `maxWait` has elapsed.
 *
 * The `synchronized` block serializes callers, so at most one offer is pending at a time.
 *
 * @param elem    element to enqueue
 * @param maxWait upper bound for the blocking wait
 * @return the (by then completed) future returned by the queue's `offer`
 */
def offerBlocking(elem: T, maxWait: Duration = 1.seconds): Future[QueueOfferResult] =
  synchronized {
    // Await.ready hands back the very future it waited on.
    Await.ready(q.offer(elem), maxWait)
  }
}
/**
 * Simulates an asynchronous lookup that fails at random.
 *
 * Roughly every second call raises a `RuntimeException` internally; that failure is
 * translated into a future holding the string "Exception". Previously the value of the
 * `try`/`catch` expression was discarded, so the fallback future was created but never
 * returned and the method always answered with `user: <id>`.
 *
 * @param userID id embedded in the successful result
 * @return a future of `user: <id>`, or a future of "Exception" after a simulated failure
 */
def asyncOp(userID: Long): Future[String] = {
  Thread.sleep(10) // without waiting time we see a lot of "Future(<not completed>)"
  try {
    if (Random.nextInt() % 2 == 0) {
      println("asyncOp: random exception")
      throw new RuntimeException("random exception")
    }
    Future(s"user: $userID")
  } catch {
    case _: RuntimeException => Future("Exception")
  }
}
// Queue-fed stream: every offered future is printed by the sink.
val targetQueue = {
  val futureQueue = Source.queue[Future[String]](Int.MaxValue, OverflowStrategy.backpressure)
  futureQueue.to(Sink.foreach(println)).run()
}
val targetSyncQueue = new SyncQueue(targetQueue)

// Driver stream: one blocking offer at a time (parallelism 1) for each generated id.
Source(1 to Int.MaxValue)
  //.throttle(1000, 1.second, 1, ThrottleMode.shaping)
  .mapAsync(1) { userId =>
    targetSyncQueue.offerBlocking(asyncOp(userId))
  }
  .runWith(Sink.ignore)
}
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:53,代码来源:SendingMessagesToStream.scala
示例7: FlowFromGraph
//设置package包名称以及导入依赖的类
package sample.graphDSL
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}
/**
 * Shows how to build a single compound flow from several flows by broadcasting every
 * input element to each of them and merging their outputs.
 */
object FlowFromGraph {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(n => n * 2)
    val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(n => n * 3)
    val listOfFlows = List(processorFlow1, processorFlow2)

    // Fans each element out to all given flows and merges the results into one output.
    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")
      val branchCount = indexFlows.size

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val broadcast: UniformFanOutShape[T, T] = b.add(Broadcast(branchCount))
        val merge: UniformFanInShape[T, T] = b.add(Merge(branchCount))

        // Each flow takes the next free broadcast outlet and merge inlet.
        for (branch <- indexFlows) {
          broadcast ~> branch ~> merge
        }

        FlowShape(broadcast.in, merge.out)
      })
    }

    val compoundFlow = compoundFlowFrom(listOfFlows)

    val printed = Source(1 to 10)
      .via(compoundFlow)
      .runWith(Sink.foreach(println(_)))

    printed.onComplete(_ => system.terminate())
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala
示例8: Main
//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.sensor
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.pcap.PcapFileRaw.LinkType
import edu.uw.at.iroberts.wirefugue.pcap._
import edu.uw.at.iroberts.wirefugue.kafka.producer.{KafkaKey, PacketProducer}
import edu.uw.at.iroberts.wirefugue.kafka.serdes.PacketSerializer
import edu.uw.at.iroberts.wirefugue.protocol.overlay.{Ethernet, IPV4Datagram}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Reads packets from the pcap file named by the first command-line argument and
 * publishes the Ethernet packets that carry an IP payload to the Kafka topic "packets".
 */
object Main {

  /**
   * @param args `args(0)` must be the path of the pcap file; the process exits with
   *             status 1 when it is missing
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Please specify a filename as the first argument")
      System.exit(1)
    }

    val config = ConfigFactory.load("application.conf")
    implicit val system = ActorSystem("stream-producer-system", config)
    implicit val materializer = ActorMaterializer()

    val producerSettings = ProducerSettings[Integer, Packet](system, None, None)

    val doneF = PcapSource(Paths.get(args(0)).toUri)
      .filter(p => p.network == LinkType.ETHERNET && p.ip.isDefined)
      // The record key is the hash code of the packet's key.
      .map(packet => new ProducerRecord[Integer, Packet]("packets", packet.key.##, packet))
      .map(pr => new ProducerMessage.Message[Integer, Packet, Unit](pr, ()))
      .via(Producer.flow(producerSettings))
      .runWith(Sink.foreach(println))

    // `10.seconds` instead of postfix `10 seconds`: postfix notation requires
    // `scala.language.postfixOps`, which this file does not import.
    try {
      Await.ready(doneF, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:53,代码来源:Main.scala
示例9: stringPersister
//设置package包名称以及导入依赖的类
package services
import javax.inject.{Inject, Singleton}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits._
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * Builds a pass-through flow that additionally hands every element to a persistence function.
 *
 * Each incoming string is broadcast to two outlets: outlet 0 is the flow's output, outlet 1
 * feeds a sink that calls `pf` and logs the outcome once the returned future completes.
 * The sink does not wait for that future before accepting the next element.
 *
 * @param pf asynchronous persistence function invoked for every element
 * @return a flow emitting its input unchanged
 */
def stringPersister(pf: String => Future[Unit]): Flow[String, String, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val persistenceSink = Sink.foreach[String] { content =>
      pf(content).onComplete {
        case Success(_) => Logger.debug(s"Persisted content: '$content'")
        // Closing quote added so the message matches the success case.
        case Failure(t) => Logger.error(s"Failed to persist content: '$content'", t)
      }
    }

    val bcast = builder.add(Broadcast[String](2))
    bcast.out(1) ~> persistenceSink

    FlowShape(bcast.in, bcast.out(0))
  })
}
开发者ID:snackCake,项目名称:TweetStreamChallenge,代码行数:33,代码来源:PersistenceService.scala
示例10: toPath
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp.scaladsl
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory, FtpSourceParams, FtpsSourceParams, SftpSourceParams}
import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.FTPClient
import scala.concurrent.Future
/**
 * Scala API shared by the FTP-like connectors; must be mixed into an
 * `FtpSourceFactory` for the same client type.
 */
sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] =>

  /**
   * Creates a sink that writes incoming bytes to a remote file.
   *
   * @param path               path of the remote file
   * @param connectionSettings connection settings
   * @param append             passed through to the underlying I/O sink; defaults to `false`
   * @return a sink materializing the future I/O result
   */
  def toPath(
      path: String,
      connectionSettings: S,
      append: Boolean = false
  ): Sink[ByteString, Future[IOResult]] = {
    val ioSink = createIOSink(path, connectionSettings, append)
    Sink.fromGraph(ioSink)
  }

  protected[this] implicit def ftpLike: FtpLike[FtpClient, S]
}
// FTP entry point; its client type is Apache Commons Net's FTPClient.
object Ftp extends FtpApi[FTPClient] with FtpSourceParams
// FTPS entry point; same client type as Ftp, combined with FtpsSourceParams.
object Ftps extends FtpApi[FTPClient] with FtpsSourceParams
// SFTP entry point; its client type is sshj's SSHClient.
object Sftp extends FtpApi[SSHClient] with SftpSourceParams
开发者ID:akka,项目名称:alpakka,代码行数:29,代码来源:FtpApi.scala
示例11: listFiles
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp
import akka.NotUsed
import akka.stream.alpakka.ftp.FtpCredentials.AnonFtpCredentials
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import java.net.InetAddress
/**
 * Base trait for SFTP specs: defines the connection settings and implements
 * `listFiles`, `retrieveFromPath` and `storeToPath` by delegating to the `Sftp` API.
 *
 * The settings target localhost on the port returned by `getPort`, use anonymous
 * credentials, disable strict host key checking and set neither known hosts nor an identity.
 * The `//#create-settings` lines are left untouched — they look like documentation
 * snippet markers (TODO confirm).
 */
trait BaseSftpSpec extends SftpSupportImpl with BaseSpec {
//#create-settings
val settings = SftpSettings(
InetAddress.getByName("localhost"),
getPort,
AnonFtpCredentials,
strictHostKeyChecking = false,
knownHosts = None,
sftpIdentity = None
)
//#create-settings
// Lists the files under basePath via Sftp.ls.
protected def listFiles(basePath: String): Source[FtpFile, NotUsed] =
Sftp.ls(basePath, settings)
// Streams the bytes of the remote file at path via Sftp.fromPath.
protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
Sftp.fromPath(path, settings)
// Sink writing to the remote file at path via Sftp.toPath; append is passed through.
protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
Sftp.toPath(path, settings, append)
}
开发者ID:akka,项目名称:alpakka,代码行数:35,代码来源:BaseSftpSpec.scala
示例12: listFiles
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inside, Matchers, WordSpecLike}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
/**
 * Common base for the FTP connector specs: declares the file operations and server
 * lifecycle hooks concrete specs must provide, and wires them into the test lifecycle.
 */
trait BaseSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfter
    with BeforeAndAfterAll
    with ScalaFutures
    with IntegrationPatience
    with Inside
    with AkkaSupport
    with FtpSupport {

  /** Lists the files found under `basePath`. */
  protected def listFiles(basePath: String): Source[FtpFile, NotUsed]

  /** Streams the content of the file at `path`. */
  protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]]

  /** Sink that stores incoming bytes at `path`. */
  protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]]

  protected def startServer(): Unit

  protected def stopServer(): Unit

  // Runs after every test.
  after {
    cleanFiles()
  }

  /** Starts the server once before the first test. */
  override protected def beforeAll(): Unit = {
    super.beforeAll()
    startServer()
  }

  /** Stops the server, then waits up to 42 seconds for the actor system to terminate. */
  override protected def afterAll(): Unit = {
    stopServer()
    val terminated = getSystem.terminate()
    Await.ready(terminated, 42.seconds)
    super.afterAll()
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:49,代码来源:BaseSpec.scala
示例13: IronMqPullStageSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ironmq
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global
/** Checks that the IronMQ pull stage emits every message previously pushed to a queue, in order. */
class IronMqPullStageSpec extends UnitSpec with IronMqFixture with AkkaStreamFixture {

  "IronMqSourceStage" when {
    "there are messages" should {
      "consume all messages" in {
        // Given: a queue holding 100 messages.
        val queue = givenQueue()
        val messages = for (i <- 1 to 100) yield PushMessage(s"test-$i")
        ironMqClient.pushMessages(queue.name, messages: _*).futureValue

        // When: the first 100 elements are pulled from the stage.
        val source = Source.fromGraph(new IronMqPullStage(queue.name, IronMqSettings()))
        val pulled = source.take(100).runWith(Sink.seq)
        val receivedMessages = pulled.map(received => received.map(_.message.body)).futureValue

        // Then: the bodies match what was pushed, in the same order.
        val expectedMessages = messages.map(_.body)
        receivedMessages should contain theSameElementsInOrderAs expectedMessages
      }
    }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:25,代码来源:IronMqPullStageSpec.scala
示例14: MemoryBufferSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures
/**
 * Tests for `MemoryBuffer`: it should concatenate all input into one chunk and fail
 * when more than its maximum size is fed into it.
 */
class MemoryBufferSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("MemoryBufferSpec"))

  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  // The spec creates its own actor system, so it has to shut it down once all tests
  // have run; previously the system was left running after the suite finished.
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "MemoryBuffer" should "emit a chunk on its output containg the concatenation of all input values" in {
    val result = Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
      .via(new MemoryBuffer(200))
      .runWith(Sink.seq)
      .futureValue

    result should have size (1)
    val chunk = result.head
    chunk.size should be(14)
    chunk.data.runWith(Sink.seq).futureValue should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
  }

  it should "fail if more than maxSize bytes are fed into it" in {
    // 14 bytes are fed into a buffer constructed with 10.
    whenReady(
      Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))
        .via(new MemoryBuffer(10))
        .runWith(Sink.seq)
        .failed
    ) { e =>
      e shouldBe a[IllegalStateException]
    }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:49,代码来源:MemoryBufferSpec.scala
示例15: DynamoClientImpl
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.dynamodb.impl
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.MediaType.NotCompressible
import akka.http.scaladsl.model.{ContentType, MediaType}
import akka.stream.Materializer
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.impl.AwsClient.{AwsConnect, AwsRequestMetadata}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.AmazonServiceException
import com.amazonaws.http.HttpResponseHandler
/**
 * `AwsClient` implementation for the "dynamodb" service. Uses an HTTPS connection pool
 * when the configured port is 443 and a plain connection pool otherwise.
 */
class DynamoClientImpl(
    val settings: DynamoSettings,
    val errorResponseHandler: HttpResponseHandler[AmazonServiceException]
)(implicit protected val system: ActorSystem, implicit protected val materializer: Materializer)
    extends AwsClient[DynamoSettings] {

  override protected val service = "dynamodb"

  override protected val defaultContentType = {
    val amzJson = MediaType.customBinary("application", "x-amz-json-1.0", NotCompressible)
    ContentType.Binary(amzJson)
  }

  override protected implicit val ec = system.dispatcher

  override protected val connection: AwsConnect = {
    val http = Http()
    if (settings.port != 443)
      http.cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port)(materializer)
    else
      http.cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host)(materializer)
  }

  /** Runs a single operation through the client flow and returns its first result, cast to the operation's result type. */
  def single(op: AwsOp) = {
    val typedResponses = Source.single(op).via(flow).map(response => response.asInstanceOf[op.B])
    typedResponses.runWith(Sink.head)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:34,代码来源:DynamoClientImpl.scala
示例16: GeodeFiniteSourceSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.scaladsl
import akka.stream.scaladsl.Sink
import org.slf4j.LoggerFactory
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
/**
 * Runs three finite queries (persons, animals, complexes) against Geode, logging every
 * returned element at debug level and waiting up to 10 seconds for each query stream.
 *
 * NOTE(review): Await.ready only waits for completion and does not rethrow a failure of
 * the awaited future, so a failed query stream would not fail this test — confirm intent.
 */
class GeodeFiniteSourceSpec extends GeodeBaseSpec {
private val log = LoggerFactory.getLogger(classOf[GeodeFiniteSourceSpec])
"Geode finite source" should {
it { geodeSettings =>
"retrieves finite elements from geode" in {
val reactiveGeode = new ReactiveGeode(geodeSettings)
// The //#query lines are left untouched; they look like documentation snippet markers (TODO confirm).
// Despite its name, `source` holds the value materialized by runWith, not a Source.
//#query
val source =
reactiveGeode
.query[Person](s"select * from /persons order by id")
.runWith(Sink.foreach(e => log.debug(s"$e")))
//#query
Await.ready(source, 10 seconds)
// Same pattern for the animals region.
val animals =
reactiveGeode
.query[Animal](s"select * from /animals order by id")
.runWith(Sink.foreach(e => log.debug(s"$e")))
Await.ready(animals, 10 seconds)
// Same pattern for the complexes region.
val complexes =
reactiveGeode
.query[Complex](s"select * from /complexes order by id")
.runWith(Sink.foreach(e => log.debug(s"$e")))
Await.ready(complexes, 10 seconds)
// Close the Geode client once all three queries have been awaited.
reactiveGeode.close()
}
}
}
}
开发者ID:akka,项目名称:alpakka,代码行数:47,代码来源:GeodeFiniteSourceSpec.scala
示例17: GeodeContinuousSourceSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.scaladsl
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import org.slf4j.LoggerFactory
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
/**
 * Exercises a Geode continuous query: a continuous query on /persons is started first,
 * then 20 persons are pushed through the Geode flow; the query is closed once the
 * fold has counted its 20th element.
 */
class GeodeContinuousSourceSpec extends GeodeBaseSpec {
private val log = LoggerFactory.getLogger(classOf[GeodeContinuousSourceSpec])
"Geode continuousQuery" should {
it { geodeSettings =>
"retrieves continuously elements from geode" in {
// The //#... lines are left untouched; they look like documentation snippet markers (TODO confirm).
//#connection-with-pool
val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
//#connection-with-pool
val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)
// Despite its name, `source` holds the value materialized by runWith (the fold result), not a Source.
// The fold counts elements starting at 0; when the counter is 19 (the 20th element) the
// continuous query registered under 'test is closed.
//#continuousQuery
val source =
reactiveGeode
.continuousQuery[Person]('test, s"select * from /persons")
.runWith(Sink.fold(0) { (c, p) =>
log.debug(s"$p $c")
if (c == 19) {
reactiveGeode.closeContinuousQuery('test).foreach { _ =>
log.debug("test cQuery is closed")
}
}
c + 1
})
//#continuousQuery
// Push persons built from the range 1 to 20 through the Geode flow, discarding the flow's output.
val f = buildPersonsSource(1 to 20)
.via(flow) //geode flow
.runWith(Sink.ignore)
// Wait for the writing stream first, then for the continuous query stream.
Await.result(f, 10 seconds)
Await.result(source, 5 seconds)
reactiveGeode.close()
}
}
}
}
开发者ID:akka,项目名称:alpakka,代码行数:54,代码来源:GeodeContinuousSourceSpec.scala
示例18: AllTogether_
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Runs the numbers 1 to 5 through the custom filter, duplicator and map stages,
 * collects the output into a sequence and prints it.
 */
object AllTogether_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val result = Source(1 to 5)
    // NOTE(review): `_ % 2 < 10` holds for every Int, so if Filter keeps the elements
    // matching its predicate nothing is filtered out here — confirm this is intended.
    .via(new FilterStage_.Filter(_ % 2 < 10))
    .via(new DuplicatorShape_.Duplicator())
    .via(new MapShape_.Map[Int, Int](_ / 2))
    .runWith(Sink.seq)

  // `300.millis` instead of postfix `300 millis` (postfix notation needs
  // `scala.language.postfixOps`, which is not imported). The `finally` makes sure the
  // actor system is terminated even when the wait times out or the stream fails;
  // before, a thrown exception skipped the termination.
  val r =
    try Await.result(result, 300.millis)
    finally system.terminate()

  println(r)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:26,代码来源:AllTogether_.scala
示例19: InputCustomer
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random
/** Factory for randomly named input customers. */
object InputCustomer {

  /** Creates a customer named `FirstName<n> LastName<m>`, with n and m drawn independently from 0 to 999. */
  def random(): InputCustomer =
    InputCustomer(s"FirstName${Random.nextInt(1000)} LastName${Random.nextInt(1000)}")
}

/** Raw customer record whose name is a single string. */
case class InputCustomer(name: String)

/** Normalized customer record with separate first and last name. */
case class OutputCustomer(firstName: String, lastName: String)
/**
 * Generates 100 random input customers, splits each name into first and last name,
 * prints the normalized customers and terminates the actor system when the stream is done.
 */
object CustomersExample extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()

  val inputCustomers = Source(Vector.fill(100)(InputCustomer.random()))

  // Names that do not consist of exactly two space-separated parts are dropped.
  val normalize = Flow[InputCustomer]
    .map(_.name.split(" "))
    .collect { case Array(firstName, lastName) => OutputCustomer(firstName, lastName) }

  // `println` is eta-expanded into a function here.
  val writeCustomers = Sink.foreach[Any](println)

  inputCustomers
    .via(normalize)
    .runWith(writeCustomers)
    .onComplete(_ => actorSystem.terminate())
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:40,代码来源:CustomersExample.scala
示例20: SourceIsImmutable
//设置package包名称以及导入依赖的类
package akka_in_action.streams
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Demonstrates that a `Source` is immutable: calling `map` returns a new source and
 * leaves the original untouched.
 */
object SourceIsImmutable extends App {
  implicit val system = ActorSystem("actor-system")
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val source = Source(1 to 10)

  // Result deliberately discarded: this has no effect on `source`, since it is immutable.
  source.map(_ => 0)
  val runnable1 = source.runWith(Sink.fold(0)(_ + _)) // 55

  // `map` returns a new Source[Int] with the mapping appended.
  val zeroes = source.map(_ => 0)
  val runnable2 = zeroes.runWith(Sink.fold(0)(_ + _)) // 0

  val results = for {
    res1 <- runnable1
    res2 <- runnable2
  } yield (res1, res2)

  // `500.millis` instead of postfix `500 millis` (postfix notation needs
  // `scala.language.postfixOps`, which is not imported). The `finally` terminates the
  // actor system even when the wait times out; before, an exception skipped it.
  val pair =
    try Await.result(results, 500.millis)
    finally system.terminate()

  println(pair)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:35,代码来源:SourceIsImmutable.scala
注：本文中的akka.stream.scaladsl.Sink类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论