本文整理汇总了Scala中akka.stream.Materializer类的典型用法代码示例。如果您正苦于以下问题:Scala Materializer类的具体用法?Scala Materializer怎么用?Scala Materializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Materializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: AkkaStreamFixture
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ironmq
import akka.stream.{ActorMaterializer, Materializer}
import org.scalatest.{BeforeAndAfterEach, Suite}
/**
 * ScalaTest fixture that creates a fresh [[ActorMaterializer]] before every test case and
 * shuts it down afterwards.
 *
 * Outside of a running test case (before `beforeEach` or after `afterEach`) accessing
 * `materializer` fails with an `IllegalStateException`.
 */
trait AkkaStreamFixture extends AkkaFixture with BeforeAndAfterEach { _: Suite =>

  // Materializer of the test case that is currently running, if any.
  private var mutableMaterializer = Option.empty[ActorMaterializer]

  /** The materializer of the current test case; throws if none has been initialized. */
  implicit def materializer: Materializer =
    mutableMaterializer.getOrElse(throw new IllegalStateException("Materializer not initialized"))

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    mutableMaterializer = Option(ActorMaterializer())
  }

  override protected def afterEach(): Unit =
    try {
      mutableMaterializer.foreach(_.shutdown())
    } finally {
      // Drop the reference so a shut-down materializer can never be handed out again, and
      // always run the parent teardown even if shutting down the materializer failed.
      mutableMaterializer = None
      super.afterEach()
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:23,代码来源:AkkaStreamFixture.scala
示例2: Session
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.googlecloud.pubsub
import java.security.PrivateKey
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.Future
/**
 * Holds the most recently requested access token for one service account and hands out its
 * token string, requesting a new token when none is cached or the cached one is about to expire.
 *
 * NOTE(review): `maybeAccessToken` is a plain `var` without synchronization, and a failed
 * token future stays cached — confirm callers are fine with both.
 */
@akka.annotation.InternalApi
private[pubsub] class Session(clientEmail: String, privateKey: PrivateKey) {
  protected var maybeAccessToken: Option[Future[AccessTokenExpiry]] = None
  protected def now = Instant.now()
  protected val httpApi: HttpApi = HttpApi

  /** Requests a new token through `httpApi` and caches the (possibly still pending) result. */
  private def getNewToken()(implicit as: ActorSystem, materializer: Materializer): Future[AccessTokenExpiry] = {
    val pending = httpApi.getAccessToken(clientEmail = clientEmail, privateKey = privateKey, when = now)
    maybeAccessToken = Some(pending)
    pending
  }

  /** True when the token expires less than 60 seconds after `now`. */
  private def expiresSoon(g: AccessTokenExpiry): Boolean = {
    val threshold = now.getEpochSecond + 60
    g.expiresAt < threshold
  }

  /** Returns the access token string, refreshing the cached token first if it expires soon. */
  def getToken()(implicit as: ActorSystem, materializer: Materializer): Future[String] = {
    import materializer.executionContext

    val cached = maybeAccessToken.getOrElse(getNewToken())
    for {
      current <- cached
      usable <- if (expiresSoon(current)) getNewToken() else Future.successful(current)
    } yield usable.accessToken
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:40,代码来源:Session.scala
示例3: PlayerServiceImpl
//设置package包名称以及导入依赖的类
package com.chriswk.gameranker.player.impl
import java.util.UUID
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.chriswk.gameranker.player.api
import com.chriswk.gameranker.player.api.PlayerService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import scala.concurrent.ExecutionContext
/**
 * Lagom implementation of [[PlayerService]] backed by `PlayerEntity` persistent entities.
 *
 * Listing players is done by streaming the current persistence ids from the Cassandra read
 * journal and asking every `PlayerEntity` for its state.
 */
class PlayerServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends PlayerService {

  private val currentIdsQuery =
    PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

  /** Looks up the entity reference for the given player id. */
  private def refFor(playerId: UUID) = registry.refFor[PlayerEntity](playerId.toString)

  /** Creates a player under a freshly generated random id. */
  override def createPlayer = ServiceCall { request =>
    val newId = UUID.randomUUID()
    refFor(newId)
      .ask(CreatePlayer(request.name))
      .map(_ => api.Player(newId, request.name))
  }

  /** Returns the player with the given id, or fails with `NotFound` when the entity has no player. */
  override def getPlayer(playerId: UUID) = ServiceCall { _ =>
    refFor(playerId).ask(GetPlayer).map { maybePlayer =>
      maybePlayer
        .map(found => api.Player(playerId, found.name))
        .getOrElse(throw NotFound(s"Player with id $playerId"))
    }
  }

  /** Collects all existing players; at most 4 entities are queried concurrently. */
  override def getPlayers = ServiceCall { _ =>
    val playerIds = currentIdsQuery.currentPersistenceIds().filter(_.startsWith("PlayerEntity|"))

    playerIds
      .mapAsync(4) { persistenceId =>
        // Persistence ids look like "PlayerEntity|<entity id>"; keep the part after the first '|'.
        val entityId = persistenceId.split("\\|", 2).last
        registry
          .refFor[PlayerEntity](entityId)
          .ask(GetPlayer)
          .map(_.map(found => api.Player(UUID.fromString(entityId), found.name)))
      }
      .collect { case Some(existing) => existing }
      .runWith(Sink.seq)
  }
}
开发者ID:chriswk,项目名称:gameranker,代码行数:53,代码来源:PlayerServiceImpl.scala
示例4: FileTailSourceSpec
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.file.scaladsl
import java.nio.file.FileSystems
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import scala.concurrent.duration._
object FileTailSourceSpec {

  /**
   * Small usage sample: tails the file whose path is given as the single command line
   * argument and prints every new line to standard out.
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      throw new IllegalArgumentException("Usage: FileTailSourceTest [path]")
    }
    val path: String = args(0)

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: Materializer = ActorMaterializer()

    // #simple-lines
    val fs = FileSystems.getDefault
    val tailedFile = fs.getPath(path)
    val lines: Source[String, NotUsed] =
      scaladsl.FileTailSource.lines(path = tailedFile, maxLineSize = 8192, pollingInterval = 250.millis)

    lines.runForeach(System.out.println(_))
    // #simple-lines
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:36,代码来源:FileTailSourceSpec.scala
示例5: ExampleFilter
//设置package包名称以及导入依赖的类
package filters
import akka.stream.Materializer
import javax.inject._
import play.api.mvc._
import scala.concurrent.{ExecutionContext, Future}
/**
 * Play filter that tags every response with an `X-ExampleFilter: foo` header.
 *
 * @param mat  materializer required by the `Filter` trait
 * @param exec execution context used to transform the downstream result
 */
@Singleton
class ExampleFilter @Inject()(
    implicit override val mat: Materializer,
    exec: ExecutionContext) extends Filter {

  override def apply(nextFilter: RequestHeader => Future[Result])
                    (requestHeader: RequestHeader): Future[Result] = {
    // Hand the request to the rest of the chain (further filters, then the action) ...
    val downstream = nextFilter(requestHeader)
    // ... and decorate whatever comes back with the extra header.
    downstream.map(_.withHeaders("X-ExampleFilter" -> "foo"))
  }
}
开发者ID:dgrald,项目名称:locals-only-service-play,代码行数:25,代码来源:ExampleFilter.scala
示例6: ExampleFilter
//设置package包名称以及导入依赖的类
package filters
import akka.stream.Materializer
import javax.inject._
import play.api.mvc._
import scala.concurrent.{ExecutionContext, Future}
/**
 * Play filter that currently passes every result through unchanged.
 *
 * The header decoration from the template (`X-ExampleFilter -> foo`) is disabled here.
 *
 * @param mat  materializer required by the `Filter` trait
 * @param exec execution context used to transform the downstream result
 */
@Singleton
class ExampleFilter @Inject()(
    implicit override val mat: Materializer,
    exec: ExecutionContext) extends Filter {

  override def apply(nextFilter: RequestHeader => Future[Result])
                    (requestHeader: RequestHeader): Future[Result] = {
    // Run the remaining filters and the action; the result is returned as is.
    val downstream = nextFilter(requestHeader)
    for (result <- downstream) yield result
  }
}
开发者ID:morpheby,项目名称:ntcu-control-server,代码行数:26,代码来源:ExampleFilter.scala
示例7: ReplyMessageClient
//设置package包名称以及导入依赖的类
package bot.line.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.Materializer
import scala.concurrent.{ExecutionContext, Future}
/**
 * [[MessageReplier]] that sends the reply as a single HTTP request through Akka HTTP.
 *
 * @param accessToken token passed on to `ReplyMessageRequest`
 */
class ReplyMessageClient(accessToken: String)
                        (implicit val system: ActorSystem,
                         implicit val materializer: Materializer,
                         implicit val ec: ExecutionContext) extends MessageReplier {

  /**
   * Sends `message` as a reply for `replyToken` and prints the outcome to standard out.
   *
   * @return a future completed once the response has been received; it fails if the request
   *         itself fails
   */
  override def replyMessage(replyToken: String, message: String): Future[Unit] = {
    val request = ReplyMessageRequest(accessToken, replyToken, message).httpRequest

    Http().singleRequest(request).map { response =>
      // The body is not needed, but it must be drained: an unconsumed response entity keeps
      // the pooled connection busy and eventually stalls further requests.
      response.discardEntityBytes()

      if (response.status.isSuccess()) println(s"message sent!")
      else println(s"request failed: $response")
    }
  }
}
开发者ID:xoyo24,项目名称:akka-http-line-bot,代码行数:25,代码来源:ReplyMessageClient.scala
示例8: HttpsOnlyFilter
//设置package包名称以及导入依赖的类
package utils
import javax.inject.Inject
import scala.concurrent.Future
import play.api._
import play.api.i18n.{I18nSupport, Messages, MessagesApi}
import play.api.mvc._
import Results.Ok
import akka.stream.Materializer
import play.api.http.HttpFilters
import play.filters.csrf.CSRFFilter
import controllers.routes
// If we configure play.http.forwarded.trustedProxies correctly, we don't need this filter... right? right!?
/**
 * Rejects requests that a proxy forwarded over anything other than HTTPS.
 *
 * Only the `X-Forwarded-Proto` header is inspected; requests without that header are
 * passed on unchanged.
 */
class HttpsOnlyFilter @Inject() (implicit val mat:Materializer, val messagesApi:MessagesApi) extends Filter with I18nSupport {

  def apply(nextFilter:RequestHeader => Future[Result])(request:RequestHeader):Future[Result] = {
    // Made implicit for the rendered error view.
    implicit val r: RequestHeader = request

    request.headers.get("X-Forwarded-Proto") match {
      case Some(proto) if proto != "https" =>
        Future.successful(Ok(views.html.errors.onlyHttpsAllowed()))
      case _ =>
        nextFilter(request)
    }
  }
}
/**
 * Filter chain of the application: the CSRF filter is always active, every filter in `map`
 * is added only when its configuration key is set to `true`.
 */
class Filters @Inject() (
    configuration:Configuration,
    csrfFilter:CSRFFilter,
    httpsOnlyFilter:HttpsOnlyFilter) extends HttpFilters {

  /** Optional filters, keyed by the boolean configuration entry that switches them on. */
  val map = Map("application.proxy.httpsOnly" -> httpsOnlyFilter)

  override def filters = {
    val enabledOptional = map.foldRight[Seq[EssentialFilter]](Seq.empty) { case ((key, filter), acc) =>
      if (configuration.getBoolean(key).contains(true)) filter +: acc else acc
    }
    csrfFilter +: enabledOptional
  }
}
开发者ID:tm-sukehiro,项目名称:play-hands-on,代码行数:39,代码来源:Filters.scala
示例9: HttpFetcher
//设置package包名称以及导入依赖的类
package au.csiro.data61.magda.external
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl._
import au.csiro.data61.magda.util.Http.getPort
import scala.concurrent.{ ExecutionContext, Future }
/**
 * Fetches HTTP resources from the host described by `interfaceConfig`.
 *
 * When `interfaceConfig.fakeConfig` is defined no network call is made; the response body is
 * read from the classpath resource named by `fakeConfig.datasetPath` instead.
 */
class HttpFetcher(interfaceConfig: InterfaceConfig, implicit val system: ActorSystem,
                  implicit val materializer: Materializer, implicit val ec: ExecutionContext) {

  /** Outgoing connection to the configured base URL, created on first use. */
  lazy val connectionFlow: Flow[HttpRequest, HttpResponse, Any] =
    Http().outgoingConnection(interfaceConfig.baseUrl.getHost, getPort(interfaceConfig.baseUrl))

  /**
   * Issues a GET for `path` (appended to the path of the base URL), or serves the fake
   * dataset when one is configured.
   *
   * @return the response; in fake mode the future fails with `FileNotFoundException` if the
   *         dataset resource does not exist
   */
  def request(path: String): Future[HttpResponse] =
    interfaceConfig.fakeConfig match {
      case Some(fakeConfig) => Future {
        // getResourceAsStream returns null for a missing resource; fail with a clear message
        // instead of a NullPointerException deep inside the reader.
        val stream = Option(getClass.getResourceAsStream(fakeConfig.datasetPath)).getOrElse(
          throw new java.io.FileNotFoundException(s"Fake dataset resource not found: ${fakeConfig.datasetPath}"))
        val file = io.Source.fromInputStream(stream)
        // Close the resource even when reading it fails.
        val body = try file.mkString finally file.close()

        new HttpResponse(
          status = StatusCodes.OK,
          headers = scala.collection.immutable.Seq(),
          protocol = HttpProtocols.`HTTP/1.1`,
          entity = HttpEntity(ContentTypes.`application/json`, body))
      }
      case None => {
        val request = RequestBuilding.Get(s"${interfaceConfig.baseUrl.getPath}${path}")
        Source.single(request).via(connectionFlow).runWith(Sink.head)
      }
    }
}
开发者ID:TerriaJS,项目名称:magda-ckan,代码行数:40,代码来源:HttpFetcher.scala
示例10: Rejection
//设置package包名称以及导入依赖的类
package com.shashank.akkahttp.basic.routing
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.{ActorMaterializer, Materializer}
/**
 * Demo server showing a custom `RejectionHandler`.
 *
 * Serves `/welcome` and `/demo`; `/wrong` always rejects with a `ValidationRejection`.
 * Listens on localhost:8090.
 */
object Rejection {

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem("IntroductionToAkkaHttp")
    implicit val mat:Materializer = ActorMaterializer()

    // Explicitly typed so implicit resolution does not depend on the inferred type.
    implicit def myRejectionHandler: RejectionHandler = RejectionHandler.newBuilder().handle{
      case MissingCookieRejection(cookieName) =>
        complete(HttpResponse(StatusCodes.BadRequest, entity = "No cookies, no service!!!"))
    }.handleNotFound {
      complete((StatusCodes.NotFound, "Not here!"))
    }.result()

    val route =
      path("welcome"){
        get{
          complete {
            "welcome to rest service"
          }
        }
      } ~
      path("demo"){
        get{
          complete {
            "welcome to demonstration"
          }
        }
      } ~
      path("wrong"){
        reject{
          ValidationRejection("Invalid path", None)
        }
      }

    // Report a failed bind (e.g. port already in use) instead of dropping the failure silently.
    Http().bindAndHandle(route, "localhost", 8090).failed.foreach { cause =>
      Console.err.println(s"Failed to bind localhost:8090: $cause")
    }(sys.dispatcher)
  }
}
开发者ID:shashankgowdal,项目名称:introduction-to-akkahttp,代码行数:51,代码来源:Rejection.scala
示例11: Failure
//设置package包名称以及导入依赖的类
package com.shashank.akkahttp.basic.routing
import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.{ActorMaterializer, Materializer}
/**
 * Demo server showing a custom `ExceptionHandler`.
 *
 * `/demo` deliberately divides by zero so that the handler is triggered.
 * Listens on localhost:8090.
 */
object Failure {

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem("IntroductionToAkkaHttp")
    implicit val mat:Materializer = ActorMaterializer()

    implicit def myExceptionHandler: ExceptionHandler = ExceptionHandler {
      case _: ArithmeticException =>
        complete(HttpResponse(StatusCodes.BadRequest, entity = "Bad numbers, bad result!!!"))
      // Only non-fatal errors are turned into a response; fatal ones (OOM etc.) must propagate.
      case scala.util.control.NonFatal(e) => {
        // getMessage may be null, which cannot be used as a response entity.
        val message = Option(e.getMessage).getOrElse(e.toString)
        println(message)
        println(e.getStackTrace.mkString("", System.lineSeparator, System.lineSeparator))
        complete(HttpResponse(StatusCodes.BadRequest, entity = message))
      }
    }

    val route =
      path("welcome"){
        get{
          complete {
            "welcome to rest service"
          }
        }
      } ~
      path("demo"){
        get {
          complete {
            // Intentional ArithmeticException to exercise the exception handler.
            100/0
            "welcome to demonstration"
          }
        }
      }

    // Report a failed bind (e.g. port already in use) instead of dropping the failure silently.
    Http().bindAndHandle(route, "localhost", 8090).failed.foreach { cause =>
      Console.err.println(s"Failed to bind localhost:8090: $cause")
    }(sys.dispatcher)
  }
}
开发者ID:shashankgowdal,项目名称:introduction-to-akkahttp,代码行数:50,代码来源:Failure.scala
示例12: ReverseProxy
//设置package包名称以及导入依赖的类
package com.shashank.akkahttp.basic.serving
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Host, `Access-Control-Allow-Origin`}
import akka.stream.scaladsl.Flow
import akka.stream.{ActorMaterializer, Materializer}
/**
 * Minimal reverse proxy: accepts requests on localhost:8080, forwards them to
 * localhost:8090 and adds a permissive `Access-Control-Allow-Origin` header to the response.
 */
object ReverseProxy {

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem("IntroductionToAkkaHttp")
    implicit val mat:Materializer = ActorMaterializer()

    val redirectHost = "localhost"
    val redirectPort = 8090

    // Point the request at the target and replace the Host header accordingly.
    val requestFlow = Flow.fromFunction[HttpRequest, HttpRequest]( request => {
      request
        .withUri(request.uri.withAuthority(redirectHost, redirectPort))
        .mapHeaders(headers => headers.filterNot(_.lowercaseName() == Host.lowercaseName))
        .addHeader(Host(redirectHost, redirectPort))
    })

    val outgoingConnection = Http().outgoingConnection(redirectHost, redirectPort)

    // Keep the upstream headers (withHeaders would replace all of them) and only swap in
    // our own Access-Control-Allow-Origin value.
    val responseFlow = Flow.fromFunction[HttpResponse, HttpResponse]( response => {
      response
        .mapHeaders(headers => headers.filterNot(_.lowercaseName() == `Access-Control-Allow-Origin`.lowercaseName))
        .addHeader(`Access-Control-Allow-Origin`.*)
    })

    // Report a failed bind (e.g. port already in use) instead of dropping the failure silently.
    Http().bindAndHandle(requestFlow via outgoingConnection via responseFlow, "localhost", 8080).failed.foreach { cause =>
      Console.err.println(s"Failed to bind localhost:8080: $cause")
    }(sys.dispatcher)
  }
}
开发者ID:shashankgowdal,项目名称:introduction-to-akkahttp,代码行数:37,代码来源:ReverseProxy.scala
示例13: KafkaWriter
//设置package包名称以及导入依赖的类
package wiii
import akka.actor._
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.softwaremill.react.kafka.{ProducerMessage, ProducerProperties, ReactiveKafka}
import twitter4j.{Status, _}
object KafkaWriter {

  /**
   * Props for a [[KafkaWriter]] publishing to `topic`.
   *
   * Accepts any `Materializer`, matching what the actor's constructor requires; callers that
   * have an `ActorMaterializer` in implicit scope keep working unchanged.
   */
  def props(topic: String)(implicit mat: Materializer) = Props(new KafkaWriter(topic))
}
/**
 * Actor that, on start, wires a Twitter status stream (filtered on "espn") into a Kafka
 * producer for `topicName`. It does not react to any message itself.
 */
class KafkaWriter(topicName: String)(implicit mat: Materializer) extends Actor with ActorLogging {

  override def preStart(): Unit = initWriter()

  // All incoming messages are ignored.
  override def receive: Receive = {
    case _ =>
  }

  /** Creates the Kafka producer actor, the Twitter listener and the stream connecting them. */
  def initWriter(): Unit = {
    // Child actor that writes to Kafka (localhost:9092).
    val kafkaProducer = context.actorOf(
      new ReactiveKafka().producerActorProps(ProducerProperties(
        bootstrapServers = "localhost:9092",
        topic = topicName,
        valueSerializer = TweetSerializer
      ))
    )

    // Actor-backed source: statuses sent to `statusInlet` come out of `statusPublisher`.
    // The buffer holds 1000 elements and the stream fails on overflow.
    val statusSource = Source.actorRef[Status](1000, OverflowStrategy.fail)
    val (statusInlet, statusPublisher) = statusSource.toMat(Sink.asPublisher(false))(Keep.both).run()

    val twitterStream = new TwitterStreamFactory().getInstance()
    twitterStream.addListener(new StatusForwarder(statusInlet))
    twitterStream.filter(new FilterQuery("espn"))

    val kafkaSink = Sink.fromSubscriber(ActorSubscriber[ProducerMessage[Array[Byte], Tweet]](kafkaProducer))
    Source
      .fromPublisher(statusPublisher)
      .map(status => ProducerMessage(Tweet(status.getUser.getName, status.getText)))
      .runWith(kafkaSink)
  }
}
/** Twitter listener that forwards every received status to `publisher`; all other callbacks are no-ops. */
class StatusForwarder(publisher: ActorRef) extends StatusListener {

  def onStatus(status: Status): Unit = publisher.tell(status, ActorRef.noSender)

  // The remaining callbacks are intentionally ignored for now.
  def onStallWarning(warning: StallWarning): Unit = ()
  def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
  def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
  def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
  def onException(ex: Exception): Unit = ()
}
开发者ID:jw3,项目名称:example-kafka-tweets,代码行数:50,代码来源:FeedToKafka.scala
示例14: HttpsFilter
//设置package包名称以及导入依赖的类
package filters
import javax.inject.Inject
import akka.stream.Materializer
import scala.concurrent.Future
import play.api.mvc._
import play.api.Play
import play.api.Configuration
import play.api.libs.concurrent.Execution.Implicits.defaultContext
/**
 * Forces HTTPS unless `application.is.secure` is explicitly set to `false`.
 *
 * Secure requests get a `Strict-Transport-Security` header on their response; insecure
 * requests are answered with a permanent redirect to the same host and URI over HTTPS.
 */
class HttpsFilter @Inject() (config: Configuration)(implicit val mat: Materializer) extends Filter {

  def apply(nextFilter: (RequestHeader) => Future[Result])(requestHeader: RequestHeader): Future[Result] = {
    // Enforcement is on by default when the setting is missing.
    val enforceHttps = config.getBoolean("application.is.secure").getOrElse(true)

    if (!enforceHttps) {
      nextFilter(requestHeader)
    } else if (requestHeader.secure) {
      nextFilter(requestHeader).map { result =>
        result.withHeaders("Strict-Transport-Security" -> "max-age=31536000; includeSubDomains")
      }
    } else {
      redirectToHttps(requestHeader)
    }
  }

  /** Permanent redirect to the HTTPS version of the requested URL. */
  private def redirectToHttps(requestHeader: RequestHeader) =
    Future.successful(Results.MovedPermanently(s"https://${requestHeader.host}${requestHeader.uri}"))
}
开发者ID:Driox,项目名称:play-app-seed,代码行数:31,代码来源:HttpsFilter.scala
示例15: InstanceTrackerModule
//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.task.tracker
import akka.actor.ActorRef
import akka.stream.Materializer
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.instance.update.{ InstanceChangeHandler, InstanceUpdateOpResolver }
import mesosphere.marathon.core.leadership.LeadershipModule
import mesosphere.marathon.core.task.tracker.impl._
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.storage.repository.InstanceRepository
/**
 * Wiring module for the instance tracker: builds the tracker delegate, the update-step
 * processor and the state-op processor from the collaborators passed to the constructor.
 *
 * Almost everything is a `lazy val`, so objects are only created on first access.
 *
 * NOTE(review): the implicit `Materializer` is never referenced by name in this class;
 * presumably one of the constructors/factories called below picks it up implicitly — confirm.
 */
class InstanceTrackerModule(
clock: Clock,
metrics: Metrics,
config: InstanceTrackerConfig,
leadershipModule: LeadershipModule,
instanceRepository: InstanceRepository,
updateSteps: Seq[InstanceChangeHandler])(implicit mat: Materializer) {
// Public tracker: a delegate in front of the tracker actor, constructed with metrics.
lazy val instanceTracker: InstanceTracker =
new InstanceTrackerDelegate(Some(metrics), config, instanceTrackerActorRef)
// Processor built from the `updateSteps` handed to this module.
lazy val instanceTrackerUpdateStepProcessor: InstanceTrackerUpdateStepProcessor =
new InstanceTrackerUpdateStepProcessorImpl(updateSteps, metrics)
// The next three accessors expose the same `instanceStateOpProcessor` instance under
// three different interface types.
def instanceCreationHandler: InstanceCreationHandler = instanceStateOpProcessor
def stateOpProcessor: TaskStateOpProcessor = instanceStateOpProcessor
def instanceReservationTimeoutHandler: TaskReservationTimeoutHandler = instanceStateOpProcessor
// Resolver working against the given tracker actor; its delegate is created without metrics (None).
private[this] def updateOpResolver(instanceTrackerRef: ActorRef): InstanceUpdateOpResolver =
new InstanceUpdateOpResolver(
new InstanceTrackerDelegate(None, config, instanceTrackerRef), clock)
// Op processor for the given tracker actor, backed by the repository and a fresh resolver.
private[this] def instanceOpProcessor(instanceTrackerRef: ActorRef): InstanceOpProcessor =
new InstanceOpProcessorImpl(instanceTrackerRef, instanceRepository, updateOpResolver(instanceTrackerRef), config)
private[this] lazy val instanceUpdaterActorMetrics = new InstanceUpdateActor.ActorMetrics(metrics)
// Props factory for updater actors; it is passed (as a function of the tracker ref) to the
// tracker actor's props below.
private[this] def instanceUpdaterActorProps(instanceTrackerRef: ActorRef) =
InstanceUpdateActor.props(clock, instanceUpdaterActorMetrics, instanceOpProcessor(instanceTrackerRef))
private[this] lazy val instancesLoader = new InstancesLoaderImpl(instanceRepository)
private[this] lazy val instanceTrackerMetrics = new InstanceTrackerActor.ActorMetrics(metrics)
private[this] lazy val instanceTrackerActorProps = InstanceTrackerActor.props(
instanceTrackerMetrics, instancesLoader, instanceTrackerUpdateStepProcessor, instanceUpdaterActorProps)
// Actor name; protected so that subclasses can override it.
protected lazy val instanceTrackerActorName = "instanceTracker"
// Reference obtained from the leadership module for the tracker actor.
// NOTE(review): judging by the method name the actor only runs while this node is leader — confirm.
private[this] lazy val instanceTrackerActorRef = leadershipModule.startWhenLeader(
instanceTrackerActorProps, instanceTrackerActorName
)
// Single instance behind instanceCreationHandler / stateOpProcessor / instanceReservationTimeoutHandler.
private[this] lazy val instanceStateOpProcessor =
new InstanceCreationHandlerAndUpdaterDelegate(clock, config, instanceTrackerActorRef)
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:48,代码来源:InstanceTrackerModule.scala
示例16: AccountServiceRestClient
//设置package包名称以及导入依赖的类
package com.tpalanga.test.account.api.users
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.{ContentTypes, RequestEntity}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
import com.tpalanga.test.account.api.users.model.{NewUser, User, Users}
import com.tpalanga.test.config.TestConfig
import com.tpalanga.testlib.test.client.{NoEntity, Response, RestServiceClient}
import com.tpalanga.testlib.test.config.RestServiceConfig
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.{ExecutionContext, Future}
/**
 * REST client for the user endpoints (`/data/users`) of the account service.
 *
 * Every call goes through `client` and wraps the raw HTTP response in a typed [[Response]].
 */
class AccountServiceRestClient(val restServiceConfig: RestServiceConfig)
                              (implicit val testConfig: TestConfig, val system: ActorSystem)
  extends RestServiceClient with LazyLogging {

  import NoEntity.DataFormats._
  import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
  import com.tpalanga.test.account.api.users.model.UserJsonProtocol._

  logger.debug(s"AccountServiceRestServiceClient: $restServiceConfig")

  private implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(system))

  /** GET `/data/users/{id}`. */
  def userRetrieve(id: String)(implicit ec: ExecutionContext): Future[Response[User]] =
    for (httpResponse <- client.get(s"/data/users/$id")) yield Response[User](httpResponse)

  /** POST `/data/users` with the user marshalled as JSON. */
  def userCreate(user: NewUser)(implicit ec: ExecutionContext): Future[Response[User]] =
    Marshal(user).to[RequestEntity].flatMap { entity =>
      client
        .post(s"/data/users", Nil, entity.withContentType(ContentTypes.`application/json`))
        .map(httpResponse => Response[User](httpResponse))
    }

  /** PUT `/data/users/{id}` with the user marshalled as JSON. */
  def userUpdate(user: User)(implicit ec: ExecutionContext): Future[Response[User]] =
    Marshal(user).to[RequestEntity].flatMap { entity =>
      client
        .put(s"/data/users/${user.id}", Nil, entity.withContentType(ContentTypes.`application/json`))
        .map(httpResponse => Response[User](httpResponse))
    }

  /** DELETE `/data/users/{id}`. */
  def userDelete(id: String)(implicit ec: ExecutionContext): Future[Response[NoEntity]] =
    for (httpResponse <- client.delete(s"/data/users/$id")) yield Response[NoEntity](httpResponse)

  /** GET `/data/users`. */
  def userList()(implicit ec: ExecutionContext): Future[Response[Users]] =
    for (httpResponse <- client.get(s"/data/users")) yield Response[Users](httpResponse)
}
开发者ID:tpalanga,项目名称:akka-http-microservice,代码行数:52,代码来源:AccountServiceRestClient.scala
示例17: NewsletterServiceRestClient
//设置package包名称以及导入依赖的类
package com.tpalanga.testlib.test.client.impl
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.{ContentTypes, RequestEntity}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
import com.tpalanga.testlib.test.client.{NoEntity, Response, RestServiceClient}
import com.tpalanga.testlib.test.config.RestServiceConfig
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.{ExecutionContext, Future}
object NewsletterServiceRestClient {

  /** Function that builds a client from its configuration and an actor system. */
  type NewsletterServiceRestClientFactory = (RestServiceConfig, ActorSystem) => NewsletterServiceRestClient

  /** Factory that simply calls the constructor. */
  def defaultFactory: NewsletterServiceRestClientFactory = { (restServiceConfig, actorSystem) =>
    new NewsletterServiceRestClient(restServiceConfig)(actorSystem)
  }
}
/**
 * REST client for the subscriber endpoints (`/data/subscribers`) of the newsletter service.
 *
 * Every call goes through `client` and wraps the raw HTTP response in a typed [[Response]].
 */
class NewsletterServiceRestClient(val restServiceConfig: RestServiceConfig)
                                 (implicit val system: ActorSystem)
  extends RestServiceClient with LazyLogging {

  import NoEntity.DataFormats._
  import SubscriberJsonProtocol._
  import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

  logger.debug(s"NewsletterServiceRestServiceClient: $restServiceConfig")

  private implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(system))

  /** GET `/data/subscribers/{id}`. */
  def subscriberRetrieve(id: String)(implicit ec: ExecutionContext): Future[Response[Subscriber]] =
    for (httpResponse <- client.get(s"/data/subscribers/$id")) yield Response[Subscriber](httpResponse)

  /** POST `/data/subscribers` with the subscriber marshalled as JSON. */
  def subscriberCreate(subscriber: Subscriber)(implicit ec: ExecutionContext): Future[Response[Subscriber]] =
    Marshal(subscriber).to[RequestEntity].flatMap { entity =>
      client
        .post(s"/data/subscribers", Nil, entity.withContentType(ContentTypes.`application/json`))
        .map(httpResponse => Response[Subscriber](httpResponse))
    }

  /** PUT `/data/subscribers/{id}` with the subscriber marshalled as JSON. */
  def subscriberUpdate(user: Subscriber)(implicit ec: ExecutionContext): Future[Response[Subscriber]] =
    Marshal(user).to[RequestEntity].flatMap { entity =>
      client
        .put(s"/data/subscribers/${user.id}", Nil, entity.withContentType(ContentTypes.`application/json`))
        .map(httpResponse => Response[Subscriber](httpResponse))
    }

  /** DELETE `/data/subscribers/{id}`. */
  def subscriberDelete(id: String)(implicit ec: ExecutionContext): Future[Response[NoEntity]] =
    for (httpResponse <- client.delete(s"/data/subscribers/$id")) yield Response[NoEntity](httpResponse)

  /** GET `/data/subscribers`. */
  def subscriberList()(implicit ec: ExecutionContext): Future[Response[Subscribers]] =
    for (httpResponse <- client.get(s"/data/subscribers")) yield Response[Subscribers](httpResponse)
}
开发者ID:tpalanga,项目名称:akka-http-microservice,代码行数:60,代码来源:NewsletterServiceRestClient.scala
示例18: RestClient
//设置package包名称以及导入依赖的类
package com.tpalanga.testlib.test.client
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
import com.tpalanga.testlib.test.config.RestServiceConfig
import com.typesafe.scalalogging.LazyLogging
import scala.collection.immutable.Seq
import scala.concurrent.Future
/**
 * Thin HTTP client for the service described by `config`: builds absolute URIs from relative
 * paths and logs every request and response at debug level.
 */
class RestClient(config: RestServiceConfig)(implicit system: ActorSystem) extends LazyLogging {

  private implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(system))
  private val http = Http(system)

  /** Absolute URI for `path`; the port is left out when it is the default one for the protocol. */
  protected def uriFor(path: String): Uri = {
    val isDefaultHttpPort = config.port == 80 && config.protocol == "http"
    val isDefaultHttpsPort = config.port == 443 && config.protocol == "https"
    val portext = if (isDefaultHttpPort || isDefaultHttpsPort) "" else s":${config.port}"

    Uri(s"${config.protocol}://${config.host}$portext$path")
  }

  /** Sends the request and logs both the request and the received response. */
  protected def sendRequest(httpRequest: HttpRequest): Future[HttpResponse] = {
    import system.dispatcher

    logger.debug(s"Sending request: $httpRequest")
    val eventualResponse = http.singleRequest(httpRequest)
    eventualResponse.map { received =>
      logger.debug(s"Received response: $received")
      received
    }
  }

  def get(path: String, headers: Seq[HttpHeader] = Nil): Future[HttpResponse] = {
    val request = HttpRequest(GET, uriFor(path), headers)
    sendRequest(request)
  }

  def post(path: String, headers: Seq[HttpHeader] = Nil, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
    val request = HttpRequest(POST, uriFor(path), headers, entity)
    sendRequest(request)
  }

  def put(path: String, headers: Seq[HttpHeader] = Nil, entity: RequestEntity = HttpEntity.Empty): Future[HttpResponse] = {
    val request = HttpRequest(PUT, uriFor(path), headers, entity)
    sendRequest(request)
  }

  def delete(path: String, headers: Seq[HttpHeader] = Nil): Future[HttpResponse] = {
    val request = HttpRequest(DELETE, uriFor(path), headers)
    sendRequest(request)
  }
}
开发者ID:tpalanga,项目名称:akka-http-microservice,代码行数:48,代码来源:RestClient.scala
示例19: LoggingFilter
//设置package包名称以及导入依赖的类
package filters
import javax.inject.{Inject, Singleton}
import akka.stream.Materializer
import play.api.Logger
import play.api.mvc._
import scala.concurrent.{Future, ExecutionContext}
/**
 * Play filter that logs method, URI, duration and status of every request and reports the
 * duration (in milliseconds) in a `Request-Time` response header.
 *
 * @param mat materializer required by the `Filter` trait
 * @param ec  execution context used to transform the downstream result
 */
@Singleton
class LoggingFilter @Inject()(implicit override val mat: Materializer, ec: ExecutionContext) extends Filter {

  def apply(next: RequestHeader => Future[Result])(request: RequestHeader): Future[Result] = {
    // nanoTime is monotonic; currentTimeMillis follows the wall clock and can jump
    // (NTP corrections, manual changes), producing negative or inflated durations.
    val startNanos = System.nanoTime()

    next(request).map { result =>
      val requestTime = (System.nanoTime() - startNanos) / 1000000L
      Logger.info(s"${request.method} ${request.uri} took ${requestTime}ms and returned ${result.header.status}")
      result.withHeaders("Request-Time" -> requestTime.toString)
    }
  }
}
开发者ID:stonexx,项目名称:utils,代码行数:23,代码来源:LoggingFilter.scala
示例20: Step3WebsocketRoutes
//设置package包名称以及导入依赖的类
package scaladays.akka.http
import akka.http.scaladsl.marshallers.sprayjson.ExtendedSprayJsonSupport
import akka.http.scaladsl.model.ws
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import scala.concurrent.ExecutionContext
import scaladays.akka.kafka.KafkaTopics
/**
 * Route `step3/ws`: a WebSocket endpoint that pushes the names read from Kafka to the client
 * as text messages and ignores everything the client sends.
 */
trait Step3WebsocketRoutes extends Directives with FramedEntityStreamingDirectives
  with ExtendedSprayJsonSupport with MyJsonProtocol {

  implicit def ec: ExecutionContext
  implicit def materializer: Materializer

  def kafkaTopics: KafkaTopics

  /** Names from Kafka; the offset commit is triggered for every element as it passes through. */
  lazy val namesFromKafka =
    kafkaTopics.namesSource.map { committable =>
      // NOTE(review): the result of the commit call is not used — confirm that is intended.
      committable.committableOffset.commitScaladsl()
      committable.value
    }

  lazy val namesAsWebSocketMessages =
    namesFromKafka.map(name => ws.TextMessage(name))

  def step3WsRoutes =
    path("step3" / "ws") {
      // Incoming messages go to Sink.ignore; outgoing messages come from Kafka.
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, namesAsWebSocketMessages))
    }
}
开发者ID:ktoso,项目名称:scaladays-berlin-akka-streams,代码行数:35,代码来源:Step3WebsocketRoutes.scala
注:本文中的akka.stream.Materializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论