本文整理汇总了Scala中java.util.concurrent.LinkedBlockingQueue类的典型用法代码示例。如果您正苦于以下问题：Scala LinkedBlockingQueue类的具体用法？Scala LinkedBlockingQueue怎么用？Scala LinkedBlockingQueue使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了LinkedBlockingQueue类的18个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ReqRealTimeBarsHandler
//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers
import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging
/**
 * Promise handler that exposes the real-time bar responses of one request as a
 * closeable, lazily evaluated stream of `RealTimeBar`s.
 *
 * Matching `RealTimeBarResp` messages are forwarded to an internal actor, which
 * appends them to a blocking queue. The stream handed out by `get` drains that
 * queue and ends when the end-of-stream marker (`None`) is taken.
 *
 * @param security the stock the request was made for (not read inside this class)
 * @param ibActor  actor that is sent `UnregisterFibsPromise(this)` when the stream is closed
 * @param tickerId request id used to select the responses that belong to this handler
 * @param socket   client used to cancel the real-time bar subscription on close
 */
class ReqRealTimeBarsHandler(security: Stock,
    ibActor: Actor[FibsPromiseMessage \/ IBMessage],
    tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[RealTimeBar]] with Logging {
  // Capitalised alias: a stable identifier, so patterns compare against it
  // instead of binding a fresh variable.
  private[this] val TickerId = tickerId

  val latch = new CountDownLatch(0) // don't need to block

  // Hand-off buffer between the actor (producer) and the stream (consumer).
  // `None` marks the end of the stream.
  private[this] val queue: BlockingQueue[Option[RealTimeBar]] =
    new LinkedBlockingQueue[Option[RealTimeBar]]()

  // Converts each response for this ticker id into a RealTimeBar and enqueues it.
  // Any other message reaches `???` and raises NotImplementedError.
  val actor = Actor[IBMessage] {
    case RealTimeBarResp(TickerId, time, open, high, low, close, volume, count, wap) =>
      queue.add(RealTimeBar(new DateTime(time * 1000), open, high, low, close, volume, count, wap).some)
    case _ => ???
  }

  // Selects the messages this handler is interested in and forwards them to `actor`.
  val barHandler: PartialFunction[IBMessage, Unit] = {
    case m @ RealTimeBarResp(TickerId, time, open, high, low, close, volume, count, wap) => actor ! m
  }

  val patterns = List(barHandler)

  // Blocks on the queue for the next element; the tail is built recursively.
  private[this] def toStream: EphemeralStream[RealTimeBar] =
    queue.take match {
      case Some(d) => EphemeralStream.cons(d, toStream)
      case None    => EphemeralStream.emptyEphemeralStream
    }

  // Ends the stream, cancels the subscription and unregisters this promise.
  private[this] def closeStream = {
    queue.add(None)
    socket.cancelRealTimeBars(tickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }

  /** Returns a new closeable view whose elements are read lazily from the queue. */
  def get = new CloseableStream[RealTimeBar] {
    def close = closeStream
    lazy val as = toStream
  }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:49,代码来源:ReqRealTimeBarsHandler.scala
示例2: ReqHistoricalDataHandler
//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import scalaz._, Scalaz._
import scalaz.concurrent._
import com.github.nscala_time.time.Imports._
import messages._
import contract._
/**
 * Promise handler that collects the historical data rows of one request into a
 * `Stream[HistoricalDataPeriod]`.
 *
 * Rows for this handler's ticker id are converted and queued. A row whose
 * `time` starts with "finished-", or the "no data" error with code 162,
 * terminates the stream and unregisters the promise.
 *
 * @param security the stock the request was made for (not read inside this class)
 * @param ibActor  actor that is sent `UnregisterFibsPromise(this)` once the data is complete
 * @param tickerId request id used to select the responses that belong to this handler
 */
class ReqHistoricalDataHandler(security: Stock,
    ibActor: Actor[FibsPromiseMessage \/ IBMessage],
    tickerId: Int) extends FibsPromise[Stream[HistoricalDataPeriod]] {
  // Stable identifier so it can be used as a constant inside patterns.
  private[this] val TickerId = tickerId

  // Buffer between the actor and the stream; `None` is the end marker.
  private[this] val queue: BlockingQueue[Option[HistoricalDataPeriod]] =
    new LinkedBlockingQueue[Option[HistoricalDataPeriod]]()

  val latch = new CountDownLatch(0) // don't need to block

  // Any message that is neither a row nor the "no data" error for this ticker
  // id falls through to `???`.
  val actor = Actor[IBMessage] {
    case row @ HistoricalData(TickerId, time, _, _, _, _, _, _, _, _) =>
      if (time.startsWith("finished-")) close
      else enqueue(transformMsg(row))
    case HistoricalDataError(TickerId, 162, msg) if msg.contains("HMDS query returned no data") =>
      close
    case _ => ???
  }

  // Forwards rows and the "no data" error for this ticker id to `actor`.
  val historicalDataHandler: PartialFunction[IBMessage, Unit] = {
    case row @ HistoricalData(TickerId, _, _, _, _, _, _, _, _, _) =>
      actor ! row
    case err @ HistoricalDataError(TickerId, 162, msg) if msg.contains("HMDS query returned no data") =>
      actor ! err
  }

  val patterns = List(historicalDataHandler)

  /** Returns the stream; each element is taken from the queue, blocking until one is available. */
  def get = toStream()

  // Builds a HistoricalDataPeriod from a raw row. `date` is parsed as a number
  // (0 when it does not parse) and multiplied by 1000 before becoming a DateTime.
  private[this] def transformMsg(i: HistoricalData) = {
    val timestamp = new DateTime(i.date.parseLong.toOption.getOrElse(0L) * 1000)
    HistoricalDataPeriod(timestamp, i.open, i.high, i.low, i.close, i.volume, i.count, i.wap, i.hasGaps)
  }

  private[this] def enqueue(d: HistoricalDataPeriod) = queue.add(d.some)

  // Publishes the end marker and unregisters this promise.
  private[this] def close = {
    queue.add(None)
    ibActor ! UnregisterFibsPromise(this).left
  }

  private[this] def toStream(): Stream[HistoricalDataPeriod] =
    queue.take match {
      case None         => Stream.empty
      case Some(period) => Stream.cons(period, toStream())
    }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:60,代码来源:ReqHistoricalDataHandler.scala
示例3: ReqMarketTickDataStreamHandler
//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers
import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging
/**
 * Promise handler that exposes the RTVolume tick strings of one market data
 * request as a closeable, lazily evaluated stream of `MarketTickDataResult`s.
 *
 * @param security the stock the request was made for (not read inside this class)
 * @param ibActor  actor that is sent `UnregisterFibsPromise(this)` when the stream is closed
 * @param tickerId request id used to select the responses that belong to this handler
 * @param socket   client used to cancel the market data subscription on close
 */
class ReqMarketTickDataStreamHandler(security: Stock,
    ibActor: Actor[FibsPromiseMessage \/ IBMessage],
    tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[MarketTickDataResult]] with Logging {
  // Capitalised alias: a stable identifier, so patterns compare against it
  // instead of binding a fresh variable.
  private[this] val TickerId = tickerId

  val latch = new CountDownLatch(0) // don't need to block

  // Six ';'-separated fields: decimal; integer; integer; integer; decimal; true|false.
  private[this] val RTVolumePattern = "(\\d+\\.?\\d*);(\\d+);(\\d+);(\\d+);(\\d+\\.?\\d*);(true|false)".r

  // Hand-off buffer between the actor (producer) and the stream (consumer).
  // `None` marks the end of the stream.
  private[this] val queue: BlockingQueue[Option[MarketTickDataResult]] =
    new LinkedBlockingQueue[Option[MarketTickDataResult]]()

  // Parses each RTVolume string for this ticker id. Parsed ticks are queued and
  // unparsable ones are logged. Any other message raises NotImplementedError.
  val actor = Actor[IBMessage] {
    case TickString(TickerId, RTVolume, v) =>
      parseInput(v).cata(some = t => queue.add(t.some),
        none = warn(s"error parsing tick data: $v"))
    case _ => ???
  }

  /**
   * Parses one RTVolume string.
   *
   * @param s raw tick string
   * @return the parsed tick, or `none` when the string does not match the
   *         pattern or one of its fields does not parse
   */
  def parseInput(s: String) = s match {
    case RTVolumePattern(f1, f2, f3, f4, f5, f6) =>
      // Captures are passed to `apply` in the order 1, 2, 4, 3, 5, 6.
      // NOTE(review): the swap of fields 3 and 4 must mirror the parameter
      // order of MarketTickDataResult -- confirm against its definition.
      (f1.parseDouble.toOption |@|
        f2.parseInt.toOption |@|
        f4.parseInt.toOption |@|
        f3.parseLong.toOption |@|
        f5.parseDouble.toOption |@|
        f6.parseBoolean.toOption)(MarketTickDataResult.apply)
    case _ => none
  }

  // Only forwards tick strings for this handler's ticker id. A lowercase
  // `tickerId` here would bind a new variable and match every id, and those
  // messages would then fall into the actor's `???` branch.
  val stringHandler: PartialFunction[IBMessage, Unit] = {
    case m @ TickString(TickerId, RTVolume, _) => actor ! m
  }

  val patterns = List(stringHandler)

  // Ends the stream, cancels the subscription and unregisters this promise.
  private[this] def closeStream = {
    queue.add(None)
    socket.cancelMktData(TickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }

  // Blocks on the queue for the next element; the tail is built recursively.
  private[this] def toStream: EphemeralStream[MarketTickDataResult] =
    queue.take match {
      case Some(d) => EphemeralStream.cons(d, toStream)
      case None    => EphemeralStream.emptyEphemeralStream
    }

  /** Returns a new closeable view whose elements are read lazily from the queue. */
  def get = new CloseableStream[MarketTickDataResult] {
    def close = closeStream
    lazy val as = toStream
  }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:61,代码来源:ReqMarketTickDataStreamHandler.scala
示例4: ActorThread
//设置package包名称以及导入依赖的类
package spooky.actor
import scala.collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CopyOnWriteArraySet
import spooky.io.Tcp
/**
 * Runnable that hosts a single actor: it creates the actor, then repeatedly
 * takes `(sender, message)` pairs from the mailbox queue and dispatches each
 * message to the actor's current receive function.
 *
 * The loop ends when the thread's interrupt flag is observed or when an
 * exception escapes. In every case the members of `deathPact` are sent
 * `Terminated(self)` and the set is cleared.
 */
private class ActorThread(private val actorFactory: ActorFactory, private val queue: LinkedBlockingQueue[Tuple2[ActorRef, Any]], private val deathPact: CopyOnWriteArraySet[ActorRef], private implicit val self: ActorRef) extends Runnable {

  def run(): Unit = {
    ActorContext.setSelf(self)
    val actor = actorFactory.create
    actor.context.become(actor.receive)
    try {
      while (!Thread.interrupted()) {
        val envelope = queue.take()
        // Expose the sender to the actor before the message is handled.
        actor.context._sender = envelope._1
        val message = envelope._2
        val handler = actor.context.receive
        if (!handler.isDefinedAt(message)) actor.unhandled(message)
        else handler(message)
      }
      println("interrupted")
    } catch {
      // A death pact ends the actor without a stack trace.
      case _: DeathPactException => ()
      case failure: Throwable    => failure.printStackTrace()
    } finally {
      terminate(deathPact)
      ActorContext.setSelf(null)
      println(s"terminate: ${actor.getClass.getSimpleName}")
    }
  }

  // Sends Terminated(self) to every registered reference, then empties the set.
  private def terminate(deathPact: CopyOnWriteArraySet[ActorRef]): Unit = {
    val watchers = deathPact.iterator()
    while (watchers.hasNext) {
      watchers.next() ! Terminated(self)
    }
    deathPact.clear()
  }
}
开发者ID:zpooky,项目名称:bittorrent,代码行数:45,代码来源:ActorThread.scala
示例5: ActorRef
//设置package包名称以及导入依赖的类
package spooky.actor
import scala.collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CopyOnWriteArraySet
/**
 * Handle used to send messages to an actor. A message is delivered by
 * appending a `(sender, message)` pair to the actor's mailbox queue.
 *
 * @param c         class shown in `toString`
 * @param queue     mailbox that receives the `(sender, message)` pairs
 * @param deathPact set that `registerDeathPack` adds references to
 */
class ActorRef private[actor] (private val c: Class[_], private val queue: LinkedBlockingQueue[Tuple2[ActorRef, Any]], private val deathPact: CopyOnWriteArraySet[ActorRef]) {

  /** Adds `actorRef` to this reference's death-pact set. */
  private[actor] def registerDeathPack(actorRef: ActorRef): Unit = deathPact.add(actorRef)

  /**
   * Enqueues `msg` together with its sender.
   *
   * @param msg    the message to deliver
   * @param sender the sending actor; defaults to `Actor.noSender`
   */
  def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
    //    if(sender == null){
    //      println("Sender is null")
    //      Thread.dumpStack()
    //    }
    // Enqueue outside of `assert`: assert is @elidable, so with assertions
    // disabled its argument is never evaluated and the message would be lost.
    val accepted = queue.add((sender, msg))
    assert(accepted)
  }

  override def toString: String = s"ActorRef[${c.getSimpleName}]"
}
开发者ID:zpooky,项目名称:bittorrent,代码行数:20,代码来源:ActorRef.scala
示例6: ExecutionThread
//设置package包名称以及导入依赖的类
package ppl.delite.runtime.executor
import java.util.concurrent.LinkedBlockingQueue
import ppl.delite.runtime.Config
import ppl.delite.runtime.Delite
import ppl.delite.runtime.codegen.DeliteExecutable
/**
 * Worker that takes `DeliteExecutable`s from its own bounded queue and runs
 * them one after another until `continue` becomes false.
 */
class ExecutionThread extends Runnable {

  // Per-thread work queue, bounded by Config.queueSize.
  // Synchronization is handled by the queue implementation.
  private[executor] val queue = new LinkedBlockingQueue[DeliteExecutable](Config.queueSize)

  private[executor] var continue: Boolean = true

  /** Main loop; it is meant to be terminated by interrupting the thread. */
  def run: Unit = {
    while (continue) {
      try executeWork(queue.take())
      catch {
        // Another thread threw an exception -> exit silently.
        case _: InterruptedException =>
          continue = false
        // Anything else shuts the runtime down before the loop exits.
        case failure: Throwable =>
          Delite.shutdown(failure)
          continue = false
      }
    }
  }

  // How a unit of work is executed; subclasses may override.
  protected def executeWork(work: DeliteExecutable) = work.run
}
开发者ID:leratojeffrey,项目名称:OptiSDR-Compiler,代码行数:39,代码来源:ExecutionThread.scala
示例7: CatClient
//设置package包名称以及导入依赖的类
package ru.ifmo.ctddev.semenov.dkvs.client
import java.io._
import java.net.Socket
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
/**
 * Line-oriented console client for a socket.
 *
 * Four threads move lines between the console and the socket through two
 * queues: console -> `requests` -> socket, and socket -> `responses` -> console.
 *
 * @param socket connected socket whose streams are read and written as UTF-8 text
 */
class CatClient(socket: Socket) {
  val consoleReader = new BufferedReader(new InputStreamReader(System.in, UTF_8))
  // Auto-flush, like socketWriter: without it, println output stays in the
  // PrintWriter's buffer and responses never show up on the console.
  val consoleWriter = new PrintWriter(new OutputStreamWriter(System.out, UTF_8), true)
  val socketReader = new BufferedReader(new InputStreamReader(socket.getInputStream, UTF_8))
  val socketWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, UTF_8), true)
  val requests = new LinkedBlockingQueue[String]()
  val responses = new LinkedBlockingQueue[String]()

  /** Starts the four pump threads and returns immediately. */
  def start(): Unit = {
    def spawn(action: () => Unit): Unit =
      new Thread(new Runnable() {
        override def run(): Unit = action()
      }).start()

    spawn(() => readRequest())
    spawn(() => writeRequest())
    spawn(() => readResponse())
    spawn(() => writeResponse())
  }

  private def readRequest(): Unit =
    interact(() => consoleReader.readLine(), line => requests.add(line), "exit reading requests")

  private def writeRequest(): Unit =
    interact(() => requests.take(), line => socketWriter.println(line), "exit writing requests")

  private def readResponse(): Unit =
    interact(() => socketReader.readLine(), line => responses.add(line), "exit reading responses")

  private def writeResponse(): Unit =
    interact(() => responses.take(), line => consoleWriter.println(line), "exit writing responses")

  /**
   * Copies lines from `read` to `write` until `read` yields null, then prints
   * `exitMessage` on the console.
   *
   * A reader returns null at end of input. A queue `take()` blocks instead of
   * returning null, so the two queue-draining pumps do not stop this way.
   */
  @tailrec private def interact(read: () => String, write: String => Unit, exitMessage: String): Unit = {
    val line = read()
    if (line == null) consoleWriter.println(exitMessage)
    else {
      write(line)
      interact(read, write, exitMessage)
    }
  }
}
/** Command-line entry point: connects to `<host> <port>` and starts a `CatClient`. */
object Connect {

  /**
   * @param args exactly two entries, host and port; otherwise a usage line is printed
   */
  def main(args: Array[String]): Unit = {
    if (args.length == 2) {
      // The port is parsed inside Try, so a non-numeric port is reported the
      // same way as a failed connection.
      val connection = Try(new Socket(args(0), args(1).toInt))
      connection match {
        case Failure(exception) =>
          println(s"Cannot connect to ${args(0)}:${args(1)}")
          exception.printStackTrace()
        case Success(socket) =>
          new CatClient(socket).start()
      }
    } else {
      println("Usage: Client <host> <port>")
    }
  }
}
开发者ID:vadimsemenov,项目名称:distributed-key-value-storage,代码行数:61,代码来源:CatClient.scala
示例8: SBTProcess
//设置package包名称以及导入依赖的类
package com.github.madoc.runsbt.running
import java.io.File
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import com.github.madoc.runsbt.events.{SBTEvent, SBTEventParser}
import scala.sys.process._
/** A running sbt process whose output can be read as lines or as parsed events. */
trait SBTProcess {
  /** Cancels the process. */
  def cancel(): Unit

  /** Exit value of the process. */
  def exitValue: Int

  /** Output of the process as a stream of lines. */
  def outputLines: Stream[String]

  /** `outputLines` run through `SBTEventParser`; computed on first access. */
  lazy val events: Stream[SBTEvent] = SBTEventParser(outputLines)
}
object SBTProcess {

  /**
   * `SBTProcess` backed by a `scala.sys.process.ProcessBuilder`.
   *
   * The process is started when the instance is constructed. Every line passed
   * to the process logger is buffered in a blocking queue, and `outputLines`
   * drains that queue lazily until the end-of-stream marker is taken.
   *
   * @param builder          builder that is run to start the process
   * @param workingDirectory only shown in `toString`
   * @param command          only shown in `toString`
   */
  case class BasedOnProcessBuilder(builder: ProcessBuilder, workingDirectory: File, command: SBTCommand) extends SBTProcess {
    def exitValue: Int = process.exitValue()

    /** Destroys the process and marks the end of the output. */
    def cancel(): Unit = {
      process.destroy()
      buffer.put(EOS)
    }

    lazy val outputLines: Stream[String] = nextLogLine()

    // Queue elements: an output line, or the end-of-stream marker.
    private sealed trait Element
    private sealed case class Line(string: String) extends Element
    private object EOS extends Element

    // `buffer` must be initialised before `process`: the logger writes into it
    // as soon as the process produces output.
    private val buffer: BlockingQueue[Element] = new LinkedBlockingQueue[Element]()
    private val process: Process = builder.run(ProcessLogger(str => buffer.put(Line(str))))

    // Blocks until the next element is available. The marker is put back so
    // that any later read ends immediately as well.
    private def nextLogLine(): Stream[String] = buffer.take() match {
      case EOS =>
        buffer.put(EOS)
        Stream.empty
      case Line(string) =>
        Stream.cons(cleanUp(string), nextLogLine())
    }

    // Removes every match of the escape-sequence pattern from the line.
    private def cleanUp(str: String): String = str.replaceAll("\\e\\[[\\d;]*[^\\d;]", "")

    // Watcher thread: waits for the process to exit, then marks the end of the output.
    locally {
      new Thread() {
        override def run(): Unit = {
          process.exitValue()
          buffer.put(EOS)
        }
      }.start()
    }

    override def toString: String = s"SBTProcess(directory=$workingDirectory, command=$command, process=$builder)"
  }
}
开发者ID:Madoc,项目名称:run-sbt,代码行数:43,代码来源:SBTProcess.scala
示例9: InMemoryEventStream
//设置package包名称以及导入依赖的类
package io.bfil.eventsourcing.inmemory
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
import io.bfil.eventsourcing.{EventEnvelope, EventStream}
/**
 * `EventStream` backed by a single in-memory blocking queue.
 *
 * Every call to `subscribe` starts its own thread that takes envelopes from
 * the shared queue, so each envelope is taken by exactly one such thread.
 */
class InMemoryEventStream[Event] extends EventStream[Event] {
  private val queue = new LinkedBlockingQueue[EventEnvelope[Event]]()

  /** Appends `event` to the queue. */
  def publish(event: EventEnvelope[Event]): Unit = queue.put(event)

  /**
   * Starts a thread that passes every taken envelope whose offset is greater
   * than `offset` to `f`. The `Future` returned by `f` is not awaited.
   *
   * @param f      handler invoked for each qualifying envelope
   * @param offset envelopes with an offset less than or equal to this are skipped
   */
  def subscribe(f: EventEnvelope[Event] => Future[Unit], offset: Long = 0): Unit = {
    val consumer = new Runnable {
      override def run(): Unit =
        Iterator
          .continually(queue.take())
          .filter(envelope => envelope.offset > offset)
          .foreach(envelope => f(envelope))
    }
    new Thread(consumer).start()
  }
}
开发者ID:bfil,项目名称:simple-eventsourcing,代码行数:20,代码来源:InMemoryEventStream.scala
示例10: SummerClass
//设置package包名称以及导入依赖的类
package main.scala.slaves
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._
/**
 * Consumer that repeatedly takes `batch_size` samples from a shared queue and
 * sums each batch.
 *
 * @param queue the queue the samples are taken from
 */
class SummerClass(queue: LinkedBlockingQueue[Double]) extends Runnable {
  val batch_size = 3200
  private val sharedQueue = queue

  /**
   * Processes batches until the thread is interrupted.
   *
   * On interruption the message is printed, the interrupt flag is restored and
   * the loop ends. Going back to `take()` would make the thread impossible to
   * stop, because the exception clears the flag.
   */
  def run(): Unit = {
    var interrupted = false
    while (!interrupted) {
      try {
        // `take()` blocks until a sample is available and never yields null,
        // so every batch holds exactly batch_size samples.
        val batch = List.fill(batch_size)(sharedQueue.take())
        summer(batch) // NOTE: the sum is computed but not used yet
      } catch {
        case _: InterruptedException =>
          println("Interrupted Exception")
          Thread.currentThread().interrupt()
          interrupted = true
      }
    }
  }

  /**
   * Processor: adds up the samples from left to right, starting at 0.0.
   *
   * @param input_data_list samples to add
   * @return the sum; 0.0 for an empty list
   */
  def summer(input_data_list: List[Double]): Double =
    input_data_list.foldLeft(0.00)(_ + _)
}
开发者ID:ahamshubham,项目名称:scala-speech,代码行数:36,代码来源:SummerClass.scala
示例11: Master
//设置package包名称以及导入依赖的类
package main.scala
import java.util.concurrent.LinkedBlockingQueue
import slaves._
/**
 * Entry point: creates the three shared queues, wires them into the listener
 * and its three consumers, and starts one thread per component.
 */
object Master {
  def main(args: Array[String]): Unit = {
    // One queue per consumer; the listener is handed all three.
    val sharedQueues: Map[String, LinkedBlockingQueue[Double]] = Map(
      "listener-summer" -> new LinkedBlockingQueue[Double](),
      "listener-plotter" -> new LinkedBlockingQueue[Double](),
      "listener-recorder" -> new LinkedBlockingQueue[Double]()
    )
    val summerQueue = sharedQueues("listener-summer")
    val plotterQueue = sharedQueues("listener-plotter")
    val recorderQueue = sharedQueues("listener-recorder")

    // Creating threads
    val listenerThread = new Thread(new ListenerClass(summerQueue, plotterQueue, recorderQueue))
    val summerThread = new Thread(new SummerClass(summerQueue))
    val plotterThread = new Thread(new RealTimeAudioPlotterClass(plotterQueue))
    val recorderThread = new Thread(new RecorderClass(recorderQueue))

    // Starting threads
    listenerThread.start()
    summerThread.start()
    plotterThread.start()
    recorderThread.start()
  }
}
开发者ID:ahamshubham,项目名称:scala-speech,代码行数:34,代码来源:Master.scala
示例12: DataStreamPublisher
//设置package包名称以及导入依赖的类
package io.eels.datastream
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicMarkableReference, AtomicReference}
import com.sksamuel.exts.collection.BlockingQueueConcurrentIterator
import io.eels.Row
import io.eels.schema.StructType
/**
 * `DataStream` whose chunks of rows are pushed in through `publish` and
 * delivered to a subscriber from an internal blocking queue.
 *
 * `Row.Sentinel` is used as the end marker on the queue.
 *
 * @param schema schema of the published rows
 */
class DataStreamPublisher(override val schema: StructType) extends DataStream {

  private val queue = new LinkedBlockingQueue[Seq[Row]]
  private val running = new AtomicBoolean(true)
  // Holds the error passed to `error`, or null when there is none.
  private val failure = new AtomicReference[Throwable](null)

  /** True once the subscription has been cancelled. */
  def isCancelled: Boolean = !running.get

  /**
   * Hands `subscriber` a subscription, feeds it chunks from the queue while
   * the stream is running, then reports the recorded failure or completion.
   * Any exception thrown along the way is passed to `subscriber.error`.
   */
  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      val subscription = new Subscription {
        // Drop pending chunks, publish the end marker, then stop the stream.
        override def cancel(): Unit = {
          queue.clear()
          queue.put(Row.Sentinel)
          running.set(false)
        }
      }
      subscriber.subscribed(subscription)

      val chunks = BlockingQueueConcurrentIterator(queue, Row.Sentinel).takeWhile(_ => running.get)
      chunks.foreach(subscriber.next)

      Option(failure.get) match {
        case Some(cause) => subscriber.error(cause)
        case None        => subscriber.completed()
      }
    } catch {
      case t: Throwable => subscriber.error(t)
    }
  }

  /** Appends a chunk of rows to the queue. */
  def publish(row: Seq[Row]): Unit = queue.put(row)

  /** Records `t`, discards the queued chunks and publishes the end marker. */
  def error(t: Throwable): Unit = {
    failure.set(t)
    queue.clear()
    queue.add(Row.Sentinel)
  }

  /** Publishes the end marker. */
  def close(): Unit = queue.add(Row.Sentinel)
}
开发者ID:51zero,项目名称:eel-sdk,代码行数:46,代码来源:DataStreamPublisher.scala
示例13: Throttle
//设置package包名称以及导入依赖的类
package com.github.zabbicook.api
import java.time.Duration
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
/** Brackets an operation: `start()` is called before it and `end()` after it. */
sealed trait Throttle {
  def start(): Unit
  def end(): Unit
}

/** Throttle that never blocks; both operations do nothing. */
object NonStrictThrottle extends Throttle {
  def start(): Unit = ()
  def end(): Unit = ()
}

/**
 * Token-based throttle built on a bounded queue.
 *
 * A single token is offered when the instance is created. `start()` waits up
 * to `timeout` for a token and `end()` offers one back. The queue holds at
 * most `concurrency` tokens; offers beyond that are dropped.
 *
 * @param concurrency   capacity of the token queue
 * @param timeout       longest time `start()` waits for a token
 * @param startInterval pause applied in `start()` after the wait; skipped when zero
 */
class StrictThrottle(
    concurrency: Int,
    timeout: Duration,
    startInterval: Duration
) extends Throttle {
  private[this] val tokens = new LinkedBlockingQueue[Boolean](concurrency)
  tokens.offer(true)

  /**
   * Blocking. The result of the timed poll is ignored, so the call also
   * returns when no token arrived within `timeout`.
   */
  def start(): Unit = {
    tokens.poll(timeout.toMillis, TimeUnit.MILLISECONDS)
    val pauseRequested = startInterval != Duration.ZERO
    if (pauseRequested) Thread.sleep(startInterval.toMillis)
  }

  /** Offers one token back; has no effect when the queue is already full. */
  def end(): Unit = {
    tokens.offer(true)
  }
}
开发者ID:rerorero,项目名称:zabbicook,代码行数:37,代码来源:Throttle.scala
示例14: ARouter
//设置package包名称以及导入依赖的类
package cas.service
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.pattern.pipe
import cas.analysis.subject.Subject
import cas.persistence.searching.ElasticSearch
import cas.utils.RemoteLogger
import cas.utils.UtilAliases._
import cas.web.dealers.vk.VkApiProtocol.VkPost
import scala.collection.mutable
import scala.collection.mutable.Queue
/** Message protocol of the `ARouter` actor. */
object ARouter {
  /** Request for subjects; carries no payload. */
  case object PullSubjects

  /** A batch of pulled subjects. */
  case class PulledSubjects(subjs: Subjects)

  /** A batch of estimations being pushed. */
  case class PushingEstimations(estims: Estimations)

  /** Pairs a subject with its `actuality` value. */
  case class Estimation(subj: Subject, actuality: Double)
}
// Actor that sits between a producer actor and worker actors.
// NOTE(review): this snippet appears to have lost lines during extraction: `chunk` is not
// defined anywhere in the visible code, the header of the `receive` method that the
// `case estims: PushingEstimations` clause belongs to is missing, and the closing braces
// do not balance. Restore the block from the original ARouter.scala before relying on it.
class ARouter(producer: ActorRef) extends Actor with ActorLogging {
import ARouter._
// Batches of subjects that have been pulled and are waiting to be handed out.
val pulledSubjs = new LinkedBlockingQueue[PulledSubjects]()
// References of actors waiting for a batch.
val waitingWorkers = new LinkedBlockingQueue[ActorRef]()
override def preStart = {
super.preStart()
// log.info("Subjects pulled: " + chunk)
// No worker is waiting: keep the batch (the offer waits at most 1000 ms).
if (waitingWorkers.isEmpty) pulledSubjs.offer(PulledSubjects(chunk), 1000, TimeUnit.MILLISECONDS)
else {
// Send the batch to the next waiting worker, then send PullSubjects to the producer.
waitingWorkers.poll(1000, TimeUnit.MILLISECONDS) ! PulledSubjects(chunk)
producer ! PullSubjects
}
}
// Estimations are forwarded to the producer.
case estims: PushingEstimations => {
// RemoteLogger.info("PushingEstimations: `" + estims.estims.mkString + "`")
// log.info("Estimations computed: " + estims.estims)
producer forward estims
}
}
}
开发者ID:kell18,项目名称:CAS,代码行数:48,代码来源:ARouter.scala
示例15: ThreadPool
//设置package包名称以及导入依赖的类
package command
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
/**
 * Fixed-size pool of worker threads that take `Job`s from a shared queue.
 * All workers are created and started while the pool is being constructed.
 *
 * @param n number of worker threads
 */
class ThreadPool(n: Int) {
  private val jobQueue: BlockingQueue[Job] = new LinkedBlockingQueue()
  private val jobThreads = new Array[Thread](n)
  // Read by the workers at the top of every loop iteration.
  @volatile private var shutdown: Boolean = _

  jobThreads.indices.foreach { index =>
    val worker = new Worker("Pool Thread " + index)
    jobThreads(index) = worker
    worker.start()
  }

  /**
   * Queues a job. If the calling thread is interrupted while putting, its
   * interrupt flag is set again.
   */
  def addJob(r: Job): Unit = {
    try {
      jobQueue.put(r)
    } catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
    }
  }

  /**
   * Checks once per second until the queue is empty, then raises the shutdown
   * flag and interrupts every worker thread.
   */
  def shutdownPool: Unit = {
    awaitEmptyQueue()
    shutdown = true
    jobThreads.foreach(thread => thread.interrupt())
  }

  // Sleeps in one-second steps while jobs are still queued. An interruption
  // is printed and the wait goes on.
  private def awaitEmptyQueue(): Unit = {
    while (!jobQueue.isEmpty) {
      try {
        Thread.sleep(1000)
      } catch {
        case interruption: InterruptedException => interruption.printStackTrace()
      }
    }
  }

  // Takes and runs jobs until the shutdown flag is set. An interruption only
  // leads back to the flag check.
  private class Worker(name: String) extends Thread(name) {
    override def run = {
      while (!shutdown) {
        try {
          jobQueue.take().run
        } catch {
          case _: InterruptedException => ()
        }
      }
    }
  }
}
开发者ID:BBK-PiJ-2015-67,项目名称:sdp-portfolio,代码行数:53,代码来源:ThreadPool.scala
示例16: ScoreThread
//设置package包名称以及导入依赖的类
package io.aigar.score
import com.typesafe.scalalogging.LazyLogging
import io.aigar.model.PlayerRepository
import java.util.concurrent.LinkedBlockingQueue
/**
 * Runnable that applies queued score modifications to the player repository,
 * one at a time.
 *
 * @param playerRepository repository that receives the score updates
 */
class ScoreThread(playerRepository: PlayerRepository) extends Runnable
    with LazyLogging {

  final val modificationQueue = new LinkedBlockingQueue[ScoreModification]
  var running: Boolean = true

  /** Keeps saving scores while `running` is true. */
  def run: Unit =
    while (running) saveScore

  /** Queues a modification for the next `saveScore`. */
  def addScoreModification(modification: ScoreModification): Unit =
    modificationQueue.add(modification)

  /**
   * Takes the next modification, blocking until one is available, logs it
   * and adds its value to the player's score.
   */
  def saveScore: Unit = {
    val next = modificationQueue.take()
    logger.debug(s"Player ${next.player_id} gained ${next.value} points.")
    playerRepository.addScore(next.player_id, next.value)
  }
}
开发者ID:DrPandemic,项目名称:aigar.io,代码行数:31,代码来源:ScoreThread.scala
示例17: Test
//设置package包名称以及导入依赖的类
package teleporter.stream.integration.transaction
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
/**
 * Demo: submits ten never-ending tasks to a pool with 2 core threads, at most
 * 20 threads and a work queue of capacity 1.
 *
 * Each task prints the name of its thread and its index every two seconds.
 */
object Test extends App {
  val pool = new ThreadPoolExecutor(2, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](1))

  for (i <- 1 to 10) {
    pool.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          println(s"${Thread.currentThread().getName}: $i")
          Thread.sleep(2000)
        }
      }
    })
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:19,代码来源:Test.scala
示例18: AsyncProcessor
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import org.reactivestreams.Processor
import java.util.concurrent.LinkedBlockingQueue
/**
 * Processor that hands every element to downstream on a single-threaded
 * executor with a bounded work queue.
 *
 * @param queueSize number of elements requested on start; the executor's work
 *                  queue holds one entry more than that
 */
class AsyncProcessor[A](queueSize: Int = 100) extends Processor[A, A] with PublisherBase[A] with SubscriberBase[A] {

  // One worker thread; wrapped so the pool's configuration cannot be changed later.
  private[this] val executor = {
    val workQueue = new LinkedBlockingQueue[Runnable](queueSize + 1)
    val pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, streamsThreadFactory)
    Executors.unconfigurableExecutorService(pool)
  }

  /** Requests the first `queueSize` elements. */
  def onStart(): Unit = {
    request(queueSize)
  }

  /** Cancels this processor's subscription. */
  def onStop(): Unit = {
    cancel()
  }

  /** Schedules delivery of `a`, followed by a request for one more element. */
  def onNext(a: A): Unit = {
    val deliver = new Runnable {
      def run(): Unit = {
        sendNext(a)
        request(1L)
      }
    }
    executor.submit(deliver)
  }

  /** Schedules the completion signal on the executor. */
  def onComplete() = {
    val complete = new Runnable {
      def run(): Unit = {
        sendComplete()
      }
    }
    executor.submit(complete)
  }

  /** Stops the producer and sends the error from the calling thread. */
  def onError(t: Throwable): Unit = {
    stopProducer()
    sendError(t)
  }
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:30,代码来源:AsyncProcessor.scala
注：本文中的java.util.concurrent.LinkedBlockingQueue类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论