• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala AtomicReference类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中java.util.concurrent.atomic.AtomicReference的典型用法代码示例。如果您正苦于以下问题:Scala AtomicReference类的具体用法?Scala AtomicReference怎么用?Scala AtomicReference使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了AtomicReference类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: userPreferencesFileContents

//设置package包名称以及导入依赖的类
package com.github.madoc.create_sbt_project.io

import java.nio.charset.Charset
import java.nio.file.{FileSystem, FileSystems, Files}
import java.util.concurrent.atomic.AtomicReference

import com.github.madoc.create_sbt_project.action.framework.ActionEnvironment
import com.github.madoc.create_sbt_project.io.Write.WriteToAppendable

import scala.io.Source

/**
 * An [[ActionEnvironment]] that additionally exposes the user's preferences
 * file and two output sinks.
 */
trait FileSystemSupport extends ActionEnvironment {
  /** Sink for error output. */
  def writeToErrorOutput: Write

  /** Sink for standard output. */
  def writeToStandardOutput: Write

  /** Text of the user preferences file, or `None` when no contents are available. */
  def userPreferencesFileContents: Option[String]
}
/** Shared instances and the NIO-backed implementation of [[FileSystemSupport]]. */
object FileSystemSupport {
  /** Implementation bound to the JVM's default file system. */
  val default: FileSystemSupport = new Default(FileSystems.getDefault)

  /** Holder that initially contains [[default]]. */
  val main: AtomicReference[FileSystemSupport] = new AtomicReference[FileSystemSupport](default)

  /**
   * [[FileSystemSupport]] implemented on top of `java.nio.file.Files`.
   *
   * @param fs file system against which all path strings are resolved
   */
  class Default(val fs: FileSystem) extends FileSystemSupport {
    /** Reads the preferences file as UTF-8, but only if it exists, is a regular file and is readable. */
    def userPreferencesFileContents = {
      val prefsPath = fs.getPath(userPreferencesFilePath)
      val usable =
        Files.exists(prefsPath) && Files.isRegularFile(prefsPath) && Files.isReadable(prefsPath)
      if (!usable) None
      else {
        val source = Source.fromInputStream(Files.newInputStream(prefsPath), "utf-8")
        try Some(source.mkString) finally source.close()
      }
    }

    // Initialised in the same order as before: error sink first, then standard sink.
    val writeToErrorOutput = new WriteToAppendable(systemErr)
    val writeToStandardOutput = new WriteToAppendable(systemOut)

    def fileExists(path: String) = Files.exists(fs.getPath(path))
    def isFileWritable(path: String) = Files.isWritable(fs.getPath(path))
    def isDirectory(path: String) = Files.isDirectory(fs.getPath(path))
    def mkdirs(path: String) = Files.createDirectories(fs.getPath(path))

    /** Renders `contents` through `output` into a writer opened on `path`; the writer is always closed. */
    def outputToFile[A](contents: A, path: String, charset: Charset)(implicit output: Output[A]) = {
      val writer = Files.newBufferedWriter(fs.getPath(path), charset)
      try output(contents)(new WriteToAppendable(writer))
      finally writer.close()
    }

    protected def systemErr: Appendable = System.err
    protected def systemOut: Appendable = System.out
    protected def userHomeDirectory: String = System.getProperty("user.home")

    /** Location of the preferences file inside the user's home directory. */
    protected def userPreferencesFilePath: String =
      userHomeDirectory match {
        case home if home.endsWith("/") => home + ".create-sbt-project.json"
        case home                       => home + "/.create-sbt-project.json"
      }
  }
}
开发者ID:Madoc,项目名称:create-sbt-project,代码行数:50,代码来源:FileSystemSupport.scala


示例2: TimeBasedPublishEmitter

//设置package包名称以及导入依赖的类
package knot.core.emitters

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import knot.core.Signals.Request
import knot.core.config.PartConfig
import knot.core.scheduler.{Cancellable, Scheduler}
import knot.core.{Signals, TickSource, Upstream, Workbench}

import scala.util.control.NonFatal

/**
 * Emitter that drives a [[TickSource]] from a scheduler: once the first
 * request arrives, `part.onTime()` is invoked by a task obtained from
 * `scheduler.schedule`, until [[unSubscribe]] cancels it.
 *
 * @param scheduler    scheduler used to create the task
 * @param initialDelay initial delay handed to the scheduler
 * @param delay        delay handed to the scheduler; must be greater than zero
 * @param timeUnit     unit in which `initialDelay` and `delay` are expressed
 */
case class TimeBasedPublishEmitter[O](override val workbench: Workbench,
                                      override val config: PartConfig,
                                      override val part: TickSource[O],
                                      scheduler: Scheduler,
                                      initialDelay: Long,
                                      delay: Long,
                                      timeUnit: TimeUnit)
  extends SignalEmitter(workbench, config, part)
    with TimeBasedPublisherContext[O]
    with PushEmitter[O] {
  require(delay > 0, s"delay must be > 0 (delay=$delay)")

  part.setContext(this)

  // Handle of the running task; null while nothing is scheduled.
  private val tick = new AtomicReference[Cancellable](null)

  override def autoReceive(signal: Signals.Signal): Unit = {
    super.autoReceive(signal)
    signal match {
      case Request(n) => onRequest(part, n)
      case _ =>
    }
  }

  /**
   * Starts the task on the first request.
   *
   * The task is only created while none is registered. If another caller
   * registers one concurrently, the surplus task is cancelled right away,
   * so no scheduled task is ever left without an owner.
   */
  override def onRequest(part: Upstream[_], n: Long): Unit = {
    super.onRequest(part, n)
    if (tick.get() == null) {
      // Honour the configured unit instead of a fixed TimeUnit.SECONDS.
      val scheduled = scheduler.schedule(() => tryOnTime(), initialDelay, delay, timeUnit)
      if (!tick.compareAndSet(null, scheduled))
        scheduled.cancel()
    }
  }

  /** Cancels the running task, if any, after delegating to the parent. */
  override def unSubscribe(): Unit = {
    super.unSubscribe()
    val t = tick.getAndSet(null)
    if (t != null)
      t.cancel()
  }

  // Non-fatal failures of the tick callback are routed to `error`.
  private def tryOnTime(): Unit = {
    try this.part.onTime() catch {
      case NonFatal(e) => error(e)
    }
  }
}
开发者ID:defvar,项目名称:knot,代码行数:55,代码来源:TimeBasedPublishEmitter.scala


示例3: Intern

//设置package包名称以及导入依赖的类
package org.leialearns.crystallize.util

import java.util.concurrent.atomic.AtomicReference

import scala.collection.immutable

/**
 * Registry of canonical instances, keyed by each value's `equivalenceKey`.
 * The registry is an immutable map swapped atomically, so no locks are used.
 */
object Intern {
  private val internalized = new AtomicReference[immutable.HashMap[AnyRef,Internalizable]](immutable.HashMap.empty)

  /**
   * Returns the instance registered under `thing.equivalenceKey`, registering
   * `thing` itself when the key is not present yet.
   *
   * @param thing candidate instance
   * @return the registered instance for the key, cast to the runtime class of `thing`
   */
  def internalize[T <: Internalizable](thing: T): T = {
    val key = thing.equivalenceKey

    // Read the current map; on a miss try to publish an extended map and read
    // again, regardless of whether the swap succeeded.
    @scala.annotation.tailrec
    def lookupOrRegister(): Internalizable = {
      val snapshot = internalized.get()
      snapshot.get(key) match {
        case Some(existing) => existing
        case None =>
          internalized.compareAndSet(snapshot, snapshot + ((key, thing)))
          lookupOrRegister()
      }
    }

    thing.getClass.cast(lookupOrRegister())
  }
}
开发者ID:jeroenvanmaanen,项目名称:crystallize,代码行数:26,代码来源:Intern.scala


示例4: Chat

//设置package包名称以及导入依赖的类
package code.snippet

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

import net.liftweb.actor.LiftActor
import net.liftweb.http.js.JsCmd
import net.liftweb.http.js.JsCmds.SetValById
import net.liftweb.http._
import net.liftweb.util.ClearClearable
import net.liftweb.util.Helpers._

/**
 * Snippet holding the chat messages in an atomically updated list and
 * exposing the bindings for rendering and submitting them.
 */
object Chat {
  private [this] val ms = new AtomicReference(List("default message"))

  /** Binds the current messages to the `.chat-message` elements. */
  def messages = ".chat-message *" #> ms.get() & ClearClearable

  /** Appends `msg` and keeps only the five most recent entries. */
  private [this] def append(msg:String):Unit = {
    val appendAndTrim = new UnaryOperator[List[String]] {
      override def apply(current: List[String]) = (current :+ msg).takeRight(5)
    }
    ms.updateAndGet(appendAndTrim)
  }

  // Handler for a plain POST of the "in" parameter; not referenced within this object.
  private [this] def doPost:Unit = for {
    r <- S.request if r.post_?
    msg <- S.param("in")
  } yield {
    append(msg)
    S.redirectTo("/")
  }

  /**
   * Binds the input field. The text field writes into a local variable and
   * the hidden field's callback appends that value, notifies [[ChatActor]]
   * and clears the input.
   */
  def submit = {
    S.session.foreach { session =>
      session.plumbUpdateDOM(listenTo = List(ChatActor))
    }

    var msg = ""

    def onAjax():JsCmd = {
      append(msg)
      ChatActor ! ""
      SetValById("chat-in", "")
    }

    val inputField = SHtml.text(msg, msg = _, "id" -> "chat-in")
    "name=in" #> (inputField ++ SHtml.hidden(onAjax))
  }
}

/**
 * Actor that listeners are attached to. Whatever message it receives, it
 * responds by sending `UpdateDOM()` to its listeners.
 */
object ChatActor extends LiftActor with ListenerManager {
  // The update value itself is an empty string.
  override def createUpdate = ""

  // Every incoming message, regardless of content, triggers the listener notification.
  override def lowPriority = {
    case _ => sendListenersMessage(UpdateDOM())
  }
} 
开发者ID:joescii,项目名称:lift-vdom-chat,代码行数:54,代码来源:Chat.scala


示例5: transform

//设置package包名称以及导入依赖的类
package org.pico.event

import java.io.Closeable
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import org.pico.atomic.syntax.std.atomicReference._
import org.pico.disposal.OnClose

/**
 * Sink/source that maps every published `A` through [[transform]] and hands
 * the result to its subscribers. Subscribers are held through weak
 * references; cleared references are counted and pruned by [[houseKeep]].
 */
trait SimpleSinkSource[A, B] extends SinkSource[A, B] {
  private final val subscribers = new AtomicReference(List.empty[WeakReference[B => Unit]])
  private final val garbage = new AtomicInteger(0)

  /** Mapping applied to each published event, once per live subscriber. */
  def transform: A => B

  /** Delivers the transformed event to live subscribers and counts cleared references. */
  final override def publish(event: A): Unit = {
    for (ref <- subscribers.get()) {
      Option(ref.get()) match {
        case Some(deliver) => deliver(transform(event))
        case None          => garbage.incrementAndGet()
      }
    }

    houseKeep()
  }

  /**
   * Registers `subscriber` and returns a handle whose close action clears
   * the weak reference and runs [[houseKeep]].
   */
  final override def subscribe(subscriber: B => Unit): Closeable = {
    val weakHandle = new WeakReference(subscriber)

    subscribers.update(existing => weakHandle :: existing)

    houseKeep()

    OnClose {
      // Referencing the subscriber here makes the close action capture it.
      identity(subscriber)
      weakHandle.clear()
      houseKeep()
    }
  }

  /** Drops cleared references once the garbage counter exceeds the subscriber count. */
  final def houseKeep(): Unit = {
    val tooMuchGarbage = garbage.get() > subscribers.get().size
    if (tooMuchGarbage) {
      garbage.set(0)
      subscribers.update(_.filter(ref => ref.get() != null))
    }
  }
}
开发者ID:pico-works,项目名称:pico-event-shim-scalaz,代码行数:55,代码来源:SimpleSinkSource.scala


示例6: FileBackedStatus

//设置package包名称以及导入依赖的类
package org.scalawag.jibe.backend

import java.io.File
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

import org.scalawag.jibe.FileUtils._
import spray.json.RootJsonFormat

/**
 * Holds a value of type `A` in memory and mirrors every change to `file` as
 * pretty-printed JSON. Registered listeners are told about each change.
 *
 * @param file         file that receives the JSON rendering of the value
 * @param initialValue starting value; written to `file` during construction
 * @param jsonFormat   format used to render the value
 */
class FileBackedStatus[A, B <: RootJsonFormat[A]](file: File, initialValue: A)(implicit jsonFormat: B) {
  // Written only inside `mutate` (under the instance lock) but read by `get`
  // without the lock; @volatile makes the latest write visible to readers.
  @volatile private[this] var _status: A = initialValue

  updateFile(initialValue) // write the initial value to disk

  /** The most recently stored value. */
  def get: A = _status

  /**
   * Applies `fn` to the current value. When the result differs from the
   * current value, the file is rewritten, the value is replaced and every
   * listener is invoked with `(oldValue, newValue)` while the lock is held.
   */
  def mutate(fn: A => A) = synchronized {
    val oldValue = _status
    val newValue = fn(oldValue)
    if ( oldValue != newValue ) {
      updateFile(newValue)
      _status = newValue

      // Fire a change event
      changeListeners.get.foreach(_.apply(oldValue, newValue))
    }
  }

  private[this] val changeListeners = new AtomicReference[Seq[(A, A) => Unit]](Seq.empty)

  /** Appends `listener` to the listeners notified by [[mutate]]. */
  def addChangeListener(listener: (A, A) => Unit) =
    changeListeners.getAndUpdate(new UnaryOperator[Seq[(A, A) => Unit]] {
      override def apply(t: Seq[(A, A) => Unit]) = t :+ listener
    })

  // Renders `a` with the JSON format and writes it to the backing file.
  private[this] def updateFile(a: A) =
    writeFileWithPrintWriter(file) { pw =>
      pw.print(jsonFormat.write(a).prettyPrint)
    }
}
开发者ID:scalawag,项目名称:jibe,代码行数:41,代码来源:FileBackedStatus.scala


示例7: AtomicBuffer

//设置package包名称以及导入依赖的类
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec


/**
 * Prepend-only buffer kept as an immutable list inside an atomic reference.
 */
class AtomicBuffer[T] {
  private val buffer = new AtomicReference[List[T]](Nil)

  /** Prepends `x`, retrying the compare-and-set until it succeeds. */
  final def +=(x: T): Unit = {
    var stored = false
    while (!stored) {
      val current = buffer.get
      stored = buffer.compareAndSet(current, x :: current)
    }
  }

  /** String form of the current list; the newest element comes first. */
  override def toString = buffer.get().toString()
}

/** Small demo: adds two numbers to an [[AtomicBuffer]] and logs the buffer after each add. */
object SyncOnMutable extends App {
  val buffer = new AtomicBuffer[Int]

  /** Adds `numbers` and logs the buffer, both while holding the buffer's monitor. */
  def asyncAdd(numbers: Int): Unit =
    buffer.synchronized {
      buffer += numbers
      val rendered = s"buffer = $buffer"
      ConcurrentCollections.log(rendered)
    }

  Seq(20, 30).foreach(asyncAdd)
}
开发者ID:rators,项目名称:ConcurrentQueue,代码行数:34,代码来源:AtomicBuffer.scala


