本文整理汇总了Scala中akka.cluster.pubsub.DistributedPubSub类的典型用法代码示例。如果您正苦于以下问题:Scala DistributedPubSub类的具体用法?Scala DistributedPubSub怎么用?Scala DistributedPubSub使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DistributedPubSub类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: PublisherUpdate
//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub
import akka.actor.{Props, ActorLogging, Actor}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.cluster.sharding.ShardRegion
// Domain
final case class PublisherUpdate(publisher: UpdateableEvent)
object PublisherActor {
lazy val props = Props(classOf[PublisherActor])
val idExtractor: ShardRegion.ExtractEntityId = {
case s: PublisherUpdate => (s.publisher.id.toString, s.publisher)
}
val shardExtractor: ShardRegion.ExtractShardId = {
case msg: PublisherUpdate => (msg.publisher.id % 100).toString
}
}
class PublisherActor extends Actor with ActorLogging {
log.info(s"Publisher ${self.path.name} actor instance created")
val mediator = DistributedPubSub(context.system).mediator
override def receive: Receive = {
case event: UpdateableEvent => {
log.info(s"$event to be routed to all listeners")
mediator ! Publish(s"Publisher-${self.path.name}", event)
}
}
}
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:37,代码来源:PublisherActor.scala
示例2: Application
//设置package包名称以及导入依赖的类
package controllers
import javax.inject._
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, _}
import play.api.mvc._
@Singleton
class Application @Inject() (system: ActorSystem) extends Controller {
val mediator = DistributedPubSub(system).mediator
def socket = WebSocket.accept[String, String] { request =>
new Chat(mediator, "chat").flow
}
}
class Chat(mediator: ActorRef, topic: String) {
def flow: Flow[String, String, NotUsed] =
Flow.fromSinkAndSource(
publishToMediatorSink,
sourceFrom)
private def publishToMediatorSink[In]: Sink[In, NotUsed] =
Flow[In].map(DistributedPubSubMediator.Publish(topic, _)) to
Sink.actorRef[DistributedPubSubMediator.Publish](mediator, ())
private def sourceFrom[Out]: Source[Out, ActorRef] =
Source
.actorRef[Out](5, OverflowStrategy.fail)
.mapMaterializedValue { ref => mediator ! DistributedPubSubMediator.Subscribe(topic, ref); ref }
}
开发者ID:jonasanso,项目名称:play-simple-chat,代码行数:40,代码来源:Application.scala
示例3: UserSocket
//设置package包名称以及导入依赖的类
package actors
import actors.UserSocket.{ChatMessage, Message}
import actors.UserSocket.Message.messageReads
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.event.LoggingReceive
import play.api.libs.json.{Writes, JsValue, Json}
import play.twirl.api.HtmlFormat
import scala.xml.Utility
object UserSocket {
def props(user: String)(out: ActorRef) = Props(new UserSocket(user, out))
case class Message(msg: String)
object Message {
implicit val messageReads = Json.reads[Message]
}
case class ChatMessage(user: String, text: String)
object ChatMessage {
implicit val chatMessageWrites = new Writes[ChatMessage] {
def writes(chatMessage: ChatMessage): JsValue = {
Json.obj(
"type" -> "message",
"user" -> chatMessage.user,
"text" -> multiLine(chatMessage.text)
)
}
}
private def multiLine(text: String) = {
HtmlFormat.raw(text).body.replace("\n", "<br/>")
}
}
}
class UserSocket(uid: String, out: ActorRef) extends Actor with ActorLogging {
val topic = "chat"
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(topic, self)
def receive = LoggingReceive {
case js: JsValue =>
js.validate[Message](messageReads)
.map(message => Utility.escape(message.msg))
.foreach { msg => mediator ! Publish(topic, ChatMessage(uid, msg))}
case c:ChatMessage => out ! Json.toJson(c)
}
}
开发者ID:onegrx,项目名称:playakkachat,代码行数:59,代码来源:UserSocket.scala
示例4: MainController
//设置package包名称以及导入依赖的类
package bar.controllers
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.pubsub.DistributedPubSub
import com.google.inject.Inject
import common.BarEvents
import common.BarEvents.EntityInserted
import play.api.libs.json.Json
import play.api.mvc._
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import akka.pattern._
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class MainController @Inject()(system: ActorSystem)(implicit ec: ExecutionContext)
extends Controller {
val entities = system.actorOf(Props(new EntitiesService))
val random = new Random()
implicit val timeout = Timeout(5.seconds)
def list() = Action.async {
val currently = (entities ? "get").mapTo[Seq[Int]]
currently.map(x => Ok(Json.toJson(x)))
}
def insert() = Action {
val next = random.nextInt
entities ! next
Created(Json.toJson(random.nextInt))
}
}
class EntitiesService extends Actor with ActorLogging {
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
val mediator = DistributedPubSub(context.system).mediator
val topic = BarEvents.KEY
val existingEntities = ArrayBuffer[Int]()
def receive = {
case "get" => sender ! existingEntities
case next: Int =>
mediator ! Publish(topic, existingEntities)
existingEntities.append(next)
log.warning(s"publishing $existingEntities")
}
}
开发者ID:KadekM,项目名称:example-play-microservice-cluster,代码行数:53,代码来源:MainController.scala
示例5: MainController
//设置package包名称以及导入依赖的类
package foo.controllers
import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.cluster.pubsub.DistributedPubSub
import com.google.inject.Inject
import common.BarEvents
import play.api.libs.json.Json
import play.api.mvc._
import akka.pattern._
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class MainController @Inject()(system: ActorSystem)(implicit ec: ExecutionContext)
extends Controller {
val cache = system.actorOf(Props(new Cache()))
val subscriber = system.actorOf(Props(new Subscriber(BarEvents.KEY, cache)))
implicit val timeout = Timeout(5.seconds)
def list() = Action.async {
val currently = (cache ? "get").mapTo[Seq[Int]]
currently.map(x => Json.toJson(x)).map(x => Ok(x))
}
}
class Subscriber(topic: String, forwardTo: ActorRef) extends Actor with ActorLogging {
import akka.cluster.pubsub.DistributedPubSubMediator._
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(BarEvents.KEY, self)
def receive = {
case ack: SubscribeAck =>
log.warning("subscribing {}", ack)
case x =>
log.warning("forwarding {}", x)
forwardTo ! x
}
}
class Cache extends Actor with ActorLogging {
var cached = Seq.empty[Int]
def receive: Receive = {
case "get" =>
log.warning("getting cache")
sender ! cached
case entities: Seq[Int] =>
log.warning(s"setting cache to $entities")
cached = entities
}
}
开发者ID:KadekM,项目名称:example-play-microservice-cluster,代码行数:56,代码来源:MainController.scala
示例6: topics
//设置package包名称以及导入依赖的类
package org.a4r.actors.pubsub
import akka.actor.Actor
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import org.a4r.Logged
import org.a4r.messages.MessageTopic
trait DistributedPublisher extends Actor with Logged{
// activate the extension
def topics:Seq[MessageTopic]
val mediator = DistributedPubSub(context.system).mediator
def publish( msg:Any ) = {
topics.map(topic => {
info(s"publishing $msg to ${topic.name}")
mediator ! Publish(topic.name, msg)
})
}
}
开发者ID:brendanobra,项目名称:a4r,代码行数:24,代码来源:DistributedPublisher.scala
示例7: topics
//设置package包名称以及导入依赖的类
package org.a4r.actors.pubsub
import akka.actor.Actor
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
import org.a4r.Logged
import org.a4r.messages.MessageTopic
trait DistributedSubscriber extends Actor with Logged {
def topics:Seq[MessageTopic]
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe
val mediator = DistributedPubSub(context.system).mediator
info(s"subscribing to topics $topics")
topics.map(topic => {
mediator ! Subscribe(topic.name, self)
})
def wrappedReceive:Receive
override def receive: Receive = {
case ack:SubscribeAck =>
info(s"Am now subscribed: $ack")
case msg =>
wrappedReceive(msg)
}
}
开发者ID:brendanobra,项目名称:a4r,代码行数:32,代码来源:DistributedSubscriber.scala
示例8: AkkaToZmqFlow
//设置package包名称以及导入依赖的类
package tmt.demo.connectors
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe
import akka.stream.scaladsl.Sink
import com.trueaccord.scalapb.GeneratedMessage
import tmt.app.library.Connector
import tmt.app.utils.ActorRuntime
import tmt.demo.zeromq_drivers.ZmqPublisherFactory
class AkkaToZmqFlow(actorRuntime: ActorRuntime, zmqPublisherFactory: ZmqPublisherFactory) {
import actorRuntime._
def connect[Msg <: GeneratedMessage](
subscriberTopic: String,
publishingPort: Int
) = {
val (sourceLinkedRef, source) = Connector.coupling[Msg](Sink.asPublisher(fanout = false))
DistributedPubSub(system).mediator ! Subscribe(subscriberTopic, sourceLinkedRef)
val zmqPublisher = zmqPublisherFactory.make[Msg](publishingPort)
zmqPublisher
.publish(source)
.onComplete { x =>
zmqPublisher.shutdown()
}
}
}
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:29,代码来源:AkkaToZmqFlow.scala
示例9: ZmqToAkkaFlow
//设置package包名称以及导入依赖的类
package tmt.demo.connectors
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import com.trueaccord.scalapb.GeneratedMessageCompanion
import tmt.app.utils.{ActorRuntime, PbMessage}
import tmt.demo.zeromq_drivers.ZmqSubscriberFactory
class ZmqToAkkaFlow(actorRuntime: ActorRuntime, zmqSubscriberFactory: ZmqSubscriberFactory) {
import actorRuntime._
def connect[Msg <: PbMessage.Of[Msg]](
publishingTopic: String,
subscriberPort: Int,
responseParser: GeneratedMessageCompanion[Msg]
) = {
val zmqSubscriber = zmqSubscriberFactory.make(subscriberPort, responseParser)
zmqSubscriber.stream
.runForeach { message =>
DistributedPubSub(system).mediator ! Publish(publishingTopic, message)
}.onComplete { x =>
zmqSubscriber.shutdown()
}
}
}
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:26,代码来源:ZmqToAkkaFlow.scala
示例10: EventPublisher
//设置package包名称以及导入依赖的类
package tmt.demo.hcd_drivers
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.stream.scaladsl.Source
import com.trueaccord.scalapb.GeneratedMessage
import tmt.app.utils.ActorRuntime
class EventPublisher(actorRuntime: ActorRuntime) {
import actorRuntime._
def publish[Msg <: GeneratedMessage](
messages: Source[Msg, Any],
topic: String
) = {
messages.runForeach { message =>
println(s"********* PublisherClient is publishing: $message")
DistributedPubSub(system).mediator ! Publish(topic, message)
}
}
}
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:23,代码来源:EventPublisher.scala
示例11: FileReceiver
//设置package包名称以及导入依赖的类
package api.actors
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
import common.CommonMessages.{ ProcessLine, Received, fileProcessingTopic }
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
object FileReceiver {
def props(): Props = Props(new FileReceiver)
def createFileReceiver(system: ActorSystem): ActorRef = {
system.actorOf(FileReceiver.props())
}
}
class FileReceiver extends Actor with ActorLogging {
val mediator = DistributedPubSub(context.system).mediator
override def receive: Receive = {
case msg: ProcessLine =>
mediator ! Publish(fileProcessingTopic, msg)
sender() ! Received
}
}
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:25,代码来源:FileReceiver.scala
示例12: FrontendApi
//设置package包名称以及导入依赖的类
package api
import akka.actor.{ ActorSystem, Props }
import akka.cluster.pubsub.DistributedPubSub
import akka.pattern.pipe
import akka.event.{ Logging, LoggingAdapter }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import api.actors.{ ClusterListener, FileReceiver }
import api.http.{ HttpApi, Service }
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ ExecutionContext, Future }
object FrontendApi {
def props(): Props = Props(new FrontendApi)
}
class FrontendApi extends HttpApi with Service {
override val name: String = "frontend-api"
val config = ConfigFactory.load()
override val log: LoggingAdapter = Logging(context.system, getClass)
override val host: String = config.getString("frontend.host")
override val port: Int = config.getInt("frontend.port")
override implicit val system: ActorSystem = context.system
override implicit val materializer: ActorMaterializer = ActorMaterializer()
override implicit val ec: ExecutionContext = context.dispatcher
//Start up actors
val clusterListener = system.actorOf(ClusterListener.props())
val fileReceiver = system.actorOf(FileReceiver.props())
val mediator = DistributedPubSub(system).mediator
override val paths: Route = routes(s"$name", clusterListener, mediator)
override val http: Future[Http.ServerBinding] = Http(system).bindAndHandle(
paths,
host,
port
)
http pipeTo self
}
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:49,代码来源:FrontendApi.scala
示例13: FileProcessor
//设置package包名称以及导入依赖的类
package processing
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
import common.CommonMessages.{ KillYourself, ProcessLine, fileProcessingTopic }
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }
object FileProcessor {
def props(): Props = Props(new FileProcessor)
def createFileProcessor(system: ActorSystem): ActorRef = {
system.actorOf(FileProcessor.props())
}
}
class FileProcessor extends Actor with ActorLogging {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(fileProcessingTopic, self)
override def receive: Receive = {
case SubscribeAck(Subscribe(topic, None, `self`)) ?
log.info(s"subscribing to $topic")
case msg: ProcessLine =>
log.info(msg.toString)
case KillYourself =>
context stop self
}
}
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:32,代码来源:FileProcessor.scala
示例14: Notification
//设置package包名称以及导入依赖的类
package com.packt.chapter7
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
case class Notification(title: String, body: String)
class NotificationSubscriber extends Actor {
val mediator = DistributedPubSub(context.system).mediator
// subscribe to the topic named "notifications"
mediator ! Subscribe("notification", self)
val cluster = Cluster(context.system)
val clusterAddress = cluster.selfUniqueAddress
def receive = {
case notification: Notification ?
println(s"Got notification in node $clusterAddress => $notification")
case SubscribeAck(Subscribe("notification", None, `self`)) ?
println("subscribing");
}
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:25,代码来源:NotificationSubscriber.scala
示例15: TasksHandlingActor
//设置package包名称以及导入依赖的类
package com.github.mkorman9.districron.actor
import akka.actor.{Actor, ActorRef}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Put
import com.github.mkorman9.districron.ExecutionStatus
import com.github.mkorman9.districron.Messages.ConfirmExecutionCompleted
import com.github.mkorman9.districron.actor.Messages.{BroadcastTaskExecutionToRecipients, ReadTasksRegistry, StartTasksScheduler, TaskExecutionTick}
import com.github.mkorman9.districron.logic.{MessageSender, TasksManager}
import com.github.mkorman9.districron.model.Task
object TasksHandlingActor {
final val name = com.github.mkorman9.districron.Actor.Name
}
class TasksHandlingActor(tasksManager: TasksManager, messageSender: MessageSender) extends Actor {
private var clusterMediator: ActorRef = _
override def receive: Receive = {
case StartTasksScheduler => {
clusterMediator = DistributedPubSub(context.system).mediator
clusterMediator ! Put(self)
self ! ReadTasksRegistry
}
case ReadTasksRegistry =>
tasksManager.scheduleAllExistingTasks
case TaskExecutionTick(task: Task) =>
tasksManager.executeTask(task)
case BroadcastTaskExecutionToRecipients(executionId: Long, task: Task) =>
messageSender.broadcastExecutionMessage(clusterMediator, self, executionId, task)
case ConfirmExecutionCompleted(executionId: Long, status: ExecutionStatus) =>
tasksManager.completeTaskExecution(executionId, status)
}
}
开发者ID:mkorman9,项目名称:districron,代码行数:39,代码来源:TasksHandlingActor.scala
示例16: UserActor
//设置package包名称以及导入依赖的类
package actor
import akka.actor.{Actor, ActorRef, ActorSelection, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
import models.{ToPublish, _}
import play.api.libs.json.Json
import models.Formats._
import models.MsgTypes._
class UserActor(out: ActorRef) extends Actor {
val mediator: ActorRef = DistributedPubSub(context.system).mediator
mediator ! Subscribe("content", self)
val publisher: ActorSelection = context.system.actorSelection("/user/publisher")
val playlist: ActorSelection = context.system.actorSelection("/user/playlist")
playlist ! GetPlaylist
def receive = {
case SubscribeAck(Subscribe("content", None, `self`)) ? println("subscribing")
case msg: Payload if msg.event == PUBLISH =>
publisher ! ToPublish(msg)
case ToPublish(payload) =>
out ! payload
case msg: Payload if msg.event == PLAY || msg.event == STOP =>
playlist ! msg
case msg: Payload if msg.event == NEXT =>
playlist ! PlayNext
case msg: Payload if msg.event == ADD =>
playlist ! AddToPlaylist(msg.content.get)
case msg: Payload if msg.event == CLEAR =>
playlist ! ClearPlaylist
publisher ! ToPublish(msg)
case msg: Playlist =>
out ! Payload(PLAYLIST, Some(Json.toJson(msg).toString()))
case Payload(event, content) =>
out ! Payload("response", content)
}
}
object UserActor {
def props(out: ActorRef) = Props(new UserActor(out))
}
开发者ID:oen9,项目名称:bard-api,代码行数:49,代码来源:UserActor.scala
示例17: Notification
//设置package包名称以及导入依赖的类
package com.nossin.ndb.cluster
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
case class Notification(title: String, body: String)
class NotificationSubscriber extends Actor {
val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe("notification", self)
val cluster = Cluster(context.system)
val clusterAddress = cluster.selfUniqueAddress
def receive = {
case notification: Notification =>
println(s"Got notification in node $clusterAddress => $notification")
case SubscribeAck(Subscribe("notification", None, `self`)) ? println("subscribing");
}
}
开发者ID:michelnossin,项目名称:ndb,代码行数:23,代码来源:NotificationSubscriber.scala
示例18: Subscriber
//设置package包名称以及导入依赖的类
package org.dmonix.area51.akka.cluster.extensions
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator._
import org.dmonix.area51.akka.cluster.ClusterSettings
import org.dmonix.area51.akka.cluster.Messages.{Message, Response}
class Subscriber extends Actor with ActorLogging {
def receive = {
case Message(msg) =>
log.info(s"Subscriber [$self] Got [$msg] from [$sender]")
sender ! Response(s"Response to [$msg]")
case SubscribeAck(Subscribe(topic, group, _)) =>
log.info(s"Subscriber [$self] is now subscribed to topic [$topic] with group [$group]");
case a:Any =>
log.warning(s"Subscriber [$self] got unexpected message [$a] from [$sender]")
}
}
object SubscriberStarter extends App with ClusterSettings {
System.setProperty("config.file", "src/main/resources/akka-cfg/cluster-ext-member-tcp.conf");
val actorSystem = ActorSystem(actorSystemName)
val cluster = Cluster(actorSystem)
cluster.joinSeedNodes(seedNodes)
val mediator = DistributedPubSub(actorSystem).mediator
def registerService(serviceName:String): Unit = {
val actor = actorSystem.actorOf(Props(new Subscriber), serviceName)
//Get different behavior if subscribed with a group or not.
// val group = Option("group")
val group = None
mediator.tell(Subscribe(serviceName, group, actor),actor) //registers the actor to subscribe to the topic with the provided service name
}
registerService("ServiceA")
registerService("ServiceB")
}
开发者ID:pnerg,项目名称:area51-akka,代码行数:44,代码来源:Subscriber.scala
示例19: Publisher
//设置package包名称以及导入依赖的类
package org.dmonix.area51.akka.cluster.extensions
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import org.dmonix.area51.akka.cluster.Messages.{Broadcast, Message, Response, Unicast}
import org.dmonix.area51.akka.cluster.{ClusterSettings, SameThreadExecutionContext}
class Publisher(mediator:ActorRef) extends Actor with ActorLogging{
def receive = {
//sends the provided message to ONE instance of all the provided service names
case Unicast(serviceNames, message) =>
serviceNames.foreach(sName => mediator ! Publish(sName,message,true))
//sends the provided message to ALL instances of all the provided service names
case Broadcast(serviceNames, message) =>
serviceNames.foreach(sName => mediator ! Publish(sName,message,false))
case Response(rsp) =>
log.info(s"Publisher [$self] got response [$rsp] from [$sender]")
case a:Any ?
log.warning(s"Publisher [$self] got unexpected message [$a] from [$sender]")
}
}
object PublisherStarter extends App with SameThreadExecutionContext with ClusterSettings {
System.setProperty("config.file", "src/main/resources/akka-cfg/cluster-ext-member-tcp.conf");
val actorSystem = ActorSystem(actorSystemName)
val mediator = DistributedPubSub(actorSystem).mediator
val cluster = Cluster(actorSystem)
cluster.joinSeedNodes(seedNodes)
val actor = actorSystem.actorOf(Props(new Publisher(mediator)), "Publisher")
//appears that this sleep is needed for the communication to be up
//sending something too early just causes the message to disappear into void without any trace nor log
Thread.sleep(2000)
actor ! Unicast(Seq("ServiceA", "ServiceB"), Message("Unicast Hello-1!!!"))
actor ! Unicast(Seq("ServiceA", "ServiceB"), Message("Unicast Hello-2!!!"))
actor ! Broadcast(Seq("ServiceA"), Message("Broadcast Hello Again!!!"))
//stupid pause to allow for responses to propagate and be logged
Thread.sleep(2000)
actorSystem.terminate().onComplete(_ => System.exit(1))
}
开发者ID:pnerg,项目名称:area51-akka,代码行数:49,代码来源:Publisher.scala
示例20: ServiceProvider
//设置package包名称以及导入依赖的类
package org.dmonix.area51.akka.cluster.extensions
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Put
import org.dmonix.area51.akka.cluster.ClusterSettings
import org.dmonix.area51.akka.cluster.Messages._
class ServiceProvider extends Actor with ActorLogging {
override def preStart():Unit = {
println(s"Starting [$self]")
}
def receive = {
case Message(msg) ?
log.info(s"Service provider [$self] Got [$msg] from [$sender]")
sender ! Response(s"Response to [$msg]")
case a:Any ?
log.warning(s"Service provider [$self] got unexpected message [$a] from [$sender]")
}
}
object ServiceProviderStarter extends App with ClusterSettings {
System.setProperty("config.file", "src/main/resources/akka-cfg/cluster-ext-member-tcp.conf");
val actorSystem = ActorSystem(actorSystemName)
val cluster = Cluster(actorSystem)
cluster.joinSeedNodes(seedNodes)
val mediator = DistributedPubSub(actorSystem).mediator
def registerService(serviceName:String): Unit = {
val actor = actorSystem.actorOf(Props(new ServiceProvider), serviceName)
mediator ! Put(actor) //this registers the actor to the cluster using the name of the actor (/user/serviceName)
}
registerService("ServiceA")
registerService("ServiceB")
}
开发者ID:pnerg,项目名称:area51-akka,代码行数:42,代码来源:ServiceProvider.scala
注:本文中的akka.cluster.pubsub.DistributedPubSub类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论