• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ActorAttributes类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 akka.stream.ActorAttributes 的典型用法代码示例。如果您正苦于以下问题:Scala ActorAttributes 类的具体用法?Scala ActorAttributes 怎么用?Scala ActorAttributes 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ActorAttributes类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: JmsSinkStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.jms

import javax.jms.{MessageProducer, TextMessage}

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}

/**
 * Sink stage that sends every incoming [[JmsTextMessage]] as a JMS `TextMessage`.
 *
 * The stage requests the `akka.stream.default-blocking-io-dispatcher` dispatcher through its
 * initial attributes (presumably because the JMS client calls made here block; confirm against
 * the JMS provider in use).
 *
 * @param settings sink settings; `timeToLive` is read here, the rest is consumed by
 *                 `JmsConnector` (not visible in this file)
 */
final class JmsSinkStage(settings: JmsSinkSettings) extends GraphStage[SinkShape[JmsTextMessage]] {

  private val in = Inlet[JmsTextMessage]("JmsSink.in")

  override def shape: SinkShape[JmsTextMessage] = SinkShape.of(in)

  override protected def initialAttributes: Attributes =
    ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with JmsConnector {

      // Assigned in preStart; stays null if the stage never starts.
      private var jmsProducer: MessageProducer = _

      override private[jms] def jmsSettings = settings

      override def preStart(): Unit = {
        jmsSession = openSession()
        jmsProducer = jmsSession.session.createProducer(jmsSession.destination)
        // Only override the producer's time-to-live when one was configured.
        settings.timeToLive.foreach(ttl => jmsProducer.setTimeToLive(ttl.toMillis))
        pull(in)
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val elem: JmsTextMessage = grab(in)
            val textMessage: TextMessage = jmsSession.session.createTextMessage(elem.body)
            // Copy each property using the JMS setter that matches its runtime type.
            elem.properties.foreach {
              case (key, value) =>
                value match {
                  case v: String => textMessage.setStringProperty(key, v)
                  case v: Int => textMessage.setIntProperty(key, v)
                  case v: Boolean => textMessage.setBooleanProperty(key, v)
                  case v: Byte => textMessage.setByteProperty(key, v)
                  case v: Short => textMessage.setShortProperty(key, v)
                  case v: Long => textMessage.setLongProperty(key, v)
                  case v: Float => textMessage.setFloatProperty(key, v)
                  case v: Double => textMessage.setDoubleProperty(key, v)
                  case other =>
                    // Fail with a descriptive error instead of an opaque MatchError.
                    val typeName = Option(other).map(_.getClass.getName).getOrElse("null")
                    throw new IllegalArgumentException(
                      s"Unsupported JMS property type [$typeName] for property [$key]"
                    )
                }
            }
            jmsProducer.send(textMessage)
            // Demand the next element only after the current one has been sent.
            pull(in)
          }
        }
      )

      // jmsSession may still be null when preStart failed before assigning it.
      override def postStop(): Unit =
        Option(jmsSession).foreach(_.closeSession())
    }

}
开发者ID:akka,项目名称:alpakka,代码行数:62,代码来源:JmsSinkStage.scala


示例2: GeodeContinuousSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

/**
 * Source stage backed by a `GeodeCQueryGraphLogic` built from the given cache, query name and
 * SQL string.
 *
 * The materialized `Future[Done]` completes when the logic's `onConnect` callback is invoked.
 *
 * @param cache Geode client cache handed to the graph logic
 * @param name  query name handed to the graph logic
 * @param sql   query string handed to the graph logic
 */
class GeodeContinuousSourceStage[V](cache: ClientCache, name: Symbol, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  override protected def initialAttributes: Attributes = {
    val stageName = Attributes.name("GeodeContinuousSource")
    stageName.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
  }

  val out = Outlet[V]("geode.continuousSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    // Completed from the onConnect callback below.
    val connected = Promise[Done]()

    val logic: GraphStageLogic = new GeodeCQueryGraphLogic[V](shape, cache, name, sql) {

      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { _ =>
        connected.success(Done)
      }

      // Push the element straight away when downstream has demand, otherwise buffer it.
      val onElement: AsyncCallback[V] = getAsyncCallback[V] { element =>
        if (!isAvailable(out)) enqueue(element)
        else pushElement(out, element)
        handleTerminaison()
      }

      //
      // On demand, initial (old) results are forwarded first; once they are exhausted,
      // buffered new (continuous) elements are emitted.
      //
      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            if (initialResultsIterator.hasNext) {
              push(out, initialResultsIterator.next())
            } else {
              dequeue().foreach(buffered => pushElement(out, buffered))
            }
            handleTerminaison()
          }
        }
      )

    }

    (logic, connected.future)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:60,代码来源:GeodeContinuousSourceStage.scala


示例3: GeodeFiniteSourceStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.geode.internal.stage

import akka.Done
import akka.stream.stage._
import akka.stream.{ActorAttributes, Attributes, Outlet, SourceShape}
import org.apache.geode.cache.client.ClientCache

import scala.concurrent.{Future, Promise}

/**
 * Source stage backed by a `GeodeQueryGraphLogic` built from the given cache and SQL string.
 *
 * It emits the elements of `initialResultsIterator` and completes the stage once the iterator
 * is exhausted. The materialized `Future[Done]` completes when the logic's `onConnect`
 * callback is invoked.
 *
 * @param cache Geode client cache handed to the graph logic
 * @param sql   query string handed to the graph logic
 */
class GeodeFiniteSourceStage[V](cache: ClientCache, sql: String)
    extends GraphStageWithMaterializedValue[SourceShape[V], Future[Done]] {

  override protected def initialAttributes: Attributes = {
    val stageName = Attributes.name("GeodeFiniteSource")
    stageName.and(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher"))
  }

  val out = Outlet[V]("geode.finiteSource")

  override def shape: SourceShape[V] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    // Completed from the onConnect callback below.
    val connected = Promise[Done]()

    val logic: GraphStageLogic = new GeodeQueryGraphLogic[V](shape, cache, sql) {

      override val onConnect: AsyncCallback[Unit] = getAsyncCallback[Unit] { _ =>
        connected.success(Done)
      }

      setHandler(
        out,
        new OutHandler {
          // One result per pull; finish the stage when nothing is left.
          override def onPull(): Unit = {
            if (initialResultsIterator.hasNext) {
              push(out, initialResultsIterator.next())
            } else {
              completeStage()
            }
          }
        }
      )

    }

    (logic, connected.future)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:43,代码来源:GeodeFiniteSourceStage.scala


示例4: StreamImporter

//设置package包名称以及导入依赖的类
package controllers.admin.gazetteers

import akka.stream.{ ActorAttributes, ClosedShape, Materializer, Supervision }
import akka.stream.scaladsl._
import akka.util.ByteString
import java.io.InputStream
import models.place.{ GazetteerRecord, PlaceService }
import play.api.Logger
import play.api.libs.json.Json
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._

/**
 * Imports gazetteer records from a newline-delimited input stream using an Akka Streams graph.
 *
 * @param materializer materializer used to run the import graph
 */
class StreamImporter(implicit materializer: Materializer) {

  /** Number of parsed lines grouped into one import call. */
  private val BatchSize = 100

  /** Prints the stack trace of any failure and stops the stream. */
  private val decider: Supervision.Decider = {
    case failure: Throwable =>
      failure.printStackTrace()
      Supervision.Stop
  }

  /**
   * Reads `is` line by line, converts each line with `crosswalk`, and imports the successfully
   * converted records in batches through `places`.
   *
   * Each batch import is awaited synchronously (up to 60 minutes) inside the sink.
   *
   * @param is        stream of newline-delimited, UTF-8 encoded records
   * @param crosswalk converts one line into a record, or `None` when the line yields no record
   * @param places    service receiving the record batches
   * @param ctx       execution context (not referenced by this method body)
   * @return the materialized value of the closed graph
   */
  def importPlaces(is: InputStream, crosswalk: String => Option[GazetteerRecord])(implicit places: PlaceService, ctx: ExecutionContext) = {

    // Raw bytes -> newline-framed chunks -> UTF-8 strings.
    val lineFraming = Framing.delimiter(ByteString("\n"), maximumFrameLength = Int.MaxValue, allowTruncation = false)
    val lines = StreamConverters.fromInputStream(() => is, 1024)
      .via(lineFraming)
      .map(_.utf8String)

    // The supervision attribute is attached to the conversion step before grouping.
    val crosswalkFlow = Flow.fromFunction[String, Option[GazetteerRecord]](crosswalk)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .grouped(BatchSize)

    // Drops lines that produced no record, then blocks until the batch is imported.
    val importSink = Sink.foreach[Seq[Option[GazetteerRecord]]] { batch =>
      Await.result(places.importRecords(batch.flatten), 60.minutes)
    }

    val importGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

      import GraphDSL.Implicits._

      lines ~> crosswalkFlow ~> importSink

      ClosedShape
    }).withAttributes(ActorAttributes.supervisionStrategy(decider))

    importGraph.run()
  }

}
开发者ID:pelagios,项目名称:recogito2,代码行数:51,代码来源:StreamImporter.scala


示例5: CmsfsAkkaStream

//设置package包名称以及导入依赖的类
package org.wex.cmsfs.common.core

import akka.stream.{ActorAttributes, Supervision}
import org.slf4j.Logger

/**
 * Mixin providing a logging supervision strategy and a logging pass-through helper for
 * Akka Streams pipelines.
 */
trait CmsfsAkkaStream {

  /** Logger used for error and info output; supplied by the implementing class. */
  val logger: Logger

  /**
   * Builds a decider that logs exceptions and resumes the stream.
   *
   * @param f formats the exception message before it is logged
   */
  private def decider(f: (String) => String): Supervision.Decider = {
    case ex: Exception =>
      // getMessage may be null; fall back to the exception's string form so `f` never sees null.
      logger.error(f(Option(ex.getMessage).getOrElse(ex.toString)))
      Supervision.Resume
    case _ =>
      // Throwables that are not Exceptions previously caused a MatchError inside the decider;
      // stop the stream explicitly instead.
      Supervision.Stop
  }

  /**
   * Supervision attribute that logs each exception (formatted by `f`) and resumes;
   * any other throwable stops the stream.
   *
   * @param f formats the exception message before it is logged
   * @return attributes carrying the supervision strategy
   */
  def supervisionStrategy(f: (String) => String): akka.stream.Attributes =
    ActorAttributes.supervisionStrategy(decider(f))

  /**
   * Logs `mess` at info level and returns `elem` unchanged.
   *
   * @param elem element passed through
   * @param mess message to log
   * @return `elem`
   */
  def loggerFlow[T](elem: T, mess: String): T = {
    logger.info(mess)
    elem
  }
}
开发者ID:shinhwagk,项目名称:cmsfs,代码行数:25,代码来源:CmsfsAkkaStream.scala


示例6: ServiceBase

//设置package包名称以及导入依赖的类
package services

import akka.stream.{ ActorAttributes, Attributes }
import akka.stream.scaladsl.Flow

/**
 * Base trait for services that wrap flows with a name, logging and an async boundary.
 */
trait ServiceBase {

  /** Builds the tag `<simple class name>#<signature>` used as both stage name and log name. */
  private def tagFor(signature: String): String =
    s"${getClass.getSimpleName}#$signature"

  /**
   * Names and logs `flow` under the tag for `signature` and marks it async.
   *
   * @param flow      flow to wrap
   * @param signature identifier appended to the class name to form the tag
   * @return the wrapped flow
   */
  protected def nonBlockingFlow[A, B, C](flow: Flow[A, B, C], signature: String): Flow[A, B, C] = {
    val tag = tagFor(signature)
    val named = flow.withAttributes(Attributes.name(tag))
    named.log(tag).async
  }

  /**
   * Same as [[nonBlockingFlow]], but additionally assigns the
   * `application.blocking-io-dispatcher` dispatcher to the flow.
   *
   * @param flow      flow to wrap
   * @param signature identifier appended to the class name to form the tag
   * @return the wrapped flow
   */
  protected def blockingFlow[A, B, C](flow: Flow[A, B, C], signature: String): Flow[A, B, C] = {
    val tag = tagFor(signature)
    val blockingDispatcher = ActorAttributes.dispatcher("application.blocking-io-dispatcher")
    val named = flow.withAttributes(Attributes.name(tag) and blockingDispatcher)
    named.log(tag).async
  }

}
开发者ID:hayasshi,项目名称:web-app-skeleton,代码行数:20,代码来源:ServiceBase.scala


示例7: BlockingGraphStage

//设置package包名称以及导入依赖的类
package de.sciss.fscape
package stream
package impl

import akka.stream.{ActorAttributes, Attributes, Shape}


/**
 * Stage base class that requests the blocking IO dispatcher when the control's configuration
 * has `useAsync` enabled.
 *
 * @param name stage name passed on to `StageImpl`
 * @param ctrl control whose `config.useAsync` flag selects the dispatcher attribute
 */
abstract class BlockingGraphStage[S <: Shape](name: String)(implicit ctrl: Control)
  extends StageImpl[S](name) {

  /** Always named after `toString`; the dispatcher attribute is added only in async mode. */
  override def initialAttributes: Attributes = {
    val named = Attributes.name(toString)
    if (!ctrl.config.useAsync) named
    else named and ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
  }
}
开发者ID:Sciss,项目名称:FScape-next,代码行数:19,代码来源:BlockingGraphStage.scala


示例8: HandlingErrorsApplication

//设置package包名称以及导入依赖的类
package com.packt.chapter8

import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl._

/**
 * Demonstrates stream-level and flow-level supervision strategies in Akka Streams.
 *
 * A list of words (ending with an empty string) is sent through a flow that rejects empty
 * words, then through a stage that takes the third character of each word.
 */
object HandlingErrorsApplication extends App {

  implicit val actorSystem: ActorSystem = ActorSystem("HandlingErrors")

  /** Materializer-wide decider: resume on index errors, stop on anything else. */
  val streamDecider: Supervision.Decider = {
    case _: IndexOutOfBoundsException =>
      println("Dropping element because of IndexOutOfBoundsException. Resuming.")
      Supervision.Resume
    case _ => Supervision.Stop
  }

  /** Decider attached to `flow` only: restart on illegal arguments, stop on anything else. */
  val flowDecider: Supervision.Decider = {
    case _: IllegalArgumentException =>
      println("Dropping element because of IllegalArgumentException. Restarting.")
      Supervision.Restart
    case _ => Supervision.Stop
  }

  val actorMaterializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(streamDecider)
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer(actorMaterializerSettings)

  val words = List("Handling", "Errors", "In", "Akka", "Streams", "")

  // Rejects empty words; failures raised here are decided by flowDecider.
  val flow = Flow[String].map(word => {
    if(word.length == 0) throw new IllegalArgumentException("Empty words are not allowed")
    word
  }).withAttributes(ActorAttributes.supervisionStrategy(flowDecider))

  // word(2) throws for words shorter than three characters (e.g. "In"); that stage carries no
  // own supervision attribute, so the failure is expected to be decided by streamDecider.
  Source(words).via(flow).map(word => word(2)).to(Sink.foreach(println)).run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:37,代码来源:HandlingErrorsApplication.scala


示例9: System

//设置package包名称以及导入依赖的类
package io.allquantor.system

import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision}

import scala.concurrent.ExecutionContextExecutor

/**
 * Shared actor system, materializer and mixin traits for the application.
 */
object System {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  /** Mixin exposing the actor system's dispatcher as the implicit execution context. */
  trait Executor {
    protected implicit val executor: ExecutionContextExecutor = actorSystem.dispatcher
  }

  /** Mixin providing a supervision attribute that restarts on every failure. */
  trait ErrorHandling {

    /** Prints the failure (with details for connection errors) and restarts the stage. */
    val errorHandling: Attributes = {
      val restartOnAnyFailure: Supervision.Decider = {
        case connectFailure: java.net.ConnectException =>
          // Printing here is a side effect inside the decider.
          println(s"Error: $connectFailure" )
          Supervision.Restart
        case _ =>
          println("Error!")
          Supervision.Restart
      }
      ActorAttributes.supervisionStrategy(restartOnAnyFailure)
    }

  }

}
开发者ID:allquantor,项目名称:socketpublisher,代码行数:31,代码来源:System.scala



注:本文中的akka.stream.ActorAttributes类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ClusterSingletonManager类代码示例发布时间:2022-05-23
下一篇:
Scala window类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap