• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ClusterShardingSettings类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.cluster.sharding.ClusterShardingSettings的典型用法代码示例。如果您正苦于以下问题:Scala ClusterShardingSettings类的具体用法?Scala ClusterShardingSettings怎么用?Scala ClusterShardingSettings使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ClusterShardingSettings类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: GroupProcessorRegion

//设置package包名称以及导入依赖的类
package im.actor.server.group

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object GroupProcessorRegion {
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    case GroupEnvelope(groupId, Some(dialogEnvelope), _, _) ?
      (
        groupId.toString,
        dialogEnvelope
      )
    case env @ GroupEnvelope(groupId, _, command, query) ?
      (
        groupId.toString,
        // payload
        if (query.isDefined) {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(query.number))
        } else {
          env.getField(GroupEnvelope.descriptor.findFieldByNumber(command.number))
        }
      )
  }

  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case env: GroupEnvelope ? (env.groupId % 100).toString // TODO: configurable
  }

  private val typeName = "GroupProcessor"

  private def start(props: Props)(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  def start()(implicit system: ActorSystem): GroupProcessorRegion = start(GroupProcessor.props)

  def startProxy()(implicit system: ActorSystem): GroupProcessorRegion =
    GroupProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}

case class GroupProcessorRegion(ref: ActorRef)

case class GroupViewRegion(ref: ActorRef) 
开发者ID:wex5,项目名称:dangchat-server,代码行数:54,代码来源:GroupProcessorRegion.scala


示例2: SeqUpdatesManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.sequence

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.event.Logging

import scala.util.{ Success, Try }

final case class SeqUpdatesManagerRegion(ref: ActorRef)

object SeqUpdatesManagerRegion {

  import UserSequenceCommands._

  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    val log = Logging(system, getClass)

    {
      case e @ Envelope(userId, payload) ? (userId.toString, Try(e.getField(Envelope.descriptor.findFieldByNumber(payload.number))) match {
        case Success(any) ? any
        case _ ?
          val error = new RuntimeException(s"Payload not found for $e")
          log.error(error, error.getMessage)
          throw error
      })
    }
  }

  private val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(userId, _) ? (userId % 10).toString // TODO: configurable
  }

  private val typeName = "SeqUpdatesManager"

  private def start(props: Props)(implicit system: ActorSystem): SeqUpdatesManagerRegion =
    SeqUpdatesManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId
    ))

  def start()(
    implicit
    system: ActorSystem
  ): SeqUpdatesManagerRegion =
    start(UserSequence.props)

  def startProxy()(implicit system: ActorSystem): SeqUpdatesManagerRegion =
    SeqUpdatesManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId
    ))
} 
开发者ID:wex5,项目名称:dangchat-server,代码行数:58,代码来源:SeqUpdatesManagerRegion.scala


示例3: MormontShardRegion

//设置package包名称以及导入依赖的类
package lib.cluster

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import lib.MormontConfig
import lib.cluster.MormontShardRegion.MormontShardRegionActorRef
import lib.models.PayLoad


abstract class MormontShardRegion[T <: PayLoad](actorSystem: ActorSystem) {
  private val prefix : String = "shard-region-"
  def channelName: String

  def entityProps: Props

  final private val extractEntityId: ShardRegion.ExtractEntityId = {
    case p: PayLoad ?
      (p.persistenceId, p)
  }

  final private val extractShardId: ShardRegion.ExtractShardId = {
    case p: PayLoad ?
      (math.abs(p.persistenceId.hashCode) % MormontConfig.numOfShards).toString
  }

  final def startShard(): MormontShardRegionActorRef = {
   ClusterSharding(actorSystem).start(
      typeName = prefix + channelName,
      entityProps = entityProps,
      settings = ClusterShardingSettings(actorSystem),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
  }
}

object MormontShardRegion {
  type MormontShardRegionActorRef = ActorRef
} 
开发者ID:sharma-rohit,项目名称:mormont,代码行数:40,代码来源:MormontShardRegion.scala


示例4: ShardedApp

//设置package包名称以及导入依赖的类
package com.michalplachta.shoesorter.api

import akka.actor.{ ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import com.michalplachta.shoesorter.SortingDecider
import com.typesafe.config.ConfigFactory
import kamon.Kamon

object ShardedApp extends App {
  val config = ConfigFactory.load("sharded")
  implicit val system = ActorSystem(config getString "application.name", config)

  ClusterSharding(system).start(
    typeName        = SortingDecider.shardName,
    entityProps     = SortingDecider.props,
    settings        = ClusterShardingSettings(system),
    extractEntityId = SortingDecider.extractEntityId,
    extractShardId  = SortingDecider.extractShardId
  )

  val guardian = ClusterSharding(system).shardRegion(SortingDecider.shardName)
  new RestInterface(guardian, config getInt "application.exposed-port")
} 
开发者ID:miciek,项目名称:monitoring-akka-prometheus-kamon,代码行数:24,代码来源:ShardedApp.scala


示例5: WeakUpdatesManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.sequence

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterShardingSettings, ClusterSharding, ShardRegion }
import im.actor.server.sequence.WeakUpdatesManager.Envelope

object WeakUpdatesManagerRegion {
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case env @ Envelope(authId, payload) ? (authId.toString, env)
  }

  private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
    case Envelope(authId, _) ? (authId % 32).toString // TODO: configurable
  }

  private val typeName = "WeakUpdatesManager"

  private def startRegion(props: Props)(implicit system: ActorSystem): WeakUpdatesManagerRegion = {
    WeakUpdatesManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))
  }

  def startRegion()(implicit system: ActorSystem): WeakUpdatesManagerRegion = startRegion(WeakUpdatesManager.props)

  def startRegionProxy()(implicit system: ActorSystem): WeakUpdatesManagerRegion =
    WeakUpdatesManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

}

case class WeakUpdatesManagerRegion(ref: ActorRef) 
开发者ID:wex5,项目名称:dangchat-server,代码行数:41,代码来源:WeakUpdatesManagerRegion.scala


示例6: GroupPresenceManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.presences

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.GroupPresenceManager.Envelope

object GroupPresenceManagerRegion {
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case env @ Envelope(userId, payload) ? (userId.toString, env)
  }

  private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
    case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
  }

  private val typeName = "GroupPresenceManager"

  private def startRegion(props: Props)(implicit system: ActorSystem): GroupPresenceManagerRegion =
    GroupPresenceManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

  def startRegion()(implicit system: ActorSystem): GroupPresenceManagerRegion = startRegion(GroupPresenceManager.props)

  def startRegionProxy()(implicit system: ActorSystem): GroupPresenceManagerRegion =
    GroupPresenceManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))
}

case class GroupPresenceManagerRegion(ref: ActorRef) 
开发者ID:wex5,项目名称:dangchat-server,代码行数:39,代码来源:GroupPresenceManagerRegion.scala


示例7: PresenceManagerRegion

//设置package包名称以及导入依赖的类
package im.actor.server.presences

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import im.actor.server.presences.PresenceManager.Envelope

object PresenceManagerRegion {
  private val extractEntityId: ShardRegion.ExtractEntityId = {
    case env @ Envelope(userId, payload) ? (userId.toString, payload)
  }

  private val extractShardId: ShardRegion.ExtractShardId = msg ? msg match {
    case Envelope(userId, _) ? (userId % 32).toString // TODO: configurable
  }

  private val typeName = "PresenceManager"

  private def startRegion(props: Props)(implicit system: ActorSystem): PresenceManagerRegion =
    PresenceManagerRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

  def startRegion()(implicit system: ActorSystem): PresenceManagerRegion = startRegion(PresenceManager.props)

  def startRegionProxy()(implicit system: ActorSystem): PresenceManagerRegion =
    PresenceManagerRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    ))

}

case class PresenceManagerRegion(val ref: ActorRef) 
开发者ID:wex5,项目名称:dangchat-server,代码行数:40,代码来源:PresenceManagerRegion.scala


示例8: UserProcessorRegion

//设置package包名称以及导入依赖的类
package im.actor.server.user

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object UserProcessorRegion {
  private def extractEntityId(system: ActorSystem): ShardRegion.ExtractEntityId = {
    {
      case c: UserCommand ? (c.userId.toString, c)
      case q: UserQuery   ? (q.userId.toString, q)
      case e @ UserEnvelope(
        userId,
        dialogRootEnvelope,
        dialogEnvelope
        ) ?
        (
          userId.toString,
          dialogRootEnvelope.getOrElse(dialogEnvelope.get)
        )
    }
  }

  private def extractShardId(system: ActorSystem): ShardRegion.ExtractShardId = {
    case c: UserCommand  ? (c.userId % 100).toString // TODO: configurable
    case q: UserQuery    ? (q.userId % 100).toString
    case e: UserEnvelope ? (e.userId % 100).toString
  }

  val typeName = "UserProcessor"

  private def start(props: Props)(implicit system: ActorSystem): UserProcessorRegion =
    UserProcessorRegion(ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))

  def start()(implicit system: ActorSystem): UserProcessorRegion =
    start(UserProcessor.props)

  def startProxy()(implicit system: ActorSystem): UserProcessorRegion =
    UserProcessorRegion(ClusterSharding(system).startProxy(
      typeName = typeName,
      role = None,
      extractEntityId = extractEntityId(system),
      extractShardId = extractShardId(system)
    ))
}

final case class UserProcessorRegion(ref: ActorRef) 
开发者ID:wex5,项目名称:dangchat-server,代码行数:53,代码来源:UserProcessorRegion.scala


示例9: entityId

//设置package包名称以及导入依赖的类
package com.mooneyserver.dublinpubs.shite_talking.protocol

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.ShiteTalker
import com.mooneyserver.dublinpubs.shite_talking.protocol.actors.models.{DrunkenWaffle, ShiteTalkerIsYapping}

trait ShiteTalkingActors {

  lazy val actorSystem = ActorSystem("Shite-Talkers")

  val messageExtractor = new HashCodeMessageExtractor(25) {
    override def entityId(message: Any): String = message match {
      case msg @ ShiteTalkerIsYapping(id) => id
      case msg @ DrunkenWaffle(id, _, _, _) => id
    }
  }

  lazy val shiteTalkersShard: ActorRef = ClusterSharding(actorSystem).start(
    typeName = "ShiteTalker",
    entityProps = Props[ShiteTalker],
    settings = ClusterShardingSettings(actorSystem),
    messageExtractor = messageExtractor)
} 
开发者ID:irishshagua,项目名称:dublin-pubs-shite-talking,代码行数:26,代码来源:ShiteTalkingActors.scala


示例10: UserWorkerSharding

//设置package包名称以及导入依赖的类
package homeworkzen.clustering

import akka.actor.{ActorRef, ActorSystem, Props}
import homeworkzen.domain.command.actor._
import homeworkzen.domain.command.message._
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import homeworkzen.Config

object UserWorkerSharding {
  def register(implicit system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      "UserWorker",
      Props(new UserWorker),
      ClusterShardingSettings(system),
      idExtractor,
      shardResolver)

  def props(): Props = Props(new UserWorker)

  val idExtractor: ShardRegion.ExtractEntityId = {
    case msg: UserCommand => (msg.userId.id.toString, msg)
  }

  val shardResolver: ShardRegion.ExtractShardId = {
    case msg: UserCommand => (math.abs(msg.userId.id.hashCode()) % Config.Cluster.shardCount).toString
  }

  val shardName: String = "UserSharding"
} 
开发者ID:anopse,项目名称:HomeworkZen,代码行数:30,代码来源:UserWorkerSharding.scala


示例11: ShardedIdGenerator

//设置package包名称以及导入依赖的类
package org.tanukkii.reactive.snowflake

import akka.actor.{ActorRef, Props, Actor}
import akka.cluster.sharding.{ClusterShardingSettings, ClusterSharding}

class ShardedIdGenerator extends Actor {
  ClusterSharding(context.system).start(
    typeName = ShardedIdRouter.shardName,
    entityProps = ShardedIdRouter.props,
    settings = ClusterShardingSettings(context.system),
    extractEntityId = ShardedIdRouter.idExtractor,
    extractShardId = ShardedIdRouter.shardResolver
  )

  def shardedIdRouter: ActorRef = {
    ClusterSharding(context.system).shardRegion(ShardedIdRouter.shardName)
  }

  def receive: Receive = {
    case msg => {
      shardedIdRouter forward msg
    }
  }

}

object ShardedIdGenerator {
  def props: Props = Props(new ShardedIdGenerator)

  def name: String = "idGenerator"
} 
开发者ID:TanUkkii007,项目名称:reactive-snowflake,代码行数:32,代码来源:ShardedIdGenerator.scala


示例12: PubSubTest

//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub

import akka.actor.ActorSystem
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

object PubSubTest {

  implicit val system = ActorSystem("PubSubTestSystem")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]) {
    val betShard = ClusterSharding(system).start(
      typeName = "SubscriberShard",
      entityProps = SubscribingActor.props,
      settings = ClusterShardingSettings(system),
      extractEntityId = SubscribingActor.idExtractor,
      extractShardId = SubscribingActor.shardExtractor)

    val selectionShard = ClusterSharding(system).start(
      typeName = "PublisherShard",
      entityProps = PublisherActor.props,
      settings = ClusterShardingSettings(system),
      extractEntityId = PublisherActor.idExtractor,
      extractShardId = PublisherActor.shardExtractor)

    Http().bindAndHandle(
      new RestService(betShard, selectionShard).routes,
      "localhost",
      8080)
  }
} 
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:35,代码来源:PubSubTest.scala


示例13: Main

//设置package包名称以及导入依赖的类
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import api.Api
import com.typesafe.config.ConfigFactory
import config.AppConfig
import services.UserProcessor

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._

object Main extends App {

  private final class Root extends Actor with ActorLogging {
    log.info("App up and running...")

    val appConfig: AppConfig = context.system.settings.config.as[AppConfig]("app")

    override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

    ClusterSharding(context.system).start(
      typeName = UserProcessor.shardName,
      entityProps = Props[UserProcessor],
      settings = ClusterShardingSettings(context.system),
      extractEntityId = UserProcessor.idExtractor,
      extractShardId = UserProcessor.shardResolver(appConfig.maxShards)
    )

    private val api = context.actorOf(Api.props(), name = Api.name)
    context.watch(api)

    override def receive = {
      case Terminated(actor) =>
        log.error("Terminating the system because {} terminated!", actor.path)
        context.system.terminate()
    }

  }

  override def main(args: Array[String]): Unit = {
    // read config
    val config = ConfigFactory.load()
    implicit val system = ActorSystem("application", config)
    Cluster(system).registerOnMemberUp(system.actorOf(Props(classOf[Root]), "root"))
    Await.ready(system.whenTerminated, Duration.Inf)
  }

} 
开发者ID:MavenCode,项目名称:akkaDocker,代码行数:51,代码来源:Main.scala


示例14: BathurstMain

//设置package包名称以及导入依赖的类
package com.abstractcode.bathurst

import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.abstractcode.bathurst.Catalog.{AddItemToCatalog, SetName}
import com.typesafe.config.ConfigFactory


object BathurstMain extends App {

  val port = if (args.isEmpty) "0" else args(0)

  println(s"Running Bathurst on port $port")

  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
    .withFallback(ConfigFactory.defaultApplication())

  val system = ActorSystem("bathurst", config)
  val settings = ClusterShardingSettings(system)

  val items: ActorRef = ClusterSharding(system).start(
    typeName = "Item",
    entityProps = Item.props(),
    settings = settings,
    extractEntityId = Item.extractEntityId,
    extractShardId = Item.extractShardId)

  val borrowers: ActorRef = ClusterSharding(system).start(
    typeName = "Borrower",
    entityProps = Borrower.props(),
    settings = settings,
    extractEntityId = Borrower.extractEntityId,
    extractShardId = Borrower.extractShardId)

  val catalogs: ActorRef = ClusterSharding(system).start(
    typeName = "Catalog",
    entityProps = Catalog.props(items, borrowers),
    settings = settings,
    extractEntityId = Catalog.extractEntityId,
    extractShardId = Catalog.extractShardId)

  if (port == "2552") {
    Cluster(system) registerOnMemberUp {

      catalogs ! SetName(1, "thing")

      for (a <- 1 to 30) {
        catalogs ! AddItemToCatalog(1, "whatever")
      }
    }
  }

} 
开发者ID:ColinScott,项目名称:bathurst,代码行数:55,代码来源:BathurstMain.scala


示例15: ApplicationMain

//设置package包名称以及导入依赖的类
package com.example

import akka.actor.ActorSystem
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.http.scaladsl.Http
import akka.persistence.Persistence
import akka.stream.ActorMaterializer
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl.AmqpSource
import akka.stream.scaladsl.Sink
import com.example.OrderActor._
import com.typesafe.config.ConfigFactory

object  ApplicationMain extends App with OrderFlow with OrderApi {
  val config = ConfigFactory.load()

  implicit val system = ActorSystem("order", config)

  val localSystem = ActorSystem("local", config.getConfig("local"))
  implicit val mat = ActorMaterializer()(localSystem)

  Persistence(system)

  override val orderActor = ClusterSharding(system).start(
    typeName = OrderActor.shardName,
    entityProps = OrderActor.props,
    settings = ClusterShardingSettings(system),
    extractShardId = OrderActor.extractShardId,
    extractEntityId = OrderActor.extractEntityId)

  val orderQueue = "order-akka.queue"
  BindingDeclaration(orderQueue, "order.exchange").withRoutingKey("#")
  val queueDeclaration = QueueDeclaration(orderQueue).withDurable(true).withAutoDelete(false)

  val amqpSource = AmqpSource(
    NamedQueueSourceSettings(AmqpConnectionDetails("192.168.99.100", 5672), orderQueue).withDeclarations(queueDeclaration),
    bufferSize = 10
  )

  amqpSource
    .log("order.queue")
    .via(deliveryToCreateOrderFlow())
    .to(Sink.actorRef[CreateOrder](orderActor, "done"))
    .run()

  Http().bindAndHandle(orderRoutes, "localhost", 8070)
} 
开发者ID:mduesterhoeft,项目名称:order-akka,代码行数:48,代码来源:ApplicationMain.scala


示例16: ShardingApp

//设置package包名称以及导入依赖的类
package com.example.sharding

import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import com.typesafe.config.ConfigFactory

import scala.io.StdIn


object ShardingApp extends App {

  val port: Int = args(0).toInt

  val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("cluster-sharding.conf"))

  val system = ActorSystem("ClusterSharding", config)

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case ClientMessage(message) => (message, message)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case ClientMessage(message) => message
  }

  ClusterSharding(system).start(
    typeName = "Client",
    entityProps = Props[ClientActor],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId
  )

  // terminate system
  StdIn.readLine()
  system.terminate()

}

class ReceiverActor extends Actor {
  override def receive: Receive = {
    case ShardingMessage(message) => {

    }
  }
}

class ClientActor extends Actor {
  override def receive: Receive = {
    case ClientMessage(message) =>

  }
} 
开发者ID:TechResearchID,项目名称:akka-cluster,代码行数:56,代码来源:ShardingApp.scala


示例17: AccountApp

//设置package包名称以及导入依赖的类
package sample.blog

import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory


object AccountApp {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552", "0"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    println("The ports are - " + ports )
    ports foreach { port =>
      //val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).withFallback(ConfigFactory.load())

      val config = ConfigFactory.parseString(s"""akka {
          remote {
              netty.tcp {
              port = ${port}
            }
          }
          cluster {
            roles = ["MyWorker","AccountEntity"]
          }
        }""").withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)

      

      ClusterSharding(system).start(
        typeName = "SupervisedAccount",
        entityProps = Props[AccountSupervisor],
        settings = ClusterShardingSettings(system),
        extractEntityId = AccountEntity.idExtractor,
        extractShardId = AccountEntity.shardResolver)

    }
  }
} 
开发者ID:iamanandkris,项目名称:cluster-sharding-experiment,代码行数:47,代码来源:AccountApp.scala


示例18: GenericAkkaRuntimeSettings

//设置package包名称以及导入依赖的类
package aecor.runtime.akkageneric

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings

import scala.concurrent.duration._

final case class GenericAkkaRuntimeSettings(numberOfShards: Int,
                                            idleTimeout: FiniteDuration,
                                            askTimeout: FiniteDuration,
                                            clusterShardingSettings: ClusterShardingSettings)

object GenericAkkaRuntimeSettings {

  
  def default(system: ActorSystem): GenericAkkaRuntimeSettings = {
    val config = system.settings.config.getConfig("aecor.akka-runtime")
    def getMillisDuration(path: String): FiniteDuration =
      Duration(config.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)

    GenericAkkaRuntimeSettings(
      config.getInt("number-of-shards"),
      getMillisDuration("idle-timeout"),
      getMillisDuration("ask-timeout"),
      ClusterShardingSettings(system)
    )
  }
} 
开发者ID:notxcain,项目名称:aecor,代码行数:31,代码来源:GenericAkkaRuntimeSettings.scala


示例19: AkkaPersistenceRuntimeSettings

//设置package包名称以及导入依赖的类
package aecor.runtime.akkapersistence

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings

import scala.concurrent.duration._

final case class AkkaPersistenceRuntimeSettings(numberOfShards: Int,
                                                idleTimeout: FiniteDuration,
                                                askTimeout: FiniteDuration,
                                                clusterShardingSettings: ClusterShardingSettings)

object AkkaPersistenceRuntimeSettings {

  
  def default(system: ActorSystem): AkkaPersistenceRuntimeSettings = {
    val config = system.settings.config.getConfig("aecor.akka-runtime")
    def getMillisDuration(path: String): FiniteDuration =
      Duration(config.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)

    AkkaPersistenceRuntimeSettings(
      config.getInt("number-of-shards"),
      getMillisDuration("idle-timeout"),
      getMillisDuration("ask-timeout"),
      ClusterShardingSettings(system)
    )
  }
} 
开发者ID:notxcain,项目名称:aecor,代码行数:31,代码来源:AkkaPersistenceRuntimeSettings.scala


示例20: UserService

//设置package包名称以及导入依赖的类
package nl.tradecloud.user

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import nl.tradecloud.user.repositories.UserRepository

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object UserService extends App {
   private final class Root extends Actor with ActorLogging {
     log.info("UserService up and running...")

     val appConfig: AppConfig = context.system.settings.config.as[AppConfig]("app")

     override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

     ClusterSharding(context.system).start(
       typeName = UserRepository.shardName,
       entityProps = Props[UserRepository],
       settings = ClusterShardingSettings(context.system),
       extractEntityId = UserRepository.idExtractor,
       extractShardId = UserRepository.shardResolver(appConfig.maxShards)
     )

     context.watch(context.actorOf(Api.props(), name = Api.name))

     override def receive = {
       case Terminated(actor) =>
         log.error("Terminating the system because {} terminated!", actor.path)
         context.system.terminate()
     }

   }

   val config = ConfigFactory.load()
   implicit val system = ActorSystem("user-service", config)
   Cluster(system).registerOnMemberUp(system.actorOf(Props(classOf[Root]), "root"))
   Await.ready(system.whenTerminated, Duration.Inf)
 } 
开发者ID:tradecloud,项目名称:tradecloud-microservices-demo,代码行数:45,代码来源:UserService.scala



注:本文中的akka.cluster.sharding.ClusterShardingSettings类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Extraction类代码示例发布时间:2022-05-23
下一篇:
Scala UserService类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap