本文整理汇总了Scala中akka.cluster.sharding.ShardRegion类的典型用法代码示例。如果您正苦于以下问题:Scala ShardRegion类的具体用法?Scala ShardRegion怎么用?Scala ShardRegion使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ShardRegion类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: GroupProcessorRegion
//设置package包名称以及导入依赖的类
package im.actor.server.group
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
object GroupProcessorRegion {
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) ?
(
groupId.toString,
dialogEnvelope
)
case env @ GroupEnvelope(groupId, _, command, query) ?
(
groupId.toString,
// payload
if (query.isDefined) {
env.getField(GroupEnvelope.descriptor.findFieldByNumber(query.number))
} else {
env.getField(GroupEnvelope.descriptor.findFieldByNumber(command.number))
}
)
}
private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
case env: GroupEnvelope ? (env.groupId % 100).toString // TODO: configurable
}
private val typeName = "GroupProcessor"
private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
GroupProcessorRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)
def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
GroupProcessorRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
}
case class GroupProcessorRegion(ref: ActorRef)
case class GroupViewRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala
示例2: SeqUpdatesManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.sequence
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.event.Logging
import scala.util.{ Success, Try }
final case class SeqUpdatesManagerRegion(ref: ActorRef)
object SeqUpdatesManagerRegion {
import UserSequenceCommands._
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
val log = Logging(system, getClass)
{
case e @ Envelope(userId, payload) ? (userId.toString, Try(e.getField(Envelope.descriptor.findFieldByNumber(payload.number))) match {
case Success(any) ? any
case _ ?
val error = new RuntimeException(s"Payload not found for $e")
log.error(error, error.getMessage)
throw error
})
}
}
private val extractShardId: ShardRegion.ExtractShardId = {
case Envelope(userId, _) ? (userId % 10).toString // TODO: configurable
}
private val typeName = "SeqUpdatesManager"
private def start(props: Props)(implicit system: ActorSystem): SeqUpdatesManagerRegion =
SeqUpdatesManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId
))
def start()(
implicit
system: ActorSystem
): SeqUpdatesManagerRegion =
start(UserSequence.props)
def startProxy()(implicit system: ActorSystem): SeqUpdatesManagerRegion =
SeqUpdatesManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId
))
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:58,代码来源:SeqUpdatesManagerRegion.scala
示例3: MormontShardRegion
//设置package包名称以及导入依赖的类
package lib.cluster
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import lib.MormontConfig
import lib.cluster.MormontShardRegion.MormontShardRegionActorRef
import lib.models.PayLoad
abstract class MormontShardRegion[T <: PayLoad](actorSystem: ActorSystem) {
private val prefix : String = "shard-region-"
def channelName: String
def entityProps: Props
final private val extractEntityId: ShardRegion.ExtractEntityId = {
case p: PayLoad ?
(p.persistenceId, p)
}
final private val extractShardId: ShardRegion.ExtractShardId = {
case p: PayLoad ?
(math.abs(p.persistenceId.hashCode) % MormontConfig.numOfShards).toString
}
final def startShard(): MormontShardRegionActorRef = {
ClusterSharding(actorSystem).start(
typeName = prefix + channelName,
entityProps = entityProps,
settings = ClusterShardingSettings(actorSystem),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
}
}
object MormontShardRegion {
type MormontShardRegionActorRef = ActorRef
}
开发者ID:sharma-rohit,项目名称:mormont,代码行数:40,代码来源:MormontShardRegion.scala
示例4: SortingDecider
//设置package包名称以及导入依赖的类
package com.michalplachta.shoesorter
import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.sharding.ShardRegion
import com.michalplachta.shoesorter.Messages.{ Go, WhereShouldIGo }
object SortingDecider {
def props = Props[SortingDecider]
def shardName = "sortingDecider"
val extractShardId: ShardRegion.ExtractShardId = {
case WhereShouldIGo(junction, _) ?
(junction.id % 2).toString
}
val extractEntityId: ShardRegion.ExtractEntityId = {
case m: WhereShouldIGo ?
(m.junction.id.toString, m)
}
}
class SortingDecider extends Actor with ActorLogging {
def receive = {
case WhereShouldIGo(junction, container) ?
val decision = Decisions.whereShouldContainerGo(junction, container)
log.debug(s"Decision: $decision")
sender ! Go(decision)
}
}
开发者ID:miciek,项目名称:monitoring-akka-prometheus-kamon,代码行数:31,代码来源:SortingDecider.scala
示例5: WeakUpdatesManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.sequence
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterShardingSettings, ClusterSharding, ShardRegion }
import im.actor.server.sequence.WeakUpdatesManager.Envelope
object WeakUpdatesManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(authId, payload) ? (authId.toString, env)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(authId, _) ? (authId % 32).toString // TODO: configurable
}
private val typeName = "WeakUpdatesManager"
private def startRegion(props: Props)(implicit system: ActorSystem): WeakUpdatesManagerRegion = {
WeakUpdatesManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
def startRegion()(implicit system: ActorSystem): WeakUpdatesManagerRegion = startRegion(WeakUpdatesManager.props)
def startRegionProxy()(implicit system: ActorSystem): WeakUpdatesManagerRegion =
WeakUpdatesManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class WeakUpdatesManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:41,代码来源:WeakUpdatesManagerRegion.scala
示例6: GroupPresenceManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.presences
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.GroupPresenceManager.Envelope
object GroupPresenceManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(userId, payload) ? (userId.toString, env)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
}
private val typeName = "GroupPresenceManager"
private def startRegion(props: Props)(implicit system: ActorSystem): GroupPresenceManagerRegion =
GroupPresenceManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
def startRegion()(implicit system: ActorSystem): GroupPresenceManagerRegion = startRegion(GroupPresenceManager.props)
def startRegionProxy()(implicit system: ActorSystem): GroupPresenceManagerRegion =
GroupPresenceManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class GroupPresenceManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:39,代码来源:GroupPresenceManagerRegion.scala
示例7: PresenceManagerRegion
//设置package包名称以及导入依赖的类
package im.actor.server.presences
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.PresenceManager.Envelope
object PresenceManagerRegion {
private val extractEntityId: ShardRegion.ExtractEntityId = {
case env @ Envelope(userId, payload) ? (userId.toString, payload)
}
private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
}
private val typeName = "PresenceManager"
private def startRegion(props: Props)(implicit system: ActorSystem): PresenceManagerRegion =
PresenceManagerRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
def startRegion()(implicit system: ActorSystem): PresenceManagerRegion = startRegion(PresenceManager.props)
def startRegionProxy()(implicit system: ActorSystem): PresenceManagerRegion =
PresenceManagerRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId,
extractShardId = extractShardId
))
}
case class PresenceManagerRegion(val ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:40,代码来源:PresenceManagerRegion.scala
示例8: UserProcessorRegion
//设置package包名称以及导入依赖的类
package im.actor.server.user
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
object UserProcessorRegion {
private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
{
case c: UserCommand ? (c.userId.toString, c)
case q: UserQuery ? (q.userId.toString, q)
case e @ UserEnvelope(
userId,
dialogRootEnvelope,
dialogEnvelope
) ?
(
userId.toString,
dialogRootEnvelope.getOrElse(dialogEnvelope.get)
)
}
}
private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
case c: UserCommand ? (c.userId % 100).toString // TODO: configurable
case q: UserQuery ? (q.userId % 100).toString
case e: UserEnvelope ? (e.userId % 100).toString
}
val typeName = "UserProcessor"
private def start(props: Props)(implicit system: ActorSystem): UserProcessorRegion =
UserProcessorRegion(ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
def start()(implicit system: ActorSystem): UserProcessorRegion =
start(UserProcessor.props)
def startProxy()(implicit system: ActorSystem): UserProcessorRegion =
UserProcessorRegion(ClusterSharding(system).startProxy(
typeName = typeName,
role = None,
extractEntityId = extractEntityId(system),
extractShardId = extractShardId(system)
))
}
final case class UserProcessorRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:53,代码来源:UserProcessorRegion.scala
示例9: UserWorkerSharding
//设置package包名称以及导入依赖的类
package homeworkzen.clustering
import akka.actor.{ActorRef, ActorSystem, Props}
import homeworkzen.domain.command.actor._
import homeworkzen.domain.command.message._
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import homeworkzen.Config
object UserWorkerSharding {
def register(implicit system: ActorSystem): ActorRef =
ClusterSharding(system).start(
"UserWorker",
Props(new UserWorker),
ClusterShardingSettings(system),
idExtractor,
shardResolver)
def props(): Props = Props(new UserWorker)
val idExtractor: ShardRegion.ExtractEntityId = {
case msg: UserCommand => (msg.userId.id.toString, msg)
}
val shardResolver: ShardRegion.ExtractShardId = {
case msg: UserCommand => (math.abs(msg.userId.id.hashCode()) % Config.Cluster.shardCount).toString
}
val shardName: String = "UserSharding"
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:30,代码来源:UserWorkerSharding.scala
示例10: ShardedIdRouterProtocol
//设置package包名称以及导入依赖的类
package org.tanukkii.reactive.snowflake
import akka.actor.{Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
object ShardedIdRouterProtocol {
case object StopSending
}
class ShardedIdRouter extends IdRouter {
import ShardedIdRouterProtocol._
override def unhandled(msg: Any) = msg match {
case ReceiveTimeout =>
context.parent ! Passivate(stopMessage = StopSending)
case StopSending => context.stop(self)
case other => super.unhandled(other)
}
}
object ShardedIdRouter {
import IdRouterProtocol._
def props: Props = Props(new ShardedIdRouter)
def name: String = "IdRouter"
val shardName: String = "IdRouter"
val idExtractor: ShardRegion.ExtractEntityId = {
case [email protected](datacenterId, workerId) => (s"$datacenterId-$workerId", cmd)
}
val shardResolver: ShardRegion.ExtractShardId = {
case [email protected](datacenterId, workerId) => s"${datacenterId % 32}-${workerId % 32}"
}
}
开发者ID:TanUkkii007,项目名称:reactive-snowflake,代码行数:39,代码来源:ShardedIdRouter.scala
示例11: PublisherUpdate
//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub
import akka.actor.{Props, ActorLogging, Actor}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.cluster.sharding.ShardRegion
// Domain
final case class PublisherUpdate(publisher: UpdateableEvent)
object PublisherActor {
lazy val props = Props(classOf[PublisherActor])
val idExtractor: ShardRegion.ExtractEntityId = {
case s: PublisherUpdate => (s.publisher.id.toString, s.publisher)
}
val shardExtractor: ShardRegion.ExtractShardId = {
case msg: PublisherUpdate => (msg.publisher.id % 100).toString
}
}
class PublisherActor extends Actor with ActorLogging {
log.info(s"Publisher ${self.path.name} actor instance created")
val mediator = DistributedPubSub(context.system).mediator
override def receive: Receive = {
case event: UpdateableEvent => {
log.info(s"$event to be routed to all listeners")
mediator ! Publish(s"Publisher-${self.path.name}", event)
}
}
}
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:37,代码来源:PublisherActor.scala
示例12: Borrower
//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst
import akka.actor.{ActorLogging, Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.{PersistentActor, RecoveryCompleted}
class Borrower extends PersistentActor with ActorLogging {
import Borrower._
override def persistenceId: String = "borrower-" + self.path.name
override def receiveRecover = {
case e: BorrowerEvent => updateState(e)
case RecoveryCompleted => }
override def receiveCommand = {
case CreateBorrower(catalogId, borrowerId, name) => persist(BorrowerCreated(catalogId, borrowerId, name))(e => {
updateState(e)
sender() ! e
})
case ReceiveTimeout ? context.parent ! Passivate(stopMessage = Stop)
case Stop ? context.stop(self)
}
def updateState(e: BorrowerEvent) = e match {
case i: BorrowerCreated =>
}
}
object Borrower {
sealed trait BorrowerCommand {
val catalogId: Int
val borrowerId: Int
}
final case class CreateBorrower(catalogId: Int, borrowerId: Int, name: String) extends BorrowerCommand
sealed trait BorrowerEvent {
val borrowerId: Int
}
final case class BorrowerCreated(catalogId: Int, borrowerId: Int, name: String) extends BorrowerEvent
val extractEntityId: ShardRegion.ExtractEntityId = {
case i: BorrowerCommand => (s"${i.catalogId}_${i.borrowerId}", i)
}
val numberOfShards = 128
val extractShardId: ShardRegion.ExtractShardId = {
case i: BorrowerCommand => math.abs(s"${i.catalogId}_${i.borrowerId}".hashCode % numberOfShards).toString
}
def props() = Props(new Borrower())
}
开发者ID:ColinScott,项目名称:bathurst,代码行数:59,代码来源:Borrower.scala
示例13: ShardingApp
//设置package包名称以及导入依赖的类
package com.example.sharding
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object ShardingApp extends App {
val port: Int = args(0).toInt
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("cluster-sharding.conf"))
val system = ActorSystem("ClusterSharding", config)
val extractEntityId: ShardRegion.ExtractEntityId = {
case ClientMessage(message) => (message, message)
}
val extractShardId: ShardRegion.ExtractShardId = {
case ClientMessage(message) => message
}
ClusterSharding(system).start(
typeName = "Client",
entityProps = Props[ClientActor],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
// terminate system
StdIn.readLine()
system.terminate()
}
class ReceiverActor extends Actor {
override def receive: Receive = {
case ShardingMessage(message) => {
}
}
}
class ClientActor extends Actor {
override def receive: Receive = {
case ClientMessage(message) =>
}
}
开发者ID:TechResearchID,项目名称:akka-cluster,代码行数:56,代码来源:ShardingApp.scala
示例14: AuthorListing
//设置package包名称以及导入依赖的类
package sample.blog
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.ReceiveTimeout
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.PersistentActor
object AuthorListing {
def props(): Props = Props(new AuthorListing)
case class PostSummary(author: String, postId: String, title: String)
case class GetPosts(author: String)
case class Posts(list: immutable.IndexedSeq[PostSummary])
val idExtractor: ShardRegion.ExtractEntityId = {
case s: PostSummary => (s.author, s)
case m: GetPosts => (m.author, m)
}
val shardResolver: ShardRegion.ExtractShardId = msg => msg match {
case s: PostSummary => (math.abs(s.author.hashCode) % 100).toString
case GetPosts(author) => (math.abs(author.hashCode) % 100).toString
}
val shardName: String = "AuthorListing"
}
class AuthorListing extends PersistentActor with ActorLogging {
import AuthorListing._
override def persistenceId: String = self.path.parent.name + "-" + self.path.name
// passivate the entity when no activity
context.setReceiveTimeout(2.minutes)
var posts = Vector.empty[PostSummary]
def receiveCommand = {
case s: PostSummary =>
persist(s) { evt =>
posts :+= evt
log.info("Post added to {}'s list: {}", s.author, s.title)
}
case GetPosts(_) =>
sender() ! Posts(posts)
case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
}
override def receiveRecover: Receive = {
case evt: PostSummary => posts :+= evt
}
}
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:61,代码来源:AuthorListing.scala
示例15: DistributedProcessingSupervisor
//设置package包名称以及导入依赖的类
package aecor.distributedprocessing
import aecor.distributedprocessing.DistributedProcessingSupervisor.{
GracefulShutdown,
ShutdownCompleted,
Tick
}
import aecor.distributedprocessing.DistributedProcessingWorker.KeepRunning
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.cluster.sharding.ShardRegion
import scala.concurrent.duration.{ FiniteDuration, _ }
object DistributedProcessingSupervisor {
private final case object Tick
final case object GracefulShutdown
final case object ShutdownCompleted
def props(processCount: Int, shardRegion: ActorRef, heartbeatInterval: FiniteDuration): Props =
Props(new DistributedProcessingSupervisor(processCount, shardRegion, heartbeatInterval))
}
final class DistributedProcessingSupervisor(processCount: Int,
shardRegion: ActorRef,
heartbeatInterval: FiniteDuration)
extends Actor
with ActorLogging {
import context.dispatcher
private val heartbeat =
context.system.scheduler.schedule(0.seconds, heartbeatInterval, self, Tick)
context.watch(shardRegion)
override def postStop(): Unit = {
heartbeat.cancel()
()
}
override def receive: Receive = {
case Tick =>
(0 until processCount).foreach { processId =>
shardRegion ! KeepRunning(processId)
}
case Terminated(`shardRegion`) =>
context.stop(self)
case GracefulShutdown =>
log.info(s"Performing graceful shutdown of [$shardRegion]")
shardRegion ! ShardRegion.GracefulShutdown
val replyTo = sender()
context.become {
case Terminated(`shardRegion`) =>
log.info(s"Graceful shutdown completed for [$shardRegion]")
context.stop(self)
replyTo ! ShutdownCompleted
}
}
}
开发者ID:notxcain,项目名称:aecor,代码行数:61,代码来源:DistributedProcessingSupervisor.scala
示例16: GenericAkkaRuntime
//设置package包名称以及导入依赖的类
package aecor.runtime.akkageneric
import aecor.data.{ Behavior, Correlation }
import aecor.effect.{ Async, Capture, CaptureFuture }
import aecor.runtime.akkageneric.GenericAkkaRuntime.CorrelatedCommand
import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern._
import akka.util.Timeout
import cats.~>
import scala.concurrent.Future
object GenericAkkaRuntime {
def apply[F[_]: Async: CaptureFuture: Capture](system: ActorSystem): GenericAkkaRuntime[F] =
new GenericAkkaRuntime(system)
private final case class CorrelatedCommand[A](entityId: String, command: A)
}
class GenericAkkaRuntime[F[_]: Async: CaptureFuture: Capture](system: ActorSystem) {
def start[Op[_]](typeName: String,
correlation: Correlation[Op],
behavior: Behavior[F, Op],
settings: GenericAkkaRuntimeSettings =
GenericAkkaRuntimeSettings.default(system)): F[Op ~> F] =
Capture[F]
.capture {
val numberOfShards = settings.numberOfShards
val extractEntityId: ShardRegion.ExtractEntityId = {
case CorrelatedCommand(entityId, c) =>
(entityId, GenericAkkaRuntimeActor.PerformOp(c.asInstanceOf[Op[_]]))
}
val extractShardId: ShardRegion.ExtractShardId = {
case CorrelatedCommand(entityId, _) =>
(scala.math.abs(entityId.hashCode) % numberOfShards).toString
case other => throw new IllegalArgumentException(s"Unexpected message [$other]")
}
val props = GenericAkkaRuntimeActor.props(behavior, settings.idleTimeout)
val shardRegionRef = ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = settings.clusterShardingSettings,
extractEntityId = extractEntityId,
extractShardId = extractShardId
)
implicit val timeout = Timeout(settings.askTimeout)
new (Op ~> F) {
override def apply[A](fa: Op[A]): F[A] = CaptureFuture[F].captureFuture {
(shardRegionRef ? CorrelatedCommand(correlation(fa), fa)).asInstanceOf[Future[A]]
}
}
}
}
开发者ID:notxcain,项目名称:aecor,代码行数:59,代码来源:GenericAkkaRuntime.scala
示例17: GracefulLeave
//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.persistence.cluster
import akka.Done
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ShardRegion
private[lagom] object GracefulLeave {
def props(entityTypeNames: Set[String]): Props =
Props(new GracefulLeave(entityTypeNames))
case object Leave
private case object Removed
}
private[lagom] class GracefulLeave(entityTypeNames: Set[String])
extends Actor {
import GracefulLeave._
import context.dispatcher
val system = context.system
val cluster = Cluster(system)
def receive = {
case Leave ?
if (entityTypeNames.isEmpty) {
cluster.leave(cluster.selfAddress)
context.become(leavingInProgress(sender()))
} else {
entityTypeNames.foreach { name =>
val region = ClusterSharding(system).shardRegion(name)
context.watch(region)
region ! ShardRegion.GracefulShutdown
}
context.become(shardingInProgress(sender(), entityTypeNames.size))
}
}
def shardingInProgress(replyTo: ActorRef, count: Int): Receive = {
case Terminated(_) ?
if (count == 1) {
cluster.registerOnMemberRemoved(self ! Removed)
cluster.leave(cluster.selfAddress)
context.become(leavingInProgress(replyTo))
} else
context.become(shardingInProgress(replyTo, count - 1))
}
def leavingInProgress(replyTo: ActorRef): Receive = {
case Removed =>
replyTo ! Done
context.stop(self)
}
}
开发者ID:lagom,项目名称:lagom,代码行数:58,代码来源:GracefulLeave.scala
示例18: Sharding
//设置package包名称以及导入依赖的类
package com.simpleorder.sharding
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
object Sharding {
import ShardingSupport._
def apply(props: Props, typeName: String, numberOfShards: Int, system: ActorSystem): ActorRef =
ClusterSharding(system).start(
typeName = typeName,
entityProps = props,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId(numberOfShards))
}
object ShardingSupport {
val extractEntityId: ShardRegion.ExtractEntityId = {
case ShardingEnvelope(id, value) ? (id, value)
}
def extractShardId(numberOfShards: Int): ShardRegion.ExtractShardId = {
case ShardingEnvelope(id, _) ? (id.toInt % numberOfShards).toString
}
}
sealed case class ShardingEnvelope(id: String, value: Any)
开发者ID:simpleorder,项目名称:akka-cassandra,代码行数:32,代码来源:Sharding.scala
示例19: ShardedShopper
//设置package包名称以及导入依赖的类
package aia.persistence.sharded
import aia.persistence._
import akka.actor._
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
object ShardedShopper {
def props = Props(new ShardedShopper)
def name(shopperId: Long) = shopperId.toString
case object StopShopping
val shardName: String = "shoppers"
val extractEntityId: ShardRegion.ExtractEntityId = {
case cmd: Shopper.Command => (cmd.shopperId.toString, cmd)
}
val extractShardId: ShardRegion.ExtractShardId = {
case cmd: Shopper.Command => (cmd.shopperId % 12).toString
}
}
class ShardedShopper extends Shopper {
import ShardedShopper._
context.setReceiveTimeout(Settings(context.system).passivateTimeout)
override def unhandled(msg: Any) = msg match {
case ReceiveTimeout =>
context.parent ! Passivate(stopMessage = ShardedShopper.StopShopping)
case StopShopping => context.stop(self)
}
}
开发者ID:gilbutITbook,项目名称:006877,代码行数:37,代码来源:ShardedShopper.scala
示例20: ClusterApi
//设置package包名称以及导入依赖的类
package linguistic.api
import akka.actor.{ActorSystem, ActorRef}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ClusterShardingStats, CurrentRegions, CurrentShardRegionState}
import akka.http.scaladsl.model.HttpResponse
import akka.pattern._
import linguistic.utils.ShutdownCoordinator
import linguistic.{HttpServer, ps}
import ShutdownCoordinator.NodeShutdownOpts
import linguistic.ps.{WordShardEntity, HomophonesSubTreeShardEntity}
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
class ClusterApi(http: ActorRef, searchMaster: ActorRef, regions: Set[ActorRef])
(implicit ex: ExecutionContext, sys: ActorSystem) extends BaseApi {
implicit val timeout = akka.util.Timeout(10 seconds)
val route = pathPrefix("cluster") {
(get & path(Segment / "regions")) { seq =>
complete {
(searchMaster ?(seq, ShardRegion.GetCurrentRegions)).mapTo[CurrentRegions].map { r =>
HttpResponse(entity = r.regions.mkString(","))
}
}
} ~ (get & path(Segment / "shards")) { seq =>
complete {
(searchMaster ? (seq, ShardRegion.GetShardRegionState)).mapTo[CurrentShardRegionState].map { r =>
HttpResponse(entity = r.shards.mkString(","))
}
}
} ~ (get & path(Segment / "shards2")) { seq =>
complete {
(searchMaster ? (seq, ShardRegion.GetClusterShardingStats(5 seconds))).mapTo[ClusterShardingStats].map { r =>
HttpResponse(entity = r.regions.mkString(","))
}
}
}
}
}
开发者ID:haghard,项目名称:linguistic,代码行数:42,代码来源:ClusterApi.scala
注:本文中的akka.cluster.sharding.ShardRegion类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论