• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ExtendedActorSystem类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 akka.actor.ExtendedActorSystem 的典型用法代码示例。如果您正苦于以下问题：Scala ExtendedActorSystem 类的具体用法？Scala ExtendedActorSystem 怎么用？Scala ExtendedActorSystem 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 ExtendedActorSystem 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。

示例1: SettingsImpl

//设置package包名称以及导入依赖的类
import akka.actor.{ExtendedActorSystem, ExtensionIdProvider, ExtensionId, Extension}
import com.typesafe.config._

/** Akka extension exposing the HTTP-related entries of the supplied configuration. */
class SettingsImpl(config: Config) extends Extension {

  /** Values read from the `http.*` keys; each one is looked up on first access. */
  object Http {
    lazy val interface: String = config.getString("http.interface")
    lazy val port: Int = config.getInt("http.port")
  }
}

/** Extension id for [[SettingsImpl]]. */
object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {

  /** Creates the extension from the configuration of the given actor system. */
  override def createExtension(system: ExtendedActorSystem): SettingsImpl = {
    val systemConfig = system.settings.config
    new SettingsImpl(systemConfig)
  }

  /** Returns this object as the extension id. */
  override def lookup: Settings.type = Settings
}
开发者ID:devknutst,项目名称:watermarkAkka,代码行数:19,代码来源:Settings.scala


示例2: InfluxDbConfig

//设置package包名称以及导入依赖的类
package io.soheila.cms.services.metrics.influx

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import com.typesafe.config.Config


/** Akka extension holding the `influxDb.*` settings read eagerly from `config`. */
class InfluxDbConfig(config: Config) extends Extension {
  import config.{ getInt, getString }

  // The values are read in declaration order when the extension is constructed.
  val host: String = getString("influxDb.host")
  val port: Int = getInt("influxDb.port")
  val username: String = getString("influxDb.username")
  val password: String = getString("influxDb.password")
  val database: String = getString("influxDb.database")
}

/** Extension id for the [[InfluxDbConfig]] extension. */
object InfluxDbConfig extends ExtensionId[InfluxDbConfig] with ExtensionIdProvider {

  /** Creates the extension from the configuration of the given actor system. */
  override def createExtension(system: ExtendedActorSystem): InfluxDbConfig = {
    val systemConfig = system.settings.config
    new InfluxDbConfig(systemConfig)
  }

  /** Returns this object as the extension id. */
  override def lookup(): InfluxDbConfig.type = InfluxDbConfig
}
开发者ID:esfand-r,项目名称:soheila-cm,代码行数:21,代码来源:InfluxDbConfig.scala


示例3: trade

//设置package包名称以及导入依赖的类
package akka.performance.trading.domain

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorSystem }

/** Callback interface notified with the bid and ask of a trade. */
trait TradeObserver {

  /**
   * Reports a trade.
   *
   * @param bid the bid side of the trade
   * @param ask the ask side of the trade
   */
  def trade(bid: Bid, ask: Ask): Unit
}

/** [[TradeObserver]] that increments the actor system's [[TotalTradeCounter]] on every trade. */
trait TotalTradeObserver extends TradeObserver {

  /** Actor system whose trade counter extension is used. */
  def system: ActorSystem

  // Looked up lazily so that `system` is only accessed on first use.
  private lazy val counter: TotalTradeCounter = TotalTradeCounterExtension(system)

  /** Counts the trade; `bid` and `ask` themselves are not inspected. */
  override def trade(bid: Bid, ask: Ask): Unit = {
    counter.increment()
    ()
  }
}

/** [[TradeObserver]] that ignores every trade. */
trait NopTradeObserver extends TradeObserver {

  /** Does nothing. */
  override def trade(bid: Bid, ask: Ask): Unit = ()
}

/** Akka extension wrapping a trade counter backed by an `AtomicInteger`. */
class TotalTradeCounter extends Extension {
  private val counter = new AtomicInteger

  /** Adds one to the counter and returns the updated value. */
  def increment(): Int = counter.incrementAndGet()

  /** Sets the counter back to zero. */
  def reset(): Unit = counter.set(0)

  /** Current counter value. */
  def count: Int = counter.get
}

/** Extension id for [[TotalTradeCounter]]. */
object TotalTradeCounterExtension extends ExtensionId[TotalTradeCounter] with ExtensionIdProvider {

  /** Creates a fresh counter; the actor system itself is not consulted. */
  override def createExtension(system: ExtendedActorSystem): TotalTradeCounter =
    new TotalTradeCounter

  /** Returns this object as the extension id. */
  override def lookup: TotalTradeCounterExtension.type = TotalTradeCounterExtension
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:40,代码来源:TradeObserver.scala


示例4: TestKitExtension

//设置package包名称以及导入依赖的类
package akka.testkit

import com.typesafe.config.Config
import akka.util.Timeout
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
import scala.concurrent.duration.FiniteDuration

/** Extension id for [[TestKitSettings]]. */
object TestKitExtension extends ExtensionId[TestKitSettings] {

  /** Creates the settings from the configuration of the given actor system. */
  def createExtension(system: ExtendedActorSystem): TestKitSettings = {
    val systemConfig = system.settings.config
    new TestKitSettings(systemConfig)
  }

  /** Delegates to the inherited lookup, with the result typed as `TestKitSettings`. */
  override def get(system: ActorSystem): TestKitSettings =
    super.get(system)
}

/**
 * Akka extension exposing the `akka.test.*` settings read from `config`.
 *
 * @param config configuration the settings are read from
 */
class TestKitSettings(val config: Config) extends Extension {

  // Brings `requiring` and `getMillisDuration` into scope.
  import akka.util.Helpers._

  /** Value of `akka.test.timefactor`; checked via `requiring` to be finite and greater than zero. */
  val TestTimeFactor = config.getDouble("akka.test.timefactor").
    requiring(tf => !tf.isInfinite && tf > 0, "akka.test.timefactor must be positive finite double")

  /** Value of `akka.test.single-expect-default`. */
  val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.test.single-expect-default")

  /** Value of `akka.test.filter-leeway`. */
  val TestEventFilterLeeway: FiniteDuration = config.getMillisDuration("akka.test.filter-leeway")

  /** Value of `akka.test.default-timeout`, wrapped in a `Timeout`. */
  val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.test.default-timeout"))
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:23,代码来源:TestKitExtension.scala


示例5: serialize

//设置package包名称以及导入依赖的类
package akka.remote

import akka.remote.WireFormats._
import com.google.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension


  /**
   * Builds a `SerializedMessage` for `message` using the serializer that the
   * system's serialization extension finds for it.
   *
   * The result carries the serialized bytes and the serializer identifier; the
   * message's class name is added as manifest only when the serializer reports
   * `includeManifest`.
   *
   * @param system  actor system whose serialization extension is used
   * @param message object to serialize
   * @return the built message
   */
  def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
    val chosen = SerializationExtension(system).findSerializerFor(message)
    val out = SerializedMessage.newBuilder
    val payload = ByteString.copyFrom(chosen.toBinary(message))
    out.setMessage(payload)
    out.setSerializerId(chosen.identifier)
    if (chosen.includeManifest) {
      val className = message.getClass.getName
      out.setMessageManifest(ByteString.copyFromUtf8(className))
    }
    out.build
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:20,代码来源:MessageSerializer.scala


示例6: DaemonicSpec

//设置package包名称以及导入依赖的类
package akka.remote

import akka.testkit._
import scala.concurrent.duration._
import akka.actor.{ Address, ExtendedActorSystem, ActorSystem }
import com.typesafe.config.ConfigFactory
import java.nio.channels.ServerSocketChannel
import java.net.InetSocketAddress
import scala.collection.JavaConverters._

/**
 * Checks that a remoting-enabled actor system configured with `akka.daemonic = on`
 * leaves no new non-daemon threads behind after a message is sent to an address
 * on which nothing is expected to listen.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonicSpec extends AkkaSpec {

  /** External address that `sys`'s provider reports for the `akka.<proto>` protocol. */
  def addr(sys: ActorSystem, proto: String) =
    sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get

  /**
   * Binds a server socket to an ephemeral localhost port, then closes it and returns
   * the port number. The socket is closed even when binding fails.
   */
  def unusedPort = {
    val ss = ServerSocketChannel.open().socket()
    try {
      ss.bind(new InetSocketAddress("localhost", 0))
      ss.getLocalPort
    } finally ss.close()
  }

  "Remoting configured with daemonic = on" must {

    "shut down correctly after getting connection refused" in {
      // get all threads running before the actor system is started
      val origThreads: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.to[Set]
      // create a separate actor system that we can check the threads for
      val daemonicSystem = ActorSystem("daemonic", ConfigFactory.parseString("""
        akka.daemonic = on
        akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
        akka.remote.netty.tcp.port = 0
        akka.log-dead-letters-during-shutdown = off
      """))

      val unusedAddress = addr(daemonicSystem, "tcp").copy(port = Some(unusedPort))
      val selection = daemonicSystem.actorSelection(s"${unusedAddress}/user/SomeActor")
      selection ! "whatever"
      Thread.sleep(2.seconds.dilated.toMillis)

      // get new non daemonic threads running
      val newNonDaemons: Set[Thread] = Thread.getAllStackTraces().keySet().asScala.seq.
        filter(t => !origThreads(t) && !t.isDaemon).to[Set]

      newNonDaemons should be(Set.empty[Thread])
      shutdown(daemonicSystem)
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:53,代码来源:DaemonicSpec.scala


示例7: main

//设置package包名称以及导入依赖的类
package akka

import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Actor
import akka.actor.Terminated
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.ActorRef
import scala.util.control.NonFatal


  /**
   * Starts an actor system named "Main", loads the actor class named by the single
   * command-line argument and creates it as "app", together with a [[Terminator]]
   * ("app-terminator") that is given the application actor.
   *
   * With any other number of arguments only a usage message is printed. If start-up
   * fails with a non-fatal error the actor system is shut down and the error is rethrown.
   *
   * @param args exactly one element: the class name of the application supervisor actor
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("you need to provide exactly one argument: the class of the application supervisor actor")
    } else {
      val system = ActorSystem("Main")
      try {
        val appClass = system.asInstanceOf[ExtendedActorSystem].dynamicAccess.getClassFor[Actor](args(0)).get
        val app = system.actorOf(Props(appClass), "app")
        // Created only for its side effect; the returned reference is not needed.
        system.actorOf(Props(classOf[Terminator], app), "app-terminator")
      } catch {
        case NonFatal(e) => system.shutdown(); throw e
      }
    }
  }

  /**
   * Watches `app` and shuts the actor system down once a `Terminated` message arrives.
   *
   * @param app the application supervisor actor to watch
   */
  class Terminator(app: ActorRef) extends Actor with ActorLogging {
    context watch app

    def receive: Receive = {
      case Terminated(_) =>
        log.info("application supervisor has terminated, shutting down")
        context.system.shutdown()
    }
  }

} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:38,代码来源:Main.scala


示例8: DistributedPubSubMessageSerializerSpec

//设置package包名称以及导入依赖的类
package akka.contrib.pattern.protobuf

import akka.actor.{ ExtendedActorSystem, Address }
import akka.testkit.AkkaSpec
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.contrib.pattern.DistributedPubSubMediator.Internal._
import akka.actor.Props
import scala.collection.immutable.TreeMap

/** Round-trip serialization checks for the distributed pub-sub messages. */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DistributedPubSubMessageSerializerSpec extends AkkaSpec {

  val serializer = new DistributedPubSubMessageSerializer(system.asInstanceOf[ExtendedActorSystem])

  /** Serializes `obj`, deserializes the bytes with `obj`'s class, and expects an equal object back. */
  def checkSerialization(obj: AnyRef): Unit = {
    val roundTripped = serializer.fromBinary(serializer.toBinary(obj), obj.getClass)
    roundTripped should be(obj)
  }

  " DistributedPubSubMessages" must {

    "be serializable" in {
      val address1 = Address("akka.tcp", "system", "some.host.org", 4711)
      val address2 = Address("akka.tcp", "system", "other.host.org", 4711)
      val address3 = Address("akka.tcp", "system", "some.host.org", 4712)

      val u1 = system.actorOf(Props.empty, "u1")
      val u2 = system.actorOf(Props.empty, "u2")
      val u3 = system.actorOf(Props.empty, "u3")
      val u4 = system.actorOf(Props.empty, "u4")

      checkSerialization(Status(Map(address1 -> 3, address2 -> 17, address3 -> 5)))

      val firstBucket =
        Bucket(address1, 3, TreeMap("/user/u1" -> ValueHolder(2, Some(u1)), "/user/u2" -> ValueHolder(3, Some(u2))))
      val secondBucket =
        Bucket(address2, 17, TreeMap("/user/u3" -> ValueHolder(17, Some(u3))))
      val thirdBucket =
        Bucket(address3, 5, TreeMap("/user/u4" -> ValueHolder(4, Some(u4)), "/user/u5" -> ValueHolder(5, None)))
      checkSerialization(Delta(List(firstBucket, secondBucket, thirdBucket)))

      checkSerialization(Send("/user/u3", "hello", localAffinity = true))
      checkSerialization(SendToAll("/user/u3", "hello", allButSelf = true))
      checkSerialization(Publish("mytopic", "hello"))
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:42,代码来源:DistributedPubSubMessageSerializerSpec.scala


示例9: TransactorExtension

//设置package包名称以及导入依赖的类
package akka.transactor

import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem }
import akka.actor.Extension
import com.typesafe.config.Config
import akka.util.Timeout
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS


/** Extension id for [[TransactorSettings]]. */
@deprecated("akka.transactor will be removed", "2.3")
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {

  /** Creates the settings from the configuration of the given actor system. */
  override def createExtension(system: ExtendedActorSystem): TransactorSettings = {
    val systemConfig = system.settings.config
    new TransactorSettings(systemConfig)
  }

  /** Returns this object as the extension id. */
  override def lookup: TransactorExtension.type = TransactorExtension

  /** Delegates to the inherited lookup, with the result typed as `TransactorSettings`. */
  override def get(system: ActorSystem): TransactorSettings =
    super.get(system)
}

/**
 * Akka extension exposing the transactor settings read from `config`.
 *
 * @param config configuration the settings are read from
 */
@deprecated("akka.transactor will be removed", "2.3")
class TransactorSettings(val config: Config) extends Extension {

  /** Value of `akka.transactor.coordinated-timeout`, read as milliseconds. */
  val CoordinatedTimeout: Timeout = {
    val millis = config.getMilliseconds("akka.transactor.coordinated-timeout")
    Timeout(Duration(millis, MILLISECONDS))
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:23,代码来源:TransactorExtension.scala


示例10: SettingsImpl

//设置package包名称以及导入依赖的类
package com.edu.chat

import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import com.typesafe.config.Config

/** Akka extension holding the `mongo.*` and `web.*` settings read eagerly from `config`. */
class SettingsImpl(config: Config) extends Extension {
  import config.{ getInt, getString }

  // Database settings, read in declaration order at construction time.
  val dbUsername: String = getString("mongo.username")
  // The password is exposed as a character array rather than a String.
  val dbPassword: Array[Char] = getString("mongo.password").toCharArray
  val dbHost: String = getString("mongo.host")
  val dbName: String = getString("mongo.db")
  val dbPort: Int = getInt("mongo.port")

  // Web settings.
  val webPort: Int = getInt("web.port")
  val staticContentPath: String = getString("web.staticContent")
  val clientIndexPath: String = getString("web.client")
}

/** Extension id for [[SettingsImpl]]. */
object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {

  /** Returns this object as the extension id. */
  override def lookup(): ExtensionId[_ <: Extension] = Settings

  /** Creates the extension from the configuration of the given actor system. */
  override def createExtension(system: ExtendedActorSystem): SettingsImpl =
    new SettingsImpl(system.settings.config)
}
开发者ID:themirrortruth,项目名称:chat-akka,代码行数:24,代码来源:SettingsImpl.scala


示例11: ActorEventAdapter

//设置package包名称以及导入依赖的类
package im.actor.server.cqrs

import akka.actor.ExtendedActorSystem
import akka.persistence.journal.{ Tagged, EventSeq, EventAdapter }

/**
 * Event adapter that wraps [[TaggedEvent]]s in `Tagged` on the way to the journal
 * and returns journal events unchanged inside an `EventSeq`.
 */
final class ActorEventAdapter(system: ExtendedActorSystem) extends EventAdapter {

  /** Every event gets the fixed manifest "V1". */
  override def manifest(event: Any): String = "V1"

  /** Wraps a `TaggedEvent` together with its own tags; any other event is passed through as is. */
  override def toJournal(event: Any): Any = {
    event match {
      case e: TaggedEvent => Tagged(e, e.tags)
      case _              => event
    }
  }

  /**
   * Wraps the journal event in a single-element `EventSeq`; `manifest` is not consulted.
   *
   * NOTE(review): a type pattern never matches `null`, so a `null` event reaches the
   * fallback branch, where `event.getClass` throws a `NullPointerException` before the
   * `IllegalArgumentException` can be built.
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    event match {
      case e: AnyRef => EventSeq(e)
      case _         => throw new IllegalArgumentException(s"Supported AnyRef but got: ${event.getClass}")
    }

}
开发者ID:wex5,项目名称:dangchat-server,代码行数:23,代码来源:ActorEventAdapter.scala


示例12: UserConfigExtension

//设置package包名称以及导入依赖的类
package im.actor.server.userconfig

import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import im.actor.api.rpc.configs.UpdateParameterChanged
import im.actor.hook.{ Hook2, Hook3, HooksStorage2, HooksStorage3 }
import im.actor.server.db.DbExtension
import im.actor.server.model.configs.Parameter
import im.actor.server.persist.configs.ParameterRepo
import im.actor.server.sequence.{ SeqState, SeqUpdatesExtension }
import im.actor.types._

import scala.concurrent.Future

/** Extension id for the [[UserConfigExtension]] extension. */
object UserConfigExtension extends ExtensionId[UserConfigExtension] with ExtensionIdProvider {

  /** Returns this object as the extension id. */
  override def lookup(): ExtensionId[_ <: Extension] = UserConfigExtension

  /** Creates the extension for the given actor system. */
  override def createExtension(system: ExtendedActorSystem): UserConfigExtension =
    new UserConfigExtension(system)
}

/**
 * Akka extension for reading and editing per-user configuration parameters.
 *
 * @param system actor system that supplies the dispatcher and the database and
 *               sequence-update extensions
 */
final class UserConfigExtension(system: ActorSystem) extends Extension {

  // Execution context for the Future combinators below.
  import system.dispatcher

  // Both extensions are resolved on first use.
  private lazy val db = DbExtension(system).db
  private lazy val seqUpdExt = SeqUpdatesExtension(system)

  /** Storage for [[EditParameterHook]]s. */
  val hooks = new HooksStorage3[EditParameterHook, Any, UserId, String, Option[String]]()

  /**
   * Loads the parameters found for `userId`.
   *
   * @return the parameters as `(key, value)` pairs
   */
  def fetchParameters(userId: Int): Future[Seq[(String, Option[String])]] = {
    for {
      params <- db.run(ParameterRepo.find(userId))
    } yield params.map(p => p.key -> p.value)
  }

  /**
   * Stores `value` under the trimmed `rawKey` for `userId`, delivers an
   * `UpdateParameterChanged` update on behalf of `authId`, and then asks the
   * sequence-update extension to reload the user's settings.
   *
   * @return the sequence state produced by delivering the update
   */
  def editParameter(userId: Int, authId: Long, rawKey: String, value: Option[String]): Future[SeqState] = {
    val key = rawKey.trim

    val update = UpdateParameterChanged(key, value)

    for {
      _ <- db.run(ParameterRepo.createOrUpdate(Parameter(userId, key, value)))
      seqState <- seqUpdExt.deliverClientUpdate(userId, authId, update)
    } yield {
      seqUpdExt.reloadSettings(userId)
      seqState
    }
  }
}

// Hook type fixed to the type arguments (Any, UserId, String, Option[String]).
// NOTE(review): presumably run when a user parameter is edited — confirm against the code that registers and runs these hooks.
trait EditParameterHook extends Hook3[Any, UserId, String, Option[String]] 
开发者ID:wex5,项目名称:dangchat-server,代码行数:51,代码来源:UserConfigExtension.scala


示例13: main

//设置package包名称以及导入依赖的类
package akka

import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Actor
import akka.actor.Terminated
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.ActorRef
import scala.util.control.NonFatal


  /**
   * Starts an actor system named "Main", loads the actor class named by the single
   * command-line argument and creates it as "app", together with a [[Terminator]]
   * ("app-terminator") that is given the application actor.
   *
   * With any other number of arguments only a usage message is printed. If start-up
   * fails with a non-fatal error the actor system is terminated and the error is rethrown.
   *
   * @param args exactly one element: the class name of the application supervisor actor
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("you need to provide exactly one argument: the class of the application supervisor actor")
    } else {
      val system = ActorSystem("Main")
      try {
        val appClass = system.asInstanceOf[ExtendedActorSystem].dynamicAccess.getClassFor[Actor](args(0)).get
        val app = system.actorOf(Props(appClass), "app")
        // Created only for its side effect; the returned reference is not needed.
        system.actorOf(Props(classOf[Terminator], app), "app-terminator")
      } catch {
        case NonFatal(e) => system.terminate(); throw e
      }
    }
  }

  /**
   * Watches `app` and terminates the actor system once a `Terminated` message arrives.
   *
   * @param app the application supervisor actor to watch
   */
  class Terminator(app: ActorRef) extends Actor with ActorLogging {
    context watch app

    def receive: Receive = {
      case Terminated(_) =>
        log.info("application supervisor has terminated, shutting down")
        context.system.terminate()
    }
  }

} 
开发者ID:rorygraves,项目名称:perf_tester,代码行数:38,代码来源:Main.scala


示例14: main

//设置package包名称以及导入依赖的类
package akka

import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Actor
import akka.actor.Terminated
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.ActorRef
import scala.util.control.NonFatal


  /**
   * Starts an actor system named "Main", loads the actor class named by the single
   * command-line argument and creates it as "app", together with a [[Terminator]]
   * ("app-terminator") that is given the application actor.
   *
   * With any other number of arguments only a usage message is printed. If start-up
   * fails with a non-fatal error the actor system is terminated and the error is rethrown.
   *
   * @param args exactly one element: the class name of the application supervisor actor
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println("you need to provide exactly one argument: the class of the application supervisor actor")
    } else {
      val system = ActorSystem("Main")
      try {
        val appClass = system.asInstanceOf[ExtendedActorSystem].dynamicAccess.getClassFor[Actor](args(0)).get
        val app = system.actorOf(Props(appClass), "app")
        // Created only for its side effect; the returned reference is not needed.
        system.actorOf(Props(classOf[Terminator], app), "app-terminator")
      } catch {
        case NonFatal(e) => system.terminate(); throw e
      }
    }
  }

  /**
   * Watches `app` and terminates the actor system once a `Terminated` message arrives.
   *
   * @param app the application supervisor actor to watch
   */
  class Terminator(app: ActorRef) extends Actor with ActorLogging {
    context watch app

    def receive: PartialFunction[Any, Unit] = {
      case Terminated(_) =>
        log.info("application supervisor has terminated, shutting down")
        context.system.terminate()
    }
  }

} 
开发者ID:rorygraves,项目名称:perf_tester,代码行数:38,代码来源:Main.scala


示例15: Settings

//设置package包名称以及导入依赖的类
package com.mikemunhall.simpletwitterstats

import akka.actor.{ActorContext, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import com.typesafe.config.Config

/** Extension id for the [[Settings]] extension. */
object Settings extends ExtensionId[Settings] with ExtensionIdProvider {

  /** Creates the settings from the system's configuration and the system itself. */
  override def createExtension(system: ExtendedActorSystem): Settings = {
    val systemConfig = system.settings.config
    new Settings(systemConfig, system)
  }

  /** Returns this object as the extension id. */
  override def lookup: Settings.type = Settings

  /** Convenience overload that resolves the settings through the context's actor system. */
  def apply(context: ActorContext): Settings = {
    val owningSystem = context.system
    apply(owningSystem)
  }
}

/**
 * Akka extension exposing the `simple-twitter-stats.*` settings as nested objects.
 *
 * @param config         configuration the values are read from
 * @param extendedSystem actor system the extension was created for (not used in this class body)
 */
class Settings(config: Config, extendedSystem: ExtendedActorSystem) extends Extension {
  import config.{ getInt, getString }

  /** Settings under `simple-twitter-stats.api`. */
  object API {
    object Http {
      val Port: Int = getInt("simple-twitter-stats.api.http.port")
      val Interface: String = getString("simple-twitter-stats.api.http.interface")
    }
  }

  /** Settings under `simple-twitter-stats.server`. */
  object Server {
    object Twitter {
      object Client {
        object OAuth {

          /** OAuth consumer key and secret. */
          object Consumer {
            val key: String = getString("simple-twitter-stats.server.twitter.client.oauth.consumer.key")
            val secret: String = getString("simple-twitter-stats.server.twitter.client.oauth.consumer.secret")
          }

          /** OAuth access token and secret. */
          object Access {
            val token: String = getString("simple-twitter-stats.server.twitter.client.oauth.access.token")
            val secret: String = getString("simple-twitter-stats.server.twitter.client.oauth.access.secret")
          }
        }
      }
    }
  }

}
开发者ID:mmunhall,项目名称:simple-twitter-stats,代码行数:40,代码来源:Settings.scala


示例16: MessageSerializer

//设置package包名称以及导入依赖的类
package aecor.distributedprocessing.serialization

import aecor.distributedprocessing.DistributedProcessingWorker.KeepRunning
import akka.actor.ExtendedActorSystem
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }

/**
 * String-manifest serializer for `KeepRunning` messages.
 *
 * @param system actor system the serializer belongs to
 */
class MessageSerializer(val system: ExtendedActorSystem)
    extends SerializerWithStringManifest
    with BaseSerializer {

  /** Manifest written for `KeepRunning`. */
  val KeepRunningManifest = "A"

  // Shared failure path for objects this serializer does not handle.
  private def unsupported(x: AnyRef): Nothing =
    throw new IllegalArgumentException(s"Serialization of [$x] is not supported")

  /** Manifest for `o`; anything other than `KeepRunning` is rejected with `IllegalArgumentException`. */
  override def manifest(o: AnyRef): String =
    o match {
      case KeepRunning(_) => KeepRunningManifest
      case rejected       => unsupported(rejected)
    }

  /** Encodes a `KeepRunning` through `msg.KeepRunning`; anything else is rejected with `IllegalArgumentException`. */
  override def toBinary(o: AnyRef): Array[Byte] =
    o match {
      case KeepRunning(id) =>
        val wire = msg.KeepRunning(id)
        wire.toByteArray
      case rejected => unsupported(rejected)
    }

  /** Decodes `bytes` according to `manifest`; an unknown manifest is rejected with `IllegalArgumentException`. */
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    manifest match {
      case KeepRunningManifest =>
        val wire = msg.KeepRunning.parseFrom(bytes)
        KeepRunning(wire.workerId)
      case other =>
        throw new IllegalArgumentException(s"Unknown manifest [$other]")
    }
}
开发者ID:notxcain,项目名称:aecor,代码行数:28,代码来源:MessageSerializer.scala


示例17: apply

//设置package包名称以及导入依赖的类
package akka.persistence.cassandra

import akka.Done
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.event.Logging
import akka.persistence.cassandra.session.CassandraSessionSettings
import akka.persistence.cassandra.session.scaladsl.CassandraSession
import com.datastax.driver.core.Session

import scala.concurrent.{ ExecutionContext, Future }


  /**
   * Creates a `CassandraSession` configured from the `cassandra-journal` section of
   * the system's configuration.
   *
   * @param system           actor system supplying configuration and logging
   * @param metricsCategory  passed through to the session
   * @param init             initialisation function; its result is mapped to `Done`
   * @param executionContext used for that mapping and passed to the session
   */
  def apply(system: ActorSystem, metricsCategory: String, init: Session => Future[Unit])(
    implicit executionContext: ExecutionContext
  ): CassandraSession = {
    val log = Logging(system, classOf[CassandraSession])
    val journalConfig = system.settings.config.getConfig("cassandra-journal")
    val provider = SessionProvider(system.asInstanceOf[ExtendedActorSystem], journalConfig)
    val settings = CassandraSessionSettings(system.settings.config.getConfig("cassandra-journal"))
    val initToDone: Session => Future[Done] =
      session => init(session).map(_ => Done)
    new CassandraSession(system, provider, settings, executionContext, log, metricsCategory, initToDone)
  }
} 
开发者ID:notxcain,项目名称:aecor,代码行数:33,代码来源:CassandraSessionInitSerialization.scala


示例18: PersistentRepr

//设置package包名称以及导入依赖的类
package aecor.runtime.akkapersistence.serialization

import akka.actor.ExtendedActorSystem
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }

/**
 * Pairs a manifest string with a raw byte payload.
 *
 * Because `payload` is an `Array[Byte]`, the generated `equals`/`hashCode` compare
 * it by reference, not by content.
 *
 * @param manifest manifest string stored alongside the payload
 * @param payload  the raw bytes
 */
final case class PersistentRepr(manifest: String, payload: Array[Byte])

/**
 * Serializer that writes a [[PersistentRepr]] as its payload bytes, using the
 * representation's own manifest as the serializer manifest.
 *
 * @param system actor system the serializer belongs to
 */
class PersistentReprSerializer(val system: ExtendedActorSystem)
    extends SerializerWithStringManifest
    with BaseSerializer {

  // Shared failure path for objects this serializer does not handle.
  private def unsupported(x: AnyRef): Nothing =
    throw new IllegalArgumentException(s"Serialization of [$x] is not supported")

  /** Manifest carried by the representation; other objects are rejected with `IllegalArgumentException`. */
  override def manifest(o: AnyRef): String =
    o match {
      case repr: PersistentRepr => repr.manifest
      case rejected             => unsupported(rejected)
    }

  /** Payload carried by the representation; other objects are rejected with `IllegalArgumentException`. */
  override def toBinary(o: AnyRef): Array[Byte] =
    o match {
      case repr: PersistentRepr => repr.payload
      case rejected             => unsupported(rejected)
    }

  /** Rebuilds the representation from the given manifest and bytes without inspecting either. */
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    PersistentRepr(manifest = manifest, payload = bytes)
}
开发者ID:notxcain,项目名称:aecor,代码行数:25,代码来源:PersistentRepr.scala


示例19: OrderTaggingEventAdapter

//设置package包名称以及导入依赖的类
package poc.persistence.write

import java.nio.charset.Charset

import akka.actor.ExtendedActorSystem
import akka.event.Logging
import akka.persistence.journal.{Tagged, WriteEventAdapter}
import org.json4s.DefaultFormats
import poc.persistence.events.{OrderCancelled, OrderInitialized}

/**
 * Write-side event adapter that tags order events with "UserEvent".
 *
 * @param actorSystem actor system used to obtain the logger
 */
class OrderTaggingEventAdapter(actorSystem: ExtendedActorSystem) extends WriteEventAdapter {

  private val log = Logging.getLogger(actorSystem, this)

  /**
   * Wraps `OrderInitialized` and `OrderCancelled` events in `Tagged` with the tag
   * "UserEvent". Any other event is returned unchanged instead of failing with a
   * `MatchError`.
   */
  override def toJournal(event: Any): Any = event match {
    case e: OrderInitialized =>
      log.debug("tagging OrderInitialized event")
      Tagged(e, Set("UserEvent"))
    case e: OrderCancelled =>
      log.debug("tagged OrderCancelled event")
      Tagged(e, Set("UserEvent"))
    case other =>
      // Events this adapter does not know about are written as they are.
      other
  }

  /** No manifest is written for adapted events. */
  override def manifest(event: Any): String = ""
}

import akka.serialization.Serializer

/**
 * Serializer that stores objects as UTF-8 encoded JSON using json4s.
 *
 * @param actorSystem actor system used to obtain the logger
 */
class EventSerialization(actorSystem: ExtendedActorSystem) extends Serializer {

  import org.json4s.jackson.Serialization.{read, write}

  private val log = Logging.getLogger(actorSystem, this)

  val UTF8: Charset = Charset.forName("UTF-8")

  implicit val formats = DefaultFormats

  // Completely unique value to identify this implementation of Serializer, used to optimize network traffic.
  // Values from 0 to 16 are reserved for Akka internal usage.
  // Make sure this does not conflict with any other kind of serializer or you will have problems
  override def identifier: Int = 90020001

  override def includeManifest = true

  /**
   * Decodes `bytes` as UTF-8 JSON and reads it with a manifest built from
   * `manifestOpt` (`Manifest.AnyRef` when no class is given).
   *
   * NOTE(review): `read` is called without an expected type and its result is bound
   * to a local first — presumably so that the local implicit manifest drives the
   * target type; confirm before inlining the call.
   */
  override def fromBinary(bytes: Array[Byte], manifestOpt: Option[Class[_]]): AnyRef = {
    implicit val manifest = manifestOpt match {
      case Some(x) => Manifest.classType(x)
      case None => Manifest.AnyRef
    }
    val str = new String(bytes, UTF8)
    val result = read(str)
    result
  }

  /** Writes `o` as JSON once and returns the UTF-8 bytes of that string. */
  override def toBinary(o: AnyRef): Array[Byte] = {
    val jsonString = write(o)
    jsonString.getBytes(UTF8)
  }
}
开发者ID:logicaalternativa,项目名称:akka-persistence-POC,代码行数:62,代码来源:Infrastructure.scala


示例20: ChatEventSerializer

//设置package包名称以及导入依赖的类
package chat

import akka.actor.ExtendedActorSystem
import akka.serialization.{BaseSerializer, SerializerWithStringManifest}
import play.api.libs.json.Json


/**
 * JSON serializer for the chat events `ChatMessage`, `JoinRoom` and `LeaveRoom`,
 * distinguished by the one-letter manifests "M", "J" and "L".
 *
 * @param system actor system the serializer belongs to
 */
class ChatEventSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {

  // Shared failure path for objects that are not one of the three chat events.
  private def cannotSerialize(other: AnyRef): Nothing =
    sys.error("Don't know how to serialize " + other)

  /** One-letter manifest for the event type; other objects fail via `sys.error`. */
  override def manifest(o: AnyRef): String =
    o match {
      case _: ChatMessage => "M"
      case _: JoinRoom    => "J"
      case _: LeaveRoom   => "L"
      case unknown        => cannotSerialize(unknown)
    }

  /** Converts the event to JSON and returns its bytes; other objects fail via `sys.error`. */
  override def toBinary(o: AnyRef): Array[Byte] =
    Json.toBytes(o match {
      case message: ChatMessage => Json.toJson(message)
      case join: JoinRoom       => Json.toJson(join)
      case leave: LeaveRoom     => Json.toJson(leave)
      case unknown              => cannotSerialize(unknown)
    })

  /** Parses `bytes` as JSON first, then reads the event type selected by `manifest`. */
  override def fromBinary(bytes: Array[Byte], manifest: String) = {
    val parsed = Json.parse(bytes)
    manifest match {
      case "M" => parsed.as[ChatMessage]
      case "J" => parsed.as[JoinRoom]
      case "L" => parsed.as[LeaveRoom]
      case other => sys.error("Unknown manifest " + other)
    }
  }
}
开发者ID:playframework,项目名称:play-socket.io,代码行数:36,代码来源:ChatEventSerializer.scala



注：本文中的 akka.actor.ExtendedActorSystem 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala lit类代码示例发布时间:2022-05-23
下一篇:
Scala AuthParamsControllerConfig类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap