本文整理汇总了Scala中akka.http.scaladsl.model.ws.TextMessage类的典型用法代码示例。如果您正苦于以下问题：Scala TextMessage类的具体用法？Scala TextMessage怎么用？Scala TextMessage使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TextMessage类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: WebSocketHandler
//设置package包名称以及导入依赖的类
package websocket
import akka.NotUsed
import akka.actor._
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import core.entities.AuthTokenContext
import websocket.ClientEndpoint._
import websocket.WebSocketHandler._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
/**
  * Builds the WebSocket message flow for an authenticated client and wires it to a
  * dedicated `ClientEndpoint` actor.
  *
  * @param processingRouter      actor passed to every `ClientEndpoint` created here
  * @param connectedClientsStore actor passed to every `ClientEndpoint` created here
  * @param processingTimeout     timeout passed to every `ClientEndpoint` created here
  */
class WebSocketHandler(processingRouter: ActorRef, connectedClientsStore: ActorRef, processingTimeout: FiniteDuration)
                      (implicit system: ActorSystem, mat: ActorMaterializer, apiDispatcher: ExecutionContext) {

  /**
    * Creates a `ClientEndpoint` actor for `ctx` and returns the flow connecting a WebSocket to it.
    *
    * Strict text frames are forwarded to the actor as `IncomingMessage`; streamed text and
    * binary frames are drained and dropped. Everything the actor sends to the attached
    * socket actor as `OutgoingMessage` is emitted to the client as a text frame.
    *
    * @param ctx authentication context of the connecting client
    * @return the flow to hand to the WebSocket upgrade
    */
  def clientEndpoint(ctx: AuthTokenContext): Flow[Message, Message, NotUsed] = {
    val clientEndpoint =
      system.actorOf(
        ClientEndpoint.props(ctx, processingRouter, connectedClientsStore, processingTimeout),
        ClientEndpoint.name(ctx))

    val incomingMessages: Sink[Message, NotUsed] = Flow[Message]
      .mapConcat[IncomingMessage] {
        case TextMessage.Strict(jsContent) => IncomingMessage(jsContent) :: Nil
        case ts: TextMessage.Streamed =>
          // Unsupported frame type: drain it so the connection is not clogged.
          ts.textStream.runWith(Sink.ignore)
          Nil
        case br: BinaryMessage =>
          br.dataStream.runWith(Sink.ignore)
          Nil
      }
      // PoisonPill stops the endpoint actor once the client side completes.
      .to(Sink.actorRef[IncomingMessage](clientEndpoint, PoisonPill))

    // Source.actorRef rejects OverflowStrategy.backpressure with an IllegalArgumentException
    // (an actor mailbox cannot back-pressure its senders), so the original call could never
    // produce a working flow. Failing on overflow surfaces a slow consumer instead of
    // silently losing messages.
    val outgoingMessages: Source[Message, NotUsed] = Source
      .actorRef[OutgoingMessage](BUFFER_SIZE, OverflowStrategy.fail)
      .mapMaterializedValue { socket =>
        // Hand the materialized socket actor to the endpoint so it can push messages out.
        clientEndpoint ! Attach(socket)
        NotUsed
      }
      .map {
        case OutgoingMessage(jsContent) => TextMessage(jsContent)
      }

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
  }
}
/** Companion holding the constants used by the `WebSocketHandler` class. */
object WebSocketHandler {

  /** Buffer size passed to `Source.actorRef` for outgoing messages. */
  private val BUFFER_SIZE = 10
}
开发者ID:lymr,项目名称:fun-chat,代码行数:57,代码来源:WebSocketHandler.scala
示例2: PayloadParser
//设置package包名称以及导入依赖的类
package akkord.parser
import akka.actor.{Actor, ActorRef, Props}
import akka.http.scaladsl.model.ws.TextMessage
import akkord.DiscordBot._
import io.circe.parser.parse
import io.circe.{HCursor, Json}
/**
  * Actor that parses incoming strict text frames as JSON, wraps them in a `Payload`
  * keyed by the `op` field, and reacts to each op code.
  *
  * @param bot actor that receives `NewSeq` updates extracted from dispatch events
  */
class PayloadParser(bot: ActorRef) extends Actor {
  import PayloadParser._

  val eventParser = context.actorOf(Props(classOf[EventParser]))

  override def receive = {
    case TextMessage.Strict(text) => parseText(text)
    case Payload(0, cursor)       => parseEvent(cursor)
    case Payload(3, _)            => sender() ! StatusUpdate
    case Payload(7, _)            => sender() ! Reconnect
    case Payload(9, _)            => sender() ! InvalidSession
    case Payload(10, cursor)      => parseHello(cursor)
    case Payload(11, _)           => sender() ! HeartBeatAck
    case Payload(-1, cursor)      => sender() ! UnsupportedMessage(cursor.value.toString())
  }

  /** Parses `text` as JSON and forwards it to this actor as a `Payload`; op is -1 when absent. */
  private def parseText(text: String): Unit = {
    // Unparseable input degrades to JSON null, which then yields op code -1.
    val cursor = parse(text).getOrElse(Json.Null).hcursor
    val opCode = cursor.get[Int]("op").toOption.getOrElse(-1)
    // forward keeps the original sender so replies go back to whoever sent the text frame.
    self.forward(Payload(opCode, cursor))
  }

  /** Reports the `s` field to the bot and forwards the `t` field to the event parser, when present. */
  private def parseEvent(cursor: HCursor): Unit = {
    for (sequence <- cursor.get[Int]("s").toOption)
      bot ! NewSeq(sequence)

    for (eventType <- cursor.get[String]("t").toOption)
      eventParser.forward(EventParser.Event(eventType, cursor))
  }

  /** Replies with `Hello` carrying `d.heartbeat_interval`, when that field is present. */
  private def parseHello(cursor: HCursor): Unit = {
    val heartbeatInterval = cursor.downField("d").get[Int]("heartbeat_interval").toOption
    for (interval <- heartbeatInterval)
      sender() ! Hello(interval)
  }
}
/** Companion defining the internal message used by the `PayloadParser` actor. */
object PayloadParser {

  /**
    * A parsed gateway message.
    *
    * @param op     value of the `op` field, or -1 when it could not be read
    * @param cursor cursor positioned at the root of the parsed JSON
    */
  case class Payload(
    op: Int,
    cursor: HCursor
  )
}
开发者ID:ryanmiville,项目名称:akkord,代码行数:55,代码来源:PayloadParser.scala
示例3: printMessage
//设置package包名称以及导入依赖的类
package net.ruippeixotog.scalafbp.ws
import akka.event.slf4j.SLF4JLogging
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.stream.scaladsl.Flow
/** Helpers for debug-logging the messages that pass through a WebSocket handler flow. */
trait WsUtils extends SLF4JLogging {

  /** Renders a message for logging: the text of strict text frames, a placeholder otherwise. */
  private[this] def printMessage(msg: Message): String = msg match {
    case strict: TextMessage.Strict => strict.text
    case _: TextMessage.Streamed    => "(streamed text message)"
    case _: BinaryMessage           => "(binary message)"
  }

  /**
    * Wraps `handler` so every inbound and outbound message is logged at debug level.
    *
    * @param id      identifier included in each log line
    * @param handler the flow to wrap
    * @return a flow that logs, delegates to `handler`, and logs again
    */
  def logWsMessages(id: String)(handler: Flow[Message, Message, Any]) = {
    val logInbound = Flow[Message].map { inbound =>
      log.debug(s"[$id] IN : ${printMessage(inbound)}")
      inbound
    }

    logInbound.via(handler).map { outbound =>
      log.debug(s"[$id] OUT: ${printMessage(outbound)}")
      outbound
    }
  }
}
开发者ID:ruippeixotog,项目名称:scalafbp,代码行数:22,代码来源:WsUtils.scala
示例4: ShakespeareRoute
//设置package包名称以及导入依赖的类
package routes
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream._
import akka.stream.scaladsl._
import akka.{ NotUsed }
import akka.actor.ActorSystem
import scala.concurrent.duration._
import akka.http.scaladsl.marshalling.{Marshaller, ToResponseMarshaller}
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import spray.json.DefaultJsonProtocol._
import models.Character
import repositories.ShakespeareRepository
import akka.http.scaladsl.model.ws.{ TextMessage, Message }
import spray.json._
/**
  * HTTP and WebSocket routes serving lines obtained from a `ShakespeareRepository`.
  *
  * @param workingDirectory directory containing `theatre.html`, served at the path end
  */
class ShakespeareRoute(workingDirectory: String)(implicit
  system: ActorSystem,
  materializer: ActorMaterializer
) {
  val repository = new ShakespeareRepository()
  val source: Source[Character, NotUsed] = Source(repository.romeoEtJuliette)

  // Retained so existing references keep compiling. The WebSocket route no longer reads it:
  // a single iterator shared by every connection is mutated from several stream threads at
  // once, and one client exhausting it would leave every later client with nothing.
  val romeoEtJulietteIterable = repository.romeoEtJuliette.toIterator

  /** Renders each element with `toString` and appends a newline. */
  def toNewLineFlow[A]: Flow[A, String, NotUsed] = Flow[A].map(_.toString + "\n")

  implicit val characterFormat = jsonFormat2(Character)

  /** Marshals a stream of strings as a chunked `application/json` response, one chunk per element. */
  implicit val toResponseMarshaller: ToResponseMarshaller[Source[String, Any]] =
    Marshaller.opaque { items =>
      val data = items.map(item => ChunkStreamPart(item))
      HttpResponse(entity = HttpEntity.Chunked(MediaTypes.`application/json`, data))
    }

  /** Lets at most one element through per `duration`, delaying the rest. */
  def throttler[A](duration: FiniteDuration): Flow[A, A, NotUsed] =
    Flow[A].throttle(1, duration, 1, ThrottleMode.shaping)

  /**
    * Routes:
    *  - `romeoEtJuliette`: WebSocket that answers each text message with the next line as JSON
    *  - `romeoAndJuliette`: GET returning the throttled line stream
    *  - path end: serves `theatre.html` from the working directory
    */
  def routes = path("romeoEtJuliette") {
    // statefulMapConcat creates a fresh iterator per materialization, i.e. per connection,
    // so each client walks the play from the start without sharing mutable state.
    handleWebSocketMessages(Flow[Message].statefulMapConcat[Message] { () =>
      val remainingLines = repository.romeoEtJuliette.iterator

      val nextLine: Message => List[Message] = {
        case _: TextMessage if remainingLines.hasNext =>
          TextMessage(remainingLines.next().toJson.toString) :: Nil
        case other =>
          // Non-text messages, and text messages once the lines are exhausted, are only printed.
          println(other)
          Nil
      }
      nextLine
    })
  } ~
    path("romeoAndJuliette") { get(complete(romeoAndJulietteSource)) } ~
    pathEnd { getFromFile(s"$workingDirectory/theatre.html") }

  /** All lines formatted as "name - text", newline-terminated and throttled to one per 0.5 second. */
  private def romeoAndJulietteSource: Source[String, NotUsed] = source
    .map(c => s"${c.name} - ${c.text}")
    .via(toNewLineFlow)
    .via(throttler(0.5.second))
}
开发者ID:Giovannini,项目名称:hackday-akka-vue,代码行数:59,代码来源:ShakespereRoute.scala
示例5: MessageClient
//设置package包名称以及导入依赖的类
package eu.svez.backpressuredemo.C_websockets
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Sink, Source}
import eu.svez.backpressuredemo.Flows._
import eu.svez.backpressuredemo.StreamDemo
/**
  * Demo client that opens a WebSocket to `/messages` and keeps sending "hello" text frames
  * through the rate valve and meter provided by the demo helpers; responses are ignored.
  */
object MessageClient extends StreamDemo {

  sourceRate.send(5)

  /** Endless stream of "hello" frames, passed through the `sourceRate` valve and metered as "sourceWS". */
  val source = Source.repeat("hello")
    .via(valve(sourceRate.get()))
    .via(meter("sourceWS"))
    .map(text => TextMessage(text))

  val host = "0.0.0.0"
  val port = 9090

  /** Result of the WebSocket request: incoming frames are discarded, `source` is sent out. */
  val clientFlow = {
    val request = WebSocketRequest(s"ws://$host:$port/messages")
    val sendOnly = Flow.fromSinkAndSource(Sink.ignore, source)
    Http().singleWebSocketRequest(request, sendOnly)
  }

  readRatesFromStdIn()
}
开发者ID:svezfaz,项目名称:akka-backpressure-scala-central-talk,代码行数:26,代码来源:MessageClient.scala
示例6: route
//设置package包名称以及导入依赖的类
package phu.quang.le.routes
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.stream.scaladsl.{ Source, Flow }
/** WebSocket route that echoes every message back prefixed with the registered name. */
trait WsRouter extends Directives {

  /** `register?name=...` upgrades to a WebSocket handled by [[broadcast]]. */
  def route =
    path("register") {
      parameter('name) { name =>
        handleWebSocketMessages(broadcast(name))
      }
    }

  /**
    * Answers each text message with "name::" followed by the received text, and any other
    * message with "name::" alone.
    *
    * NOTE(review): non-text messages are not drained here; confirm that is acceptable.
    *
    * @param name prefix placed in front of every reply
    */
  def broadcast(name: String): Flow[Message, Message, Any] = {
    // A def, so each reply gets its own single-element prefix source as before.
    def prefix = Source.single(name + "::")

    Flow[Message].mapConcat {
      case text: TextMessage => List(TextMessage(prefix ++ text.textStream))
      case _                 => List(TextMessage(prefix))
    }
  }
}
开发者ID:p-le,项目名称:manga-batch,代码行数:24,代码来源:WsRouter.scala
示例7: route
//设置package包名称以及导入依赖的类
package com.knoldus.account
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.{Flow, Sink, Source}
/** Account service: registers a name over a WebSocket and answers with its generated id. */
trait WebServer extends Directives {

  /** Registered accounts, name -> generated id. */
  var accounts: Map[String, String] = Map.empty

  /** `register?name=...` upgrades to a WebSocket handled by [[handler]]. */
  def route =
    path("register") {
      // The lambda arrow was garbled to `?` in the original text, which does not compile.
      parameter('name) { name =>
        handleWebSocketMessages(handler(name))
      }
    }

  /**
    * Flow that ignores the content of incoming strict text frames and emits a single reply:
    * the new id, or a notice that `name` is already registered.
    *
    * @param name account name to register
    */
  def handler(name: String): Flow[Message, Message, Any] = {
    Flow[Message].collect {
      case TextMessage.Strict(txt) => txt
    }.via(validateAndRegister(name))
      .map {
        case msg: String => TextMessage.Strict(msg)
      }
  }

  /**
    * Registers `name` when unknown and returns a flow emitting the single reply text.
    * The registration happens when this method is called, not when the flow runs.
    */
  private def validateAndRegister(name: String) = {
    accounts.get(name) match {
      case Some(id) => Flow.fromSinkAndSource(Sink.ignore, Source.single(name + "::You are already registered !!!!"))
      case None => {
        val id = java.util.UUID.randomUUID().toString
        accounts += (name -> id)
        Flow.fromSinkAndSource(Sink.ignore, Source.single(id))
      }
    }
  }
}
开发者ID:knoldus,项目名称:akka-http-websocket-microservices.g8,代码行数:39,代码来源:WebServer.scala
示例8: WebServer
//设置package包名称以及导入依赖的类
package com.knoldus.server
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.scaladsl.{Flow, Sink, Source}
/**
  * Front server that proxies registration and order creation to two backend WebSocket
  * services and remembers the ids handed out by the registration service.
  */
class WebServer(implicit system: ActorSystem) extends Directives {

  /** Known accounts, name -> id returned by the registration service. */
  var accounts: Map[String, String] = Map.empty
  val http: HttpExt = Http()

  /**
    * Routes:
    *  - `register?name=...`: proxied to the registration service through [[handler]]
    *  - `create/<id>`: proxied to the order service when `id` is a known account id,
    *    otherwise answered with a single "not registered" message
    */
  def route =
    path("register") {
      // The lambda arrow was garbled to `?` in the original text, which does not compile.
      // NOTE(review): `name` is appended to the URL without encoding.
      parameter('name) { name =>
        handleWebSocketMessages(handler("ws://localhost:9001/register?name=" + name, name))
      }
    } ~ path("create" / Segment) { id =>
      if (accounts.values.exists(_ == id)) {
        handleWebSocketMessages(http.webSocketClientFlow(WebSocketRequest("ws://localhost:9002/create")))
      } else {
        handleWebSocketMessages(
          Flow[Message]
            .collect { case TextMessage.Strict(txt) => txt }
            .via(Flow.fromSinkAndSource(Sink.ignore, Source.single("You are not registered!!!!!!!!!!!")))
            .map { case msg: String => TextMessage.Strict(msg) })
      }
    }

  /**
    * Proxies to the registration service at `url` and rewrites its replies: an "already
    * registered" notice is passed through, anything else is stored as the id for `name`.
    *
    * NOTE(review): only strict text replies are matched; any other reply fails the stream
    * with a MatchError.
    *
    * @param url  WebSocket URL of the registration service
    * @param name account name being registered
    */
  def handler(url: String, name: String): Flow[Message, Message, Any] = {
    http.webSocketClientFlow(WebSocketRequest(url)).map {
      case TextMessage.Strict(txt) =>
        if (txt.contains("You are already registered")) {
          TextMessage(txt)
        } else {
          accounts += (name -> txt)
          TextMessage(s"You are registered !!!! Use this ${txt} to create order!!")
        }
    }
  }
}
开发者ID:knoldus,项目名称:akka-http-websocket-microservices.g8,代码行数:49,代码来源:WebServer.scala
示例9: route
//设置package包名称以及导入依赖的类
package com.knoldus.order
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.{Flow, Source}
/** Order service: confirms each text message received on the `create` WebSocket. */
trait WebServer extends Directives {

  /** `create` upgrades to a WebSocket handled by [[handler]]. */
  def route =
    path("create") {
      handleWebSocketMessages(handler)
    }

  /**
    * Replies to each text message with a confirmation followed by the received text.
    *
    * NOTE(review): only text messages are matched; a binary message fails the stream
    * with a MatchError.
    */
  def handler: Flow[Message, Message, Any] =
    Flow[Message].mapConcat {
      case order: TextMessage =>
        val confirmation = Source.single("Your order is created. Order value is ")
        List(TextMessage(confirmation ++ order.textStream))
    }
}
开发者ID:knoldus,项目名称:akka-http-websocket-microservices.g8,代码行数:23,代码来源:WebServer.scala
示例10: system
//设置package包名称以及导入依赖的类
package io.vamp.runner
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
/**
  * HTTP server serving the UI files and a `channel` WebSocket connected to the messenger hub.
  * Settings are read from the `vamp.runner.http` configuration section.
  */
trait WebServer {
  implicit def system: ActorSystem
  implicit def materializer: ActorMaterializer

  /** Hub whose `channel` flow processes the text received on the WebSocket. */
  def messenger: Hub

  val config = Config.config("vamp.runner.http")
  val index = config.string("ui.index")
  val directory = config.string("ui.directory")

  /**
    * Binds the routes to the configured interface and port.
    *
    * GET requests redirect the empty path to `/`, serve the index file at `/` and static
    * files below it; each is rejected when the matching setting is empty. The `channel`
    * WebSocket turns incoming text into strings, sends them through `messenger.channel`
    * and replies with strict text frames.
    */
  def server = Http().bindAndHandle({
    withRequestTimeout(Config.duration("vamp.runner.timeout")) {
      encodeResponse {
        get {
          pathEnd {
            redirect("/", MovedPermanently)
          } ~ pathSingleSlash {
            if (index.isEmpty) reject else getFromFile(index)
          } ~ pathPrefix("") {
            if (directory.isEmpty) reject else getFromDirectory(directory)
          }
        }
      }
    } ~ path("channel") {
      handleWebSocketMessages {
        // The lambda and case arrows here were garbled to `?` in the original text.
        Flow[Message].collect {
          case TextMessage.Strict(message) => Future.successful(message)
          // Streamed text is capped at 100 chunks and 5 seconds before being concatenated.
          case TextMessage.Streamed(stream) => stream.limit(100).completionTimeout(5 seconds).runFold("")(_ + _)
        }.mapAsync(parallelism = 3)(identity) via messenger.channel map (message => TextMessage.Strict(message))
      }
    }
  }, config.string("interface"), config.int("port"))
}
开发者ID:magneticio,项目名称:vamp-runner,代码行数:51,代码来源:WebServer.scala
示例11: chatRoom
//设置package包名称以及导入依赖的类
package name.denyago.yasc.http.websocket
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import name.denyago.yasc.chat.ConnectedUser
import name.denyago.yasc.chat.events.{ConnectionEstablished, MessagePosted, MessageReceived}
/** Builds the WebSocket flow connecting a client to a per-connection `ConnectedUser` actor. */
trait WsFlow {
  implicit val webSocketSys = ActorSystem("websocket-system")
  implicit val webSocketMat = ActorMaterializer()

  /** Chat room actor handed to every `ConnectedUser` created by [[wsFlow]]. */
  def chatRoom: ActorRef

  /**
    * Creates a `ConnectedUser` actor and returns the flow bridging a WebSocket to it.
    *
    * Strict text frames reach the actor as `MessageReceived`; every `MessagePosted` sent
    * to the outgoing actor is emitted to the client as a text frame.
    */
  def wsFlow: Flow[Message, Message, NotUsed] = {
    import akka.http.scaladsl.model.ws.BinaryMessage

    val userActor = webSocketSys.actorOf(Props(new ConnectedUser(chatRoom)))

    // The original matched only TextMessage.Strict, so a streamed or binary frame raised a
    // MatchError and failed the connection. Unsupported frames are now drained, so they
    // cannot clog the connection, and then dropped.
    val incomingMessages: Sink[Message, NotUsed] =
      Flow[Message].mapConcat[MessageReceived] {
        case TextMessage.Strict(text) => MessageReceived(text) :: Nil
        case streamed: TextMessage =>
          streamed.textStream.runWith(Sink.ignore)
          Nil
        case binary: BinaryMessage =>
          binary.dataStream.runWith(Sink.ignore)
          Nil
      }.to(Sink.actorRef[MessageReceived](userActor, PoisonPill))

    val outgoingMessages: Source[Message, NotUsed] =
      Source.actorRef[MessagePosted](10, OverflowStrategy.fail)
        .mapMaterializedValue { outgoingActor =>
          // Tell the user actor where to send messages destined for this client.
          userActor ! ConnectionEstablished(outgoingActor)
          NotUsed
        }
        .map(
          (outgoingMessage: MessagePosted) => TextMessage(outgoingMessage.text))

    Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
  }
}
开发者ID:denyago,项目名称:yet-another-simple-chat,代码行数:38,代码来源:WsFlow.scala
示例12: SubscriptionService
//设置package包名称以及导入依赖的类
package me.mmcoulombe.aad.services
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import me.mmcoulombe.aad.EventManager
import me.mmcoulombe.aad.descriptor.SubscriptionServiceDescriptor
import me.mmcoulombe.add.json.JsonSupport
import play.api.libs.json.Json
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure
/**
  * WebSocket service that feeds incoming text to the event manager and replies with its
  * output serialized as JSON.
  *
  * @param eventManager provides the flow that processes each received text
  */
class SubscriptionService(eventManager: EventManager)
                         (implicit ec: ExecutionContext, materialize: ActorMaterializer) extends SubscriptionServiceDescriptor with JsonSupport {

  /**
    * Flow turning each text message into a string, running it through `eventManager.flow`,
    * and emitting each result as a strict JSON text frame.
    */
  def manageEvent(): Flow[Message, Message, Any] = {
    import akka.http.scaladsl.model.ws.BinaryMessage
    import akka.stream.scaladsl.Sink

    Flow[Message]
      // The original had no case for binary messages, so one failed the stream with a
      // MatchError. They are now drained, so they cannot clog the connection, and dropped.
      .mapConcat[TextMessage] {
        case text: TextMessage => text :: Nil
        case binary: BinaryMessage =>
          binary.dataStream.runWith(Sink.ignore)
          Nil
      }
      .mapAsync(1) {
        case TextMessage.Strict(msg) => Future.successful(msg)
        // Streamed text is concatenated into one string before processing.
        case streamed: TextMessage   => streamed.textStream.runFold("")(_ + _)
      }
      .via(eventManager.flow)
      .map(msg => TextMessage.Strict(Json.stringify(Json.toJson(msg))))
  }

  /** Pass-through flow that prints the error when the stream terminates with a failure. */
  def reportErrorsFlow[T]: Flow[T, T, Any] =
    Flow[T]
      .watchTermination()((_, f) => f.onComplete {
        case Failure(err) =>
          println(s"WS stream failed : $err")
        case _ =>
      })
}
开发者ID:mmcoulombe,项目名称:poc-aad,代码行数:32,代码来源:SubscriptionService.scala
示例13: ExampleFeed
//设置package包名称以及导入依赖的类
package ch.becompany.social
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.Materializer
import ch.becompany.social.github.GithubFeed
import ch.becompany.social.twitter.TwitterFeed
import scala.util.{Failure, Success, Try}
/** Example feed combining GitHub and Twitter sources into a stream of WebSocket text messages. */
object ExampleFeed {

  import WebServer.system
  import WebServer.materializer

  private implicit val executionContext = system.dispatcher

  /** Formats dates as "yyyy-MM-dd HH:mm:ss" in the system default time zone. */
  private val dateFormatter =
    DateTimeFormatter
      .ofPattern("yyyy-MM-dd HH:mm:ss")
      .withZone(ZoneId.systemDefault())

  /** Renders a status update as "<date> - <network> - <status html or error message>". */
  private def statusToMessage: (StatusUpdate[String]) => Message = {
    case (network, date, tryStatus) =>
      val body = tryStatus match {
        case Failure(e)      => s"Error: ${e.getMessage}"
        case Success(status) => status.html.toString
      }
      val timestamp = dateFormatter.format(date)
      TextMessage(s"$timestamp - $network - $body")
  }

  private val feed = Feed(
    "Google on GitHub" -> new GithubFeed("google"),
    "Google on Twitter" -> new TwitterFeed("Google"),
    "ASF on GitHub" -> new GithubFeed("apache"),
    "ASF on Twitter" -> new TwitterFeed("TheASF")
  )(5)

  /** The feed's subscription with every status update rendered as a text message. */
  def feedSource =
    feed.subscribe.map(statusToMessage)
}
开发者ID:becompany,项目名称:akka-social-stream-example,代码行数:44,代码来源:ExampleFeed.scala
示例14: CompilerService
//设置package包名称以及导入依赖的类
package fiddle.router
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.http.scaladsl.model.ws.TextMessage
/**
  * Actor that greets its output actor on start and logs every strict text message it receives.
  *
  * @param out actor that receives the welcome message
  */
class CompilerService(out: ActorRef) extends Actor with ActorLogging {

  /** Logs the start and sends the welcome text message to `out`. */
  override def preStart(): Unit = {
    log.debug("CompilerService starting")
    out ! TextMessage("Welcome!")
    super.preStart()
  }

  def receive = {
    case TextMessage.Strict(text) =>
      log.debug(s"Received: $text")
  }

  /** Intentionally does nothing on stop. */
  override def postStop(): Unit = ()
}
/** Companion providing the `Props` factory for the `CompilerService` actor. */
object CompilerService {

  /**
    * @param out actor passed to the new `CompilerService`
    * @return props creating a `CompilerService` bound to `out`
    */
  def props(out: ActorRef): Props =
    Props(new CompilerService(out))
}
开发者ID:ochrons,项目名称:scalafiddle-router,代码行数:27,代码来源:CompilerService.scala
示例15: WebSockets
//设置package包名称以及导入依赖的类
package mm4s.api
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import mm4s.api.WebSocketModels.WebSocketMessage
import spray.json._
import scala.collection.immutable
/** Helpers for connecting to the server's WebSocket endpoint and routing its messages to an actor. */
object WebSockets {

  /** Sent to the target actor when the stream feeding it completes. */
  case object SocketClosed

  /** Client flow for the `websocket` endpoint on `host:port`, authenticated with `token`. */
  def flow(token: String, host: String, port: Int = 8080)(implicit system: ActorSystem) = {
    val request = WebSocketRequest(
      s"ws://$host:$port$mmapi/websocket",
      extraHeaders = immutable.Seq(auth(token)))
    Http().webSocketClientFlow(request)
  }

  /** Queue-backed source of outgoing text messages; buffers 10 and fails on overflow. */
  def source()(implicit system: ActorSystem) =
    Source.queue[TextMessage](10, OverflowStrategy.fail)

  /** Sink delivering every element to `ref`, followed by `SocketClosed` on completion. */
  def toActor(ref: ActorRef)(implicit system: ActorSystem) =
    Sink.actorRef(ref, SocketClosed)

  /**
    * Runs the full pipeline: outgoing queue -> WebSocket -> JSON-decoded messages -> `ref`.
    * Only strict text frames are decoded; other frames are skipped.
    *
    * @return the materialized values of the queue source and the client flow, paired with
    *         that of the actor sink
    */
  def connect(ref: ActorRef, token: String, host: String, port: Int = 8080)(implicit system: ActorSystem, mat: ActorMaterializer) = {
    import WebSocketProtocol._

    val strictText = Flow[Message].collect { case TextMessage.Strict(text) => text }

    source()
      .viaMat(flow(token, host, port))(Keep.both)
      .via(strictText)
      .map(_.parseJson.convertTo[WebSocketMessage])
      .toMat(toActor(ref))(Keep.both)
      .run()
  }
}
/** Data model of the messages received over the WebSocket. */
object WebSocketModels {

  /** Top-level WebSocket message. */
  case class WebSocketMessage(
    team_id: String,
    channel_id: String,
    user_id: String,
    action: Action,
    props: WsmProps
  )

  /** A post, as decoded from the JSON string held in `WsmProps.post`. */
  case class WsmPost(
    id: String,
    create_at: Long,
    user_id: String,
    channel_id: String,
    message: String,
    `type`: String,
    hashtags: String,
    filenames: Seq[String]
  )

  /** Optional properties of a message; `post` holds a JSON-encoded `WsmPost` when present. */
  case class WsmProps(channel_type: Option[String], otherFile: Option[String], post: Option[String]) {
    import WebSocketProtocol.wsmPostFormat

    /** The `post` property decoded as a `WsmPost`, when present. */
    def posted: Option[WsmPost] =
      for (rawPost <- post) yield rawPost.parseJson.convertTo[WsmPost]
  }
}
/** spray-json formats for the types declared in `WebSocketModels`. */
object WebSocketProtocol extends DefaultJsonProtocol {

  import ActionProtocol._
  import WebSocketModels._

  /** Format for `WsmPost` (8 fields). */
  implicit val wsmPostFormat: RootJsonFormat[WsmPost] =
    jsonFormat8(WsmPost)

  /** Format for `WsmProps` (3 fields). */
  implicit val wsmPropsFormat: RootJsonFormat[WsmProps] =
    jsonFormat3(WsmProps)

  /** Format for `WebSocketMessage` (5 fields). */
  implicit val wsmFormat: RootJsonFormat[WebSocketMessage] =
    jsonFormat5(WebSocketMessage)
}
开发者ID:jw3,项目名称:mm4s,代码行数:61,代码来源:WebSockets.scala
示例16: ChatService
//设置package包名称以及导入依赖的类
package co.technius.chatty.server.service
import akka.actor.ActorSystem
import akka.stream.{ FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.stream.actor.{ ActorPublisher, ActorSubscriber }
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
/** Chat service with a single default room; each connection gets its own user actor. */
class ChatService(implicit system: ActorSystem, mat: Materializer) {

  val roomActor = system.actorOf(RoomActor.props("default"), "defaultroom")

  /** Strings published by the room actor, wrapped as text messages. */
  val roomPub =
    Source
      .fromPublisher[String](ActorPublisher(roomActor))
      .map(text => TextMessage(Source.single(text)))

  /**
    * Creates a user actor for `name`, announces it to the room, and returns the flow whose
    * input feeds the actor and whose output is what the actor publishes.
    *
    * @param name user name passed to the user actor and the join notification
    */
  def flow(name: String): Flow[Message, Message, Any] = {
    val userActor = system.actorOf(UserActor.props(name, roomActor))
    // The join is sent with the user actor as the sender.
    roomActor.tell(InternalProtocol.Join(name), userActor)

    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fromUser = builder.add(Source.fromPublisher(ActorPublisher(userActor)))
      val toUser = builder.add(Sink.fromSubscriber(ActorSubscriber(userActor)))
      val inbound = builder.add(msgToStringFlow)
      val outbound = builder.add(Flow[String].map(text => TextMessage(text)))

      inbound ~> toUser
      fromUser ~> outbound

      FlowShape(inbound.in, outbound.out)
    })
  }

  /** Extracts the text of strict text messages; drains and drops every other message. */
  def msgToStringFlow: Flow[Message, String, Any] = Flow[Message].mapConcat {
    case TextMessage.Strict(text) =>
      List(text)
    case streamed: TextMessage =>
      streamed.textStream.runWith(Sink.ignore)
      Nil
    case binary: BinaryMessage =>
      binary.dataStream.runWith(Sink.ignore)
      Nil
  }
}
开发者ID:Technius,项目名称:chatty,代码行数:46,代码来源:ChatService.scala
示例17: route
//设置package包名称以及导入依赖的类
package com.knoldus.server
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.{Flow, Source}
/** WebSocket route that echoes each text message back prefixed with the registered name. */
trait WebServer extends Directives {

  /** `register?name=...` upgrades to a WebSocket handled by [[broadcast]]. */
  def route =
    path("register") {
      // The lambda arrow was garbled to `?` in the original text, which does not compile.
      parameter('name) { name =>
        handleWebSocketMessages(broadcast(name))
      }
    }

  /**
    * Answers each text message with "name::" followed by the received text.
    *
    * NOTE(review): only text messages are matched; a binary message fails the stream
    * with a MatchError.
    *
    * @param name prefix placed in front of every reply
    */
  def broadcast(name: String): Flow[Message, Message, Any] = {
    Flow[Message].mapConcat {
      case tm: TextMessage =>
        TextMessage(Source.single(name + "::") ++ tm.textStream) :: Nil
    }
  }
}
开发者ID:knoldus,项目名称:simple-akka-http-websocket-example.g8,代码行数:25,代码来源:WebServer.scala
示例18: Server
//设置package包名称以及导入依赖的类
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.io.StdIn
/**
  * Minimal akka-http server on localhost:8080 with a welcome page, a `hello` page and a
  * `ws-echo` WebSocket; it runs until RETURN is pressed on standard input.
  */
object Server extends App {
  implicit val system = ActorSystem("my-system")
  implicit val materializer = ActorMaterializer()
  // Execution context required by the flatMap/onComplete calls at the end of this object.
  implicit val executionContext = system.dispatcher

  /** Echoes strict text messages with an "ECHO: " prefix; answers anything else with a notice. */
  val echoService: Flow[Message, Message, _] = Flow[Message].map {
    case strict: TextMessage.Strict => TextMessage("ECHO: " + strict.text)
    case _                          => TextMessage("Message type unsupported")
  }

  val route =
    get {
      pathEndOrSingleSlash {
        complete("Welcome to akka sample")
      }
    } ~ path("hello") {
      complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
    } ~ path("ws-echo") {
      get {
        handleWebSocketMessages(echoService)
      }
    }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  // Block the main thread until the user presses RETURN.
  StdIn.readLine()

  // Release the port first, then shut the actor system down whatever the outcome.
  bindingFuture
    .flatMap(binding => binding.unbind())
    .onComplete { _ => system.terminate() }
}
开发者ID:kiberStender,项目名称:sprayWebSocket,代码行数:44,代码来源:Server.scala
示例19: ChatRoom
//设置package包名称以及导入依赖的类
package org.ardlema.chat
import akka.NotUsed
import akka.actor.{ ActorSystem, Props }
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Flow, Sink, Source }
/**
  * A chat room backed by a `ChatRoomActor`.
  *
  * @param roomId      id passed to the room actor
  * @param actorSystem system in which the room actor is created
  */
class ChatRoom(roomId: Int, actorSystem: ActorSystem) {
  private[this] val chatRoomActor = actorSystem.actorOf(Props(classOf[ChatRoomActor], roomId))

  /** Sink delivering chat events to the room actor, followed by `Left(sender)` on completion. */
  def chatInSink(sender: String) = Sink.actorRef[ChatEvent](chatRoomActor, Left(sender))

  /**
    * Flow bridging one user's WebSocket to the room actor.
    *
    * NOTE(review): only strict text messages are matched on the inbound side; any other
    * message fails the stream with a MatchError.
    *
    * @param user name attached to incoming messages and to the join notification
    */
  def websocketFlow(user: String): Flow[Message, Message, _] = {
    val in: Sink[Message, NotUsed] =
      Flow[Message]
        // The case arrow was garbled to `?` in the original text, which does not compile.
        .map { case TextMessage.Strict(txt) => IncomingMessage(user, txt) }
        .to(chatInSink(user))

    // The counter-part which is a source that will create a target ActorRef per
    // materialization where the chatActor will send its messages to.
    // This source will only buffer one element and will fail if the client doesn't read
    // messages fast enough.
    val out: Source[Message, Unit] =
      Source.actorRef[Message](1, OverflowStrategy.fail)
        .mapMaterializedValue(chatRoomActor ! UserJoined(user, _))

    Flow.fromSinkAndSource(in, out)
  }

  /** Sends the text of `message` to the room actor as a text message. */
  def sendMessage(message: ChatMessage): Unit = chatRoomActor ! TextMessage(message.text)
}
/** Companion factory taking the actor system implicitly. */
object ChatRoom {

  /**
    * @param roomId id of the room to create
    * @return a new `ChatRoom` backed by the implicit actor system
    */
  def apply(roomId: Int)(implicit actorSystem: ActorSystem) =
    new ChatRoom(roomId, actorSystem)
}
开发者ID:ardlema,项目名称:akka-websockets-examples,代码行数:38,代码来源:ChatRoom.scala
示例20: LoadTest
//设置package包名称以及导入依赖的类
package com.pkinsky
import java.util.concurrent.atomic.AtomicInteger
import akka.http.scaladsl.model.ws.{InvalidUpgradeResponse, WebsocketUpgradeResponse, WebsocketRequest, TextMessage}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Keep, Sink, RunnableGraph, Source}
import play.api.libs.json.Json
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
import scala.language.postfixOps
/**
  * Load test: opens 256 WebSocket clients that each send 256 events, then reads the events
  * back from Kafka and prints how many were sent and how many arrived.
  */
object LoadTest extends App with AppContext {
  val clients = 256
  val eventsPerClient = 256

  /** Number of events handed to the WebSocket flows so far. */
  val eventsSent = new AtomicInteger(0)

  /** `eventsPerClient` numbered events for `clientId`, emitted at one per 100 ms. */
  def testData(clientId: String): Source[Event, Unit] =
    Source
      .unfoldInf(1) { seq =>
        val event = Event(s"msg number $seq", clientId, System.currentTimeMillis())
        (seq + 1, event)
      }
      .take(eventsPerClient)
      .throttle(1, 100.millis, 1, ThrottleMode.Shaping)

  /** Graph sending the test data for `clientId` as JSON text frames; responses are ignored. */
  def wsClient(clientId: String): RunnableGraph[Future[WebsocketUpgradeResponse]] = {
    val frames = testData(clientId).map { event =>
      val frame = TextMessage.Strict(Json.toJson(event).toString)
      // Count the event only after it has been serialized successfully.
      eventsSent.incrementAndGet()
      frame
    }

    frames
      .viaMat(Http().websocketClientFlow(WebsocketRequest(Uri(s"ws://localhost:$port/ws"))))(Keep.right)
      .to(Sink.ignore)
  }

  // Set up the WebSocket connections.
  for (id <- 1 to clients)
    wsClient(s"client $id").run()

  // Watch Kafka for the messages sent via WebSocket: stop after the expected number of
  // events or after two minutes, whichever comes first.
  val kafkaConsumerGraph: RunnableGraph[Future[Seq[Event]]] =
    kafka.consume[Event](eventTopic, "group_new")
      .take(clients * eventsPerClient)
      .takeWithin(2.minutes)
      .toMat(Sink.seq)(Keep.right)

  val res = Await.result(kafkaConsumerGraph.run(), 5.minutes)

  println(s"sent ${eventsSent.get()} events total")
  println(s"res size: ${res.length}")
}
开发者ID:pkinsky,项目名称:ws_to_kafka,代码行数:48,代码来源:LoadTest.scala
注：本文中的akka.http.scaladsl.model.ws.TextMessage类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论