示例8: TestIOGlobals

//设置package包名称以及导入依赖的类
package io.really

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import _root_.io.really.io.IOConfig

/**
 * Test fixture owning the actor system used by IO tests.
 *
 * @param config configuration whose `ioConfig` is used to create the actor system
 */
class TestIOGlobals(val config: IOConfig) {
  import play.api.Logger
  private[this] val _actorSystem = new AtomicReference[ActorSystem]
  private val logger = Logger(getClass)

  /**
   * The actor system created by [[boot]].
   *
   * Reading this before `boot()` throws instead of yielding `null`. A lazy
   * val whose initializer throws is not considered initialized, so a later
   * access after `boot()` succeeds; previously the `null` was cached for good.
   *
   * @throws IllegalStateException when accessed before `boot()`
   */
  lazy val actorSystem: ActorSystem =
    Option(_actorSystem.get).getOrElse(
      throw new IllegalStateException("TestIOGlobals.boot() must be called before actorSystem is used"))

  /** Creates the "IO-TEST" actor system and stores it. */
  def boot(): Unit = {
    _actorSystem.set(ActorSystem("IO-TEST", config.ioConfig))
    logger.info("IO Booted")
  }

  /** Shuts the actor system down and waits for its termination. */
  def shutdown(): Unit = {
    actorSystem.shutdown()
    actorSystem.awaitTermination()
    logger.info("IO Terminated")
  }
}
开发者ID:radiodb,项目名称:radiodb,代码行数:25,代码来源:TestIOGlobals.scala


示例9: PlacedOrders

//设置package包名称以及导入依赖的类
package services

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import dao.EventDao
import models.{Order, OrderPlaced}

/**
 * Keeps a running list of all placed orders by folding the `OrderPlaced`
 * events published by `orders.out`.
 *
 * @param orders event source whose publisher is consumed on construction
 */
class PlacedOrders(orders: EventDao[OrderPlaced])(
    implicit sys: ActorSystem,
    mat: Materializer) {

  /**
   * Latest snapshot of the placed orders. Starts out empty rather than
   * `null`, so readers are safe before the stream has emitted anything.
   */
  val allOrders = new AtomicReference[Seq[Order]](Seq.empty[Order])

  Source
    .fromPublisher(orders.out)
    .scan(List.empty[Order]) {
      // The accumulator gets its own name so it no longer shadows the constructor parameter.
      case (placedSoFar, OrderPlaced(order)) =>
        placedSoFar :+ order
    }
    .runForeach(allOrders.set)

}
开发者ID:leanovate,项目名称:contoso-conference-manager,代码行数:26,代码来源:PlacedOrders.scala


示例10: SubscribeDispatcher

//设置package包名称以及导入依赖的类
package com.twitter.finagle.redis.exp

import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.redis.protocol._
import com.twitter.finagle.transport.Transport
import com.twitter.util.{Future, NonFatal, Promise}
import java.util.concurrent.atomic.AtomicReference

/**
 * Dispatcher for subscribe-style commands. Writes complete the caller's
 * promise with `NoReply`; everything read from the transport is forwarded
 * to the registered [[SubscribeHandler]] by a self-rescheduling read loop.
 *
 * @param trans transport used for writing commands and reading replies
 */
class SubscribeDispatcher(trans: Transport[Command, Reply])
    extends GenSerialClientDispatcher[Command, Reply, Command, Reply](trans) {

  // Set once by the first subscribe command; null until then.
  private val handler = new AtomicReference[SubscribeHandler]

  loop()

  /**
   * Reads one reply, forwards it to the handler and schedules the next read.
   *
   * The handler is looked up null-safely on both paths. A reply arriving
   * before any handler is registered is dropped and reading continues,
   * instead of failing with a NullPointerException inside the callback,
   * which would end the loop.
   */
  private[this] def loop(): Unit =
    trans.read().onSuccess { reply =>
      Option(handler.get()).foreach(_.onMessage(reply))
      loop()
    }.onFailure {
      case NonFatal(ex) =>
        Option(handler.get()).foreach(_.onException(this, ex))
    }

  /** Writes `req`; the promise is completed by the write outcome, not by a server reply. */
  protected def dispatch(req: Command, p: Promise[Reply]): Future[Unit] = {
    trans.write(req)
      .onSuccess { _ => p.setValue(NoReply) }
      .onFailure { case NonFatal(ex) => p.setException(ex) }
  }

  /**
   * Accepts only [[SubscribeCommand]]s. The first command's handler becomes
   * the dispatcher's handler; after a successful write the command's own
   * handler is notified for each channel or pattern.
   *
   * @throws IllegalArgumentException for any other command
   */
  override def apply(req: Command): Future[Reply] = {
    req match {
      case cmd: SubscribeCommand =>
        handler.compareAndSet(null, cmd.handler)
        super.apply(cmd).masked.onSuccess { _ =>
          req match {
            case Subscribe(channels, handler) =>
              channels.foreach(handler.onSuccess(_, this))
            case PSubscribe(patterns, handler) =>
              patterns.foreach(handler.onSuccess(_, this))
            case _ =>
          }
        }
      case _ =>
        throw new IllegalArgumentException("Not a subscribe/unsubscribe command")
    }
  }

  override def close(deadline: com.twitter.util.Time) = {
    super.close(deadline)
  }
}
开发者ID:wenkeyang,项目名称:finagle,代码行数:53,代码来源:SubscribeDispatcher.scala


示例11: AppTest

//设置package包名称以及导入依赖的类
package samples

import java.nio.file.Files
import java.util.concurrent.atomic.AtomicReference

import com.github.cuzfrog.excel.Workbook
import utest._


/**
 * Suite whose three tests share one workbook through an atomic reference:
 * the first stores it, the second rewrites every cell, the third saves it.
 */
object AppTest extends TestSuite {

  val workbookRef = new AtomicReference[Workbook](null)

  val tests = this {
    'OpenWorkbook {
      val opened = Workbook("./src/test/resources/sample.xlsx")
      workbookRef.set(opened)
    }
    'PopulateSheet {
      // Append "_p" to the value of every cell in every row of every sheet.
      for {
        sheet <- workbookRef.get().sheets
        row <- sheet.rows
        cell <- row.cells
      } cell.setValue(cell.getValue + "_p")
    }
    'SaveWorkbook {
      val target = Files.createTempFile("tmp", ".xlsx").toString
      workbookRef.get().saveAs(target)
    }
  }

}
开发者ID:cuzfrog,项目名称:excela,代码行数:33,代码来源:AppTest.scala


示例12: ProcessStepper

//设置package包名称以及导入依赖的类
package org.http4s.finagle

import java.util.concurrent.atomic.AtomicReference

import scalaz.concurrent.Task
import scalaz.std.option.none
import scalaz.stream.Process
import scalaz.syntax.monoid._
import scalaz.syntax.std.option._
import scalaz.Monoid

/**
 * Pull-style reader over a `Process[Task, A]`: each [[read]] yields the next
 * chunk of emitted values combined with the `Monoid[A]`, or `None` once the
 * process has halted normally.
 *
 * @param p the process to step through
 */
class ProcessStepper[A: Monoid](p: Process[Task, A]) {
  import scalaz.stream.Cause._
  import scalaz.stream.Process.{ Await, Emit, Halt, Step }

  // Remainder of the process still to be consumed.
  private val cur = new AtomicReference[Process[Task, A]](p)

  /** Task producing the next combined chunk, or `None` at the end of the process. */
  def read: Task[Option[A]] = readFrom

  private val Done: Task[Option[A]] = Task.now(none[A])

  private def readFrom: Task[Option[A]] = {
    cur.get.step match {
      case s: Step[Task, A] @unchecked =>
        (s.head, s.next) match {
          case (Emit(os), cont) =>
            cur.set(cont.continue)
            // Fold the emitted values starting from the monoid's zero. The
            // source had an unreadable character where the zero belongs.
            Task.now(os.foldLeft[A](Monoid[A].zero)((a, o) => a |+| o).some)
          case (awt: Await[Task, Any, A] @unchecked, cont) =>
            // Evaluate the awaited step, put its result in front of the
            // continuation and try again.
            awt.evaluate flatMap { p =>
              cur.set(p +: cont)
              readFrom
            }
        }
      case Halt(End) =>
        Done
      case Halt(Kill) =>
        Done
      case Halt(Error(rsn)) =>
        Task.fail(rsn)
    }
  }
}
开发者ID:lukiano,项目名称:finagle-http4s,代码行数:44,代码来源:ProcessStepper.scala


示例13: Room

//设置package包名称以及导入依赖的类
package chat

import java.util.concurrent.atomic.AtomicReference
import javax.inject.{ Inject, Singleton }

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ BroadcastHub, Flow, Keep, MergeHub, Sink }
import akka.stream.{ KillSwitches, Materializer, UniqueKillSwitch }
import akka.stream.scaladsl.{ Sink, Source }
import akka.NotUsed

import scala.collection.mutable.{ Map => MutableMap }
import scala.concurrent.duration._

/**
 * A chat room.
 *
 * @param roomId identifier of the room
 * @param bus    message flow of the room; materializes a kill switch
 */
case class Room(
  roomId: String,
  bus: Flow[Message, Message, UniqueKillSwitch]
)

/**
 * Hands out chat rooms, creating each room on first request and storing it
 * in the pool held by the companion object.
 */
@Singleton
class RoomClient @Inject()(implicit val materializer: Materializer, implicit val system: ActorSystem) {

  /**
   * Returns the room registered under `roomId`, creating and registering it
   * when absent.
   *
   * The lock is taken on the shared pool holder rather than on this
   * instance: the pool lives in the companion object, so an instance lock
   * would not protect it if more than one `RoomClient` existed.
   */
  def chatRoom(roomId: String): Room = {
    val pool = RoomClient.roomPool
    pool.synchronized {
      pool.get().getOrElseUpdate(roomId, create(roomId))
    }
  }

  /** Builds a room whose bus joins a merge hub and a broadcast hub behind a kill switch. */
  private def create(roomId: String): Room = {

    val (sink, source) =
      MergeHub.source[Message](perProducerBufferSize = 16)
          .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
          .run()

    // Attach a consumer that discards everything.
    // NOTE(review): presumably this keeps the hub drained while no client is attached — confirm.
    source.runWith(Sink.ignore)

    val bus = Flow.fromSinkAndSource(sink, source)
        .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
        .backpressureTimeout(3.seconds)

    Room(roomId, bus)
  }
}

/** Holds the pool of rooms used by `RoomClient` instances. */
object RoomClient {

  /** Rooms keyed by room id; starts out empty. */
  val roomPool = new AtomicReference[MutableMap[String, Room]](MutableMap.empty[String, Room])

}
开发者ID:hys-rabbit,项目名称:PlayChat,代码行数:53,代码来源:RoomClient.scala


示例14: CallbackHandler

//设置package包名称以及导入依赖的类
package org.svars

import java.util.concurrent.atomic.AtomicReference

/**
 * Fires `callback` once for every threshold that lies below the observed
 * state according to `lattice.<`.
 *
 * @param thresholdSet thresholds to watch
 * @param lattice      supplies the `<` comparison used for sorting and firing
 * @param callback     invoked with each threshold that has been passed
 */
class CallbackHandler[D, T](val thresholdSet: Set[D], val lattice: Lattice[D, T], val callback: D => Unit) {

  // Thresholds not fired yet, sorted with `lattice.<`.
  private val threshold = new AtomicReference(thresholdSet.toList.sortWith((a, b) => lattice < (a, b)))

  /**
   * Fires the callback for each leading threshold that is below `state`.
   *
   * Losing the compare-and-set to another thread no longer ends the loop.
   * The list is re-read and the remaining thresholds are checked against
   * this call's `state`, so none is skipped because a concurrent caller
   * with a lower state won the race.
   */
  def tick(state: D): Unit = {
    var pending = true

    while (pending) {
      val current = threshold.get

      current match {
        case head :: tail if lattice.<(head, state) =>
          // Only the thread that removes the head fires its callback.
          if (threshold.compareAndSet(current, tail))
            callback(head)
        case _ =>
          pending = false
      }
    }
  }
}
开发者ID:johanstenberg92,项目名称:SVars,代码行数:24,代码来源:CallbackHandler.scala


示例15: ExecutionState

//设置package包名称以及导入依赖的类
package org.imdex.tractor.internal

import java.util.concurrent.atomic.AtomicReference

import org.imdex.tractor.actor._
import org.imdex.tractor.dispatch.Dispatcher
import org.imdex.tractor.mailbox.{Envelope, Mailbox}

/** Execution state of an actor; the possible values are the case objects in the companion. */
private[tractor] sealed trait ExecutionState
/** The four states an actor can be in. */
private[tractor] object ExecutionState {
    case object Idle extends ExecutionState // Mailbox is empty, actor suspended until next received message
    case object Work extends ExecutionState // Mailbox processing in progress
    case object Suspended extends ExecutionState // Mailbox processing was suspended by the actor
    case object Stopped extends ExecutionState // Actor is stopped
}

/** Factory for [[ActorData]]. */
private[tractor] object ActorData {
    /**
     * Builds an [[ActorData]] whose state holder starts at `state` and whose
     * receive function is obtained from `actor.receive(receiveContext)`.
     */
    def apply(env: Environment, dispatcher: Dispatcher, actor: JustActor, mailbox: Mailbox, receiveContext: InternalReceiveContext, state: ExecutionState): ActorData = {
        val stateHolder = new AtomicReference[ExecutionState](state)
        val receive = actor.receive(receiveContext)
        new ActorData(env, dispatcher, actor, mailbox, stateHolder, receive, receiveContext)
    }
}


/**
  * Per-actor bookkeeping: the actor instance, its mailbox, its execution
  * state and its current receive function.
  *
  * Equality is reference identity of the wrapped actor, and the hash code is
  * the actor's hash code.
  */
private[tractor] final class ActorData(override val environment: Environment,
                                       override val dispatcher: Dispatcher,
                                       var actor: JustActor, // NOTE: var only to avoid dangling actor references when actor is dead
                                       var mailbox: Mailbox,
                                       val state: AtomicReference[ExecutionState],
                                       var receive: PartialFunction[Any, Any], // NOTE: var only to avoid dangling actor references when actor is dead
                                       val receiveContext: InternalReceiveContext) extends ActorContext {
    // Two ActorData are equal only when they wrap the very same actor instance.
    override def equals(obj: scala.Any): Boolean = obj match {
        case that: ActorData => actor eq that.actor
        case _               => false
    }

    override def hashCode(): Int = actor.hashCode()

    /**
      * Adds `letter` to the mailbox. The Idle -> Work transition is attempted
      * before the letter is appended, and the dispatcher is notified only by
      * the call that actually performed that transition.
      */
    override def enqueue(letter: Envelope): Unit = {
        import ExecutionState._

        val idle = state.compareAndSet(Idle, Work) // synchronization point
        mailbox += letter
        if (idle) dispatcher.notifyResume(this)
    }
} 
开发者ID:Im-dex,项目名称:trActor,代码行数:46,代码来源:ActorData.scala


示例16: MetricsTrackerImpl

//设置package包名称以及导入依赖的类
package eu.inn.metrics

import java.util.concurrent.atomic.AtomicReference

import com.codahale.metrics._

