本文整理汇总了 Scala 中 akka.stream.KillSwitches 类的典型用法代码示例。如果您正苦于以下问题:Scala KillSwitches 类的具体用法?Scala KillSwitches 怎么用?Scala KillSwitches 使用的例子?那么恭喜您,这里精选的代码示例或许可以为您提供帮助。
在下文中一共展示了KillSwitches类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: PrintMoreNumbers
//设置package包名称以及导入依赖的类
package sample.stream_actor_simple
import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
/**
 * Actor that runs a ticking stream which prints a running sum once per second,
 * and controls that stream through a [[UniqueKillSwitch]].
 *
 * Handled messages:
 *  - `"stop"`: shuts the stream down through the kill switch.
 *  - `"done"`: sent to `self` when the stream's completion future finishes;
 *    stops this actor and terminates the actor system.
 *
 * @param materializer materializer used to run the stream
 */
class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
  private implicit val executionContext = context.system.dispatcher

  // Keep.right keeps the kill switch, Keep.both pairs it with the sink's completion future.
  private val (killSwitch: UniqueKillSwitch, done) =
    Source.tick(0.seconds, 1.second, 1)
      .scan(0)(_ + _)
      .map(_.toString)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  // onComplete (not map) so that "done" is delivered on failure as well as on
  // success; with map a failed stream would leave the actor waiting forever.
  done.onComplete(_ => self ! "done")

  // Shut the stream down whenever the actor stops, so the stream cannot outlive
  // it. Calling shutdown() on an already-terminated stream is expected to be a
  // no-op -- NOTE(review): confirm against the KillSwitch documentation.
  override def postStop(): Unit = killSwitch.shutdown()

  override def receive: Receive = {
    // Stop the stream; the actor itself stops once "done" arrives.
    case "stop" =>
      println("Stopping")
      killSwitch.shutdown()
    // The stream has terminated: stop this actor and the whole actor system.
    case "done" =>
      println("Done")
      context.stop(self)
      context.system.terminate()
  }
}
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:33,代码来源:PrintMoreNumbers.scala
示例2: PerpetualStreamTest
//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils
import akka.Done
import akka.actor.{ActorSystem, Props}
import akka.stream.scaladsl.{Concat, Keep, Sink, Source}
import akka.stream.{KillSwitches, ThrottleMode}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Spec for `PerpetualStream`: starts a stream actor, waits until at least 20
 * numbers have been collected, checks the first 20, then triggers a graceful
 * shutdown and expects the actor to terminate.
 */
class PerpetualStreamTest
    extends TestKit(ActorSystem())
    with FlatSpecLike
    with Matchers
    with Eventually
    with BeforeAndAfterAll {

  // Patience used by `eventually`: poll every 100 ms, give up after 30 s.
  implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(100, Millis))

  "PerpetualStream" should "work" in {
    val emitted = ListBuffer.empty[Int]
    val streamActor = system.actorOf(Props(new NumberGeneratingPerpetualStream(emitted)))
    val terminationProbe = new TestProbe(system)
    terminationProbe.watch(streamActor)

    eventually {
      require(emitted.length >= 20, "Must have emitted at least 20 numbers")
    }
    emitted.take(20) should be((-10 until 10).toList)

    GracefulShutdownExtension(system).triggerGracefulShutdown()
    terminationProbe.expectTerminated(streamActor)
  }

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  /**
   * Test stream whose source depends on how many times `stream` has been
   * requested so far:
   *  - 1st request: emits -10 until 0, then fails with a RuntimeException;
   *  - 2nd request: emits 0 until 10 and completes;
   *  - later requests: emit 10 to 100000, throttled.
   *
   * Every element that passes through is appended to `list`.
   * NOTE(review): presumably `PerpetualStream` re-requests `stream` after each
   * termination; that behavior is defined outside this file.
   */
  class NumberGeneratingPerpetualStream(list: ListBuffer[Int]) extends PerpetualStream {
    var count = 0

    override def stream = {
      val attempt = count
      count += 1

      val numbers =
        if (attempt == 0)
          Source.combine(Source(-10 until 0), Source.failed[Int](new RuntimeException("ERROR")))(Concat(_))
        else if (attempt == 1)
          Source(0 until 10)
        else
          Source(10 to 100000).throttle(1000, 100.millis, 100, ThrottleMode.shaping)

      numbers
        .via(watcher)
        .viaMat(KillSwitches.single)(Keep.right)
        .map { number =>
          list.append(number)
          number
        }
        .to(Sink.ignore)
        // Expose the kill switch as a shutdown function returning a completed Future.
        .mapMaterializedValue { switch => () =>
          switch.shutdown()
          Future.successful(Done)
        }
    }
  }
}
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:59,代码来源:PerpetualStreamTest.scala
示例3: Room
//设置package包名称以及导入依赖的类
package chat
import java.util.concurrent.atomic.AtomicReference
import javax.inject.{ Inject, Singleton }
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ BroadcastHub, Flow, Keep, MergeHub, Sink }
import akka.stream.{ KillSwitches, Materializer, UniqueKillSwitch }
import akka.stream.scaladsl.{ Sink, Source }
import akka.NotUsed
import scala.collection.mutable.{ Map => MutableMap }
import scala.concurrent.duration._
/**
 * A chat room.
 *
 * @param roomId identifier of the room
 * @param bus    message flow of the room; materializing it yields a [[UniqueKillSwitch]]
 */
case class Room(
  roomId: String,
  bus: Flow[Message, Message, UniqueKillSwitch]
)
/**
 * Hands out [[Room]]s by id, creating a room the first time its id is requested
 * and caching it in the pool held by the companion object.
 */
@Singleton
class RoomClient @Inject()(implicit val materializer: Materializer, implicit val system: ActorSystem) {

  /**
   * Returns the room registered under `roomId`, creating and registering it
   * first when the pool has no entry for that id. Runs under this instance's lock.
   *
   * @param roomId identifier of the requested room
   * @return the existing or newly created room
   */
  def chatRoom(roomId: String): Room = synchronized {
    val rooms = RoomClient.roomPool.get()
    rooms.getOrElseUpdate(roomId, create(roomId))
  }

  /**
   * Builds a new room: a MergeHub feeding a BroadcastHub, wrapped into a flow
   * with a bidirectional kill switch and a 3 second backpressure timeout.
   */
  private def create(roomId: String): Room = {
    val hub =
      MergeHub.source[Message](perProducerBufferSize = 16)
        .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
    val (hubSink, hubSource) = hub.run()

    // Attach a consumer that discards everything -- presumably so the hub keeps
    // draining while no client is connected (TODO confirm intent).
    hubSource.runWith(Sink.ignore)

    val killableBidi = KillSwitches.singleBidi[Message, Message]
    val bus =
      Flow.fromSinkAndSource(hubSink, hubSource)
        .joinMat(killableBidi)(Keep.right)
        .backpressureTimeout(3.seconds)

    Room(roomId, bus)
  }
}
/** Companion holding the pool of rooms, keyed by room id. */
object RoomClient {
  /** Atomic reference to the mutable map of room id to room; starts out empty. */
  val roomPool: AtomicReference[MutableMap[String, Room]] =
    new AtomicReference(MutableMap.empty[String, Room])
}
开发者ID:hys-rabbit,项目名称:PlayChat,代码行数:53,代码来源:RoomClient.scala
示例4: AkkaStreamProcess
//设置package包名称以及导入依赖的类
package aecor.distributedprocessing
import aecor.distributedprocessing.DistributedProcessing._
import aecor.effect.{ Capture, CaptureFuture }
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.stream.{ KillSwitches, Materializer }
/**
 * Builds a `Process[F]` that, when run, materializes `source` through `flow`
 * into an ignoring sink with a kill switch in between.
 */
object AkkaStreamProcess {

  /** Helper class that fixes the effect type `F` so `A` can be inferred at the call site. */
  final class Mk[F[_]] {

    /**
     * @param source elements to process
     * @param flow   processing step applied to every element
     * @return a process whose running handle exposes the stream's termination
     *         (captured into `F`) and a shutdown function backed by the kill switch
     */
    def apply[A](
      source: Source[A, NotUsed],
      flow: Flow[A, Unit, NotUsed]
    )(implicit mat: Materializer, F0: CaptureFuture[F], F1: Capture[F]): Process[F] =
      Process(run = Capture[F].capture {
        val graph =
          source
            .viaMat(KillSwitches.single)(Keep.right)
            .via(flow)
            .toMat(Sink.ignore)(Keep.both)
        val (switch, completion) = graph.run()

        // Discard the Done value: only termination matters to the caller.
        val watchTermination =
          CaptureFuture[F].captureFuture(completion.map(_ => ())(mat.executionContext))

        RunningProcess(watchTermination, () => switch.shutdown())
      })
  }

  /** Entry point: `AkkaStreamProcess[F](source, flow)`. */
  def apply[F[_]]: Mk[F] = new Mk[F]
}
开发者ID:notxcain,项目名称:aecor,代码行数:29,代码来源:AkkaStreamProcess.scala
示例5: HbaseTest
//设置package包名称以及导入依赖的类
package teleporter.stream.integration.template
import akka.Done
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches}
import org.apache.hadoop.hbase.client.Put
import teleporter.integration.component.hbase.{Hbase, HbaseAction}
import teleporter.integration.core.Streams._
import teleporter.integration.core.{Message, TeleporterCenter}
import teleporter.integration.utils.Bytes._
import scala.concurrent.Future
/**
 * Stream logic that builds a single HBase `Put` and sends it through the HBase
 * flow configured at "/sink/test/hbase_test/hbase_test".
 */
object HbaseTest extends StreamLogic {

  /**
   * Runs the stream.
   *
   * @param key    stream key (not used by this implementation)
   * @param center provides the materializer and `self` imported below
   * @return the stream's kill switch paired with its termination future
   */
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    Source.single(1)
      // The lambda arrow was garbled to `?` in the original; restored to `=>`.
      // The emitted element itself is ignored.
      .map { _ =>
        // String arguments are presumably converted to bytes by the implicits
        // from teleporter.integration.utils.Bytes._ (TODO confirm).
        val put = new Put("1")
        put.addColumn("f", "q", "v")
        Message.apply(HbaseAction("teleporter", put))
      }
      .via(Hbase.flow("/sink/test/hbase_test/hbase_test"))
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:29,代码来源:HbaseTest.scala
示例6: Hadoop2Elasticsearch
//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction
import akka.Done
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{KillSwitch, KillSwitches}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.index.VersionType
import teleporter.integration.component.elasticsearch.ElasticSearch2
import teleporter.integration.component.hdfs.Hdfs
import teleporter.integration.core.Streams._
import teleporter.integration.core.{SourceAck, StreamContext, TeleporterCenter}
import teleporter.integration.utils.Converters._
import scala.concurrent.Future
/**
 * Stream logic that reads comma-separated records from an HDFS source, turns
 * each one into an Elasticsearch `IndexRequest`, sends it through the
 * Elasticsearch flow and confirms the source message afterwards.
 */
object Hadoop2Elasticsearch extends StreamLogic {

  /**
   * Runs the stream.
   *
   * Reads three arguments from the stream context's configuration:
   * "fields" (comma-separated field names), "index" and "type".
   *
   * @param key    key used to look up the `StreamContext`
   * @param center provides the context, the materializer and `self`
   * @return the stream's kill switch paired with its termination future
   */
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    val context = center.context.getContext[StreamContext](key)
    val arguments = context.config.arguments
    val fields = arguments[String]("fields").split(",")
    val index = arguments[String]("index")
    val indexType = arguments[String]("type")
    Hdfs.sourceAck("/source/test/hadoop_es_test/hadoop2es/hdfs_source")
      // The lambda arrows were garbled to `?` in the original; restored to `=>`.
      .map { m =>
        m.map { bs =>
          // Pair the configured field names with the record's comma-separated values.
          val data = fields.zip(bs.utf8String.split(",")).toMap
          new IndexRequest()
            .index(index)
            .`type`(indexType)
            // Throws if the record has no "id" field.
            .id(data("id"))
            .source(data)
            // External versioning, using the current wall-clock time as version.
            .versionType(VersionType.EXTERNAL)
            .version(System.currentTimeMillis())
        }
      }
      .via(ElasticSearch2.flow("/sink/test/hadoop_es_test/hadoop2es/item_es"))
      .via(SourceAck.confirmFlow())
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:44,代码来源:Hadoop2Elasticsearch.scala
示例7: StreamExample
//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction
import akka.Done
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{KillSwitch, KillSwitches}
import teleporter.integration.component.jdbc.SqlSupport._
import teleporter.integration.component.jdbc.{Jdbc, Upsert}
import teleporter.integration.core.Streams._
import teleporter.integration.core.{SourceAck, TeleporterCenter}
import scala.concurrent.Future
/**
 * Stream logic that reads rows from a JDBC source, converts each row into an
 * upsert (update SQL plus insert-ignore SQL) against "table_name", sends it
 * through the JDBC flow and confirms the source message afterwards.
 */
object StreamExample extends StreamLogic {

  /**
   * Runs the stream.
   *
   * @param key    stream key (not used by this implementation)
   * @param center provides the materializer and `self` imported below
   * @return the stream's kill switch paired with its termination future
   */
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    Jdbc.sourceAck("/source/ns/task/stream/source")
      // The lambda arrow was garbled to `?` in the original; restored to `=>`.
      .map(_.map(data => Seq(Upsert(
        updateSql("table_name", "id", data),
        insertIgnoreSql("table_name", data)
      ))).toMessage)
      .via(Jdbc.flow("/source/ns/task/stream/sink"))
      .via(SourceAck.confirmFlow())
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:28,代码来源:StreamExample.scala
示例8: SendKafka
//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction
import akka.Done
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches}
import org.apache.commons.lang3.RandomStringUtils
import org.apache.kafka.clients.producer.ProducerRecord
import teleporter.integration.component.Kafka
import teleporter.integration.core.Streams.StreamLogic
import teleporter.integration.core.{Message, TeleporterCenter}
import teleporter.integration.utils.Bytes._
import scala.concurrent.Future
import scala.concurrent.duration._
/**
 * Stream logic that, once per second, generates a random 10-letter string,
 * prints it and sends it as a Kafka producer record to topic "ppp" with key "1".
 */
object SendKafka extends StreamLogic {

  /**
   * Runs the stream.
   *
   * @param key    stream key (not used by this implementation)
   * @param center provides the materializer and `self` imported below
   * @return the stream's kill switch paired with its termination future
   */
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    println("--------------SendKafka---------------------")
    import center.{materializer, self}
    Source.tick(1.second, 1.second, "test")
      // The lambda arrow was garbled to `?` in the original; restored to `=>`.
      // The tick payload itself is ignored.
      .map { _ =>
        val s = RandomStringUtils.randomAlphabetic(10)
        println(s)
        // Key and value strings are presumably converted to Array[Byte] by the
        // implicits from teleporter.integration.utils.Bytes._ (TODO confirm).
        Message(data = new ProducerRecord[Array[Byte], Array[Byte]]("ppp", "1", s))
      }
      .via(Kafka.flow("/sink/test/test_task/kafka_send/send_sink"))
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:32,代码来源:SendKafka.scala
注:本文中的 akka.stream.KillSwitches 类示例整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论