• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Scheduler类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.actor.Scheduler的典型用法代码示例。如果您正苦于以下问题:Scala Scheduler类的具体用法?Scala Scheduler怎么用?Scala Scheduler使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Scheduler类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ErrorHandling

//设置package包名称以及导入依赖的类
package au.csiro.data61.magda.util

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.actor.Scheduler
import akka.pattern.after

object ErrorHandling {
  
  def retry[T](op: () => Future[T], delay: FiniteDuration, retries: Int, onRetry: (Int, Throwable) => Unit = (_, _) => {})(implicit ec: ExecutionContext, s: Scheduler): Future[T] =
    Future { op() } flatMap (x => x) recoverWith {
      case e: Throwable if retries > 0 => after(delay, s)({
        onRetry(retries - 1, e)
        retry(op, delay, retries - 1, onRetry)
      })
    }

  object CausedBy {
    def unapply(e: Throwable): Option[Throwable] = Option(e.getCause)
  }

  object RootCause {
    def unapply(e: Throwable): Option[Throwable] = Option(getRootCause(e))
  }

  def getRootCause(e: Throwable): Throwable =
    Option(e.getCause) match {
      case Some(cause) => getRootCause(cause)
      case None        => e
    }
} 
开发者ID:TerriaJS,项目名称:magda,代码行数:33,代码来源:ErrorHandling.scala


示例2: getClient

//设置package包名称以及导入依赖的类
package au.csiro.data61.magda.search.elasticsearch

import akka.actor.Scheduler
import akka.event.LoggingAdapter
import au.csiro.data61.magda.AppConfig
import au.csiro.data61.magda.util.ErrorHandling.retry
import com.sksamuel.elastic4s.{ TcpClient, ElasticsearchClientUri }
import org.elasticsearch.common.settings.Settings

import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

trait ClientProvider {
  def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[TcpClient]
}

class DefaultClientProvider extends ClientProvider {
  private var clientFuture: Option[Future[TcpClient]] = None

  override def getClient(implicit scheduler: Scheduler, logger: LoggingAdapter, ec: ExecutionContext): Future[TcpClient] = {
    val outerFuture = clientFuture match {
      case Some(future) => future
      case None =>
        val future = retry(() => Future {
          val uri = ElasticsearchClientUri(AppConfig.conf().getString("elasticSearch.serverUrl"))
          val settings = Settings.builder().put("cluster.name", "myesdb").build()
          TcpClient.transport(settings, uri)
        }, 10 seconds, 10, onRetry(logger))
          .map { client =>
            logger.info("Successfully connected to elasticsearch client")
            client
          }

        clientFuture = Some(future)

        future
    }

    outerFuture
  }

  private def onRetry(logger: LoggingAdapter)(retriesLeft: Int, error: Throwable) = logger.error("Failed to make initial contact with ES server, {} retries left", retriesLeft, error)
} 
开发者ID:TerriaJS,项目名称:magda,代码行数:44,代码来源:ClientProvider.scala


示例3: apply

//设置package包名称以及导入依赖的类
package mesosphere.marathon.util

import java.util.concurrent.TimeUnit

import akka.actor.Scheduler
import mesosphere.util.{ CallerThreadExecutionContext, DurationToHumanReadable }

import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future, Promise, blocking => blockingCall }


  def apply[T](timeout: Duration)(f: => Future[T])(implicit
    scheduler: Scheduler,
    ctx: ExecutionContext): Future[T] = {
    require(timeout != Duration.Zero)

    if (timeout.isFinite()) {
      val promise = Promise[T]()
      val finiteTimeout = FiniteDuration(timeout.toNanos, TimeUnit.NANOSECONDS)
      val token = scheduler.scheduleOnce(finiteTimeout) {
        promise.tryFailure(new TimeoutException(s"Timed out after ${timeout.toHumanReadable}"))
      }
      val result = f
      result.onComplete { res =>
        promise.tryComplete(res)
        token.cancel()
      }(CallerThreadExecutionContext.callerThreadExecutionContext)
      promise.future
    } else {
      f
    }
  }
} 
开发者ID:xiaozai512,项目名称:marathon,代码行数:34,代码来源:Timeout.scala


示例4: DeleteAction

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import eventsource.models.{Action, Entity, Event}
import play.api.Logger
import slick.dbio.DBIOAction
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class DeleteAction[Key, ETT, E <: Entity[Key]](
    adapter: SlickAdapter[Key, ETT, E],
    val driver: JdbcProfile,
    db: JdbcProfile#Backend#Database,
    override val logger: Logger
)(implicit ec: ExecutionContext, s: Scheduler)
    extends Action[Key, ETT, Option[E]]
    with Retrying {

  import driver.api._

  def errorHandler: PartialFunction[(Throwable, Int), Future[Option[E]]] =
    PartialFunction.empty

  override def processEvent(event: Event[Key, ETT]): Future[Option[E]] =
    retry(10, 100.millis, errorHandler) {
      db.run(adapter
        .findByEventWithLock(event)
        .flatMap {
          case Some(entity) if event.version.isNewerThan(entity.lastModifiedVersion) =>
            logger.debug(
              s"Got newer lastModifiedVersion ${event.version} > ${entity.lastModifiedVersion} for ${event.key}. Deleting ...")
            adapter.deleteByEvent(event).map(_ => Some(entity))
          case Some(entity) =>
            logger.debug(
              s"Got older lastModifiedVersion ${event.version} <= ${entity.lastModifiedVersion} for ${event.key}. Ignoreing ...")
            DBIOAction.successful(None)
          case None =>
            logger.debug(s"Entity ${event.key} does not exists, ignore delete")
            DBIOAction.successful(None)
        }
        .transactionally)
    }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:46,代码来源:DeleteAction.scala


示例5: CreateAction

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import eventsource.models.{Action, Entity, Event}
import play.api.Logger
import slick.dbio.DBIOAction
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

abstract class CreateAction[Key, ETT, E <: Entity[Key]](
    adapter: SlickAdapter[Key, ETT, E],
    val driver: JdbcProfile,
    db: JdbcProfile#Backend#Database,
    override val logger: Logger
)(implicit ec: ExecutionContext, s: Scheduler)
    extends Action[Key, ETT, Option[E]]
    with Retrying {

  protected def entityFromEvent(event: Event[Key, ETT]): E

  import driver.api._

  def errorHandler: PartialFunction[(Throwable, Int), Future[Option[E]]] =
    PartialFunction.empty

  override def processEvent(event: Event[Key, ETT]): Future[Option[E]] =
    retry(10, 100.millis, errorHandler) {
      db.run(adapter
        .findByEventWithLock(event)
        .flatMap {
          case Some(entity) if entity.createdVersion == event.version =>
            logger.debug(s"createdVersion=${event.version} already exists. Nothing to do...")
            DBIOAction.successful(Some(entity))
          case Some(entity) if entity.createdVersion.isNewerThan(event.version) =>
            logger.debug(
              s"Got older createdVersion ${event.version} < ${entity.createdVersion} for ${event.key}. Updating ...")
            val olderEntity = entityFromEvent(event)
            logger.debug(s"Update with $olderEntity")
            adapter.update(olderEntity).map(_ => Some(olderEntity))
          case Some(entity) =>
            logger.debug(
              s"Got newer createdVersion ${event.version} >= ${entity.createdVersion} for ${event.key}. Ignoring ...")
            DBIOAction.successful(None)
          case None =>
            val entity = entityFromEvent(event)
            logger.debug(s"Insert new $entity")
            adapter.insert(entity).map(_ => Some(entity))
        }
        .transactionally)
    }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:54,代码来源:CreateAction.scala


示例6: logger

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import akka.pattern.after
import play.api.LoggerLike

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

trait Retrying {
  def logger: LoggerLike

  def retry[T](retries: Int,
               delay: FiniteDuration,
               errorHandler: PartialFunction[(Throwable, Int), Future[T]] = PartialFunction.empty)(
      f: => Future[T])(implicit ec: ExecutionContext, s: Scheduler): Future[T] = {
    f.recoverWith {
      case e if errorHandler.isDefinedAt(e, retries) =>
        errorHandler.apply(e, retries)
      case e if retries > 0 =>
        logger.warn(s"Retrying on ${e.getMessage}")
        after(delay, s)(retry(retries - 1, delay, errorHandler)(f))
      case e: Throwable =>
        logger.error("Retries exhausted", e)
        Future.failed(e)
    }
  }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:29,代码来源:Retrying.scala


示例7: SimpleUpdateAction

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import eventsource.models.{Action, Entity, Event}
import play.api.Logger
import slick.dbio.DBIOAction
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

abstract class SimpleUpdateAction[Key, ETT, E <: Entity[Key]](
    adapter: SlickAdapter[Key, ETT, E],
    val driver: JdbcProfile,
    db: JdbcProfile#Backend#Database,
    override val logger: Logger
)(implicit ec: ExecutionContext, s: Scheduler)
    extends Action[Key, ETT, Option[E]]
    with Retrying {

  import driver.api._

  def updateEntity(entity: E, event: Event[Key, ETT]): E

  def errorHandler: PartialFunction[(Throwable, Int), Future[Option[E]]] =
    PartialFunction.empty

  override def processEvent(event: Event[Key, ETT]): Future[Option[E]] =
    retry(10, 100.millis, errorHandler) {
      db.run(adapter
        .findByEventWithLock(event)
        .flatMap {
          case Some(entity) if event.version == entity.lastModifiedVersion =>
            logger.debug(s"lastModifiedVersion=${event.version} already exists. Nothing to do")
            DBIOAction.successful(Some(entity))
          case Some(entity) if event.version.isNewerThan(entity.lastModifiedVersion) =>
            logger.debug(
              s"Got newer lastModifiedVersion ${event.version} > ${entity.lastModifiedVersion} for ${event.key}. Updating ...")

            val newerEntity = updateEntity(entity, event)
            logger.debug(s"Update with $newerEntity")
            adapter.update(newerEntity).map(_ => Some(newerEntity))
          case Some(entity) =>
            logger.debug(
              s"Got older lastModifiedVersion ${event.version} <= ${entity.lastModifiedVersion} for ${event.key}. Ignoreing ...")
            DBIOAction.successful(None)
          case None =>
            logger.debug(s"Entity ${event.key} does not exists, ignore update")
            DBIOAction.successful(None)
        }
        .transactionally)
    }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:54,代码来源:SimpleUpdateAction.scala


示例8: ChangeAction

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import eventsource.models.{Action, Event, VersionedEntity}
import play.api.Logger
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

abstract class ChangeAction[Key, ETT, E <: VersionedEntity](
    val driver: JdbcProfile,
    db: JdbcProfile#Backend#Database,
    override val logger: Logger)(implicit ec: ExecutionContext, s: Scheduler)
    extends Action[Key, ETT, Option[E]]
    with Retrying {

  import driver.api._

  def findByEventWithLock(event: Event[Key, ETT]): DBIOAction[Option[E], NoStream, Effect.Read]

  def applyChange(currentEntity: Option[E],
                  event: Event[Key, ETT]): DBIOAction[Option[E], NoStream, Effect.All]

  def errorHandler: PartialFunction[(Throwable, Int), Future[Option[E]]] =
    PartialFunction.empty

  override def processEvent(event: Event[Key, ETT]): Future[Option[E]] = {
    retry(10, 100.millis, errorHandler) {
      db.run(findByEventWithLock(event).flatMap {
        case Some(entity) if event.version == entity.lastModifiedVersion =>
          logger.debug(s"lastModifiedVersion=${event.version} already exists. Nothing to do")
          DBIO.successful(Some(entity))
        case Some(entity) if event.version.isNewerThan(entity.lastModifiedVersion) =>
          logger.debug(
            s"Got newer lastModifiedVersion ${event.version} > ${entity.lastModifiedVersion} for ${event.key}. Updating ...")

          applyChange(Some(entity), event)
        case Some(entity) =>
          logger.debug(
            s"Got older lastModifiedVersion ${event.version} <= ${entity.lastModifiedVersion} for ${event.key}. Ignoreing ...")
          DBIO.successful(None)
        case None =>
          logger.debug(s"Entity ${event.key} does not exists, ignore update")
          applyChange(None, event)
      }.transactionally)
    }
  }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:50,代码来源:ChangeAction.scala


示例9: UpdateAction

//设置package包名称以及导入依赖的类
package eventsource.slick

import akka.actor.Scheduler
import eventsource.models.{Action, Entity, Event}
import play.api.Logger
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

abstract class UpdateAction[Key, ETT, E <: Entity[Key]](
    adapter: SlickAdapter[Key, ETT, E],
    val driver: JdbcProfile,
    db: JdbcProfile#Backend#Database,
    override val logger: Logger
)(implicit ec: ExecutionContext, s: Scheduler)
    extends Action[Key, ETT, Option[E]]
    with Retrying {

  import driver.api._

  def errorHandler: PartialFunction[(Throwable, Int), Future[Option[E]]] =
    PartialFunction.empty

  def applyUpdate(currentEntity: E,
                  event: Event[Key, ETT]): DBIOAction[Option[E], NoStream, Effect.All]

  override def processEvent(event: Event[Key, ETT]): Future[Option[E]] =
    retry(10, 100.millis, errorHandler) {
      db.run(adapter
        .findByEventWithLock(event)
        .flatMap {
          case Some(entity) if event.version == entity.lastModifiedVersion =>
            logger.debug(s"lastModifiedVersion=${event.version} already exists. Nothing to do")
            DBIO.successful(Some(entity))
          case Some(entity) if event.version.isNewerThan(entity.lastModifiedVersion) =>
            logger.debug(
              s"Got newer lastModifiedVersion ${event.version} > ${entity.lastModifiedVersion} for ${event.key}. Updating ...")

            applyUpdate(entity, event)
          case Some(entity) =>
            logger.debug(
              s"Got older lastModifiedVersion ${event.version} <= ${entity.lastModifiedVersion} for ${event.key}. Ignoreing ...")
            DBIO.successful(None)
          case None =>
            logger.debug(s"Entity ${event.key} does not exists, ignore update")
            DBIO.successful(None)
        }
        .transactionally)
    }
} 
开发者ID:21re,项目名称:play-eventsource,代码行数:52,代码来源:UpdateAction.scala


示例10: StatisticsCollector

//设置package包名称以及导入依赖的类
package pl.touk.nussknacker.engine.perftest.util

import akka.actor.{ActorSystem, Cancellable, Scheduler}
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class StatisticsCollector[T: Ordering](scheduler: Scheduler,
                                       interval: FiniteDuration,
                                       id: String)
                                      (collect: => Future[T])
                                      (implicit ec: ExecutionContext) extends LazyLogging {

  @volatile private var histogram: Histogram[T] = Histogram.empty[T]

  def start(): Started = {
    val cancellable = scheduler.schedule(0 seconds, interval) {
      collect andThen {
        case Success(value) =>
          logger.debug(s"$id: fetched $value")
          histogram = histogram.withValue(value)
        case Failure(ex) =>
          logger.error("Error while collecting statistics. Histogram won't be complete", ex)
      }
    }
    new Started(cancellable)
  }

  class Started(cancellable: Cancellable) {

    def stop(): Stopped = {
      cancellable.cancel()
      new Stopped(histogram)
    }

  }

  class Stopped(val histogram: Histogram[T])

}

object StatisticsCollector {

  def apply[T: Ordering](system: ActorSystem,
                         interval: FiniteDuration,
                         id: String)
                        (collect: => Future[T]) = {
    import system.dispatcher
    new StatisticsCollector[T](
      scheduler = system.scheduler,
      interval = interval,
      id = id)(
      collect = collect)
  }

} 
开发者ID:TouK,项目名称:nussknacker,代码行数:59,代码来源:StatisticsCollector.scala


示例11: SyncBlockBodiesRequestHandler

//设置package包名称以及导入依赖的类
package io.iohk.ethereum.blockchain.sync

import akka.actor.{ActorRef, Props, Scheduler}
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.SyncController.BlockBodiesReceived
import io.iohk.ethereum.network.Peer
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBodies, GetBlockBodies}
import org.spongycastle.util.encoders.Hex

class SyncBlockBodiesRequestHandler(
    peer: Peer,
    etcPeerManager: ActorRef,
    peerMessageBus: ActorRef,
    requestedHashes: Seq[ByteString])(implicit scheduler: Scheduler)
  extends SyncRequestHandler[GetBlockBodies, BlockBodies](peer, etcPeerManager, peerMessageBus) {

  override val requestMsg = GetBlockBodies(requestedHashes)
  override val responseMsgCode: Int = BlockBodies.code

  override def handleResponseMsg(blockBodies: BlockBodies): Unit = {
    if (blockBodies.bodies.isEmpty) {
      val reason = s"got empty block bodies response for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}"
      syncController ! BlacklistSupport.BlacklistPeer(peer.id, reason)
      syncController ! FastSync.EnqueueBlockBodies(requestedHashes)
    } else {
      syncController ! BlockBodiesReceived(peer, requestedHashes, blockBodies.bodies)
    }

    log.info("Received {} block bodies in {} ms", blockBodies.bodies.size, timeTakenSoFar())
    cleanupAndStop()
  }

  override def handleTimeout(): Unit = {
    val reason = s"time out on block bodies response for known hashes: ${requestedHashes.map(h => Hex.toHexString(h.toArray[Byte]))}"
    syncController ! BlacklistSupport.BlacklistPeer(peer.id, reason)
    syncController ! FastSync.EnqueueBlockBodies(requestedHashes)
    cleanupAndStop()
  }

  override def handleTerminated(): Unit = {
    syncController ! FastSync.EnqueueBlockBodies(requestedHashes)
    cleanupAndStop()
  }

}

object SyncBlockBodiesRequestHandler {
  def props(peer: Peer, etcPeerManager: ActorRef, peerMessageBus: ActorRef, requestedHashes: Seq[ByteString])
           (implicit scheduler: Scheduler): Props =
    Props(new SyncBlockBodiesRequestHandler(peer, etcPeerManager, peerMessageBus, requestedHashes))
} 
开发者ID:input-output-hk,项目名称:etc-client,代码行数:52,代码来源:SyncBlockBodiesRequestHandler.scala


示例12: scheduler

//设置package包名称以及导入依赖的类
package io.iohk.ethereum.blockchain.sync

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.{Actor, ActorLogging, Cancellable, Scheduler}
import io.iohk.ethereum.network.PeerId

trait BlacklistSupport {
  selfActor: Actor with ActorLogging =>

  import BlacklistSupport._

  def scheduler: Scheduler

  var blacklistedPeers: Seq[(PeerId, Cancellable)] = Nil

  def blacklist(peerId: PeerId, duration: FiniteDuration, reason: String): Unit = {
    undoBlacklist(peerId)
    log.debug(s"Blacklisting peer ($peerId), $reason")
    val unblacklistCancellable = scheduler.scheduleOnce(duration, self, UnblacklistPeer(peerId))
    blacklistedPeers :+= (peerId, unblacklistCancellable)
  }

  def undoBlacklist(peerId: PeerId): Unit = {
    blacklistedPeers.find(_._1 == peerId).foreach(_._2.cancel())
    blacklistedPeers = blacklistedPeers.filterNot(_._1 == peerId)
  }

  def isBlacklisted(peerId: PeerId): Boolean =
    blacklistedPeers.exists(_._1 == peerId)
}

object BlacklistSupport {
  case class BlacklistPeer(peerId: PeerId, reason: String)
  case class UnblacklistPeer(peerId: PeerId)
} 
开发者ID:input-output-hk,项目名称:etc-client,代码行数:37,代码来源:BlacklistSupport.scala


示例13: start

//设置package包名称以及导入依赖的类
package me.invkrh.raft.core

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import akka.actor.{ActorRef, Cancellable, Scheduler}

import me.invkrh.raft.message.TimerMessage

trait Timer {
  protected var cancellable: Cancellable = _

  def start(): Unit
  def stop(): Unit = {
    if (cancellable != null && !cancellable.isCancelled) {
      cancellable.cancel()
    }
  }
  def restart(): Unit = {
    stop()
    start()
  }
}

class RandomizedTimer(min: FiniteDuration, max: FiniteDuration, event: TimerMessage)(
  implicit scheduler: Scheduler,
  target: ActorRef
) extends Timer {
  def start(): Unit = {
    require(target != null, "Timer target can not be null")
    val rd = min.toMillis + Random.nextInt((max.toMillis - min.toMillis + 1).toInt)
    cancellable = scheduler.scheduleOnce(rd milliseconds, target, event)
  }
}

class PeriodicTimer(
  duration: FiniteDuration,
  event: TimerMessage
)(implicit scheduler: Scheduler, target: ActorRef)
    extends Timer {
  def start(): Unit = {
    require(target != null, "Timer target can not be null")
    cancellable = scheduler.schedule(Duration.Zero, duration, target, event)
  }
} 
开发者ID:invkrh,项目名称:akka-raft,代码行数:48,代码来源:Timer.scala


示例14: Retry

//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.javadsl.persistence.jpa

import java.util.concurrent.CompletionStage
import java.util.function.Supplier

import akka.actor.Scheduler
import akka.pattern.after

import scala.concurrent.duration.Duration.fromNanos
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

// With thanks to https://gist.github.com/viktorklang/9414163
private[lagom] class Retry(delay: FiniteDuration, delayFactor: Double, maxRetries: Int) {
  def apply[T](op: => T)(implicit ec: ExecutionContext, s: Scheduler): Future[T] = {
    def iterate(nextDelay: FiniteDuration, remainingRetries: Int): Future[T] =
      Future(op) recoverWith {
        case NonFatal(throwable) if remainingRetries > 0 => {
          onRetry(throwable, nextDelay, remainingRetries)
          after(nextDelay, s)(iterate(finiteMultiply(nextDelay, delayFactor), remainingRetries - 1))
        }
      }

    iterate(delay, maxRetries)
  }

  // For convenient use from Java 8
  def retry[T](op: Supplier[T])(implicit ec: ExecutionContext, s: Scheduler): CompletionStage[T] = {
    import scala.compat.java8.FutureConverters._

    apply(op.get()).toJava
  }

  protected def onRetry(throwable: Throwable, delay: FiniteDuration, remainingRetries: Int): Unit = ()

  private def finiteMultiply(duration: FiniteDuration, factor: Double): FiniteDuration =
    fromNanos((duration.toNanos * factor).toLong)
}

private[lagom] object Retry {
  def apply[T](delay: FiniteDuration, delayFactor: Double, maxRetries: Int)(op: => T)(implicit ec: ExecutionContext, s: Scheduler): Future[T] =
    (new Retry(delay, delayFactor, maxRetries))(op)
} 
开发者ID:lagom,项目名称:lagom,代码行数:45,代码来源:Retry.scala


示例15: JobSchedulerServiceTest

//设置package包名称以及导入依赖的类
package io.cronit.services

import akka.actor.{ActorRef, ActorSystem, Scheduler}
import akka.testkit.{TestKit, TestProbe}
import io.cronit.builder.RestJobModelBuilder
import io.cronit.utils.{Clock, DateUtils}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.specs2.mock.Mockito

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class JobSchedulerServiceTest extends TestKit(ActorSystem("CronitTestActorSystem")) with ActorSystemComponent with FlatSpecLike
  with Matchers
  with Mockito
  with JobSchedulerComponent
  with CronExpressionComponent
  with BeforeAndAfterAll {

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  override val actorSystemService: ActorSystemService = new ActorSystemService {
    override val scheduler: Scheduler = mock[Scheduler]
  }
  override val jobSchedulerService: JobSchedulerService = new JobSchedulerService {
    override val restTaskActor: ActorRef = TestProbe().ref
  }
  override val cronExpressionService: CronExpressionService = mock[CronExpressionService]

  it should "calculate next execution delay when scheduling a task" in {
    Clock.freeze(DateUtils.toDate("20180202"))

    val restJob = RestJobModelBuilder.sampleRestJobWithCronScheduler
    when(cronExpressionService.getNextExecutionDate("* * * * *")).thenReturn(Clock.now)
    when(cronExpressionService.getFiniteDurationFromNow(Clock.now())).thenReturn(5 minutes)

    jobSchedulerService.scheduleTask(restJob)

    verify(actorSystemService.scheduler).scheduleOnce(5 minutes, jobSchedulerService.restTaskActor, restJob)
    Clock.unfreeze()
  }

  it should "calculate next execution delay when schedule once task" in {
    Clock.freeze(DateUtils.toDate("20180202"))

    val restJob = RestJobModelBuilder.sampleRestJobWithScheduleOnce
    when(cronExpressionService.getFiniteDurationFromNow(Clock.now())).thenReturn(15 minutes)

    jobSchedulerService.scheduleTask(restJob)

    verify(actorSystemService.scheduler).scheduleOnce(15 minutes, jobSchedulerService.restTaskActor, restJob)
    Clock.unfreeze()
  }
} 
开发者ID:212data,项目名称:cronit-service,代码行数:58,代码来源:JobSchedulerServiceTest.scala


示例16: Ocb

//设置package包名称以及导入依赖的类
package com.pcb.db

import akka.actor.Scheduler
import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

class Ocb (
  scheduler: Scheduler, 
  var maxFailures: Int, 
  callTimeout: Int, 
  var resetTimeout: Int)(implicit executor: ExecutionContext) {

  private var cb = newCircuitBreaker

  private def newCircuitBreaker: CircuitBreaker = {
     new CircuitBreaker(
      scheduler,
      maxFailures = maxFailures,
      callTimeout = callTimeout.seconds,
      resetTimeout = resetTimeout.minute)
  }

  def reconfigure(mf: Int, rt: Int): Boolean = {
    var reconfigured = false
    maxFailures = mf
    resetTimeout = rt

    if(cb.isClosed) {
      cb = newCircuitBreaker
      reconfigured = true
    }
   
    reconfigured 
  }  

  def guard[T](body: => Future[T]): Future[T] = cb.withCircuitBreaker(body)
} 
开发者ID:bahadley,项目名称:akka-slick-demo,代码行数:40,代码来源:Ocb.scala


示例17: selfAddress

//设置package包名称以及导入依赖的类
package tanukki.akka.cluster.autodown

import akka.actor.{Address, ActorLogging, Scheduler}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterDomainEvent
import scala.concurrent.duration._

trait ClusterCustomDowning extends ActorLogging { base: CustomAutoDownBase =>

  val cluster = Cluster(context.system)

  override def selfAddress: Address = cluster.selfAddress

  override def scheduler: Scheduler = {
    if (context.system.scheduler.maxFrequency < 1.second / cluster.settings.SchedulerTickDuration) {
      log.warning("CustomDowning does not use a cluster dedicated scheduler. Cluster will use a dedicated scheduler if configured " +
        "with 'akka.scheduler.tick-duration' [{} ms] >  'akka.cluster.scheduler.tick-duration' [{} ms].",
        (1000 / context.system.scheduler.maxFrequency).toInt, cluster.settings.SchedulerTickDuration.toMillis)
    }
    context.system.scheduler
  }

  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[ClusterDomainEvent])
  }
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
} 
开发者ID:TanUkkii007,项目名称:akka-cluster-custom-downing,代码行数:30,代码来源:ClusterCustomDowning.scala


示例18: CircuitBreakerHandler

//设置package包名称以及导入依赖的类
package com.unstablebuild.autobreaker

import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method}

import akka.actor.Scheduler
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.{ExecutionContext, Future}
import scala.{Proxy => BaseProxy}

class CircuitBreakerHandler(val self: Any, breaker: AutoBreaker)(implicit ec: ExecutionContext, scheduler: Scheduler)
  extends InvocationHandler
    with BaseProxy
    with StrictLogging {

  private val future = classOf[Future[_]]

  override def invoke(proxy: Any, method: Method, args: Array[AnyRef]): AnyRef = {

    def callMethod: AnyRef = method.invoke(self, args: _*)

    try {
      method.getReturnType match {
        case `future` if canWrap(method) =>
          breaker.call(future.cast(callMethod))
        case _ =>
          callMethod
      }
    } catch {
      case e: InvocationTargetException => throw e.getTargetException
    }
  }

  private def canWrap(method: Method): Boolean =
    !self.getClass.getMethod(method.getName, method.getParameterTypes: _*).isAnnotationPresent(classOf[NoCircuitBreaker])

} 
开发者ID:lucastorri,项目名称:autobreaker,代码行数:38,代码来源:CircuitBreakerHandler.scala


示例19: retry

//设置package包名称以及导入依赖的类
package org.bjean.sample.wordcount.aws.support

import akka.actor.Scheduler

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import akka.pattern.after

trait Retrying {
  def retry[T](op: => T, retries: Int)(implicit ec: ExecutionContext): Future[T] = {
    val eventualT: Future[T] = Future(op)
    eventualT recoverWith {
      case _ if retries > 0 => retry(op, retries - 1)
    }
  }

  def retry[T](eventualT: => Future[T], delay: FiniteDuration, retries: Int)(implicit ec: ExecutionContext, s: Scheduler): Future[T] = {
    eventualT recoverWith {
      case _ if retries > 0 => after(delay, s)(retry(eventualT, delay, retries - 1))
    }
  }
} 
开发者ID:bjet007,项目名称:word-count-spark-aws,代码行数:23,代码来源:Retrying.scala


示例20: AppComponents

//设置package包名称以及导入依赖的类
package application

import akka.actor.Scheduler
import application.filters.ExampleFilter
import com.typesafe.config.Config
import controllers.HomeController
import play.api.ApplicationLoader.Context
import play.api.BuiltInComponentsFromContext
import play.api.cache.EhCacheComponents
import router.Routes

import scala.concurrent.ExecutionContext


class AppComponents(context: Context)(implicit val ec: ExecutionContext)
    extends BuiltInComponentsFromContext(context)
    with EhCacheComponents {

  val config: Config                = context.initialConfiguration.underlying
  implicit val scheduler: Scheduler = actorSystem.scheduler

  lazy val homeController = new HomeController()

  // order matters - should be the same as routes file
  lazy val router = new Routes(
    httpErrorHandler,
    homeController
  )

  val exampleFilter: ExampleFilter = new ExampleFilter()

  override lazy val httpFilters = Seq(exampleFilter)
} 
开发者ID:harrylaou,项目名称:play2.5-skeleton-compileDI,代码行数:34,代码来源:AppComponents.scala



注:本文中的akka.actor.Scheduler类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Producer类代码示例发布时间:2022-05-23
下一篇:
Scala Iteratee类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap