• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala KillSwitches类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.KillSwitches的典型用法代码示例。如果您正苦于以下问题:Scala KillSwitches类的具体用法?Scala KillSwitches怎么用?Scala KillSwitches使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了KillSwitches类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: PrintMoreNumbers

//设置package包名称以及导入依赖的类
package sample.stream_actor_simple

import akka.actor.Actor
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.duration._

class PrintMoreNumbers(implicit materializer: ActorMaterializer) extends Actor {
  private implicit val executionContext = context.system.dispatcher

  private val (killSwitch: UniqueKillSwitch, done) =
    Source.tick(0.seconds, 1.second, 1)
      .scan(0)(_ + _)
      .map(_.toString)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  done.map(_ => self ! "done")

  override def receive: Receive = {
    //When the actor is stopped, it will also stop the stream
    case "stop" =>
      println("Stopping")
      killSwitch.shutdown()
    case "done" =>
      println("Done")
      context.stop(self)
      context.system.terminate()
  }
} 
开发者ID:pbernet,项目名称:akka_streams_tutorial,代码行数:33,代码来源:PrintMoreNumbers.scala


示例2: PerpetualStreamTest

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils

import akka.Done
import akka.actor.{ActorSystem, Props}
import akka.stream.scaladsl.{Concat, Keep, Sink, Source}
import akka.stream.{KillSwitches, ThrottleMode}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._

class PerpetualStreamTest extends TestKit(ActorSystem()) with FlatSpecLike with Matchers with Eventually with BeforeAndAfterAll {
  implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(100, Millis))

  "PerpetualStream" should "work" in {
    val list = ListBuffer.empty[Int]
    val stream = system.actorOf(Props(new NumberGeneratingPerpetualStream(list)))
    val probe = new TestProbe(system)
    probe.watch(stream)

    eventually(require(list.length >= 20, "Must have emitted at least 20 numbers"))
    list.take(20) should be((-10 until 10).toList)

    GracefulShutdownExtension(system).triggerGracefulShutdown()
    probe.expectTerminated(stream)
  }

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  class NumberGeneratingPerpetualStream(list: ListBuffer[Int]) extends PerpetualStream {
    var count = 0

    override def stream = {
      val currentCount = count
      count = count + 1

      val source = currentCount match {
        case 0 => Source.combine(Source(-10 until 0), Source.failed[Int](new RuntimeException("ERROR")))(Concat(_))
        case 1 => Source(0 until 10)
        case _ => Source(10 to 100000).throttle(1000, 100.millis, 100, ThrottleMode.shaping)
      }

      source
        .via(watcher)
        .viaMat(KillSwitches.single)(Keep.right)
        .map { i => list.append(i); i }
        .to(Sink.ignore)
        .mapMaterializedValue(ks => () => {
          ks.shutdown()
          Future.successful(Done)
        })
    }
  }
} 
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:59,代码来源:PerpetualStreamTest.scala


示例3: Room

//设置package包名称以及导入依赖的类
package chat

import java.util.concurrent.atomic.AtomicReference
import javax.inject.{ Inject, Singleton }

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ BroadcastHub, Flow, Keep, MergeHub, Sink }
import akka.stream.{ KillSwitches, Materializer, UniqueKillSwitch }
import akka.stream.scaladsl.{ Sink, Source }
import akka.NotUsed

import scala.collection.mutable.{ Map => MutableMap }
import scala.concurrent.duration._

case class Room(roomId: String, bus: Flow[Message, Message, UniqueKillSwitch])

@Singleton
class RoomClient @Inject()(implicit val materializer: Materializer, implicit val system: ActorSystem) {

  def chatRoom(roomId: String): Room = synchronized {
    RoomClient.roomPool.get.get(roomId) match {
      case Some(room) =>
        room
      case None =>
        val room = create(roomId)
        RoomClient.roomPool.get() += (roomId -> room)
        room
    }
  }

  private def create(roomId: String): Room = {

    val (sink, source) =
      MergeHub.source[Message](perProducerBufferSize = 16)
          .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
          .run()
          
    source.runWith(Sink.ignore)

    val bus = Flow.fromSinkAndSource(sink, source)
        .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
        .backpressureTimeout(3.seconds)

    Room(roomId, bus)
  }
}

object RoomClient {
  
  val roomPool = new AtomicReference[MutableMap[String, Room]](MutableMap[String, Room]())

} 
开发者ID:hys-rabbit,项目名称:PlayChat,代码行数:53,代码来源:RoomClient.scala


示例4: AkkaStreamProcess

//设置package包名称以及导入依赖的类
package aecor.distributedprocessing

import aecor.distributedprocessing.DistributedProcessing._
import aecor.effect.{ Capture, CaptureFuture }
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.stream.{ KillSwitches, Materializer }

object AkkaStreamProcess {
  final class Mk[F[_]] {
    def apply[A](
      source: Source[A, NotUsed],
      flow: Flow[A, Unit, NotUsed]
    )(implicit mat: Materializer, F0: CaptureFuture[F], F1: Capture[F]): Process[F] =
      Process(run = Capture[F].capture {
        val (killSwitch, terminated) = source
          .viaMat(KillSwitches.single)(Keep.right)
          .via(flow)
          .toMat(Sink.ignore)(Keep.both)
          .run()
        RunningProcess(
          CaptureFuture[F].captureFuture(terminated.map(_ => ())(mat.executionContext)),
          () => killSwitch.shutdown()
        )
      })
  }
  def apply[F[_]]: Mk[F] = new Mk[F]
} 
开发者ID:notxcain,项目名称:aecor,代码行数:29,代码来源:AkkaStreamProcess.scala


示例5: HbaseTest

//设置package包名称以及导入依赖的类
package teleporter.stream.integration.template

import akka.Done
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches}
import org.apache.hadoop.hbase.client.Put
import teleporter.integration.component.hbase.{Hbase, HbaseAction}
import teleporter.integration.core.Streams._
import teleporter.integration.core.{Message, TeleporterCenter}
import teleporter.integration.utils.Bytes._

import scala.concurrent.Future


object HbaseTest extends StreamLogic {
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    Source.single(1)
      .map { o ?
        val put = new Put("1")
        put.addColumn("f", "q", "v")
        Message.apply(HbaseAction("teleporter", put))
      }
      .via(Hbase.flow("/sink/test/hbase_test/hbase_test"))
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
} 
开发者ID:huanwuji,项目名称:teleporter,代码行数:29,代码来源:HbaseTest.scala


示例6: Hadoop2Elasticsearch

//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction

import akka.Done
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{KillSwitch, KillSwitches}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.index.VersionType
import teleporter.integration.component.elasticsearch.ElasticSearch2
import teleporter.integration.component.hdfs.Hdfs
import teleporter.integration.core.Streams._
import teleporter.integration.core.{SourceAck, StreamContext, TeleporterCenter}
import teleporter.integration.utils.Converters._

import scala.concurrent.Future


object Hadoop2Elasticsearch extends StreamLogic {
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    val context = center.context.getContext[StreamContext](key)
    val arguments = context.config.arguments
    val fields = arguments[String]("fields").split(",")
    val index = arguments[String]("index")
    val indexType = arguments[String]("type")
    Hdfs.sourceAck("/source/test/hadoop_es_test/hadoop2es/hdfs_source")
      .map { m ?
        m.map { bs ?
          val data = fields.zip(bs.utf8String.split(",")).toMap
          new IndexRequest()
            .index(index)
            .`type`(indexType)
            .id(data("id"))
            .source(data)
            .versionType(VersionType.EXTERNAL)
            .version(System.currentTimeMillis())
        }
      }
      .via(ElasticSearch2.flow("/sink/test/hadoop_es_test/hadoop2es/item_es"))
      .via(SourceAck.confirmFlow())
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
} 
开发者ID:huanwuji,项目名称:teleporter,代码行数:44,代码来源:Hadoop2Elasticsearch.scala


示例7: StreamExample

//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction

import akka.Done
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.{KillSwitch, KillSwitches}
import teleporter.integration.component.jdbc.SqlSupport._
import teleporter.integration.component.jdbc.{Jdbc, Upsert}
import teleporter.integration.core.Streams._
import teleporter.integration.core.{SourceAck, TeleporterCenter}

import scala.concurrent.Future


object StreamExample extends StreamLogic {
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    import center.{materializer, self}
    Jdbc.sourceAck("/source/ns/task/stream/source")
      .map(_.map(data ? Seq(Upsert(
        updateSql("table_name", "id", data),
        insertIgnoreSql("table_name", data)
      ))).toMessage)
      .via(Jdbc.flow("/source/ns/task/stream/sink"))
      .via(SourceAck.confirmFlow())
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
} 
开发者ID:huanwuji,项目名称:teleporter,代码行数:28,代码来源:StreamExample.scala


示例8: SendKafka

//设置package包名称以及导入依赖的类
//package teleporter.stream.integration.transaction

import akka.Done
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitch, KillSwitches}
import org.apache.commons.lang3.RandomStringUtils
import org.apache.kafka.clients.producer.ProducerRecord
import teleporter.integration.component.Kafka
import teleporter.integration.core.Streams.StreamLogic
import teleporter.integration.core.{Message, TeleporterCenter}
import teleporter.integration.utils.Bytes._

import scala.concurrent.Future
import scala.concurrent.duration._


object SendKafka extends StreamLogic {
  override def apply(key: String, center: TeleporterCenter): (KillSwitch, Future[Done]) = {
    println("--------------SendKafka---------------------")
    import center.{materializer, self}
    Source.tick(1.second, 1.second, "test")
      .map { _ ?
        val s = RandomStringUtils.randomAlphabetic(10)
        println(s)
        Message(data = new ProducerRecord[Array[Byte], Array[Byte]]("ppp", "1", s))
      }
      .via(Kafka.flow("/sink/test/test_task/kafka_send/send_sink"))
      .viaMat(KillSwitches.single)(Keep.right).watchTermination()(Keep.both)
      .to(Sink.ignore).run()
  }
} 
开发者ID:huanwuji,项目名称:teleporter,代码行数:32,代码来源:SendKafka.scala



注:本文中的akka.stream.KillSwitches类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Message类代码示例发布时间:2022-05-23
下一篇:
Scala Credentials类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap