• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Sink类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.scaladsl.Sink的典型用法代码示例。如果您正苦于以下问题：Scala Sink类的具体用法？Scala Sink怎么用？Scala Sink使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Sink类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ProcessingKafkaApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.duration._

/**
 * Demo application that runs two independent pipelines inside one closed graph:
 *
 *  - a tick source that publishes a fixed greeting to `kafkaTopic` every five seconds, and
 *  - a Kafka consumer assigned to partition `partition` of the same topic whose record
 *    values are printed to standard output.
 *
 * The graph is started once and is never stopped by this code.
 */
object ProcessingKafkaApplication extends App {
  implicit val actorSystem: ActorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0
  // Manual assignment of a single topic partition.
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Method-call syntax instead of postfix operators (`0 seconds`): postfix notation needs
    // `scala.language.postfixOps`, which this file does not import.
    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    // Outgoing side: wrap each greeting in a key-less producer record for the topic.
    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    // Incoming side: keep only the record payload.
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    tickSource  ~> mapToProducerRecord   ~> kafkaSink
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala


示例2: Pusher

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.pusher

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import reactivehub.akka.stream.apns.Environment._
import reactivehub.akka.stream.apns.TlsUtil._
import reactivehub.akka.stream.apns._
import reactivehub.akka.stream.apns.marshallers.SprayJsonSupport

/**
 * Command-line entry point that reads push data from Kafka, converts each record into an
 * APNs notification and sends it through an APNs connection flow. When the materialized
 * value of that flow completes, the Netty event loop group and the actor system are shut down.
 */
object Pusher extends SprayJsonSupport {
  val kafka = "192.168.99.100:9092"
  val clientId = "pusher1"
  val consumerGroup = "pusher"
  val topics = Set("notifications")

  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  def main(args: Array[String]): Unit = {
    val eventLoop = new NioEventLoopGroup()
    val apnsFlow = ApnsExt(system).connection[Long](Development, sslContext, eventLoop)

    // Kafka records paired as (key, notification); tokens of 100 bytes or more are dropped.
    val notifications = Consumer.atMostOnceSource(consumerSettings)
      .map(record => (record.key, toNotification(record.value)))
      .filter { case (_, notification) => notification.deviceToken.bytes.length < 100 }

    // Keep.right keeps the APNs flow's materialized value, which is what we wait on below.
    val completion = notifications
      .viaMat(apnsFlow)(Keep.right)
      .log("pusher", _.toString())
      .to(Sink.ignore)
      .run()

    completion.onComplete { _ =>
      eventLoop.shutdownGracefully()
      system.terminate()
    }
  }

  /** Loads the client certificate bundled as a classpath resource. */
  private def sslContext: SslContext =
    loadPkcs12FromResource("/cert.p12", "password")

  /** Kafka consumer configuration built from the constants declared on this object. */
  private def consumerSettings: ConsumerSettings[Long, PushData] =
    ConsumerSettings(system, ScalaLongDeserializer, PushDataDeserializer, topics)
      .withBootstrapServers(kafka)
      .withClientId(clientId)
      .withGroupId(consumerGroup)
      .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")

  /** Builds a notification whose payload carries the alert and badge only when they are present. */
  private def toNotification(pushData: PushData): Notification = {
    val withAlert = pushData.alert.foldLeft(Payload.Builder())((payload, alert) => payload.withAlert(alert))
    val withBadge = pushData.badge.foldLeft(withAlert)((payload, badge) => payload.withBadge(badge))
    Notification(DeviceToken(pushData.token), withBadge.result)
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:60,代码来源:Pusher.scala


示例3: HTableStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase.javadsl

import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put

import scala.collection.immutable
import scala.concurrent.Future

/** Java-facing factories for HBase table settings and the sink/flow stages built from them. */
object HTableStage {

  /**
   * Creates table settings from Java collection and function types.
   *
   * @param conf           Hadoop configuration passed through to the settings
   * @param tableName      target table
   * @param columnFamilies column family names, copied into an immutable Scala sequence
   * @param converter      Java function turning an element into a `Put`
   */
  def table[T](conf: Configuration,
               tableName: TableName,
               columnFamilies: java.util.List[String],
               converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    val families: immutable.Seq[String] = columnFamilies.asScala.toList
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }

  /** Sink that routes every element through [[flow]] and then discards it. */
  def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] = {
    val writeThenIgnore = Flow[A].via(flow(config)).toMat(Sink.ignore)(Keep.right)
    writeThenIgnore.asJava
  }

  /** Flow backed by an `HBaseFlowStage` created from the given settings. */
  def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] = {
    val stage = new HBaseFlowStage[A](settings)
    Flow.fromGraph(stage).asJava
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:HTableStage.scala


示例4: PlayerServiceImpl

//设置package包名称以及导入依赖的类
package com.chriswk.gameranker.player.impl

import java.util.UUID

import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.chriswk.gameranker.player.api
import com.chriswk.gameranker.player.api.PlayerService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry

import scala.concurrent.ExecutionContext

/**
 * Player service implementation backed by persistent entities. Single-player calls go
 * through the entity registry; listing all players walks the persistence ids exposed by
 * the Cassandra read journal.
 */
class PlayerServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends PlayerService {
  private val currentIdsQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  /** Entity reference for the player with the given id. */
  private def refFor(playerId: UUID) = registry.refFor[PlayerEntity](playerId.toString)

  /** Creates a player under a freshly generated random UUID. */
  override def createPlayer = ServiceCall { request =>
    val newId = UUID.randomUUID()
    refFor(newId)
      .ask(CreatePlayer(request.name))
      .map(_ => api.Player(newId, request.name))
  }

  /** Looks up one player; the call fails with `NotFound` when the entity reports no player. */
  override def getPlayer(playerId: UUID) = ServiceCall { _ =>
    refFor(playerId).ask(GetPlayer).map { maybePlayer =>
      maybePlayer
        .map(player => api.Player(playerId, player.name))
        .getOrElse(throw NotFound(s"Player with id $playerId"))
    }
  }

  /** Collects every player whose persistence id carries the `PlayerEntity|` prefix. */
  override def getPlayers = ServiceCall { _ =>
    val entityPrefix = "PlayerEntity|"
    currentIdsQuery.currentPersistenceIds()
      .filter(_.startsWith(entityPrefix))
      .mapAsync(4) { persistenceId =>
        // Everything after the first '|' is the entity id.
        val entityId = persistenceId.stripPrefix(entityPrefix)
        registry.refFor[PlayerEntity](entityId)
          .ask(GetPlayer)
          .map(_.map(player => api.Player(UUID.fromString(entityId), player.name)))
      }
      // Entities that answered with no player are skipped.
      .mapConcat(_.toList)
      .runWith(Sink.seq)
  }
}
开发者ID:chriswk,项目名称:gameranker,代码行数:53,代码来源:PlayerServiceImpl.scala


示例5: MovieListPipeline

//设置package包名称以及导入依赖的类
package com.stacktrace.yo.scrapeline.imdb.pipelines

import java.nio.file.Paths

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList

import scala.concurrent.Future

/**
 * Scraping pipeline: fetch the movie budget listing page, extract one
 * `MovieNameAndDetailUrl` per movie link and write the movie names to `movie.txt`,
 * one per line.
 */
class MovieListPipeline(implicit val m: ActorMaterializer) {

  /** Single-element source holding the scraped listing page. */
  def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = {
    val listingPage = ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all")
    Source.single(listingPage)
  }

  /** Emits one element per `b a` link found in any table row of the document. */
  def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] =
    Flow[Document].mapConcat { page =>
      for {
        row <- page >> elementList("table tr")
        link <- row >> elementList("tr b a")
      } yield MovieNameAndDetailUrl(link.text, "http://www.the-numbers.com/" + link.attr("href"))
    }

  /** Sink writing each movie name followed by a newline to `movie.txt`. */
  def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = {
    val fileSink = FileIO.toPath(Paths.get("movie.txt"))
    Flow[MovieNameAndDetailUrl]
      .map(movie => ByteString(movie.name + "\n"))
      .toMat(fileSink)(Keep.right)
  }

  /** Runs the whole pipeline and returns the file sink's IO result. */
  def buildAndRun: Future[IOResult] = {
    val pages = getPipelineSource
    val movies = pages.via(getParseFlow)
    movies.runWith(getPipeOut)
  }

}
开发者ID:StackTraceYo,项目名称:scrapeline,代码行数:49,代码来源:MovieListPipeline.scala


示例6: offerBlocking

//设置package包名称以及导入依赖的类
package sample.stream

import java.lang.Exception

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source, SourceQueue, SourceQueueWithComplete}

import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._
import scala.util.Random


      /**
       * Offers `elem` to the wrapped queue and blocks the calling thread until the offer
       * has completed or `maxWait` has elapsed. The call is serialized on this instance's
       * monitor.
       *
       * @return the (already awaited) future returned by the queue's `offer`
       */
      def offerBlocking(elem: T, maxWait: Duration = 1.seconds): Future[QueueOfferResult] =
        synchronized {
          // offer yields a Future completed with the enqueue outcome; the original author notes
          // it must only be used from a single thread, hence the lock around the whole call.
          val pendingOffer = q.offer(elem)
          // Await.ready hands back the very same future once it is completed.
          Await.ready(pendingOffer, maxWait)
        }
    }

    /**
     * Simulated asynchronous operation that randomly fails.
     *
     * Roughly every other call throws a `RuntimeException`, which is caught here and
     * turned into a future holding the text `"Exception"`; otherwise the future holds
     * `"user: <userID>"`. The `try`/`catch` is the method's result expression: previously
     * its value was discarded, so the `"Exception"` future never reached the caller.
     *
     * @param userID id embedded in the successful result
     */
    def asyncOp(userID: Long): Future[String] = {
      Thread.sleep(10) //without waiting time we see a lot of "Future(<not completed>)"
      try {
        if (Random.nextInt() % 2 == 0) {
          println("asyncOp: random exception")
          throw new RuntimeException("random exception")
        }
        Future(s"user: $userID")
      } catch {
        case _: RuntimeException => Future("Exception")
      }
    }


    // Queue-fed stream: every Future[String] offered to the queue is printed by the sink.
    // The buffer size is Int.MaxValue with the backpressure overflow strategy.
    val targetQueue =
      Source.queue[Future[String]](Int.MaxValue, OverflowStrategy.backpressure)
        .to(Sink.foreach(println))
        .run()

    // Wrap the materialized queue so that offers go through the blocking offerBlocking helper.
    val targetSyncQueue = new SyncQueue(targetQueue)

    // Driver stream: for each number, start asyncOp and offer its future to the queue,
    // one offer at a time (mapAsync parallelism 1); the offer results are discarded.
    Source(1 to Int.MaxValue)
      //.throttle(1000, 1.second, 1, ThrottleMode.shaping)
      .mapAsync(1)(x => targetSyncQueue.offerBlocking(asyncOp(x)))
      .runWith(Sink.ignore)
  }
} 
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:53,代码来源:SendingMessagesToStream.scala


示例7: FlowFromGraph

//设置package包名称以及导入依赖的类
package sample.graphDSL

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import akka.stream.{ActorMaterializer, FlowShape, UniformFanInShape, UniformFanOutShape}


/**
 * Shows how to build a single `Flow` out of several flows with the graph DSL:
 * each input element is broadcast to every flow and all outputs are merged.
 */
object FlowFromGraph {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("FlowFromGraph")
    implicit val ec = system.dispatcher
    implicit val materializer = ActorMaterializer()

    /** Fans each element out to all `indexFlows` and merges their outputs into one stream. */
    def compoundFlowFrom[T](indexFlows: Seq[Flow[T, T, NotUsed]]): Flow[T, T, NotUsed] = {
      require(indexFlows.nonEmpty, "Cannot create compound flow without any flows to combine")

      val branchCount = indexFlows.size

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val fanOut: UniformFanOutShape[T, T] = b.add(Broadcast(branchCount))
        val fanIn: UniformFanInShape[T, T] = b.add(Merge(branchCount))

        // One broadcast outlet and one merge inlet per flow.
        for (branch <- indexFlows) fanOut ~> branch ~> fanIn

        FlowShape(fanOut.in, fanIn.out)
      })
    }

    val processorFlow1: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
    val processorFlow2: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 3)
    val compoundFlow = compoundFlowFrom(List(processorFlow1, processorFlow2))

    val done = Source(1 to 10)
      .via(compoundFlow)
      .runWith(Sink.foreach[Int](value => println(value)))

    // Shut the actor system down once the stream has finished, successfully or not.
    done.onComplete(_ => system.terminate())
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:43,代码来源:FlowFromGraph.scala


示例8: Main

//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.sensor

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.pcap.PcapFileRaw.LinkType
import edu.uw.at.iroberts.wirefugue.pcap._
import edu.uw.at.iroberts.wirefugue.kafka.producer.{KafkaKey, PacketProducer}
import edu.uw.at.iroberts.wirefugue.kafka.serdes.PacketSerializer
import edu.uw.at.iroberts.wirefugue.protocol.overlay.{Ethernet, IPV4Datagram}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, IntegerSerializer}

import scala.concurrent.Await
import scala.concurrent.duration._


/**
 * Reads packets from the pcap file named by the first command-line argument, publishes
 * the Ethernet packets that carry an IP datagram to the Kafka topic `packets` and prints
 * every producer result.
 */
object Main {

  /**
   * @param args `args(0)` is the path of the pcap file to read; the process exits with
   *             status 1 when it is missing
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Please specify a filename as the first argument")
      System.exit(1)
    }

    val config = ConfigFactory.load("application.conf")

    implicit val system = ActorSystem("stream-producer-system", config)
    implicit val materializer = ActorMaterializer()

    val producerSettings = ProducerSettings[Integer, Packet](system, None, None)

    val doneF = PcapSource(Paths.get(args(0)).toUri)
      .filter( p => p.network == LinkType.ETHERNET && p.ip.isDefined )
      // The record key is the hash code (`##`) of the packet's key.
      .map( packet => new ProducerRecord[Integer, Packet]("packets", packet.key.##, packet))
      .map( pr => new ProducerMessage.Message[Integer, Packet, Unit](pr, ()))
      .via(Producer.flow(producerSettings))
      .runWith(Sink.foreach(println))

    try {
      // `10.seconds` rather than the postfix `10 seconds`: postfix operators need
      // `scala.language.postfixOps`, which this file does not import.
      Await.ready(doneF, 10.seconds)
    }
    finally {
      // Always release the actor system, including when the wait times out.
      system.terminate()
    }
  }
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:53,代码来源:Main.scala


示例9: stringPersister

//设置package包名称以及导入依赖的类
package services

import javax.inject.{Inject, Singleton}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits._

import scala.concurrent.Future
import scala.util.{Failure, Success}


  /**
   * Pass-through flow that also hands every element to a persistence function.
   *
   * Each incoming string is broadcast to two outlets: outlet 0 becomes the flow's output,
   * outlet 1 feeds a sink that calls `pf` and logs the outcome of the returned future.
   * The sink does not wait for that future before accepting the next element.
   *
   * @param pf persistence function invoked once per element
   * @return a flow emitting exactly the elements it receives
   */
  def stringPersister(pf: String => Future[Unit]): Flow[String, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val persistenceSink = Sink.foreach[String] { content =>
        pf(content).onComplete {
          case Success(_) => Logger.debug(s"Persisted content: '$content'")
          // The closing quote around the content was missing in the original message.
          case Failure(t) => Logger.error(s"Failed to persist content: '$content'", t)
        }
      }

      val bcast = builder.add(Broadcast[String](2))
      bcast.out(1) ~> persistenceSink

      FlowShape(bcast.in, bcast.out(0))
    })
} 
开发者ID:snackCake,项目名称:TweetStreamChallenge,代码行数:33,代码来源:PersistenceService.scala


示例10: toPath

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp.scaladsl

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.ftp.impl.{FtpLike, FtpSourceFactory, FtpSourceParams, FtpsSourceParams, SftpSourceParams}
import akka.stream.alpakka.ftp.{FtpFile, RemoteFileSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import net.schmizz.sshj.SSHClient
import org.apache.commons.net.ftp.FTPClient
import scala.concurrent.Future

// Shared Scala DSL for the protocol-specific entry points. The self-type requires every
// implementation to also be an FtpSourceFactory for the same client type.
// NOTE(review): the settings type `S` is not declared in this excerpt; presumably it comes
// from FtpSourceFactory or the mixed-in *SourceParams traits — confirm.
sealed trait FtpApi[FtpClient] { _: FtpSourceFactory[FtpClient] =>

  /**
   * Wraps the graph returned by `createIOSink(path, connectionSettings, append)` as a sink.
   *
   * @param path               remote path handed to `createIOSink`
   * @param connectionSettings connection settings handed to `createIOSink`
   * @param append             handed to `createIOSink`; defaults to `false`
   * @return a sink of `ByteString` materializing a `Future[IOResult]`
   */
  def toPath(
      path: String,
      connectionSettings: S,
      append: Boolean = false
  ): Sink[ByteString, Future[IOResult]] =
    Sink.fromGraph(createIOSink(path, connectionSettings, append))

  // Implicit FtpLike instance for this client/settings pair, supplied by implementations.
  protected[this] implicit def ftpLike: FtpLike[FtpClient, S]
}

// Entry points, one per protocol. Ftp and Ftps use Apache Commons Net's FTPClient;
// Sftp uses sshj's SSHClient. Each mixes in the matching *SourceParams trait.
object Ftp extends FtpApi[FTPClient] with FtpSourceParams
object Ftps extends FtpApi[FTPClient] with FtpsSourceParams
object Sftp extends FtpApi[SSHClient] with SftpSourceParams
开发者ID:akka,项目名称:alpakka,代码行数:29,代码来源:FtpApi.scala


示例11: listFiles

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp

import akka.NotUsed
import akka.stream.alpakka.ftp.FtpCredentials.AnonFtpCredentials
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import java.net.InetAddress

// SFTP flavour of the base spec: implements the abstract file operations of BaseSpec by
// delegating to the Sftp DSL with the settings defined below.
trait BaseSftpSpec extends SftpSupportImpl with BaseSpec {

  // The `#create-settings` markers delimit a snippet; keep them around the settings value.
  //#create-settings
  val settings = SftpSettings(
    InetAddress.getByName("localhost"),
    getPort,
    AnonFtpCredentials,
    strictHostKeyChecking = false,
    knownHosts = None,
    sftpIdentity = None
  )
  //#create-settings

  // Lists files under basePath via Sftp.ls.
  protected def listFiles(basePath: String): Source[FtpFile, NotUsed] =
    Sftp.ls(basePath, settings)

  // Reads the file at path via Sftp.fromPath.
  protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]] =
    Sftp.fromPath(path, settings)

  // Writes to the file at path via Sftp.toPath, forwarding the append flag.
  protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
    Sftp.toPath(path, settings, append)
}
开发者ID:akka,项目名称:alpakka,代码行数:35,代码来源:BaseSftpSpec.scala


示例12: listFiles

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ftp

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Inside, Matchers, WordSpecLike}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

// Common base for the FTP-family specs: bundles the ScalaTest mix-ins and support traits,
// declares the protocol-specific operations each concrete spec must provide, and wires
// server start/stop and file clean-up into the test lifecycle.
trait BaseSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfter
    with BeforeAndAfterAll
    with ScalaFutures
    with IntegrationPatience
    with Inside
    with AkkaSupport
    with FtpSupport {

  // Protocol-specific operations, implemented by the concrete spec traits.
  protected def listFiles(basePath: String): Source[FtpFile, NotUsed]

  protected def retrieveFromPath(path: String): Source[ByteString, Future[IOResult]]

  protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]]

  // Test server lifecycle hooks, implemented elsewhere.
  protected def startServer(): Unit

  protected def stopServer(): Unit

  // Runs cleanFiles() after every test.
  after {
    cleanFiles()
  }

  // Starts the server once, after the parent beforeAll has run.
  override protected def beforeAll() = {
    super.beforeAll()
    startServer()
  }

  // Stops the server, then waits up to 42 seconds for the actor system to terminate
  // before delegating to the parent afterAll.
  override protected def afterAll() = {
    stopServer()
    Await.ready(getSystem.terminate(), 42.seconds)
    super.afterAll()
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:49,代码来源:BaseSpec.scala


示例13: IronMqPullStageSpec

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ironmq

import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global

/** Checks that the pull stage delivers every pushed message, in push order. */
class IronMqPullStageSpec extends UnitSpec with IronMqFixture with AkkaStreamFixture {

  "IronMqSourceStage" when {
    "there are messages" should {
      "consume all messages" in {
        // Arrange: a queue pre-filled with 100 numbered messages.
        val queue = givenQueue()
        val pushed = (1 to 100).map(n => PushMessage(s"test-$n"))
        ironMqClient.pushMessages(queue.name, pushed: _*).futureValue

        // Act: pull exactly as many messages as were pushed.
        val pullSource = Source.fromGraph(new IronMqPullStage(queue.name, IronMqSettings()))
        val pulledBodies = pullSource
          .take(100)
          .runWith(Sink.seq)
          .futureValue
          .map(_.message.body)

        // Assert: same bodies, same order.
        val expectedBodies = pushed.map(_.body)
        pulledBodies should contain theSameElementsInOrderAs expectedBodies
      }
    }
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:25,代码来源:IronMqPullStageSpec.scala


示例14: MemoryBufferSpec

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures

/**
 * Tests for `MemoryBuffer`: inputs that fit are concatenated into a single chunk, and
 * inputs exceeding the configured maximum size fail the stream.
 */
class MemoryBufferSpec(_system: ActorSystem)
    extends TestKit(_system)
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  def this() = this(ActorSystem("MemoryBufferSpec"))

  implicit val defaultPatience =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  /** Fresh source of three byte strings totalling 14 bytes (values 1 to 14). */
  private def fourteenBytes =
    Source(Vector(ByteString(1, 2, 3, 4, 5), ByteString(6, 7, 8, 9, 10, 11, 12), ByteString(13, 14)))

  "MemoryBuffer" should "emit a chunk on its output containg the concatenation of all input values" in {
    val emitted = fourteenBytes
      .via(new MemoryBuffer(200))
      .runWith(Sink.seq)
      .futureValue

    emitted should have size (1)

    val onlyChunk = emitted.head
    onlyChunk.size should be(14)

    val chunkContents = onlyChunk.data.runWith(Sink.seq).futureValue
    chunkContents should be(Seq(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)))
  }

  it should "fail if more than maxSize bytes are fed into it" in {
    // 14 bytes into a buffer limited to 10.
    val failure = fourteenBytes
      .via(new MemoryBuffer(10))
      .runWith(Sink.seq)
      .failed
      .futureValue

    failure shouldBe a[IllegalStateException]
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:49,代码来源:MemoryBufferSpec.scala


示例15: DynamoClientImpl

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.dynamodb.impl

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.MediaType.NotCompressible
import akka.http.scaladsl.model.{ContentType, MediaType}
import akka.stream.Materializer
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.impl.AwsClient.{AwsConnect, AwsRequestMetadata}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.AmazonServiceException
import com.amazonaws.http.HttpResponseHandler

/**
 * DynamoDB flavour of `AwsClient`: fixes the service name and content type and picks the
 * HTTP connection pool from the configured host and port.
 */
class DynamoClientImpl(
    val settings: DynamoSettings,
    val errorResponseHandler: HttpResponseHandler[AmazonServiceException]
)(implicit protected val system: ActorSystem, implicit protected val materializer: Materializer)
    extends AwsClient[DynamoSettings] {

  override protected val service = "dynamodb"
  override protected val defaultContentType =
    ContentType.Binary(MediaType.customBinary("application", "x-amz-json-1.0", NotCompressible))
  override protected implicit val ec = system.dispatcher

  // Port 443 selects the HTTPS pool; any other port gets a plain pool on that port.
  override protected val connection: AwsConnect = {
    val useHttps = settings.port == 443
    if (useHttps)
      Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host)(materializer)
    else
      Http().cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port)(materializer)
  }

  /** Runs a single operation through `flow` and returns the first response, cast to `op.B`. */
  def single(op: AwsOp) =
    Source
      .single(op)
      .via(flow)
      .map(response => response.asInstanceOf[op.B])
      .runWith(Sink.head)

}
开发者ID:akka,项目名称:alpakka,代码行数:34,代码来源:DynamoClientImpl.scala


示例16: GeodeFiniteSourceSpec

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.scaladsl

import akka.stream.scaladsl.Sink
import org.slf4j.LoggerFactory

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

/** Runs three finite queries (persons, animals, complexes) and logs every returned element. */
class GeodeFiniteSourceSpec extends GeodeBaseSpec {

  private val log = LoggerFactory.getLogger(classOf[GeodeFiniteSourceSpec])

  "Geode finite source" should {
    it { geodeSettings =>
      "retrieves finite elements from geode" in {

        val geode = new ReactiveGeode(geodeSettings)
        // Upper bound for each query stream to complete.
        val queryTimeout = 10.seconds

        //#query
        val personsDone =
          geode
            .query[Person](s"select * from /persons order by id")
            .runWith(Sink.foreach(person => log.debug(s"$person")))
        //#query
        Await.ready(personsDone, queryTimeout)

        val animalsDone =
          geode
            .query[Animal](s"select * from /animals order by id")
            .runWith(Sink.foreach(animal => log.debug(s"$animal")))

        Await.ready(animalsDone, queryTimeout)

        val complexesDone =
          geode
            .query[Complex](s"select * from /complexes order by id")
            .runWith(Sink.foreach(complex => log.debug(s"$complex")))

        Await.ready(complexesDone, queryTimeout)

        geode.close()
      }
    }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:47,代码来源:GeodeFiniteSourceSpec.scala


示例17: GeodeContinuousSourceSpec

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.scaladsl

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import org.slf4j.LoggerFactory

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

/**
 * Registers a continuous query on `/persons`, writes 20 persons through the Geode flow
 * and closes the query from inside the fold once the running count reaches 19.
 */
class GeodeContinuousSourceSpec extends GeodeBaseSpec {

  private val log = LoggerFactory.getLogger(classOf[GeodeContinuousSourceSpec])

  "Geode continuousQuery" should {
    it { geodeSettings =>
      "retrieves continuously elements from geode" in {

        //#connection-with-pool
        val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
        //#connection-with-pool

        val personsFlow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)

        //#continuousQuery
        val queryResult =
          reactiveGeode
            .continuousQuery[Person](Symbol("test"), s"select * from /persons")
            .runWith(Sink.fold(0) { (seen, person) =>
              log.debug(s"$person $seen")
              // `seen` counts the elements before this one, so 19 means this is the 20th.
              if (seen == 19) {
                reactiveGeode.closeContinuousQuery(Symbol("test")).foreach { _ =>
                  log.debug("test cQuery is closed")
                }
              }
              seen + 1
            })
        //#continuousQuery

        val writesDone = buildPersonsSource(1 to 20)
          .via(personsFlow) //geode flow
          .runWith(Sink.ignore)

        Await.result(writesDone, 10.seconds)

        Await.result(queryResult, 5.seconds)

        reactiveGeode.close()
      }
    }
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:54,代码来源:GeodeContinuousSourceSpec.scala


示例18: AllTogether_

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Chains three custom stages (filter, duplicator, map) over the numbers 1 to 5,
 * collects the output into a sequence and prints it.
 */
object AllTogether_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val result = Source(1 to 5)
    // NOTE(review): `_ % 2 < 10` holds for every Int, so this filter passes everything —
    // confirm whether that is intended.
    .via(new FilterStage_.Filter(_ % 2 < 10))
    .via(new DuplicatorShape_.Duplicator())
    .via(new MapShape_.Map[Int, Int](_ / 2))
    .runWith(Sink.seq)

  // `300.millis` rather than postfix `300 millis` (no postfixOps import in this file).
  // The finally clause terminates the actor system even when the await times out or the
  // stream fails; previously the system was left running in that case.
  val r =
    try Await.result(result, 300.millis)
    finally system.terminate()

  println(r)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:26,代码来源:AllTogether_.scala


示例19: InputCustomer

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Random

/** Factory for randomly named [[InputCustomer]] values. */
object InputCustomer {

  /**
   * Creates a customer named `FirstName<n> LastName<m>`, where `n` and `m` are
   * independent random integers in the range 0 to 999.
   */
  def random(): InputCustomer = {
    val firstSuffix = Random.nextInt(1000)
    val lastSuffix = Random.nextInt(1000)
    InputCustomer(s"FirstName$firstSuffix LastName$lastSuffix")
  }
}

/** Raw customer: first and last name joined by a single space. */
case class InputCustomer(name: String)

/** Customer with the name split into its two parts. */
case class OutputCustomer(firstName: String, lastName: String)

/**
 * Generates 100 random customers, splits each name into first and last name and prints
 * the resulting `OutputCustomer` values; the actor system is terminated when the stream ends.
 */
object CustomersExample extends App {
  implicit val actorSystem = ActorSystem()

  import actorSystem.dispatcher

  implicit val materializer = ActorMaterializer()

  val inputCustomers = Source(Vector.fill(100)(InputCustomer.random()))

  // Names that do not split into exactly two space-separated parts are dropped.
  val normalize = Flow[InputCustomer]
    .map(customer => customer.name.split(" ").toList)
    .collect {
      case List(firstName, lastName) => OutputCustomer(firstName, lastName)
    }

  // `println` is turned into a function value by eta-expansion.
  val writeCustomers = Sink.foreach(println)

  inputCustomers
    .via(normalize)
    .runWith(writeCustomers)
    .onComplete(_ => actorSystem.terminate())
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:40,代码来源:CustomersExample.scala


示例20: SourceIsImmutable

//设置package包名称以及导入依赖的类
package akka_in_action.streams

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Demonstrates that a `Source` is an immutable blueprint: calling `map` returns a new
 * source and leaves the original untouched.
 */
object SourceIsImmutable extends App {
  implicit val system = ActorSystem("actor-system")

  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val source = Source(1 to 10)
  // Deliberately discarded: shows that `map` does not modify `source` itself.
  source.map(_ => 0)
  val runnable1 = source.runWith(Sink.fold(0)(_ + _)) // 55

  // `map` returns a new Source[Int]; only this new value has the stage appended.
  val zeroes = source.map(_ => 0)
  val runnable2 = zeroes.runWith(Sink.fold(0)(_ + _)) // 0

  val results = for {
    res1 <- runnable1
    res2 <- runnable2
  } yield (res1, res2)

  // `500.millis` rather than postfix `500 millis` (no postfixOps import in this file).
  // The finally clause terminates the actor system even if the await times out or fails.
  val pair =
    try Await.result(results, 500.millis)
    finally system.terminate()

  println(pair)
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:35,代码来源:SourceIsImmutable.scala



注:本文中的akka.stream.scaladsl.Sink类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala RunWith类代码示例发布时间:2022-05-23
下一篇:
Scala MockitoSugar类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap