• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Publish类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.cluster.pubsub.DistributedPubSubMediator.Publish的典型用法代码示例。如果您正苦于以下问题:Scala Publish类的具体用法?Scala Publish怎么用?Scala Publish使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Publish类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: UserSocket

//设置package包名称以及导入依赖的类
package actors

import actors.UserSocket.{ChatMessage, Message}
import actors.UserSocket.Message.messageReads
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.event.LoggingReceive
import play.api.libs.json.{Writes, JsValue, Json}
import play.twirl.api.HtmlFormat

import scala.xml.Utility

object UserSocket {
  def props(user: String)(out: ActorRef) = Props(new UserSocket(user, out))

  case class Message(msg: String)

  object Message {
    implicit val messageReads = Json.reads[Message]
  }

  case class ChatMessage(user: String, text: String)

  object ChatMessage {
    implicit val chatMessageWrites = new Writes[ChatMessage] {
      def writes(chatMessage: ChatMessage): JsValue = {
        Json.obj(
          "type" -> "message",
          "user" -> chatMessage.user,
          "text" -> multiLine(chatMessage.text)
        )
      }
    }

    private def multiLine(text: String) = {
      HtmlFormat.raw(text).body.replace("\n", "<br/>")
    }
  }
}

class UserSocket(uid: String, out: ActorRef) extends Actor with ActorLogging {

  val topic = "chat"

  val mediator = DistributedPubSub(context.system).mediator

  mediator ! Subscribe(topic, self)

  def receive = LoggingReceive {
    case js: JsValue =>
      js.validate[Message](messageReads)
        .map(message => Utility.escape(message.msg))
        .foreach { msg => mediator ! Publish(topic, ChatMessage(uid, msg))}

    case c:ChatMessage => out ! Json.toJson(c)
  }
} 
开发者ID:onegrx,项目名称:playakkachat,代码行数:59,代码来源:UserSocket.scala


示例2: MainController

//设置package包名称以及导入依赖的类
package bar.controllers

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.cluster.pubsub.DistributedPubSub
import com.google.inject.Inject
import common.BarEvents
import common.BarEvents.EntityInserted
import play.api.libs.json.Json
import play.api.mvc._

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import akka.pattern._
import akka.util.Timeout

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class MainController @Inject()(system: ActorSystem)(implicit ec: ExecutionContext)
    extends Controller {

  val entities  = system.actorOf(Props(new EntitiesService))
  val random    = new Random()
  implicit val timeout = Timeout(5.seconds)

  def list() = Action.async {
    val currently = (entities ? "get").mapTo[Seq[Int]]
    currently.map(x => Ok(Json.toJson(x)))
  }

  def insert() = Action {
    val next = random.nextInt
    entities ! next
    Created(Json.toJson(random.nextInt))
  }
}

class EntitiesService extends Actor with ActorLogging {
  import akka.cluster.pubsub.DistributedPubSubMediator.Publish
  val mediator = DistributedPubSub(context.system).mediator
  val topic = BarEvents.KEY

  val existingEntities = ArrayBuffer[Int]()

  def receive = {
    case "get" => sender ! existingEntities
    case next: Int =>
      mediator ! Publish(topic, existingEntities)
      existingEntities.append(next)
      log.warning(s"publishing $existingEntities")
  }
} 
开发者ID:KadekM,项目名称:example-play-microservice-cluster,代码行数:53,代码来源:MainController.scala


示例3: topics

//设置package包名称以及导入依赖的类
package org.a4r.actors.pubsub

import akka.actor.Actor
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import org.a4r.Logged
import org.a4r.messages.MessageTopic


trait DistributedPublisher extends Actor with Logged{
  // activate the extension
  def topics:Seq[MessageTopic]
  val mediator = DistributedPubSub(context.system).mediator

  def publish( msg:Any ) = {
    topics.map(topic => {
      info(s"publishing $msg to ${topic.name}")
      mediator ! Publish(topic.name, msg)
    })
  }


} 
开发者ID:brendanobra,项目名称:a4r,代码行数:24,代码来源:DistributedPublisher.scala


示例4: PublisherUpdate

//设置package包名称以及导入依赖的类
package com.mooneyserver.akkapubsub

import akka.actor.{Props, ActorLogging, Actor}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.cluster.sharding.ShardRegion

// Domain
final case class PublisherUpdate(publisher: UpdateableEvent)

object PublisherActor {

  lazy val props = Props(classOf[PublisherActor])

  val idExtractor: ShardRegion.ExtractEntityId = {
    case s: PublisherUpdate => (s.publisher.id.toString, s.publisher)
  }

  val shardExtractor: ShardRegion.ExtractShardId = {
    case msg: PublisherUpdate => (msg.publisher.id % 100).toString
  }
}

class PublisherActor extends Actor with ActorLogging {

  log.info(s"Publisher ${self.path.name} actor instance created")

  val mediator = DistributedPubSub(context.system).mediator

  override def receive: Receive = {
    case event: UpdateableEvent => {
      log.info(s"$event to be routed to all listeners")
      mediator ! Publish(s"Publisher-${self.path.name}", event)
    }
  }
} 
开发者ID:irishshagua,项目名称:akka-distributed-pub-sub-testing,代码行数:37,代码来源:PublisherActor.scala


示例5: ZmqToAkkaFlow

//设置package包名称以及导入依赖的类
package tmt.demo.connectors

import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import com.trueaccord.scalapb.GeneratedMessageCompanion
import tmt.app.utils.{ActorRuntime, PbMessage}
import tmt.demo.zeromq_drivers.ZmqSubscriberFactory

class ZmqToAkkaFlow(actorRuntime: ActorRuntime, zmqSubscriberFactory: ZmqSubscriberFactory) {
  import actorRuntime._

  def connect[Msg <: PbMessage.Of[Msg]](
    publishingTopic: String,
    subscriberPort: Int,
    responseParser: GeneratedMessageCompanion[Msg]
  ) = {
    val zmqSubscriber = zmqSubscriberFactory.make(subscriberPort, responseParser)
    zmqSubscriber.stream
      .runForeach { message =>
        DistributedPubSub(system).mediator ! Publish(publishingTopic, message)
      }.onComplete { x =>
        zmqSubscriber.shutdown()
      }
  }
} 
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:26,代码来源:ZmqToAkkaFlow.scala


示例6: EventPublisher

//设置package包名称以及导入依赖的类
package tmt.demo.hcd_drivers

import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.stream.scaladsl.Source
import com.trueaccord.scalapb.GeneratedMessage
import tmt.app.utils.ActorRuntime

class EventPublisher(actorRuntime: ActorRuntime) {

  import actorRuntime._

  def publish[Msg <: GeneratedMessage](
    messages: Source[Msg, Any],
    topic: String
  ) = {
    messages.runForeach { message =>
      println(s"********* PublisherClient is publishing: $message")
      DistributedPubSub(system).mediator ! Publish(topic, message)
    }
  }
} 
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:23,代码来源:EventPublisher.scala


示例7: FileReceiver

//设置package包名称以及导入依赖的类
package api.actors

import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
import common.CommonMessages.{ ProcessLine, Received, fileProcessingTopic }
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish

object FileReceiver {
  def props(): Props = Props(new FileReceiver)
  def createFileReceiver(system: ActorSystem): ActorRef = {
    system.actorOf(FileReceiver.props())
  }
}

class FileReceiver extends Actor with ActorLogging {

  val mediator = DistributedPubSub(context.system).mediator

  override def receive: Receive = {
    case msg: ProcessLine =>
      mediator ! Publish(fileProcessingTopic, msg)
      sender() ! Received
  }
} 
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:25,代码来源:FileReceiver.scala


示例8: MessageSender

//设置package包名称以及导入依赖的类
package com.github.mkorman9.districron.logic

import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Send}
import com.github.mkorman9.districron.Messages.ExecuteTask
import com.github.mkorman9.districron.model.Task
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
class MessageSender @Autowired() (actorSystem: ActorSystem) {
  def sendMessageToTasksHandlingActor(message: Any): Unit = {
    val tasksHandlingActor = actorSystem.actorSelection(com.github.mkorman9.districron.Actor.Path)
    tasksHandlingActor ! message
  }

  def broadcastExecutionMessage(clusterMediator: ActorRef, sender: ActorRef, executionId: Long, task: Task): Unit = {
    val executeTaskMessage = ExecuteTask(executionId, task.name)

    if (task.globalDistribution)
      clusterMediator.tell(
        Publish(task.recipient, executeTaskMessage),
        sender
      )
    else
      clusterMediator.tell(
        Send(task.recipient, executeTaskMessage, localAffinity = false),
        sender
      )
  }
} 
开发者ID:mkorman9,项目名称:districron,代码行数:32,代码来源:MessageSender.scala


示例9: Publisher

//设置package包名称以及导入依赖的类
package org.dmonix.area51.akka.cluster.extensions

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import org.dmonix.area51.akka.cluster.Messages.{Broadcast, Message, Response, Unicast}
import org.dmonix.area51.akka.cluster.{ClusterSettings, SameThreadExecutionContext}


class Publisher(mediator:ActorRef) extends Actor with ActorLogging{

  def receive = {
    //sends the provided message to ONE instance of all the provided service names
    case Unicast(serviceNames, message) =>
      serviceNames.foreach(sName => mediator ! Publish(sName,message,true))
    //sends the provided message to ALL instances of all the provided service names
    case Broadcast(serviceNames, message) =>
      serviceNames.foreach(sName => mediator ! Publish(sName,message,false))
    case Response(rsp) =>
      log.info(s"Publisher [$self] got response [$rsp] from [$sender]")
    case a:Any ?
      log.warning(s"Publisher [$self] got unexpected message [$a] from [$sender]")
  }

}

object PublisherStarter extends App with SameThreadExecutionContext with ClusterSettings {
  System.setProperty("config.file", "src/main/resources/akka-cfg/cluster-ext-member-tcp.conf");
  val actorSystem = ActorSystem(actorSystemName)
  val mediator = DistributedPubSub(actorSystem).mediator
  val cluster = Cluster(actorSystem)
  cluster.joinSeedNodes(seedNodes)

  val actor = actorSystem.actorOf(Props(new Publisher(mediator)), "Publisher")

  //appears that this sleep is needed for the communication to be up
  //sending something too early just causes the message to disappear into void without any trace nor log
  Thread.sleep(2000)

  actor ! Unicast(Seq("ServiceA", "ServiceB"), Message("Unicast Hello-1!!!"))
  actor ! Unicast(Seq("ServiceA", "ServiceB"), Message("Unicast Hello-2!!!"))
  actor ! Broadcast(Seq("ServiceA"), Message("Broadcast Hello Again!!!"))

  //stupid pause to allow for responses to propagate and be logged
  Thread.sleep(2000)
  actorSystem.terminate().onComplete(_ => System.exit(1))
} 
开发者ID:pnerg,项目名称:area51-akka,代码行数:49,代码来源:Publisher.scala



注:本文中的akka.cluster.pubsub.DistributedPubSubMediator.Publish类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala CustomSecuredErrorHandler类代码示例发布时间:2022-05-23
下一篇:
Scala TreeSet类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap