本文整理汇总了Scala中akka.cluster.sharding.ClusterSharding类的典型用法代码示例。如果您正苦于以下问题:Scala ClusterSharding类的具体用法?Scala ClusterSharding怎么用?Scala ClusterSharding使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ClusterSharding类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GroupProcessorRegion
//设置package包名称以及导入依赖的类
package im.actor.server.group
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
object GroupProcessorRegion {
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) ?
(
groupId.toString,
dialogEnvelope
)
case env @ GroupEnvelope(groupId, _, command, query) ?
(
groupId.toString,
// payload
if (query.isDefined) {
env.getField(GroupEnvelope.descriptor.findFieldByNumber(query.number))
} else {
env.getField(GroupEnvelope.descriptor.findFieldByNumber(command.number))
}
)
}
private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
case env: GroupEnvelope ? (env.groupId % 100).toString // TODO: configurable
}
private val typeName = "GroupProcessor"
private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
GroupProcessorRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)
def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
GroupProcessorRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
}
case class GroupProcessorRegion(ref: ActorRef)
case class GroupViewRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala
示例2: SeqUpdatesManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.sequence
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.event.Logging
import scala.util.{ Success, Try }
final case class SeqUpdatesManagerRegion(ref: ActorRef)
object SeqUpdatesManagerRegion {
import UserSequenceCommands._
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
val log = Logging(system, getClass)
{
case e @ Envelope(userId, payload) ? (userId.toString, Try(e.getField(Envelope.descriptor.findFieldByNumber(payload.number))) match {
case Success(any) ? any
case _ ?
val error = new RuntimeException(s"Payload not found for $e")
log.error(error, error.getMessage)
throw error
})
}
}
private val extractShardId: ShardRegion.ExtractShardId = {
case Envelope(userId, _) ? (userId % 10).toString // TODO: configurable
}
private val typeName = "SeqUpdatesManager"
private def start(props: Props)(implicit system: ActorSystem): SeqUpdatesManagerRegion =
SeqUpdatesManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId
))
def start()(
implicit
system: ActorSystem
): SeqUpdatesManagerRegion =
start(UserSequence.props)
def startProxy()(implicit system: ActorSystem): SeqUpdatesManagerRegion =
SeqUpdatesManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId
))
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:58,代码来源:SeqUpdatesManagerRegion.scala
示例3: MormontShardRegion
//设置package包名称以及导入依赖的类
package lib.cluster
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import lib.MormontConfig
import lib.cluster.MormontShardRegion.MormontShardRegionActorRef
import lib.models.PayLoad
abstract class MormontShardRegion[T <: PayLoad](actorSystem: ActorSystem) {
private val prefix : String = "shard-region-"
def channelName: String
def entityProps: Props
final private val extractEntityId: ShardRegion.ExtractEntityId = {
case p: PayLoad ?
(p.persistenceId, p)
}
final private val extractShardId: ShardRegion.ExtractShardId = {
case p: PayLoad ?
(math.abs(p.persistenceId.hashCode) % MormontConfig.numOfShards).toString
}
final def startShard(): MormontShardRegionActorRef = {
ClusterSharding(actorSystem).start(
typeName = prefix + channelName,
entityProps = entityProps,
settings = ClusterShardingSettings(actorSystem),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
}
}
object MormontShardRegion {
type MormontShardRegionActorRef = ActorRef
}
开发者ID:sharma-rohit,项目名称:mormont,代码行数:40,代码来源:MormontShardRegion.scala
示例4: ShardedApp
//设置package包名称以及导入依赖的类
package com.michalplachta.shoesorter.api
import akka.actor.{ ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import com.michalplachta.shoesorter.SortingDecider
import com.typesafe.config.ConfigFactory
import kamon.Kamon
object ShardedApp extends App {
val config = ConfigFactory.load("sharded")
implicit val system = ActorSystem(config getString "application.name", config)
ClusterSharding(system).start(
typeName = SortingDecider.shardName,
entityProps = SortingDecider.props,
settings = ClusterShardingSettings(system),
extractEntityId = SortingDecider.extractEntityId,
extractShardId = SortingDecider.extractShardId
)
val guardian = ClusterSharding(system).shardRegion(SortingDecider.shardName)
new RestInterface(guardian, config getInt "application.exposed-port")
}
开发者ID:miciek,项目名称:monitoring-akka-prometheus-kamon,代码行数:24,代码来源:ShardedApp.scala
示例5: WeakUpdatesManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.sequence
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterShardingSettings, ClusterSharding, ShardRegion }
import im.actor.server.sequence.WeakUpdatesManager.Envelope
object WeakUpdatesManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(authId, payload) ? (authId.toString, env)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(authId, _) ? (authId % 32).toString // TODO: configurable
}
private val typeName = "WeakUpdatesManager"
private def startRegion(props: Props)(implicit system: ActorSystem): WeakUpdatesManagerRegion = {
WeakUpdatesManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
def startRegion()(implicit system: ActorSystem): WeakUpdatesManagerRegion = startRegion(WeakUpdatesManager.props)
def startRegionProxy()(implicit system: ActorSystem): WeakUpdatesManagerRegion =
WeakUpdatesManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class WeakUpdatesManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:41,代码来源:WeakUpdatesManagerRegion.scala
示例6: GroupPresenceManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.presences
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.GroupPresenceManager.Envelope
object GroupPresenceManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(userId, payload) ? (userId.toString, env)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
}
private val typeName = "GroupPresenceManager"
private def startRegion(props: Props)(implicit system: ActorSystem): GroupPresenceManagerRegion =
GroupPresenceManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
def startRegion()(implicit system: ActorSystem): GroupPresenceManagerRegion = startRegion(GroupPresenceManager.props)
def startRegionProxy()(implicit system: ActorSystem): GroupPresenceManagerRegion =
GroupPresenceManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class GroupPresenceManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:39,代码来源:GroupPresenceManagerRegion.scala
示例7: PresenceManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.presences
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.PresenceManager.Envelope
object PresenceManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(userId, payload) ? (userId.toString, payload)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
}
private val typeName = "PresenceManager"
private def startRegion(props: Props)(implicit system: ActorSystem): PresenceManagerRegion =
PresenceManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
def startRegion()(implicit system: ActorSystem): PresenceManagerRegion = startRegion(PresenceManager.props)
def startRegionProxy()(implicit system: ActorSystem): PresenceManagerRegion =
PresenceManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class PresenceManagerRegion(val ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:40,代码来源:PresenceManagerRegion.scala
示例8: UserProcessorRegion
//设置package包名称以及导入依赖的类
package im.actor.server.user
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
object UserProcessorRegion {
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
{
case c: UserCommand ? (c.userId.toString, c)
case q: UserQuery ? (q.userId.toString, q)
case e @ UserEnvelope(
userId,
dialogRootEnvelope,
dialogEnvelope
) ?
(
userId.toString,
dialogRootEnvelope.getOrElse(dialogEnvelope.get)
)
}
}
private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
case c: UserCommand ? (c.userId % 100).toString // TODO: configurable
case q: UserQuery ? (q.userId % 100).toString
case e: UserEnvelope ? (e.userId % 100).toString
}
val typeName = "UserProcessor"
private def start(props: Props)(implicit system: ActorSystem): UserProcessorRegion =
UserProcessorRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
def start()(implicit system: ActorSystem): UserProcessorRegion =
start(UserProcessor.props)
def startProxy()(implicit system: ActorSystem): UserProcessorRegion =
UserProcessorRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
}
final case class UserProcessorRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:53,代码来源:UserProcessorRegion.scala
示例9: entityId
//设置package包名称以及导入依赖的类
package com.mooneyserver.dublinpubs.shite_talking.protocol
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.ShiteTalker
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.models.{DrunkenWaffle, ShiteTalkerIsYapping}
trait ShiteTalkingActors {
lazy val actorSystem = ActorSystem("Shite-Talkers")
val messageExtractor = new HashCodeMessageExtractor(25) {
override def entityId(message: Any): String = message match {
case msg @ ShiteTalkerIsYapping(id) => id
case msg @ DrunkenWaffle(id, _, _, _) => id
}
}
lazy val shiteTalkersShard: ActorRef = ClusterSharding(actorSystem).start(
typeName = "ShiteTalker",
entityProps = Props[ShiteTalker],
settings = ClusterShardingSettings(actorSystem),
messageExtractor = messageExtractor)
}
开发者ID:irishshagua,项目名称:dublin-pubs-shite-talking,代码行数:26,代码来源:ShiteTalkingActors.scala
示例10: UserWorkerSharding
//设置package包名称以及导入依赖的类
package homeworkzen.clustering
import akka.actor.{ActorRef, ActorSystem, Props}
import homeworkzen.domain.command.actor._
import homeworkzen.domain.command.message._
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import homeworkzen.Config
object UserWorkerSharding {
def register(implicit system: ActorSystem): ActorRef =
ClusterSharding(system).start(
"UserWorker",
Props(new UserWorker),
ClusterShardingSettings(system),
idExtractor,
shardResolver)
def props(): Props = Props(new UserWorker)
val idExtractor: ShardRegion.ExtractEntityId = {
case msg: UserCommand => (msg.userId.id.toString, msg)
}
val shardResolver: ShardRegion.ExtractShardId = {
case msg: UserCommand => (math.abs(msg.userId.id.hashCode()) % Config.Cluster.shardCount).toString
}
val shardName: String = "UserSharding"
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:30,代码来源:UserWorkerSharding.scala
示例11: ShardedIdGenerator
//设置package包名称以及导入依赖的类
package org.tanukkii.reactive.snowflake
import akka.actor.{ActorRef, Props, Actor}
import akka.cluster.sharding.{ClusterShardingSettings, ClusterSharding}
class ShardedIdGenerator extends Actor {
ClusterSharding(context.system).start(
typeName = ShardedIdRouter.shardName,
entityProps = ShardedIdRouter.props,
settings = ClusterShardingSettings(context.system),
extractEntityId = ShardedIdRouter.idExtractor,
extractShardId = ShardedIdRouter.shardResolver
)
def shardedIdRouter: ActorRef = {
ClusterSharding(context.system).shardRegion(ShardedIdRouter.shardName)
}
def receive: Receive = {
case msg => {
shardedIdRouter forward msg
}
}
}
object ShardedIdGenerator {
def props: Props = Props(new ShardedIdGenerator)
def name: String = "idGenerator"
}
开发者ID:TanUkkii007,项目名称:reactive-snowflake,代码行数:32,代码来源:ShardedIdGenerator.scala
示例12: PubSubTest
//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub
import akka.actor.ActorSystem
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
object PubSubTest {
implicit val system = ActorSystem("PubSubTestSystem")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
def main(args: Array[String]) {
val betShard = ClusterSharding(system).start(
typeName = "SubscriberShard",
entityProps = SubscribingActor.props,
settings = ClusterShardingSettings(system),
extractEntityId = SubscribingActor.idExtractor,
extractShardId = SubscribingActor.shardExtractor)
val selectionShard = ClusterSharding(system).start(
typeName = "PublisherShard",
entityProps = PublisherActor.props,
settings = ClusterShardingSettings(system),
extractEntityId = PublisherActor.idExtractor,
extractShardId = PublisherActor.shardExtractor)
Http().bindAndHandle(
new RestService(betShard, selectionShard).routes,
"localhost",
8080)
}
}
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:35,代码来源:PubSubTest.scala
示例13: Main
//设置package包名称以及导入依赖的类
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import api.Api
import com.typesafe.config.ConfigFactory
import config.AppConfig
import services.UserProcessor
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
object Main extends App {
private final class Root extends Actor with ActorLogging {
log.info("App up and running...")
val appConfig: AppConfig = context.system.settings.config.as[AppConfig]("app")
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
ClusterSharding(context.system).start(
typeName = UserProcessor.shardName,
entityProps = Props[UserProcessor],
settings = ClusterShardingSettings(context.system),
extractEntityId = UserProcessor.idExtractor,
extractShardId = UserProcessor.shardResolver(appConfig.maxShards)
)
private val api = context.actorOf(Api.props(), name = Api.name)
context.watch(api)
override def receive = {
case Terminated(actor) =>
log.error("Terminating the system because {} terminated!", actor.path)
context.system.terminate()
}
}
override def main(args: Array[String]): Unit = {
// read config
val config = ConfigFactory.load()
implicit val system = ActorSystem("application", config)
Cluster(system).registerOnMemberUp(system.actorOf(Props(classOf[Root]), "root"))
Await.ready(system.whenTerminated, Duration.Inf)
}
}
开发者ID:MavenCode,项目名称:akkaDocker,代码行数:51,代码来源:Main.scala
示例14: BathurstMain
//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.abstractcode.bathurst.Catalog.{AddItemToCatalog, SetName}
import com.typesafe.config.ConfigFactory
object BathurstMain extends App {
val port = if (args.isEmpty) "0" else args(0)
println(s"Running Bathurst on port $port")
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
.withFallback(ConfigFactory.defaultApplication())
val system = ActorSystem("bathurst", config)
val settings = ClusterShardingSettings(system)
val items: ActorRef = ClusterSharding(system).start(
typeName = "Item",
entityProps = Item.props(),
settings = settings,
extractEntityId = Item.extractEntityId,
extractShardId = Item.extractShardId)
val borrowers: ActorRef = ClusterSharding(system).start(
typeName = "Borrower",
entityProps = Borrower.props(),
settings = settings,
extractEntityId = Borrower.extractEntityId,
extractShardId = Borrower.extractShardId)
val catalogs: ActorRef = ClusterSharding(system).start(
typeName = "Catalog",
entityProps = Catalog.props(items, borrowers),
settings = settings,
extractEntityId = Catalog.extractEntityId,
extractShardId = Catalog.extractShardId)
if (port == "2552") {
Cluster(system) registerOnMemberUp {
catalogs ! SetName(1, "thing")
for (a <- 1 to 30) {
catalogs ! AddItemToCatalog(1, "whatever")
}
}
}
}
开发者ID:ColinScott,项目名称:bathurst,代码行数:55,代码来源:BathurstMain.scala
示例15: ApplicationMain
//设置package包名称以及导入依赖的类
package com.example
import akka.actor.ActorSystem
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.http.scaladsl.Http
import akka.persistence.Persistence
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl.AmqpSource
import akka.stream.scaladsl.Sink
import com.example.OrderActor._
import com.typesafe.config.ConfigFactory
object ApplicationMain extends App with OrderFlow with OrderApi {
val config = ConfigFactory.load()
implicit val system = ActorSystem("order", config)
val localSystem = ActorSystem("local", config.getConfig("local"))
implicit val mat = ActorMaterializer()(localSystem)
Persistence(system)
override val orderActor = ClusterSharding(system).start(
typeName = OrderActor.shardName,
entityProps = OrderActor.props,
settings = ClusterShardingSettings(system),
extractShardId = OrderActor.extractShardId,
extractEntityId = OrderActor.extractEntityId)
val orderQueue = "order-akka.queue"
BindingDeclaration(orderQueue, "order.exchange").withRoutingKey("#")
val queueDeclaration = QueueDeclaration(orderQueue).withDurable(true).withAutoDelete(false)
val amqpSource = AmqpSource(
NamedQueueSourceSettings(AmqpConnectionDetails("192.168.99.100", 5672), orderQueue).withDeclarations(queueDeclaration),
bufferSize = 10
)
amqpSource
.log("order.queue")
.via(deliveryToCreateOrderFlow())
.to(Sink.actorRef[CreateOrder](orderActor, "done"))
.run()
Http().bindAndHandle(orderRoutes, "localhost", 8070)
}
开发者ID:mduesterhoeft,项目名称:order-akka,代码行数:48,代码来源:ApplicationMain.scala
示例16: ShardingApp
//设置package包名称以及导入依赖的类
package com.example.sharding
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object ShardingApp extends App {
val port: Int = args(0).toInt
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("cluster-sharding.conf"))
val system = ActorSystem("ClusterSharding", config)
val extractEntityId: ShardRegion.ExtractEntityId = {
case ClientMessage(message) => (message, message)
}
val extractShardId: ShardRegion.ExtractShardId = {
case ClientMessage(message) => message
}
ClusterSharding(system).start(
typeName = "Client",
entityProps = Props[ClientActor],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
// terminate system
StdIn.readLine()
system.terminate()
}
class ReceiverActor extends Actor {
override def receive: Receive = {
case ShardingMessage(message) => {
}
}
}
class ClientActor extends Actor {
override def receive: Receive = {
case ClientMessage(message) =>
}
}
开发者ID:TechResearchID,项目名称:akka-cluster,代码行数:56,代码来源:ShardingApp.scala
示例17: AccountApp
//设置package包名称以及导入依赖的类
package sample.blog
import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory
object AccountApp {
def main(args: Array[String]): Unit = {
if (args.isEmpty)
startup(Seq("2551", "2552", "0"))
else
startup(args)
}
def startup(ports: Seq[String]): Unit = {
println("The ports are - " + ports )
ports foreach { port =>
//val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).withFallback(ConfigFactory.load())
val config = ConfigFactory.parseString(s"""akka {
remote {
netty.tcp {
port = ${port}
}
}
cluster {
roles = ["MyWorker","AccountEntity"]
}
}""").withFallback(ConfigFactory.load())
// Create an Akka system
val system = ActorSystem("ClusterSystem", config)
ClusterSharding(system).start(
typeName = "SupervisedAccount",
entityProps = Props[AccountSupervisor],
settings = ClusterShardingSettings(system),
extractEntityId = AccountEntity.idExtractor,
extractShardId = AccountEntity.shardResolver)
}
}
}
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:47,代码来源:AccountApp.scala
示例18: GenericAkkaRuntime
//设置package包名称以及导入依赖的类
package aecor.runtime.akkageneric
import aecor.data.{ Behavior, Correlation }
import aecor.effect.{ Async, Capture, CaptureFuture }
import aecor.runtime.akkageneric.GenericAkkaRuntime.CorrelatedCommand
import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern._
import akka.util.Timeout
import cats.~>
import scala.concurrent.Future
object GenericAkkaRuntime {
def apply[F[_]: Async: CaptureFuture: Capture](system: ActorSystem): GenericAkkaRuntime[F] =
new GenericAkkaRuntime(system)
private final case class CorrelatedCommand[A](entityId: String, command: A)
}
class GenericAkkaRuntime[F[_]: Async: CaptureFuture: Capture](system: ActorSystem) {
def start[Op[_]](typeName: String,
correlation: Correlation[Op],
behavior: Behavior[F, Op],
settings: GenericAkkaRuntimeSettings =
GenericAkkaRuntimeSettings.default(system)): F[Op ~> F] =
Capture[F]
.capture {
val numberOfShards = settings.numberOfShards
val extractEntityId: ShardRegion.ExtractEntityId = {
case CorrelatedCommand(entityId, c) =>
(entityId, GenericAkkaRuntimeActor.PerformOp(c.asInstanceOf[Op[_]]))
}
val extractShardId: ShardRegion.ExtractShardId = {
case CorrelatedCommand(entityId, _) =>
(scala.math.abs(entityId.hashCode) % numberOfShards).toString
case other => throw new IllegalArgumentException(s"Unexpected message [$other]")
}
val props = GenericAkkaRuntimeActor.props(behavior, settings.idleTimeout)
val shardRegionRef = ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = settings.clusterShardingSettings,
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
implicit val timeout = Timeout(settings.askTimeout)
new (Op ~> F) {
override def apply[A](fa: Op[A]): F[A] = CaptureFuture[F].captureFuture {
(shardRegionRef ? CorrelatedCommand(correlation(fa), fa)).asInstanceOf[Future[A]]
}
}
}
}
开发者ID:notxcain,项目名称:aecor,代码行数:59,代码来源:GenericAkkaRuntime.scala
示例19: UserService
//设置package包名称以及导入依赖的类
package nl.tradecloud.user
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import nl.tradecloud.user.repositories.UserRepository
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object UserService extends App {
private final class Root extends Actor with ActorLogging {
log.info("UserService up and running...")
val appConfig: AppConfig = context.system.settings.config.as[AppConfig]("app")
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
ClusterSharding(context.system).start(
typeName = UserRepository.shardName,
entityProps = Props[UserRepository],
settings = ClusterShardingSettings(context.system),
extractEntityId = UserRepository.idExtractor,
extractShardId = UserRepository.shardResolver(appConfig.maxShards)
)
context.watch(context.actorOf(Api.props(), name = Api.name))
override def receive = {
case Terminated(actor) =>
log.error("Terminating the system because {} terminated!", actor.path)
context.system.terminate()
}
}
val config = ConfigFactory.load()
implicit val system = ActorSystem("user-service", config)
Cluster(system).registerOnMemberUp(system.actorOf(Props(classOf[Root]), "root"))
Await.ready(system.whenTerminated, Duration.Inf)
}
开发者ID:tradecloud,项目名称:tradecloud-microservices-demo,代码行数:45,代码来源:UserService.scala
示例20: IdentityService
//设置package包名称以及导入依赖的类
package nl.tradecloud.identity
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import nl.tradecloud.identity.repositories.IdentityRepository
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object IdentityService extends App {
private final class Root extends Actor with ActorLogging {
log.info("IdentityService up and running...")
val appConfig: AppConfig = context.system.settings.config.as[AppConfig]("app")
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
ClusterSharding(context.system).start(
typeName = IdentityRepository.shardName,
entityProps = Props[IdentityRepository],
settings = ClusterShardingSettings(context.system),
extractEntityId = IdentityRepository.idExtractor,
extractShardId = IdentityRepository.shardResolver(appConfig.maxShards)
)
context.watch(context.actorOf(Api.props(), name = Api.name))
context.watch(context.actorOf(UserListener.props(), name = UserListener.name))
override def receive = {
case Terminated(actor) =>
log.error("Terminating the system because {} terminated!", actor.path)
context.system.terminate()
}
}
val config = ConfigFactory.load()
implicit val system = ActorSystem("identity-service", config)
Cluster(system).registerOnMemberUp(system.actorOf(Props(classOf[Root]), "root"))
Await.ready(system.whenTerminated, Duration.Inf)
}
开发者ID:tradecloud,项目名称:tradecloud-microservices-demo,代码行数:46,代码来源:IdentityService.scala
注:本文中的akka.cluster.sharding.ClusterSharding类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论