• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala pipe类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.pattern.pipe的典型用法代码示例。如果您正苦于以下问题:Scala pipe类的具体用法?Scala pipe怎么用?Scala pipe使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了pipe类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ResolveArtifactsActor

//设置package包名称以及导入依赖的类
package mesosphere.marathon.upgrade

import java.net.URL

import akka.actor.Status.Failure
import akka.actor.{ Actor, Props }
import akka.pattern.pipe
import mesosphere.marathon.ResolveArtifactsCanceledException
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.marathon.io.{ CancelableDownload, PathFun }
import mesosphere.util.Logging

import scala.concurrent.Promise

private[this] class ResolveArtifactsActor(
  url2Path: Map[URL, String],
  promise: Promise[Boolean],
  storage: StorageProvider)
    extends Actor
    with PathFun
    with Logging {

  import mesosphere.marathon.upgrade.ResolveArtifactsActor.DownloadFinished

  // all downloads that have to be performed by this actor
  var downloads = url2Path.map { case (url, path) => new CancelableDownload(url, storage, path) }

  override def preStart(): Unit = {
    import context.dispatcher
    downloads.map(_.get.map(DownloadFinished) pipeTo self)
    if (url2Path.isEmpty) promise.success(true) // handle empty list
  }

  override def postStop(): Unit = {
    downloads.foreach(_.cancel()) // clean up not finished artifacts
  }

  override def receive: Receive = {
    case DownloadFinished(download) =>
      downloads = downloads.filter(_ != download)
      if (downloads.isEmpty) promise.success(true)
    case Failure(ex) =>
      log.warn("Can not resolve artifact", ex) // do not fail the promise!
    case DeploymentActor.Shutdown =>
      if (!promise.isCompleted)
        promise.tryFailure(new ResolveArtifactsCanceledException("Artifact Resolving has been cancelled"))
      context.stop(self)
  }
}

object ResolveArtifactsActor {
  def props(
    url2Path: Map[URL, String],
    promise: Promise[Boolean],
    storage: StorageProvider): Props = Props(new ResolveArtifactsActor(url2Path, promise, storage))

  case class DownloadFinished(download: CancelableDownload)
} 
开发者ID:xiaozai512,项目名称:marathon,代码行数:59,代码来源:ResolveArtifactsActor.scala


示例2: ForwardActorSpec

//设置package包名称以及导入依赖的类
package akka.actor

import language.postfixOps

import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Actor._
import scala.concurrent.Await
import akka.pattern.{ ask, pipe }

object ForwardActorSpec {
  val ExpectedMessage = "FOO"

  def createForwardingChain(system: ActorSystem): ActorRef = {
    val replier = system.actorOf(Props(new Actor {
      def receive = { case x ? sender() ! x }
    }))

    def mkforwarder(forwardTo: ActorRef) = system.actorOf(Props(
      new Actor {
        def receive = { case x ? forwardTo forward x }
      }))

    mkforwarder(mkforwarder(mkforwarder(replier)))
  }
}

@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ForwardActorSpec extends AkkaSpec {
  import ForwardActorSpec._
  implicit val ec = system.dispatcher
  "A Forward Actor" must {

    "forward actor reference when invoking forward on tell" in {
      val replyTo = system.actorOf(Props(new Actor { def receive = { case ExpectedMessage ? testActor ! ExpectedMessage } }))

      val chain = createForwardingChain(system)

      chain.tell(ExpectedMessage, replyTo)
      expectMsg(5 seconds, ExpectedMessage)
    }

    "forward actor reference when invoking forward on ask" in {
      val chain = createForwardingChain(system)
      chain.ask(ExpectedMessage)(5 seconds) pipeTo testActor
      expectMsg(5 seconds, ExpectedMessage)
    }
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:ForwardActorSpec.scala


示例3: FactorialBackend

//设置package包名称以及导入依赖的类
package sample.cluster.factorial

import scala.annotation.tailrec
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe

//#backend
class FactorialBackend extends Actor with ActorLogging {

  import context.dispatcher

  def receive = {
    case (n: Int) =>
      Future(factorial(n)) map { result => (n, result) } pipeTo sender()
  }

  def factorial(n: Int): BigInt = {
    @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
      if (n <= 1) acc
      else factorialAcc(acc * n, n - 1)
    }
    factorialAcc(BigInt(1), n)
  }

}
//#backend

object FactorialBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("factorial"))

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[FactorialBackend], name = "factorialBackend")

    system.actorOf(Props[MetricsListener], name = "metricsListener")
  }
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:47,代码来源:FactorialBackend.scala


示例4: HttpClientAsActor

//设置package包名称以及导入依赖的类
package com.scalaio.http.client.actor

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.{ByteString, Timeout}
import play.api.libs.json.Json

import scala.concurrent.Future
import scala.concurrent.duration._

class HttpClientAsActor(notifier: ActorRef) extends Actor with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher
  implicit val timeout = Timeout(5 seconds)
  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)

  override def preStart() = {
    http
      .singleRequest(HttpRequest(method = GET, uri = "https://jsonplaceholder.typicode.com/posts/1"))
      .pipeTo(self)
  }

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      val response: Future[ByteString] = entity.dataBytes.runFold(ByteString(""))(_ ++ _)
      log.info(s"got response $headers $entity")
      response pipeTo self
      context become handlingMessage

    case [email protected](code, _, _, _) =>
      log.warning("Request failed, response code: " + code)
      resp.discardEntityBytes()
  }

  def handlingMessage: Receive = {
    case content: ByteString =>
      log.info("Success was OK: " + content)
      val contentAsString = (Json.parse(content.utf8String) \ "title").as[String]
      notifier ! contentAsString
      context become receive
  }

}

object HttpClientAsActor {

  def props(notifier: ActorRef) = Props(classOf[HttpClientAsActor], notifier)

} 
开发者ID:fagossa,项目名称:scalaio_akka,代码行数:56,代码来源:HttpClientAsActor.scala


示例5: FactorialBackend

//设置package包名称以及导入依赖的类
package sample.cluster.factorial

import scala.annotation.tailrec
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.pipe

class FactorialBackend extends Actor with ActorLogging {

  import context.dispatcher

  def receive = {
    case (n: Int) =>
      Future(factorial(n)) map { result => (n, result) } pipeTo sender()
  }

  def factorial(n: Int): BigInt = {
    @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
      if (n <= 1) acc
      else factorialAcc(acc * n, n - 1)
    }
    factorialAcc(BigInt(1), n)
  }

}

object FactorialBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load("factorial"))

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[FactorialBackend], name = "factorialBackend")

    system.actorOf(Props[MetricsListener], name = "metricsListener")
  }
} 
开发者ID:seglo,项目名称:akka-cluster-conductr,代码行数:45,代码来源:FactorialBackend.scala


示例6: NewsletterService

//设置package包名称以及导入依赖的类
package com.tpalanga.account.service

import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.http.scaladsl.model.StatusCodes
import com.tpalanga.account.model.{User, UserId}
import com.tpalanga.testlib.test.client.impl.NewsletterServiceRestClient.NewsletterServiceRestClientFactory
import com.tpalanga.testlib.test.client.impl.{NewsletterServiceRestClient, Subscriber}
import com.tpalanga.testlib.test.client.{NoEntity, Response}
import com.tpalanga.testlib.test.config.RestServiceConfig

object NewsletterService {
  case class Subscribe(user: User)
  case class Unsubscribe(id: UserId)

  case class CreateResponse(response: Response[Subscriber])
  case class DeleteResponse(response: Response[NoEntity])

  def props(restServiceConfig: RestServiceConfig, clientFactory: NewsletterServiceRestClientFactory = NewsletterServiceRestClient.defaultFactory): Props =
    Props(new NewsletterService(restServiceConfig, clientFactory))
}

class NewsletterService(restServiceConfig: RestServiceConfig, clientFactory: NewsletterServiceRestClientFactory) extends Actor with ActorLogging {
  import NewsletterService._
  import akka.pattern.pipe
  import context.dispatcher

  override def receive: Receive = {
    case Subscribe(user) =>
      newClient().subscriberCreate(Subscriber(user.id, user.name, user.email)).map(CreateResponse) pipeTo self

    case Unsubscribe(userId) =>
      newClient().subscriberDelete(userId).map(DeleteResponse) pipeTo self

    case CreateResponse(response) if response.status == StatusCodes.Created =>
      log.info("Subscribed to newsletter")

    case CreateResponse(response) =>
      log.info(s"Unexpected response while subscribing to newsletter $response")

    case DeleteResponse(response) if response.status == StatusCodes.OK =>
      log.info("Unsubscribed from newsletter")

    case DeleteResponse(response) =>
      log.info("Unsubscribed from newsletter")

    case Status.Failure(th) =>
      log.error(th, "Error on newsletter request")
  }

  private def newClient() = clientFactory(restServiceConfig, context.system)
} 
开发者ID:tpalanga,项目名称:akka-http-microservice,代码行数:52,代码来源:NewsletterService.scala


示例7: respondWith

//设置package包名称以及导入依赖的类
package com.pacbio.common

import akka.actor.{Status, Actor}
import akka.pattern.pipe

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal

package object actors {
  trait PacBioActor extends Actor {
    def respondWith(x: => Any): Unit = {
      sender ! Try(x).recover{ case NonFatal(e) => Status.Failure(e) }.get
    }

    def pipeWith(x: => Future[Any])(implicit ec: ExecutionContext): Unit = {
      pipe(x.recover{ case NonFatal(e) => Status.Failure(e) }) to sender
    }
  }
} 
开发者ID:PacificBiosciences,项目名称:smrtflow,代码行数:21,代码来源:package.scala


示例8: BotsCliService

//设置package包名称以及导入依赖的类
package im.actor.server.cli

import akka.actor.{ Props, Actor, ActorLogging }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.bot.BotExtension

object BotsCliService {
  def props = Props(new BotsCliService)
}

private final class BotsCliService extends Actor with ActorLogging {
  import context.dispatcher

  ClusterClientReceptionist(context.system).registerService(self)

  private val botExt = BotExtension(context.system)

  def receive = {
    case CreateBot(username, name, isAdmin) ?
      (for {
        (token, _) ? botExt.create(username, name, isAdmin)
      } yield CreateBotResponse(token)) pipeTo sender()
  }
} 
开发者ID:wex5,项目名称:dangchat-server,代码行数:26,代码来源:BotsCliService.scala


示例9: HttpCliService

//设置package包名称以及导入依赖的类
package im.actor.server.cli

import akka.actor.{ Props, Actor }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.acl.ACLUtils
import im.actor.server.db.DbExtension
import im.actor.server.persist.HttpApiTokenRepo

object HttpCliService {
  def props = Props(new HttpCliService)
}

final class HttpCliService extends Actor {
  import context.dispatcher

  ClusterClientReceptionist(context.system).registerService(self)

  private val db = DbExtension(context.system).db

  def receive = {
    case HttpTokenCreate(isAdmin) ?
      val token = ACLUtils.accessToken()
      (for {
        _ ? db.run(HttpApiTokenRepo.create(token, isAdmin = isAdmin))
      } yield HttpTokenCreateResponse(token)) pipeTo sender()
  }
} 
开发者ID:wex5,项目名称:dangchat-server,代码行数:29,代码来源:HttpCliService.scala


示例10: UsersCliService

//设置package包名称以及导入依赖的类
package im.actor.server.cli

import akka.actor.{ Props, Actor, ActorLogging }
import akka.cluster.client.ClusterClientReceptionist
import akka.pattern.pipe
import im.actor.server.user.UserExtension

object UsersCliService {
  def props = Props(new UsersCliService)
}

private final class UsersCliService extends Actor with ActorLogging {
  import context.dispatcher

  ClusterClientReceptionist(context.system).registerService(self)

  private val userExt = UserExtension(context.system)

  def receive = {
    case UpdateIsAdmin(userId, isAdmin) ?
      (for {
        _ ? userExt.updateIsAdmin(userId, isAdmin)
      } yield UpdateIsAdminResponse()) pipeTo sender()
  }
} 
开发者ID:wex5,项目名称:dangchat-server,代码行数:26,代码来源:UsersCliService.scala


示例11: QueueSubscriber

//设置package包名称以及导入依赖的类
package reactive.queue.router

import akka.actor.{ActorLogging, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.util.ByteString
import io.scalac.amqp.{Connection, Message}

object QueueSubscriber {
  def props(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri): Props =
    Props(classOf[QueueSubscriber],queueConnection,  queueName, queueSubscriberUri)
}

class QueueSubscriber(queueConnection: Connection, queueName: String, queueSubscriberUri: Uri) extends
  ActorSubscriber with ActorLogging {
  implicit val system = context.system
  implicit val ec = context.dispatcher
  implicit val materlizer = ActorMaterializer()

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  def receive = {
    case OnNext(message: Message) => route(ByteString(message.body.toArray).decodeString("UTF-8"))
    case OnComplete => log.info("*** on complete")
    case OnError(error) => log.error(s"*** on error: $error")
    case HttpResponse(status, _, _, _) => log.info(s"*** route response: ${status.intValue}")
  }

  def route(message: String): Unit = {
    log.info(s"*** on next: $message")
    try {
      val httpResponse = for {
        request <- Marshal(message).to[RequestEntity]
        response <- Http().singleRequest(HttpRequest(method = HttpMethods.POST, uri = queueSubscriberUri, entity = request))
        entity <- Unmarshal(response).to[HttpResponse]
      } yield entity
      httpResponse.pipeTo(self)
    } catch {
      case t: Throwable =>
        log.error(s"*** on next: forward to uri $queueSubscriberUri failed on: $message with error: ${t.getMessage}")
        queueConnection.publish(queueName, message)
        log.info(s"*** on next: republished to queue $queueName")
    }
  }
} 
开发者ID:objektwerks,项目名称:reactive.queue.router,代码行数:52,代码来源:QueueSubscriber.scala


示例12: ProviderActor

//设置package包名称以及导入依赖的类
package uk.mm.mpp.actors

import akka.actor.{Actor, Props}
import akka.pattern.pipe
import org.apache.commons.lang3.StringUtils._
import org.json4s._
import org.json4s.native.JsonMethods._
import play.api.Logger
import play.api.Play.current
import play.api.libs.ws.{WS, WSRequest, WSResponse}
import uk.mm.mpp.actors.ProviderActor.{ProductRequest, ProductResponse}
import uk.mm.mpp.globals._

import scala.concurrent.ExecutionContext.Implicits.global

object ProviderActor {
  def props(uid: String, port: Int) = Props(classOf[ProviderActor], uid, port)

  case class ProductRequest()

  case class ProductResponse(products: JArray)

}

class ProviderActor(uid: String, port: Int) extends Actor {

  private lazy val request: WSRequest = WS.client.url(providerUrl)
    .withFollowRedirects(false)
    .withRequestTimeout(15000)

  val logger = Logger(MPP_WORKER_PREFIX + getClass.getSimpleName + "_" + uid + "_" + port)
  val providerUrl: String = "http://localhost:" + port + "/3rd/products"

  def receive = {
    case ProductRequest =>
      request.get()
        .map(productUpdateFrom)
        .recover(withEmptyJsonArray)
        .pipeTo(sender)
  }

  val withEmptyJsonArray: PartialFunction[Throwable, ProductResponse] = {
    case _ => ProductResponse(JArray(List()))
  }

  def productUpdateFrom(response: WSResponse): ProductResponse = if (response.status == 200) {
    logger.debug(s"from: [$providerUrl]: [${piedPiper(response)}]")
    ProductResponse(parseJsonFrom(response))
  } else {
    logger.warn(s"from: [$providerUrl]: [${response.body}]")
    ProductResponse(JArray(List()))
  }

  def piedPiper(response: WSResponse) = {
    abbreviate(replacePattern(response.body, """\s{2,}""", " "), 30)
  }

  def parseJsonFrom(response: WSResponse) = parse(response.body).asInstanceOf[JArray]
} 
开发者ID:mikemey,项目名称:mpp,代码行数:60,代码来源:ProviderActor.scala


示例13: Authenticator

//设置package包名称以及导入依赖的类
package authentication

import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.pipe
import authentication.entities.{AuthToken, _}
import rest.client.RestClient
import rest.client.entities.ExecutionResultCode

import scala.concurrent.ExecutionContext

class Authenticator(restClient: RestClient) extends Actor with ActorLogging {

  implicit val ec: ExecutionContext = context.dispatcher

  override def receive: Receive = {
    case SignIn(user, password) =>
      restClient.signIn(user, password)
        .map {
          case (token: AuthToken) => AuthTokenStore.updateToken(token); Authenticated
        }
        .recover {
          case ex: Throwable => AuthFailure(ex)
        } pipeTo sender()

    case SignUp(user, password) =>
      restClient.signUp(user, password)
        .map {
          case (token: AuthToken) => AuthTokenStore.updateToken(token); Authenticated
        }
        .recover {
          case ex: Throwable => AuthFailure(ex)
        } pipeTo sender()

    case SignOut =>
      restClient.signOut()
        .map {
          case ExecutionResultCode.OK => AuthTokenStore.clear(); Disconnected
          case _ => AuthFailure(new RuntimeException("Operation failed with unknown error"))
        }
        .recover {
          case ex: Throwable => AuthFailure(ex)
        } pipeTo sender()
  }
}

object Authenticator {
  def props(restClient: RestClient): Props = Props(new Authenticator(restClient))
} 
开发者ID:lymr,项目名称:fun-chat,代码行数:49,代码来源:Authenticator.scala


示例14: HmdaFilingApi

//设置package包名称以及导入依赖的类
package hmda.api

import akka.actor.{ ActorSystem, Props }
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.util.Timeout
import hmda.api.http.{ BaseHttpApi, HmdaCustomDirectives, InstitutionsHttpApi, LarHttpApi }
import hmda.api.HmdaConfig._
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }

object HmdaFilingApi {
  def props(): Props = Props(new HmdaFilingApi)
}

class HmdaFilingApi
    extends HttpApi
    with BaseHttpApi
    with LarHttpApi
    with InstitutionsHttpApi
    with HmdaCustomDirectives {

  implicit val flowParallelism = configuration.getInt("hmda.actor-flow-parallelism")

  override val name = "hmda-filing-api"

  lazy val httpTimeout = configuration.getInt("hmda.http.timeout")
  implicit val timeout = Timeout(httpTimeout.seconds)

  override lazy val host = configuration.getString("hmda.http.host")
  override lazy val port = configuration.getInt("hmda.http.port")

  implicit val system: ActorSystem = context.system
  override implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = context.dispatcher
  override val log = Logging(system, getClass)

  val paths: Route = routes(s"$name") ~ larRoutes ~ institutionsRoutes

  override val http: Future[ServerBinding] = Http(system).bindAndHandle(
    paths,
    host,
    port
  )

  http pipeTo self

} 
开发者ID:cfpb,项目名称:hmda-platform,代码行数:54,代码来源:HmdaFilingApi.scala


示例15: HmdaAdminApi

//设置package包名称以及导入依赖的类
package hmda.api

import akka.actor.{ ActorSystem, Props }
import akka.event.Logging
import akka.pattern.pipe
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import hmda.api.http.BaseHttpApi
import hmda.api.http.admin.InstitutionAdminHttpApi
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }

object HmdaAdminApi {
  def props(): Props = Props(new HmdaAdminApi)
}

class HmdaAdminApi extends HttpApi with BaseHttpApi with InstitutionAdminHttpApi {

  val config = ConfigFactory.load()

  lazy val httpTimeout = config.getInt("hmda.http.timeout")
  override implicit val timeout = Timeout(httpTimeout.seconds)

  override val name = "hmda-admin-api"

  override val host: String = config.getString("hmda.http.adminHost")
  override val port: Int = config.getInt("hmda.http.adminPort")

  override implicit val system: ActorSystem = context.system
  override implicit val materializer: ActorMaterializer = ActorMaterializer()
  override implicit val ec: ExecutionContext = context.dispatcher
  override val log = Logging(system, getClass)

  override val paths: Route = routes(s"$name") ~ institutionAdminRoutes

  override val http: Future[ServerBinding] = Http(system).bindAndHandle(
    paths,
    host,
    port
  )

  http pipeTo self
} 
开发者ID:cfpb,项目名称:hmda-platform,代码行数:49,代码来源:HmdaAdminApi.scala


示例16: DeviceService

//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, Props}
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import reactivehub.akka.stream.apns.manager.DeviceService._
import reactivehub.akka.stream.apns.pusher.PushData
import scala.concurrent.Future
import scala.util.{Failure, Success}

object DeviceService {
  case class CreateDevices(devices: List[NewDevice])

  sealed trait CreateDevicesResponse
  case class DevicesCreated(ids: List[Long]) extends CreateDevicesResponse

  case object PingDevices

  def props(store: DeviceStore, queue: PushQueue)(implicit m: ActorMaterializer): Props =
    Props(classOf[DeviceService], store, queue, m)
}

class DeviceService(store: DeviceStore, queue: PushQueue)(implicit m: ActorMaterializer)
  extends Actor with ActorLogging {

  import context.dispatcher

  override def receive: Receive = {
    case CreateDevices(devices) =>
      log.debug("Creating new devices for tokens {}", devices.map(_.token).mkString(", "))
      store.createDevices(devices).map(DevicesCreated).pipeTo(sender())

    case PingDevices =>
      log.debug("Pinging all devices")
      store.allDevices()
        .map(device => device.id -> PushData(device.token, Some("Hello"), Some(1)))
        .runWith(queue.pushDataSink)
        .onComplete {
          case Success(num) => log.debug("{} devices pinged", num)
          case Failure(t) => log.error(t, "Failed to ping devices")
        }
  }
}

trait DeviceStore {
  def createDevices(devices: List[NewDevice]): Future[List[Long]]
  def allDevices(): Source[Device, NotUsed]
}

trait PushQueue {
  def pushDataSink: Sink[(Long, PushData), Future[Long]]
} 
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:55,代码来源:DeviceService.scala


示例17: SqsQueue

//设置package包名称以及导入依赖的类
package com.jensraaby.queuevis.aws

import akka.actor.{Props, Actor}
import akka.pattern.pipe
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.{SendMessageRequest, SendMessageResult, ReceiveMessageResult, ReceiveMessageRequest}

import scala.concurrent.ExecutionContext

object SqsQueue {
  case class Poll(maxMessages: Int, maxWaitSeconds: Int)
  case class Write(body: String)


  def apply(queueUrl: String)(implicit executionContext: ExecutionContext,
                              sqsClient: AmazonSQSAsyncClient) = Props(new SqsQueue(queueUrl)(executionContext, sqsClient))
}

class SqsQueue(queueUrl: String)(implicit ec: ExecutionContext, sqsClient: AmazonSQSAsyncClient) extends Actor with AsyncWrapper {
  def receive = {
    case SqsQueue.Write(body) =>
      println("Writing message: " + body)
      val request = new SendMessageRequest()
        .withMessageBody(body)
        .withQueueUrl(queueUrl)

      val (result, handler) = asyncHandler[SendMessageRequest, SendMessageResult]
      sqsClient.sendMessageAsync(request, handler)
      println("Sent message")
      result pipeTo sender()

    case SqsQueue.Poll(msgs, wait) =>
      val destination = sender()
      val request = new ReceiveMessageRequest()
        .withQueueUrl(queueUrl)
        .withMaxNumberOfMessages(msgs)
        .withWaitTimeSeconds(wait)

      val (result, handler) = asyncHandler[ReceiveMessageRequest, ReceiveMessageResult]
      sqsClient.receiveMessageAsync(request, handler)
      import scala.collection.JavaConverters._
      for (results <- result) {
        destination ! results.getMessages.asScala.toList
      }
  }

} 
开发者ID:jensraaby,项目名称:queuevis,代码行数:48,代码来源:SqsQueue.scala


示例18: Collector

//设置package包名称以及导入依赖的类
package com.darienmt.keepers

import akka.Done
import akka.actor.Status.Failure
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import com.darienmt.keepers.Collector.{ StreamFinished }

object Collector {
  sealed trait CollectorMessage
  case object StreamFinished extends CollectorMessage

  def props(
    manager: ActorRef,
    collectorGenerator: Generator
  ): Props = Props(new Collector(manager, collectorGenerator))
}

class Collector(
    manager: ActorRef,
    collectorGenerator: Generator
) extends Actor with ActorLogging {

  import akka.pattern.pipe
  import context.dispatcher

  override def receive: Receive = {
    case Failure(ex) => {
      log.error("Stream stopped with an error")
      throw ex
    }
    case Done => {
      manager ! StreamFinished
    }
    case msg => throw new Exception("Unknown message: " + msg.toString)
  }

  override def preStart(): Unit = collectorGenerator() pipeTo self

  override def postRestart(reason: Throwable): Unit = {}

} 
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:42,代码来源:Collector.scala


示例19: HcdActor

//设置package包名称以及导入依赖的类
package tmt.demo.hcd

import akka.actor.Actor
import akka.pattern.pipe
import tcsstr2._
import tmt.app.configs.{AppSettings, Names}
import tmt.demo.connectors.{ZmqToAkkaFlow, AkkaToZmqFlow}
import tmt.demo.zeromq_drivers.ZmqClient

class HcdActor(
  zmqClient: ZmqClient,
  akkaToZmqFlow: AkkaToZmqFlow,
  zmqToAkkaFlow: ZmqToAkkaFlow,
  appSettings: AppSettings
) extends Actor {

  import context.dispatcher

  override def preStart() = {

    //connection to push cluster tcs_mcs_PositionDemand events to mcs
    akkaToZmqFlow.connect[tcs_mcs_PositionDemand](
      subscriberTopic = Names.PositionDemands,
      publishingPort = appSettings.mcsPositionDemandPort
    )

    //connection to push mcs mcs_Health events to cluster
    zmqToAkkaFlow.connect(
      publishingTopic = Names.Health,
      subscriberPort = appSettings.mcsHealthPort,
      responseParser = mcs_Health
    )
  }

  //send tcs commands to mcs and reply back to sender
  def receive = {
    case command: Tcs_Command =>
      val result = zmqClient.query(command, command_response)
      result pipeTo sender()
  }
} 
开发者ID:mushtaq,项目名称:tcs-jeromq,代码行数:42,代码来源:HcdActor.scala


示例20: FrontendApi

//设置package包名称以及导入依赖的类
package api

import akka.actor.{ ActorSystem, Props }
import akka.cluster.pubsub.DistributedPubSub
import akka.pattern.pipe
import akka.event.{ Logging, LoggingAdapter }
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import api.actors.{ ClusterListener, FileReceiver }
import api.http.{ HttpApi, Service }
import com.typesafe.config.ConfigFactory

import scala.concurrent.{ ExecutionContext, Future }

object FrontendApi {
  def props(): Props = Props(new FrontendApi)
}

class FrontendApi extends HttpApi with Service {
  override val name: String = "frontend-api"

  val config = ConfigFactory.load()
  override val log: LoggingAdapter = Logging(context.system, getClass)

  override val host: String = config.getString("frontend.host")
  override val port: Int = config.getInt("frontend.port")

  override implicit val system: ActorSystem = context.system
  override implicit val materializer: ActorMaterializer = ActorMaterializer()
  override implicit val ec: ExecutionContext = context.dispatcher

  //Start up actors
  val clusterListener = system.actorOf(ClusterListener.props())
  val fileReceiver = system.actorOf(FileReceiver.props())

  val mediator = DistributedPubSub(system).mediator

  override val paths: Route = routes(s"$name", clusterListener, mediator)
  override val http: Future[Http.ServerBinding] = Http(system).bindAndHandle(
    paths,
    host,
    port
  )

  http pipeTo self

} 
开发者ID:jmarin,项目名称:akka-cluster-example,代码行数:49,代码来源:FrontendApi.scala



注:本文中的akka.pattern.pipe类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala GeneratorDrivenPropertyChecks类代码示例发布时间:2022-05-23
下一篇:
Scala Singleton类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap