本文整理汇总了Scala中java.util.concurrent.atomic.AtomicBoolean类的典型用法代码示例。如果您正苦于以下问题：Scala AtomicBoolean类的具体用法？Scala AtomicBoolean怎么用？Scala AtomicBoolean使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了AtomicBoolean类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Benchmark
//设置package包名称以及导入依赖的类
package lazytx.benchmark.oltp
import java.util.concurrent.atomic.AtomicBoolean
/** Throughput micro-benchmark driver. */
object Benchmark {
  /**
   * Runs `tx` repeatedly on `threads` worker threads for roughly `millis`
   * milliseconds and reports the measured throughput.
   *
   * @param threads number of worker threads to start
   * @param millis  how long the measurement window stays open
   * @param tx      the transaction each worker executes in a loop
   * @return completed transactions per second, summed over all workers
   */
  def benchmark(threads : Int, millis : Long, tx : () => Unit) : Double = {
    val running = new AtomicBoolean(false)
    val stopping = new AtomicBoolean(false)

    // All workers are started first and spin on `running`, so thread start-up
    // cost is kept out of the measured window.
    val ts = Array.fill(threads)(new BenchmarkThread(running, stopping, tx))
    ts.foreach(_.start())

    val begin = time()
    running.set(true)
    Thread.sleep(millis)
    stopping.set(true)
    val end = time()

    // join() before reading `count`: each worker publishes its total only when it exits.
    ts.foreach(_.join())
    ts.map(_.count).sum / (end - begin)
  }

  /** Current value of `System.nanoTime()` converted to seconds. */
  def time() : Double = System.nanoTime() / 1000000000.0
}
/**
 * Worker used by the benchmark: spins until `running` is raised, then calls
 * `tx` in a loop until `stopping` is raised.
 *
 * @param running  start signal shared by all workers
 * @param stopping stop signal shared by all workers
 * @param tx       the transaction to execute repeatedly
 */
class BenchmarkThread(running : AtomicBoolean, stopping : AtomicBoolean, tx : () => Unit) extends Thread {
  /** Number of transactions that finished before the stop signal; written once, when `run` ends. */
  var count = 0L

  override def run(): Unit = {
    // Busy-wait for the start signal.
    while (!running.get()) {}

    // Tally in a local and publish to the field only at the end.
    var completed = 0L
    while (!stopping.get()) {
      tx()
      // A transaction that was still in flight when the stop signal arrived is not counted.
      if (!stopping.get()) completed += 1
    }
    count = completed
  }
}
开发者ID:utwente-fmt,项目名称:lazy-persistent-trie,代码行数:51,代码来源:Benchmark.scala
示例2: ActionListTest
//设置package包名称以及导入依赖的类
package com.github.madoc.create_sbt_project.action.framework
import java.util.concurrent.atomic.AtomicBoolean
import com.github.madoc.create_sbt_project.action.CreateDirectory
import com.github.madoc.create_sbt_project.action.framework.ActionResult.PreconditionFailures
import com.github.madoc.create_sbt_project.action.precondition.PreconditionFailure
import com.github.madoc.create_sbt_project.action.precondition.PreconditionFailure.DirectoryDoesNotExist
import com.github.madoc.create_sbt_project.io.FileSystemSupport
import org.scalatest.{FreeSpec, Matchers}
/** Specification of `ActionList`: precondition aggregation, `toString`, and short-circuiting on failure. */
class ActionListTest extends FreeSpec with Matchers {
  "an ActionList combines the precondition failures that its contents might deal with" in {
    val al = ActionList(CreateDirectory("dir1"), CreateDirectory("dir2"))
    al.mightDealWith(DirectoryDoesNotExist("dir1")) should be (true)
    al.mightDealWith(DirectoryDoesNotExist("dir2")) should be (true)
    al.mightDealWith(DirectoryDoesNotExist("dir3")) should be (false)
  }

  "ActionList() returns the empty action" in {
    ActionList() should be (ActionList.empty)
  }

  "an empty ActionList cannot deal with any precondition failure" in {
    ActionList.empty.mightDealWith(DirectoryDoesNotExist("dir")) should be (false)
  }

  "an empty ActionList returns a meaningful toString" in {
    ActionList.empty.toString should be ("ActionList()")
  }

  "a non-empty ActionList returns a meaningful toString" in {
    ActionList(CreateDirectory("foo"), CreateDirectory("bar")).toString should be ("ActionList(CreateDirectory(foo), CreateDirectory(bar))")
  }

  "when an action in the middle of an ActionList fails, the following action is not called" in {
    // Always succeeds and declares no preconditions.
    object NonFailingAction extends Action {
      protected def run(env:ActionEnvironment) = ActionResult.Success
      def precondition = {_:ActionEnvironment => Seq()}
      def mightDealWith(failure:PreconditionFailure) = false
    }
    // Records that it was run, then reports a precondition failure.
    class FailingAction extends Action {
      val runCalled = new AtomicBoolean(false)
      protected def run(env:ActionEnvironment) = {
        runCalled set true
        ActionResult.PreconditionFailures(Seq(DirectoryDoesNotExist("dummy")))
      }
      def precondition = {_:ActionEnvironment => Seq()}
      def mightDealWith(failure:PreconditionFailure) = false
    }

    val (action1, action2, action3) = (NonFailingAction, new FailingAction, new FailingAction)
    ActionList(action1, action2, action3)(FileSystemSupport.default) should be (PreconditionFailures(Seq(DirectoryDoesNotExist("dummy"))))
    // The first failing action ran; the one after it must not have been reached.
    action2.runCalled.get should be (true)
    action3.runCalled.get should be (false)
  }
}
开发者ID:Madoc,项目名称:create-sbt-project,代码行数:52,代码来源:ActionListTest.scala
示例3: ShutdownHooks
//设置package包名称以及导入依赖的类
package mesosphere.marathon.core.base
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal
/** A collection of callbacks that are executed when `shutdown()` is invoked. */
trait ShutdownHooks {
  /** Registers `block` for execution on shutdown; the block is passed by name, so it is not evaluated here. */
  def onShutdown(block: => Unit): Unit

  /** Whether shutdown has been initiated. */
  def isShuttingDown: Boolean

  /** Initiates shutdown. */
  def shutdown(): Unit
}

object ShutdownHooks {
  /** Creates the default implementation, a `DefaultShutdownHooks`. */
  def apply(): ShutdownHooks = {
    val hooks = new DefaultShutdownHooks()
    hooks
  }
}
/**
 * In-memory `ShutdownHooks` implementation.
 *
 * Hooks are prepended on registration, so they run most-recently-registered
 * first. Access to the hook list is guarded by a lock because hooks may be
 * registered on one thread while `shutdown()` is called from another.
 */
private[base] class BaseShutdownHooks extends ShutdownHooks {
  private[this] val log = LoggerFactory.getLogger(getClass)
  private[this] val lock = new Object
  private[this] var shutdownHooks = List.empty[() => Unit]
  private[this] val shuttingDown = new AtomicBoolean(false)

  /** Registers `block`; it is evaluated only when `shutdown()` runs. */
  override def onShutdown(block: => Unit): Unit = lock.synchronized {
    shutdownHooks +:= { () => block }
  }

  /**
   * Marks this instance as shutting down and runs every pending hook once.
   * A hook that throws a non-fatal exception is logged and does not prevent
   * the remaining hooks from running.
   */
  override def shutdown(): Unit = {
    shuttingDown.set(true)
    // Take and clear the list atomically, then run the hooks outside the lock so
    // that a hook calling onShutdown cannot deadlock and is not silently discarded.
    val pending = lock.synchronized {
      val hooks = shutdownHooks
      shutdownHooks = Nil
      hooks
    }
    pending.foreach { hook =>
      try hook()
      catch {
        case NonFatal(e) => log.error("while executing shutdown hook", e)
      }
    }
  }

  /** True once `shutdown()` has been called. */
  override def isShuttingDown: Boolean = shuttingDown.get
}
/** `BaseShutdownHooks` that registers itself with the JVM so `shutdown()` runs when the runtime exits. */
private class DefaultShutdownHooks extends BaseShutdownHooks {
  // Registered during construction.
  private[this] val jvmHook: Thread = new Thread() {
    override def run(): Unit = shutdown()
  }
  Runtime.getRuntime.addShutdownHook(jvmHook)
}
开发者ID:xiaozai512,项目名称:marathon,代码行数:52,代码来源:ShutdownHooks.scala
示例4: MessageRelayManager
//设置package包名称以及导入依赖的类
package io.grhodes.mcm.server.gcm
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{DelayQueue, Executors}
import com.gilt.gfc.logging.Loggable
import scala.util.control.NonFatal
/**
 * Takes `ScheduledJob`s from a delay queue as they become due and executes
 * each one on a fixed-size thread pool.
 *
 * @param gcmMessageStore message store; it is shut down together with this manager
 */
class MessageRelayManager(val gcmMessageStore: GCMMessageStore) extends Loggable {
  private val executorService = Executors.newFixedThreadPool(50)
  private val queue = new DelayQueue[ScheduledJob]()
  private val keepRunning = new AtomicBoolean(true)
  // Declared last: the scheduler starts immediately and reads the fields above.
  private val schedulerFuture = executorService.submit(new JobScheduler(this))

  /**
   * Stops the scheduler loop, shuts down the message store, discards queued
   * jobs and releases the worker pool.
   *
   * @return always `true`
   */
  def shutdown(): Boolean = {
    this.keepRunning.set(false)
    gcmMessageStore.shutdown()
    queue.clear()
    // Interrupts the scheduler if it is blocked in queue.take().
    schedulerFuture.cancel(true)
    // Without this the pool's non-daemon threads stay alive indefinitely;
    // jobs that were already handed to the pool are still allowed to finish.
    executorService.shutdown()
    true
  }

  /** Enqueues `job`; returns the result of `DelayQueue.add`. */
  def addJob(job: ScheduledJob): Boolean = queue.add(job)

  /** Loop that moves due jobs from the queue onto the worker pool. */
  class JobScheduler(relayManager: MessageRelayManager) extends Runnable {
    override def run(): Unit = {
      while (keepRunning.get()) {
        try {
          val scheduledJob = queue.take()
          executorService.execute(new Runnable() {
            override def run(): Unit = scheduledJob.execute(relayManager)
          })
        } catch {
          // NonFatal does not match InterruptedException, so the interrupt sent by
          // shutdown() ends this loop instead of being logged and retried.
          case NonFatal(e) =>
            error("JobScheduler.run() got exception:", e)
        }
      }
    }
  }
}
开发者ID:grahamar,项目名称:mcm-server,代码行数:45,代码来源:MessageRelayManager.scala
示例5: StateChannel
//设置package包名称以及导入依赖的类
package reactify
import java.util.concurrent.atomic.AtomicBoolean
import reactify.bind._
/** A value that is both a `State` and a `Channel`, and can be bound two-way to another `StateChannel`. */
trait StateChannel[T] extends State[T] with Channel[T] {
  override def static(value: T): Unit = super.static(value)

  /**
   * Binds this channel to `that` so that a change on either side is pushed to the other.
   *
   * @param that   the channel to bind to
   * @param setNow which side, if any, is copied to the other before the listeners are attached
   * @param t2v    conversion from this channel's type to `that`'s type
   * @param v2t    conversion from `that`'s type to this channel's type
   * @return a `Binding` built from both channels and the two attached listeners
   */
  def bind[V](that: StateChannel[V], setNow: BindSet = BindSet.LeftToRight)
             (implicit t2v: T => V, v2t: V => T): Binding[T, V] = {
    setNow match {
      case BindSet.LeftToRight => that := t2v(this)
      case BindSet.RightToLeft => this := v2t(that)
      case BindSet.None => // Nothing
    }

    // Guard shared by both directions: while one side is propagating, the
    // change it triggers on the other side is not echoed back.
    val changing = new AtomicBoolean(false)
    def unlessChanging(update: => Unit): Unit =
      if (changing.compareAndSet(false, true)) {
        try update
        finally changing.set(false)
      }

    val leftToRight = this.attach { _ =>
      unlessChanging {
        that := t2v(get)
      }
    }
    val rightToLeft = that.attach { _ =>
      unlessChanging {
        StateChannel.this := v2t(that.get)
      }
    }
    new Binding(this, that, leftToRight, rightToLeft)
  }
}
开发者ID:outr,项目名称:reactify,代码行数:39,代码来源:StateChannel.scala
示例6: BarkLiceBoot
//设置package包名称以及导入依赖的类
package com.flipkart.connekt.barklice
import java.util.concurrent.atomic.AtomicBoolean
import com.flipkart.connekt.commons.connections.ConnectionProvider
import com.flipkart.connekt.commons.core.BaseApp
import com.flipkart.connekt.commons.dao.DaoFactory
import com.flipkart.connekt.commons.factories.{ConnektLogger, LogFile}
import com.flipkart.connekt.commons.services.ConnektConfig
import com.flipkart.connekt.commons.utils.{NetworkUtils, StringUtils, ConfigUtils}
import flipkart.cp.convert.ha.worker.Bootstrap
/** Entry point of the BarkLice worker: one-time initialisation, termination, and a `main` method. */
object BarkLiceBoot extends BaseApp {
  // Flipped to true by the first start(); later calls to start() do nothing.
  private val initialized = new AtomicBoolean(false)

  /** Initialises logging, configuration and the HBase DAO factory, then starts the worker `Bootstrap`. */
  def start(): Unit = {
    if (initialized.compareAndSet(false, true)) {
      ConnektLogger(LogFile.SERVICE).info("BarkLiceBoot initializing.")

      val loggerConfigFile =
        ConfigUtils.getSystemProperty("log4j.configurationFile").getOrElse("log4j2-barklice.xml")
      ConnektLogger(LogFile.SERVICE).info(s"BarkLiceBoot logging using: $loggerConfigFile")
      ConnektLogger.init(loggerConfigFile)

      val applicationConfigFile =
        ConfigUtils.getSystemProperty("barklice.appConfigurationFile").getOrElse("barklice-config.json")
      val configBuckets =
        Seq("fk-connekt-root", "fk-connekt-".concat(ConfigUtils.getConfEnvironment), "fk-connekt-barklice")
      ConnektConfig(configServiceHost, configServicePort, apiVersion)(configBuckets)(applicationConfigFile)

      DaoFactory.setUpConnectionProvider(new ConnectionProvider)
      val hConfig = ConnektConfig.getConfig("connections.hbase")
      DaoFactory.initHTableDaoFactory(hConfig.get)

      val hostname = NetworkUtils.getHostname
      val instanceId = s"$hostname-${StringUtils.generateRandomStr(5)}"
      ConnektLogger(LogFile.SERVICE).info(s"Starting BarkLice with InstanceId: $instanceId, Hostname : $hostname ...")
      new Bootstrap(instanceId, "", hostname).start()
    }
  }

  /** Logs the shutdown and, if `start()` ran, releases the HBase DAO factory and the logger. */
  def terminate(): Unit = {
    ConnektLogger(LogFile.SERVICE).info("BarkLiceBoot shutting down")
    if (initialized.get()) {
      DaoFactory.shutdownHTableDaoFactory()
      ConnektLogger.shutdown()
    }
  }

  /** Sets the log4j configuration file system property to the test configuration, then starts the app. */
  def main(args: Array[String]): Unit = {
    System.setProperty("log4j.configurationFile", "log4j2-test.xml")
    start()
  }
}
开发者ID:ayush03agarwal,项目名称:connekt,代码行数:55,代码来源:BarkLiceBoot.scala
示例7: CloseOnReleaseService
//设置package包名称以及导入依赖的类
package com.twitter.finagle.service
import com.twitter.finagle.{Status, Service, ServiceClosedException, ServiceProxy, WriteException}
import com.twitter.util.{Future, Time}
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Service proxy that closes the underlying service at most once and, after
 * being closed, fails every request and reports `Status.Closed`.
 */
private[finagle] class CloseOnReleaseService[Req, Rep](underlying: Service[Req, Rep])
  extends ServiceProxy[Req, Rep](underlying)
{
  private[this] val wasReleased = new AtomicBoolean(false)

  /** Forwards the request, or fails it with a wrapped `ServiceClosedException` once closed. */
  override def apply(request: Req) =
    if (wasReleased.get) Future.exception(WriteException(new ServiceClosedException))
    else super.apply(request)

  /** Closes the underlying service on the first call; later calls return `Future.Done`. */
  override def close(deadline: Time) = {
    val firstClose = wasReleased.compareAndSet(false, true)
    if (firstClose) super.close(deadline) else Future.Done
  }

  /** `Status.Closed` once released, otherwise the underlying status. */
  override def status =
    if (!wasReleased.get) super.status
    else Status.Closed
}
开发者ID:wenkeyang,项目名称:finagle,代码行数:33,代码来源:CloseOnReleaseService.scala
示例8: Runner
//设置package包名称以及导入依赖的类
package indi.lewis.spider.thread
import java.util.concurrent.atomic.AtomicBoolean
def onException(job :Job[A], e : Exception) :Unit;
}
/** Creates a runner that works through `channel` with `cores` worker threads. */
def getJobRunner[B](cores: Int, channel: Runner.JobChannel[B]): Runner[B] =
  new Runner[B](cores, channel)

/** Creates a runner with 16 worker threads per available processor. */
def getJobRunner[B](channel: Runner.JobChannel[B]): Runner[B] =
  getJobRunner(cpuCores * 16, channel)

/** Processor count reported by the JVM, read on first use. */
final lazy val cpuCores: Int = Runtime.getRuntime.availableProcessors()
}
/**
 * Runs the jobs supplied by a `Runner.JobChannel` on a fixed number of threads.
 *
 * @param cores   number of worker threads to start
 * @param channel source of jobs and receiver of per-job callbacks
 */
class Runner[C] private (cores :Int,channel : Runner.JobChannel[C]) {
  /** The worker threads; populated by `startRunAndRet`. */
  val ts=new Array[Thread](cores)
  private val started:java.util.concurrent.atomic.AtomicBoolean=new AtomicBoolean(false)
  /** Counted down once by every worker when it exits. */
  val endCount=new java.util.concurrent.CountDownLatch(cores)
  /** Released once all workers have been created, so they begin together. */
  val starter=new java.util.concurrent.CountDownLatch(1)

  /** Starts the workers and blocks until all of them have exited. */
  def startRun() : Unit = {
    startRunAndRet().await()
  }

  /**
   * Starts the workers without waiting for them.
   *
   * @return the latch that reaches zero when every worker has exited
   * @throws RuntimeException if this runner was already started
   */
  def startRunAndRet():java.util.concurrent.CountDownLatch={
    // Atomic check-and-set: exactly one caller can win the right to start.
    if (!started.compareAndSet(false, true)) throw new RuntimeException(" Runner has been started ! ")

    for (i <- 1 to cores) {
      val th = new Thread() {
        override def run(): Unit = {
          try {
            starter.await()
            // Keep pulling jobs until the channel reports there are none left.
            while (channel.nextJob().map { j =>
              try {
                j.run()
                channel.onJobFinish(j)
              } catch {
                case ex: Exception => channel.onException(j, ex)
              }
              channel.onJobOver(j)
            } != None) {}
          } finally {
            // Always count down, even when the worker dies abnormally (an Error from a
            // job, or an exception thrown by a channel callback); otherwise startRun()
            // would wait forever.
            endCount.countDown()
          }
        }
      }
      th.setName(s"JobThread( $i /$cores )")
      th.setPriority(Thread.MAX_PRIORITY)
      ts(i-1)=th
      th.start()
    }
    starter.countDown()
    endCount
  }
}
开发者ID:TokisakiFun,项目名称:Katipo,代码行数:62,代码来源:Runner.scala
示例9: TwitterFutureMonad
//设置package包名称以及导入依赖的类
package org.http4s
import java.util.concurrent.atomic.AtomicBoolean
import com.twitter.util._
import com.twitter.finagle.tracing.Trace
import org.jboss.netty.handler.codec.http.{ DefaultHttpResponse, HttpResponseStatus, HttpVersion => Version }
import scalaz.concurrent.Task
import scalaz.syntax.all._
/** Bridges between Twitter `Future` and scalaz `Task`, plus small helpers for the finagle backend. */
package object finagle {
  val bufferSize = 65536

  implicit object TwitterFutureMonad extends FutureMonad

  /** Adds `asTask` to a Twitter `Future`. */
  implicit class TwitterFutureToTask[A](val f: Future[A]) extends AnyVal {
    /** Wraps the future in a `Task` whose callback receives the future's value or its failure. */
    def asTask: Task[A] = Task.async { done =>
      f.respond {
        case Return(value) => done(value.right)
        case Throw(cause)  => done(cause.left)
      }
    }
  }

  /** Adds `asFuture` to a scalaz `Task`. */
  implicit class ScalazTaskToTwitterFuture[A](val t: Task[A]) extends AnyVal {
    /**
     * Runs the task and exposes its outcome as a Twitter `Future`.
     *
     * @param handler invoked with the interrupt cause when the returned future is
     *                interrupted and the handler is defined for that cause
     */
    def asFuture(handler: PartialFunction[Throwable, Unit] = PartialFunction.empty): Future[A] = {
      val promise = new Promise[A]
      // Handed to runAsyncInterruptibly below; raised when the promise is interrupted.
      val cancelled = new AtomicBoolean()
      promise.setInterruptHandler {
        case cause =>
          cancelled.set(true)
          if (handler.isDefinedAt(cause)) handler(cause)
      }
      t.runAsyncInterruptibly({ _.fold(promise.setException, promise.setValue) }, cancelled)
      promise
    }
  }

  private[finagle] object OneHundredContinueResponse
    extends DefaultHttpResponse(Version.HTTP_1_1, HttpResponseStatus.CONTINUE)

  /** Disables finagle tracing via `Trace.disable()`. */
  def disableTracer(): Unit = Trace.disable()
}
开发者ID:lukiano,项目名称:finagle-http4s,代码行数:45,代码来源:package.scala
示例10: HandlerPoolImpl
//设置package包名称以及导入依赖的类
package org.svars
import concurrent.{ Future, ExecutionContext, blocking }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean }
/**
 * `HandlerPool` that runs work on the implicit `ExecutionContext` and tracks
 * outstanding puts so that `quiesce` can wait for them.
 */
case class HandlerPoolImpl[D, T](lattice: Lattice[D, T])(implicit executionContext: ExecutionContext) extends HandlerPool[D, T] {
  // Puts that have been submitted and have not yet finished.
  private val runningPuts = new AtomicInteger()
  // Set once any put has failed with LVarFrozenException.
  private val hasHadInvalidPut = new AtomicBoolean()

  /**
   * Runs `function` asynchronously as a tracked put. An `LVarFrozenException`
   * is recorded and rethrown, so the returned future fails with it.
   */
  override def doPut(function: => Unit): Future[Unit] = {
    // Count the put at submission time, before the task is scheduled. Counting
    // inside the Future body would let quiesce observe zero while a submitted
    // put is still waiting for a thread.
    runningPuts.incrementAndGet()
    Future {
      try {
        function
      } catch {
        // Type arguments are erased at runtime, so match on the class only.
        case e: LVarFrozenException[_, _] =>
          hasHadInvalidPut.set(true)
          throw e
      } finally {
        runningPuts.decrementAndGet()
      }
    }
  }

  /** Runs `function` asynchronously without tracking it. */
  override def doFuture(function: => Unit): Future[Unit] = Future {
    function
  }

  /**
   * Waits until no put is outstanding, then evaluates `function`.
   * The returned future fails with `IllegalStateException` if an invalid put
   * had already been recorded when the wait began.
   */
  override def quiesce(function: => D): Future[D] = Future {
    if (hasHadInvalidPut.get) throw new IllegalStateException()
    // Spin-wait, marked as blocking for the execution context.
    blocking { while (runningPuts.get != 0) { } }
    function
  }
}
开发者ID:johanstenberg92,项目名称:SVars,代码行数:38,代码来源:HandlerPoolImpl.scala
示例11: StartStopBehaviour
//设置package包名称以及导入依赖的类
package com.bwsw.imp.common
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Mixin that enforces strictly alternating `start()` / `stop()` calls.
 * A fresh instance is in the stopped state.
 */
trait StartStopBehaviour {
  private val isStoppedFlag = new AtomicBoolean(true)

  /**
   * Moves from stopped to started.
   *
   * @throws IllegalStateException if the object is already started
   */
  def start(): Unit = {
    val wasStopped = isStoppedFlag.compareAndSet(true, false)
    if (!wasStopped)
      throw new IllegalStateException(s"Object $this is already started. Next start is available only after stop call.")
  }

  /**
   * Moves from started to stopped.
   *
   * @throws IllegalStateException if the object is already stopped
   */
  def stop(): Unit = {
    val wasStarted = isStoppedFlag.compareAndSet(false, true)
    if (!wasStarted)
      throw new IllegalStateException(s"Object $this is already stopped. Next stop is available only after start call.")
  }

  /** Whether the object is currently stopped. */
  def isStopped: Boolean = isStoppedFlag.get()
}
开发者ID:bwsw,项目名称:imp,代码行数:19,代码来源:StartStopBehaviour.scala
示例12: TransactionGenerator
//设置package包名称以及导入依赖的类
package com.bwsw.sj.transaction.generator.server
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
import com.bwsw.sj.common.utils.TransactionGeneratorLiterals
/**
 * Serves one client connection: every byte received from the client is
 * answered with a freshly generated transaction id.
 *
 * @param socket         the client connection; closed when `run` returns
 * @param doesServerWork shared flag; the session ends once it becomes false
 */
class TransactionGenerator(socket: Socket, doesServerWork: AtomicBoolean) extends Runnable {
  private val counter = new AtomicInteger(0)
  private val currentMillis = new AtomicLong(0)
  private val inputStream = new DataInputStream(socket.getInputStream)
  private val outputStream = new DataOutputStream(socket.getOutputStream)
  private val scale = TransactionGeneratorLiterals.scale

  /**
   * Serves requests until the server flag is cleared, the client closes the
   * stream, or an exception occurs. The connection is closed on every exit
   * path, including the one where the server flag is cleared.
   */
  override def run(): Unit = {
    try {
      // isClientAvailable blocks on the next request byte and is only
      // evaluated while the server is still supposed to work.
      while (doesServerWork.get() && isClientAvailable) {
        send(generateID())
      }
    } catch {
      case _: Exception => // any failure simply ends this session
    } finally {
      close()
    }
  }

  /** Blocks for one byte from the client; false when the stream has ended. */
  private def isClientAvailable: Boolean = {
    val clientRequestStatus = inputStream.read()
    clientRequestStatus != -1
  }

  /** Id = current millisecond times `scale` plus a counter that restarts when the millisecond advances. */
  private def generateID(): Long = this.synchronized {
    val now = System.currentTimeMillis()
    if (now - currentMillis.get > 0) {
      currentMillis.set(now)
      counter.set(0)
    }
    now * scale + counter.getAndIncrement()
  }

  private def send(id: Long): Unit = {
    outputStream.writeLong(id)
  }

  /** Closes both streams and the socket; a failure on one does not skip the others. */
  private def close(): Unit = {
    Seq[java.io.Closeable](inputStream, outputStream, socket).foreach { resource =>
      try resource.close()
      catch {
        case _: java.io.IOException => // nothing more can be done for this resource
      }
    }
  }
}
开发者ID:bwsw,项目名称:sj-platform,代码行数:58,代码来源:TransactionGenerator.scala
示例13: Subscription
//设置package包名称以及导入依赖的类
package io.eels.datastream
import java.util.concurrent.atomic.AtomicBoolean
/** Handle that lets a subscriber ask the publisher to stop. */
trait Subscription {
  def cancel(): Unit
}

object Subscription {
  /** Subscription whose `cancel` does nothing. */
  val empty: Subscription = new Subscription {
    override def cancel(): Unit = ()
  }

  /**
   * Subscription whose `cancel` sets `running` to false.
   * `running` must be true when this is called; that is asserted on creation.
   */
  def fromRunning(running: AtomicBoolean): Subscription = new Subscription {
    assert(running.get)

    override def cancel(): Unit = {
      running.set(false)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:20,代码来源:Subscription.scala
示例14: DataStreamPublisher
//设置package包名称以及导入依赖的类
package io.eels.datastream
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicMarkableReference, AtomicReference}
import com.sksamuel.exts.collection.BlockingQueueConcurrentIterator
import io.eels.Row
import io.eels.schema.StructType
/**
 * A `DataStream` fed programmatically: producers call `publish`, `error` or
 * `close`, and a subscriber drains the queued batches.
 */
class DataStreamPublisher(override val schema: StructType) extends DataStream {
  private val queue = new LinkedBlockingQueue[Seq[Row]]
  private val running = new AtomicBoolean(true)
  // Stays null until error() is called.
  private val failure = new AtomicReference[Throwable](null)

  /** True once a subscriber has cancelled its subscription. */
  def isCancelled: Boolean = !running.get

  /**
   * Delivers queued batches to `subscriber` until the sentinel is reached or
   * the subscription is cancelled, then signals either the recorded error or
   * completion.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      val subscription = new Subscription {
        override def cancel(): Unit = {
          queue.clear()
          queue.put(Row.Sentinel)
          running.set(false)
        }
      }
      subscriber.subscribed(subscription)

      val batches = BlockingQueueConcurrentIterator(queue, Row.Sentinel).takeWhile(_ => running.get)
      batches.foreach(subscriber.next)

      Option(failure.get) match {
        case Some(cause) => subscriber.error(cause)
        case None        => subscriber.completed()
      }
    } catch {
      case t: Throwable => subscriber.error(t)
    }
  }

  /** Queues one batch for the subscriber. */
  def publish(row: Seq[Row]): Unit = queue.put(row)

  /** Records `t`, discards queued batches and queues the sentinel. */
  def error(t: Throwable): Unit = {
    failure.set(t)
    queue.clear()
    queue.add(Row.Sentinel)
  }

  /** Queues the sentinel behind any batches already queued. */
  def close(): Unit = queue.add(Row.Sentinel)
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:46,代码来源:DataStreamPublisher.scala
示例15: SequenceSource
//设置package包名称以及导入依赖的类
package io.eels.component.sequence
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels._
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IntWritable, SequenceFile}
/** `Source` over the Hadoop sequence file at `path`. */
case class SequenceSource(path: Path)(implicit conf: Configuration) extends Source with Logging {
  logger.debug(s"Creating sequence source from $path")

  /** Schema obtained from `SequenceSupport.schema` on every call. */
  override def schema: StructType = SequenceSupport.schema(path)

  /** A single publisher covering the whole file. */
  override def parts(): Seq[Publisher[Seq[Row]]] = {
    val publisher = new SequencePublisher(path)
    List(publisher)
  }
}
object SequenceReaderIterator {
  /**
   * Iterates over the records of `reader` as rows of `schema`.
   * The first record is consumed on creation and discarded as the header.
   *
   * @param schema schema attached to every produced row
   * @param reader an open sequence-file reader; not closed by this iterator
   */
  def apply(schema: StructType, reader: SequenceFile.Reader): Iterator[Row] = new Iterator[Row] {
    private val k = new IntWritable()
    private val v = new BytesWritable()
    // throw away the header
    reader.next(k, v)

    // True while k/v hold a record that next() has not yet returned.
    private var buffered = false
    // True once the reader has reported that no records remain.
    private var exhausted = false

    override def next(): Row = {
      if (!hasNext()) throw new NoSuchElementException("next on exhausted sequence file iterator")
      buffered = false
      Row(schema, SequenceSupport.toValues(v).toVector)
    }

    // Advances the reader at most once per record, so repeated calls are safe.
    override def hasNext(): Boolean = {
      if (!buffered && !exhausted) {
        if (reader.next(k, v)) buffered = true
        else exhausted = true
      }
      buffered
    }
  }
}
/** Publishes the rows of the sequence file at `path` in batches. */
class SequencePublisher(val path: Path)(implicit conf: Configuration) extends Publisher[Seq[Row]] with Logging with Using {
  /**
   * Reads the file and pushes batches of `DataStream.DefaultBatchSize` rows to
   * `subscriber` until the file ends or the subscription is cancelled; any
   * throwable is reported through `subscriber.error`.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      using(SequenceSupport.createReader(path)) { reader =>
        val schema = SequenceSupport.schema(path)
        // Cleared by the subscription's cancel().
        val running = new AtomicBoolean(true)
        subscriber.subscribed(Subscription.fromRunning(running))

        val rows = SequenceReaderIterator(schema, reader).takeWhile(_ => running.get)
        val batches = rows.grouped(DataStream.DefaultBatchSize)
        batches.foreach(subscriber.next)

        subscriber.completed()
      }
    } catch {
      case failure: Throwable => subscriber.error(failure)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:52,代码来源:SequenceSource.scala
示例16: AvroSource
//设置package包名称以及导入依赖的类
package io.eels.component.avro
import java.io.File
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels._
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.schema.StructType
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
/** `Source` over the Avro file at `path`. */
case class AvroSource(path: Path)
                     (implicit conf: Configuration, fs: FileSystem) extends Source with Using {

  /** Schema derived from the first record of the file; computed on first access. */
  override lazy val schema: StructType =
    using(AvroReaderFns.createAvroReader(path)) { reader =>
      AvroSchemaFns.fromAvroSchema(reader.next().getSchema)
    }

  /** A single publisher covering the whole file. */
  override def parts(): Seq[Publisher[Seq[Row]]] = {
    val publisher = AvroSourcePublisher(path)
    Seq(publisher)
  }
}
/** Publishes the records of the Avro file at `path` as batches of rows. */
case class AvroSourcePublisher(path: Path)
                              (implicit conf: Configuration, fs: FileSystem)
  extends Publisher[Seq[Row]] with Logging with Using {

  /**
   * Reads the file and pushes batches of `DataStream.DefaultBatchSize` rows to
   * `subscriber` until the file ends or the subscription is cancelled; any
   * throwable is reported through `subscriber.error`.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    val deserializer = new AvroDeserializer()
    try {
      using(AvroReaderFns.createAvroReader(path)) { reader =>
        // Cleared by the subscription's cancel().
        val running = new AtomicBoolean(true)
        subscriber.subscribed(Subscription.fromRunning(running))

        val records = AvroRecordIterator(reader).takeWhile(_ => running.get)
        val batches = records.map(deserializer.toRow).grouped(DataStream.DefaultBatchSize)
        batches.foreach(subscriber.next)

        subscriber.completed()
      }
    } catch {
      case failure: Throwable => subscriber.error(failure)
    }
  }
}
/** Convenience constructors for `AvroSource`. */
object AvroSource {
  /** Builds a source from a `java.io.File`, using its absolute path. */
  def apply(file: File)(implicit conf: Configuration, fs: FileSystem): AvroSource = {
    val absolutePath = file.getAbsoluteFile.toString
    AvroSource(new Path(absolutePath))
  }

  /** Builds a source from a `java.nio.file.Path` by converting it to a `File`. */
  def apply(path: java.nio.file.Path)(implicit conf: Configuration, fs: FileSystem): AvroSource =
    apply(path.toFile)
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:53,代码来源:AvroSource.scala
示例17: AvroParquetPublisher
//设置package包名称以及导入依赖的类
package io.eels.component.parquet.avro
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import io.eels.component.avro.AvroDeserializer
import io.eels.component.parquet.util.ParquetIterator
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.{Predicate, Row}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
/**
 * Publishes the rows of a Parquet file read through the Avro reader.
 *
 * @param path      file to read
 * @param predicate optional predicate handed to the reader
 */
class AvroParquetPublisher(path: Path,
                           predicate: Option[Predicate])(implicit conf: Configuration)
  extends Publisher[Seq[Row]] with Logging with Using {

  /**
   * Pushes batches of `DataStream.DefaultBatchSize` rows to `subscriber` until
   * the file ends or the subscription is cancelled. Completion is signalled
   * after the reader has been released; any throwable is reported through
   * `subscriber.error`.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      val deser = new AvroDeserializer()
      // Cleared by the subscription's cancel().
      val running = new AtomicBoolean(true)
      subscriber.subscribed(Subscription.fromRunning(running))

      using(AvroParquetReaderFn(path, predicate, None)) { reader =>
        val records = ParquetIterator(reader).takeWhile(_ => running.get)
        val batches = records.map(deser.toRow).grouped(DataStream.DefaultBatchSize)
        batches.foreach(subscriber.next)
      }

      subscriber.completed()
    } catch {
      case failure: Throwable => subscriber.error(failure)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:36,代码来源:AvroParquetPublisher.scala
示例18: CsvPublisher
//设置package包名称以及导入依赖的类
package io.eels.component.csv
import java.io.InputStream
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.exts.Logging
import com.sksamuel.exts.io.Using
import com.univocity.parsers.csv.CsvParser
import io.eels.Row
import io.eels.datastream.{DataStream, Publisher, Subscriber, Subscription}
import io.eels.schema.StructType
/**
 * Publishes the records of a CSV input as batches of rows.
 *
 * @param createParser factory for the parser used by each subscription
 * @param inputFn      opens the input stream for each subscription
 * @param header       header mode; `Header.FirstRow` causes the first record to be skipped
 * @param skipBadRows  not read by this class
 * @param schema       schema attached to every produced row
 */
class CsvPublisher(createParser: () => CsvParser,
                   inputFn: () => InputStream,
                   header: Header,
                   skipBadRows: Boolean,
                   schema: StructType) extends Publisher[Seq[Row]] with Logging with Using {

  /** Number of leading records to drop: 1 when the first row is a header, otherwise 0. */
  val rowsToSkip: Int = header match {
    case Header.FirstRow => 1
    case _ => 0
  }

  /**
   * Parses the input and pushes batches of `DataStream.DefaultBatchSize` rows
   * to `subscriber` until the parser returns null or the subscription is
   * cancelled. The parser is stopped and the input closed in all cases.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    val input = inputFn()
    val parser = createParser()
    try {
      parser.beginParsing(input)
      // Cleared by the subscription's cancel().
      val running = new AtomicBoolean(true)
      subscriber.subscribed(Subscription.fromRunning(running))

      val records = Iterator.continually(parser.parseNext)
        .takeWhile(_ != null)
        .takeWhile(_ => running.get)
      val rows = records.drop(rowsToSkip).map(values => Row(schema, values.toVector))
      val batches = rows.grouped(DataStream.DefaultBatchSize)
      batches.foreach(subscriber.next)

      subscriber.completed()
    } catch {
      case failure: Throwable => subscriber.error(failure)
    } finally {
      parser.stopParsing()
      input.close()
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:54,代码来源:CsvPublisher.scala
示例19: ElasticsearchSource
//设置package包名称以及导入依赖的类
package io.eels.component.elasticsearch
import java.util.concurrent.atomic.AtomicBoolean
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.search.SearchIterator
import io.eels.datastream.{Publisher, Subscriber, Subscription}
import io.eels.schema.{Field, StructType}
import io.eels.{Row, Source}
import scala.concurrent.duration._
/** `Source` over all documents of the Elasticsearch index `index`. */
class ElasticsearchSource(index: String)(implicit client: HttpClient) extends Source {
  import com.sksamuel.elastic4s.http.ElasticDsl._

  implicit val duration = 5.minutes

  /** Schema with one field per key of the first mapping returned for the index. */
  override def schema: StructType = {
    val mappings = client.execute {
      getMapping(index)
    }.await.head.mappings
    val fieldNames = mappings.head._2.keys
    StructType(fieldNames.map { name => Field(name) }.toSeq)
  }

  /** A single publisher covering the whole index. */
  override def parts(): Seq[Publisher[Seq[Row]]] = {
    val publisher = new ElasticsearchPublisher(index)
    Seq(publisher)
  }
}
/** Publishes every hit of a match-all search on `index`, in batches of 50 rows. */
class ElasticsearchPublisher(index: String)(implicit client: HttpClient) extends Publisher[Seq[Row]] {
  import com.sksamuel.elastic4s.http.ElasticDsl._

  implicit val duration = 5.minutes

  /**
   * Iterates over the search hits and pushes them to `subscriber` until the
   * hits run out or the subscription is cancelled; any throwable is reported
   * through `subscriber.error`.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      // Cleared by the subscription's cancel().
      val running = new AtomicBoolean(true)
      subscriber.subscribed(Subscription.fromRunning(running))

      val hits = SearchIterator.hits(client, search(index).matchAllQuery.keepAlive("1m").size(50))
        .takeWhile(_ => running.get)

      for (batch <- hits.grouped(50)) {
        // Each hit carries its own schema, built from that hit's field names.
        val chunk = batch.map { hit =>
          val hitSchema = StructType(hit.fields.keys.map { key => Field(key) }.toSeq)
          Row(hitSchema, hit.fields.values.toVector)
        }
        subscriber.next(chunk)
      }

      subscriber.completed()
    } catch {
      case failure: Throwable => subscriber.error(failure)
    }
  }
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:56,代码来源:ElasticsearchSource.scala
示例20: CloseOnReleaseService
//设置package包名称以及导入依赖的类
package com.twitter.finagle.service
import com.twitter.finagle.{Service, ServiceClosedException, ServiceProxy, WriteException}
import com.twitter.util.{Future, Time}
import java.util.concurrent.atomic.AtomicBoolean
/**
 * Service proxy that closes the underlying service at most once and, after
 * being closed, fails every request and reports itself unavailable.
 */
private[finagle] class CloseOnReleaseService[Req, Rep](underlying: Service[Req, Rep])
  extends ServiceProxy[Req, Rep](underlying)
{
  private[this] val wasReleased = new AtomicBoolean(false)

  /** Forwards the request, or fails it with a wrapped `ServiceClosedException` once closed. */
  override def apply(request: Req) =
    if (wasReleased.get) Future.exception(WriteException(new ServiceClosedException))
    else super.apply(request)

  /** Closes the underlying service on the first call; later calls return `Future.Done`. */
  override def close(deadline: Time) = {
    val firstClose = wasReleased.compareAndSet(false, true)
    if (firstClose) super.close(deadline) else Future.Done
  }

  /** Available only while not released and the underlying service is available. */
  override def isAvailable = {
    if (wasReleased.get) false
    else super.isAvailable
  }
}
开发者ID:deenar,项目名称:fintest,代码行数:30,代码来源:CloseOnReleaseService.scala
注：本文中的java.util.concurrent.atomic.AtomicBoolean类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论