本文整理汇总了Scala中java.util.concurrent.atomic.AtomicInteger类的典型用法代码示例。如果您正苦于以下问题:Scala AtomicInteger类的具体用法?Scala AtomicInteger怎么用?Scala AtomicInteger使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了AtomicInteger类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: TenantEventProcessorSpec
//设置package包名称以及导入依赖的类
package optrak.lagomtest.tenant.impl
import java.util.concurrent.atomic.AtomicInteger
import akka.persistence.query.Sequence
import com.lightbend.lagom.scaladsl.api.ServiceLocator.NoServiceLocator
import com.lightbend.lagom.scaladsl.server.LagomApplication
import com.lightbend.lagom.scaladsl.testkit.ServiceTest
import TenantEvents.{TenantCreated, TenantEvent}
import optrak.lagomtest.data.Data.TenantId
import optrak.lagomtest.utils.ReadSideTestDriver
import org.scalatest.{AsyncWordSpec, BeforeAndAfterAll, Matchers}
import play.api.libs.ws.ahc.AhcWSComponents
// Verifies the tenant read-side processor: a TenantCreated event fed through
// the test driver must become visible via the tenant repository.
class TenantEventProcessorSpec extends AsyncWordSpec with BeforeAndAfterAll with Matchers {
// Lagom test server with Cassandra enabled; the real read-side registry is
// replaced by a ReadSideTestDriver so events can be fed deterministically.
private val server = ServiceTest.startServer(ServiceTest.defaultSetup.withCassandra(true)) { ctx =>
new LagomApplication(ctx) with TenantComponents with AhcWSComponents {
override def serviceLocator = NoServiceLocator
override lazy val readSide: ReadSideTestDriver = new ReadSideTestDriver
}
}
override def afterAll() = server.stop()
private val testDriver = server.application.readSide
private val tenantRepository = server.application.tenantRepository
// Monotonically increasing offset handed to each fed event.
private val offset = new AtomicInteger()
"The tenant event processor" should {
"create a tenant" in {
val tenantCreated = TenantCreated("tim", "hello")
for {
_ <- feed(tenantCreated.id, tenantCreated)
tenants <- getTenants
} yield {
tenants should contain only tenantCreated.id
}
}
}
// Reads all tenant ids currently stored on the read side.
private def getTenants = {
tenantRepository.selectAllTenants
}
// Feeds one event into the read-side processor at the next offset.
private def feed(tenantId: TenantId, event: TenantEvent) = {
testDriver.feed(tenantId.toString, event, Sequence(offset.getAndIncrement))
}
}
开发者ID:TimPigden,项目名称:lagom-multitenant,代码行数:52,代码来源:TenantEventProcessorSpec.scala
示例2: Exercise09
//设置package包名称以及导入依赖的类
package forimpatient.chapter13
import java.util.concurrent.atomic.AtomicInteger
import scala.io.Source
import scala.collection.JavaConversions.mapAsScalaConcurrentMap
/**
 * Chapter 13, exercise 9: count character frequencies across several files
 * concurrently, one thread per file, into a shared concurrent map.
 *
 * Fixes over the original:
 *  - `nonProceeded` was an unsynchronized `var` decremented from multiple
 *    threads (a data race that could skip or double the final println);
 *    it is now an AtomicInteger countdown.
 *  - the `Source` opened per file was never closed; now closed in `finally`.
 */
object Exercise09 extends App {
  println("Chapter 13 Exercise 09")

  // Char -> atomic counter; ConcurrentHashMap viewed as a Scala concurrent.Map.
  val frequencies: scala.collection.concurrent.Map[Char, AtomicInteger] =
    new java.util.concurrent.ConcurrentHashMap[Char, AtomicInteger]()

  val files = Array("src/forimpatient/chapter13/1.txt", "src/forimpatient/chapter13/2.txt")

  // Remaining-files countdown; the thread that finishes last prints the result.
  val nonProceeded = new AtomicInteger(files.size)

  for (file <- files) {
    new Thread(new Runnable {
      override def run(): Unit = {
        val source = Source.fromFile(file)
        try {
          // putIfAbsent + incrementAndGet keeps updates race-free without locks.
          for (c <- source) {
            frequencies.putIfAbsent(c, new AtomicInteger(0))
            frequencies(c).incrementAndGet()
          }
        } finally source.close() // the original leaked the file handle
        if (nonProceeded.decrementAndGet() == 0) println(frequencies)
      }
    }).start()
  }
}
开发者ID:Kiryna,项目名称:Scala-for-the-Impatient,代码行数:28,代码来源:Exercise09.scala
示例3: ListenerSpec
//设置package包名称以及导入依赖的类
package akka.actor.routing
import akka.testkit._
import akka.actor._
import akka.actor.Actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
// Verifies the Listeners mixin: Listen registers a listener, Deafen removes
// one, gossip broadcasts to all current listeners, and WithListeners applies
// a function to each. (The original text had mojibake '?' characters where
// the '⇒' / '←' arrows belong, which does not compile; restored here.)
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ListenerSpec extends AkkaSpec {

  "Listener" must {

    "listen" in {
      val fooLatch = TestLatch(2)
      val barLatch = TestLatch(2)
      val barCount = new AtomicInteger(0)

      // Broadcaster: any "foo" message is gossiped to listeners as "bar".
      val broadcast = system.actorOf(Props(new Actor with Listeners {
        def receive = listenerManagement orElse {
          case "foo" ⇒ gossip("bar")
        }
      }))

      def newListener = system.actorOf(Props(new Actor {
        def receive = {
          case "bar" ⇒
            barCount.incrementAndGet
            barLatch.countDown()
          case "foo" ⇒
            fooLatch.countDown()
        }
      }))

      val a1 = newListener
      val a2 = newListener
      val a3 = newListener

      broadcast ! Listen(a1)
      broadcast ! Listen(a2)
      broadcast ! Listen(a3)
      broadcast ! Deafen(a3) // a3 must no longer receive gossip

      broadcast ! WithListeners(_ ! "foo")
      broadcast ! "foo"

      // Only the two remaining listeners may see "bar".
      Await.ready(barLatch, TestLatch.DefaultTimeout)
      barCount.get should be(2)
      Await.ready(fooLatch, TestLatch.DefaultTimeout)

      for (a ← List(broadcast, a1, a2, a3)) system.stop(a)
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:58,代码来源:ListenerSpec.scala
示例4: CallingThreadDispatcherModelSpec
//设置package包名称以及导入依赖的类
package akka.testkit
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.dispatch.ActorModelSpec
import com.typesafe.config.Config
import akka.dispatch.DispatcherPrerequisites
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageDispatcherConfigurator
import akka.dispatch.UnboundedMailbox
object CallingThreadDispatcherModelSpec {
  import ActorModelSpec._

  // Typesafe-config block for the spec's dispatchers: a pinned "boss"
  // dispatcher plus 30 uniquely-named calling-thread dispatchers. (The
  // original text had a mojibake '?' where the '←' generator arrow belongs,
  // which does not compile; restored here.)
  val config = {
    """
boss {
executor = thread-pool-executor
type = PinnedDispatcher
}
""" +
      // use unique dispatcher id for each test, since MessageDispatcherInterceptor holds state
      (for (n ← 1 to 30) yield """
test-calling-thread-%s {
type = "akka.testkit.CallingThreadDispatcherModelSpec$CallingThreadDispatcherInterceptorConfigurator"
}""".format(n)).mkString
  }

  // Configurator that wraps CallingThreadDispatcher with the interceptor
  // the shared ActorModelSpec suite inspects.
  class CallingThreadDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends MessageDispatcherConfigurator(config, prerequisites) {

    private val instance: MessageDispatcher =
      new CallingThreadDispatcher(this) with MessageDispatcherInterceptor {
        override def id: String = config.getString("id")
      }

    // Always hand out the single cached instance for this dispatcher id.
    override def dispatcher(): MessageDispatcher = instance
  }
}
// Runs the shared ActorModelSpec suite against the CallingThreadDispatcher.
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CallingThreadDispatcherModelSpec extends ActorModelSpec(CallingThreadDispatcherModelSpec.config) {
import ActorModelSpec._
// Hands out a fresh dispatcher id per test; the companion config predefines 30.
val dispatcherCount = new AtomicInteger()
override def interceptedDispatcher(): MessageDispatcherInterceptor = {
// use new id for each test, since the MessageDispatcherInterceptor holds state
system.dispatchers.lookup("test-calling-thread-" + dispatcherCount.incrementAndGet()).asInstanceOf[MessageDispatcherInterceptor]
}
override def dispatcherType = "Calling Thread Dispatcher"
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:55,代码来源:CallingThreadDispatcherModelSpec.scala
示例5: trade
//设置package包名称以及导入依赖的类
package akka.performance.trading.domain
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorSystem }
// Callback invoked when a bid is matched with an ask.
// (NOTE(review): `abstract` is redundant on a trait.)
abstract trait TradeObserver {
def trade(bid: Bid, ask: Ask)
}
// Observer that counts every trade in the system-wide TotalTradeCounter.
trait TotalTradeObserver extends TradeObserver {
def system: ActorSystem
// Resolved lazily so the abstract `system` is initialised before first use.
private lazy val counter: TotalTradeCounter = TotalTradeCounterExtension(system)
override def trade(bid: Bid, ask: Ask) {
counter.increment()
}
}
// No-op observer for benchmark runs that should exclude counting overhead.
trait NopTradeObserver extends TradeObserver {
override def trade(bid: Bid, ask: Ask) {
}
}
/** Thread-safe trade counter, installed per ActorSystem as an Akka extension. */
class TotalTradeCounter extends Extension {
  private val trades = new AtomicInteger

  /** Records one trade and returns the new total. */
  def increment() = trades.incrementAndGet()

  /** Resets the total back to zero. */
  def reset(): Unit = trades.set(0)

  /** Current total number of recorded trades. */
  def count: Int = trades.get
}
// Akka ExtensionId: yields one TotalTradeCounter instance per ActorSystem.
object TotalTradeCounterExtension
extends ExtensionId[TotalTradeCounter]
with ExtensionIdProvider {
override def lookup = TotalTradeCounterExtension
override def createExtension(system: ExtendedActorSystem) = new TotalTradeCounter
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:40,代码来源:TradeObserver.scala
示例6: FakeRequest
//设置package包名称以及导入依赖的类
package io.policarp.logback.hec
import java.util.concurrent.atomic.AtomicInteger
import ch.qos.logback.classic.spi.ILoggingEvent
// Mutable record of one fake Splunk HEC request; tests assert on its fields.
class FakeRequest(
var executed: Boolean = false, // flipped to true when the request is "executed"
val executions: AtomicInteger = new AtomicInteger(0), // total events "sent" so far
var lastEvent: Option[ILoggingEvent] = None // most recent event of the last batch
) {
// Deferred counter bump installed by prepareRequest; null until then.
var nextIncrement: () => Unit = _
}
// Test double for SplunkHecClient: records what would have been sent to
// Splunk instead of performing any HTTP calls.
trait FakeHecClient extends SplunkHecClient {
override type AbstractRequest = FakeRequest
var fakeRequest = new FakeRequest()
// Remembers the last event of the batch (events.reverse.head) and defers the
// execution-count bump into `nextIncrement`, so it only happens once the
// request is actually "executed".
override private[hec] def prepareRequest = (events, layout) => {
if (events.nonEmpty) {
fakeRequest.lastEvent = Some(events.reverse.head)
fakeRequest.nextIncrement = () => fakeRequest.executions.addAndGet(events.size)
}
Some(fakeRequest)
}
// NOTE(review): throws NPE if executed before prepareRequest has seen a
// non-empty batch (nextIncrement starts as null) — confirm intended.
override private[hec] def executeRequest(preparedRequest: FakeRequest) = {
preparedRequest.executed = true
preparedRequest.nextIncrement()
}
}
开发者ID:kdrakon,项目名称:splunk-logback-hec-appender,代码行数:34,代码来源:FakeHecClient.scala
示例7: UnsyncHotspotBenchmark
//设置package包名称以及导入依赖的类
package lazytx.benchmark.oltp
import java.util.concurrent.ThreadLocalRandom
import java.util.HashMap
import java.util.concurrent.atomic.AtomicInteger
/**
 * Benchmark workload over plain atomic counters (no transactions):
 * each invocation performs `normalCount` increments on uniformly random
 * keys below the hotspot range plus `hotspotCount` increments inside the
 * `hotspotSize`-wide hotspot at the top of the key space.
 */
class UnsyncHotspotBenchmark(normalCount : Int, hotspotCount : Int, hotspotSize : Int) extends SimpleBenchmark {
  def workload(dbsize : Int) = {
    // One counter per possible key; 1,000,000 slots as in the original fixture
    // (assumes dbsize <= 1,000,000).
    val counters = Array.fill(1000000)(new AtomicInteger(0))
    () => {
      val random = ThreadLocalRandom.current()
      var remaining = normalCount + hotspotCount
      while (remaining > 0) {
        remaining -= 1
        // First `hotspotCount` iterations hit the hotspot, the rest are uniform.
        val key =
          if (remaining < normalCount) random.nextInt(dbsize - hotspotSize)
          else dbsize - hotspotSize + random.nextInt(hotspotSize)
        counters(key).incrementAndGet()
      }
    }
  }
}
开发者ID:utwente-fmt,项目名称:lazy-persistent-trie,代码行数:30,代码来源:UnsyncBenchmark.scala
示例8: documentAsVector
//设置package包名称以及导入依赖的类
package org.dl4scala.examples.nlp.paragraphvectors.tools
import java.util.concurrent.atomic.AtomicInteger
import lombok.NonNull
import org.deeplearning4j.models.embeddings.inmemory.InMemoryLookupTable
import org.deeplearning4j.models.word2vec.VocabWord
import org.deeplearning4j.text.documentiterator.LabelledDocument
import org.deeplearning4j.text.tokenization.tokenizerfactory.TokenizerFactory
import org.nd4j.linalg.api.ndarray.INDArray
import org.nd4j.linalg.factory.Nd4j
import scala.collection.JavaConverters._
/**
 * Builds a document embedding by averaging the word vectors of every token
 * of the document that is present in the vocabulary.
 *
 * Relies on enclosing-class members not visible in this chunk:
 * tokenizerFactory, vocabCache, lookupTable — TODO confirm their contracts.
 */
def documentAsVector(@NonNull document: LabelledDocument): INDArray = {
val documentAsTokens = tokenizerFactory.create(document.getContent).getTokens.asScala
// First pass: count in-vocabulary tokens so the matrix can be pre-sized.
val cnt = new AtomicInteger(0)
for (word <- documentAsTokens) {
if (vocabCache.containsWord(word)) cnt.incrementAndGet
}
// One row per known word. NOTE(review): if no token is in the vocabulary this
// creates a 0-row matrix and takes its mean — confirm upstream guarantees at
// least one known word.
val allWords = Nd4j.create(cnt.get, lookupTable.layerSize)
// Second pass: reuse the counter as a row index while copying word vectors.
cnt.set(0)
for (word <- documentAsTokens) {
if (vocabCache.containsWord(word)) allWords.putRow(cnt.getAndIncrement, lookupTable.vector(word))
}
// Column-wise mean over the word rows is the document vector.
val mean = allWords.mean(0)
mean
}
}
开发者ID:endymecy,项目名称:dl4scala,代码行数:35,代码来源:MeansBuilder.scala
示例9: TestBuild
//设置package包名称以及导入依赖的类
import java.util.concurrent.atomic.AtomicInteger
import sbt._
import sbt.Keys._
import com.typesafe.sbt.web.SbtWeb
import com.typesafe.sbt.web.SbtWeb.autoImport._
object TestBuild extends Build {

  /**
   * sbt Logger that watches the error stream for the expected compiler
   * message and, the first time it appears, drops a marker file that the
   * scripted test can assert on.
   */
  class TestLogger(target: File) extends Logger {
    val unrecognisedInputCount = new AtomicInteger(0)

    def trace(t: => Throwable): Unit = ()
    def success(message: => String): Unit = ()

    def log(level: Level.Value, message: => String): Unit =
      if (level == Level.Error && message.contains("""Cannot find variable `Decrement`""")) {
        // Touch the marker only on the first matching error.
        if (unrecognisedInputCount.addAndGet(1) == 1) {
          IO.touch(target / "cannot-find-decrement")
        }
      }
  }

  /** Reporter with no error limit (-1), backed by the marker-file logger. */
  class TestReporter(target: File) extends LoggerReporter(-1, new TestLogger(target))

  lazy val root = Project(
    id = "test-build",
    base = file("."),
    settings = Seq(WebKeys.reporter := new TestReporter(target.value))
  ).enablePlugins(SbtWeb)
}
开发者ID:sutiialex,项目名称:sbt-elm,代码行数:37,代码来源:TestBuild.scala
示例10: TestBuild
//设置package包名称以及导入依赖的类
import java.util.concurrent.atomic.AtomicInteger
import sbt._
import sbt.Keys._
import com.typesafe.sbt.web.SbtWeb
import com.typesafe.sbt.web.SbtWeb.autoImport._
object TestBuild extends Build {

  /**
   * sbt Logger that watches the error stream for the expected compiler
   * message and, the first time it appears, drops a marker file that the
   * scripted test can assert on.
   */
  class TestLogger(target: File) extends Logger {
    val unrecognisedInputCount = new AtomicInteger(0)

    def trace(t: => Throwable): Unit = ()
    def success(message: => String): Unit = ()

    def log(level: Level.Value, message: => String): Unit =
      if (level == Level.Error && message.contains("""I cannot find module 'B'""")) {
        // Touch the marker only on the first matching error.
        if (unrecognisedInputCount.addAndGet(1) == 1) {
          IO.touch(target / "cannot-find-module")
        }
      }
  }

  /** Reporter with no error limit (-1), backed by the marker-file logger. */
  class TestReporter(target: File) extends LoggerReporter(-1, new TestLogger(target))

  lazy val root = Project(
    id = "test-build",
    base = file("."),
    settings = Seq(WebKeys.reporter := new TestReporter(target.value))
  ).enablePlugins(SbtWeb)
}
开发者ID:sutiialex,项目名称:sbt-elm,代码行数:37,代码来源:TestBuild.scala
示例11: TestBuild
//设置package包名称以及导入依赖的类
import java.util.concurrent.atomic.AtomicInteger
import sbt._
import sbt.Keys._
import com.typesafe.sbt.web.SbtWeb
import com.typesafe.sbt.web.SbtWeb.autoImport._
object TestBuild extends Build {

  /**
   * sbt Logger that watches the error stream for the expected compiler
   * message and, the first time it appears, drops a marker file that the
   * scripted test can assert on.
   */
  class TestLogger(target: File) extends Logger {
    val unrecognisedInputCount = new AtomicInteger(0)

    def trace(t: => Throwable): Unit = ()
    def success(message: => String): Unit = ()

    def log(level: Level.Value, message: => String): Unit =
      if (level == Level.Error && message.contains("""Cannot find pattern `Reset`""")) {
        // Touch the marker only on the first matching error.
        if (unrecognisedInputCount.addAndGet(1) == 1) {
          IO.touch(target / "cannot-find-reset")
        }
      }
  }

  /** Reporter with no error limit (-1), backed by the marker-file logger. */
  class TestReporter(target: File) extends LoggerReporter(-1, new TestLogger(target))

  lazy val root = Project(
    id = "test-build",
    base = file("."),
    settings = Seq(WebKeys.reporter := new TestReporter(target.value))
  ).enablePlugins(SbtWeb)
}
开发者ID:sutiialex,项目名称:sbt-elm,代码行数:37,代码来源:TestBuild.scala
示例12: TestBuild
//设置package包名称以及导入依赖的类
import java.util.concurrent.atomic.AtomicInteger
import sbt._
import sbt.Keys._
import com.typesafe.sbt.web.SbtWeb
import com.typesafe.sbt.web.SbtWeb.autoImport._
object TestBuild extends Build {

  /**
   * sbt Logger that watches the error stream for the expected compiler
   * message and, the first time it appears, drops a marker file that the
   * scripted test can assert on.
   */
  class TestLogger(target: File) extends Logger {
    val unrecognisedInputCount = new AtomicInteger(0)

    def trace(t: => Throwable): Unit = ()
    def success(message: => String): Unit = ()

    def log(level: Level.Value, message: => String): Unit =
      if (level == Level.Error && message.contains("""Top-level value `main` does not have a type annotation.""")) {
        // Touch the marker only on the first matching error.
        if (unrecognisedInputCount.addAndGet(1) == 1) {
          IO.touch(target / "main-warning")
        }
      }
  }

  /** Reporter with no error limit (-1), backed by the marker-file logger. */
  class TestReporter(target: File) extends LoggerReporter(-1, new TestLogger(target))

  lazy val root = Project(
    id = "test-build",
    base = file("."),
    settings = Seq(WebKeys.reporter := new TestReporter(target.value))
  ).enablePlugins(SbtWeb)
}
开发者ID:sutiialex,项目名称:sbt-elm,代码行数:37,代码来源:TestBuild.scala
示例13: HelloSpec
//设置package包名称以及导入依赖的类
package example
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
// Demonstrates Future evaluation order: futures created before a
// for-comprehension run concurrently; those created inside a `<-` step run
// sequentially, after the previous step completed.
class HelloSpec extends FlatSpec with Matchers {
// Numbers the futures in creation order for the printed trace.
val counter = new AtomicInteger()
"The Hello object" should "say hello" in {
// Sleeps `sleepingTime` ms and yields this future's sequence number.
def createFuture(sleepingTime:Int) = {
Future {
val no = counter.addAndGet(1)
println(s"no.$no : start sleeping $sleepingTime")
Thread.sleep(sleepingTime)
println(s"no.$no : end sleeping $sleepingTime")
no
}
}
// f and f2 start immediately, so they run in parallel.
val f: Future[Int] = createFuture(100)
val f2: Future[Int] = createFuture(50)
val hoge = for {
res1 <- f
res2 <- f2
// Created inside the comprehension: each starts only after the previous
// step completed, i.e. these two run sequentially.
res3 <- createFuture(100)
res4 <- createFuture(50)
// Bound with `=` (no await), so both start before res5 is awaited —
// f5 and f6 run in parallel with each other.
f5 = createFuture(100)
f6 = createFuture(50)
res5 <- f5
res6 <- f6
} yield s"${res1}, ${res2}, ${res3}, ${res4}, ${res5}, ${res6}"
// Blocking here is test-only; Duration.Inf would hang on a stuck future.
println(Await.result(hoge, Duration.Inf))
}
}
开发者ID:rysh,项目名称:my-scala-playground,代码行数:42,代码来源:HelloSpec.scala
示例14: DispatcherThreadFactory
//设置package包名称以及导入依赖的类
package knot.core.dispatch
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentHashMap, ThreadFactory}
/** Per-prefix counters so each dispatcher prefix numbers its threads from 1. */
object DispatcherThreadFactory {
  private val threadNumbers = new ConcurrentHashMap[String, AtomicInteger]()

  /** Returns the next thread number for `prefix`, starting at 1. */
  def getNewTheadNumber(prefix: String): Int = {
    // (sic: "Thead" typo kept — the name is public API.)
    val counter = threadNumbers.computeIfAbsent(prefix, _ => new AtomicInteger(1))
    counter.getAndIncrement()
  }
}

/** ThreadFactory producing daemon threads named "knot-&lt;prefix&gt;-&lt;n&gt;". */
class DispatcherThreadFactory(val prefix: String) extends ThreadFactory {
  import DispatcherThreadFactory._

  override def newThread(r: Runnable): Thread = {
    val thread = new Thread(r, s"knot-$prefix-${getNewTheadNumber(prefix)}")
    thread.setDaemon(true)
    thread.setPriority(Thread.NORM_PRIORITY)
    thread
  }
}
开发者ID:defvar,项目名称:knot,代码行数:26,代码来源:DispatcherThreadFactory.scala
示例15: TestSessionExporterSolr
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionSolrDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Streams test-session rows from Cassandra into Solr in batches.
object TestSessionExporterSolr {
// query: select used by the Cassandra reader; maybeDateFilter: when set,
// only things whose create_date is strictly after the date are exported.
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
val writer = new TestSessionSolrDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-sessions-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
// True when the thing's create_date is after endDate; things without a
// ModelDate create_date field are dropped.
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
// Actor-backed source: elements are pushed in via `flow ! thing` below.
// OverflowStrategy.fail aborts the stream if the buffer ever fills.
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
// Batch by size (writeBatchSize) or time window, whichever fills first.
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
// Progress logging: batch size plus running batch number.
val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
testSessions
}
// NOTE(review): `flow` is the materialized ActorRef of `source`, not a Flow.
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
// Crude throttle; the actor source gives no backpressure to the reader.
Thread.sleep(ContextConfig.sleepLength)
}
// NOTE(review): prints before the stream has necessarily drained, and the
// ActorSystem is never terminated here — confirm intended.
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporterSolr.scala
示例16: TestSessionExporter
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestSessionCassandraDataReaderComponent
import org.efset.writer.TestSessionElasticsearchDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Streams test-session rows from Cassandra into Elasticsearch in batches.
object TestSessionExporter {
// query: select used by the Cassandra reader; maybeDateFilter: when set,
// only things whose create_date is strictly after the date are exported.
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestSessionCassandraDataReaderComponent(query).dataReader
val writer = new TestSessionElasticsearchDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-sessions-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
// True when the thing's create_date is after endDate; things without a
// ModelDate create_date field are dropped.
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
// Actor-backed source: elements are pushed in via `flow ! thing` below.
// OverflowStrategy.fail aborts the stream if the buffer ever fills.
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
// Batch by size (writeBatchSize) or time window, whichever fills first.
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
// Progress logging: batch size plus running batch number.
val showGroup = Flow[Seq[ModelThing]].map { testSessions =>
println(s"Processing ${testSessions.size} items. Batch number ${total.incrementAndGet()}")
testSessions
}
// NOTE(review): `flow` is the materialized ActorRef of `source`, not a Flow.
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
// Crude throttle; the actor source gives no backpressure to the reader.
Thread.sleep(ContextConfig.sleepLength)
}
// NOTE(review): prints before the stream has necessarily drained, and the
// ActorSystem is never terminated here — confirm intended.
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestSessionExporter.scala
示例17: TestGroupExporterSolr
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupSolrDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Streams test-group rows from Cassandra into Solr in batches.
object TestGroupExporterSolr {
// query: select used by the Cassandra reader; maybeDateFilter: when set,
// only things whose create_date is strictly after the date are exported.
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
val writer = new TestGroupSolrDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-groups-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
// True when the thing's create_date is after endDate; things without a
// ModelDate create_date field are dropped.
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
// Actor-backed source: elements are pushed in via `flow ! thing` below.
// OverflowStrategy.fail aborts the stream if the buffer ever fills.
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
// Batch by size (writeBatchSize) or time window, whichever fills first.
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
// Progress logging: batch size plus running batch number.
val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
testGroups
}
// NOTE(review): `flow` is the materialized ActorRef of `source`, not a Flow.
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
// Crude throttle; the actor source gives no backpressure to the reader.
Thread.sleep(ContextConfig.sleepLength)
}
// NOTE(review): prints before the stream has necessarily drained, and the
// ActorSystem is never terminated here — confirm intended.
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporterSolr.scala
示例18: TestGroupExporter
//设置package包名称以及导入依赖的类
package org.efset
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import org.efset.ContextConfig.writeBatchSize
import org.efset.Model.{ModelDate, ModelThing}
import org.efset.readers.TestGroupCassandraDataReaderComponent
import org.efset.writer.TestGroupElasticsearchDataWriterComponent
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
// Streams test-group rows from Cassandra into Elasticsearch in batches.
object TestGroupExporter {
// query: select used by the Cassandra reader; maybeDateFilter: when set,
// only things whose create_date is strictly after the date are exported.
def apply(query: String, maybeDateFilter: Option[Date]) = {
val reader = new TestGroupCassandraDataReaderComponent(query).dataReader
val writer = new TestGroupElasticsearchDataWriterComponent().dataWriter
implicit val system = ActorSystem("test-groups-export")
implicit val materializer = ActorMaterializer()
implicit val ec = ExecutionContext.global
// True when the thing's create_date is after endDate; things without a
// ModelDate create_date field are dropped.
def endDateFilter(thing: ModelThing, endDate: Date): Boolean = {
val maybeField = thing.fields.find(f => f.name == "create_date")
maybeField match {
case Some(ModelDate(_, dt)) => dt.after(endDate)
case _ => false
}
}
val filter = Flow[ModelThing].filter(t => maybeDateFilter.forall(dt => endDateFilter(t, dt)))
// Actor-backed source: elements are pushed in via `flow ! thing` below.
// OverflowStrategy.fail aborts the stream if the buffer ever fills.
val source = Source.actorRef[ModelThing](Int.MaxValue, OverflowStrategy.fail)
// Batch by size (writeBatchSize) or time window, whichever fills first.
val group = Flow[ModelThing].groupedWithin(writeBatchSize, ContextConfig.groupWindow.seconds)
val commit = Flow[Seq[ModelThing]].map { s => writer.write(s); s }
val sink = Sink.ignore
val total = new AtomicInteger()
// Progress logging: batch size plus running batch number.
val showGroup = Flow[Seq[ModelThing]].map { testGroups =>
println(s"Processing ${testGroups.size} items. Batch number ${total.incrementAndGet()}")
testGroups
}
// NOTE(review): `flow` is the materialized ActorRef of `source`, not a Flow.
val flow = source.via(filter).via(group).via(showGroup).via(commit).to(sink).run()
reader.read().foreach { thing =>
flow ! thing
// Crude throttle; the actor source gives no backpressure to the reader.
Thread.sleep(ContextConfig.sleepLength)
}
// NOTE(review): prints before the stream has necessarily drained, and the
// ActorSystem is never terminated here — confirm intended.
println("Finished...")
}
}
开发者ID:ef-ice,项目名称:cassandra-exporter,代码行数:60,代码来源:TestGroupExporter.scala
示例19: apply
//设置package包名称以及导入依赖的类
package de.geekonaut.slickmdc
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import com.typesafe.scalalogging.StrictLogging
import slick.util.AsyncExecutor
/**
 * Creates a fixed-size thread-pool AsyncExecutor whose ExecutionContext is
 * wrapped in MdcExecutionContext (MDC propagation; defined in this project).
 *
 * name: prefix for pool thread names; numThreads: pool size.
 */
def apply(name:String, numThreads: Int): AsyncExecutor = {
new AsyncExecutor {
val tf = new DaemonThreadFactory(name + "-")
// Built lazily so the pool is only created on first use.
lazy val executionContext = {
new MdcExecutionContext(ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads, tf)))
}
// NOTE(review): close() never shuts the pool down; its threads are daemons
// and die with the JVM — confirm intended.
def close(): Unit = {}
}
}
// Convenience factory mirroring Slick's AsyncExecutor.default: 20 threads.
def default(name: String = "AsyncExecutor.default"): AsyncExecutor = apply(name, 20)
/** ThreadFactory that yields daemon, normal-priority threads named "&lt;prefix&gt;&lt;n&gt;". */
private class DaemonThreadFactory(namePrefix: String) extends ThreadFactory {
  // Use the SecurityManager's thread group when one is installed.
  private[this] val group =
    Option(System.getSecurityManager).fold(Thread.currentThread.getThreadGroup)(_.getThreadGroup)
  private[this] val nextId = new AtomicInteger(1)

  def newThread(r: Runnable): Thread = {
    val thread = new Thread(group, r, namePrefix + nextId.getAndIncrement, 0)
    if (!thread.isDaemon) thread.setDaemon(true)
    if (thread.getPriority != Thread.NORM_PRIORITY) thread.setPriority(Thread.NORM_PRIORITY)
    thread
  }
}
}
开发者ID:AVGP,项目名称:slickmdc,代码行数:38,代码来源:MdcAsyncExecutor.scala
示例20: TestGroupManagerFixture
//设置package包名称以及导入依赖的类
package mesosphere.marathon
package api
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Provider
import akka.event.EventStream
import com.codahale.metrics.MetricRegistry
import mesosphere.marathon.core.group.GroupManagerModule
import mesosphere.marathon.core.leadership.AlwaysElectedLeadershipModule
import mesosphere.marathon.io.storage.StorageProvider
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.storage.repository.{ AppRepository, GroupRepository, PodRepository }
import mesosphere.marathon.test.{ MarathonActorSupport, Mockito }
import mesosphere.marathon.util.WorkQueue
// Wires a real GroupManagerModule against mocked repositories, storage and
// scheduler so API tests can exercise the group manager without real
// persistence or a running Marathon scheduler.
class TestGroupManagerFixture extends Mockito with MarathonActorSupport {
val service = mock[MarathonSchedulerService]
val groupRepository = mock[GroupRepository]
val podRepository = mock[PodRepository]
val appRepository = mock[AppRepository]
val eventBus = mock[EventStream]
val provider = mock[StorageProvider]
// Short ZK timeout to keep tests fast.
val config = AllConf.withTestConfig("--zk_timeout", "1000")
val metricRegistry = new MetricRegistry()
val metrics = new Metrics(metricRegistry)
// NOTE(review): not referenced in this chunk — presumably a unique-suffix
// source for actor names; confirm against callers.
val actorId = new AtomicInteger(0)
// Indirection so the mock `service` can be configured after module wiring.
val schedulerProvider = new Provider[DeploymentService] {
override def get() = service
}
private[this] val groupManagerModule = new GroupManagerModule(
config = config,
AlwaysElectedLeadershipModule.forActorSystem(system),
serializeUpdates = WorkQueue("serializeGroupUpdates", 1, 10),
scheduler = schedulerProvider,
groupRepo = groupRepository,
appRepo = appRepository,
podRepo = podRepository,
storage = provider,
eventBus = eventBus,
metrics = metrics)
// The object under test.
val groupManager = groupManagerModule.groupManager
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:50,代码来源:TestGroupManagerFixture.scala
注:本文中的java.util.concurrent.atomic.AtomicInteger类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论