本文整理汇总了Scala中akka.pattern.pipe类的典型用法代码示例。如果您正苦于以下问题:Scala pipe类的具体用法?Scala pipe怎么用?Scala pipe使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了pipe类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ResolveArtifactsActor
//设置package包名称以及导入依赖的类
package mesosphere.marathon.upgrade
import java.net.URL
import akka.actor.Status.Failure
import akka.actor.{ Actor, Props }
import akka.pattern.pipe
import mesosphere.marathon.ResolveArtifactsCanceledException
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.marathon.io.{ CancelableDownload, PathFun }
import mesosphere.util.Logging
import scala.concurrent.Promise
private[this] class ResolveArtifactsActor(
url2Path: Map[URL, String],
promise: Promise[Boolean],
storage: StorageProvider)
extends Actor
with PathFun
with Logging {
import mesosphere.marathon.upgrade.ResolveArtifactsActor.DownloadFinished
// all downloads that have to be performed by this actor
var downloads = url2Path.map { case (url, path) => new CancelableDownload(url, storage, path) }
override def preStart(): Unit = {
import context.dispatcher
downloads.map(_.get.map(DownloadFinished) pipeTo self)
if (url2Path.isEmpty) promise.success(true) // handle empty list
}
override def postStop(): Unit = {
downloads.foreach(_.cancel()) // clean up not finished artifacts
}
override def receive: Receive = {
case DownloadFinished(download) =>
downloads = downloads.filter(_ != download)
if (downloads.isEmpty) promise.success(true)
case Failure(ex) =>
log.warn("Can not resolve artifact", ex) // do not fail the promise!
case DeploymentActor.Shutdown =>
if (!promise.isCompleted)
promise.tryFailure(new ResolveArtifactsCanceledException("Artifact Resolving has been cancelled"))
context.stop(self)
}
}
object ResolveArtifactsActor {
def props(
url2Path: Map[URL, String],
promise: Promise[Boolean],
storage: StorageProvider): Props = Props(new ResolveArtifactsActor(url2Path, promise, storage))
case class DownloadFinished(download: CancelableDownload)
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:59,代码来源:ResolveArtifactsActor.scala
示例2: ForwardActorSpec
//设置package包名称以及导入依赖的类
package akka.actor
import language.postfixOps
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Actor._
import scala.concurrent.Await
import akka.pattern.{ ask, pipe }
object ForwardActorSpec {
val ExpectedMessage = "FOO"
def createForwardingChain(system: ActorSystem): ActorRef = {
val replier = system.actorOf(Props(new Actor {
def receive = { case x ? sender() ! x }
}))
def mkforwarder(forwardTo: ActorRef) = system.actorOf(Props(
new Actor {
def receive = { case x ? forwardTo forward x }
}))
mkforwarder(mkforwarder(mkforwarder(replier)))
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ForwardActorSpec extends AkkaSpec {
import ForwardActorSpec._
implicit val ec = system.dispatcher
"A Forward Actor" must {
"forward actor reference when invoking forward on tell" in {
val replyTo = system.actorOf(Props(new Actor { def receive = { case ExpectedMessage ? testActor ! ExpectedMessage } }))
val chain = createForwardingChain(system)
chain.tell(ExpectedMessage, replyTo)
expectMsg(5 seconds, ExpectedMessage)
}
"forward actor reference when invoking forward on ask" in {
val chain = createForwardingChain(system)
chain.ask(ExpectedMessage)(5 seconds) pipeTo testActor
expectMsg(5 seconds, ExpectedMessage)
}
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:ForwardActorSpec.scala
示例3: FactorialBackend
//设置package包名称以及导入依赖的类
package sample.cluster.factorial
import scala.annotation.tailrec
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe
//#backend
class FactorialBackend extends Actor with ActorLogging {
import context.dispatcher
def receive = {
case (n: Int) =>
Future(factorial(n)) map { result => (n, result) } pipeTo sender()
}
def factorial(n: Int): BigInt = {
@tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
if (n <= 1) acc
else factorialAcc(acc * n, n - 1)
}
factorialAcc(BigInt(1), n)
}
}
//#backend
object FactorialBackend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port when specified as program argument
val port = if (args.isEmpty) "0" else args(0)
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[FactorialBackend], name = "factorialBackend")
system.actorOf(Props[MetricsListener], name = "metricsListener")
}
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:47,代码来源:FactorialBackend.scala
示例4: HttpClientAsActor
//设置package包名称以及导入依赖的类
package com.scalaio.http.client.actor
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.{ByteString, Timeout}
import play.api.libs.json.Json
import scala.concurrent.Future
import scala.concurrent.duration._
class HttpClientAsActor(notifier: ActorRef) extends Actor with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
implicit val timeout = Timeout(5 seconds)
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
override def preStart() = {
http
.singleRequest(HttpRequest(method = GET, uri = "https://jsonplaceholder.typicode.com/posts/1"))
.pipeTo(self)
}
def receive = {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
val response: Future[ByteString] = entity.dataBytes.runFold(ByteString(""))(_ ++ _)
log.info(s"got response $headers $entity")
response pipeTo self
context become handlingMessage
case [email protected](code, _, _, _) =>
log.warning("Request failed, response code: " + code)
resp.discardEntityBytes()
}
def handlingMessage: Receive = {
case content: ByteString =>
log.info("Success was OK: " + content)
val contentAsString = (Json.parse(content.utf8String) \ "title").as[String]
notifier ! contentAsString
context become receive
}
}
object HttpClientAsActor {
def props(notifier: ActorRef) = Props(classOf[HttpClientAsActor], notifier)
}
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:HttpClientAsActor.scala
示例5: FactorialBackend
//设置package包名称以及导入依赖的类
package sample.cluster.factorial
import scala.annotation.tailrec
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe
class FactorialBackend extends Actor with ActorLogging {
import context.dispatcher
def receive = {
case (n: Int) =>
Future(factorial(n)) map { result => (n, result) } pipeTo sender()
}
def factorial(n: Int): BigInt = {
@tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
if (n <= 1) acc
else factorialAcc(acc * n, n - 1)
}
factorialAcc(BigInt(1), n)
}
}
object FactorialBackend {
def main(args: Array[String]): Unit = {
// Override the configuration of the port when specified as program argument
val port = if (args.isEmpty) "0" else args(0)
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
withFallback(ConfigFactory.load("factorial"))
val system = ActorSystem("ClusterSystem", config)
system.actorOf(Props[FactorialBackend], name = "factorialBackend")
system.actorOf(Props[MetricsListener], name = "metricsListener")
}
}
开发者ID:seglo,项目名称:akka-cluster-conductr,代码行数:45,代码来源:FactorialBackend.scala
示例6: NewsletterService
//设置package包名称以及导入依赖的类
package com.tpalanga.account.service
import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.http.scaladsl.model.StatusCodes
import com.tpalanga.account.model.{User, UserId}
import com.tpalanga.testlib.test.client.impl.NewsletterServiceRestClient.NewsletterServiceRestClientFactory
import com.tpalanga.testlib.test.client.impl.{NewsletterServiceRestClient, Subscriber}
import com.tpalanga.testlib.test.client.{NoEntity, Response}
import com.tpalanga.testlib.test.config.RestServiceConfig
object NewsletterService {
case class Subscribe(user: User)
case class Unsubscribe(id: UserId)
case class CreateResponse(response: Response[Subscriber])
case class DeleteResponse(response: Response[NoEntity])
def props(restServiceConfig: RestServiceConfig, clientFactory: NewsletterServiceRestClientFactory = NewsletterServiceRestClient.defaultFactory): Props =
Props(new NewsletterService(restServiceConfig, clientFactory))
}
class NewsletterService(restServiceConfig: RestServiceConfig, clientFactory: NewsletterServiceRestClientFactory) extends Actor with ActorLogging {
import NewsletterService._
import akka.pattern.pipe
import context.dispatcher
override def receive: Receive = {
case Subscribe(user) =>
newClient().subscriberCreate(Subscriber(user.id, user.name, user.email)).map(CreateResponse) pipeTo self
case Unsubscribe(userId) =>
newClient().subscriberDelete(userId).map(DeleteResponse) pipeTo self
case CreateResponse(response) if response.status == StatusCodes.Created =>
log.info("Subscribed to newsletter")
case CreateResponse(response) =>
log.info(s"Unexpected response while subscribing to newsletter $response")
case DeleteResponse(response) if response.status == StatusCodes.OK =>
log.info("Unsubscribed from newsletter")
case DeleteResponse(response) =>
log.info("Unsubscribed from newsletter")
case Status.Failure(th) =>
log.error(th, "Error on newsletter request")
}
private def newClient() = clientFactory(restServiceConfig, context.system)
}
开发者ID:tpalanga,项目名称:akka-http-microservice,代码行数:52,代码来源:NewsletterService.scala
示例7: respondWith
//设置package包名称以及导入依赖的类
package com.pacbio.common
import akka.actor.{Status, Actor}
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal
package object actors {
trait PacBioActor extends Actor {
def respondWith(x: => Any): Unit = {
sender ! Try(x).recover{ case NonFatal(e) => Status.Failure(e) }.get
}
def pipeWith(x: => Future[Any])(implicit ec: ExecutionContext): Unit = {
pipe(x.recover{ case NonFatal(e) => Status.Failure(e) }) to sender
}
}
}
开发者ID:PacificBiosciences,项目名称:smrtflow,代码行数:21,代码来源:package.scala
示例8: BotsCliService
//设置package包名称以及导入依赖的类
package im.actor.server.cli
import akka.actor.{ Props, Actor, ActorLogging }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.bot.BotExtension
object BotsCliService {
def props = Props(new BotsCliService)
}
private final class BotsCliService extends Actor with ActorLogging {
import context.dispatcher
ClusterClientReceptionist(context.system).registerService(self)
private val botExt = BotExtension(context.system)
def receive = {
case CreateBot(username, name, isAdmin) ?
(for {
(token, _) ? botExt.create(username, name, isAdmin)
} yield CreateBotResponse(token)) pipeTo sender()
}
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:26,代码来源:BotsCliService.scala
示例9: HttpCliService
//设置package包名称以及导入依赖的类
package im.actor.server.cli
import akka.actor.{ Props, Actor }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.acl.ACLUtils
import im.actor.server.db.DbExtension
import im.actor.server.persist.HttpApiTokenRepo
object HttpCliService {
def props = Props(new HttpCliService)
}
final class HttpCliService extends Actor {
import context.dispatcher
ClusterClientReceptionist(context.system).registerService(self)
private val db = DbExtension(context.system).db
def receive = {
case HttpTokenCreate(isAdmin) ?
val token = ACLUtils.accessToken()
(for {
_ ? db.run(HttpApiTokenRepo.create(token, isAdmin = isAdmin))
} yield HttpTokenCreateResponse(token)) pipeTo sender()
}
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:29,代码来源:HttpCliService.scala
示例10: UsersCliService
//设置package包名称以及导入依赖的类
package im.actor.server.cli
import akka.actor.{ Props, Actor, ActorLogging }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.user.UserExtension
object UsersCliService {
def props = Props(new UsersCliService)
}
private final class UsersCliService extends Actor with ActorLogging {
import context.dispatcher
ClusterClientReceptionist(context.system).registerService(self)
private val userExt = UserExtension(context.system)
def receive = {
case UpdateIsAdmin(userId, isAdmin) ?
(for {
_ ? userExt.updateIsAdmin(userId, isAdmin)
} yield UpdateIsAdminResponse()) pipeTo sender()
}
}
开发者ID:wex5,项目名称:dangchat-server,代码行数:26,代码来源:UsersCliService.scala
示例11: QueueSubscriber
//设置package包名称以及导入依赖的类
package reactive.queue.router
import akka.actor.{ActorLogging, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import io.scalac.amqp.{Connection, Message}
object QueueSubscriber {
def props(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri): Props =
Props(classOf[QueueSubscriber],queueConnection, queueName, queueSubscriberUri)
}
class QueueSubscriber(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri) extends
ActorSubscriber with ActorLogging {
implicit val system = context.system
implicit val ec = context.dispatcher
implicit val materlizer = ActorMaterializer()
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
def receive = {
case OnNext(message: Message) => route(ByteString(message.body.toArray).decodeString("UTF-8"))
case OnComplete => log.info("*** on complete")
case OnError(error) => log.error(s"*** on error: $error")
case HttpResponse(status, _, _, _) => log.info(s"*** route response: ${status.intValue}")
}
def route(message: String): Unit = {
log.info(s"*** on next: $message")
try {
val httpResponse = for {
request <- Marshal(message).to[RequestEntity]
response <- Http().singleRequest(HttpRequest(method = HttpMethods.POST, uri = queueSubscriberUri, entity = request))
entity <- Unmarshal(response).to[HttpResponse]
} yield entity
httpResponse.pipeTo(self)
} catch {
case t: Throwable =>
log.error(s"*** on next: forward to uri $queueSubscriberUri failed on: $message with error: ${t.getMessage}")
queueConnection.publish(queueName, message)
log.info(s"*** on next: republished to queue $queueName")
}
}
}
开发者ID:objektwerks,项目名称:reactive.queue.router,代码行数:52,代码来源:QueueSubscriber.scala
示例12: ProviderActor
//设置package包名称以及导入依赖的类
package uk.mm.mpp.actors
import akka.actor.{Actor, Props}
import akka.pattern.pipe
import org.apache.commons.lang3.StringUtils._
import org.json4s._
import org.json4s.native.JsonMethods._
import play.api.Logger
import play.api.Play.current
import play.api.libs.ws.{WS, WSRequest, WSResponse}
import uk.mm.mpp.actors.ProviderActor.{ProductRequest, ProductResponse}
import uk.mm.mpp.globals._
import scala.concurrent.ExecutionContext.Implicits.global
object ProviderActor {
def props(uid: String, port: Int) = Props(classOf[ProviderActor], uid, port)
case class ProductRequest()
case class ProductResponse(products: JArray)
}
class ProviderActor(uid: String, port: Int) extends Actor {
private lazy val request: WSRequest = WS.client.url(providerUrl)
.withFollowRedirects(false)
.withRequestTimeout(15000)
val logger = Logger(MPP_WORKER_PREFIX + getClass.getSimpleName + "_" + uid + "_" + port)
val providerUrl: String = "http://localhost:" + port + "/3rd/products"
def receive = {
case ProductRequest =>
request.get()
.map(productUpdateFrom)
.recover(withEmptyJsonArray)
.pipeTo(sender)
}
val withEmptyJsonArray: PartialFunction[Throwable, ProductResponse] = {
case _ => ProductResponse(JArray(List()))
}
def productUpdateFrom(response: WSResponse): ProductResponse = if (response.status == 200) {
logger.debug(s"from: [$providerUrl]: [${piedPiper(response)}]")
ProductResponse(parseJsonFrom(response))
} else {
logger.warn(s"from: [$providerUrl]: [${response.body}]")
ProductResponse(JArray(List()))
}
def piedPiper(response: WSResponse) = {
abbreviate(replacePattern(response.body, """\s{2,}""", " "), 30)
}
def parseJsonFrom(response: WSResponse) = parse(response.body).asInstanceOf[JArray]
}
开发者ID:mikemey,项目名称:mpp,代码行数:60,代码来源:ProviderActor.scala
示例13: Authenticator
//设置package包名称以及导入依赖的类
package authentication
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.pipe
import authentication.entities.{AuthToken, _}
import rest.client.RestClient
import rest.client.entities.ExecutionResultCode
import scala.concurrent.ExecutionContext
class Authenticator(restClient: RestClient) extends Actor with ActorLogging {
implicit val ec: ExecutionContext = context.dispatcher
override def receive: Receive = {
case SignIn(user, password) =>
restClient.signIn(user, password)
.map {
case (token: AuthToken) => AuthTokenStore.updateToken(token); Authenticated
}
.recover {
case ex: Throwable => AuthFailure(ex)
} pipeTo sender()
case SignUp(user, password) =>
restClient.signUp(user, password)
.map {
case (token: AuthToken) => AuthTokenStore.updateToken(token); Authenticated
}
.recover {
case ex: Throwable => AuthFailure(ex)
} pipeTo sender()
case SignOut =>
restClient.signOut()
.map {
case ExecutionResultCode.OK => AuthTokenStore.clear(); Disconnected
case _ => AuthFailure(new RuntimeException("Operation failed with unknown error"))
}
.recover {
case ex: Throwable => AuthFailure(ex)
} pipeTo sender()
}
}
object Authenticator {
def props(restClient: RestClient): Props = Props(new Authenticator(restClient))
}
开发者ID:lymr,项目名称:fun-chat,代码行数:49,代码来源:Authenticator.scala
示例14: HmdaFilingApi
//设置package包名称以及导入依赖的类
package hmda.api
import akka.actor.{ ActorSystem, Props }
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.util.Timeout
import hmda.api.http.{ BaseHttpApi, HmdaCustomDirectives, InstitutionsHttpApi, LarHttpApi }
import hmda.api.HmdaConfig._
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
object HmdaFilingApi {
def props(): Props = Props(new HmdaFilingApi)
}
class HmdaFilingApi
extends HttpApi
with BaseHttpApi
with LarHttpApi
with InstitutionsHttpApi
with HmdaCustomDirectives {
implicit val flowParallelism = configuration.getInt("hmda.actor-flow-parallelism")
override val name = "hmda-filing-api"
lazy val httpTimeout = configuration.getInt("hmda.http.timeout")
implicit val timeout = Timeout(httpTimeout.seconds)
override lazy val host = configuration.getString("hmda.http.host")
override lazy val port = configuration.getInt("hmda.http.port")
implicit val system: ActorSystem = context.system
override implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = context.dispatcher
override val log = Logging(system, getClass)
val paths: Route = routes(s"$name") ~ larRoutes ~ institutionsRoutes
override val http: Future[ServerBinding] = Http(system).bindAndHandle(
paths,
host,
port
)
http pipeTo self
}
开发者ID:cfpb,项目名称:hmda-platform,代码行数:54,代码来源:HmdaFilingApi.scala
示例15: HmdaAdminApi
//设置package包名称以及导入依赖的类
package hmda.api
import akka.actor.{ ActorSystem, Props }
import akka.event.Logging
import akka.pattern.pipe
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import hmda.api.http.BaseHttpApi
import hmda.api.http.admin.InstitutionAdminHttpApi
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
object HmdaAdminApi {
def props(): Props = Props(new HmdaAdminApi)
}
class HmdaAdminApi extends HttpApi with BaseHttpApi with InstitutionAdminHttpApi {
val config = ConfigFactory.load()
lazy val httpTimeout = config.getInt("hmda.http.timeout")
override implicit val timeout = Timeout(httpTimeout.seconds)
override val name = "hmda-admin-api"
override val host: String = config.getString("hmda.http.adminHost")
override val port: Int = config.getInt("hmda.http.adminPort")
override implicit val system: ActorSystem = context.system
override implicit val materializer: ActorMaterializer = ActorMaterializer()
override implicit val ec: ExecutionContext = context.dispatcher
override val log = Logging(system, getClass)
override val paths: Route = routes(s"$name") ~ institutionAdminRoutes
override val http: Future[ServerBinding] = Http(system).bindAndHandle(
paths,
host,
port
)
http pipeTo self
}
开发者ID:cfpb,项目名称:hmda-platform,代码行数:49,代码来源:HmdaAdminApi.scala
示例16: DeviceService
//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager
import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import reactivehub.akka.stream.apns.manager.DeviceService._
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future
import scala.util.{Failure, Success}
object DeviceService {
case class CreateDevices(devices: List[NewDevice])
sealed trait CreateDevicesResponse
case class DevicesCreated(ids: List[Long]) extends CreateDevicesResponse
case object PingDevices
def props(store: DeviceStore, queue: PushQueue)(implicit m: ActorMaterializer): Props =
Props(classOf[DeviceService], store, queue, m)
}
class DeviceService(store: DeviceStore, queue: PushQueue)(implicit m: ActorMaterializer)
extends Actor with ActorLogging {
import context.dispatcher
override def receive: Receive = {
case CreateDevices(devices) =>
log.debug("Creating new devices for tokens {}", devices.map(_.token).mkString(", "))
store.createDevices(devices).map(DevicesCreated).pipeTo(sender())
case PingDevices =>
log.debug("Pinging all devices")
store.allDevices()
.map(device => device.id -> PushData(device.token, Some("Hello"), Some(1)))
.runWith(queue.pushDataSink)
.onComplete {
case Success(num) => log.debug("{} devices pinged", num)
case Failure(t) => log.error(t, "Failed to ping devices")
}
}
}
trait DeviceStore {
def createDevices(devices: List[NewDevice]): Future[List[Long]]
def allDevices(): Source[Device, NotUsed]
}
trait PushQueue {
def pushDataSink: Sink[(Long, PushData), Future[Long]]
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:55,代码来源:DeviceService.scala
示例17: SqsQueue
//设置package包名称以及导入依赖的类
package com.jensraaby.queuevis.aws
import akka.actor.{Props, Actor}
import akka.pattern.pipe
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageResult, ReceiveMessageResult, ReceiveMessageRequest}
import scala.concurrent.ExecutionContext
object SqsQueue {
case class Poll(maxMessages: Int, maxWaitSeconds: Int)
case class Write(body: String)
def apply(queueUrl: String)(implicit executionContext: ExecutionContext,
sqsClient: AmazonSQSAsyncClient) = Props(new SqsQueue(queueUrl)(executionContext, sqsClient))
}
class SqsQueue(queueUrl: String)(implicit ec: ExecutionContext, sqsClient: AmazonSQSAsyncClient) extends Actor with AsyncWrapper {
def receive = {
case SqsQueue.Write(body) =>
println("Writing message: " + body)
val request = new SendMessageRequest()
.withMessageBody(body)
.withQueueUrl(queueUrl)
val (result, handler) = asyncHandler[SendMessageRequest, SendMessageResult]
sqsClient.sendMessageAsync(request, handler)
println("Sent message")
result pipeTo sender()
case SqsQueue.Poll(msgs, wait) =>
val destination = sender()
val request = new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withMaxNumberOfMessages(msgs)
.withWaitTimeSeconds(wait)
val (result, handler) = asyncHandler[ReceiveMessageRequest, ReceiveMessageResult]
sqsClient.receiveMessageAsync(request, handler)
import scala.collection.JavaConverters._
for (results <- result) {
destination ! results.getMessages.asScala.toList
}
}
}
开发者ID:jensraaby,项目名称:queuevis,代码行数:48,代码来源:SqsQueue.scala
示例18: Collector
//设置package包名称以及导入依赖的类
package com.darienmt.keepers
import akka.Done
import akka.actor.Status.Failure
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import com.darienmt.keepers.Collector.{ StreamFinished }
object Collector {
sealed trait CollectorMessage
case object StreamFinished extends CollectorMessage
def props(
manager: ActorRef,
collectorGenerator: Generator
): Props = Props(new Collector(manager, collectorGenerator))
}
class Collector(
manager: ActorRef,
collectorGenerator: Generator
) extends Actor with ActorLogging {
import akka.pattern.pipe
import context.dispatcher
override def receive: Receive = {
case Failure(ex) => {
log.error("Stream stopped with an error")
throw ex
}
case Done => {
manager ! StreamFinished
}
case msg => throw new Exception("Unknown message: " + msg.toString)
}
override def preStart(): Unit = collectorGenerator() pipeTo self
override def postRestart(reason: Throwable): Unit = {}
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:42,代码来源:Collector.scala
示例19: HcdActor
//设置package包名称以及导入依赖的类
package tmt.demo.hcd
import akka.actor.Actor
import akka.pattern.pipe
import tcsstr2._
import tmt.app.configs.{AppSettings, Names}
import tmt.demo.connectors.{ZmqToAkkaFlow, AkkaToZmqFlow}
import tmt.demo.zeromq_drivers.ZmqClient
class HcdActor(
zmqClient: ZmqClient,
akkaToZmqFlow: AkkaToZmqFlow,
zmqToAkkaFlow: ZmqToAkkaFlow,
appSettings: AppSettings
) extends Actor {
import context.dispatcher
override def preStart() = {
//connection to push cluster tcs_mcs_PositionDemand events to mcs
akkaToZmqFlow.connect[tcs_mcs_PositionDemand](
subscriberTopic = Names.PositionDemands,
publishingPort = appSettings.mcsPositionDemandPort
)
//connection to push mcs mcs_Health events to cluster
zmqToAkkaFlow.connect(
publishingTopic = Names.Health,
subscriberPort = appSettings.mcsHealthPort,
responseParser = mcs_Health
)
}
//send tcs commands to mcs and reply back to sender
def receive = {
case command: Tcs_Command =>
val result = zmqClient.query(command, command_response)
result pipeTo sender()
}
}
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:42,代码来源:HcdActor.scala
示例20: FrontendApi
//设置package包名称以及导入依赖的类
package api
import akka.actor.{ ActorSystem, Props }
import akka.cluster.pubsub.DistributedPubSub
import akka.pattern.pipe
import akka.event.{ Logging, LoggingAdapter }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import api.actors.{ ClusterListener, FileReceiver }
import api.http.{ HttpApi, Service }
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ ExecutionContext, Future }
object FrontendApi {
def props(): Props = Props(new FrontendApi)
}
class FrontendApi extends HttpApi with Service {
override val name: String = "frontend-api"
val config = ConfigFactory.load()
override val log: LoggingAdapter = Logging(context.system, getClass)
override val host: String = config.getString("frontend.host")
override val port: Int = config.getInt("frontend.port")
override implicit val system: ActorSystem = context.system
override implicit val materializer: ActorMaterializer = ActorMaterializer()
override implicit val ec: ExecutionContext = context.dispatcher
//Start up actors
val clusterListener = system.actorOf(ClusterListener.props())
val fileReceiver = system.actorOf(FileReceiver.props())
val mediator = DistributedPubSub(system).mediator
override val paths: Route = routes(s"$name", clusterListener, mediator)
override val http: Future[Http.ServerBinding] = Http(system).bindAndHandle(
paths,
host,
port
)
http pipeTo self
}
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:49,代码来源:FrontendApi.scala
注:本文中的akka.pattern.pipe类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论