• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ShardRegion类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.cluster.sharding.ShardRegion的典型用法代码示例。如果您正苦于以下问题:Scala ShardRegion类的具体用法?Scala ShardRegion怎么用?Scala ShardRegion使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ShardRegion类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: GroupProcessorRegion

//设置package包名称以及导入依赖的类
package im.actor.server.group

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "GroupProcessor" entity type
 * and wraps the resulting region `ActorRef` in a [[GroupProcessorRegion]].
 */
object GroupProcessorRegion {

  /**
   * Builds the function that maps a `GroupEnvelope` to `(entityId, payload)`.
   * The entity id is always the group id rendered as a string.
   *
   * @param system not referenced by the extractor; kept so the signature matches `extractShardId`
   */
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    // An envelope that carries a dialog envelope is unwrapped to that dialog envelope.
    case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) =>
      (groupId.toString, dialogEnvelope)
    // Otherwise the payload is the envelope field selected by the query when one is defined,
    // and by the command when it is not.
    case env @ GroupEnvelope(groupId, _, command, query) =>
      val fieldNumber = if (query.isDefined) query.number else command.number
      (groupId.toString, env.getField(GroupEnvelope.descriptor.findFieldByNumber(fieldNumber)))
  }

  /**
   * Builds the function that maps a `GroupEnvelope` to its shard id (`groupId % 100`).
   *
   * @param system not referenced by the extractor
   */
  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case env: GroupEnvelope => (env.groupId % 100).toString // TODO: configurable
  }

  private val typeName = "GroupProcessor"

  /** Starts the sharding region using `props` for the entity actors. */
  private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  /** Starts the sharding region with the default `GroupProcessor.props`. */
  def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)

  /** Starts a proxy-only region (no role restriction) that forwards messages without hosting entities. */
  def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}

/** Typed wrapper around the `ActorRef` of the "GroupProcessor" shard region (or its proxy). */
case class GroupProcessorRegion(ref: ActorRef)

/**
 * Typed wrapper around an `ActorRef`.
 * NOTE(review): presumably the group-view shard region; nothing in this file constructs it, so confirm.
 */
case class GroupViewRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala


示例2: SeqUpdatesManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.sequence

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.event.Logging

import scala.util.{ Success, Try }

/** Typed wrapper around the `ActorRef` of the "SeqUpdatesManager" shard region (or its proxy). */
final case class SeqUpdatesManagerRegion(ref: ActorRef)

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "SeqUpdatesManager" entity type.
 */
object SeqUpdatesManagerRegion {

  import UserSequenceCommands._

  /**
   * Builds the function that maps an `Envelope` to `(userId as string, payload field)`.
   *
   * The payload is looked up by field number on the envelope; if that lookup throws, the failure
   * is logged and rethrown as a `RuntimeException` that keeps the original exception as its cause.
   *
   * @param system used only to obtain a logger
   */
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    val log = Logging(system, getClass)

    {
      case e @ Envelope(userId, payload) =>
        val message = Try(e.getField(Envelope.descriptor.findFieldByNumber(payload.number))) match {
          case Success(any) => any
          case scala.util.Failure(cause) =>
            // Keep the underlying exception so the real reason is not lost in the logs.
            val error = new RuntimeException(s"Payload not found for $e", cause)
            log.error(error, error.getMessage)
            throw error
        }
        (userId.toString, message)
    }
  }

  /** Maps an `Envelope` to its shard id (`userId % 10`). */
  private val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(userId, _) => (userId % 10).toString // TODO: configurable
  }

  private val typeName = "SeqUpdatesManager"

  /** Starts the sharding region using `props` for the entity actors. */
  private def start(props: Props)(implicit system: ActorSystem): SeqUpdatesManagerRegion =
    SeqUpdatesManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId
    ))

  /** Starts the sharding region with the default `UserSequence.props`. */
  def start()(
    implicit
    system: ActorSystem
  ): SeqUpdatesManagerRegion =
    start(UserSequence.props)

  /** Starts a proxy-only region (no role restriction). */
  def startProxy()(implicit system: ActorSystem): SeqUpdatesManagerRegion =
    SeqUpdatesManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId
    ))
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:58,代码来源:SeqUpdatesManagerRegion.scala


示例3: MormontShardRegion

//设置package包名称以及导入依赖的类
package lib.cluster

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import lib.MormontConfig
import lib.cluster.MormontShardRegion.MormontShardRegionActorRef
import lib.models.PayLoad


/**
 * Base class for a shard region whose messages are `PayLoad`s.
 * Subclasses supply the channel name (appended to the "shard-region-" prefix to form the
 * sharding type name) and the `Props` of the entity actor.
 *
 * @param actorSystem actor system on which the region is started
 */
abstract class MormontShardRegion[T <: PayLoad](actorSystem: ActorSystem) {
  private val prefix: String = "shard-region-"

  /** Suffix of the sharding type name. */
  def channelName: String

  /** Props used to create each entity actor. */
  def entityProps: Props

  /** The entity id is the payload's persistence id; the payload itself is forwarded unchanged. */
  final private val extractEntityId: ShardRegion.ExtractEntityId = {
    case p: PayLoad => (p.persistenceId, p)
  }

  /**
   * Shard id derived from the hash of the persistence id.
   * The remainder is taken before `abs`, because `math.abs(Int.MinValue)` is still negative;
   * for every other hash the result equals the previous `abs(hash) % n` form.
   */
  final private val extractShardId: ShardRegion.ExtractShardId = {
    case p: PayLoad =>
      math.abs(p.persistenceId.hashCode % MormontConfig.numOfShards).toString
  }

  /** Starts the shard region and returns its `ActorRef`. */
  final def startShard(): MormontShardRegionActorRef =
    ClusterSharding(actorSystem).start(
      typeName = prefix + channelName,
      entityProps = entityProps,
      settings = ClusterShardingSettings(actorSystem),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
}

/** Companion holding the type alias used for the return type of `startShard()`. */
object MormontShardRegion {
  // Plain alias for ActorRef; it adds no type safety, only a descriptive name.
  type MormontShardRegionActorRef = ActorRef
}
开发者ID:sharma-rohit,项目名称:mormont,代码行数:40,代码来源:MormontShardRegion.scala


示例4: SortingDecider

//设置package包名称以及导入依赖的类
package com.michalplachta.shoesorter

import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.sharding.ShardRegion
import com.michalplachta.shoesorter.Messages.{ Go, WhereShouldIGo }

/** Props, shard name and sharding extractors for the [[SortingDecider]] actor. */
object SortingDecider {
  def props: Props = Props[SortingDecider]

  def shardName: String = "sortingDecider"

  /** Shard id is the junction id modulo 2. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case WhereShouldIGo(junction, _) =>
      (junction.id % 2).toString
  }

  /** Entity id is the junction id; the message is forwarded unchanged. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case m: WhereShouldIGo =>
      (m.junction.id.toString, m)
  }
}

/** Answers each `WhereShouldIGo` request with a `Go` carrying the routing decision. */
class SortingDecider extends Actor with ActorLogging {
  def receive: Receive = {
    case WhereShouldIGo(junction, container) =>
      val decision = Decisions.whereShouldContainerGo(junction, container)
      log.debug(s"Decision: $decision")
      // sender() with parentheses: it is a method call on the actor context, not a field.
      sender() ! Go(decision)
  }
}
开发者ID:miciek,项目名称:monitoring-akka-prometheus-kamon,代码行数:31,代码来源:SortingDecider.scala


示例5: WeakUpdatesManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.sequence

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterShardingSettings, ClusterSharding, ShardRegion }
import im.actor.server.sequence.WeakUpdatesManager.Envelope

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "WeakUpdatesManager" entity type.
 */
object WeakUpdatesManagerRegion {

  /** Entity id is the auth id; the whole envelope is forwarded to the entity. */
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case env @ Envelope(authId, _) => (authId.toString, env)
  }

  /** Shard id is `authId % 32`. Any message other than `Envelope` raises a `MatchError`. */
  private val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(authId, _) => (authId % 32).toString // TODO: configurable
  }

  private val typeName = "WeakUpdatesManager"

  /** Starts the sharding region using `props` for the entity actors. */
  private def startRegion(props: Props)(implicit system: ActorSystem): WeakUpdatesManagerRegion =
    WeakUpdatesManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

  /** Starts the sharding region with the default `WeakUpdatesManager.props`. */
  def startRegion()(implicit system: ActorSystem): WeakUpdatesManagerRegion = startRegion(WeakUpdatesManager.props)

  /** Starts a proxy-only region (no role restriction). */
  def startRegionProxy()(implicit system: ActorSystem): WeakUpdatesManagerRegion =
    WeakUpdatesManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

}

/** Typed wrapper around the `ActorRef` of the "WeakUpdatesManager" shard region (or its proxy). */
case class WeakUpdatesManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:41,代码来源:WeakUpdatesManagerRegion.scala


示例6: GroupPresenceManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.presences

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.GroupPresenceManager.Envelope

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "GroupPresenceManager" entity type.
 */
object GroupPresenceManagerRegion {

  /** Entity id is the first envelope field (named `userId` here); the whole envelope is forwarded. */
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case env @ Envelope(userId, _) => (userId.toString, env)
  }

  /** Shard id is `userId % 32`. Any message other than `Envelope` raises a `MatchError`. */
  private val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(userId, _) => (userId % 32).toString // TODO: configurable
  }

  private val typeName = "GroupPresenceManager"

  /** Starts the sharding region using `props` for the entity actors. */
  private def startRegion(props: Props)(implicit system: ActorSystem): GroupPresenceManagerRegion =
    GroupPresenceManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

  /** Starts the sharding region with the default `GroupPresenceManager.props`. */
  def startRegion()(implicit system: ActorSystem): GroupPresenceManagerRegion = startRegion(GroupPresenceManager.props)

  /** Starts a proxy-only region (no role restriction). */
  def startRegionProxy()(implicit system: ActorSystem): GroupPresenceManagerRegion =
    GroupPresenceManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))
}

/** Typed wrapper around the `ActorRef` of the "GroupPresenceManager" shard region (or its proxy). */
case class GroupPresenceManagerRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:39,代码来源:GroupPresenceManagerRegion.scala


示例7: PresenceManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.presences

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.PresenceManager.Envelope

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "PresenceManager" entity type.
 */
object PresenceManagerRegion {

  /** Entity id is the user id; only the envelope's payload (not the envelope) is forwarded. */
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(userId, payload) => (userId.toString, payload)
  }

  /** Shard id is `userId % 32`. Any message other than `Envelope` raises a `MatchError`. */
  private val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(userId, _) => (userId % 32).toString // TODO: configurable
  }

  private val typeName = "PresenceManager"

  /** Starts the sharding region using `props` for the entity actors. */
  private def startRegion(props: Props)(implicit system: ActorSystem): PresenceManagerRegion =
    PresenceManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

  /** Starts the sharding region with the default `PresenceManager.props`. */
  def startRegion()(implicit system: ActorSystem): PresenceManagerRegion = startRegion(PresenceManager.props)

  /** Starts a proxy-only region (no role restriction). */
  def startRegionProxy()(implicit system: ActorSystem): PresenceManagerRegion =
    PresenceManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

}

/** Typed wrapper around the `ActorRef` of the "PresenceManager" shard region (or its proxy). */
// The explicit `val` is redundant on a case-class parameter but harmless.
case class PresenceManagerRegion(val ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:40,代码来源:PresenceManagerRegion.scala


示例8: UserProcessorRegion

//设置package包名称以及导入依赖的类
package im.actor.server.user

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

/**
 * Starts the cluster-sharding region (or a proxy to it) for the "UserProcessor" entity type.
 */
object UserProcessorRegion {

  /**
   * Builds the function that maps user messages to `(userId as string, payload)`.
   * Commands and queries are forwarded unchanged; a `UserEnvelope` is unwrapped to its
   * dialog-root envelope if present, otherwise to its dialog envelope.
   *
   * @param system not referenced by the extractor
   */
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    case c: UserCommand => (c.userId.toString, c)
    case q: UserQuery   => (q.userId.toString, q)
    case UserEnvelope(userId, dialogRootEnvelope, dialogEnvelope) =>
      // NOTE(review): `.get` throws NoSuchElementException when both options are empty;
      // presumably an envelope always carries one of the two. Confirm against the senders.
      (userId.toString, dialogRootEnvelope.getOrElse(dialogEnvelope.get))
  }

  /**
   * Builds the function that maps user messages to a shard id (`userId % 100`).
   *
   * @param system not referenced by the extractor
   */
  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case c: UserCommand  => (c.userId % 100).toString // TODO: configurable
    case q: UserQuery    => (q.userId % 100).toString
    case e: UserEnvelope => (e.userId % 100).toString
  }

  val typeName = "UserProcessor"

  /** Starts the sharding region using `props` for the entity actors. */
  private def start(props: Props)(implicit system: ActorSystem): UserProcessorRegion =
    UserProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  /** Starts the sharding region with the default `UserProcessor.props`. */
  def start()(implicit system: ActorSystem): UserProcessorRegion =
    start(UserProcessor.props)

  /** Starts a proxy-only region (no role restriction). */
  def startProxy()(implicit system: ActorSystem): UserProcessorRegion =
    UserProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}

/** Typed wrapper around the `ActorRef` of the "UserProcessor" shard region (or its proxy). */
final case class UserProcessorRegion(ref: ActorRef)
开发者ID:wex5,项目名称:dangchat-server,代码行数:53,代码来源:UserProcessorRegion.scala


示例9: UserWorkerSharding

//设置package包名称以及导入依赖的类
package homeworkzen.clustering

import akka.actor.{ActorRef, ActorSystem, Props}
import homeworkzen.domain.command.actor._
import homeworkzen.domain.command.message._
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import homeworkzen.Config

/** Cluster-sharding setup for `UserWorker` entities, addressed by `UserCommand` messages. */
object UserWorkerSharding {

  /** Starts the "UserWorker" shard region on `system` and returns its `ActorRef`. */
  def register(implicit system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      "UserWorker",
      props(),
      ClusterShardingSettings(system),
      idExtractor,
      shardResolver)

  /** Props for a single `UserWorker` entity. */
  def props(): Props = Props(new UserWorker)

  /** Entity id is the user id rendered as a string; the command is forwarded unchanged. */
  val idExtractor: ShardRegion.ExtractEntityId = {
    case msg: UserCommand => (msg.userId.id.toString, msg)
  }

  /**
   * Shard id derived from the hash of the user id.
   * The remainder is taken before `abs`, because `math.abs(Int.MinValue)` is still negative;
   * for every other hash the result equals the previous `abs(hash) % n` form.
   */
  val shardResolver: ShardRegion.ExtractShardId = {
    case msg: UserCommand =>
      math.abs(msg.userId.id.hashCode() % Config.Cluster.shardCount).toString
  }

  // NOTE(review): `register` uses the literal "UserWorker" as type name, not this value.
  val shardName: String = "UserSharding"
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:30,代码来源:UserWorkerSharding.scala


示例10: ShardedIdRouterProtocol

//设置package包名称以及导入依赖的类
package org.tanukkii.reactive.snowflake

import akka.actor.{Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate

/** Messages private to the sharded id router. */
object ShardedIdRouterProtocol {
  // Used as the Passivate stop message; on receiving it ShardedIdRouter stops itself.
  case object StopSending
}

/**
 * `IdRouter` variant meant to run as a sharded entity: on a receive timeout it asks its parent
 * to passivate it, and it stops when the passivation stop message comes back.
 */
class ShardedIdRouter extends IdRouter {
  import ShardedIdRouterProtocol.StopSending

  /** Handles the passivation handshake; everything else goes to the default `unhandled`. */
  override def unhandled(msg: Any): Unit =
    msg match {
      case StopSending =>
        context.stop(self)
      case ReceiveTimeout =>
        context.parent ! Passivate(stopMessage = StopSending)
      case unknown =>
        super.unhandled(unknown)
    }
}

/** Props, names and sharding extractors for [[ShardedIdRouter]]. */
object ShardedIdRouter {
  import IdRouterProtocol._

  def props: Props = Props(new ShardedIdRouter)

  def name: String = "IdRouter"

  val shardName: String = "IdRouter"

  // NOTE(review): the pattern below was mangled by e-mail obfuscation when this page was scraped
  // ("[email protected]" replaces a `cmd @ <Message>` binder). It does not compile as written;
  // restore the original `cmd @ ...(datacenterId, workerId)` pattern from the upstream project.
  // Entity id is "<datacenterId>-<workerId>"; the bound message `cmd` is forwarded.
  val idExtractor: ShardRegion.ExtractEntityId = {
    case [email protected](datacenterId, workerId) => (s"$datacenterId-$workerId", cmd)
  }

  // NOTE(review): same scraping artefact as in idExtractor; confirm against upstream.
  // Shard id is "<datacenterId % 32>-<workerId % 32>".
  val shardResolver: ShardRegion.ExtractShardId = {
    case [email protected](datacenterId, workerId) => s"${datacenterId % 32}-${workerId % 32}"
  }
}
开发者ID:TanUkkii007,项目名称:reactive-snowflake,代码行数:39,代码来源:ShardedIdRouter.scala


示例11: PublisherUpdate

//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub

import akka.actor.{Props, ActorLogging, Actor}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.cluster.sharding.ShardRegion

// Domain
/** Envelope sent to the shard region; carries the event that the target publisher should broadcast. */
final case class PublisherUpdate(publisher: UpdateableEvent)

/** Props and sharding extractors for [[PublisherActor]]. */
object PublisherActor {

  lazy val props = Props(classOf[PublisherActor])

  /** Entity id is the event's id; the wrapped event (not the envelope) is forwarded. */
  val idExtractor: ShardRegion.ExtractEntityId = {
    case PublisherUpdate(event) =>
      (event.id.toString, event)
  }

  /** Shard id is the event's id modulo 100. */
  val shardExtractor: ShardRegion.ExtractShardId = {
    case PublisherUpdate(event) =>
      (event.id % 100).toString
  }
}

/**
 * Sharded entity that republishes every `UpdateableEvent` it receives to the distributed
 * pub-sub topic "Publisher-<actor name>".
 */
class PublisherActor extends Actor with ActorLogging {

  log.info(s"Publisher ${self.path.name} actor instance created")

  val mediator = DistributedPubSub(context.system).mediator

  override def receive: Receive = {
    case event: UpdateableEvent =>
      // Topic name is derived from this actor's own name.
      val topic = s"Publisher-${self.path.name}"
      log.info(s"$event to be routed to all listeners")
      mediator ! Publish(topic, event)
  }
}
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:37,代码来源:PublisherActor.scala


示例12: Borrower

//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst

import akka.actor.{ActorLogging, Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.{PersistentActor, RecoveryCompleted}


/**
 * Persistent entity for a single borrower. Its persistence id is "borrower-" plus the actor name.
 */
class Borrower extends PersistentActor with ActorLogging {

  import Borrower._

  override def persistenceId: String = "borrower-" + self.path.name

  /** Replays stored events into `updateState`; recovery completion needs no action. */
  override def receiveRecover: Receive = {
    case e: BorrowerEvent  => updateState(e)
    case RecoveryCompleted => ()
  }

  /**
   * Persists a `BorrowerCreated` for each `CreateBorrower` and replies with the stored event.
   * On a receive timeout it asks the parent to passivate it, then stops on `Stop`.
   */
  override def receiveCommand: Receive = {
    case CreateBorrower(catalogId, borrowerId, name) =>
      persist(BorrowerCreated(catalogId, borrowerId, name)) { e =>
        updateState(e)
        sender() ! e
      }
    // NOTE(review): `Stop` is not defined in the companion object shown with this class;
    // confirm where it comes from.
    case ReceiveTimeout => context.parent ! Passivate(stopMessage = Stop)
    case Stop           => context.stop(self)
  }

  /** Applies an event to in-memory state; no state is kept for `BorrowerCreated` yet. */
  def updateState(e: BorrowerEvent): Unit = e match {
    case _: BorrowerCreated => ()
  }
}

/** Protocol (commands and events), sharding extractors and props for the [[Borrower]] entity. */
object Borrower {

  /** Commands addressed to a borrower; both ids together identify the entity. */
  sealed trait BorrowerCommand {
    val catalogId: Int
    val borrowerId: Int
  }

  final case class CreateBorrower(catalogId: Int, borrowerId: Int, name: String) extends BorrowerCommand

  /** Events persisted by a borrower. */
  sealed trait BorrowerEvent {
    val borrowerId: Int
  }

  final case class BorrowerCreated(catalogId: Int, borrowerId: Int, name: String) extends BorrowerEvent

  val numberOfShards = 128

  // "<catalogId>_<borrowerId>": used both as the entity id and as the input of the shard hash.
  private def entityKey(command: BorrowerCommand): String =
    s"${command.catalogId}_${command.borrowerId}"

  /** Entity id is "<catalogId>_<borrowerId>"; the command is forwarded unchanged. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case command: BorrowerCommand =>
      (entityKey(command), command)
  }

  /** Shard id is the absolute value of the entity-key hash modulo `numberOfShards`. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case command: BorrowerCommand =>
      val bucket = entityKey(command).hashCode % numberOfShards
      math.abs(bucket).toString
  }

  def props() = Props(new Borrower())
}
开发者ID:ColinScott,项目名称:bathurst,代码行数:59,代码来源:Borrower.scala


示例13: ShardingApp

//设置package包名称以及导入依赖的类
package com.example.sharding

import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn


/**
 * Entry point that joins the "ClusterSharding" actor system as a `backend` node on the port
 * given as the first program argument, starts the "Client" shard region, and shuts down when a
 * line is read from standard input.
 */
object ShardingApp extends App {

  // Fail with a usage message instead of an ArrayIndexOutOfBounds/NumberFormat stack trace.
  val port: Int = args.headOption.flatMap(arg => scala.util.Try(arg.toInt).toOption).getOrElse {
    Console.err.println("Usage: ShardingApp <port>")
    sys.exit(1)
  }

  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("cluster-sharding.conf"))

  val system = ActorSystem("ClusterSharding", config)

  /** The message text is both the entity id and the payload delivered to the entity. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case ClientMessage(message) => (message, message)
  }

  /** The message text is also used as the shard id, so every distinct text gets its own shard. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case ClientMessage(message) => message
  }

  ClusterSharding(system).start(
    typeName = "Client",
    entityProps = Props[ClientActor],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId
  )

  // Block until a line arrives on stdin, then terminate the actor system.
  StdIn.readLine()
  system.terminate()

}

/** Placeholder actor: accepts `ShardingMessage` and does nothing with it. */
class ReceiverActor extends Actor {
  override def receive: Receive = {
    case ShardingMessage(_) => ()
  }
}

/** Placeholder entity actor: accepts `ClientMessage` and does nothing with it. */
class ClientActor extends Actor {
  override def receive: Receive = {
    case ClientMessage(_) => ()
  }
}
开发者ID:TechResearchID,项目名称:akka-cluster,代码行数:56,代码来源:ShardingApp.scala


示例14: AuthorListing

//设置package包名称以及导入依赖的类
package sample.blog

import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.ReceiveTimeout
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.PersistentActor

/** Protocol, props and sharding extractors for the [[AuthorListing]] entity. */
object AuthorListing {

  def props(): Props = Props(new AuthorListing)

  case class PostSummary(author: String, postId: String, title: String)
  case class GetPosts(author: String)
  case class Posts(list: immutable.IndexedSeq[PostSummary])

  /** Entity id is the author name; the message is forwarded unchanged. */
  val idExtractor: ShardRegion.ExtractEntityId = {
    case s: PostSummary => (s.author, s)
    case m: GetPosts    => (m.author, m)
  }

  /** Shard id is derived from the author name; any other message raises a `MatchError`. */
  val shardResolver: ShardRegion.ExtractShardId = {
    case s: PostSummary   => shardIdFor(s.author)
    case GetPosts(author) => shardIdFor(author)
  }

  // The remainder is taken before `abs`, because `math.abs(Int.MinValue)` is still negative;
  // for every other hash the result equals the previous `abs(hash) % 100` form.
  private def shardIdFor(author: String): String =
    math.abs(author.hashCode % 100).toString

  val shardName: String = "AuthorListing"
}

/**
 * Persistent entity keeping the list of post summaries of one author.
 * Persistence id is "<parent actor name>-<own actor name>".
 */
class AuthorListing extends PersistentActor with ActorLogging {
  import AuthorListing._

  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  // Request passivation after two minutes without messages (see the ReceiveTimeout case).
  context.setReceiveTimeout(2.minutes)

  var posts = Vector.empty[PostSummary]

  def receiveCommand = {
    case summary: PostSummary =>
      persist(summary) { stored =>
        posts = posts :+ stored
        log.info("Post added to {}'s list: {}", summary.author, summary.title)
      }

    case GetPosts(_) =>
      sender() ! Posts(posts)

    case ReceiveTimeout =>
      context.parent ! Passivate(stopMessage = PoisonPill)
  }

  /** Rebuilds the list from the persisted summaries. */
  override def receiveRecover: Receive = {
    case stored: PostSummary =>
      posts = posts :+ stored
  }

}
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:61,代码来源:AuthorListing.scala


示例15: DistributedProcessingSupervisor

//设置package包名称以及导入依赖的类
package aecor.distributedprocessing

import aecor.distributedprocessing.DistributedProcessingSupervisor.{
  GracefulShutdown,
  ShutdownCompleted,
  Tick
}
import aecor.distributedprocessing.DistributedProcessingWorker.KeepRunning
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.cluster.sharding.ShardRegion

import scala.concurrent.duration.{ FiniteDuration, _ }

/** Messages and props for [[DistributedProcessingSupervisor]]. */
object DistributedProcessingSupervisor {
  // Internal heartbeat trigger scheduled by the supervisor for itself.
  private final case object Tick

  // Request to shut the shard region down gracefully; answered with ShutdownCompleted.
  final case object GracefulShutdown
  final case object ShutdownCompleted

  /** Props for a supervisor that keeps `processCount` processes alive through `shardRegion`. */
  def props(processCount: Int, shardRegion: ActorRef, heartbeatInterval: FiniteDuration): Props =
    Props(
      new DistributedProcessingSupervisor(
        processCount,
        shardRegion,
        heartbeatInterval
      )
    )
}

/**
 * Sends `KeepRunning(processId)` for every process id in `0 until processCount` to the shard
 * region on each heartbeat, and stops itself when the region terminates.
 *
 * @param processCount      number of process ids to keep alive
 * @param shardRegion       region that hosts the workers; it is watched for termination
 * @param heartbeatInterval delay between two heartbeats (the first fires immediately)
 */
final class DistributedProcessingSupervisor(processCount: Int,
                                            shardRegion: ActorRef,
                                            heartbeatInterval: FiniteDuration)
    extends Actor
    with ActorLogging {

  import context.dispatcher

  private val heartbeat =
    context.system.scheduler.schedule(0.seconds, heartbeatInterval, self, Tick)

  context.watch(shardRegion)

  override def postStop(): Unit = {
    heartbeat.cancel()
    ()
  }

  override def receive: Receive = {
    case Tick =>
      for (processId <- 0 until processCount) {
        shardRegion ! KeepRunning(processId)
      }

    case Terminated(`shardRegion`) =>
      context.stop(self)

    case GracefulShutdown =>
      log.info(s"Performing graceful shutdown of [$shardRegion]")
      shardRegion ! ShardRegion.GracefulShutdown
      context.become(awaitingRegionTermination(sender()))
  }

  // Behaviour after a graceful shutdown was requested: only the region's termination is
  // handled, after which this actor stops and confirms to the original requester.
  private def awaitingRegionTermination(replyTo: ActorRef): Receive = {
    case Terminated(`shardRegion`) =>
      log.info(s"Graceful shutdown completed for [$shardRegion]")
      context.stop(self)
      replyTo ! ShutdownCompleted
  }
}
开发者ID:notxcain,项目名称:aecor,代码行数:61,代码来源:DistributedProcessingSupervisor.scala


示例16: GenericAkkaRuntime

//设置package包名称以及导入依赖的类
package aecor.runtime.akkageneric

import aecor.data.{ Behavior, Correlation }
import aecor.effect.{ Async, Capture, CaptureFuture }
import aecor.runtime.akkageneric.GenericAkkaRuntime.CorrelatedCommand
import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern._
import akka.util.Timeout
import cats.~>

import scala.concurrent.Future

/** Factory for [[GenericAkkaRuntime]] and the internal envelope sent to the shard region. */
object GenericAkkaRuntime {

  // Pairs a command with the id of the entity that must handle it.
  private final case class CorrelatedCommand[A](entityId: String, command: A)

  /** Creates a runtime bound to `system`. */
  def apply[F[_]: Async: CaptureFuture: Capture](system: ActorSystem): GenericAkkaRuntime[F] =
    new GenericAkkaRuntime[F](system)
}

/**
 * Runs a `Behavior` as sharded actors and exposes it as a natural transformation `Op ~> F`.
 *
 * @param system actor system on which the shard region is started
 */
class GenericAkkaRuntime[F[_]: Async: CaptureFuture: Capture](system: ActorSystem) {

  /**
   * Starts a shard region for `typeName` and returns, inside `F`, a function that sends each
   * operation to the entity chosen by `correlation` and captures the reply.
   *
   * @param typeName    sharding type name
   * @param correlation maps an operation to the id of the entity that handles it
   * @param behavior    behaviour executed by each entity actor
   * @param settings    shard count, idle timeout, ask timeout and sharding settings
   */
  def start[Op[_]](typeName: String,
                   correlation: Correlation[Op],
                   behavior: Behavior[F, Op],
                   settings: GenericAkkaRuntimeSettings =
                     GenericAkkaRuntimeSettings.default(system)): F[Op ~> F] =
    Capture[F]
      .capture {
        val numberOfShards = settings.numberOfShards

        val extractEntityId: ShardRegion.ExtractEntityId = {
          case CorrelatedCommand(entityId, c) =>
            (entityId, GenericAkkaRuntimeActor.PerformOp(c.asInstanceOf[Op[_]]))
        }

        val extractShardId: ShardRegion.ExtractShardId = {
          case CorrelatedCommand(entityId, _) =>
            // Remainder before `abs`: `abs(Int.MinValue)` is still negative, which would
            // yield a negative shard id. All other hashes map exactly as before.
            scala.math.abs(entityId.hashCode % numberOfShards).toString
          case other => throw new IllegalArgumentException(s"Unexpected message [$other]")
        }

        val props = GenericAkkaRuntimeActor.props(behavior, settings.idleTimeout)

        val shardRegionRef = ClusterSharding(system).start(
          typeName = typeName,
          entityProps = props,
          settings = settings.clusterShardingSettings,
          extractEntityId = extractEntityId,
          extractShardId = extractShardId
        )

        implicit val timeout: Timeout = Timeout(settings.askTimeout)
        new (Op ~> F) {
          // `?` here is the Akka ask operator from akka.pattern, not a pattern-match arrow.
          override def apply[A](fa: Op[A]): F[A] = CaptureFuture[F].captureFuture {
            (shardRegionRef ? CorrelatedCommand(correlation(fa), fa)).asInstanceOf[Future[A]]
          }
        }
      }
}
开发者ID:notxcain,项目名称:aecor,代码行数:59,代码来源:GenericAkkaRuntime.scala


示例17: GracefulLeave

//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.persistence.cluster

import akka.Done
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ShardRegion

/** Props and protocol for the [[GracefulLeave]] actor. */
private[lagom] object GracefulLeave {

  // Request to leave the cluster; the sender receives `Done` once this member is removed.
  case object Leave

  // Sent by the actor to itself from the member-removed callback.
  private case object Removed

  def props(entityTypeNames: Set[String]): Props = {
    val names = entityTypeNames
    Props(new GracefulLeave(names))
  }
}

/**
 * On `Leave`, gracefully shuts down the shard regions of all `entityTypeNames`, then leaves the
 * cluster and replies `Done` to the requester once this member has been removed.
 *
 * @param entityTypeNames sharding type names whose regions must be stopped before leaving
 */
private[lagom] class GracefulLeave(entityTypeNames: Set[String])
  extends Actor {
  import GracefulLeave._
  import context.dispatcher
  val system = context.system
  val cluster = Cluster(system)

  def receive = {
    case Leave =>
      if (entityTypeNames.isEmpty) {
        // No regions to stop: leave right away.
        leaveCluster(sender())
      } else {
        entityTypeNames.foreach { name =>
          val region = ClusterSharding(system).shardRegion(name)
          context.watch(region)
          region ! ShardRegion.GracefulShutdown
        }
        context.become(shardingInProgress(sender(), entityTypeNames.size))
      }
  }

  /** Counts down region terminations; leaves the cluster when the last watched region stops. */
  def shardingInProgress(replyTo: ActorRef, count: Int): Receive = {
    case Terminated(_) =>
      if (count == 1) leaveCluster(replyTo)
      else context.become(shardingInProgress(replyTo, count - 1))
  }

  /** Waits for the member-removed notification, then confirms to `replyTo` and stops. */
  def leavingInProgress(replyTo: ActorRef): Receive = {
    case Removed =>
      replyTo ! Done
      context.stop(self)
  }

  // Registers the removal callback BEFORE leaving. Previously the empty-entity branch skipped
  // the registration, so `Removed` was never delivered and the requester never got `Done`.
  private def leaveCluster(replyTo: ActorRef): Unit = {
    cluster.registerOnMemberRemoved(self ! Removed)
    cluster.leave(cluster.selfAddress)
    context.become(leavingInProgress(replyTo))
  }
}
开发者ID:lagom,项目名称:lagom,代码行数:58,代码来源:GracefulLeave.scala


示例18: Sharding

//设置package包名称以及导入依赖的类
package com.simpleorder.sharding

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

/** Convenience factory that starts a shard region addressed through `ShardingEnvelope`s. */
object Sharding {

  /**
   * Starts a shard region and returns its `ActorRef`.
   *
   * @param props          props of the entity actor
   * @param typeName       sharding type name
   * @param numberOfShards modulus used when deriving the shard id
   * @param system         actor system on which the region is started
   */
  def apply(props: Props, typeName: String, numberOfShards: Int, system: ActorSystem): ActorRef = {
    val sharding = ClusterSharding(system)
    sharding.start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = ShardingSupport.extractEntityId,
      extractShardId = ShardingSupport.extractShardId(numberOfShards)
    )
  }
}

/** Sharding extractors for messages wrapped in a `ShardingEnvelope`. */
object ShardingSupport {

  /** Entity id is the envelope id; only the wrapped value is forwarded to the entity. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case ShardingEnvelope(id, value) => (id, value)
  }

  /**
   * Shard id is the envelope id parsed as an integer, modulo `numberOfShards`.
   * NOTE(review): `id.toInt` throws `NumberFormatException` for a non-numeric id and yields a
   * negative shard id for a negative one; presumably ids are non-negative integers. Confirm.
   */
  def extractShardId(numberOfShards: Int): ShardRegion.ExtractShardId = {
    case ShardingEnvelope(id, _) => (id.toInt % numberOfShards).toString
  }

}

/** Envelope pairing a payload (`value`) with the id of the entity that should receive it. */
sealed case class ShardingEnvelope(id: String, value: Any)
开发者ID:simpleorder,项目名称:akka-cassandra,代码行数:32,代码来源:Sharding.scala


示例19: ShardedShopper

//设置package包名称以及导入依赖的类
package aia.persistence.sharded

import aia.persistence._
import akka.actor._
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate

/** Props, naming and sharding extractors for [[ShardedShopper]]. */
object ShardedShopper {

  def props = Props(new ShardedShopper)

  /** Actor name for a shopper: its id rendered as a string. */
  def name(shopperId: Long) = shopperId.toString

  // Passivate stop message; on receiving it the shopper stops itself.
  case object StopShopping

  val shardName: String = "shoppers"

  /** Entity id is the shopper id; the command is forwarded unchanged. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case command: Shopper.Command =>
      (command.shopperId.toString, command)
  }

  /** Shard id is the shopper id modulo 12. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case command: Shopper.Command =>
      (command.shopperId % 12).toString
  }
}

/**
 * `Shopper` variant meant to run as a sharded entity: after the configured idle timeout it asks
 * its parent to passivate it, and it stops when `StopShopping` comes back.
 */
class ShardedShopper extends Shopper {
  import ShardedShopper._

  context.setReceiveTimeout(Settings(context.system).passivateTimeout)

  /**
   * Handles the passivation handshake. Every other message is delegated to the default
   * `unhandled`; without that fallback the match would throw a `MatchError` for it.
   */
  override def unhandled(msg: Any): Unit = msg match {
    case ReceiveTimeout =>
      context.parent ! Passivate(stopMessage = ShardedShopper.StopShopping)
    case StopShopping => context.stop(self)
    case other        => super.unhandled(other)
  }
}
开发者ID:gilbutITbook,项目名称:006877,代码行数:37,代码来源:ShardedShopper.scala


示例20: ClusterApi

//设置package包名称以及导入依赖的类
package linguistic.api

import akka.actor.{ActorSystem, ActorRef}
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ClusterShardingStats, CurrentRegions, CurrentShardRegionState}
import akka.http.scaladsl.model.HttpResponse
import akka.pattern._
import linguistic.utils.ShutdownCoordinator
import linguistic.{HttpServer, ps}
import ShutdownCoordinator.NodeShutdownOpts
import linguistic.ps.{WordShardEntity, HomophonesSubTreeShardEntity}

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._

/**
 * HTTP routes under `/cluster` that expose cluster-sharding diagnostics.
 * Each route asks `searchMaster` with a `(path segment, ShardRegion query)` tuple and renders
 * the reply as a comma-separated string.
 *
 * @param http         not referenced by the routes defined here
 * @param searchMaster actor that receives the `(segment, query)` requests
 * @param regions      not referenced by the routes defined here
 */
class ClusterApi(http: ActorRef, searchMaster: ActorRef, regions: Set[ActorRef])
  (implicit ex: ExecutionContext, sys: ActorSystem) extends BaseApi {
  // Dot notation instead of postfix `10 seconds`, which requires scala.language.postfixOps.
  implicit val timeout: akka.util.Timeout = akka.util.Timeout(10.seconds)

  val route = pathPrefix("cluster") {
    // GET /cluster/<segment>/regions
    (get & path(Segment / "regions")) { seq =>
      complete {
        // `?` is the Akka ask operator; the tuple is written explicitly, not auto-tupled.
        (searchMaster ? ((seq, ShardRegion.GetCurrentRegions))).mapTo[CurrentRegions].map { r =>
          HttpResponse(entity = r.regions.mkString(","))
        }
      }
    // GET /cluster/<segment>/shards
    } ~ (get & path(Segment / "shards")) { seq =>
      complete {
        (searchMaster ? ((seq, ShardRegion.GetShardRegionState))).mapTo[CurrentShardRegionState].map { r =>
          HttpResponse(entity = r.shards.mkString(","))
        }
      }
    // GET /cluster/<segment>/shards2
    } ~ (get & path(Segment / "shards2")) { seq =>
      complete {
        (searchMaster ? ((seq, ShardRegion.GetClusterShardingStats(5.seconds)))).mapTo[ClusterShardingStats].map { r =>
          HttpResponse(entity = r.regions.mkString(","))
        }
      }
    }
  }
}
开发者ID:haghard,项目名称:linguistic,代码行数:42,代码来源:ClusterApi.scala



注:本文中的akka.cluster.sharding.ShardRegion类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Observable类代码示例发布时间:2022-05-23
下一篇:
Scala PasswordHasher类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap