本文整理汇总了Scala中akka.actor.ActorSystem类的典型用法代码示例。如果您正苦于以下问题:Scala ActorSystem类的具体用法?Scala ActorSystem怎么用?Scala ActorSystem使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ActorSystem类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Main
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.event.Logging
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.google.inject.Guice
import service.documents.{DocumentService, DocumentServiceModule}
import service.health._
object Main extends App with HealthRoutes {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val settings = Settings(system)
val logger = Logging(system, getClass)
private val injector = Guice.createInjector(DocumentServiceModule)
private val docService = injector.getInstance(classOf[DocumentService])
val routes = logRequestResult("", InfoLevel)(docService.docRoutes ~ healthRoutes)
Http().bindAndHandle(routes, settings.Http.interface, settings.Http.port) map { binding =>
logger.info(s"Server started on port {}", binding.localAddress.getPort)
} recoverWith { case _ => system.terminate() }
}
开发者ID:devknutst,项目名称:watermarkAkka,代码行数:30,代码来源:Main.scala
示例2: NodeTest
//设置package包名称以及导入依赖的类
package communication
import java.io.File
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import index.{DirectoryIndex, FileIndex}
class NodeTest() extends TestKit(ActorSystem("NodeTest")) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
implicit val homeDirPath = "/home/mike/Programming/scala/Cloudia"
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"Node" must {
"send back directory index if pinged" in {
val node: ActorRef = system.actorOf(Node.props(homeDirPath))
node ! Ping()
expectMsgPF() {
case DirectoryIndex(_) => ()
}
}
}
it must {
"send back file manifest if one was requested" in {
val node: ActorRef = system.actorOf(Node.props(homeDirPath))
val fileIndex = new FileIndex(new File("src/test/resources/chunkifierTest"))
node ! Request(fileIndex)
expectMsgPF() {
case FileManifest(_, _) => ()
}
}
}
}
开发者ID:mprzewie,项目名称:cloudia-utils,代码行数:42,代码来源:NodeTest.scala
示例3: Settings
//设置package包名称以及导入依赖的类
package com.scalaio.kafka.consumer
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.scalaio.kafka.consumer.Settings.consumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.Future
object Settings {
def consumerSettings(implicit system: ActorSystem) =
ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("CommittableSourceConsumer")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def producerSettings(implicit system: ActorSystem) =
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("localhost:9092")
}
object CommittableSource extends App {
type KafkaMessage = CommittableMessage[Array[Byte], String]
implicit val system = ActorSystem("CommittableSourceConsumerMain")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// explicit commit
Consumer
.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
BusinessController.handleMessage(msg.record.value)
.flatMap(response => msg.committableOffset.commitScaladsl())
.recoverWith { case e => msg.committableOffset.commitScaladsl() }
}
.runWith(Sink.ignore)
}
object BusinessController {
type Service[A, B] = A => Future[B]
val handleMessage: Service[String, String] =
(message) => Future.successful(message.toUpperCase)
}
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:CommittableSource.scala
示例4: Session
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.googlecloud.pubsub
import java.security.PrivateKey
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.Future
@akka.annotation.InternalApi
private[pubsub] class Session(clientEmail: String, privateKey: PrivateKey) {
protected var maybeAccessToken: Option[Future[AccessTokenExpiry]] = None
protected def now = Instant.now()
protected val httpApi: HttpApi = HttpApi
private def getNewToken()(implicit as: ActorSystem, materializer: Materializer): Future[AccessTokenExpiry] = {
val accessToken = httpApi.getAccessToken(clientEmail = clientEmail, privateKey = privateKey, when = now)
maybeAccessToken = Some(accessToken)
accessToken
}
private def expiresSoon(g: AccessTokenExpiry): Boolean =
g.expiresAt < (now.getEpochSecond + 60)
def getToken()(implicit as: ActorSystem, materializer: Materializer): Future[String] = {
import materializer.executionContext
maybeAccessToken
.getOrElse(getNewToken())
.flatMap { result =>
if (expiresSoon(result)) {
getNewToken()
} else {
Future.successful(result)
}
}
.map(_.accessToken)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:40,代码来源:Session.scala
示例5: Start
//设置package包名称以及导入依赖的类
package sample.kernel.hello
import akka.actor.{ Actor, ActorSystem, Props }
import akka.kernel.Bootable
case object Start
class HelloActor extends Actor {
val worldActor = context.actorOf(Props[WorldActor])
def receive = {
case Start => worldActor ! "Hello"
case message: String =>
println("Received message '%s'" format message)
}
}
class WorldActor extends Actor {
def receive = {
case message: String => sender() ! (message.toUpperCase + " world!")
}
}
class HelloKernel extends Bootable {
val system = ActorSystem("hellokernel")
def startup = {
system.actorOf(Props[HelloActor]) ! Start
}
def shutdown = {
system.shutdown()
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:HelloKernel.scala
示例6: RatePredictor
//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater
import akka.actor.ActorSystem
import com.ferhtaydn.models.PatientInfo
import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.mllib.linalg.{ Matrix, Vector }
import org.apache.spark.sql.{ Row, SQLContext }
import scala.concurrent.{ ExecutionContextExecutor, Future }
class RatePredictor(system: ActorSystem, sqlContext: SQLContext,
indexModel: StringIndexerModel, cvModel: CrossValidatorModel,
confusionMatrix: String) {
private val decimalFormatter = new java.text.DecimalFormat("##.##")
private val blockingDispatcher: ExecutionContextExecutor = system.dispatchers.lookup("ml.predictor.dispatcher")
def confusionMatrixString: Future[String] = {
Future {
confusionMatrix
}(blockingDispatcher)
}
def predict(patientInfo: PatientInfo): Future[Either[String, Double]] = {
Future {
val df = sqlContext.createDataFrame(Seq(patientInfo.toRecord))
val indexedJobDF = indexModel.transform(df)
val result = cvModel
.transform(indexedJobDF)
.select("prediction", "probability").map {
case Row(prediction: Double, probability: Vector) ?
(probability, prediction)
}
result.collect().headOption match {
case Some((prob, _)) ? Right(decimalFormatter.format(prob(1)).toDouble)
case None ? Left(s"No result can be predicted for the patient")
}
}(blockingDispatcher)
}
}
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:48,代码来源:RatePredictor.scala
示例7: FactorialFrontend
//设置package包名称以及导入依赖的类
package sample.cluster.factorial
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.routing.FromConfig
import akka.actor.ReceiveTimeout
//#frontend
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
val backend = context.actorOf(FromConfig.props(),
name = "factorialBackendRouter")
override def preStart(): Unit = {
sendJobs()
if (repeat) {
context.setReceiveTimeout(10.seconds)
}
}
def receive = {
case (n: Int, factorial: BigInt) =>
if (n == upToN) {
log.debug("{}! = {}", n, factorial)
if (repeat) sendJobs()
else context.stop(self)
}
case ReceiveTimeout =>
log.info("Timeout")
sendJobs()
}
def sendJobs(): Unit = {
log.info("Starting batch of factorials up to [{}]", upToN)
1 to upToN foreach { backend ! _ }
}
}
//#frontend
object FactorialFrontend {
def main(args: Array[String]): Unit = {
val upToN = 200
val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
withFallback(ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", config)
system.log.info("Factorials will start when 2 backend members in the cluster.")
//#registerOnUp
Cluster(system) registerOnMemberUp {
system.actorOf(Props(classOf[FactorialFrontend], upToN, true),
name = "factorialFrontend")
}
//#registerOnUp
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:62,代码来源:FactorialFrontend.scala
示例8: SimpleClusterApp
//设置package包名称以及导入依赖的类
package sample.cluster.simple
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
object SimpleClusterApp {
def main(args: Array[String]): Unit = {
if (args.isEmpty)
startup(Seq("2551", "2552", "0"))
else
startup(args)
}
def startup(ports: Seq[String]): Unit = {
ports foreach { port =>
// Override the configuration of the port
val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load())
// Create an Akka system
val system = ActorSystem("ClusterSystem", config)
// Create an actor that handles cluster domain events
system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:29,代码来源:SimpleClusterApp.scala
示例9: Main2
//设置package包名称以及导入依赖的类
package sample.hello
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Terminated
object Main2 {
def main(args: Array[String]): Unit = {
val system = ActorSystem("Hello")
val a = system.actorOf(Props[HelloWorld], "helloWorld")
system.actorOf(Props(classOf[Terminator], a), "terminator")
}
class Terminator(ref: ActorRef) extends Actor with ActorLogging {
context watch ref
def receive = {
case Terminated(_) =>
log.info("{} has terminated, shutting down system", ref.path)
context.system.shutdown()
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:28,代码来源:Main2.scala
示例10: TestKitExtension
//设置package包名称以及导入依赖的类
package akka.testkit
import com.typesafe.config.Config
import akka.util.Timeout
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
import scala.concurrent.duration.FiniteDuration
object TestKitExtension extends ExtensionId[TestKitSettings] {
override def get(system: ActorSystem): TestKitSettings = super.get(system)
def createExtension(system: ExtendedActorSystem): TestKitSettings = new TestKitSettings(system.settings.config)
}
class TestKitSettings(val config: Config) extends Extension {
import akka.util.Helpers._
val TestTimeFactor = config.getDouble("akka.test.timefactor").
requiring(tf ? !tf.isInfinite && tf > 0, "akka.test.timefactor must be positive finite double")
val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.test.single-expect-default")
val TestEventFilterLeeway: FiniteDuration = config.getMillisDuration("akka.test.filter-leeway")
val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.test.default-timeout"))
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:23,代码来源:TestKitExtension.scala
示例11: TestLatch
//设置package包名称以及导入依赖的类
package akka.testkit
import scala.concurrent.duration.Duration
import akka.actor.ActorSystem
import scala.concurrent.{ Await, CanAwait, Awaitable }
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
import scala.concurrent.duration.FiniteDuration
object TestLatch {
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count)
}
class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] {
private var latch = new CountDownLatch(count)
def countDown() = latch.countDown()
def isOpen: Boolean = latch.getCount == 0
def open() = while (!isOpen) countDown()
def reset() = latch = new CountDownLatch(count)
@throws(classOf[TimeoutException])
def ready(atMost: Duration)(implicit permit: CanAwait) = {
val waitTime = atMost match {
case f: FiniteDuration ? f
case _ ? throw new IllegalArgumentException("TestLatch does not support waiting for " + atMost)
}
val opened = latch.await(waitTime.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TimeoutException(
"Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor))
this
}
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): Unit = {
ready(atMost)
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:40,代码来源:TestLatch.scala
示例12: RemoteMetricsOn
//设置package包名称以及导入依赖的类
package akka.remote
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import akka.actor.ActorSelectionMessage
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.event.Logging
import akka.routing.RouterEnvelope
private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteMetrics {
private val logFrameSizeExceeding: Int = system.settings.config.getBytes(
"akka.remote.log-frame-size-exceeding").toInt
private val log = Logging(system, this.getClass)
private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap
override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit =
if (payloadBytes >= logFrameSizeExceeding) {
val clazz = msg match {
case x: ActorSelectionMessage ? x.msg.getClass
case x: RouterEnvelope ? x.message.getClass
case _ ? msg.getClass
}
// 10% threshold until next log
def newMax = (payloadBytes * 1.1).toInt
@tailrec def check(): Unit = {
val max = maxPayloadBytes.get(clazz)
if (max eq null) {
if (maxPayloadBytes.putIfAbsent(clazz, newMax) eq null)
log.info("Payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
else check()
} else if (payloadBytes > max) {
if (maxPayloadBytes.replace(clazz, max, newMax))
log.info("New maximum payload size for [{}] is [{}] bytes", clazz.getName, payloadBytes)
else check()
}
}
check()
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:48,代码来源:RemoteMetricsExtension.scala
示例13: DaemonicSpec
//设置package包名称以及导入依赖的类
package akka.remote
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.{ Address, ExtendedActorSystem, ActorSystem }
import com.typesafe.config.ConfigFactory
import java.nio.channels.ServerSocketChannel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonicSpec extends AkkaSpec {
def addr(sys: ActorSystem, proto: String) =
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
def unusedPort = {
val ss = ServerSocketChannel.open().socket()
ss.bind(new InetSocketAddress("localhost", 0))
val port = ss.getLocalPort
ss.close()
port
}
"Remoting configured with daemonic = on" must {
"shut down correctly after getting connection refused" in {
// get all threads running before actor system i started
val origThreads: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.to[Set]
// create a separate actor system that we can check the threads for
val daemonicSystem = ActorSystem("daemonic", ConfigFactory.parseString("""
akka.daemonic = on
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
akka.remote.netty.tcp.port = 0
akka.log-dead-letters-during-shutdown = off
"""))
val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
selection ! "whatever"
Thread.sleep(2.seconds.dilated.toMillis)
// get new non daemonic threads running
val newNonDaemons: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.seq.
filter(t ? !origThreads(t) && t.isDaemon == false).to[Set]
newNonDaemons should be(Set.empty[Thread])
shutdown(daemonicSystem)
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:53,代码来源:DaemonicSpec.scala
示例14: EventStream
//设置package包名称以及导入依赖的类
package akka.event
import language.implicitConversions
import akka.actor.{ ActorRef, ActorSystem }
import akka.event.Logging.simpleName
import akka.util.Subclassification
object EventStream {
//Why is this here and why isn't there a failing test if it is removed?
implicit def fromActorSystem(system: ActorSystem) = system.eventStream
}
class EventStream(private val debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
type Classifier = Class[_]
protected implicit val subclassification = new Subclassification[Class[_]] {
def isEqual(x: Class[_], y: Class[_]) = x == y
def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
}
protected def classify(event: AnyRef): Class[_] = event.getClass
protected def publish(event: AnyRef, subscriber: ActorRef) = {
if (subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! event
}
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
super.subscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
val ret = super.unsubscribe(subscriber, channel)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
ret
}
override def unsubscribe(subscriber: ActorRef) {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
super.unsubscribe(subscriber)
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:EventStream.scala
示例15: FileBasedMailboxSettings
//设置package包名称以及导入依赖的类
package akka.actor.mailbox.filebased
import akka.actor.mailbox._
import com.typesafe.config.Config
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
@deprecated("durable mailboxes are superseded by akka-persistence", "2.3")
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name: String = "file-based"
val config = initialize
import config._
final val QueuePath: String = getString("directory-path")
final val MaxItems: Int = getInt("max-items")
final val MaxSize: Long = getBytes("max-size")
final val MaxItemSize: Long = getBytes("max-item-size")
final val MaxAge: FiniteDuration = Duration(getMilliseconds("max-age"), MILLISECONDS)
final val MaxJournalSize: Long = getBytes("max-journal-size")
final val MaxMemorySize: Long = getBytes("max-memory-size")
final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
final val KeepJournal: Boolean = getBoolean("keep-journal")
final val SyncJournal: Boolean = getBoolean("sync-journal")
final val CircuitBreakerMaxFailures: Int = getInt("circuit-breaker.max-failures")
final val CircuitBreakerCallTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
final val CircuitBreakerResetTimeout: FiniteDuration = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:35,代码来源:FileBasedMailboxSettings.scala
示例16: CustomRoute
//设置package包名称以及导入依赖的类
package docs.camel
import akka.camel.CamelMessage
import akka.actor.Status.Failure
import language.existentials
object CustomRoute {
object Sample1 {
//#CustomRoute
import akka.actor.{ Props, ActorSystem, Actor, ActorRef }
import akka.camel.{ CamelMessage, CamelExtension }
import org.apache.camel.builder.RouteBuilder
import akka.camel._
class Responder extends Actor {
def receive = {
case msg: CamelMessage =>
sender() ! (msg.mapBody {
body: String => "received %s" format body
})
}
}
class CustomRouteBuilder(system: ActorSystem, responder: ActorRef)
extends RouteBuilder {
def configure {
from("jetty:http://localhost:8877/camel/custom").to(responder)
}
}
val system = ActorSystem("some-system")
val camel = CamelExtension(system)
val responder = system.actorOf(Props[Responder], name = "TestResponder")
camel.context.addRoutes(new CustomRouteBuilder(system, responder))
//#CustomRoute
}
object Sample2 {
//#ErrorThrowingConsumer
import akka.camel.Consumer
import org.apache.camel.builder.Builder
import org.apache.camel.model.RouteDefinition
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
def receive = {
case msg: CamelMessage => throw new Exception("error: %s" format msg.body)
}
override def onRouteDefinition = (rd) => rd.onException(classOf[Exception]).
handled(true).transform(Builder.exceptionMessage).end
final override def preRestart(reason: Throwable, message: Option[Any]) {
sender() ! Failure(reason)
}
}
//#ErrorThrowingConsumer
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:59,代码来源:CustomRoute.scala
示例17: PersistenceSerializerDocSpec
//设置package包名称以及导入依赖的类
package docs.persistence
import com.typesafe.config._
import scala.concurrent.duration._
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import akka.serialization.{ Serializer, SerializationExtension }
import akka.testkit.TestKit
class PersistenceSerializerDocSpec extends WordSpec {
val customSerializerConfig =
"""
//#custom-serializer-config
akka.actor {
serializers {
my-payload = "docs.persistence.MyPayloadSerializer"
my-snapshot = "docs.persistence.MySnapshotSerializer"
}
serialization-bindings {
"docs.persistence.MyPayload" = my-payload
"docs.persistence.MySnapshot" = my-snapshot
}
}
//#custom-serializer-config
""".stripMargin
val system = ActorSystem("PersistenceSerializerDocSpec", ConfigFactory.parseString(customSerializerConfig))
try {
SerializationExtension(system)
} finally {
TestKit.shutdownActorSystem(system, 10.seconds, false)
}
}
class MyPayload
class MySnapshot
class MyPayloadSerializer extends Serializer {
def identifier: Int = 77124
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}
class MySnapshotSerializer extends Serializer {
def identifier: Int = 77125
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:52,代码来源:PersistenceSerializerDocSpec.scala
示例18: MySpec
//设置package包名称以及导入依赖的类
package docs.testkit
//#plain-spec
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.{ TestActors, TestKit, ImplicitSender }
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
//#implicit-sender
class MySpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
//#implicit-sender
def this() = this(ActorSystem("MySpec"))
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"An Echo actor" must {
"send back messages unchanged" in {
val echo = system.actorOf(TestActors.echoActorProps)
echo ! "hello world"
expectMsg("hello world")
}
}
}
//#plain-spec
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:34,代码来源:PlainWordSpec.scala
示例19: SomeMessage
//设置package包名称以及导入依赖的类
package docs.osgi
case object SomeMessage
class SomeActor extends akka.actor.Actor {
def receive = { case SomeMessage => }
}
//#Activator
import akka.actor.{ Props, ActorSystem }
import org.osgi.framework.BundleContext
import akka.osgi.ActorSystemActivator
class Activator extends ActorSystemActivator {
def configure(context: BundleContext, system: ActorSystem) {
// optionally register the ActorSystem in the OSGi Service Registry
registerService(context, system)
val someActor = system.actorOf(Props[SomeActor], name = "someName")
someActor ! SomeMessage
}
}
//#Activator
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:26,代码来源:Activator.scala
示例20: CamelConfigSpec
//设置package包名称以及导入依赖的类
package akka.camel
import org.scalatest.Matchers
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit._
import akka.testkit.TestKit
import akka.util.Helpers.ConfigOps
class CamelConfigSpec extends WordSpec with Matchers {
val (settings, config) = {
val system = ActorSystem("CamelConfigSpec")
val result = (CamelExtension(system).settings, system.settings.config)
TestKit.shutdownActorSystem(system)
result
}
"CamelConfigSpec" must {
"have correct activationTimeout config" in {
settings.ActivationTimeout should be(config.getMillisDuration("akka.camel.consumer.activation-timeout"))
}
"have correct autoAck config" in {
settings.AutoAck should be(config.getBoolean("akka.camel.consumer.auto-ack"))
}
"have correct replyTimeout config" in {
settings.ReplyTimeout should be(config.getMillisDuration("akka.camel.consumer.reply-timeout"))
}
"have correct streamingCache config" in {
settings.StreamingCache should be(config.getBoolean("akka.camel.streamingCache"))
}
"have correct jmxStatistics config" in {
settings.JmxStatistics should be(config.getBoolean("akka.camel.jmx"))
}
"have correct body conversions config" in {
val conversions = config.getConfig("akka.camel.conversions")
conversions.getString("file") should be("java.io.InputStream")
conversions.entrySet.size should be(1)
}
"have correct Context Provider" in {
settings.ContextProvider.isInstanceOf[DefaultContextProvider] should be(true)
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:51,代码来源:CamelConfigSpec.scala
注:本文中的akka.actor.ActorSystem类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论