import scala.collection.concurrent.TrieMap

/**
 * [[MetricsTracker]] backed by a `MetricRegistry`. Counters, meters,
 * histograms and timers are delegated directly; gauges are wrapped in a
 * [[ReplaceableGauge]] so that registering the same name again swaps the
 * underlying gauge.
 *
 * @param registry registry receiving all metrics
 */
private [metrics] class MetricsTrackerImpl(registry: MetricRegistry) extends MetricsTracker {
  // Wrappers for the gauges registered through this tracker, by name.
  protected val gauges = TrieMap[String, ReplaceableGauge[_]]()

  override def counter(name: String): Counter = {
    registry.counter(name)
  }

  override def meter(name: String): Meter = {
    registry.meter(name)
  }

  override def histogram(name: String): Histogram = {
    registry.histogram(name)
  }

  override def timer(name: String): Timer = {
    registry.timer(name)
  }

  /**
   * Registers `gauge` under `name`, or replaces the gauge behind an
   * already registered name.
   *
   * The wrapper itself is what gets registered. Registering the raw gauge
   * would leave the registry reading the first gauge forever, because later
   * replacements only change the wrapper.
   */
  def gauge[T](name: String, gauge: Gauge[T]): Unit = {
    val replaceable = new ReplaceableGauge[T](gauge)
    gauges.putIfAbsent(name, replaceable) match {
      case Some(existing) => existing.replaceUnderlying(gauge)
      case None => registry.register(name, replaceable)
    }
  }

  /** Removes `name` from the wrapper map and from the registry. */
  def remove(name: String): Unit = {
    gauges.remove(name)
    registry.remove(name)
  }

  /** Clears the wrapper map and removes every metric from the registry. */
  def removeAll(): Unit = {
    gauges.clear()
    registry.removeMatching(MetricFilter.ALL)
  }
}

/**
 * Gauge that forwards to another gauge which can be swapped at any time.
 *
 * @param initUnderlying gauge to forward to initially
 */
private [metrics] class ReplaceableGauge[T](initUnderlying: Gauge[T]) extends Gauge[T] {
  private val delegate = new AtomicReference[Gauge[T]](initUnderlying)

  /** Value of the gauge currently forwarded to. */
  override def getValue = delegate.get().getValue

  /** Swaps the gauge forwarded to; the argument is cast to `Gauge[T]` without any check. */
  def replaceUnderlying[X](newUnderlyingGauge: Gauge[X]): Unit = {
    val retyped = newUnderlyingGauge.asInstanceOf[Gauge[T]]
    delegate.set(retyped)
  }
}
开发者ID:InnovaCo,项目名称:service-metrics,代码行数:57,代码来源:MetricsTrackerImpl.scala


示例17: BlockedListenerAdapter

//设置package包名称以及导入依赖的类
package com.hellosoda.rmq.impl
import com.rabbitmq.client._
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Promise

/**
 * Listener that mirrors a connection's blocked state into a shared promise
 * holder: a pending promise is stored while the connection is blocked, and
 * the holder contains `null` otherwise.
 *
 * @param connectionId identifier used in log messages
 * @param reference    holder of the promise that is completed on unblock
 */
private[impl] class BlockedListenerAdapter (
  private val connectionId : String,
  private val reference : AtomicReference[Promise[Unit]])
    extends BlockedListener
    with    com.typesafe.scalalogging.LazyLogging {

  /** Stores a fresh promise unless one is already present. */
  def handleBlocked(reason : String) : Unit = {
    logger.debug(s"handleBlocked: reason='$reason' connection=$connectionId")
    reference.compareAndSet(null, Promise[Unit]())
  }

  /**
   * Clears the holder and completes the promise that was stored, if any.
   * An unblock without a preceding block is a no-op instead of a
   * NullPointerException.
   */
  def handleUnblocked() : Unit = {
    logger.debug(s"handleUnblocked: connection=$connectionId")
    Option(reference.getAndSet(null)).foreach(_.success(()))
  }

}
开发者ID:hellosoda,项目名称:RMQ,代码行数:23,代码来源:BlockedListenerAdapter.scala


示例18: CachedValue

//设置package包名称以及导入依赖的类
package io.ahamdy.taskforce.common

import java.time.ZonedDateTime
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantReadWriteLock

import io.ahamdy.taskforce.syntax.zonedDateTime._
import cats.syntax.flatMap._
import fs2.interop.cats._
import fs2.Task

import scala.concurrent.duration.FiniteDuration

/**
 * Caches the result of `source` and refreshes it once the cached value is
 * older than `ttl`.
 *
 * @param source task producing the value
 * @param ttl    maximum age of the cached value before a refresh is attempted
 * @param time   clock abstraction; `time.epoch` is the initial "last updated" instant
 */
class CachedValue[A](source: Task[A], ttl: FiniteDuration, time: Time) {
  val currentValue = new AtomicReference[Task[A]]()
  val lastUpdated = new AtomicReference[ZonedDateTime](time.epoch)
  val lock = new ReentrantReadWriteLock()

  /**
   * Task yielding the cached value, running `source` first when the cache
   * has expired.
   */
  def value: Task[A] =
    time.now.flatMap{ now =>
      if (lastUpdated.get().isBefore(now.minus(ttl))){
        // Acquire before entering `try`, so `unlock` in `finally` only runs
        // when the lock is actually held.
        lock.writeLock().lock()
        try{
          source.unsafeRunSync() match {
            case Right(a) =>
              // Publish the value BEFORE the timestamp. A reader that sees a
              // fresh `lastUpdated` must never find `currentValue` unset.
              Task.delay {
                currentValue.set(Task.now(a))
                lastUpdated.set(now)
                a
              }
            // NOTE(review): presumably Left means the source could not complete
            // synchronously; the source task itself is handed back — confirm.
            case Left(_) => source
          }
        }finally {
          lock.writeLock().unlock()
        }
      }else{
        lock.readLock().lock()
        try{
          currentValue.get()
        }finally {
          lock.readLock().unlock()
        }
      }
    }
}
开发者ID:ahamdy88,项目名称:JobForce,代码行数:46,代码来源:CachedValue.scala


示例19: DummyNodeStore

//设置package包名称以及导入依赖的类
package io.ahamdy.taskforce.store

import java.util.concurrent.atomic.AtomicReference

import fs2.Task
import io.ahamdy.taskforce.common.Time
import io.ahamdy.taskforce.domain._

/**
 * In-memory [[NodeStore]] pre-populated with three nodes: two in
 * "test-group-1" and one in "test-group-2".
 *
 * @param time clock used to stamp the seeded nodes
 */
class DummyNodeStore(time: Time) extends NodeStore {
  // Fresh copy of the seed data; timestamps are taken at the moment of the call.
  private def initialNodes: List[JobNode] = List(
    JobNode(NodeId("test-node-1"), NodeGroup("test-group-1"), time.unsafeNow().minusMinutes(1), NodeActive(true), NodeVersion("1.0.0")),
    JobNode(NodeId("test-node-2"), NodeGroup("test-group-1"), time.unsafeNow(), NodeActive(true), NodeVersion("1.0.0")),
    JobNode(NodeId("test-node-3"), NodeGroup("test-group-2"), time.unsafeNow().minusMinutes(1), NodeActive(true), NodeVersion("1.0.0")) // other group
  )

  val nodesList = new AtomicReference[List[JobNode]](initialNodes)

  override def getAllNodes: Task[List[JobNode]] = Task.delay(nodesList.get())

  /**
   * Marks every stored node as inactive.
   * NOTE(review): both `nodeGroup` and `active` are ignored here — confirm this is intended for the dummy.
   */
  override def updateGroupNodesStatus(nodeGroup: NodeGroup, active: NodeActive): Task[Unit] =
    Task.delay {
      val deactivated = nodesList.get().map(node => node.copy(active = NodeActive(false)))
      nodesList.set(deactivated)
    }

  /** Restores the seed data. */
  def reset(): Unit = nodesList.set(initialNodes)
}
开发者ID:ahamdy88,项目名称:JobForce,代码行数:30,代码来源:DummyNodeStore.scala


示例20: TransactionIDService

//设置package包名称以及导入依赖的类
package com.bwsw.tstreamstransactionserver.netty.server.transactionIDService

import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

/**
 * Generates transaction ids of the form `timestamp * SCALE + counter`, where
 * the counter is kept together with the timestamp it belongs to in a single
 * atomically updated value.
 */
object TransactionIDService
  extends ITransactionIDGenerator
{
  private val SCALE = 100000

  // (counter, timestamp) pair; both are replaced together in one atomic update.
  private val transactionIDAndCurrentTime = {
    val transactionGeneratorUnit =
      TransactionGeneratorUnit(0, 0L)

    new AtomicReference(transactionGeneratorUnit)
  }

  // When `now` is later than the stored time, start a new pair for `now`;
  // otherwise keep the stored time and bump the counter.
  private def update(now: Long) = new UnaryOperator[TransactionGeneratorUnit] {
    override def apply(transactionGenUnit: TransactionGeneratorUnit): TransactionGeneratorUnit = {
      if (now - transactionGenUnit.currentTime > 0L) {
        TransactionGeneratorUnit(1 + 1, now)
      } else
        transactionGenUnit.copy(
          transactionID = transactionGenUnit.transactionID + 1
        )
    }
  }

  /**
   * Produces the next transaction id.
   *
   * The id is built from the timestamp stored in the updated pair, not from
   * this call's own clock reading. When another thread has already moved the
   * stored time ahead, or the clock stepped back, the counter belongs to the
   * stored timestamp; combining it with the local `now` could repeat an id
   * that was handed out earlier.
   */
  override def getTransaction(): Long = {
    val now = System.currentTimeMillis()
    val txn = transactionIDAndCurrentTime.updateAndGet(update(now))
    getTransaction(txn.currentTime) + txn.transactionID
  }

  /** Base id for `timestamp`, i.e. `timestamp * SCALE`. */
  override def getTransaction(timestamp: Long): Long = {
    timestamp * TransactionIDService.SCALE
  }
}
开发者ID:bwsw,项目名称:tstreams-transaction-server,代码行数:39,代码来源:TransactionIDService.scala



注:本文中的java.util.concurrent.atomic.AtomicReference类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala FileSystem类代码示例发布时间:2022-05-23
下一篇:
Scala Level类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap