本文整理汇总了Scala中akka.stream.Supervision类的典型用法代码示例。如果您正苦于以下问题：Scala Supervision类的具体用法？Scala Supervision怎么用？Scala Supervision使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Supervision类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Server
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp
import akka.actor.{ ActorSystem, Props }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Supervision }
import com.typesafe.config.ConfigFactory
import net.ruippeixotog.scalafbp.http._
import net.ruippeixotog.scalafbp.protocol.MainProtocolActor
import net.ruippeixotog.scalafbp.runtime.{ ComponentLoader, ComponentRegistry, GraphStore }
/**
 * Entry point of the runtime server.
 *
 * Creates the actor system and a materializer whose supervision strategy logs and stops
 * failing streams, reads the `scalafbp` configuration section, starts the component
 * registry, graph store and protocol actors, and binds the combined HTTP routes.
 */
object Server extends App with WsRuntimeHttpService with RegisterHttpService with RegistryHttpService
    with UiHttpService {

  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher

  // Stream-wide supervision: report the failure and stop the failing stream.
  val decider: Supervision.Decider = { e =>
    log.error("Unhandled exception in stream", e)
    Supervision.Stop
  }

  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system).withSupervisionStrategy(decider))

  val config = ConfigFactory.load.getConfig("scalafbp")
  val registryConfig = config.getConfig("registry")
  val runtimeConfig = config.getConfig("runtime")

  val runtimeId = config.getString("runtime-id")
  val secret = config.getString("secret")

  val host = config.getString("host")
  val port = config.getInt("port")
  val disableUi = config.getBoolean("disable-ui")

  // the registry of components that will be made available to clients
  val compRegistry = system.actorOf(ComponentRegistry.props(ComponentLoader.allInClasspath))

  // an object responsible for storing and managing the graph definitions currently in the runtime
  val graphStore = system.actorOf(Props(new GraphStore))

  // actor that receives incoming messages (as `Message` objects) and translates them into actions using the above
  // constructs
  val protocolActor = system.actorOf(Props(
    new MainProtocolActor(runtimeId, secret, compRegistry, graphStore, runtimeConfig)))

  // all the routes offered by this server
  val routes = registrationRoutes ~ registryRoutes ~ wsRuntimeRoutes ~ uiRoutes

  // Handle both outcomes of the bind: previously only success was observed (`foreach`), so a
  // failed bind was dropped silently and the process kept running without serving anything.
  Http().bindAndHandle(routes, host, port).onComplete {
    case scala.util.Success(binding) =>
      log.info(s"Bound to ${binding.localAddress}")
      onBind(binding)

    case scala.util.Failure(e) =>
      log.error(s"Failed to bind to $host:$port", e)
      // Without a bound port the server is useless; release the actor system so the JVM can exit.
      system.terminate()
  }
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:58,代码来源:Server.scala
示例2: decider
//设置package包名称以及导入依赖的类
package org.wex.cmsfs.common.core
import akka.stream.{ActorAttributes, Supervision}
import org.slf4j.Logger
/**
 * Mix-in offering a logging supervision strategy and a logging pass-through helper for
 * Akka Streams stages. Implementors supply the slf4j `logger`.
 */
trait CmsfsAkkaStream {

  /** Logger used for all messages emitted by the helpers below. */
  val logger: Logger

  /**
   * Builds a decider that logs each failure through `f` and decides how the stage continues.
   *
   * `Exception`s are logged and the stage resumes, as before. Any other throwable is logged
   * and the stage stops; previously such throwables fell through the partial-function
   * literal and surfaced as a `scala.MatchError` raised from inside the decider.
   *
   * @param f formats the failure description into the message that is logged
   */
  private def decider(f: (String) => String): Supervision.Decider = {
    case ex: Exception =>
      logger.error(f(describe(ex)))
      Supervision.Resume

    case other =>
      logger.error(f(describe(other)), other)
      Supervision.Stop
  }

  // `getMessage` may be null; fall back to the throwable's own description so `f` never receives null.
  private def describe(failure: Throwable): String =
    Option(failure.getMessage).getOrElse(failure.toString)

  /**
   * Stream attributes installing the logging decider.
   *
   * @param f formats the failure description into the message that is logged
   */
  def supervisionStrategy(f: (String) => String): akka.stream.Attributes =
    ActorAttributes.supervisionStrategy(decider(f))

  /**
   * Logs `mess` at info level and returns `elem` unchanged.
   *
   * @param elem element passed through untouched
   * @param mess message to log
   */
  def loggerFlow[T](elem: T, mess: String): T = {
    logger.info(mess)
    elem
  }
}
开发者ID:shinhwagk,项目名称:cmsfs,代码行数:25,代码来源:CmsfsAkkaStream.scala
示例3: defaultPatience
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.testkit.{ImplicitSender, TestKit, TestKitBase}
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.taxis99.amazon.sqs.SqsClientFactory
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time._
import scala.concurrent.{ExecutionContext, Future}
/** Shared base traits for the unit-test suites. */
package object test {

  /** Base for synchronous specs: flat-spec style with a 3 second / 5 millisecond patience. */
  trait BaseSpec extends FlatSpec with Matchers with OptionValues with PatienceConfiguration with RecoverMethods {

    implicit val defaultPatience =
      PatienceConfig(Span(3, Seconds), Span(5, Millis))
  }

  /**
   * Base for asynchronous stream specs. Provides an actor system, a materializer that stops
   * streams on any failure, and a loaned in-memory queue fixture.
   */
  trait StreamSpec extends AsyncFlatSpec with Matchers with OptionValues with PatienceConfiguration
      with TestKitBase with ImplicitSender with BeforeAndAfterAll {

    // The literal below is kept flush-left so its content stays exactly as in the original.
    implicit lazy val system: ActorSystem = ActorSystem("test", ConfigFactory.parseString("""
akka.actor.deployment.default.dispatcher = "akka.test.calling-thread-dispatcher"
"""))

    override implicit def executionContext: ExecutionContext = system.dispatcher

    override implicit def patienceConfig =
      PatienceConfig(Span(1, Minute), Span(5, Millis))

    // Every failure stops the stream.
    val decider: Supervision.Decider = _ => Supervision.Stop

    val settings = ActorMaterializerSettings(system).withSupervisionStrategy(decider)

    implicit lazy val materializer = ActorMaterializer(settings)

    /**
     * Runs `testCode` against a freshly created in-memory queue client and stops the backing
     * server once the returned future completes, whatever its outcome.
     */
    def withInMemoryQueue(testCode: (AmazonSQSAsync) => Future[Assertion]): Future[Assertion] = {
      val (sqsServer, sqsClient) = SqsClientFactory.inMemory(Some(system))
      // "loan" the fixture to the test
      val outcome = testCode(sqsClient)
      outcome.andThen { case _ => sqsServer.stopAndWait() }
    }

    override def afterAll(): Unit =
      TestKit.shutdownActorSystem(system)
  }
}
开发者ID:99Taxis,项目名称:common-sqs,代码行数:50,代码来源:package.scala
示例4: executionContext
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.testkit.{TestKit, TestKitBase}
import com.taxis99.amazon.sns.SnsClientFactory
import com.taxis99.amazon.sqs.SqsClientFactory
import com.typesafe.config.ConfigFactory
import org.scalatest._
import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.{Millis, Minute, Span}
import scala.concurrent.ExecutionContext
/** Shared base trait for the integration-test suites. */
package object it {

  /**
   * Base for asynchronous integration specs. Provides an actor system, queue and notification
   * clients pointing at localhost, and a materializer that stops streams on any failure.
   */
  trait IntegrationSpec extends AsyncFlatSpec with Matchers with OptionValues with PatienceConfiguration
      with TestKitBase with BeforeAndAfterAll {

    // The literal below is kept flush-left so its content stays exactly as in the original.
    implicit lazy val system: ActorSystem = ActorSystem("test", ConfigFactory.parseString("""
akka.actor.deployment.default.dispatcher = "akka.test.calling-thread-dispatcher"
"""))

    override implicit def executionContext: ExecutionContext = system.dispatcher

    override implicit def patienceConfig =
      PatienceConfig(Span(1, Minute), Span(5, Millis))

    // Clients are created lazily, on first use by a test.
    implicit lazy val amazonSqsConn = SqsClientFactory.atLocalhost(9324)
    implicit lazy val amazonSnsConn = SnsClientFactory.atLocalhost(9292)

    // Every failure stops the stream.
    val decider: Supervision.Decider = _ => Supervision.Stop

    val settings = ActorMaterializerSettings(system).withSupervisionStrategy(decider)

    implicit lazy val materializer = ActorMaterializer(settings)

    override def afterAll(): Unit =
      TestKit.shutdownActorSystem(system)
  }
}
开发者ID:99Taxis,项目名称:common-sqs,代码行数:41,代码来源:package.scala
示例5: LongActorRefPublisherSpec
//设置package包名称以及导入依赖的类
package com.example
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.testkit.TestKit
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
/**
 * Spec skeleton running inside a `TestKit` actor system named "test-system", with a
 * materializer that prints and stops on any stream failure.
 */
class LongActorRefPublisherSpec extends TestKit(ActorSystem("test-system")) with FlatSpecLike with Matchers with BeforeAndAfterAll {

  override def afterAll(): Unit = {
    super.afterAll()
    TestKit.shutdownActorSystem(system)
  }

  // Print the failure message, then stop the stream.
  val decider: Supervision.Decider = { failure =>
    println(s"Stopping Stream.. ${failure.getMessage}")
    Supervision.Stop
  }

  implicit val materializer = {
    val materializerSettings = ActorMaterializerSettings.create(system)
      .withDebugLogging(true)
      .withSupervisionStrategy(decider)
      .withAutoFusing(true)

    ActorMaterializer.create(materializerSettings, system)
  }

  // Placeholder test with an empty body.
  "Advert ID Actor" should "work" in {
  }
}
开发者ID:tonymurphy,项目名称:actor-publisher,代码行数:33,代码来源:LongActorRefPublisherSpec.scala
示例6: HandlingErrorsApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl._
/**
 * Demonstrates two levels of stream supervision: a materializer-wide decider and a decider
 * attached to a single flow through attributes.
 */
object HandlingErrorsApplication extends App {

  implicit val actorSystem = ActorSystem("HandlingErrors")

  // Materializer-wide decider: resume on index errors, stop on anything else.
  val streamDecider: Supervision.Decider = {
    case _: IndexOutOfBoundsException =>
      println("Dropping element because of IndexOufOfBoundException. Resuming.")
      Supervision.Resume

    case _ =>
      Supervision.Stop
  }

  // Flow-level decider: restart on illegal arguments, stop on anything else.
  val flowDecider: Supervision.Decider = {
    case _: IllegalArgumentException =>
      println("Dropping element because of IllegalArgumentException. Restarting.")
      Supervision.Restart

    case _ =>
      Supervision.Stop
  }

  val actorMaterializerSettings =
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(streamDecider)

  implicit val actorMaterializer = ActorMaterializer(actorMaterializerSettings)

  val words = List("Handling", "Errors", "In", "Akka", "Streams", "")

  // Rejects empty words; failures in this flow are handled by `flowDecider`.
  val flow = Flow[String]
    .map { word =>
      if (word.isEmpty) throw new IllegalArgumentException("Empty words are not allowed")
      word
    }
    .withAttributes(ActorAttributes.supervisionStrategy(flowDecider))

  // Taking the third character fails for words shorter than three characters; that failure
  // occurs outside `flow`, so it is handled by `streamDecider`.
  Source(words)
    .via(flow)
    .map(word => word(2))
    .to(Sink.foreach(println))
    .run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:37,代码来源:HandlingErrorsApplication.scala
示例7: StreamTest
//设置package包名称以及导入依赖的类
package stream
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Supervision.Decider
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.typesafe.config.ConfigFactory
import org.scalatest.{AsyncFunSuite, BeforeAndAfterAll, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
 * Async tests covering the basic ways of wiring a source, a flow and a sink together.
 *
 * An async test is judged only by the `Future[Assertion]` it returns. Where a test checks two
 * stream runs, both results are combined into the returned future; previously the first
 * future was discarded, so its assertion was never evaluated.
 */
class StreamTest extends AsyncFunSuite with BeforeAndAfterAll with Matchers {
  implicit val system = ActorSystem.create("stream", ConfigFactory.load("test.conf"))
  implicit val dispatcher = system.dispatcher
  val decider: Decider = Supervision.restartingDecider
  val settings = ActorMaterializerSettings(system).withSupervisionStrategy(decider)
  implicit val materializer = ActorMaterializer(settings)

  // 1..10 sums to 55; the flow keeps the even numbers and doubles them, which sums to 60.
  val source: Source[Int, NotUsed] = Source(1 to 10)
  val flow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(_ * 2)
  val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  override protected def afterAll(): Unit = {
    // Dot notation instead of the postfix operator `1 second`.
    Await.result(system.terminate(), 1.second)
  }

  test("source") {
    for {
      folded <- source.runFold(0)(_ + _)
      reduced <- source.runReduce(_ + _)
    } yield {
      folded shouldBe 55
      reduced shouldBe 55
    }
  }

  test("source ~ sink") {
    for {
      viaToMat <- source.toMat(sink)(Keep.right).run()
      viaRunWith <- source.runWith(sink)
    } yield {
      viaToMat shouldBe 55
      viaRunWith shouldBe 55
    }
  }

  test("source ~ flow ~ sink") {
    for {
      viaToMat <- source.via(flow).toMat(sink)(Keep.right).run()
      viaRunWith <- source.via(flow).runWith(sink)
    } yield {
      viaToMat shouldBe 60
      viaRunWith shouldBe 60
    }
  }

  test("flow ~ source ~ sink") {
    flow.runWith(source, sink)._2 map { _ shouldBe 60 }
  }
}
开发者ID:objektwerks,项目名称:akka.streams,代码行数:48,代码来源:StreamTest.scala
示例8: Config
//设置package包名称以及导入依赖的类
package com.github.kliewkliew.cornucopia.kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Producer, Consumer => ConsumerDSL}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
/** Configuration-backed settings and stream endpoints. */
object Config {

  /** Values read from the `cornucopia` configuration section. */
  object Cornucopia {
    private val section = ConfigFactory.load().getConfig("cornucopia")

    val minReshardWait: FiniteDuration = section.getInt("reshard.interval").seconds
    // The next two values are the configured integers multiplied by 1000.
    val gracePeriod: Int = section.getInt("grace.period") * 1000
    val refreshTimeout: Int = section.getInt("refresh.timeout") * 1000
    val batchPeriod: FiniteDuration = section.getInt("batch.period").seconds
  }

  /** Actor system, materializer, source and sink built from the `kafka` configuration section. */
  object Consumer {
    private val kafkaSection = ConfigFactory.load().getConfig("kafka")
    private val bootstrapServers = kafkaSection.getString("bootstrap.servers")
    private val consumerSection = kafkaSection.getConfig("consumer")
    private val topicName = consumerSection.getString("topic")
    private val consumerGroup = consumerSection.getString("group.id")

    implicit val actorSystem = ActorSystem()

    // Log failures and resume processing
    private val decider: Supervision.Decider = { failure =>
      val log = LoggerFactory.getLogger(getClass)
      log.error("Failed to process event", failure)
      Supervision.Resume
    }

    private val materializerSettings =
      ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)

    private val sourceSettings =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(consumerGroup)

    private val subscription = Subscriptions.topics(topicName)

    private val sinkSettings =
      ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

    val cornucopiaSource = ConsumerDSL.plainSource(sourceSettings, subscription)
    val cornucopiaSink = Producer.plainSink(sinkSettings)
  }
}
开发者ID:kliewkliew,项目名称:cornucopia,代码行数:51,代码来源:Config.scala
示例9: System
//设置package包名称以及导入依赖的类
package io.allquantor.system
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision}
import scala.concurrent.ExecutionContextExecutor
/** Holds the shared actor system and materializer, plus mix-ins built on top of them. */
object System {

  implicit val actorSystem = ActorSystem()
  implicit val mat = ActorMaterializer()

  /** Exposes the actor system's dispatcher as an implicit execution context. */
  trait Executor {
    protected implicit val executor: ExecutionContextExecutor = actorSystem.dispatcher
  }

  /** Provides stream attributes that print the failure and restart the stage. */
  trait ErrorHandling {

    val errorHandling: Attributes = {
      // Both branches restart; they differ only in what is printed.
      val restartAlways: Supervision.Decider = {
        case ex: java.net.ConnectException =>
          // Damn side effects!!!
          println(s"Error: $ex" )
          Supervision.Restart

        case _ =>
          println("Error!")
          Supervision.Restart
      }

      ActorAttributes.supervisionStrategy(restartAlways)
    }
  }
}
开发者ID:allquantor,项目名称:socketpublisher,代码行数:31,代码来源:System.scala
示例10: alwaysResume
//设置package包名称以及导入依赖的类
package com.pragmasoft.eventaggregator.streams
import akka.stream.Supervision
import akka.stream.scaladsl.Flow
import com.typesafe.scalalogging.LazyLogging
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/** Stream helpers for components that already mix in `LazyLogging`. */
trait AkkaStreamFlowOperations {
  self: LazyLogging =>

  /**
   * Decider that logs every non-fatal failure under `errorMessage` and resumes the stage.
   *
   * @param errorMessage message logged together with the failure
   */
  protected def alwaysResume(errorMessage: String): Supervision.Decider = {
    case NonFatal(cause) =>
      logger.error(errorMessage, cause)
      Supervision.Resume
  }

  /**
   * Flow that logs each `Failure` at warn level under `errorMessage`, drops it, and emits the
   * value of each `Success`.
   *
   * @param errorMessage message logged together with each failure
   */
  protected def filterAndLogFailures[T](errorMessage: String) =
    Flow[Try[T]]
      .map { attempt =>
        attempt match {
          case Failure(ex) => logger.warn(errorMessage, ex)
          case _ => ()
        }
        // Pass the element on unchanged; filtering happens in the next step.
        attempt
      }
      .collect { case Success(successfulValue) => successfulValue }
      .named("FilterAndLogFailures")
}
开发者ID:galarragas,项目名称:event-aggregator,代码行数:30,代码来源:AkkaStreamFlowOperations.scala
示例11: RemoraApp
//设置package包名称以及导入依赖的类
import java.io.IOException
import java.net.ConnectException
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.codahale.metrics.jvm.{ThreadStatesGaugeSet, MemoryUsageGaugeSet, GarbageCollectorMetricSet}
import scala.util.control.NonFatal
/**
 * Application entry point: registers JVM metric sets, creates the actor system and a
 * supervised materializer, then starts the client actor and the API.
 */
object RemoraApp extends App with nl.grons.metrics.scala.DefaultInstrumented {

  private val actorSystemName: String = "remora"

  implicit val actorSystem = ActorSystem(actorSystemName)

  // JVM-level metric sets: garbage collection, memory usage and thread states.
  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(new ThreadStatesGaugeSet)

  // I/O, connection and timeout failures restart the stage; any other non-fatal failure is
  // logged and stops the stream.
  lazy val decider: Supervision.Decider = {
    case _: IOException | _: ConnectException | _: TimeoutException =>
      Supervision.Restart

    case NonFatal(err) =>
      actorSystem.log.error(err, "Unhandled Exception in Stream: {}", err.getMessage)
      Supervision.Stop
  }

  implicit val materializer = {
    val supervisedSettings =
      ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)

    ActorMaterializer(supervisedSettings)(actorSystem)
  }

  implicit val executionContext = actorSystem.dispatchers.lookup("kafka-consumer-dispatcher")

  val kafkaSettings = KafkaSettings(actorSystem.settings.config)
  val consumer = new RemoraKafkaConsumerGroupService(kafkaSettings)

  val kafkaClientActor =
    actorSystem.actorOf(KafkaClientActor.props(consumer), name = "kafka-client-actor")

  Api(kafkaClientActor).start()
}
开发者ID:zalando-incubator,项目名称:remora,代码行数:38,代码来源:RemoraApp.scala
示例12: SourceAck
//设置package包名称以及导入依赖的类
package teleporter.integration.core
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Source}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import org.scalatest.FunSuite
import teleporter.integration.utils.MapBean
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Throughput check for the `SourceAck` stage.
 *
 * The function and tuple arrows in this snippet had been mis-encoded as `?`, which made it
 * uncompilable; they are restored here as `=>` and `->`.
 */
class SourceAck$Test extends FunSuite {

  // Print any stream failure and keep going.
  val decider: Supervision.Decider = {
    case e => println(e); Supervision.Resume
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system).withSupervisionStrategy(decider))

  /**
   * Wraps a `SourceAck` stage as a flow; its commit and finish callbacks only print the
   * coordinate they receive.
   *
   * @param id     identifier passed to the stage
   * @param config raw settings converted with `SourceAckConfig`
   */
  def ackFlow[T](id: Long, config: MapBean): Flow[SourceMessage[MapBean, T], AckMessage[MapBean, T], NotUsed] = {
    Flow.fromGraph(new SourceAck[MapBean, T](
      id = id,
      config = SourceAckConfig(config),
      commit = coordinate => println(s"commit $coordinate"),
      finish = coordinate => println(s"complete $coordinate")
    ))
  }

  test("throughput") {
    val start = System.currentTimeMillis()
    val result = Source(1 to 1000000)
      .map(data => Message.source(MapBean(Map("XY" -> data)), data))
      .via(ackFlow[Int](1L, MapBean(Map(
        "ack" -> Map(
          "channelSize" -> 1,
          "commitInterval" -> "1.minute",
          "cacheSize" -> 64,
          "maxAge" -> "1.minute")
      )))
      )
      .watchTermination()(Keep.right)
      .to(SourceAck.confirmSink()).run()
    Await.result(result, 1.minute)
    // Elapsed wall-clock time in milliseconds.
    println(System.currentTimeMillis() - start)
    materializer.shutdown()
    Await.result(system.terminate(), 1.minute)
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:52,代码来源:SourceAck$Test.scala
示例13: FileStreams
//设置package包名称以及导入依赖的类
package teleporter.integration.component.file
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
import java.util.concurrent.locks.LockSupport
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.util.ByteString
import com.google.common.io.Resources
import com.typesafe.config.ConfigFactory
import org.apache.logging.log4j.scala.Logging
import org.scalatest.FunSuite
import teleporter.integration.component.SinkRoller.SinkRollerSetting
import teleporter.integration.component.{Cron, SinkRoller}
import teleporter.integration.core.Message
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Manual tests for the file source, the file sink and the rolling sink.
 *
 * The function arrows in this snippet had been mis-encoded as `?`, which made it
 * uncompilable; they are restored here as `=>`.
 */
class FileStreams$Test extends FunSuite with Logging {
  implicit val system = ActorSystem("instance", ConfigFactory.load("instance.conf"))

  // Log the exception at warn level and stop the stream.
  val decider: Supervision.Decider = {
    case e: Exception => logger.warn(e.getMessage, e); Supervision.Stop
  }

  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))

  test("file reader") {
    val fu = Source.fromGraph(new FileSource(path = Paths.get(Resources.getResource("./broker.conf").toURI), offset = 100, bufferSize = 1024))
      .runForeach(bs => println(bs.utf8String))
    Await.result(fu, 100.seconds)
    println("end")
  }

  test("file write") {
    // Counter incremented once per tick; its value becomes the content of each written line.
    var inc = 0
    val fu = Source.tick(0.seconds, 1.seconds, "fjdkfj").map { i =>
      inc += 1
      Message(FileByteString(byteString = ByteString(inc + "\r\n"), path = Some(s"/tmp/file/test_{0,date,yyyy-MM-dd_HH_mm_ss}_{1}.txt")))
    }
      .via(SinkRoller.flow[Message[FileByteString], Message[FileByteString]](
        setting = SinkRollerSetting(cron = Some("* * * * *"), size = Some(100)),
        cronRef = Cron.cronRef("teleporter_crontab"),
        catchSize = _.data.byteString.length,
        catchField = _.data.path.get,
        rollKeep = (in, path) => in.map(data => data.copy(path = Some(path))),
        rollDo = path => Message(FileByteString(ByteString.empty, Some(path), total = 0))
      )).via(Flow.fromGraph(new FileSink(path = Some("/tmp/file/test.txt"), openOptions = Set(CREATE, WRITE, APPEND))))
      .toMat(Sink.ignore)(Keep.right).run()
    Await.result(fu, 10.minutes)
    println("end")
  }

  test("test") {
    Source.single("test").map(_ => throw new RuntimeException()).runForeach(println)
    // Parks the test thread so the asynchronous failure handling can be observed manually.
    LockSupport.park()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:55,代码来源:FileStreams$Test.scala
示例14: transform
//设置package包名称以及导入依赖的类
package io.surfkit.typebus.module
import java.util.UUID
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import io.surfkit.typebus.Mapper
import io.surfkit.typebus.event._
import org.apache.kafka.clients.producer.ProducerRecord
import org.joda.time.DateTime
import scala.concurrent.Future
import scala.reflect.ClassTag
/**
 * Module that consumes published events, runs them through the registered handlers and
 * publishes each handler result as a new event.
 */
trait Transformer extends Module{

  /** Registers `p` as a handler by delegating to `op`. */
  def transform[T <: m.Model : ClassTag](p: PartialFunction[T, Future[m.Model]]) = op(p)

  /**
   * Starts the consume, handle and publish stream.
   *
   * @param consumerSettings settings for the committable source
   * @param producerSettings settings for the committing sink
   * @param mapper           used to read incoming events and write outgoing ones
   * @param system           actor system hosting the stream
   */
  def startTransformer(consumerSettings: ConsumerSettings[Array[Byte], String], producerSettings: ProducerSettings[Array[Byte], String], mapper: Mapper)(implicit system: ActorSystem) = {
    import system.dispatcher

    // Local shorthands for the element types flowing between the two stream steps.
    type Handled = (ConsumerMessage.CommittableMessage[Array[Byte], String],PublishedEvent[_], m.Model)
    type Outgoing = ProducerMessage.Message[Array[Byte], String, ConsumerMessage.CommittableOffset]

    // Never give up !
    val decider: Supervision.Decider = _ => Supervision.Resume

    implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))

    // Defined for every input: wraps the handler result in a new event addressed to the topic
    // named after the result's class, carrying the consumed message's offset.
    val consumerToProducer = new PartialFunction[Handled, Outgoing]{
      def isDefinedAt(x: Handled) = true

      def apply(x: Handled) = {
        val consumed = x._1
        val trigger = x._2
        val result = x._3
        val resultTypeName = result.getClass.getCanonicalName

        val outgoingEvent = PublishedEvent(
          eventId = UUID.randomUUID.toString,
          eventType = resultTypeName,
          userIdentifier = trigger.userIdentifier,
          source = trigger.source,
          socketId = trigger.socketId,
          publishedAt = new DateTime(),
          occurredAt = new DateTime(),
          correlationId = trigger.correlationId,
          payload = result)

        ProducerMessage.Message(
          new ProducerRecord[Array[Byte], String](resultTypeName, mapper.writeValueAsString(outgoingEvent)),
          consumed.committableOffset)
      }
    }

    val handledEvents = Consumer.committableSource(consumerSettings, Subscriptions.topics(listOfTopics:_*))
      .mapAsyncUnordered(1) { msg =>
        val publish = mapper.readValue[PublishedEvent[m.Model]](msg.record.value())
        // FIXME: we have to write and read again .. grrr !!
        val payloadJson = mapper.writeValueAsString(publish.payload)
        val event = publish.copy(payload = mapper.readValue[m.Model](payloadJson))
        println(s"event: ${event}")
        handleEvent(event.payload).map( handled => (msg, event, handled) )
      }

    handledEvents
      .map(consumerToProducer)
      .runWith(Producer.commitableSink(producerSettings))
  }
}
开发者ID:coreyauger,项目名称:type-bus,代码行数:60,代码来源:Transformer.scala
注:本文中的akka.stream.Supervision类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论