• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala LinkedBlockingQueue类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中java.util.concurrent.LinkedBlockingQueue的典型用法代码示例。如果您正苦于以下问题:Scala LinkedBlockingQueue类的具体用法?Scala LinkedBlockingQueue怎么用?Scala LinkedBlockingQueue使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了LinkedBlockingQueue类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: ReqRealTimeBarsHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging

class ReqRealTimeBarsHandler(security: Stock  ,
                             ibActor: Actor[FibsPromiseMessage \/ IBMessage],
                             tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[RealTimeBar]] with Logging { 
  private[this] val TickerId = tickerId
  val latch = new CountDownLatch(0) // don't need to block
  val actor = Actor[IBMessage] {
    case RealTimeBarResp(TickerId, time, open, high, low, close, volume, count, wap) ? 
      queue.add(RealTimeBar(new DateTime(time * 1000), open, high, low, close, volume, count, wap).some)
    case _ ? ???
  }
  val barHandler: PartialFunction[IBMessage, Unit] = {
    case [email protected](TickerId, time, open, high, low, close, volume, count, wap) ? actor ! m
  }
  val patterns = List(barHandler)
  private[this] def toStream: EphemeralStream[RealTimeBar] = {
    val ret: EphemeralStream[RealTimeBar] = queue.take match {
      case Some(d) ? EphemeralStream.cons(d, toStream)
      case None    ? EphemeralStream.emptyEphemeralStream
    }
    ret
  }
  private[this] val queue: BlockingQueue[Option[RealTimeBar]] =
    new LinkedBlockingQueue[Option[RealTimeBar]]()
  private[this] def closeStream = {
    queue add None
    socket.cancelRealTimeBars(tickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }
  def get = new CloseableStream[RealTimeBar] {
    def close = closeStream 
    lazy val as = toStream
  }
} 
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:49,代码来源:ReqRealTimeBarsHandler.scala


示例2: ReqHistoricalDataHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }

import scalaz._, Scalaz._
import scalaz.concurrent._
import com.github.nscala_time.time.Imports._

import messages._
import contract._

class ReqHistoricalDataHandler(security: Stock  ,
  ibActor: Actor[FibsPromiseMessage \/ IBMessage],
  tickerId: Int) extends FibsPromise[Stream[HistoricalDataPeriod]] {
  private[this] val TickerId = tickerId
  val actor = Actor[IBMessage]{
    case d @ HistoricalData(TickerId, time, _, _, _, _, _, _, _, _) if (!time.startsWith("finished-")) =>
      enqueue(transformMsg(d))
    case d @ HistoricalData(TickerId, time, _, _, _, _, _, _, _, _) if (time.startsWith("finished-")) =>
      close
    case d @ HistoricalDataError(TickerId, 162, msg) if (msg contains "HMDS query returned no data") =>
      close
    case _ => ???
  }
  val historicalDataHandler: PartialFunction[IBMessage, Unit] = {
    case d @ HistoricalData(TickerId, _, _, _, _, _, _, _, _, _) => actor ! d
    case d @ HistoricalDataError(TickerId, 162, msg) if (msg contains "HMDS query returned no data") =>
      actor ! d
  }
  val patterns = List(historicalDataHandler)
  val latch = new CountDownLatch(0) // don't need to block
  def get = toStream
  private[this] def transformMsg(i: HistoricalData) = 
    HistoricalDataPeriod(
      new DateTime(i.date.parseLong.toOption.getOrElse(0L) * 1000),
      i.open,
      i.high,
      i.low,
      i.close,
      i.volume,
      i.count,
      i.wap,
      i.hasGaps)
  private[this] val queue: BlockingQueue[Option[HistoricalDataPeriod]] =
    new LinkedBlockingQueue[Option[HistoricalDataPeriod]]()
  private[this] def close = {
    queue add None
    ibActor ! UnregisterFibsPromise(this).left
  }
  private[this] def enqueue(d: HistoricalDataPeriod) = queue add d.some
  private[this] def toStream(): Stream[HistoricalDataPeriod] = queue.take match {
    case Some(d) => Stream.cons(d, toStream)
    case None => Stream.empty
  }
} 
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:60,代码来源:ReqHistoricalDataHandler.scala


示例3: ReqMarketTickDataStreamHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging

class ReqMarketTickDataStreamHandler(security: Stock  ,
                                 ibActor: Actor[FibsPromiseMessage \/ IBMessage],
                                 tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[MarketTickDataResult]] with Logging {
  private[this] val TickerId = tickerId
  val latch = new CountDownLatch(0) // don't need to block
  private[this] val RTVolumePattern = "(\\d+\\.?\\d*);(\\d+);(\\d+);(\\d+);(\\d+\\.?\\d*);(true|false)".r
  val actor = Actor[IBMessage] {
    case TickString(TickerId, RTVolume, v) ? 
      parseInput(v).cata(some = t => queue.add(t.some),
                         none = warn(s"error parsing tick data: $v"))
    case _ ? ???
  }
  def parseInput(s: String) = s match {
    case RTVolumePattern(p, s, t, v, w, f) => 
      (p.parseDouble.toOption |@|
       s.parseInt.toOption |@|
       v.parseInt.toOption |@| 
       t.parseLong.toOption |@|
       w.parseDouble.toOption |@|
       f.parseBoolean.toOption)(MarketTickDataResult.apply)
    case _ => none
  }
  val stringHandler: PartialFunction[IBMessage, Unit] = {
    case [email protected](tickerId, RTVolume, _) ? actor ! m
  }
  val patterns = List(stringHandler)
  private[this] val queue: BlockingQueue[Option[MarketTickDataResult]] =
    new LinkedBlockingQueue[Option[MarketTickDataResult]]()
  private[this] def closeStream = {
    queue add None
    socket.cancelMktData(TickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }
  private[this] def toStream: EphemeralStream[MarketTickDataResult] = {
    val ret: EphemeralStream[MarketTickDataResult] = queue.take match {
      case Some(d) ? EphemeralStream.cons(d, toStream)
      case None    ? EphemeralStream.emptyEphemeralStream
    }
    ret
  }
  def get = new CloseableStream[MarketTickDataResult] {
    def close = closeStream 
    lazy val as = toStream
  }
} 
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:61,代码来源:ReqMarketTickDataStreamHandler.scala


示例4: ActorThread

//设置package包名称以及导入依赖的类
package spooky.actor

import scala.collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CopyOnWriteArraySet
import spooky.io.Tcp

private class ActorThread(private val actorFactory: ActorFactory, private val queue: LinkedBlockingQueue[Tuple2[ActorRef, Any]], private val deathPact: CopyOnWriteArraySet[ActorRef], private implicit val self: ActorRef) extends Runnable {
  def run(): Unit = {
    ActorContext.setSelf(self)
    val actor = actorFactory.create
    actor.context.become(actor.receive)
    try {
      while (!Thread.interrupted()) {
        val entry = queue.take()

        val sender = entry._1
        actor.context._sender = sender

        val value = entry._2

        val receiver = actor.context.receive
        if (receiver.isDefinedAt(value)) {
          receiver(entry._2)
        } else actor.unhandled(value)
      }
      println("interrupted")
    } catch {
      case e: DeathPactException =>
      case e: Throwable          => e.printStackTrace()
    } finally {
      terminate(deathPact)
      ActorContext.setSelf(null)
      println(s"terminate: ${actor.getClass.getSimpleName}")
    }
  }

  private def terminate(deathPact: CopyOnWriteArraySet[ActorRef]): Unit = {
    for (c <- deathPact) {
      c ! Terminated(self)
    }
    deathPact.clear()
  }
} 
开发者ID:zpooky,项目名称:bittorrent,代码行数:45,代码来源:ActorThread.scala


示例5: ActorRef

//设置package包名称以及导入依赖的类
package spooky.actor

import scala.collection.JavaConversions._
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.CopyOnWriteArraySet

class ActorRef private[actor] (private val c: Class[_], private val queue: LinkedBlockingQueue[Tuple2[ActorRef, Any]], private val deathPact: CopyOnWriteArraySet[ActorRef]) {

  private[actor] def registerDeathPack(actorRef: ActorRef): Unit = deathPact.add(actorRef)

  def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
    //    if(sender == null){
    //      println("Sender is null")
    //      Thread.dumpStack()
    //    }
    assert(queue.add((sender, msg)))
  }
  override def toString:String = s"ActorRef[${c.getSimpleName}]"
} 
开发者ID:zpooky,项目名称:bittorrent,代码行数:20,代码来源:ActorRef.scala


示例6: ExecutionThread

//设置package包名称以及导入依赖的类
package ppl.delite.runtime.executor

import java.util.concurrent.LinkedBlockingQueue
import ppl.delite.runtime.Config
import ppl.delite.runtime.Delite
import ppl.delite.runtime.codegen.DeliteExecutable



class ExecutionThread extends Runnable {

  // the work queue for this ExecutionThread
  // synchronization is handled by the queue implementation
  private[executor] val queue = new LinkedBlockingQueue[DeliteExecutable](Config.queueSize) //work queue

  private[executor] var continue: Boolean = true

  //this loop should be terminated by interrupting the thread
  def run {
    while(continue) {
      try {
        val work = queue.take
        executeWork(work)
      }
      catch {
        case i: InterruptedException => continue = false //another thread threw an exception -> exit silently
        case e: Throwable => {
          Delite.shutdown(e)
          continue = false
        }
      }
    }
  }

  // how to execute work
  protected def executeWork(work: DeliteExecutable) = work.run

} 
开发者ID:leratojeffrey,项目名称:OptiSDR-Compiler,代码行数:39,代码来源:ExecutionThread.scala


示例7: CatClient

//设置package包名称以及导入依赖的类
package ru.ifmo.ctddev.semenov.dkvs.client

import java.io._
import java.net.Socket
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.LinkedBlockingQueue

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}


class CatClient(socket: Socket) {
  val consoleReader = new BufferedReader(new InputStreamReader(System.in, UTF_8))
  val consoleWriter = new PrintWriter(new OutputStreamWriter(System.out, UTF_8))
  val socketReader = new BufferedReader(new InputStreamReader(socket.getInputStream, UTF_8))
  val socketWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, UTF_8), true)

  val requests  = new LinkedBlockingQueue[String]()
  val responses = new LinkedBlockingQueue[String]()

  def start(): Unit = {
    def start(action: () => Unit) = {
      new Thread(new Runnable() {
        override def run() = action()
      }).start()
    }

    start(readRequest)
    start(writeRequest)
    start(readResponse)
    start(writeResponse)
  }

  private def readRequest() = interact(consoleReader.readLine, requests add _, "exit reading requests")
  private def writeRequest() = interact(requests.take, socketWriter.println, "exit writing requests")
  private def readResponse() = interact(socketReader.readLine, responses add _, "exit reading responses")
  private def writeResponse() = interact(responses.take, consoleWriter.println, "exit writing responses")

  @tailrec private def interact(read: () => String, write: String => Unit, exitMessage: String): Unit = {
    val line = read()
    if (line == null) {
      consoleWriter println exitMessage
      return
    }
    write(line)
    interact(read, write, exitMessage)
  }
}

object Connect {
  def main(args: Array[String]) {
    if (args.length != 2) println("Usage: Client <host> <port>")
    else Try(new Socket(args(0), args(1).toInt)) match {
      case Success(socket)    => new CatClient(socket).start()
      case Failure(exception) =>
        println(s"Cannot connect to ${args(0)}:${args(1)}")
        exception.printStackTrace()
    }
  }
} 
开发者ID:vadimsemenov,项目名称:distributed-key-value-storage,代码行数:61,代码来源:CatClient.scala


示例8: cancel

//设置package包名称以及导入依赖的类
package com.github.madoc.runsbt.running

import java.io.File
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

import com.github.madoc.runsbt.events.{SBTEvent, SBTEventParser}

import scala.sys.process._

trait SBTProcess {
  def cancel()
  lazy val events:Stream[SBTEvent] = SBTEventParser(outputLines)
  def exitValue:Int
  def outputLines:Stream[String]
}
object SBTProcess {
  case class BasedOnProcessBuilder(builder:ProcessBuilder, workingDirectory:File, command:SBTCommand) extends SBTProcess {
    def exitValue = process exitValue
    def cancel() {process destroy(); buffer put EOS}
    lazy val outputLines:Stream[String] = nextLogLine()

    private sealed trait Element
    private sealed case class Line(string:String) extends Element
    private object EOS extends Element

    private val buffer:BlockingQueue[Element] = new LinkedBlockingQueue[Element]()
    private val process:Process = builder.run(ProcessLogger(str ? buffer put Line(str)))

    private def nextLogLine():Stream[String] = buffer take match {
      case EOS ? buffer put EOS; Stream.empty
      case Line(string) ? Stream cons (cleanUp(string), nextLogLine())
    }

    private def cleanUp(str:String):String = str replaceAll ("\\e\\[[\\d;]*[^\\d;]", "")

    locally {new Thread() {
      override def run() {process.exitValue(); buffer put EOS}
    }.start()}

    override def toString:String = s"SBTProcess(directory=$workingDirectory, command=$command, process=$builder)"
  }
} 
开发者ID:Madoc,项目名称:run-sbt,代码行数:43,代码来源:SBTProcess.scala


示例9: InMemoryEventStream

//设置package包名称以及导入依赖的类
package io.bfil.eventsourcing.inmemory

import java.util.concurrent.LinkedBlockingQueue

import scala.concurrent.Future

import io.bfil.eventsourcing.{EventEnvelope, EventStream}

class InMemoryEventStream[Event] extends EventStream[Event] {
  private val queue = new LinkedBlockingQueue[EventEnvelope[Event]]()
  def publish(event: EventEnvelope[Event]): Unit = queue.put(event)
  def subscribe(f: EventEnvelope[Event] => Future[Unit], offset: Long = 0): Unit =
    new Thread(() =>
      while(true) {
        val envelope = queue.take()
        if (envelope.offset > offset) f(envelope)
      }
    ).start()
} 
开发者ID:bfil,项目名称:simple-eventsourcing,代码行数:20,代码来源:InMemoryEventStream.scala


示例10: SummerClass

//设置package包名称以及导入依赖的类
package main.scala.slaves

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._

class SummerClass(queue: LinkedBlockingQueue[Double]) extends Runnable {
  val batch_size = 3200
  private val sharedQueue = queue

  def run() {
    while (true) {
      try {
        var input_data_list_buffer = new ListBuffer[Double]()
        var element = 0.0
        for (i <- 1 to batch_size) {
          element = sharedQueue.take()
          if (element != null)
            input_data_list_buffer += element
          else
            break
        }
        val resultFromProcessor = summer(input_data_list_buffer.toList)
      } catch {
        case ex: InterruptedException => println("Interrupted Exception")
      }
    }
  }

  //Processor
  def summer(input_data_list: List[Double]): Double = {
    return input_data_list.foldLeft(0.00)(_ + _)
  }

} 
开发者ID:ahamshubham,项目名称:scala-speech,代码行数:36,代码来源:SummerClass.scala


示例11: Master

//设置package包名称以及导入依赖的类
package main.scala

import java.util.concurrent.LinkedBlockingQueue

import slaves._

object Master {

  def main(args: Array[String]):Unit = {

    val numberOfQueues: Int = 2

    //Creating a list of shared queues
    val sharedQueues: Map[String,LinkedBlockingQueue[Double]] = Map(
      "listener-summer" -> new LinkedBlockingQueue(),
      "listener-plotter" -> new LinkedBlockingQueue(),
      "listener-recorder" -> new LinkedBlockingQueue()
    )

    //Creating threads
    val listenerThread = new Thread(new ListenerClass(sharedQueues("listener-summer"),sharedQueues("listener-plotter"),sharedQueues("listener-recorder")))
    val summerThread = new Thread(new SummerClass(sharedQueues("listener-summer")))
    val plotterThread = new Thread(new RealTimeAudioPlotterClass(sharedQueues("listener-plotter")))
    val recorderThread = new Thread(new RecorderClass(sharedQueues("listener-recorder")))

    //Starting threads
    listenerThread.start()
    summerThread.start()
    plotterThread.start()
    recorderThread.start()
  }

} 
开发者ID:ahamshubham,项目名称:scala-speech,代码行数:34,代码来源:Master.scala


示例12: DataStreamPublisher

//设置package包名称以及导入依赖的类
package io.eels.datastream

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicMarkableReference, AtomicReference}

import com.sksamuel.exts.collection.BlockingQueueConcurrentIterator
import io.eels.Row
import io.eels.schema.StructType


class DataStreamPublisher(override val schema: StructType) extends DataStream {

  private val queue = new LinkedBlockingQueue[Seq[Row]]
  private val running = new AtomicBoolean(true)
  private val failure = new AtomicReference[Throwable](null)

  def isCancelled: Boolean = !running.get

  override def subscribe(subscriber: Subscriber[Seq[Row]]): Unit = {
    try {
      subscriber.subscribed(new Subscription {
        override def cancel(): Unit = {
          queue.clear()
          queue.put(Row.Sentinel)
          running.set(false)
        }
      })
      BlockingQueueConcurrentIterator(queue, Row.Sentinel).takeWhile(_ => running.get).foreach(subscriber.next)
      failure.get match {
        case t: Throwable => subscriber.error(t)
        case _ => subscriber.completed()
      }
    } catch {
      case t: Throwable => subscriber.error(t)
    }
  }

  def publish(row: Seq[Row]): Unit = queue.put(row)
  def error(t: Throwable): Unit = {
    failure.set(t)
    queue.clear()
    queue.add(Row.Sentinel)
  }
  def close(): Unit = queue.add(Row.Sentinel)
} 
开发者ID:51zero,项目名称:eel-sdk,代码行数:46,代码来源:DataStreamPublisher.scala


示例13: start

//设置package包名称以及导入依赖的类
package com.github.zabbicook.api

import java.time.Duration
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

sealed trait Throttle {
  def start(): Unit
  def end(): Unit
}

object NonStrictThrottle extends Throttle {
  // do nothing
  def start(): Unit = {}
  def end(): Unit = {}
}

class StrictThrottle(
  concurrency: Int,
  timeout: Duration,
  startInterval: Duration
) extends Throttle {
  private[this] val queue = new LinkedBlockingQueue[Boolean](concurrency)
  queue.offer(true)

  // blocking
  def start(): Unit = {
    queue.poll(timeout.toMillis, TimeUnit.MILLISECONDS)
    if (startInterval != Duration.ZERO) {
      Thread.sleep(startInterval.toMillis)
    }
  }

  def end(): Unit = {
    queue.offer(true)
  }
} 
开发者ID:rerorero,项目名称:zabbicook,代码行数:37,代码来源:Throttle.scala


示例14: ARouter

//设置package包名称以及导入依赖的类
package cas.service

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.pattern.pipe
import cas.analysis.subject.Subject
import cas.persistence.searching.ElasticSearch
import cas.utils.RemoteLogger
import cas.utils.UtilAliases._
import cas.web.dealers.vk.VkApiProtocol.VkPost

import scala.collection.mutable
import scala.collection.mutable.Queue

object ARouter {
  case object PullSubjects
  case class PulledSubjects(subjs: Subjects)
  case class PushingEstimations(estims: Estimations)

  case class Estimation(subj: Subject, actuality: Double)
}

class ARouter(producer: ActorRef) extends Actor with ActorLogging {
  import ARouter._

  val pulledSubjs = new LinkedBlockingQueue[PulledSubjects]()
  val waitingWorkers = new LinkedBlockingQueue[ActorRef]()

  override def preStart = {
    super.preStart()
    
      // log.info("Subjects pulled: " + chunk)
      if (waitingWorkers.isEmpty) pulledSubjs.offer(PulledSubjects(chunk), 1000, TimeUnit.MILLISECONDS)
      else {
        waitingWorkers.poll(1000, TimeUnit.MILLISECONDS) ! PulledSubjects(chunk)
        producer ! PullSubjects
      }
    }

    case estims: PushingEstimations => {
      // RemoteLogger.info("PushingEstimations: `" + estims.estims.mkString + "`")
      // log.info("Estimations computed: " + estims.estims)
      producer forward estims
    }
  }
} 
开发者ID:kell18,项目名称:CAS,代码行数:48,代码来源:ARouter.scala


示例15: ThreadPool

//设置package包名称以及导入依赖的类
package command

import java.util.concurrent.BlockingQueue

import java.util.concurrent.LinkedBlockingQueue

class ThreadPool(n: Int) {

  private val jobQueue: BlockingQueue[Job] = new LinkedBlockingQueue()

  private val jobThreads = new Array[Thread](n)

  @volatile private var shutdown: Boolean = _

  for (i <- 0 until n) {
    jobThreads(i) = new Worker("Pool Thread " + i)
    jobThreads(i).start()
  }

  def addJob(r: Job): Unit = {
    try jobQueue.put(r)
    catch {
      case e: InterruptedException => Thread.currentThread().interrupt()
    }
  }

  def shutdownPool: Unit = {
    while (!jobQueue.isEmpty) 
    try {
        Thread.sleep(1000)
    } catch {
      case e: InterruptedException => e.printStackTrace()
    }
    shutdown = true
    for (workerThread <- jobThreads) {
      workerThread.interrupt()
    }
  }

  private class Worker(name: String) extends Thread(name) {

    override def run = {
      while (!shutdown) 
      try {
        val r: Job = jobQueue.take()
        r.run
      } catch {
        case e: InterruptedException => {}
      }
    }
  }
} 
开发者ID:BBK-PiJ-2015-67,项目名称:sdp-portfolio,代码行数:53,代码来源:ThreadPool.scala


示例16: ScoreThread

//设置package包名称以及导入依赖的类
package io.aigar.score

import com.typesafe.scalalogging.LazyLogging
import io.aigar.model.PlayerRepository
import java.util.concurrent.LinkedBlockingQueue



class ScoreThread(playerRepository: PlayerRepository) extends Runnable
                                                      with LazyLogging {
  final val modificationQueue = new LinkedBlockingQueue[ScoreModification]
  var running: Boolean = true;

  def run: Unit = {
    while (running) {
      saveScore
    }
  }

  def addScoreModification(modification: ScoreModification): Unit = {
    modificationQueue.add(modification)
  }

  def saveScore: Unit = {
    val modification = modificationQueue.take

    logger.debug(s"Player ${modification.player_id} gained ${modification.value} points.")
    playerRepository.addScore(modification.player_id, modification.value)
  }
} 
开发者ID:DrPandemic,项目名称:aigar.io,代码行数:31,代码来源:ScoreThread.scala


示例17: Test

//设置package包名称以及导入依赖的类
package teleporter.stream.integration.transaction

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}


object Test extends App {
  val pool = new ThreadPoolExecutor(2, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](1))
  for (i ? 1 to 10) {
    pool.execute(new Runnable {
      override def run(): Unit = {
        while (true) {
          println(s"${Thread.currentThread().getName}: $i")
          Thread.sleep(2000)
        }
      }
    })
  }
} 
开发者ID:huanwuji,项目名称:teleporter,代码行数:19,代码来源:Test.scala


示例18: AsyncProcessor

//设置package包名称以及导入依赖的类
package alexsmirnov.stream

import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import org.reactivestreams.Processor
import java.util.concurrent.LinkedBlockingQueue

  class AsyncProcessor[A](queueSize: Int = 100) extends Processor[A, A] with PublisherBase[A] with SubscriberBase[A] {

    private[this] val executor = Executors.unconfigurableExecutorService(
      new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue[Runnable](queueSize + 1),
        streamsThreadFactory))

    def onStart() { request(queueSize) }
    def onStop() { cancel() }
    def onNext(a: A) {
      executor.submit(new Runnable { def run() { sendNext(a); request(1L) } })
    }
    def onComplete() = {
      executor.submit(new Runnable { def run() { sendComplete() } })
    }
    def onError(t: Throwable) {
      stopProducer()
      sendError(t)
    }
  } 
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:30,代码来源:AsyncProcessor.scala



注:本文中的java.util.concurrent.LinkedBlockingQueue类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala DecimalFormat类代码示例发布时间:2022-05-23
下一篇:
Scala ConcurrentLinkedQueue类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap