• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ActorSystem类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.actor.ActorSystem的典型用法代码示例。如果您正苦于以下问题:Scala ActorSystem类的具体用法?Scala ActorSystem怎么用?Scala ActorSystem使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ActorSystem类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Main

//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.google.inject.Guice
import service.documents.{DocumentService, DocumentServiceModule}
import service.health._

object Main extends App with HealthRoutes {
  
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher 

  val settings = Settings(system)

  val logger = Logging(system, getClass)

  private val injector = Guice.createInjector(DocumentServiceModule)
  private val docService = injector.getInstance(classOf[DocumentService])
  
  val routes = logRequestResult("", InfoLevel)(docService.docRoutes ~ healthRoutes)

  Http().bindAndHandle(routes, settings.Http.interface, settings.Http.port) map { binding =>
    logger.info(s"Server started on port {}", binding.localAddress.getPort)
  } recoverWith { case _ => system.terminate() }
} 
开发者ID:devknutst,项目名称:watermarkAkka,代码行数:30,代码来源:Main.scala


示例2: NodeTest

//设置package包名称以及导入依赖的类
package communication

import java.io.File

import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import index.{DirectoryIndex, FileIndex}


class NodeTest() extends TestKit(ActorSystem("NodeTest")) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  implicit val homeDirPath = "/home/mike/Programming/scala/Cloudia"


  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "Node" must {
    "send back directory index if pinged" in {
      val node: ActorRef = system.actorOf(Node.props(homeDirPath))
      node ! Ping()
      expectMsgPF() {
        case DirectoryIndex(_) => ()
      }
    }
  }

  it must {
    "send back file manifest if one was requested" in {
      val node: ActorRef = system.actorOf(Node.props(homeDirPath))
      val fileIndex = new FileIndex(new File("src/test/resources/chunkifierTest"))
      node ! Request(fileIndex)
      expectMsgPF() {
        case FileManifest(_, _) => ()
      }
    }
  }
} 
开发者ID:mprzewie,项目名称:cloudia-utils,代码行数:42,代码来源:NodeTest.scala


示例3: Settings

//设置package包名称以及导入依赖的类
package com.scalaio.kafka.consumer

import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

import scala.concurrent.Future

object Settings {
  def consumerSettings(implicit system: ActorSystem) =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("CommittableSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  def producerSettings(implicit system: ActorSystem) =
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")
}

object CommittableSource extends App {

  type KafkaMessage = CommittableMessage[Array[Byte], String]

  implicit val system = ActorSystem("CommittableSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  implicit val ec = system.dispatcher

  // explicit commit
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .mapAsync(1) { msg =>
      BusinessController.handleMessage(msg.record.value)
        .flatMap(response => msg.committableOffset.commitScaladsl())
        .recoverWith { case e => msg.committableOffset.commitScaladsl() }
    }
    .runWith(Sink.ignore)

}

object BusinessController {

  type Service[A, B] = A => Future[B]

  val handleMessage: Service[String, String] =
    (message) => Future.successful(message.toUpperCase)

} 
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:CommittableSource.scala


示例4: Session

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.googlecloud.pubsub

import java.security.PrivateKey
import java.time.Instant

import akka.actor.ActorSystem
import akka.stream.Materializer

import scala.concurrent.Future

@akka.annotation.InternalApi
private[pubsub] class Session(clientEmail: String, privateKey: PrivateKey) {
  protected var maybeAccessToken: Option[Future[AccessTokenExpiry]] = None
  protected def now = Instant.now()
  protected val httpApi: HttpApi = HttpApi

  private def getNewToken()(implicit as: ActorSystem, materializer: Materializer): Future[AccessTokenExpiry] = {
    val accessToken = httpApi.getAccessToken(clientEmail = clientEmail, privateKey = privateKey, when = now)
    maybeAccessToken = Some(accessToken)
    accessToken
  }

  private def expiresSoon(g: AccessTokenExpiry): Boolean =
    g.expiresAt < (now.getEpochSecond + 60)

  def getToken()(implicit as: ActorSystem, materializer: Materializer): Future[String] = {
    import materializer.executionContext
    maybeAccessToken
      .getOrElse(getNewToken())
      .flatMap { result =>
        if (expiresSoon(result)) {
          getNewToken()
        } else {
          Future.successful(result)
        }
      }
      .map(_.accessToken)
  }
} 
开发者ID:akka,项目名称:alpakka,代码行数:40,代码来源:Session.scala


示例5: Start

//设置package包名称以及导入依赖的类
package sample.kernel.hello

import akka.actor.{ Actor, ActorSystem, Props }
import akka.kernel.Bootable

case object Start

class HelloActor extends Actor {
  val worldActor = context.actorOf(Props[WorldActor])

  def receive = {
    case Start => worldActor ! "Hello"
    case message: String =>
      println("Received message '%s'" format message)
  }
}

class WorldActor extends Actor {
  def receive = {
    case message: String => sender() ! (message.toUpperCase + " world!")
  }
}

class HelloKernel extends Bootable {
  val system = ActorSystem("hellokernel")

  def startup = {
    system.actorOf(Props[HelloActor]) ! Start
  }

  def shutdown = {
    system.shutdown()
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:HelloKernel.scala


示例6: RatePredictor

//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater

import akka.actor.ActorSystem
import com.ferhtaydn.models.PatientInfo
import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.mllib.linalg.{ Matrix, Vector }
import org.apache.spark.sql.{ Row, SQLContext }

import scala.concurrent.{ ExecutionContextExecutor, Future }

class RatePredictor(system: ActorSystem, sqlContext: SQLContext,
    indexModel: StringIndexerModel, cvModel: CrossValidatorModel,
    confusionMatrix: String) {

  private val decimalFormatter = new java.text.DecimalFormat("##.##")
  private val blockingDispatcher: ExecutionContextExecutor = system.dispatchers.lookup("ml.predictor.dispatcher")

  def confusionMatrixString: Future[String] = {
    Future {
      confusionMatrix
    }(blockingDispatcher)
  }

  def predict(patientInfo: PatientInfo): Future[Either[String, Double]] = {

    Future {

      val df = sqlContext.createDataFrame(Seq(patientInfo.toRecord))
      val indexedJobDF = indexModel.transform(df)

      val result = cvModel
        .transform(indexedJobDF)
        .select("prediction", "probability").map {
          case Row(prediction: Double, probability: Vector) ?
            (probability, prediction)
        }

      result.collect().headOption match {
        case Some((prob, _)) ? Right(decimalFormatter.format(prob(1)).toDouble)
        case None            ? Left(s"No result can be predicted for the patient")
      }

    }(blockingDispatcher)
  }

} 
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:48,代码来源:RatePredictor.scala


示例7: FactorialFrontend

//设置package包名称以及导入依赖的类
package sample.cluster.factorial

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.routing.FromConfig
import akka.actor.ReceiveTimeout

//#frontend
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {

  val backend = context.actorOf(FromConfig.props(),
    name = "factorialBackendRouter")

  override def preStart(): Unit = {
    sendJobs()
    if (repeat) {
      context.setReceiveTimeout(10.seconds)
    }
  }

  def receive = {
    case (n: Int, factorial: BigInt) =>
      if (n == upToN) {
        log.debug("{}! = {}", n, factorial)
        if (repeat) sendJobs()
        else context.stop(self)
      }
    case ReceiveTimeout =>
      log.info("Timeout")
      sendJobs()
  }

  def sendJobs(): Unit = {
    log.info("Starting batch of factorials up to [{}]", upToN)
    1 to upToN foreach { backend ! _ }
  }
}
//#frontend

object FactorialFrontend {
  def main(args: Array[String]): Unit = {
    val upToN = 200

    val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
      withFallback(ConfigFactory.load("factorial"))

    val system = ActorSystem("ClusterSystem", config)
    system.log.info("Factorials will start when 2 backend members in the cluster.")
    //#registerOnUp
    Cluster(system) registerOnMemberUp {
      system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
        name = "factorialFrontend")
    }
    //#registerOnUp
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:62,代码来源:FactorialFrontend.scala


示例8: SimpleClusterApp

//设置package包名称以及导入依赖的类
package sample.cluster.simple

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object SimpleClusterApp {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552", "0"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)
      // Create an actor that handles cluster domain events
      system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
    }
  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:29,代码来源:SimpleClusterApp.scala


示例9: Main2

//设置package包名称以及导入依赖的类
package sample.hello

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Terminated

object Main2 {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Hello")
    val a = system.actorOf(Props[HelloWorld], "helloWorld")
    system.actorOf(Props(classOf[Terminator], a), "terminator")
  }

  class Terminator(ref: ActorRef) extends Actor with ActorLogging {
    context watch ref
    def receive = {
      case Terminated(_) =>
        log.info("{} has terminated, shutting down system", ref.path)
        context.system.shutdown()
    }
  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:28,代码来源:Main2.scala


示例10: TestKitExtension

//设置package包名称以及导入依赖的类
package akka.testkit

import com.typesafe.config.Config
import akka.util.Timeout
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
import scala.concurrent.duration.FiniteDuration

object TestKitExtension extends ExtensionId[TestKitSettings] {
  override def get(system: ActorSystem): TestKitSettings = super.get(system)
  def createExtension(system: ExtendedActorSystem): TestKitSettings = new TestKitSettings(system.settings.config)
}

class TestKitSettings(val config: Config) extends Extension {

  import akka.util.Helpers._

  val TestTimeFactor = config.getDouble("akka.test.timefactor").
    requiring(tf ? !tf.isInfinite && tf > 0, "akka.test.timefactor must be positive finite double")
  val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.test.single-expect-default")
  val TestEventFilterLeeway: FiniteDuration = config.getMillisDuration("akka.test.filter-leeway")
  val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.test.default-timeout"))
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:23,代码来源:TestKitExtension.scala


示例11: TestLatch

//设置package包名称以及导入依赖的类
package akka.testkit

import scala.concurrent.duration.Duration
import akka.actor.ActorSystem
import scala.concurrent.{ Await, CanAwait, Awaitable }
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import scala.concurrent.duration.FiniteDuration


object TestLatch {
  val DefaultTimeout = Duration(5, TimeUnit.SECONDS)

  def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count)
}

class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] {
  private var latch = new CountDownLatch(count)

  def countDown() = latch.countDown()
  def isOpen: Boolean = latch.getCount == 0
  def open() = while (!isOpen) countDown()
  def reset() = latch = new CountDownLatch(count)

  @throws(classOf[TimeoutException])
  def ready(atMost: Duration)(implicit permit: CanAwait) = {
    val waitTime = atMost match {
      case f: FiniteDuration ? f
      case _                 ? throw new IllegalArgumentException("TestLatch does not support waiting for " + atMost)
    }
    val opened = latch.await(waitTime.dilated.toNanos, TimeUnit.NANOSECONDS)
    if (!opened) throw new TimeoutException(
      "Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor))
    this
  }
  @throws(classOf[Exception])
  def result(atMost: Duration)(implicit permit: CanAwait): Unit = {
    ready(atMost)
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:40,代码来源:TestLatch.scala


示例12: RemoteMetricsOn

//设置package包名称以及导入依赖的类
package akka.remote

import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import akka.actor.ActorSelectionMessage
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.event.Logging
import akka.routing.RouterEnvelope


private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteMetrics {

  private val logFrameSizeExceeding: Int = system.settings.config.getBytes(
    "akka.remote.log-frame-size-exceeding").toInt
  private val log = Logging(system, this.getClass)
  private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap

  override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit =
    if (payloadBytes >= logFrameSizeExceeding) {
      val clazz = msg match {
        case x: ActorSelectionMessage ? x.msg.getClass
        case x: RouterEnvelope        ? x.message.getClass
        case _                        ? msg.getClass
      }

      // 10% threshold until next log
      def newMax = (payloadBytes * 1.1).toInt

      @tailrec def check(): Unit = {
        val max = maxPayloadBytes.get(clazz)
        if (max eq null) {
          if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null)
            log.info("Payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
          else check()
        } else if (payloadBytes > max) {
          if (maxPayloadBytes.replace(clazz, max, newMax))
            log.info("New maximum payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
          else check()
        }
      }
      check()
    }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:48,代码来源:RemoteMetricsExtension.scala


示例13: DaemonicSpec

//设置package包名称以及导入依赖的类
package akka.remote

import akka.testkit._
import scala.concurrent.duration._
import akka.actor.{ Address, ExtendedActorSystem, ActorSystem }
import com.typesafe.config.ConfigFactory
import java.nio.channels.ServerSocketChannel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonicSpec extends AkkaSpec {

  def addr(sys: ActorSystem, proto: String) =
    sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get

  def unusedPort = {
    val ss = ServerSocketChannel.open().socket()
    ss.bind(new InetSocketAddress("localhost", 0))
    val port = ss.getLocalPort
    ss.close()
    port
  }

  "Remoting configured with daemonic = on" must {

    "shut down correctly after getting connection refused" in {
      // get all threads running before actor system i started
      val origThreads: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.to[Set]
      // create a separate actor system that we can check the threads for
      val daemonicSystem = ActorSystem("daemonic", ConfigFactory.parseString("""
        akka.daemonic = on
        akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        akka.remote.netty.tcp.port = 0
        akka.log-dead-letters-during-shutdown = off
      """))

      val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
      val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
      selection ! "whatever"
      Thread.sleep(2.seconds.dilated.toMillis)

      // get new non daemonic threads running
      val newNonDaemons: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.seq.
        filter(t ? !origThreads(t) && t.isDaemon == false).to[Set]

      newNonDaemons should be(Set.empty[Thread])
      shutdown(daemonicSystem)
    }
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:53,代码来源:DaemonicSpec.scala


示例14: EventStream

//设置package包名称以及导入依赖的类
package akka.event

import language.implicitConversions

import akka.actor.{ ActorRef, ActorSystem }
import akka.event.Logging.simpleName
import akka.util.Subclassification

object EventStream {
  //Why is this here and why isn't there a failing test if it is removed?
  implicit def fromActorSystem(system: ActorSystem) = system.eventStream
}


class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {

  type Event = AnyRef
  type Classifier = Class[_]

  protected implicit val subclassification = new Subclassification[Class[_]] {
    def isEqual(x: Class[_], y: Class[_]) = x == y
    def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
  }

  protected def classify(event: AnyRef): Class[_] = event.getClass

  protected def publish(event: AnyRef, subscriber: ActorRef) = {
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! event
  }

  override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
    super.subscribe(subscriber, channel)
  }

  override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    val ret = super.unsubscribe(subscriber, channel)
    if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
    ret
  }

  override def unsubscribe(subscriber: ActorRef) {
    if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
    super.unsubscribe(subscriber)
    if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:EventStream.scala


示例15: FileBasedMailboxSettings

//设置package包名称以及导入依赖的类
package akka.actor.mailbox.filebased

import akka.actor.mailbox._
import com.typesafe.config.Config
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem

@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
  extends DurableMailboxSettings {

  def name: String = "file-based"

  val config = initialize
  import config._

  final val QueuePath: String = getString("directory-path")
  final val MaxItems: Int = getInt("max-items")
  final val MaxSize: Long = getBytes("max-size")
  final val MaxItemSize: Long = getBytes("max-item-size")
  final val MaxAge: FiniteDuration = Duration(getMilliseconds("max-age"), MILLISECONDS)
  final val MaxJournalSize: Long = getBytes("max-journal-size")
  final val MaxMemorySize: Long = getBytes("max-memory-size")
  final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
  final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
  final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
  final val KeepJournal: Boolean = getBoolean("keep-journal")
  final val SyncJournal: Boolean = getBoolean("sync-journal")

  final val CircuitBreakerMaxFailures: Int = getInt("circuit-breaker.max-failures")
  final val CircuitBreakerCallTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
  final val CircuitBreakerResetTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:FileBasedMailboxSettings.scala


示例16: CustomRoute

//设置package包名称以及导入依赖的类
package docs.camel

import akka.camel.CamelMessage
import akka.actor.Status.Failure

import language.existentials

object CustomRoute {
  object Sample1 {
    //#CustomRoute
    import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
    import akka.camel.{ CamelMessage, CamelExtension }
    import org.apache.camel.builder.RouteBuilder
    import akka.camel._
    class Responder extends Actor {
      def receive = {
        case msg: CamelMessage =>
          sender() ! (msg.mapBody {
            body: String => "received %s" format body
          })
      }
    }

    class CustomRouteBuilder(system: ActorSystem, responder: ActorRef)
      extends RouteBuilder {
      def configure {
        from("jetty:http://localhost:8877/camel/custom").to(responder)
      }
    }
    val system = ActorSystem("some-system")
    val camel = CamelExtension(system)
    val responder = system.actorOf(Props[Responder], name = "TestResponder")
    camel.context.addRoutes(new CustomRouteBuilder(system, responder))
    //#CustomRoute

  }
  object Sample2 {
    //#ErrorThrowingConsumer
    import akka.camel.Consumer

    import org.apache.camel.builder.Builder
    import org.apache.camel.model.RouteDefinition

    class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
      def receive = {
        case msg: CamelMessage => throw new Exception("error: %s" format msg.body)
      }
      override def onRouteDefinition = (rd) => rd.onException(classOf[Exception]).
        handled(true).transform(Builder.exceptionMessage).end

      final override def preRestart(reason: Throwable, message: Option[Any]) {
        sender() ! Failure(reason)
      }
    }
    //#ErrorThrowingConsumer
  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:59,代码来源:CustomRoute.scala


示例17: PersistenceSerializerDocSpec

//设置package包名称以及导入依赖的类
package docs.persistence

import com.typesafe.config._
import scala.concurrent.duration._
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import akka.serialization.{ Serializer, SerializationExtension }
import akka.testkit.TestKit

class PersistenceSerializerDocSpec extends WordSpec {

  val customSerializerConfig =
    """
      //#custom-serializer-config
      akka.actor {
        serializers {
          my-payload = "docs.persistence.MyPayloadSerializer"
          my-snapshot = "docs.persistence.MySnapshotSerializer"
        }
        serialization-bindings {
          "docs.persistence.MyPayload" = my-payload
          "docs.persistence.MySnapshot" = my-snapshot
        }
      }
      //#custom-serializer-config
    """.stripMargin

  val system = ActorSystem("PersistenceSerializerDocSpec", ConfigFactory.parseString(customSerializerConfig))
  try {
    SerializationExtension(system)
  } finally {
    TestKit.shutdownActorSystem(system, 10.seconds, false)
  }
}

class MyPayload
class MySnapshot

class MyPayloadSerializer extends Serializer {
  def identifier: Int = 77124
  def includeManifest: Boolean = false
  def toBinary(o: AnyRef): Array[Byte] = ???
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}

class MySnapshotSerializer extends Serializer {
  def identifier: Int = 77125
  def includeManifest: Boolean = false
  def toBinary(o: AnyRef): Array[Byte] = ???
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:PersistenceSerializerDocSpec.scala


示例18: MySpec

//设置package包名称以及导入依赖的类
package docs.testkit

//#plain-spec
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.{ TestActors, TestKit, ImplicitSender }
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll

//#implicit-sender
class MySpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  with WordSpecLike with Matchers with BeforeAndAfterAll {
  //#implicit-sender

  def this() = this(ActorSystem("MySpec"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "An Echo actor" must {

    "send back messages unchanged" in {
      val echo = system.actorOf(TestActors.echoActorProps)
      echo ! "hello world"
      expectMsg("hello world")
    }

  }
}
//#plain-spec 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:34,代码来源:PlainWordSpec.scala


示例19: SomeMessage

//设置package包名称以及导入依赖的类
package docs.osgi

case object SomeMessage

class SomeActor extends akka.actor.Actor {
  def receive = { case SomeMessage => }
}

//#Activator
import akka.actor.{ Props, ActorSystem }
import org.osgi.framework.BundleContext
import akka.osgi.ActorSystemActivator

class Activator extends ActorSystemActivator {

  def configure(context: BundleContext, system: ActorSystem) {
    // optionally register the ActorSystem in the OSGi Service Registry
    registerService(context, system)

    val someActor = system.actorOf(Props[SomeActor], name = "someName")
    someActor ! SomeMessage
  }

}
//#Activator 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:26,代码来源:Activator.scala


示例20: CamelConfigSpec

//设置package包名称以及导入依赖的类
package akka.camel
import org.scalatest.Matchers
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit._
import akka.testkit.TestKit
import akka.util.Helpers.ConfigOps

class CamelConfigSpec extends WordSpec with Matchers {

  val (settings, config) = {
    val system = ActorSystem("CamelConfigSpec")
    val result = (CamelExtension(system).settings, system.settings.config)
    TestKit.shutdownActorSystem(system)
    result
  }
  "CamelConfigSpec" must {
    "have correct activationTimeout config" in {
      settings.ActivationTimeout should be(config.getMillisDuration("akka.camel.consumer.activation-timeout"))
    }

    "have correct autoAck config" in {
      settings.AutoAck should be(config.getBoolean("akka.camel.consumer.auto-ack"))
    }

    "have correct replyTimeout config" in {
      settings.ReplyTimeout should be(config.getMillisDuration("akka.camel.consumer.reply-timeout"))
    }

    "have correct streamingCache config" in {
      settings.StreamingCache should be(config.getBoolean("akka.camel.streamingCache"))
    }

    "have correct jmxStatistics config" in {
      settings.JmxStatistics should be(config.getBoolean("akka.camel.jmx"))
    }

    "have correct body conversions config" in {
      val conversions = config.getConfig("akka.camel.conversions")

      conversions.getString("file") should be("java.io.InputStream")
      conversions.entrySet.size should be(1)
    }

    "have correct Context Provider" in {
      settings.ContextProvider.isInstanceOf[DefaultContextProvider] should be(true)
    }
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:51,代码来源:CamelConfigSpec.scala



注:本文中的akka.actor.ActorSystem类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ExecutionContext类代码示例发布时间:2022-05-23
下一篇:
Scala Future类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap