本文整理汇总了 Scala 中 scala.concurrent.duration.FiniteDuration 类的典型用法代码示例。如果您正苦于以下问题：Scala FiniteDuration 类的具体用法？Scala FiniteDuration 怎么用？Scala FiniteDuration 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 FiniteDuration 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。
示例1: SimpleConsumerFactory
//设置package包名称以及导入依赖的类
package com.box.castle.consumer
import com.box.kafka.Broker
import kafka.consumer.SimpleConsumer
import scala.concurrent.duration.FiniteDuration
// $COVERAGE-OFF$
/**
 * Factory that builds `SimpleConsumer` instances for a given broker.
 */
class SimpleConsumerFactory {

  /**
   * Creates a `SimpleConsumer` connected to `broker`.
   *
   * @param broker        broker whose `host` and `port` are passed to the consumer
   * @param brokerTimeout timeout handed to the consumer, converted to whole milliseconds
   * @param bufferSize    buffer size handed to the consumer unchanged
   * @param clientId      client identifier; its `value` is handed to the consumer
   * @return a newly constructed `SimpleConsumer`
   * @throws IllegalArgumentException if the timeout is not positive or does not fit
   *                                  into an `Int` number of milliseconds
   */
  def create(broker: Broker,
             brokerTimeout: FiniteDuration,
             bufferSize: Int,
             clientId: ClientId): SimpleConsumer = {
    val timeoutMillis = brokerTimeout.toMillis
    require(timeoutMillis > 0, "broker timeout must be positive")
    // The timeout is passed on as an Int; without this guard a large duration
    // would be silently truncated by `.toInt` into an arbitrary (possibly negative) value.
    require(timeoutMillis <= Int.MaxValue,
      s"broker timeout must not exceed ${Int.MaxValue} milliseconds")
    new SimpleConsumer(broker.host,
      broker.port,
      timeoutMillis.toInt,
      bufferSize,
      clientId.value)
  }
}
// $COVERAGE-ON$
开发者ID:Box-Castle,项目名称:router,代码行数:25,代码来源:SimpleConsumerFactory.scala
示例2: ConstructorsPopulationSpec
//设置package包名称以及导入依赖的类
package com.github.astonbitecode.di
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import org.junit.runner.RunWith
import org.specs2.mutable
import org.specs2.runner.JUnitRunner
import org.specs2.specification.BeforeEach
/**
 * Checks how constructors registered through `diDefine` end up in the DI `cache`.
 * The examples run sequentially and `TestUtil.clean` is invoked before each one.
 */
@RunWith(classOf[JUnitRunner])
class ConstructorsPopulationSpec extends mutable.Specification with BeforeEach {
  // Upper bound for waiting on the Future returned by each `diDefine` call.
  val timeout = FiniteDuration(1000, TimeUnit.MILLISECONDS)

  override def before(): Unit = {
    TestUtil.clean
  }

  sequential

  "A spec for the Constructors population in the DI ".txt

  "A constructor should be populated in the DI" >> {
    val registration = diDefine { () => MyInjectableClass("One") }
    Await.result(registration, timeout)
    cache must haveSize(1)
  }

  "A constructor should be replaced in the DI if it already exists" >> {
    val firstRegistration = diDefine { () => MyInjectableClass("One") }
    Await.result(firstRegistration, timeout)
    val secondRegistration = diDefine { () => MyInjectableClass("Two") }
    Await.result(secondRegistration, timeout)
    cache must haveSize(1)
    // The single remaining entry must produce the instance from the second definition.
    val entry = cache.head._2
    entry.constructor.apply().asInstanceOf[MyInjectableClass].id === "Two"
  }

  "A constructor with scope SINGLETON_EAGER should create the instance upon the call" >> {
    val registration = diDefine(() => MyInjectableClass("One"), DIScope.SINGLETON_EAGER)
    Await.result(registration, timeout)
    cache must haveSize(1)
    val entry = cache.head._2
    entry.cachedInstance.isDefined === true
  }

  "A constructor with scope SINGLETON_LAZY should not create the instance upon the call" >> {
    val registration = diDefine(() => MyInjectableClass("One"), DIScope.SINGLETON_LAZY)
    Await.result(registration, timeout)
    cache must haveSize(1)
    val entry = cache.head._2
    entry.cachedInstance.isDefined === false
  }

  /** Minimal injectable type used by the examples above. */
  case class MyInjectableClass(id: String)
}
开发者ID:astonbitecode,项目名称:kind-of-di,代码行数:55,代码来源:ConstructorsPopulationSpec.scala
示例3: TestKitExtension
//设置package包名称以及导入依赖的类
package akka.testkit
import com.typesafe.config.Config
import akka.util.Timeout
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
import scala.concurrent.duration.FiniteDuration
/**
 * Extension id under which the [[TestKitSettings]] of an actor system are registered.
 */
object TestKitExtension extends ExtensionId[TestKitSettings] {

  /** Looks up the settings for `system` by delegating to the inherited `get`. */
  override def get(system: ActorSystem): TestKitSettings = super.get(system)

  /** Builds the settings from the configuration of the given actor system. */
  def createExtension(system: ExtendedActorSystem): TestKitSettings = {
    val systemConfig = system.settings.config
    new TestKitSettings(systemConfig)
  }
}
/**
 * TestKit settings read from the `akka.test.*` section of the supplied configuration.
 * All values are read when the instance is constructed.
 *
 * @param config configuration containing the `akka.test.*` keys read below
 */
class TestKitSettings(val config: Config) extends Extension {

  import akka.util.Helpers._

  /** `akka.test.timefactor`; checked via `requiring` to be finite and greater than zero. */
  val TestTimeFactor = config.getDouble("akka.test.timefactor").
    requiring(tf => !tf.isInfinite && tf > 0, "akka.test.timefactor must be positive finite double")

  /** `akka.test.single-expect-default`, read as a millisecond duration. */
  val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.test.single-expect-default")

  /** `akka.test.filter-leeway`, read as a millisecond duration. */
  val TestEventFilterLeeway: FiniteDuration = config.getMillisDuration("akka.test.filter-leeway")

  /** `akka.test.default-timeout`, wrapped in a `Timeout`. */
  val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.test.default-timeout"))
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:23,代码来源:TestKitExtension.scala
示例4: TestBarrierTimeoutException
//设置package包名称以及导入依赖的类
package akka.testkit
import scala.concurrent.duration.Duration
import java.util.concurrent.{ CyclicBarrier, TimeUnit, TimeoutException }
import akka.actor.ActorSystem
import scala.concurrent.duration.FiniteDuration
/**
 * Runtime exception carrying the given message; thrown by `TestBarrier.await`
 * when waiting on the barrier times out.
 *
 * @param message detail message forwarded to `RuntimeException`
 */
class TestBarrierTimeoutException(message: String)
  extends RuntimeException(message)
object TestBarrier {
  /** Timeout used by the parameterless `await()`: five seconds. */
  val DefaultTimeout = Duration(5, TimeUnit.SECONDS)

  /** Creates a barrier for `count` parties. */
  def apply(count: Int) = new TestBarrier(count)
}

/**
 * Thin wrapper around a `java.util.concurrent.CyclicBarrier` whose waits are
 * bounded by a (dilated) timeout.
 *
 * @param count number of parties passed to the underlying `CyclicBarrier`
 */
class TestBarrier(count: Int) {
  private val barrier = new CyclicBarrier(count)

  /** Waits on the barrier using [[TestBarrier.DefaultTimeout]]. */
  def await()(implicit system: ActorSystem): Unit = await(TestBarrier.DefaultTimeout)

  /**
   * Waits on the barrier for at most `timeout.dilated`.
   *
   * @param timeout maximum wait before dilation is applied
   * @throws TestBarrierTimeoutException if the underlying wait raises a `TimeoutException`
   */
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit = {
    try {
      barrier.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
    } catch {
      case e: TimeoutException =>
        // Report the undilated timeout together with the time factor that was applied.
        throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s"
          format (timeout.toString, TestKitExtension(system).TestTimeFactor))
    }
  }

  /** Resets the underlying barrier. */
  def reset(): Unit = barrier.reset()
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:34,代码来源:TestBarrier.scala
示例5: filterEvents
//设置package包名称以及导入依赖的类
package akka
import language.implicitConversions
import akka.actor.ActorSystem
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.reflect.ClassTag
import scala.collection.immutable
import java.util.concurrent.TimeUnit.MILLISECONDS
package object testkit {

  /**
   * Runs `block` while the given event filters are muted on the system's event stream,
   * then waits for each filter to complete.
   *
   * The filters are always un-muted afterwards, even if `block` or the completion check throws.
   *
   * @param eventFilters filters to mute for the duration of `block`
   * @param block        code to run while the filters are active
   * @return the result of `block`
   * @throws AssertionError if any filter's `awaitDone` returns false within the
   *                        `TestEventFilterLeeway` window
   */
  def filterEvents[T](eventFilters: Iterable[EventFilter])(block: => T)(implicit system: ActorSystem): T = {
    def now = System.currentTimeMillis

    system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq]))
    try {
      val result = block
      val testKitSettings = TestKitExtension(system)
      // One shared deadline for all filters: each gets whatever time remains until `stop`.
      val stop = now + testKitSettings.TestEventFilterLeeway.toMillis
      val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitSettings.TestEventFilterLeeway + ") waiting for " + _)
      if (failed.nonEmpty)
        throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))
      result
    } finally {
      system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq]))
    }
  }

  /** Varargs variant that forwards to the `Iterable` overload of `filterEvents`. */
  def filterEvents[T](eventFilters: EventFilter*)(block: => T)(implicit system: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)

  /** Runs `block` inside `EventFilter[T]().intercept`. */
  def filterException[T <: Throwable](block: => Unit)(implicit system: ActorSystem, t: ClassTag[T]): Unit = EventFilter[T]() intercept (block)

  /**
   * Adds `dilated` to `FiniteDuration`, scaling the duration by the configured test time factor.
   */
  implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
    /** This duration multiplied by `TestKitExtension(system).TestTimeFactor`. */
    def dilated(implicit system: ActorSystem): FiniteDuration =
      (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration]
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:43,代码来源:package.scala
示例6: gracefulStop
//设置package包名称以及导入依赖的类
package akka.pattern
import akka.actor._
import akka.util.{ Timeout }
import akka.dispatch.sysmsg.{ Unwatch, Watch }
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Success
import scala.concurrent.duration.FiniteDuration
trait GracefulStopSupport {

  /**
   * Sends `stopMessage` to `target` and returns a future describing whether the
   * target terminated.
   *
   * If `target.isTerminated` is already true, a successful future of `true` is returned
   * immediately and no message is sent.
   *
   * @param target      actor to stop
   * @param timeout     timeout given to the internal `PromiseActorRef`
   * @param stopMessage message sent to the target; defaults to `PoisonPill`
   * @return a future that yields `true` when a `Terminated` for the target's path arrives,
   *         `false` for any other reply; a failure is propagated unchanged
   */
  def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill): Future[Boolean] = {
    if (target.isTerminated) Future successful true
    else {
      val internalTarget = target.asInstanceOf[InternalActorRef]
      val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), targetName = target.toString)
      // Watch first, then send the stop message, so the Terminated notification goes to `ref`.
      internalTarget.sendSystemMessage(Watch(internalTarget, ref))
      target.tell(stopMessage, Actor.noSender)
      ref.result.future.transform(
        {
          case Terminated(t) if t.path == target.path => true
          // Any other outcome: stop watching and report that the target did not terminate.
          case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false }
        },
        // On failure also stop watching, then pass the original throwable through.
        t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext)
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:29,代码来源:GracefulStopSupport.scala
示例7: ReceiveTimeout
//设置package包名称以及导入依赖的类
package akka.actor.dungeon
import ReceiveTimeout.emptyReceiveTimeoutData
import akka.actor.ActorCell
import akka.actor.ActorCell.emptyCancellable
import akka.actor.Cancellable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
private[akka] object ReceiveTimeout {
  /** Initial state: undefined timeout paired with the shared empty cancellable. */
  final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable)
}

/**
 * Receive-timeout bookkeeping mixed into `ActorCell`.
 *
 * State is a pair of the configured timeout and the `Cancellable` of the
 * currently scheduled timeout task (or `emptyCancellable` when none is scheduled).
 */
private[akka] trait ReceiveTimeout { this: ActorCell =>

  import ReceiveTimeout._
  import ActorCell._

  private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData

  /** The currently configured receive timeout. */
  final def receiveTimeout: Duration = receiveTimeoutData._1

  /** Stores a new timeout; the scheduled task (if any) is left untouched. */
  final def setReceiveTimeout(timeout: Duration): Unit = receiveTimeoutData = receiveTimeoutData.copy(_1 = timeout)

  /**
   * Reschedules the receive-timeout message when the mailbox is empty and the
   * configured timeout is finite; otherwise cancels any scheduled task.
   */
  final def checkReceiveTimeout(): Unit = {
    val recvtimeout = receiveTimeoutData
    //Only reschedule if desired and there are currently no more messages to be processed
    if (!mailbox.hasMessages) recvtimeout._1 match {
      case f: FiniteDuration =>
        recvtimeout._2.cancel() //Cancel any ongoing future
        val task = system.scheduler.scheduleOnce(f, self, akka.actor.ReceiveTimeout)(this.dispatcher)
        receiveTimeoutData = (f, task)
      case _ => cancelReceiveTimeout()
    }
    else cancelReceiveTimeout()
  }

  /** Cancels the scheduled task, if any, while keeping the configured timeout value. */
  final def cancelReceiveTimeout(): Unit =
    if (receiveTimeoutData._2 ne emptyCancellable) {
      receiveTimeoutData._2.cancel()
      receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
    }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:46,代码来源:ReceiveTimeout.scala
示例8: PinnedDispatcher
//设置package包名称以及导入依赖的类
package akka.dispatch
import akka.actor.ActorCell
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
/**
 * Dispatcher that accepts registration from a single `ActorCell` at a time and
 * runs on a thread pool configured with a core and max size of one.
 */
class PinnedDispatcher(
  _configurator: MessageDispatcherConfigurator,
  _actor: ActorCell,
  _id: String,
  _shutdownTimeout: FiniteDuration,
  _threadPoolConfig: ThreadPoolConfig)
  extends Dispatcher(
    _configurator,
    _id,
    Int.MaxValue,
    Duration.Zero,
    _threadPoolConfig.copy(corePoolSize = 1, maxPoolSize = 1),
    _shutdownTimeout) {

  // The cell currently bound to this dispatcher; null when nothing is registered.
  @volatile
  private var owner: ActorCell = _actor

  //Relies on an external lock provided by MessageDispatcher.attach
  protected[akka] override def register(actorCell: ActorCell) = {
    val current = owner
    val ownedByOther = (current ne null) && actorCell != current
    if (ownedByOther) throw new IllegalArgumentException(s"Cannot register to anyone but $current")
    owner = actorCell
    super.register(actorCell)
  }

  //Relies on an external lock provided by MessageDispatcher.detach
  protected[akka] override def unregister(actor: ActorCell) = {
    super.unregister(actor)
    // Clear the owner so that a different cell may register afterwards.
    owner = null
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:37,代码来源:PinnedDispatcher.scala
示例9: Subscribe
//设置package包名称以及导入依赖的类
package akka.stream.testkit
import org.reactivestreams.spi.{ Subscriber, Subscription }
import org.reactivestreams.api.Producer
import scala.concurrent.duration.FiniteDuration
import org.reactivestreams.api.Consumer
/** Closed set of events, each carrying the `Subscription` it relates to. */
sealed trait ProducerEvent

/** Event carrying the subscription of a subscribe action. */
case class Subscribe(subscription: Subscription)
  extends ProducerEvent

/** Event carrying the subscription that was cancelled. */
case class CancelSubscription(subscription: Subscription)
  extends ProducerEvent

/** Event carrying a subscription together with a number of elements. */
case class RequestMore(subscription: Subscription, elements: Int)
  extends ProducerEvent
/**
 * A `Subscription` bound to a specific subscriber, exposing abstract operations
 * for sending signals and for expecting requests or cancellation.
 * No implementation is visible here; the notes below follow the method names only.
 *
 * @param subscriber the subscriber this subscription belongs to
 */
abstract case class ActiveSubscription[I](subscriber: Subscriber[I])
  extends Subscription {

  // Sending side (presumably towards `subscriber` — confirm in implementations).
  def sendNext(element: I): Unit
  def sendComplete(): Unit
  def sendError(cause: Exception): Unit

  // Expectation side.
  def expectCancellation(): Unit
  def expectRequestMore(n: Int): Unit
  def expectRequestMore(): Int
}
/**
 * A `Producer` with abstract expectation methods, plus a helper that connects
 * it to a consumer.
 */
trait ProducerProbe[I] extends Producer[I] {

  def expectSubscription(): ActiveSubscription[I]

  def expectRequestMore(subscription: Subscription, n: Int): Unit

  def expectNoMsg(): Unit

  def expectNoMsg(max: FiniteDuration): Unit

  /** Subscribes the consumer's subscriber to this producer's publisher. */
  def produceTo(consumer: Consumer[I]): Unit = {
    val publisher = getPublisher
    publisher.subscribe(consumer.getSubscriber)
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:33,代码来源:ProducerProbe.scala
示例10: SimpleBackoffStrategy
//设置package包名称以及导入依赖的类
package sample.kamon
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
/**
 * Linear backoff: every `increment()` adds `backoffDelta` milliseconds to the
 * current backoff, capped at `maxBackoff` milliseconds.
 *
 * @param backoffDelta step added per increment, in milliseconds (default 3000)
 * @param maxBackoff   upper bound of the backoff, in milliseconds (default 30000)
 */
class SimpleBackoffStrategy(
  private val backoffDelta: Long = 3000,
  private val maxBackoff: Long = 30000
) extends BackoffStrategy {

  // Current backoff in milliseconds.
  private var time: Long = 0

  /** Advances the backoff by one step without exceeding `maxBackoff`. */
  def increment(): Unit = {
    time = math.min(time + backoffDelta, maxBackoff)
  }

  /** The current backoff as a millisecond-based duration. */
  def backoffTime: FiniteDuration = FiniteDuration(time, TimeUnit.MILLISECONDS)

  /** Returns the backoff to zero. */
  def reset(): Unit = {
    time = 0
  }
}
开发者ID:frossi85,项目名称:akka-kamon-sample,代码行数:30,代码来源:SimpleBackoffStrategy.scala
示例11: CastleConfig
//设置package包名称以及导入依赖的类
package com.box.castle.core.config
import java.util.concurrent.TimeUnit
import CastleConfig.{DefaultCacheSizeInBytes, DefaultBufferSizeInBytes, DefaultBrokerTimeout, DefaultGracefulShutdownTimeout}
import com.box.castle.router.RouterConfig
import com.box.kafka.Broker
import scala.concurrent.duration.FiniteDuration
/**
 * Top-level Castle configuration, validated on construction.
 *
 * Construction fails with `IllegalArgumentException` (via `require`) when the buffer size
 * is not positive, no broker or committer config is given, committer ids are not unique,
 * or the trimmed namespace contains characters other than letters, digits, `-` and `_`.
 *
 * @param namespaceRaw namespace as supplied; exposed trimmed as `namespace`
 */
case class CastleConfig(
  private val namespaceRaw: String,
  brokers: Set[Broker],
  leaderConfig: LeaderConfig,
  committerConfigs: Iterable[CommitterConfig],
  castleZookeeperConfig: CastleZookeeperConfig,
  routerConfig: RouterConfig,
  brokerTimeout: FiniteDuration = DefaultBrokerTimeout,
  bufferSizeInBytes: Int = DefaultBufferSizeInBytes,
  cacheSizeInBytes: Long = DefaultCacheSizeInBytes,
  gracefulShutdownTimeout: FiniteDuration = DefaultGracefulShutdownTimeout) {

  require(bufferSizeInBytes > 0, "bufferSizeInBytes must be positive")
  require(brokers.nonEmpty, "must specify at least one broker")
  require(committerConfigs.nonEmpty, "Must specify at least one committer config")
  // Collapsing the ids into a set shrinks the count exactly when there are duplicates.
  require(committerConfigs.map(_.id).toSet.size == committerConfigs.size, "Committer ids must be unique")

  val namespace = namespaceRaw.trim

  // If replacing every disallowed character changes the string, it contained one.
  require(namespace.replaceAll("[^A-Za-z0-9-_]", "_") == namespace, "Castle namespace must consist of alphanumeric characters, dashes (-), and underscores (_)")
}
/** Default values used by the `CastleConfig` constructor. */
object CastleConfig {
  // 4 MB minus overhead
  val DefaultBufferSizeInBytes: Int = 4 * 1024 * 1024 - 128

  // 1 GB
  val DefaultCacheSizeInBytes: Long = 1024 * 1024 * 1024

  /** Sixty seconds. */
  val DefaultBrokerTimeout: FiniteDuration = FiniteDuration(60, TimeUnit.SECONDS)

  /** Ten seconds. */
  val DefaultGracefulShutdownTimeout: FiniteDuration = FiniteDuration(10, TimeUnit.SECONDS)
}
开发者ID:Box-Castle,项目名称:core,代码行数:38,代码来源:CastleConfig.scala
示例12: HermesGameTickerModuleSettings
//设置package包名称以及导入依赖的类
package proton.game.hermes
import java.time.Duration
import java.util.concurrent.TimeUnit
import proton.game.GameTickerModuleSettings
import scala.concurrent.duration.FiniteDuration
/**
 * Hermes-specific settings layered on top of `GameTickerModuleSettings`.
 *
 * Each `java.time.Duration` constructor argument is also exposed as a Scala
 * `FiniteDuration` with nanosecond precision.
 */
class HermesGameTickerModuleSettings(
  baseSettings: GameTickerModuleSettings,
  val port: Int,
  val chunkSize: Int,
  val gameTimeoutDuration: Duration,
  val namesDDataTimeoutDuration: Duration,
  val chunkedTimeoutDuration: Duration,
  val chunkedAppendTimeoutDuration: Duration,
  val chunkedRepositoryTimeoutDuration: Duration)
  extends GameTickerModuleSettings(baseSettings) {

  // Converts a java.time.Duration into a FiniteDuration via its nanosecond count.
  private def hermesToFinite(javaDuration: Duration): FiniteDuration =
    FiniteDuration(javaDuration.toNanos, TimeUnit.NANOSECONDS)

  val namesDDataTimeout: FiniteDuration = hermesToFinite(namesDDataTimeoutDuration)
  val gameTimeout: FiniteDuration = hermesToFinite(gameTimeoutDuration)
  val chunkedTimeout: FiniteDuration = hermesToFinite(chunkedTimeoutDuration)
  val chunkedAppendTimeout: FiniteDuration = hermesToFinite(chunkedAppendTimeoutDuration)
  val chunkedRepositoryTimeout: FiniteDuration = hermesToFinite(chunkedRepositoryTimeoutDuration)
}
开发者ID:Morgan-Stanley,项目名称:proton,代码行数:25,代码来源:HermesGameTickerModuleSettings.scala
示例13: HermesGameTickerConfig
//设置package包名称以及导入依赖的类
package proton.game.hermes
import java.time.LocalDateTime
import java.util.UUID
import proton.game._
import scala.concurrent.duration.{FiniteDuration, _}
/**
 * Game ticker configuration for Hermes: requires at least one player and sets
 * no upper bound on the number of players.
 *
 * @param gameFile identifier of the game file
 */
class HermesGameTickerConfig(
  val gameFile: UUID,
  override val players: Seq[Player],
  override val startTime: Option[LocalDateTime],
  override val tickDuration: FiniteDuration,
  override val timeoutDuration: Option[FiniteDuration])
  extends GameTickerConfig {

  override val minPlayers: Option[Int] = Some(1)

  override val maxPlayers: Option[Int] = Option.empty[Int]
}
/**
 * Factory that turns player identities and optional timings into a
 * `HermesGameTickerConfig`.
 *
 * @param tickDuration tick length; 200 milliseconds is used when absent
 */
case class HermesGameTickerConfigFactory(
  gameFile: UUID,
  players: Seq[PlayerIdentity],
  startTime: Option[LocalDateTime],
  tickDuration: Option[FiniteDuration],
  timeoutDuration: Option[FiniteDuration])
  extends GameTickerConfigFactory[HermesGameTickerConfig] {

  /** Builds the config, wrapping each identity in a `Player`. */
  override def build(): HermesGameTickerConfig = {
    val resolvedPlayers = players.map(Player(_))
    val resolvedTick = tickDuration.getOrElse(200.milliseconds)
    new HermesGameTickerConfig(gameFile, resolvedPlayers, startTime, resolvedTick, timeoutDuration)
  }
}
开发者ID:Morgan-Stanley,项目名称:proton,代码行数:27,代码来源:HermesGameConfig.scala
示例14: AccessTokenResponse
//设置package包名称以及导入依赖的类
package models
import play.api.libs.json.{Json, Writes}
import scala.concurrent.duration.FiniteDuration
/**
 * Payload of an access-token response.
 *
 * @param expiresIn lifetime of the token; serialized as whole seconds
 */
case class AccessTokenResponse(
  accessToken: String,
  expiresIn: FiniteDuration,
  scope: List[String],
  grantType: GrantType,
  realm: String,
  tokenType: TokenType)

object AccessTokenResponse {

  /** JSON writer; grant and token types are written through their `id`. */
  implicit val accessTokenResponseWrites: Writes[AccessTokenResponse] =
    Writes[AccessTokenResponse] { response =>
      Json.obj(
        "access_token" -> response.accessToken,
        "expires_in" -> response.expiresIn.toSeconds,
        "scope" -> response.scope,
        "grant_type" -> response.grantType.id,
        "token_type" -> response.tokenType.id,
        "realm" -> response.realm
      )
    }
}
开发者ID:zalando-incubator,项目名称:OAuth2-mock-play,代码行数:26,代码来源:AccessTokenResponse.scala
示例15: TokeninfoResponse
//设置package包名称以及导入依赖的类
package models
import play.api.libs.json.{Json, Writes}
import scala.concurrent.duration.FiniteDuration
/**
 * Payload of a token-info response.
 *
 * @param expiresIn remaining lifetime of the token; serialized as whole seconds
 */
case class TokeninfoResponse(
  accessToken: String,
  grantType: GrantType,
  expiresIn: FiniteDuration,
  tokenType: TokenType,
  realm: String,
  uid: String,
  scope: List[String])

object TokeninfoResponse {

  /** JSON writer; grant and token types are written through their `id`. */
  implicit val tokeninfoResponseWrites: Writes[TokeninfoResponse] =
    Writes[TokeninfoResponse] { info =>
      Json.obj(
        "access_token" -> info.accessToken,
        "grant_type" -> info.grantType.id,
        "expires_in" -> info.expiresIn.toSeconds,
        "scope" -> info.scope,
        "realm" -> info.realm,
        "token_type" -> info.tokenType.id,
        "uid" -> info.uid
      )
    }
}
开发者ID:zalando-incubator,项目名称:OAuth2-mock-play,代码行数:29,代码来源:TokeninfoResponse.scala
示例16: WebsocketController
//设置package包名称以及导入依赖的类
package walfie.gbf.raidfinder.server.controller
import akka.actor._
import akka.stream.scaladsl.Flow
import akka.stream.{Materializer, OverflowStrategy}
import monix.execution.Scheduler
import play.api.http.websocket.Message
import play.api.libs.streams._
import play.api.mvc._
import play.api.mvc.WebSocket.MessageFlowTransformer
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import walfie.gbf.raidfinder.domain._
import walfie.gbf.raidfinder.protocol._
import walfie.gbf.raidfinder.RaidFinder
import walfie.gbf.raidfinder.server.actor.WebsocketRaidsHandler
import walfie.gbf.raidfinder.server.util.MessageFlowTransformerUtil
import walfie.gbf.raidfinder.server.{BossNameTranslator, MetricsCollector}
// Play controller exposing a websocket endpoint.
// NOTE(review): this snippet is truncated — the method(s) between the transformer
// fields and the trailing fragment are missing, so the braces below do not balance.
// Consult the original WebsocketController.scala before relying on this code.
class WebsocketController(
raidFinder: RaidFinder[ResponseMessage],
translator: BossNameTranslator,
keepAliveInterval: FiniteDuration,
metricsCollector: MetricsCollector
)(implicit system: ActorSystem, materializer: Materializer, scheduler: Scheduler) extends Controller {
// Message-flow transformers for the JSON and binary protobuf encodings; JSON is the default.
private val jsonTransformer = MessageFlowTransformerUtil.protobufJsonMessageFlowTransformer
private val binaryTransformer = MessageFlowTransformerUtil.protobufBinaryMessageFlowTransformer
private val defaultTransformer = jsonTransformer
// --- content elided by the snippet source; the lines below are the tail of a missing method ---
// Builds an actor-backed flow from `props` and adapts it with the selected transformer.
val flow = ActorFlow.actorRef(props = props)
transformer.transform(flow)
}
// No matching transformer: answer with a 400 listing the requested subprotocols.
case None => Left {
val unsupportedProtocols = requestedProtocols.mkString("[", ", ", "]")
Results.BadRequest("Unsupported websocket subprotocols " + unsupportedProtocols)
}
}
Future.successful(result)
}
}
开发者ID:Einhalkzt,项目名称:GBF-RAid-finder,代码行数:43,代码来源:WebsocketController.scala
示例17: ServerAnnouncementActor
//设置package包名称以及导入依赖的类
package com.init6.servers
import java.util.concurrent.TimeUnit
import akka.actor.{Cancellable, Props}
import com.init6.Constants._
import com.init6.coders.commands.{BroadcastCommand, Command}
import com.init6.{Config, Init6Actor, Init6Component}
import scala.concurrent.duration.{Duration, FiniteDuration}
object ServerAnnouncementActor extends Init6Component {
  /** Creates the announcement actor at `INIT6_SERVER_ANNOUNCEMENT_PATH`. */
  def apply(timeToDrop: Long) =
    system.actorOf(Props(classOf[ServerAnnouncementActor], timeToDrop), INIT6_SERVER_ANNOUNCEMENT_PATH)
}

/** Command asking the actor to broadcast `message` repeatedly, once per `duration`. */
case class RepeatingAnnoucement(message: String, duration: FiniteDuration) extends Command

/**
 * Broadcasts server announcements to `usersActor`.
 *
 * @param timeToDrop number of seconds used to schedule the "will drop" warning,
 *                   which is sent 15 seconds earlier
 */
class ServerAnnouncementActor(timeToDrop: Long) extends Init6Actor {

  // Handle of the currently running repeating announcement, if any.
  var announcement: Option[Cancellable] = None

  override def preStart() = {
    super.preStart()

    // The one-off warning is only scheduled when `Config().Server.Chat.enabled` is false.
    if (!Config().Server.Chat.enabled) {
      import context.dispatcher
      system.scheduler.scheduleOnce(Duration(
        timeToDrop - 15, TimeUnit.SECONDS
      ))({
        usersActor ! BroadcastCommand(WILL_DROP_IN(Config().Server.host, 15))
      })
    }
  }

  // Stop the repeating broadcast with the actor; otherwise the scheduler would
  // keep firing after this actor has been stopped.
  override def postStop(): Unit = {
    cancelAnnouncement()
    super.postStop()
  }

  override def receive = {
    case RepeatingAnnoucement(message, duration) =>
      import context.dispatcher
      // Replace any running announcement with the new one.
      cancelAnnouncement()
      announcement = Some(system.scheduler.schedule(
        Duration(0, TimeUnit.MILLISECONDS),
        duration
      )({
        usersActor ! BroadcastCommand(message)
      }))
  }

  // Cancels and forgets the running repeating announcement, if there is one.
  private def cancelAnnouncement(): Unit = {
    announcement.foreach(_.cancel())
    announcement = None
  }
}
开发者ID:fjaros,项目名称:init6,代码行数:52,代码来源:ServerAnnouncementActor.scala
示例18: every
//设置package包名称以及导入依赖的类
package frontend
import akka.stream.{FlowShape, Attributes}
import akka.stream.scaladsl._
import scala.concurrent.duration.{Duration, FiniteDuration}
trait StreamSupport {

  /**
   * Flow that pairs every incoming element with a tick emitted every `interval`
   * (first tick at `Duration.Zero`), keeps the element, and drops the first paired result.
   *
   * @param interval period of the tick source
   */
  def every[T](interval: FiniteDuration): Flow[T, T, akka.NotUsed] =
    Flow.fromGraph(
      GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        // Input buffer of exactly one element for the zip stage.
        val zip = b.add(ZipWith[T, Unit, T](Keep.left).withAttributes(Attributes.inputBuffer(1, 1)))
        val dropOne = b.add(Flow[T].drop(1))
        Source.tick(Duration.Zero, interval, ()) ~> zip.in1
        zip.out ~> dropOne.in
        FlowShape(zip.in0, dropOne.outlet)
      }
    )

  /**
   * Flow over `JsArray`s that conflates elements (keeping the earlier one), zips the
   * result with a tick every `duration`, and runs a `scan` seeded with an empty `JsArray`
   * whose state is always replaced by the incoming element.
   *
   * @param duration initial delay and period of the tick source
   */
  def responseWindow(duration: FiniteDuration): Flow[play.api.libs.json.JsArray, play.api.libs.json.JsArray, akka.NotUsed] =
    (Flow[play.api.libs.json.JsArray].conflate((array, _) => array)
      .zipWith(Source.tick(duration, duration, ()))(Keep.left))
      .scan(play.api.libs.json.JsArray(Seq.empty[play.api.libs.json.JsValue]))((_, stats) => stats)
      .withAttributes(Attributes.inputBuffer(1, 1))
}
开发者ID:haghard,项目名称:scenter-frontend,代码行数:27,代码来源:StreamSupport.scala
示例19: create
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.ironmq
import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.stream.alpakka.ironmq.ConfigIronMqSettings.ConfigConsumerSettings
import akka.stream.alpakka.ironmq.IronMqSettings.ConsumerSettings
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
// Delegates to `apply()`, passing the actor system explicitly as the second argument list.
// NOTE(review): the enclosing object header and `apply` are elided from this snippet.
def create(as: ActorSystem): IronMqSettings = apply()(as)
}
object ConfigIronMqSettings {

  /**
   * Consumer settings read from a configuration section with the keys
   * `buffer-min-size`, `buffer-max-size`, `fetch-interval`, `poll-timeout`
   * and `reservation-timeout`.
   */
  class ConfigConsumerSettings(config: Config) extends ConsumerSettings {

    // Reads the duration stored at `path` and converts it to a Scala duration.
    private def finiteDurationAt(path: String): FiniteDuration =
      config.getDuration(path).asScala

    override val bufferMinSize: Int = config.getInt("buffer-min-size")
    override val bufferMaxSize: Int = config.getInt("buffer-max-size")
    override val fetchInterval: FiniteDuration = finiteDurationAt("fetch-interval")
    override val pollTimeout: FiniteDuration = finiteDurationAt("poll-timeout")
    override val reservationTimeout: FiniteDuration = finiteDurationAt("reservation-timeout")
  }
}

/**
 * IronMQ settings backed by a configuration object: endpoint and credentials are
 * read once at construction, consumer settings are rebuilt on each access.
 */
class ConfigIronMqSettings(config: Config) extends IronMqSettings {
  override val endpoint: Uri = Uri(config.getString("endpoint"))
  override val projectId: String = config.getString("credentials.project-id")
  override val token: String = config.getString("credentials.token")

  override def consumerSettings: ConsumerSettings = {
    val consumerSection = config.getConfig("consumer")
    new ConfigConsumerSettings(consumerSection)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:34,代码来源:IronMqSettings.scala
示例20: ShardSettings
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.kinesis
import java.util.Date
import com.amazonaws.services.kinesis.model.ShardIteratorType
import scala.concurrent.duration.FiniteDuration
object ShardSettings {

  /**
   * Factory taking a mandatory timestamp and a boxed `Integer` limit; the
   * starting sequence number is left empty.
   */
  def create(streamName: String,
             shardId: String,
             shardIteratorType: ShardIteratorType,
             timestamp: Date,
             refreshInterval: FiniteDuration,
             limit: Integer) =
    ShardSettings(streamName, shardId, shardIteratorType, None, Some(timestamp), refreshInterval, limit)
}

/**
 * Settings for reading a single Kinesis shard, validated on construction.
 *
 * @param startingSequenceNumber required for the `AFTER_SEQUENCE_NUMBER` and
 *                               `AT_SEQUENCE_NUMBER` iterator types
 * @param atTimestamp            required for the `AT_TIMESTAMP` iterator type
 * @param limit                  must lie in the inclusive range 1..10000
 * @throws IllegalArgumentException if `limit` is out of range or the value required
 *                                  by `shardIteratorType` is missing
 */
case class ShardSettings(streamName: String,
                         shardId: String,
                         shardIteratorType: ShardIteratorType,
                         startingSequenceNumber: Option[String] = None,
                         atTimestamp: Option[java.util.Date] = None,
                         refreshInterval: FiniteDuration,
                         limit: Int) {
  // The message now states the lower bound that is actually enforced (1, not 0).
  require(
    limit >= 1 && limit <= 10000,
    "Limit must be between 1 and 10000. See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html"
  )
  shardIteratorType match {
    case ShardIteratorType.AFTER_SEQUENCE_NUMBER | ShardIteratorType.AT_SEQUENCE_NUMBER =>
      require(startingSequenceNumber.nonEmpty)
    case ShardIteratorType.AT_TIMESTAMP => require(atTimestamp.nonEmpty)
    case _ => ()
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:40,代码来源:ShardSettings.scala
注：本文中的 scala.concurrent.duration.FiniteDuration 类示例整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论