• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Keep类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.scaladsl.Keep的典型用法代码示例。如果您正苦于以下问题:Scala Keep类的具体用法?Scala Keep怎么用?Scala Keep使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Keep类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Pusher

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.pusher

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.handler.ssl.SslContext
import org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
import reactivehub.akka.stream.apns.Environment._
import reactivehub.akka.stream.apns.TlsUtil._
import reactivehub.akka.stream.apns._
import reactivehub.akka.stream.apns.marshallers.SprayJsonSupport

/**
 * Example application: consumes push data from Kafka and sends it through an
 * APNs connection flow.
 */
object Pusher extends SprayJsonSupport {
  val kafka = "192.168.99.100:9092"
  val clientId = "pusher1"
  val consumerGroup = "pusher"
  val topics = Set("notifications")

  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  /**
   * Builds and runs the Kafka -> APNs stream. When the value materialized by
   * the APNs flow completes, the Netty event loop group and the actor system
   * are shut down.
   */
  def main(args: Array[String]): Unit = {
    val eventLoopGroup = new NioEventLoopGroup()
    val apnsFlow = ApnsExt(system).connection[Long](Development, sslContext, eventLoopGroup)

    val pipeline = Consumer
      .atMostOnceSource(consumerSettings)
      .map(record => record.key -> toNotification(record.value))
      // Only notifications whose device token is shorter than 100 bytes pass.
      .filter { case (_, notification) => notification.deviceToken.bytes.length < 100 }
      .viaMat(apnsFlow)(Keep.right)
      .log("pusher", _.toString())
      .to(Sink.ignore)

    pipeline.run().onComplete { _ =>
      eventLoopGroup.shutdownGracefully()
      system.terminate()
    }
  }

  /** Loads the TLS context from the PKCS#12 resource `/cert.p12`. */
  private def sslContext: SslContext =
    loadPkcs12FromResource("/cert.p12", "password")

  /** Kafka consumer settings; offsets reset to "earliest". */
  private def consumerSettings: ConsumerSettings[Long, PushData] = {
    val base = ConsumerSettings(system, ScalaLongDeserializer, PushDataDeserializer, topics)
    base
      .withBootstrapServers(kafka)
      .withClientId(clientId)
      .withGroupId(consumerGroup)
      .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
  }

  /**
   * Converts push data into a notification, applying the alert and the badge
   * to the payload builder only when they are present.
   */
  private def toNotification(pushData: PushData): Notification = {
    val withAlert = pushData.alert.foldLeft(Payload.Builder())((b, alert) => b.withAlert(alert))
    val withBadge = pushData.badge.foldLeft(withAlert)((b, badge) => b.withBadge(badge))
    Notification(DeviceToken(pushData.token), withBadge.result)
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:60,代码来源:Pusher.scala


示例2: HTableStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase.javadsl

import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put

import scala.collection.immutable
import scala.concurrent.Future

/** Java-facing factories for HBase table settings, sink and flow. */
object HTableStage {

  /**
   * Creates table settings from Java collection and function types.
   *
   * @param conf           Hadoop configuration
   * @param tableName      target table
   * @param columnFamilies column family names
   * @param converter      turns an element into a `Put`
   */
  def table[T](conf: Configuration,
               tableName: TableName,
               columnFamilies: java.util.List[String],
               converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._
    val families: immutable.Seq[String] = columnFamilies.asScala.toList
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }

  /** Sink that routes elements through [[flow]] and then ignores them. */
  def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] = {
    val writeThenDiscard = Flow[A].via(flow(config)).toMat(Sink.ignore)(Keep.right)
    writeThenDiscard.asJava
  }

  /** Flow backed by an `HBaseFlowStage` built from the given settings. */
  def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] = {
    val stage = new HBaseFlowStage[A](settings)
    Flow.fromGraph(stage).asJava
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:HTableStage.scala


示例3: CassandraSink

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.cassandra.scaladsl

import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.datastax.driver.core.{BoundStatement, PreparedStatement, Session}

import scala.concurrent.{ExecutionContext, Future}

import akka.stream.alpakka.cassandra.GuavaFutures._

/** Factory for a sink that executes one bound statement per stream element. */
object CassandraSink {

  /**
   * Builds a sink that binds every element to `statement` and executes it
   * asynchronously on the implicit session.
   *
   * @param parallelism     maximum number of statements executed concurrently
   * @param statement       prepared statement to bind elements to
   * @param statementBinder binds an element to the prepared statement
   * @return a sink whose materialized value is the one produced by `Sink.ignore`
   */
  def apply[T](
      parallelism: Int,
      statement: PreparedStatement,
      statementBinder: (T, PreparedStatement) => BoundStatement
  )(implicit session: Session, ex: ExecutionContext): Sink[T, Future[Done]] =
    Flow[T]
      // The lambda arrow had been garbled into `?`, which does not compile.
      .mapAsyncUnordered(parallelism)(t => session.executeAsync(statementBinder(t, statement)).asScala())
      .toMat(Sink.ignore)(Keep.right)
}
开发者ID:akka,项目名称:alpakka,代码行数:21,代码来源:CassandraSink.scala


示例4: MovieListPipeline

//设置package包名称以及导入依赖的类
package com.stacktrace.yo.scrapeline.imdb.pipelines

import java.nio.file.Paths

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import com.stacktrace.yo.scrapeline.core.ScrapeClient.jsoup
import com.stacktrace.yo.scrapeline.core._
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.model.Document
import net.ruippeixotog.scalascraper.scraper.ContentExtractors.elementList

import scala.concurrent.Future

/**
 * Scrapes the movie budget listing, extracts movie names with their detail
 * URLs, and writes the names to `movie.txt`.
 */
class MovieListPipeline(implicit val m: ActorMaterializer) {

  /** Single-element source holding the scraped budget listing page. */
  def getPipelineSource: Source[jsoup.DocumentType, NotUsed] = {
    val budgetsPage = ScrapeClient.scrape("http://www.the-numbers.com/movie/budgets/all")
    Source.single(budgetsPage)
  }

  /** Emits one `MovieNameAndDetailUrl` per `tr b a` link found in the table rows. */
  def getParseFlow: Flow[Document, MovieNameAndDetailUrl, NotUsed] =
    Flow[Document].mapConcat { doc =>
      val links = for {
        row  <- doc >> elementList("table tr")
        link <- row >> elementList("tr b a")
      } yield MovieNameAndDetailUrl(link.text, "http://www.the-numbers.com/" + link.attr("href"))
      links
    }

  /** Sink that writes each movie name on its own line to `movie.txt`. */
  def getPipeOut: Sink[MovieNameAndDetailUrl, Future[IOResult]] = {
    val toLine = Flow[MovieNameAndDetailUrl].map(movie => ByteString(movie.name + "\n"))
    toLine.toMat(FileIO.toPath(Paths.get("movie.txt")))(Keep.right)
  }

  /** Wires source, parse flow and file sink together and runs the stream. */
  def buildAndRun: Future[IOResult] =
    getPipelineSource
      .via(getParseFlow)
      .toMat(getPipeOut)(Keep.right)
      .run()

}
开发者ID:StackTraceYo,项目名称:scrapeline,代码行数:49,代码来源:MovieListPipeline.scala


示例5: PrintMoreNumbers

//设置package包名称以及导入依赖的类
package sample.stream_actor_simple

import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._

/**
 * Actor that prints a running sum once per second until it receives "stop".
 * When the stream finishes, the actor sends itself "done", stops itself and
 * terminates the actor system.
 */
class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
  private implicit val executionContext = context.system.dispatcher

  // Materializes the kill switch together with the completion value of the printing sink.
  private val (killSwitch: UniqueKillSwitch, done) = {
    val runningSums = Source.tick(0.seconds, 1.second, 1).scan(0)(_ + _).map(_.toString)
    runningSums
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()
  }

  // Notify this actor once the stream has completed successfully.
  done.foreach(_ => self ! "done")

  override def receive: Receive = {
    // Shutting down the kill switch stops the stream; "done" follows once it completes.
    case "stop" =>
      println("Stopping")
      killSwitch.shutdown()

    case "done" =>
      println("Done")
      context.stop(self)
      context.system.terminate()
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:33,代码来源:PrintMoreNumbers.scala


示例6: PacketConsumer

//设置package包名称以及导入依赖的类
package edu.uw.at.iroberts.wirefugue.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.config.ConfigFactory
import edu.uw.at.iroberts.wirefugue.kafka.producer.KafkaKey
import edu.uw.at.iroberts.wirefugue.kafka.serdes.{PacketDeserializer, PacketSerde}
import edu.uw.at.iroberts.wirefugue.pcap.Packet
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.IntegerDeserializer

import scala.concurrent.Await
import scala.concurrent.duration._


/**
 * Example consumer: subscribes to the "packets" topic, runs one sub-stream per
 * partition that prints every packet with a defined `ip`, and blocks until the
 * outer stream completes before terminating the actor system.
 */
object PacketConsumer extends App {
  // NOTE(review): this alias is not referenced anywhere in this object.
  type PacketRecord = ConsumerRecord[KafkaKey, Array[Byte]]
  val config = ConfigFactory.load("application.conf")

  implicit val system = ActorSystem("stream-consumer-system", config)
  implicit val materializer = ActorMaterializer()

  val consumerSettings = ConsumerSettings(system, new IntegerDeserializer, new PacketDeserializer)
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  // Separate streams for each partition
  // maxPartitions is the parallelism passed to mapAsyncUnordered below.
  val maxPartitions = 100
  val consumerGroup = Consumer.plainPartitionedSource(consumerSettings, Subscriptions.topics("packets"))

  val done = consumerGroup.map {
    case (topicPartition, source) =>
      val p: Int = topicPartition.partition
      // Run the per-partition stream right away; Keep.both yields a pair of
      // the source's and the printing sink's materialized values.
      source
        .map { (cr: ConsumerRecord[Integer, Packet]) => cr.value() }
        .filter(_.ip.isDefined)
        .toMat(Sink.foreach(packet => println(s"[$p] $packet")))(Keep.both)
        .run()
  }
    // Wait on the second element of each pair (the value materialized by Sink.foreach).
    .mapAsyncUnordered(maxPartitions)(_._2)
    .runWith(Sink.ignore)

  // Block the main thread until the outer stream finishes.
  Await.result(done, Duration.Inf)

  system.terminate()
}
开发者ID:robertson-tech,项目名称:wirefugue,代码行数:50,代码来源:PacketConsumer.scala


示例7: encodeHex

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3

import java.security.MessageDigest
import javax.xml.bind.DatatypeConverter

import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.util.ByteString

import scala.concurrent.Future

/** Hex-encoding and digest helpers. */
package object auth {

  /** Lower-case hexadecimal representation of `bytes`. */
  def encodeHex(bytes: Array[Byte]): String = {
    val hex = DatatypeConverter.printHexBinary(bytes)
    hex.toLowerCase
  }

  /** Lower-case hexadecimal representation of the bytes in `bytes`. */
  def encodeHex(bytes: ByteString): String = encodeHex(bytes.toArray)

  /**
   * Sink that feeds every incoming chunk into a `MessageDigest` and
   * materializes the final digest.
   *
   * @param algorithm digest algorithm name passed to `MessageDigest.getInstance`
   */
  def digest(algorithm: String = "SHA-256"): Sink[ByteString, Future[ByteString]] = {
    val accumulate = Flow[ByteString].fold(MessageDigest.getInstance(algorithm)) { (md, chunk) =>
      md.update(chunk.asByteBuffer)
      md
    }
    accumulate
      .map(md => ByteString(md.digest()))
      .toMat(Sink.head[ByteString])(Keep.right)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:26,代码来源:package.scala


示例8: AccumulateWhileUnchangedTest

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.scalatest.FunSuite

import scala.collection.immutable


/** Tests for the `AccumulateWhileUnchanged` graph stage, keyed by window start. */
class AccumulateWhileUnchangedTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
  val windowSize = 1000

  /** Rounds `timestamp` down to the beginning of its window. */
  def windowStart(timestamp: Long): Long = {
    val offsetIntoWindow = timestamp % windowSize
    timestamp - offsetIntoWindow
  }

  val numberOfElements = 3000

  test("accumulation of two windows on element stream") {
    val firstWindowElements = immutable.Seq.range(1, 999, 10).map(t => SimpleRecord(1)(t))
    val secondWindowElements = immutable.Seq.range(1000, 1999, 10).map(t => SimpleRecord(1)(t))
    val records = firstWindowElements ++ secondWindowElements

    val probeSink = TestSink.probe[Seq[SimpleRecord]]

    // Only the probe materialized by the test sink is needed.
    val probe = Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .runWith(probeSink)

    probe.request(numberOfElements)
    probe.expectNext(firstWindowElements, secondWindowElements)
    probe.expectComplete()
  }

  test("accumulation on empty stream") {
    val probeSink = TestSink.probe[Seq[SimpleRecord]]
    val records = List.empty[SimpleRecord]

    val probe = Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .runWith(probeSink)

    probe.request(numberOfElements)
    probe.expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:57,代码来源:AccumulateWhileUnchangedTest.scala


示例9: IgnoreLastElementsTest

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.FunSuite

import scala.collection.immutable

/** Tests for the `IgnoreLastElements` graph stage. */
class IgnoreLastElementsTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  test("happy list") {
    val start = 0
    val end = 10
    val ignoreCount = 2
    val values = immutable.Seq.range(start, end)

    val probeSink = TestSink.probe[Int]

    // Only the probe materialized by the test sink is needed.
    val probe = Source(values)
      .via(new IgnoreLastElements[Int](ignoreCount))
      .runWith(probeSink)

    val numberOfElements = end
    probe.request(numberOfElements)
    // Everything except the trailing `ignoreCount` values is expected, in order.
    for (v <- values.dropRight(ignoreCount)) probe.expectNext(v)
    probe.expectComplete()
  }

  test("empty list") {
    val probeSink = TestSink.probe[Int]

    val probe = Source(List.empty[Int])
      .via(new IgnoreLastElements[Int](ignoreCount = 2))
      .runWith(probeSink)

    val numberOfElements = 1
    probe.request(numberOfElements)
    probe.expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:50,代码来源:IgnoreLastElementsTest.scala


示例10: running

//设置package包名称以及导入依赖的类
package producers

import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.{Done, NotUsed}
import broker.ActorBroker
import config.AppConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}

import scala.concurrent.Future

/**
 * Mixin that lets an actor publish string messages to the Kafka topic
 * `topicName` through a plain producer sink.
 */
trait Producerable extends ActorBroker {
  val config: AppConfig
  implicit val materializer = ActorMaterializer()

  val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(s"${config.kafkaConfig.uri}:${config.kafkaConfig.port}")

  /** Behavior installed after a message has been sent: stops the actor on `Stop`. */
  def running(): Receive = {
    case Stop =>
      log.info("Stopping Kafka producer stream and actor")
      context.stop(self)
  }

  /**
   * Publishes `message` as a single record on `topicName` and switches the
   * actor to the [[running]] behavior.
   *
   * @param message record value to publish
   */
  def sendToSink(message: String): Unit = {
    log.info(s"Attempting to produce message on topic $topicName")

    val record = new ProducerRecord[Array[Byte], String](topicName, message)

    // The record is already available, so emit it directly instead of wrapping
    // it in a Future; only the sink's completion value is needed.
    val done: Future[Done] = Source.single(record).runWith(Producer.plainSink(producerSettings))

    // `onFailure` is deprecated; rethrowing inside a Future callback cannot
    // reach any caller, so the failure is only logged.
    done.failed.foreach { ex =>
      log.error("Stream failed due to error, restarting", ex)
    }

    context.become(running())
    log.info(s"Writer now running, writing random numbers to topic $topicName")
  }


  case object Stop
}
开发者ID:jguido,项目名称:reactive-kafka-registration,代码行数:48,代码来源:Producerable.scala


示例11: EventEncoderSpec

//设置package包名称以及导入依赖的类
package client.protocol

import java.security.KeyPairGenerator
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.cryptoutility.protocol.Events._
import com.cryptoutility.protocol.EventSerializer
import org.scalatestplus.play.PlaySpec
import play.api.http.websocket.BinaryMessage

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps


/** Verifies that `EventEncoder` produces a binary message that deserializes back to the event. */
class EventEncoderSpec extends PlaySpec{

  implicit val system = ActorSystem("test-system")
  implicit val mat = ActorMaterializer()

  /** Generates a fresh RSA public key on every call. */
  def publicKey = {
    val generator = KeyPairGenerator.getInstance("RSA")
    generator.generateKeyPair().getPublic
  }

  "event encode" should{
    "encode Event to a byte stream" in {
      val userInfo = UserInfo("James", "Carl", "[email protected]", publicKey, UUID.randomUUID().toString)
      val expected = Initialized(isNew = false, userInfo)

      val f = Source.single(expected).via(new EventEncoder).runWith(Sink.head)

      val encoded = Await.result(f, 500.millis)
      val bytes = encoded.asInstanceOf[BinaryMessage].data.toArray
      val actual = EventSerializer.deserialize(bytes)
      actual mustBe expected
    }

  }

}
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:45,代码来源:EventEncoderSpec.scala


示例12: EventDecoderSpec

//设置package包名称以及导入依赖的类
package client.protocol

import java.security.KeyPairGenerator
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import com.cryptoutility.protocol.Events._
import com.cryptoutility.protocol.EventSerializer
import org.scalatestplus.play.PlaySpec
import play.api.http.websocket.BinaryMessage

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random


/** Verifies `EventDecoder` on a valid binary message and the serializer on random bytes. */
class EventDecoderSpec extends PlaySpec{

  implicit val system = ActorSystem("test-system")
  implicit val mat = ActorMaterializer()
  val id = () => UUID.randomUUID().toString

  /** Generates a fresh RSA public key on every call. */
  def publicKey = {
    val generator = KeyPairGenerator.getInstance("RSA")
    generator.generateKeyPair().getPublic
  }

  "event decode" should{
    "decode byte streams to an event" in {
      val userInfo = UserInfo("James", "Carl", "[email protected]", publicKey, id())
      val expected = Initialized(true, userInfo)
      val serialized = EventSerializer.serialize(expected)
      val message = BinaryMessage(ByteString(serialized))

      val f = Source.single(message).via(new EventDecoder).runWith(Sink.head)

      val actual = Await.result(f, 500.millis)

      actual mustBe expected
    }
    "of an invalid byte stream" in {
      // 256 random bytes are expected to be rejected by the deserializer.
      a [InvalidFormatException] should be thrownBy{
        val randomBytes = new Array[Byte](256)
        Random.nextBytes(randomBytes)
        EventSerializer.deserialize(randomBytes)
      }
    }
  }

}
开发者ID:ejosiah,项目名称:crypto-utility,代码行数:55,代码来源:EventDecoderSpec.scala


示例13: KafkaWriter

//设置package包名称以及导入依赖的类
package wiii

import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}


/** Factory for [[KafkaWriter]] actor props. */
object KafkaWriter {
    /** Props for a writer publishing to `topic`. */
    def props(topic: String)(implicit mat: ActorMaterializer) = {
        Props(new KafkaWriter(topic))
    }
}

/**
 * Actor that, on start, listens to the Twitter stream filtered by "espn" and
 * forwards each status as a `Tweet` to the Kafka topic `topicName`.
 */
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {
    override def preStart(): Unit = initWriter()

    // Every incoming message is ignored.
    override def receive: Receive = {
        case _ =>
    }

    /** Wires the Twitter listener to the Kafka producer actor through a stream. */
    def initWriter(): Unit = {
        val kafkaSubscriber = context.actorOf(
            new ReactiveKafka().producerActorProps(ProducerProperties(
                bootstrapServers = "localhost:9092",
                topic = topicName,
                valueSerializer = TweetSerializer
            ))
        )

        // Actor-backed source: statuses sent to `statusInlet` come out of `statusPublisher`.
        val (statusInlet, statusPublisher) = Source
            .actorRef[Status](1000, OverflowStrategy.fail)
            .toMat(Sink.asPublisher(false))(Keep.both)
            .run()

        val twitterStream = new TwitterStreamFactory().getInstance()
        twitterStream.addListener(new StatusForwarder(statusInlet))
        twitterStream.filter(new FilterQuery("espn"))

        val kafkaSink = Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](kafkaSubscriber))
        Source
            .fromPublisher(statusPublisher)
            .map(status => ProducerMessage(Tweet(status.getUser.getName, status.getText)))
            .runWith(kafkaSink)
    }
}

/** Twitter listener that forwards every received status to the given actor. */
class StatusForwarder(publisher: ActorRef) extends StatusListener {
    def onStatus(status: Status): Unit = {
        publisher.tell(status, ActorRef.noSender)
    }

    // All remaining callbacks are intentionally no-ops for now.
    def onStallWarning(warning: StallWarning): Unit = ()
    def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
    def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
    def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
    def onException(ex: Exception): Unit = ()
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala


示例14: FullStream

//设置package包名称以及导入依赖的类
package com.yannick_cw.elastic_indexer4s.indexing_logic

import akka.NotUsed
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.yannick_cw.elastic_indexer4s.Index_results.{IndexError, StageSucceeded, StageSuccess}
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

/** Runs a source into an indexing sink while counting and logging throughput. */
object FullStream extends LazyLogging {

  /**
   * Sink that logs how many elements arrived in each `logPer` interval and
   * materializes the total element count.
   *
   * The total is folded from 0 rather than reduced: `Sink.reduce` fails with
   * `NoSuchElementException` when no element arrives, which would turn an
   * empty source into a reported indexing failure.
   */
  private def countAndLogSink[A](logPer: FiniteDuration): Sink[A, Future[Int]] = Flow[A]
    .groupedWithin(Int.MaxValue, logPer)
    .map(_.length)
    .map { elementsPerTime =>
      logger.info(s"Indexed $elementsPerTime elements last $logPer")
      elementsPerTime
    }.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  /**
   * Runs `source` into `sink`, counting the elements on the side.
   *
   * @param source           documents to index
   * @param sink             indexing sink; its future signals completion
   * @param logSpeedInterval interval at which throughput is logged
   * @return `Right` with a success message containing the count, or `Left`
   *         with an `IndexError` wrapping any non-fatal failure
   */
  def run[A](source: Source[A, NotUsed], sink: Sink[A, Future[Unit]], logSpeedInterval: FiniteDuration)
    (implicit materializer: ActorMaterializer, ex: ExecutionContext): Future[Either[IndexError, StageSucceeded]] =
    source
      .alsoToMat(countAndLogSink(logSpeedInterval))(Keep.right)
      .toMat(sink)(Keep.both)
      // The count is only surfaced after the indexing sink has completed.
      .mapMaterializedValue { case (fCount, fDone) => fDone.flatMap(_ => fCount) }
      .run()
      .map(count => Right(StageSuccess(s"Indexed $count documents successfully")))
      .recover { case NonFatal(t) =>
        Left(IndexError("Writing documents failed.", Some(t)))
      }
}
开发者ID:yannick-cw,项目名称:elastic-indexer4s,代码行数:36,代码来源:FullStream.scala


示例15: validate

//设置package包名称以及导入依赖的类
package csw.services.config.server.files

import java.nio.file.{Path, Paths}

import akka.stream.scaladsl.{FileIO, Keep}
import csw.services.config.api.models.ConfigData
import csw.services.config.server.commons.ConfigServerLogger
import csw.services.config.server.{ActorRuntime, Settings}

import scala.async.Async._
import scala.concurrent.Future


  /** Checks whether `id` equals the SHA-1 computed by `Sha1.fromPath` for `path`. */
  def validate(id: String, path: Path): Future[Boolean] = async {
    val actualSha = await(Sha1.fromPath(path))
    id == actualSha
  }

  /**
   * Streams the config data into a newly created temp file while computing
   * its SHA through `Sha1.sink` in the same pass.
   *
   * @param configData data whose `source` is written and hashed
   * @return the temp file path together with the computed SHA string
   */
  def saveAndSha(configData: ConfigData): Future[(Path, String)] = async {
    val path = await(fileRepo.createTempFile("config-service-overize-", ".tmp"))
    // One materialization feeds both the file sink and the SHA sink.
    val (resultF, shaF) = configData.source
      .alsoToMat(FileIO.toPath(path))(Keep.right)
      .toMat(Sha1.sink)(Keep.both)
      .run()
    // The result of `.status.get` is discarded; presumably it is called to
    // rethrow a failed write status — TODO confirm.
    await(resultF).status.get
    (path, await(shaF))
  }

} 
开发者ID:tmtsoftware,项目名称:csw-prod,代码行数:29,代码来源:AnnexFileService.scala


示例16: Sha1

//设置package包名称以及导入依赖的类
package csw.services.config.server.files

import java.nio.file.Path
import java.security.MessageDigest

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString
import csw.services.config.api.models.ConfigData

import scala.concurrent.Future


/** SHA-1 helpers that produce lower-case hex strings from byte streams. */
object Sha1 {

  /** Runs `source` into [[sink]] and returns the resulting hex string. */
  private def fromSource(source: Source[ByteString, Any])(implicit mat: Materializer): Future[String] =
    source.toMat(sink)(Keep.right).run()

  /** SHA-1 of the bytes emitted by `configData.source`. */
  def fromConfigData(configData: ConfigData)(implicit mat: Materializer): Future[String] =
    fromSource(configData.source)

  /** SHA-1 of the file at `path`. */
  def fromPath(path: Path)(implicit mat: Materializer): Future[String] =
    fromSource(FileIO.fromPath(path))

  /**
   * Sink that digests all incoming bytes and materializes the digest as a
   * hex string. This must stay a `def` so that every call gets its own
   * digester instance.
   */
  def sink: Sink[ByteString, Future[String]] = {
    val freshDigest = MessageDigest.getInstance("SHA-1")
    val hexPairs = Flow[ByteString]
      .fold(freshDigest) { (md, chunk) =>
        md.update(chunk.toArray)
        md
      }
      .mapConcat(_.digest().toList)
      // Mask to an unsigned value before formatting as two hex digits.
      .map(b => "%02x".format(b & 0xFF))
    hexPairs.toMat(Sink.fold("")(_ + _))(Keep.right)
  }

}
开发者ID:tmtsoftware,项目名称:csw-prod,代码行数:40,代码来源:Sha1.scala


示例17: queueRequest

//设置package包名称以及导入依赖的类
package rest.client

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import rest.client.HttpRequestQueue._

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
 * Mixin that funnels HTTP requests through a bounded queue in front of a
 * host connection pool flow.
 */
trait HttpRequestQueue {

  implicit val system: ActorSystem
  implicit val mat: ActorMaterializer
  implicit val ec: ExecutionContext

  /** Capacity of the request queue. */
  val QueueSize: Int

  /** Pool flow that pairs each response (or failure) with the promise to complete. */
  val poolClientFlow: Flow[(HttpRequest, Promise[HttpResponse]),
                           (Try[HttpResponse], Promise[HttpResponse]),
                           Http.HostConnectionPool]

  // Lazy on purpose: a strict val would be materialized during trait
  // initialization, before an implementor's `QueueSize`, `poolClientFlow` and
  // `mat` are necessarily initialized.
  private lazy val queue =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.backpressure)
      .via(poolClientFlow)
      .toMat(Sink.foreach({
        case ((Success(httpResponse), promisedResponse)) => promisedResponse.success(httpResponse)
        case ((Failure(throwable), promisedResponse))    => promisedResponse.failure(throwable)
      }))(Keep.left)
      .run()

  /**
   * Offers `request` to the queue.
   *
   * @param request the request to send through the pool
   * @return a future completed with the response, or failed when the request
   *         was dropped, the queue was closed, or the offer itself failed
   */
  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue
      .offer(request -> responsePromise)
      .map {
        case QueueOfferResult.Enqueued    => responsePromise
        case QueueOfferResult.Dropped     => responsePromise.failure(new RuntimeException(QUEUE_OVERFLOW))
        case QueueOfferResult.QueueClosed => responsePromise.failure(new RuntimeException(QUEUE_CLOSED))
        case QueueOfferResult.Failure(ex) => responsePromise.failure(ex)
      }
      .flatMap(_.future)
  }
}

/** Error messages used when a request cannot be enqueued. */
object HttpRequestQueue {
  // Used when the queue reports that it has been closed.
  private val QUEUE_CLOSED: String =
    "Queue was closed (pool shut down) while running the request. Try again later."

  // Used when the queue reports that the request was dropped.
  private val QUEUE_OVERFLOW: String =
    "Queue overflowed. Try again later."
}
开发者ID:lymr,项目名称:fun-chat,代码行数:53,代码来源:HttpRequestQueue.scala


示例18: KafkaPushQueue

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager

import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.kafka.clients.producer.ProducerRecord
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future

/**
 * Push queue backed by Kafka.
 *
 * @param topic    Kafka topic the push data is produced to
 * @param settings Kafka producer settings
 */
class KafkaPushQueue(topic: String, settings: ProducerSettings[Long, PushData])
  extends PushQueue {

  /**
   * Sink that produces every `(key, value)` pair to `topic` and materializes
   * the number of elements that came out of the producer flow.
   */
  override def pushDataSink: Sink[(Long, PushData), Future[Long]] =
    Flow[(Long, PushData)]
      .map {
        // Use the configured topic; it was previously hard-coded to
        // "notifications" and the constructor argument was ignored.
        case (key, value) => Message(
          new ProducerRecord[Long, PushData](topic, key, value), key)
      }
      .via(Producer.flow(settings))
      .toMat(Sink.fold(0L)({ case (acc, _) => acc + 1}))(Keep.right)
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:23,代码来源:KafkaPushQueue.scala


示例19: HomeControllerSpec

//设置package包名称以及导入依赖的类
package controllers

import actors.TestKitSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestProbe
import org.scalatest.MustMatchers
import play.api.libs.json.{JsValue, Json}

import scala.concurrent.ExecutionContext

/** Exercises `HomeController.createWebSocketFlow` with stream test probes. */
class HomeControllerSpec extends TestKitSpec with MustMatchers {

  "createWebSocketFlow" should {

    "create a websocket flow and send a message through" in {
      implicit val materializer = ActorMaterializer()(system)
      implicit val ec: ExecutionContext = system.dispatcher

      val stocksActor = TestProbe("stocksActor")
      val userParentActor = TestProbe("userParentActor")
      val userActor = TestProbe("userActor")

      // http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-testkit.html
      val publisher = akka.stream.testkit.TestPublisher.probe[JsValue]()

      // Controller and flow under test.
      val controller = new HomeController(stocksActor.ref, userParentActor.ref)
      val flowUnderTest = controller.createWebSocketFlow(publisher, userActor.ref)

      // Wrap the flow in a test source and sink; only the sink probe is used below.
      val sub = TestSource.probe[JsValue]
        .via(flowUnderTest)
        .runWith(TestSink.probe[JsValue])

      val jsvalue = Json.obj("herp" -> "derp")

      // A message pushed by the publisher must come out at the sink.
      sub.request(n = 1)
      publisher.sendNext(jsvalue)
      sub.expectNext(jsvalue)
    }

  }


}
开发者ID:ChrisCooper,项目名称:customer-feedback-screen,代码行数:52,代码来源:HomeControllerSpec.scala


示例20: produceRecord

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka.testkit

import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

import scala.concurrent.Future

/** Test helpers for producing to and consuming from Kafka. */
trait KafkaTest {

  /**
   * Produces a single record to `topic`.
   *
   * @param topic           topic to produce to
   * @param keySerializer   serializer for the record key
   * @param valueSerializer serializer for the record value
   * @param key             record key
   * @param value           record value
   * @return the value materialized by the producer sink
   */
  def produceRecord[K, V](topic: String, keySerializer: Serializer[K], valueSerializer: Serializer[V], key: K, value: V)(implicit system: ActorSystem, mat: Materializer): Future[Done] = {
    val producerSettings = ProducerSettings(system, keySerializer, valueSerializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))

    // Honor the `topic` argument; it was previously ignored in favor of the
    // hard-coded "mail.command.send".
    Source.single(new ProducerRecord[K, V](topic, key, value))
      .toMat(Producer.plainSink(producerSettings))(Keep.right)
      .run()
  }

  /**
   * Consumes `topic` from the earliest offset with a random group id and
   * returns the result of `pf` for the first record on which it is defined.
   *
   * @param topic             topic to subscribe to
   * @param keyDeserializer   deserializer for record keys
   * @param valueDeserializer deserializer for record values
   * @param pf                selects and transforms the record of interest
   */
  def consumeRecordPF[K, V, Out](topic: String, keyDeserializer: Deserializer[K], valueDeserializer: Deserializer[V])(pf: PartialFunction[ConsumerRecord[K, V], Out])(implicit system: ActorSystem, mat: Materializer): Future[Out] = {
    val consumerSettings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
      .withBootstrapServers(system.settings.config.getString("kafka.bootstrap-servers"))
      .withGroupId(UUID.randomUUID.toString)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
      // collect = filter(pf.isDefinedAt) followed by map(pf), in one step.
      .collect(pf)
      .toMat(Sink.head)(Keep.right)
      .run()
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:40,代码来源:KafkaTest.scala



注:本文中的akka.stream.scaladsl.Keep类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ArrayBuffer类代码示例发布时间:2022-05-23
下一篇:
Scala Matcher类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap