• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Subscribe类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.cluster.pubsub.DistributedPubSubMediator.Subscribe的典型用法代码示例。如果您正苦于以下问题:Scala Subscribe类的具体用法?Scala Subscribe怎么用?Scala Subscribe使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Subscribe类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: UserSocket

//设置package包名称以及导入依赖的类
package actors

import actors.UserSocket.{ChatMessage, Message}
import actors.UserSocket.Message.messageReads
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.event.LoggingReceive
import play.api.libs.json.{Writes, JsValue, Json}
import play.twirl.api.HtmlFormat

import scala.xml.Utility

/** Companion of the `UserSocket` actor: its `Props` factory and the JSON message types it uses. */
object UserSocket {

  /** Creates the `Props` for a `UserSocket` acting for `user` and replying through `out`. */
  def props(user: String)(out: ActorRef) = Props(new UserSocket(user, out))

  /** Payload with a single `msg` text field. */
  case class Message(msg: String)

  object Message {
    // play-json macro-generated reader for Message.
    implicit val messageReads = Json.reads[Message]
  }

  /** A chat line written by `user`. */
  case class ChatMessage(user: String, text: String)

  object ChatMessage {

    // Replaces every "\n" in the text with an HTML line break.
    private def multiLine(text: String) = {
      val body = HtmlFormat.raw(text).body
      body.replace("\n", "<br/>")
    }

    // Serialises a ChatMessage as {"type": "message", "user": ..., "text": ...}.
    implicit val chatMessageWrites = new Writes[ChatMessage] {
      def writes(chatMessage: ChatMessage): JsValue =
        Json.obj(
          "type" -> "message",
          "user" -> chatMessage.user,
          "text" -> multiLine(chatMessage.text)
        )
    }
  }
}

/**
 * Actor that subscribes itself to the "chat" topic of the distributed pub-sub mediator.
 *
 * Incoming `JsValue`s that validate as `Message` are XML-escaped and published to the topic
 * as a `ChatMessage` from `uid`; incoming `ChatMessage`s are serialised to JSON and sent to `out`.
 */
class UserSocket(uid: String, out: ActorRef) extends Actor with ActorLogging {

  val topic = "chat"

  val mediator = DistributedPubSub(context.system).mediator

  // Subscribe as soon as the actor is constructed.
  mediator ! Subscribe(topic, self)

  def receive = LoggingReceive {
    case chat: ChatMessage =>
      out ! Json.toJson(chat)

    case payload: JsValue =>
      // A failed validation yields no element, so nothing is published for it.
      payload.validate[Message](messageReads).foreach { incoming =>
        mediator ! Publish(topic, ChatMessage(uid, Utility.escape(incoming.msg)))
      }
  }
}
开发者ID:onegrx,项目名称:playakkachat,代码行数:59,代码来源:UserSocket.scala


示例2: UserSocketSpec

//设置package包名称以及导入依赖的类
package actors

import actors.UserSocket.{ChatMessage, Message}
import akka.actor._
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{SubscribeAck, Subscribe}
import akka.testkit.TestProbe
import org.specs2.mutable._
import play.api.libs.json._

import scala.language.postfixOps

/**
 * Specification for `UserSocket`: chat messages sent to the socket are published to the
 * "chat" topic subscribers, and `ChatMessage`s received by the socket are forwarded to the
 * browser probe as JSON.
 */
class UserSocketSpec extends Specification {
  val UserId = "user1"

  "A user socket" should {
    val topic = "chat"

    "send chat message to all subscribers" in new AkkaTestkitSpecs2Support {
      implicit val messageWrites = Json.writes[Message]

      val mediator = DistributedPubSub(system).mediator
      val browser = TestProbe()
      val chatMember1 = TestProbe()
      val chatMember2 = TestProbe()
      mediator ! Subscribe(topic, chatMember1.ref)
      mediator ! Subscribe(topic, chatMember2.ref)
      // Use the shared UserId constant so the expectations below cannot drift from the actor's id.
      val socket = system.actorOf(UserSocket.props(UserId)(browser.ref), "userSocket")
      val message = "hello"

      socket ! Json.toJson(Message(message))

      // SubscribeAck is a case class: a bare `case SubscribeAck` would only match its companion
      // object, so a type pattern is required to actually skip acknowledgement messages.
      chatMember1.ignoreMsg({ case _: SubscribeAck => true })
      chatMember1.expectMsg(ChatMessage(UserId, message))
      chatMember2.ignoreMsg({ case _: SubscribeAck => true })
      chatMember2.expectMsg(ChatMessage(UserId, message))
    }

    "forward chat message to browser" in new AkkaTestkitSpecs2Support {
      val browser = TestProbe()
      val socket = system.actorOf(UserSocket.props(UserId)(browser.ref), "userSocket")
      val chatMessage = ChatMessage(UserId, "There is important thing to do!")

      socket ! chatMessage

      browser.expectMsg(Json.toJson(chatMessage))
    }
  }
}
开发者ID:onegrx,项目名称:playakkachat,代码行数:50,代码来源:UserSocketSpec.scala


示例3: DistributedSubscriber

//设置package包名称以及导入依赖的类
package org.a4r.actors.pubsub

import akka.actor.Actor
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
import org.a4r.Logged
import org.a4r.messages.MessageTopic


/**
 * Mixin for actors that subscribe themselves to a set of distributed pub-sub topics.
 *
 * On construction the actor sends one `Subscribe` per entry of `topics` to the mediator.
 * `SubscribeAck`s are logged here; every other message is delegated to `wrappedReceive`.
 */
trait DistributedSubscriber extends Actor with Logged {

  /**
   * Topics to subscribe to.
   *
   * NOTE(review): this is read while the trait body initialises, so an implementation
   * backed by a plain `val` in the subclass would presumably still be null at that point —
   * implement it as a `def` or `lazy val`.
   */
  def topics: Seq[MessageTopic]

  val mediator = DistributedPubSub(context.system).mediator

  info(s"subscribing to topics $topics")
  // foreach, not map: the sends are pure side effects and no result collection is needed.
  topics.foreach(topic => mediator ! Subscribe(topic.name, self))

  /** Handler for every message other than `SubscribeAck`. */
  def wrappedReceive: Receive

  override def receive: Receive = {
    case ack: SubscribeAck =>
      info(s"Am now subscribed: $ack")
    case msg =>
      // Route messages the wrapped handler does not define to Actor.unhandled instead of
      // failing the actor with a MatchError.
      wrappedReceive.applyOrElse(msg, (other: Any) => unhandled(other))
  }

}
开发者ID:brendanobra,项目名称:a4r,代码行数:32,代码来源:DistributedSubscriber.scala


示例4: AkkaToZmqFlow

//设置package包名称以及导入依赖的类
package tmt.demo.connectors

import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe
import akka.stream.scaladsl.Sink
import com.trueaccord.scalapb.GeneratedMessage
import tmt.app.library.Connector
import tmt.app.utils.ActorRuntime
import tmt.demo.zeromq_drivers.ZmqPublisherFactory

/**
 * Connects a distributed pub-sub topic to a ZeroMQ publisher created by `zmqPublisherFactory`.
 */
class AkkaToZmqFlow(actorRuntime: ActorRuntime, zmqPublisherFactory: ZmqPublisherFactory) {
  import actorRuntime._

  /**
   * Subscribes a coupled actor ref to `subscriberTopic`, publishes the coupled source through
   * a publisher made for `publishingPort`, and shuts that publisher down once publishing completes.
   */
  def connect[Msg <: GeneratedMessage](
    subscriberTopic: String,
    publishingPort: Int
  ) = {
    val (subscriberRef, messages) = Connector.coupling[Msg](Sink.asPublisher(fanout = false))
    val mediator = DistributedPubSub(system).mediator
    mediator ! Subscribe(subscriberTopic, subscriberRef)

    val publisher = zmqPublisherFactory.make[Msg](publishingPort)
    // The completion value itself is irrelevant; only the shutdown matters.
    publisher.publish(messages).onComplete(_ => publisher.shutdown())
  }
}
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:29,代码来源:AkkaToZmqFlow.scala


示例5: FileProcessor

//设置package包名称以及导入依赖的类
package processing

import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
import common.CommonMessages.{ KillYourself, ProcessLine, fileProcessingTopic }
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }

/** Factory helpers for the `FileProcessor` actor. */
object FileProcessor {

  /** `Props` describing a new `FileProcessor`. */
  def props(): Props = Props(new FileProcessor)

  /** Starts a `FileProcessor` in `system` and returns its reference. */
  def createFileProcessor(system: ActorSystem): ActorRef =
    system.actorOf(props())
}

/**
 * Actor that subscribes itself to `fileProcessingTopic` on construction, logs each
 * `ProcessLine` it receives, and stops itself on `KillYourself`.
 */
class FileProcessor extends Actor with ActorLogging {

  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe(fileProcessingTopic, self)

  override def receive: Receive = {
    // Acknowledgement of this actor's own subscription (no group, ref == self).
    // The original `?` here was a mis-encoded arrow and did not compile.
    case SubscribeAck(Subscribe(topic, None, `self`)) =>
      log.info(s"subscribing to $topic")

    case msg: ProcessLine =>
      log.info(msg.toString)

    case KillYourself =>
      context stop self

  }
}
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:32,代码来源:FileProcessor.scala


示例6: Notification

//设置package包名称以及导入依赖的类
package com.packt.chapter7

import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}

/** Message with a title and a body; NotificationSubscriber prints each instance it receives. */
case class Notification(title: String, body: String)

/**
 * Actor that subscribes itself to the "notification" topic and prints every
 * `Notification` it receives together with this node's unique cluster address.
 */
class NotificationSubscriber extends Actor {
  val mediator = DistributedPubSub(context.system).mediator
  // subscribe to the topic named "notification"
  mediator ! Subscribe("notification", self)

  val cluster = Cluster(context.system)
  val clusterAddress = cluster.selfUniqueAddress

  def receive = {
    // The original `?` tokens were mis-encoded arrows and did not compile.
    case notification: Notification =>
      println(s"Got notification in node $clusterAddress => $notification")
    // Acknowledgement of this actor's own subscription (no group, ref == self).
    case SubscribeAck(Subscribe("notification", None, `self`)) =>
      println("subscribing")
  }
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:25,代码来源:NotificationSubscriber.scala


示例7: UserActor

//设置package包名称以及导入依赖的类
package actor

import akka.actor.{Actor, ActorRef, ActorSelection, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}
import models.{ToPublish, _}
import play.api.libs.json.Json
import models.Formats._
import models.MsgTypes._

/**
 * Actor that subscribes itself to the "content" topic, requests the playlist on construction,
 * and routes `Payload` events between `out`, the "/user/publisher" actor and the
 * "/user/playlist" actor.
 */
class UserActor(out: ActorRef) extends Actor {

  val mediator: ActorRef = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("content", self)

  val publisher: ActorSelection = context.system.actorSelection("/user/publisher")
  val playlist: ActorSelection = context.system.actorSelection("/user/playlist")

  playlist ! GetPlaylist

  def receive = {
    // Acknowledgement of this actor's own subscription (no group, ref == self).
    // The original `?` here was a mis-encoded arrow and did not compile.
    case SubscribeAck(Subscribe("content", None, `self`)) => println("subscribing")

    case msg: Payload if msg.event == PUBLISH =>
      publisher ! ToPublish(msg)
    case ToPublish(payload) =>
      out ! payload

    case msg: Payload if msg.event == PLAY || msg.event == STOP =>
      playlist ! msg
    case msg: Payload if msg.event == NEXT =>
      playlist ! PlayNext
    case msg: Payload if msg.event == ADD =>
      // NOTE(review): `.get` throws if an ADD payload arrives without content — confirm
      // that callers always supply it, or guard this case.
      playlist ! AddToPlaylist(msg.content.get)
    case msg: Payload if msg.event == CLEAR =>
      playlist ! ClearPlaylist
      publisher ! ToPublish(msg)
    case msg: Playlist =>
      out ! Payload(PLAYLIST, Some(Json.toJson(msg).toString()))

    // Any other payload is answered with a "response" event carrying the same content.
    case Payload(event, content) =>
      out ! Payload("response", content)
  }
}

/** Factory for `UserActor`. */
object UserActor {

  /** `Props` for a `UserActor` that sends its output to `out`. */
  def props(out: ActorRef): Props =
    Props(new UserActor(out))
}
开发者ID:oen9,项目名称:bard-api,代码行数:49,代码来源:UserActor.scala


示例8: Notification

//设置package包名称以及导入依赖的类
package com.nossin.ndb.cluster


import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Subscribe, SubscribeAck}

/** Message with a title and a body; NotificationSubscriber prints each instance it receives. */
case class Notification(title: String, body: String)

/**
 * Actor that subscribes itself to the "notification" topic and prints every
 * `Notification` it receives together with this node's unique cluster address.
 */
class NotificationSubscriber extends Actor {
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe("notification", self)
  val cluster = Cluster(context.system)
  val clusterAddress = cluster.selfUniqueAddress

  def receive = {
    case notification: Notification =>
      println(s"Got notification in node $clusterAddress => $notification")
    // Acknowledgement of this actor's own subscription (no group, ref == self).
    // The original `?` here was a mis-encoded arrow and did not compile.
    case SubscribeAck(Subscribe("notification", None, `self`)) =>
      println("subscribing")
  }
}
开发者ID:michelnossin,项目名称:ndb,代码行数:23,代码来源:NotificationSubscriber.scala


示例9: UserSocketSpec

//设置package包名称以及导入依赖的类
package actors

import actors.UserSocket.{ChatMessage, Message}
import akka.actor._
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{SubscribeAck, Subscribe}
import akka.testkit.TestProbe
import org.specs2.mutable._
import play.api.libs.json._

import scala.language.postfixOps

/**
 * Specification for `UserSocket`: chat messages sent to the socket are published to the
 * "chat" topic subscribers, and `ChatMessage`s received by the socket are forwarded to the
 * browser probe as JSON.
 */
class UserSocketSpec extends Specification {
  val UserId = "user1"

  "A user socket" should {
    val topic = "chat"

    "send chat message to all subscribers" in new AkkaTestkitSpecs2Support {
      implicit val messageWrites = Json.writes[Message]

      val mediator = DistributedPubSub(system).mediator
      val browser = TestProbe()
      val chatMember1 = TestProbe()
      val chatMember2 = TestProbe()
      mediator ! Subscribe(topic, chatMember1.ref)
      mediator ! Subscribe(topic, chatMember2.ref)
      // Use the shared UserId constant so the expectations below cannot drift from the actor's id.
      val socket = system.actorOf(UserSocket.props(UserId)(browser.ref), "userSocket")
      val message = "hello"

      socket ! Json.toJson(Message(message))

      // SubscribeAck is a case class: a bare `case SubscribeAck` would only match its companion
      // object, so a type pattern is required to actually skip acknowledgement messages.
      chatMember1.ignoreMsg({ case _: SubscribeAck => true })
      chatMember1.expectMsg(ChatMessage(UserId, message))
      chatMember2.ignoreMsg({ case _: SubscribeAck => true })
      chatMember2.expectMsg(ChatMessage(UserId, message))
    }

    "forward chat message to browser" in new AkkaTestkitSpecs2Support {
      val browser = TestProbe()
      val socket = system.actorOf(UserSocket.props(UserId)(browser.ref), "userSocket")
      val chatMessage = ChatMessage(UserId, "There is important thing to do!")

      socket ! chatMessage

      browser.expectMsg(Json.toJson(chatMessage))
    }
  }
}
开发者ID:GoranSchumacher,项目名称:massive-actors,代码行数:50,代码来源:UserSocketSpec.scala



注:本文中的akka.cluster.pubsub.DistributedPubSubMediator.Subscribe类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala PostgresDriver类代码示例发布时间:2022-05-23
下一篇:
Scala Position类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap