This article collects typical usage examples of the Scala class akka.stream.scaladsl.Keep. If you are wondering what the Keep class is for, or how to use it, the curated examples below should help.
The following 20 code examples of the Keep class are sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Scala examples.
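Before diving into the examples, here is a minimal, self-contained sketch of what Keep actually selects (the object name KeepBasics is illustrative, not taken from any project below): Keep.left, Keep.right, and Keep.both decide which stage's materialized value toMat/viaMat returns when two stages are combined.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future
object KeepBasics extends App {
  implicit val system = ActorSystem("keep-basics")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher
  val source = Source(1 to 3)                // materializes NotUsed
  val sum = Sink.fold[Int, Int](0)(_ + _)    // materializes Future[Int]
  // Keep.left: run() returns the source's materialized value.
  val left: akka.NotUsed = source.toMat(sum)(Keep.left).run()
  // Keep.right: run() returns the sink's materialized value.
  val right: Future[Int] = source.toMat(sum)(Keep.right).run()
  // Keep.both: run() returns a tuple of both.
  val both: (akka.NotUsed, Future[Int]) = source.toMat(sum)(Keep.both).run()
  right.foreach { total =>
    println(s"sum = $total")
    system.terminate()
  }
}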
Example 1: Pusher
// Package declaration and imports
package reactivehub.akka.stream.apns.pusher
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import reactivehub.akka.stream.apns.Environment._
import reactivehub.akka.stream.apns.TlsUtil._
import reactivehub.akka.stream.apns._
import reactivehub.akka.stream.apns.marshallers.SprayJsonSupport
object Pusher extends SprayJsonSupport {
val kafka = "192.168.99.100:9092"
val clientId = "pusher1"
val consumerGroup = "pusher"
val topics = Set("notifications")
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
import system.dispatcher
def main(args: Array[String]): Unit = {
val group = new NioEventLoopGroup()
val apns = ApnsExt(system).connection[Long](Development, sslContext, group)
Consumer.atMostOnceSource(consumerSettings)
.map(msg => msg.key -> toNotification(msg.value))
.filter(_._2.deviceToken.bytes.length < 100)
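// Keep.right: run() returns the APNs flow's materialized value (the future handled in onComplete below), not the Kafka source's.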
.viaMat(apns)(Keep.right)
.log("pusher", _.toString())
.to(Sink.ignore).run()
.onComplete { _ =>
group.shutdownGracefully()
system.terminate()
}
}
private def sslContext: SslContext =
loadPkcs12FromResource("/cert.p12", "password")
private def consumerSettings: ConsumerSettings[Long, PushData] =
ConsumerSettings(system, ScalaLongDeserializer, PushDataDeserializer, topics)
.withBootstrapServers(kafka)
.withClientId(clientId)
.withGroupId(consumerGroup)
.withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
private def toNotification(pushData: PushData): Notification = {
var builder = Payload.Builder()
pushData.alert.foreach(alert => builder = builder.withAlert(alert))
pushData.badge.foreach(badge => builder = builder.withBadge(badge))
Notification(DeviceToken(pushData.token), builder.result)
}
}
Author: reactive-hub | Project: reactive-apns-example | Lines: 60 | Source: Pusher.scala
Example 2: HTableStage
// Package declaration and imports
package akka.stream.alpakka.hbase.javadsl
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import scala.collection.immutable
import scala.concurrent.Future
object HTableStage {
def table[T](conf: Configuration,
tableName: TableName,
columnFamilies: java.util.List[String],
converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
import scala.compat.java8.FunctionConverters._
import scala.collection.JavaConverters._
HTableSettings(conf, tableName, immutable.Seq(columnFamilies.asScala: _*), asScalaFromFunction(converter))
}
def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] =
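// Keep.right keeps Sink.ignore's Future[Done] as the materialized value of the combined sink.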
Flow[A].via(flow(config)).toMat(Sink.ignore)(Keep.right).asJava
def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] =
Flow.fromGraph(new HBaseFlowStage[A](settings)).asJava
}
Author: akka | Project: alpakka | Lines: 32 | Source: HTableStage.scala
Example 3: CassandraSink
// Package declaration and imports
package akka.stream.alpakka.cassandra.scaladsl
import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}
import scala.concurrent.{ExecutionContext, Future}
import akka.stream.alpakka.cassandra.GuavaFutures._
object CassandraSink {
def apply[T](
parallelism: Int,
statement: PreparedStatement,
statementBinder: (T, PreparedStatement) => BoundStatement
)(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
Flow[T]
.mapAsyncUnordered(parallelism)(t => session.executeAsync(statementBinder(t, statement)).asScala())
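// Keep.right: the sink materializes Sink.ignore's Future[Done], completing when all inserts have run.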
.toMat(Sink.ignore)(Keep.right)
}
Author: akka | Project: alpakka | Lines: 21 | Source: CassandraSink.scala
Example 4: MovieListPipeline
// Package declaration and imports
package com.stacktrace.yo.scrapeline.imdb.pipelines
import java.nio.file.Paths
import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList
import scala.concurrent.Future
class MovieListPipeline(implicit val m: ActorMaterializer) {
def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = Source.single(ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all"))
def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] = {
Flow[Document]
.mapConcat(doc => {
val table = doc >> elementList("table tr")
val movieLinkTuples = table.flatMap(tr => {
val name = tr >> elementList("tr b a")
name.map(
link => {
MovieNameAndDetailUrl(link.text, "http://www.the-numbers.com/" + link.attr("href"))
}
)
})
movieLinkTuples
})
}
def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = Flow[MovieNameAndDetailUrl]
.map(s => ByteString(s.name + "\n"))
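// Keep.right: materialize FileIO.toPath's Future[IOResult] so callers can await the file write.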
.toMat(FileIO.toPath(Paths.get("movie.txt")))(Keep.right)
def buildAndRun: Future[IOResult] = {
getPipelineSource
.via(getParseFlow)
.runWith(getPipeOut)
}
}
Author: StackTraceYo | Project: scrapeline | Lines: 49 | Source: MovieListPipeline.scala
Example 5: PrintMoreNumbers
// Package declaration and imports
package sample.stream_actor_simple
import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
private implicit val executionContext = context.system.dispatcher
private val (killSwitch: UniqueKillSwitch, done) =
Source.tick(0.seconds, 1.second, 1)
.scan(0)(_ + _)
.map(_.toString)
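// Keep.right keeps the UniqueKillSwitch from KillSwitches.single; Keep.both then pairs it with the println sink's completion future.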
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(println))(Keep.both)
.run()
done.map(_ => self ! "done")
override def receive: Receive = {
//When the actor is stopped, it will also stop the stream
case "stop" =>
println("Stopping")
killSwitch.shutdown()
case "done" =>
println("Done")
context.stop(self)
context.system.terminate()
}
}
Author: pbernet | Project: akka_streams_tutorial | Lines: 33 | Source: PrintMoreNumbers.scala
Example 6: PacketConsumer
// Package declaration and imports
package edu.uw.at.iroberts.wirefugue.kafka.consumer
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.kafka.producer.KafkaKey
import edu.uw.at.iroberts.wirefugue.kafka.serdes.{PacketDeserializer, PacketSerde}
import edu.uw.at.iroberts.wirefugue.pcap.Packet
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.IntegerDeserializer
import scala.concurrent.Await
import scala.concurrent.duration._
object PacketConsumer extends App {
type PacketRecord = ConsumerRecord[KafkaKey, Array[Byte]]
val config = ConfigFactory.load("application.conf")
implicit val system = ActorSystem("stream-consumer-system", config)
implicit val materializer = ActorMaterializer()
val consumerSettings = ConsumerSettings(system, new IntegerDeserializer, new PacketDeserializer)
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
// Separate streams for each partition
val maxPartitions = 100
val consumerGroup = Consumer.plainPartitionedSource(consumerSettings, Subscriptions.topics("packets"))
val done = consumerGroup.map {
case (topicPartition, source) =>
val p: Int = topicPartition.partition
source
.map { (cr: ConsumerRecord[Integer, Packet]) => cr.value() }
.filter(_.ip.isDefined)
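// Keep.both: each partition stream materializes (NotUsed, Future[Done]); the outer stream awaits each future via _._2 below.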
.toMat(Sink.foreach(packet => println(s"[$p] $packet")))(Keep.both)
.run()
}
.mapAsyncUnordered(maxPartitions)(_._2)
.runWith(Sink.ignore)
Await.result(done, Duration.Inf)
system.terminate()
}
Author: robertson-tech | Project: wirefugue | Lines: 50 | Source: PacketConsumer.scala
Example 7: encodeHex
// Package declaration and imports
package akka.stream.alpakka.s3
import java.security.MessageDigest
import javax.xml.bind.DatatypeConverter
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.util.ByteString
import scala.concurrent.Future
package object auth {
def encodeHex(bytes: Array[Byte]): String = DatatypeConverter.printHexBinary(bytes).toLowerCase
def encodeHex(bytes: ByteString): String = encodeHex(bytes.toArray)
def digest(algorithm: String = "SHA-256"): Sink[ByteString, Future[ByteString]] =
Flow[ByteString]
.fold(MessageDigest.getInstance(algorithm)) {
case (digest, bytes) =>
digest.update(bytes.asByteBuffer)
digest
}
.map(d => ByteString(d.digest()))
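// Keep.right: materialize Sink.head's Future[ByteString] carrying the final digest.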
.toMat(Sink.head[ByteString])(Keep.right)
}
Author: akka | Project: alpakka | Lines: 26 | Source: package.scala
Example 8: AccumulateWhileUnchangedTest
// Package declaration and imports
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.scalatest.FunSuite
import scala.collection.immutable
class AccumulateWhileUnchangedTest extends FunSuite {
implicit val system = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
val windowSize = 1000
def windowStart(timestamp: Long): Long = {
timestamp - (timestamp % windowSize)
}
val numberOfElements = 3000
test("accumulation of two windows on element stream") {
val firstWindowElements = immutable.Seq.range(1, 999, 10).map(t => SimpleRecord(1)(t))
val secondWindowElements = immutable.Seq.range(1000, 1999, 10).map(t => SimpleRecord(1)(t))
val records = firstWindowElements ++ secondWindowElements
val s = TestSink.probe[Seq[SimpleRecord]]
val (_, sink) = Source(records)
.via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
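// Keep.both: materialize (NotUsed, test probe); the probe drives demand and asserts elements below.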
.toMat(s)(Keep.both)
.run()
sink.request(numberOfElements)
sink.expectNext(firstWindowElements, secondWindowElements)
sink.expectComplete()
}
test("accumulation on empty stream") {
val s = TestSink.probe[Seq[SimpleRecord]]
val records = List[SimpleRecord]()
val (_, sink) = Source(records)
.via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
.toMat(s)(Keep.both)
.run()
sink.request(numberOfElements)
sink.expectComplete()
}
}
Author: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 57 | Source: AccumulateWhileUnchangedTest.scala
Example 9: IgnoreLastElementsTest
// Package declaration and imports
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.FunSuite
import scala.collection.immutable
class IgnoreLastElementsTest extends FunSuite {
implicit val system = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
test("happy list") {
val start = 0
val end = 10
val values = immutable.Seq.range(start, end)
val ignoreCount = 2
val s = TestSink.probe[Int]
val (_, sink) = Source(values)
.via(new IgnoreLastElements[Int](ignoreCount))
.toMat(s)(Keep.both)
.run()
val numberOfElements = end
sink.request(numberOfElements)
values.dropRight(ignoreCount).foreach(v => sink.expectNext(v))
sink.expectComplete()
}
test("empty list") {
val s = TestSink.probe[Int]
val (_, sink) = Source(List[Int]())
.via(new IgnoreLastElements[Int](ignoreCount = 2))
.toMat(s)(Keep.both)
.run()
val numberOfElements = 1
sink.request(numberOfElements)
sink.expectComplete()
}
}
Author: BenReissaus | Project: EnterpriseStreamingBenchmark | Lines: 50 | Source: IgnoreLastElementsTest.scala
Example 10: running
// Package declaration and imports
package producers
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future
trait Producerable extends ActorBroker {
val config: AppConfig
implicit val materializer = ActorMaterializer()
import context.dispatcher // assumed ExecutionContext for Future {...} and onFailure below; drop if ActorBroker already provides one
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")
def running(): Receive = {
case Stop =>
log.info("Stopping Kafka producer stream and actor")
context.stop(self)
}
def sendToSink(message: String): Unit = {
log.info(s"Attempting to produce message on topic $topicName")
val kafkaSink = Producer.plainSink(producerSettings)
val stringToProducerRecord: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String](topicName, message)
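// Keep.both: materialize (NotUsed from the source, Future[Done] from the Kafka producer sink).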
val (a, future): (NotUsed, Future[Done]) = Source.fromFuture(Future(stringToProducerRecord))
.toMat(kafkaSink)(Keep.both)
.run()
future.onFailure {
case ex =>
log.error("Stream failed due to error, restarting", ex)
throw ex
}
context.become(running())
log.info(s"Writer now running, writing random numbers to topic $topicName")
}
case object Stop
}
Author: jguido | Project: reactive-kafka-registration | Lines: 48 | Source: Producerable.scala
Example 11: EventEncoderSpec
// Package declaration and imports
package client.protocol
import java.security.KeyPairGenerator
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.cryptoutility.protocol.Events._
import com.cryptoutility.protocol.EventSerializer
import org.scalatestplus.play.PlaySpec
import play.api.http.websocket.BinaryMessage
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
class EventEncoderSpec extends PlaySpec{
implicit val system = ActorSystem("test-system")
implicit val mat = ActorMaterializer()
def publicKey = {
KeyPairGenerator.getInstance("RSA").generateKeyPair().getPublic
}
"event encode" should{
"encode Event to a byte stream" in {
val expected = Initialized(isNew = false, UserInfo("James", "Carl", "[email protected]", publicKey, UUID.randomUUID().toString))
val f =
Source.single(expected)
.via(new EventEncoder)
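// Keep.right: run() returns Sink.head's future with the single encoded message.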
.toMat(Sink.head)(Keep.right).run()
val bytes = Await.result(f, 500 millis).asInstanceOf[BinaryMessage].data.toArray
val actual = EventSerializer.deserialize(bytes)
actual mustBe expected
}
}
}
Author: ejosiah | Project: crypto-utility | Lines: 45 | Source: EventEncoderSpec.scala
Example 12: EventDecoderSpec
// Package declaration and imports
package client.protocol
import java.security.KeyPairGenerator
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import com.cryptoutility.protocol.Events._
import com.cryptoutility.protocol.EventSerializer
import org.scalatestplus.play.PlaySpec
import play.api.http.websocket.BinaryMessage
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
class EventDecoderSpec extends PlaySpec{
implicit val system = ActorSystem("test-system")
implicit val mat = ActorMaterializer()
val id = () => UUID.randomUUID().toString
def publicKey = {
KeyPairGenerator.getInstance("RSA").generateKeyPair().getPublic
}
"event decode" should{
"decode byte streams to an event" in {
val expected = Initialized(true, UserInfo("James", "Carl", "[email protected]", publicKey, id()))
val serialized = EventSerializer.serialize(expected)
val f =
Source.single(BinaryMessage(ByteString(serialized)))
.via(new EventDecoder)
.toMat(Sink.head)(Keep.right).run()
val actual = Await.result(f, 500 millis)
actual mustBe expected
}
"of an invalid byte stream" in {
a [InvalidFormatException] should be thrownBy{
val data = new Array[Byte](256)
Random.nextBytes(data)
EventSerializer.deserialize(data)
}
}
}
}
Author: ejosiah | Project: crypto-utility | Lines: 55 | Source: EventDecoderSpec.scala
Example 13: KafkaWriter
// Package declaration and imports
package wiii
import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}
object KafkaWriter {
def props(topic: String)(implicit mat: ActorMaterializer) = Props(new KafkaWriter(topic))
}
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
override def preStart(): Unit = initWriter()
override def receive: Receive = {
case _ =>
}
def initWriter(): Unit = {
val subscriberProps = new ReactiveKafka().producerActorProps(ProducerProperties(
bootstrapServers = "localhost:9092",
topic = topicName,
valueSerializer = TweetSerializer
))
val subscriber = context.actorOf(subscriberProps)
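// Keep.both: materialize the Source.actorRef's ActorRef and the Sink.asPublisher's Publisher, bridging actor messages into a stream.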
val (actorRef, publisher) = Source.actorRef[Status](1000, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
val factory = new TwitterStreamFactory()
val twitterStream = factory.getInstance()
twitterStream.addListener(new StatusForwarder(actorRef))
twitterStream.filter(new FilterQuery("espn"))
Source.fromPublisher(publisher).map(s => ProducerMessage(Tweet(s.getUser.getName, s.getText)))
.runWith(Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](subscriber)))
}
}
class StatusForwarder(publisher: ActorRef) extends StatusListener {
def onStatus(status: Status): Unit = publisher ! status
//\\ nop all the others for now //\\
def onStallWarning(warning: StallWarning): Unit = {}
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {}
def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {}
def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {}
def onException(ex: Exception): Unit = {}
}
Author: jw3 | Project: example-kafka-tweets | Lines: 50 | Source: FeedToKafka.scala
Example 14: FullStream
// Package declaration and imports
package com.yannick_cw.elastic_indexer4s.indexing_logic
import akka.NotUsed
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.yannick_cw.elastic_indexer4s.Index_results.{IndexError, StageSucceeded, StageSuccess}
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
object FullStream extends LazyLogging {
private def countAntLogSink[A](logPer: FiniteDuration): Sink[A, Future[Int]] = Flow[A]
.groupedWithin(Int.MaxValue, logPer)
.map(_.length)
.map { elementsPerTime =>
logger.info(s"Indexed $elementsPerTime elements last $logPer")
elementsPerTime
}.toMat(Sink.reduce[Int](_ + _))(Keep.right)
def run[A](source: Source[A, NotUsed], sink: Sink[A, Future[Unit]], logSpeedInterval: FiniteDuration)
(implicit materializer: ActorMaterializer, ex: ExecutionContext): Future[Either[IndexError, StageSucceeded]] =
(for {
count <- source
.alsoToMat(countAntLogSink(logSpeedInterval))(Keep.right)
.toMat(sink)(Keep.both)
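// Keep.right keeps the counting sink's future; Keep.both pairs it with the main sink's completion, combined below.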
.mapMaterializedValue{ case(fCount, fDone) => fDone.flatMap(_ => fCount) }
.run()
} yield Right(StageSuccess(s"Indexed $count documents successfully")))
.recover { case NonFatal(t) =>
Left(IndexError("Writing documents failed.", Some(t)))
}
}
Author: yannick-cw | Project: elastic-indexer4s | Lines: 36 | Source: FullStream.scala
Example 15: validate
// Package declaration and imports
package csw.services.config.server.files
import java.nio.file.{Path, Paths}
import akka.stream.scaladsl.{FileIO, Keep}
import csw.services.config.api.models.ConfigData
import csw.services.config.server.commons.ConfigServerLogger
import csw.services.config.server.{ActorRuntime, Settings}
import scala.async.Async._
import scala.concurrent.Future
// NOTE: the enclosing class declaration (from AnnexFileService.scala) was truncated
// in the original listing; `fileRepo` comes from that class's constructor.
def validate(id: String, path: Path): Future[Boolean] = async {
id == await(Sha1.fromPath(path))
}
def saveAndSha(configData: ConfigData): Future[(Path, String)] = async {
val path = await(fileRepo.createTempFile("config-service-overize-", ".tmp"))
val (resultF, shaF) = configData.source
.alsoToMat(FileIO.toPath(path))(Keep.right)
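// Keep.right keeps the IOResult future of the file write; Keep.both then adds the SHA-1 future.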
.toMat(Sha1.sink)(Keep.both)
.run()
await(resultF).status.get
(path, await(shaF))
}
}
Author: tmtsoftware | Project: csw-prod | Lines: 29 | Source: AnnexFileService.scala
Example 16: Sha1
// Package declaration and imports
package csw.services.config.server.files
import java.nio.file.Path
import java.security.MessageDigest
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString
import csw.services.config.api.models.ConfigData
import scala.concurrent.Future
object Sha1 {
private def fromSource(source: Source[ByteString, Any])(implicit mat: Materializer): Future[String] =
source.runWith(sink)
def fromConfigData(configData: ConfigData)(implicit mat: Materializer): Future[String] =
fromSource(configData.source)
def fromPath(path: Path)(implicit mat: Materializer): Future[String] =
fromSource(FileIO.fromPath(path))
//Keep this a def so that the digester is created anew each time.
def sink: Sink[ByteString, Future[String]] = {
val sha1Digester = MessageDigest.getInstance("SHA-1")
Flow[ByteString]
.fold(sha1Digester) { (digester, bs) =>
digester.update(bs.toArray)
digester
}
.mapConcat(_.digest().toList)
.map(_ & 0xFF)
.map("%02x" format _)
.toMat(Sink.fold("")(_ + _))(Keep.right)
}
}
Author: tmtsoftware | Project: csw-prod | Lines: 40 | Source: Sha1.scala
Example 17: queueRequest
// Package declaration and imports
package rest.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import rest.client.HttpRequestQueue._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
trait HttpRequestQueue {
implicit val system: ActorSystem
implicit val mat: ActorMaterializer
implicit val ec: ExecutionContext
val QueueSize: Int
val poolClientFlow: Flow[(HttpRequest, Promise[HttpResponse]),
(Try[HttpResponse], Promise[HttpResponse]),
Http.HostConnectionPool]
private val queue =
Source
.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.backpressure)
.via(poolClientFlow)
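// Keep.left: keep the materialized queue from Source.queue; the sink's completion future is discarded.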
.toMat(Sink.foreach({
case ((Success(httpResponse), promisedResponse)) => promisedResponse.success(httpResponse)
case ((Failure(throwable), promisedResponse)) => promisedResponse.failure(throwable)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue
.offer(request -> responsePromise)
.map {
case QueueOfferResult.Enqueued => responsePromise
case QueueOfferResult.Dropped => responsePromise.failure(new RuntimeException(QUEUE_OVERFLOW))
case QueueOfferResult.QueueClosed => responsePromise.failure(new RuntimeException(QUEUE_CLOSED))
case QueueOfferResult.Failure(ex) => responsePromise.failure(ex)
}
.flatMap(_.future)
}
}
object HttpRequestQueue {
private val QUEUE_OVERFLOW: String = "Queue overflowed. Try again later."
private val QUEUE_CLOSED: String = "Queue was closed (pool shut down) while running the request. Try again later."
}
Author: lymr | Project: fun-chat | Lines: 53 | Source: HttpRequestQueue.scala
Example 18: KafkaPushQueue
// Package declaration and imports
package reactivehub.akka.stream.apns.manager
import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future
class KafkaPushQueue(topic: String, settings: ProducerSettings[Long, PushData])
extends PushQueue {
override def pushDataSink: Sink[(Long, PushData), Future[Long]] =
Flow[(Long, PushData)]
.map {
case (key, value) => Message(
new ProducerRecord[Long, PushData](topic, key, value), key) // use the constructor's topic; the original hardcoded "notifications"
}
.via(Producer.flow(settings))
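// Keep.right: materialize the fold's Future[Long], counting successfully produced messages.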
.toMat(Sink.fold(0L)({ case (acc, _) => acc + 1}))(Keep.right)
}
Author: reactive-hub | Project: reactive-apns-example | Lines: 23 | Source: KafkaPushQueue.scala
Example 19: HomeControllerSpec
// Package declaration and imports
package controllers
import actors.TestKitSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestProbe
import org.scalatest.MustMatchers
import play.api.libs.json.{JsValue, Json}
import scala.concurrent.ExecutionContext
class HomeControllerSpec extends TestKitSpec with MustMatchers {
"createWebSocketFlow" should {
"create a websocket flow and send a message through" in {
implicit val materializer = ActorMaterializer()(system)
implicit val ec: ExecutionContext = system.dispatcher
val stocksActor = TestProbe("stocksActor")
val userParentActor = TestProbe("userParentActor")
val userActor = TestProbe("userActor")
// http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-testkit.html
val publisher = akka.stream.testkit.TestPublisher.probe[JsValue]()
// instantiate the controller...
val controller = new HomeController(stocksActor.ref, userParentActor.ref)
// call method under test...
val flowUnderTest = controller.createWebSocketFlow(publisher, userActor.ref)
// create a test source and sink around the flow
val (pub, sub) = TestSource.probe[JsValue]
.via(flowUnderTest)
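// Keep.both: materialize the source and sink probes so the test can drive both ends of the flow.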
.toMat(TestSink.probe[JsValue])(Keep.both)
.run()
val jsvalue = Json.obj("herp" -> "derp")
// check that a message sent in will come out the other end
sub.request(n = 1)
publisher.sendNext(jsvalue)
sub.expectNext(jsvalue)
}
}
}
Author: ChrisCooper | Project: customer-feedback-screen | Lines: 52 | Source: HomeControllerSpec.scala
Example 20: produceRecord
// Package declaration and imports
package de.choffmeister.microserviceutils.kafka.testkit
import java.util.UUID
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import scala.concurrent.Future
trait KafkaTest {
def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
Source.single(new ProducerRecord("mail.command.send", key, value))
.toMat(Producer.plainSink(producerSettings))(Keep.right)
.run()
}
def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
.withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
.withGroupId(UUID.randomUUID.toString)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.filter(pf.isDefinedAt)
.map(pf)
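// Keep.right: materialize Sink.head's future, completing with the first matching record.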
.toMat(Sink.head)(Keep.right)
.run()
}
}
Author: choffmeister | Project: microservice-utils | Lines: 40 | Source: KafkaTest.scala
Note: the akka.stream.scaladsl.Keep examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets come from open-source projects contributed by many developers; copyright remains with the original authors. For distribution and use, consult each project's License. Please do not republish without permission.