本文整理汇总了Scala中akka.stream.ActorAttributes类的典型用法代码示例。如果您正苦于以下问题:Scala ActorAttributes类的具体用法?Scala ActorAttributes怎么用?Scala ActorAttributes使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorAttributes类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: JmsSinkStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.jms
import javax.jms.{MessageProducer, TextMessage}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}
final class JmsSinkStage(settings: JmsSinkSettings) extends GraphStage[SinkShape[JmsTextMessage]] {
private val in = Inlet[JmsTextMessage]("JmsSink.in")
override def shape: SinkShape[JmsTextMessage] = SinkShape.of(in)
override protected def initialAttributes: Attributes =
ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with JmsConnector {
private var jmsProducer: MessageProducer = _
override private[jms] def jmsSettings = settings
override def preStart(): Unit = {
jmsSession = openSession()
jmsProducer = jmsSession.session.createProducer(jmsSession.destination)
if (settings.timeToLive.nonEmpty) {
jmsProducer.setTimeToLive(settings.timeToLive.get.toMillis)
}
pull(in)
}
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem: JmsTextMessage = grab(in)
val textMessage: TextMessage = jmsSession.session.createTextMessage(elem.body)
elem.properties.foreach {
case (key, v) =>
v match {
case v: String => textMessage.setStringProperty(key, v)
case v: Int => textMessage.setIntProperty(key, v)
case v: Boolean => textMessage.setBooleanProperty(key, v)
case v: Byte => textMessage.setByteProperty(key, v)
case v: Short => textMessage.setShortProperty(key, v)
case v: Long => textMessage.setLongProperty(key, v)
case v: Double => textMessage.setDoubleProperty(key, v)
}
}
jmsProducer.send(textMessage)
pull(in)
}
}
)
override def postStop(): Unit =
Option(jmsSession).foreach(_.closeSession())
}
}
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:JmsSinkStage.scala
示例2: GeodeContinuousSourceStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage
import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache
import scala.concurrent.{Future, Promise}
class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
override protected def initialAttributes: Attributes =
Attributes
.name("GeodeContinuousSource")
.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
val out = Outlet[V](s"geode.continuousSource")
override def shape: SourceShape[V] = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val subPromise = Promise[Done]
(new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {
override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
subPromise.success(Done)
}
val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
if (isAvailable(out)) {
pushElement(out, element)
} else
enqueue(element)
handleTerminaison()
}
//
// This handler, will first forward initial (old) result, then new ones (continuous).
//
setHandler(
out,
new OutHandler {
override def onPull() = {
if (initialResultsIterator.hasNext)
push(out, initialResultsIterator.next())
else
dequeue() foreach { e =>
pushElement(out, e)
}
handleTerminaison()
}
}
)
}, subPromise.future)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:60,代码来源:GeodeContinuousSourceStage.scala
示例3: GeodeFiniteSourceStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage
import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache
import scala.concurrent.{Future, Promise}
class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {
override protected def initialAttributes: Attributes =
Attributes.name("GeodeFiniteSource").and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
val out = Outlet[V]("geode.finiteSource")
override def shape: SourceShape[V] = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val subPromise = Promise[Done]
(new GeodeQueryGraphLogic[V](shape, cache, sql) {
override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { v =>
subPromise.success(Done)
}
setHandler(
out,
new OutHandler {
override def onPull() =
if (initialResultsIterator.hasNext)
push(out, initialResultsIterator.next())
else
completeStage()
}
)
}, subPromise.future)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:43,代码来源:GeodeFiniteSourceStage.scala
示例4: StreamImporter
//设置package包名称以及导入依赖的类
package controllers.admin.gazetteers
import akka.stream.{ ActorAttributes, ClosedShape, Materializer, Supervision }
import akka.stream.scaladsl._
import akka.util.ByteString
import java.io.InputStream
import models.place.{ GazetteerRecord, PlaceService }
import play.api.Logger
import play.api.libs.json.Json
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._
class StreamImporter(implicit materializer: Materializer) {
private val BATCH_SIZE = 100
private val decider: Supervision.Decider = {
case t: Throwable =>
t.printStackTrace()
Supervision.Stop
}
def importPlaces(is: InputStream, crosswalk: String => Option[GazetteerRecord])(implicit places: PlaceService, ctx: ExecutionContext) = {
val source = StreamConverters.fromInputStream(() => is, 1024)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = Int.MaxValue, allowTruncation = false))
.map(_.utf8String)
val parser = Flow.fromFunction[String, Option[GazetteerRecord]](crosswalk)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.grouped(BATCH_SIZE)
val importer = Sink.foreach[Seq[Option[GazetteerRecord]]] { records =>
val toImport = records.flatten
Await.result(places.importRecords(toImport), 60.minutes)
}
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
source ~> parser ~> importer
ClosedShape
}).withAttributes(ActorAttributes.supervisionStrategy(decider))
graph.run()
}
}
开发者ID:pelagios,项目名称:recogito2,代码行数:51,代码来源:StreamImporter.scala
示例5: decider
//设置package包名称以及导入依赖的类
package org.wex.cmsfs.common.core
import akka.stream.{ActorAttributes, Supervision}
import org.slf4j.Logger
trait CmsfsAkkaStream {
val logger: Logger
private def decider(f: (String) => String): Supervision.Decider = {
case ex: Exception =>
logger.error(f(ex.getMessage))
Supervision.Resume
}
def supervisionStrategy(f: (String) => String) = {
ActorAttributes.supervisionStrategy(decider(f))
}
def loggerFlow[T](elem: T, mess: String): T = {
logger.info(mess)
elem
}
}
开发者ID:shinhwagk,项目名称:cmsfs,代码行数:25,代码来源:CmsfsAkkaStream.scala
示例6: nonBlockingFlow
//设置package包名称以及导入依赖的类
package services
import akka.stream.{ ActorAttributes, Attributes }
import akka.stream.scaladsl.Flow
trait ServiceBase {
protected def nonBlockingFlow[A, B, C](flow: Flow[A, B, C], signature: String): Flow[A, B, C] = {
val tagName = s"${getClass.getSimpleName}#$signature"
flow.withAttributes(Attributes.name(tagName)).log(tagName).async
}
protected def blockingFlow[A, B, C](flow: Flow[A, B, C], signature: String): Flow[A, B, C] = {
val tagName = s"${getClass.getSimpleName}#$signature"
val attribute = ActorAttributes.dispatcher("application.blocking-io-dispatcher")
flow.withAttributes(Attributes.name(tagName) and attribute).log(tagName).async
}
}
开发者ID:hayasshi,项目名称:web-app-skeleton,代码行数:20,代码来源:ServiceBase.scala
示例7: BlockingGraphStage
//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream
package impl
import akka.stream.{ActorAttributes, Attributes, Shape}
abstract class BlockingGraphStage[S <: Shape](name: String)(implicit ctrl: Control)
extends StageImpl[S](name) {
override def initialAttributes: Attributes =
if (ctrl.config.useAsync) {
Attributes.name(toString) and
ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
} else {
Attributes.name(toString)
}
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:19,代码来源:BlockingGraphStage.scala
示例8: HandlingErrorsApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl._
object HandlingErrorsApplication extends App {
implicit val actorSystem = ActorSystem("HandlingErrors")
val streamDecider: Supervision.Decider = {
case e: IndexOutOfBoundsException =>
println("Dropping element because of IndexOufOfBoundException. Resuming.")
Supervision.Resume
case _ => Supervision.Stop
}
val flowDecider: Supervision.Decider = {
case e: IllegalArgumentException =>
println("Dropping element because of IllegalArgumentException. Restarting.")
Supervision.Restart
case _ => Supervision.Stop
}
val actorMaterializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(streamDecider)
implicit val actorMaterializer = ActorMaterializer(actorMaterializerSettings)
val words = List("Handling", "Errors", "In", "Akka", "Streams", "")
val flow = Flow[String].map(word => {
if(word.length == 0) throw new IllegalArgumentException("Empty words are not allowed")
word
}).withAttributes(ActorAttributes.supervisionStrategy(flowDecider))
Source(words).via(flow).map(array => array(2)).to(Sink.foreach(println)).run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:37,代码来源:HandlingErrorsApplication.scala
示例9: System
//设置package包名称以及导入依赖的类
package io.allquantor.system
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision}
import scala.concurrent.ExecutionContextExecutor
object System {
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
trait Executor {
protected implicit val executor: ExecutionContextExecutor = actorSystem.dispatcher
}
trait ErrorHandling {
val errorHandling: Attributes = ActorAttributes.supervisionStrategy {
case ex: java.net.ConnectException =>
// Damn side effects!!!
println(s"Error: $ex" )
Supervision.Restart
case _ =>
println("Error!")
Supervision.Restart
}
}
}
开发者ID:allquantor,项目名称:socketpublisher,代码行数:31,代码来源:System.scala
注:本文中的akka.stream.ActorAttributes类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论