• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala BlockingQueue类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中java.util.concurrent.BlockingQueue的典型用法代码示例。如果您正苦于以下问题:Scala BlockingQueue类的具体用法?Scala BlockingQueue怎么用?Scala BlockingQueue使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了BlockingQueue类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Consumer

//设置package包名称以及导入依赖的类
package com.softwaremill.modemconnector

import java.util.concurrent.BlockingQueue

import com.typesafe.scalalogging.LazyLogging

/**
 * Runnable that repeatedly takes elements from `queue` and passes each one to
 * [[consume]] until the executing thread is interrupted.
 *
 * @param queue source of the elements; `take()` blocks while it is empty
 * @tparam T type of the consumed elements
 */
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable with LazyLogging {

  /**
   * Drains the queue until the current thread is interrupted.
   *
   * The loop can end in two ways: the interrupt flag is observed by the loop
   * condition, or `take()` throws `InterruptedException` (which clears the flag).
   * In the second case the flag is set again so that both exits leave the thread
   * in the same, interrupted, state for whoever runs this `Runnable`.
   */
  override def run(): Unit = {
    logger.info("Starting Consumer....")
    try {
      while (!Thread.currentThread.isInterrupted) {
        consume(queue.take())
      }
    } catch {
      case _: InterruptedException =>
        logger.info("Consumer stopped with interrupt")
        // Throwing InterruptedException cleared the flag; restore it for the caller.
        Thread.currentThread.interrupt()
    }
  }

  /**
   * Handles a single element taken from the queue.
   *
   * @param sample the element that was just removed from the queue
   */
  def consume(sample: T): Unit

}
开发者ID:softwaremill,项目名称:modem-connector,代码行数:24,代码来源:Consumer.scala


示例2: AGWPEFrameConsumer

//设置package包名称以及导入依赖的类
package com.softwaremill.modemconnector.agwpe

import java.util.concurrent.BlockingQueue

import com.softwaremill.modemconnector.ax25.AX25Frame
import com.softwaremill.modemconnector.{Consumer, Descrambler, Subject}

/**
 * Consumer that turns every `AGWPEFrame` taken from the queue into an `AX25Frame`
 * and hands the result to `notifyObservers`.
 *
 * @param queue       queue the frames are taken from
 * @param descrambler optional descrambler applied to the frame data before the
 *                    `AX25Frame` is built; `None` (the default) uses the data as is
 */
class AGWPEFrameConsumer(queue: BlockingQueue[AGWPEFrame], descrambler: Option[Descrambler] = None) extends Consumer(queue) with Subject[AX25Frame] {

  /** Converts `agwpe` and notifies the observers with the converted frame. */
  override def consume(agwpe: AGWPEFrame): Unit =
    notifyObservers(prepareAX25Frame(agwpe))

  /**
   * Builds the `AX25Frame` for `agwpe`, descrambling the data first when a
   * descrambler was supplied. Calls `agwpe.data.get`, so a frame without data
   * makes this throw.
   */
  private def prepareAX25Frame(agwpe: AGWPEFrame): AX25Frame =
    descrambler match {
      case None     => AX25Frame(agwpe.data.get)
      case Some(ds) => AX25Frame(ds.descramble(agwpe.data.get))
    }
}
开发者ID:softwaremill,项目名称:modem-connector,代码行数:22,代码来源:AGWPEFrameConsumer.scala


示例3: ReqRealTimeBarsHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging

/**
 * Promise whose value is a closeable stream of `RealTimeBar`s for one ticker id.
 *
 * Matching `RealTimeBarResp` messages are forwarded to an internal actor, which
 * pushes each bar onto a blocking queue; the stream returned by [[get]] is built
 * by taking elements from that queue. A `None` element marks the end of the stream.
 *
 * @param security not used by the code in this class
 * @param ibActor  actor that is told to unregister this promise when the stream is closed
 * @param tickerId id used to select the responses that belong to this handler
 * @param socket   used to cancel the real-time bar request when the stream is closed
 */
class ReqRealTimeBarsHandler(security: Stock,
                             ibActor: Actor[FibsPromiseMessage \/ IBMessage],
                             tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[RealTimeBar]] with Logging {
  // Capitalised copy so it can be used as a stable identifier (equality test) in patterns.
  private[this] val TickerId = tickerId
  val latch = new CountDownLatch(0) // don't need to block
  /** Enqueues every bar received for `TickerId`; any other message hits `???`. */
  val actor = Actor[IBMessage] {
    case RealTimeBarResp(TickerId, time, open, high, low, close, volume, count, wap) =>
      // NOTE(review): `time * 1000` suggests `time` is in epoch seconds — confirm.
      queue.add(RealTimeBar(new DateTime(time * 1000), open, high, low, close, volume, count, wap).some)
    case _ => ???
  }
  /** Forwards the responses for this ticker id to [[actor]]; undefined for everything else. */
  val barHandler: PartialFunction[IBMessage, Unit] = {
    case m @ RealTimeBarResp(TickerId, time, open, high, low, close, volume, count, wap) => actor ! m
  }
  val patterns = List(barHandler)
  /** Builds the stream by blocking on the queue; a `None` element ends the stream. */
  private[this] def toStream: EphemeralStream[RealTimeBar] = {
    val ret: EphemeralStream[RealTimeBar] = queue.take match {
      case Some(d) => EphemeralStream.cons(d, toStream)
      case None    => EphemeralStream.emptyEphemeralStream
    }
    ret
  }
  private[this] val queue: BlockingQueue[Option[RealTimeBar]] =
    new LinkedBlockingQueue[Option[RealTimeBar]]()
  /** Enqueues the end marker, cancels the bar request and unregisters this promise. */
  private[this] def closeStream = {
    queue add None
    socket.cancelRealTimeBars(tickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }
  /** Returns a new closeable stream; its elements are only pulled from the queue once `as` is accessed. */
  def get = new CloseableStream[RealTimeBar] {
    def close = closeStream
    lazy val as = toStream
  }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:49,代码来源:ReqRealTimeBarsHandler.scala


示例4: ReqHistoricalDataHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }

import scalaz._, Scalaz._
import scalaz.concurrent._
import com.github.nscala_time.time.Imports._

import messages._
import contract._

/**
 * Promise whose value is a `Stream` of `HistoricalDataPeriod`s for one ticker id.
 *
 * Messages for the ticker are forwarded to an internal actor that converts and
 * enqueues them; the stream returned by [[get]] is built by taking elements from
 * that queue until a `None` end marker is found.
 *
 * @param security not used by the code in this class
 * @param ibActor  actor that is told to unregister this promise once the data is complete
 * @param tickerId id used to select the messages that belong to this handler
 */
class ReqHistoricalDataHandler(security: Stock,
  ibActor: Actor[FibsPromiseMessage \/ IBMessage],
  tickerId: Int) extends FibsPromise[Stream[HistoricalDataPeriod]] {
  // Capitalised copy so it can be used as a stable identifier (equality test) in patterns.
  private[this] val TickerId = tickerId
  /**
   * Enqueues data rows, and closes the stream on a row whose time starts with
   * "finished-" or on error 162 reporting that the query returned no data.
   * Any other message hits `???`.
   */
  val actor = Actor[IBMessage]{
    case row @ HistoricalData(TickerId, time, _, _, _, _, _, _, _, _) =>
      if (time.startsWith("finished-")) close
      else enqueue(transformMsg(row))
    case HistoricalDataError(TickerId, 162, msg) if (msg contains "HMDS query returned no data") =>
      close
    case _ => ???
  }
  /** Forwards the messages the actor understands to it; undefined for everything else. */
  val historicalDataHandler: PartialFunction[IBMessage, Unit] = {
    case data @ HistoricalData(TickerId, _, _, _, _, _, _, _, _, _) => actor ! data
    case err @ HistoricalDataError(TickerId, 162, msg) if (msg contains "HMDS query returned no data") =>
      actor ! err
  }
  val patterns = List(historicalDataHandler)
  val latch = new CountDownLatch(0) // don't need to block
  /** Returns the stream; evaluating it blocks until the first element is available. */
  def get = toStream
  /** Converts a raw message; a `date` that does not parse as a long becomes 0. */
  private[this] def transformMsg(i: HistoricalData) = {
    // NOTE(review): the `* 1000` suggests `date` holds epoch seconds — confirm.
    val parsedDate = i.date.parseLong.toOption.getOrElse(0L)
    HistoricalDataPeriod(
      new DateTime(parsedDate * 1000),
      i.open,
      i.high,
      i.low,
      i.close,
      i.volume,
      i.count,
      i.wap,
      i.hasGaps)
  }
  private[this] val queue: BlockingQueue[Option[HistoricalDataPeriod]] =
    new LinkedBlockingQueue[Option[HistoricalDataPeriod]]()
  /** Enqueues the end marker and unregisters this promise. */
  private[this] def close = {
    queue add None
    ibActor ! UnregisterFibsPromise(this).left
  }
  private[this] def enqueue(d: HistoricalDataPeriod) = queue add d.some
  /** Takes the next queue element (blocking); `None` ends the stream. */
  private[this] def toStream(): Stream[HistoricalDataPeriod] = {
    val head = queue.take
    head match {
      case None    => Stream.empty
      case Some(d) => Stream.cons(d, toStream)
    }
  }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:60,代码来源:ReqHistoricalDataHandler.scala


示例5: ReqMarketTickDataStreamHandler

//设置package包名称以及导入依赖的类
package name.kaeding.fibs
package ib
package impl
package handlers

import java.util.concurrent.CountDownLatch
import scalaz._, Scalaz._
import scalaz.concurrent._
import messages._
import contract._
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue }
import com.ib.client.EClientSocket
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logging

/**
 * Promise whose value is a closeable stream of `MarketTickDataResult`s for one ticker id.
 *
 * `TickString` messages of kind `RTVolume` are forwarded to an internal actor, which
 * parses the payload and pushes the result onto a blocking queue; the stream returned
 * by [[get]] is built by taking elements from that queue. A `None` element marks the
 * end of the stream.
 *
 * @param security not used by the code in this class
 * @param ibActor  actor that is told to unregister this promise when the stream is closed
 * @param tickerId id used to select the messages that belong to this handler
 * @param socket   used to cancel the market data request when the stream is closed
 */
class ReqMarketTickDataStreamHandler(security: Stock,
                                 ibActor: Actor[FibsPromiseMessage \/ IBMessage],
                                 tickerId: Int, socket: EClientSocketLike) extends FibsPromise[CloseableStream[MarketTickDataResult]] with Logging {
  // Capitalised copy so it can be used as a stable identifier (equality test) in patterns.
  private[this] val TickerId = tickerId
  val latch = new CountDownLatch(0) // don't need to block
  // Six ';'-separated fields: decimal, int, int, int, decimal, boolean.
  private[this] val RTVolumePattern = "(\\d+\\.?\\d*);(\\d+);(\\d+);(\\d+);(\\d+\\.?\\d*);(true|false)".r
  /** Parses and enqueues ticks for `TickerId`, warning on unparsable input; any other message hits `???`. */
  val actor = Actor[IBMessage] {
    case TickString(TickerId, RTVolume, v) =>
      parseInput(v).cata(some = t => queue.add(t.some),
                         none = warn(s"error parsing tick data: $v"))
    case _ => ???
  }
  /**
   * Parses a tick string into a `MarketTickDataResult`.
   *
   * @param s the raw tick payload
   * @return the parsed result, or `none` when `s` does not match the pattern or a field fails to parse
   */
  def parseInput(s: String) = s match {
    case RTVolumePattern(p, s, t, v, w, f) =>
      // NOTE(review): groups 3 and 4 (`t`, `v`) are passed as `v` then `t`; presumably this
      // follows the parameter order of MarketTickDataResult.apply — confirm.
      (p.parseDouble.toOption |@|
       s.parseInt.toOption |@|
       v.parseInt.toOption |@|
       t.parseLong.toOption |@|
       w.parseDouble.toOption |@|
       f.parseBoolean.toOption)(MarketTickDataResult.apply)
    case _ => none
  }
  /**
   * Forwards `RTVolume` tick strings for this ticker id to [[actor]].
   * The id is matched with the stable identifier `TickerId`: a lowercase `tickerId`
   * here would be a fresh variable pattern that accepts every ticker and sends
   * foreign ticks into the actor's `???` branch.
   */
  val stringHandler: PartialFunction[IBMessage, Unit] = {
    case m @ TickString(TickerId, RTVolume, _) => actor ! m
  }
  val patterns = List(stringHandler)
  private[this] val queue: BlockingQueue[Option[MarketTickDataResult]] =
    new LinkedBlockingQueue[Option[MarketTickDataResult]]()
  /** Enqueues the end marker, cancels the market data request and unregisters this promise. */
  private[this] def closeStream = {
    queue add None
    socket.cancelMktData(TickerId)
    ibActor ! UnregisterFibsPromise(this).left
  }
  /** Builds the stream by blocking on the queue; a `None` element ends the stream. */
  private[this] def toStream: EphemeralStream[MarketTickDataResult] = {
    val ret: EphemeralStream[MarketTickDataResult] = queue.take match {
      case Some(d) => EphemeralStream.cons(d, toStream)
      case None    => EphemeralStream.emptyEphemeralStream
    }
    ret
  }
  /** Returns a new closeable stream; its elements are only pulled from the queue once `as` is accessed. */
  def get = new CloseableStream[MarketTickDataResult] {
    def close = closeStream
    lazy val as = toStream
  }
}
开发者ID:carrot-garden,项目名称:vendor_ibrk_fibs-scala,代码行数:61,代码来源:ReqMarketTickDataStreamHandler.scala


示例6: QTestParams

//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer

import java.nio.file.{Files, Paths}
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

import org.openchai.tcp.rpc.{P2pReq, P2pResp, TcpParams, TcpServer}
import org.openchai.tcp.util.{FileUtils, TcpCommon}

/**
 * Plain parameter holder for a queue transfer test.
 *
 * NOTE(review): the field names suggest a master address plus client (`c*`) and
 * server (`s*`) host/port pairs — confirm against the code that builds it.
 */
case class QTestParams(
  master: String,
  cHost: String,
  cPort: Int,
  sHost: String,
  sPort: Int
)

/**
 * Connection server interface that offers every consumed payload to `outQ`.
 *
 * @param outQ      queue that receives the consumed entries
 * @param tcpParams passed on to `XferConServerIf`
 * @param qServerIf not used by the code in this class
 */
class XferQServerIf(outQ: BlockingQueue[TaggedEntry], tcpParams: TcpParams, qServerIf: QXferServerIf)
  extends XferConServerIf(tcpParams) {
  println("Created XferQServerIf")

  /**
   * Consumes `config` through `defaultConsume`, casts the result to `TaggedEntry`
   * and offers it to the output queue without blocking.
   *
   * @return the result of `offer`: `false` when the queue did not accept the entry
   */
  override def consume(config: TcpXferConfig) = {
    val entry = super.defaultConsume(config).asInstanceOf[TaggedEntry]
    println(s"Consuming message of length ${entry.toString.length}")
    outQ.offer(entry)
  }

}


/**
 * Server interface that answers write requests by offering the received data,
 * wrapped in a `TaggedEntry`, to the queue `q`.
 *
 * @param q         queue that receives the entries
 * @param tcpParams not used by the code in this class
 */
class QXferServerIf(q: BlockingQueue[TaggedEntry], tcpParams: TcpParams) extends XferServerIf {

  /**
   * Offers `data` to the queue without blocking.
   *
   * @param path not used by this method
   * @param data entry to enqueue
   * @return `true` if the queue accepted the entry, `false` if it had no free capacity
   */
  def writeQ(path: DataPtr, data: TaggedEntry): Boolean =
    q.offer(data)

  /**
   * Handles an `XferWriteReq` by enqueueing its data and answering with an
   * `XferWriteResp` that carries the data length and the elapsed milliseconds.
   *
   * @throws IllegalArgumentException for any other request type
   */
  override def service(req: P2pReq[_]): P2pResp[_] = {
    req match {
      case o: XferWriteReq =>
        val params = o.value.asInstanceOf[XferWriteParams]
        println(s"XferWriteReq! datalen=${params.data.length}")
        val start = System.currentTimeMillis
        val accepted = writeQ(params.config.finalPath, TaggedEntry(params.tag, params.data))
        val elapsed = System.currentTimeMillis - start
        if (!accepted) {
          // offer() does not block: a full queue rejects the entry. The response below is
          // unchanged, so at least make the loss visible instead of dropping it silently.
          println(s"XferWriteReq: queue is full, dropped entry with datalen=${params.data.length}")
        }
        XferWriteResp("abc", params.data.length, elapsed, Array.empty[Byte])
      case _ => throw new IllegalArgumentException(s"Unknown service type ${req.getClass.getName}")

    }
  }

}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:51,代码来源:QXferServer.scala


示例7: XferServerIf

//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

import org.openchai.tcp.rpc._

/** Abstract base of the transfer server interfaces; passes the fixed name "XferServerIf" to `ServerIf`. */
abstract class XferServerIf extends ServerIf("XferServerIf")

/**
 * Factory and command-line entry point for the transfer `TcpServer`.
 * The most recently created server is also kept in the mutable field [[server]].
 */
object XferServer {

  /** Last server created through one of the `apply` methods; `null` until then. */
  var server: TcpServer = _

  /** Creates a server backed by a `NioXferServerIf`, stores it in [[server]] and returns it. */
  def apply(tcpParams: TcpParams) = {
    val serverIf = new NioXferServerIf(tcpParams)
    server = TcpServer(tcpParams.server, tcpParams.port, serverIf)
    server
  }

  /** Creates a server backed by a `QXferServerIf` writing to `q`, stores it in [[server]] and returns it. */
  def apply(q: BlockingQueue[TaggedEntry], tcpParams: TcpParams) = {
    val serverIf = new QXferServerIf(q, tcpParams)
    server = TcpServer(tcpParams.server, tcpParams.port, serverIf)
    server
  }

  /**
   * Starts a queue-backed server on the host (`args(0)`) and port (`args(1)`)
   * together with a thread that prints every entry arriving on the queue.
   * Never returns normally: the main thread ends by joining itself.
   */
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val entries = new ArrayBlockingQueue[TaggedEntry](1000)
    val qreader = new Thread(new Runnable {
      override def run(): Unit = {
        println("QReader thread started")
        while (true) {
          val received = entries.take
          println(s"QReader: received $received")
        }
      }
    })
    qreader.start()
    val server = apply(entries, TcpParams(host, port))
    server.start
    // Joining the current thread blocks forever, keeping the process alive.
    Thread.currentThread.join()
  }
}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:48,代码来源:XferServer.scala


示例8: SBTProcess

//设置package包名称以及导入依赖的类
package com.github.madoc.runsbt.running

import java.io.File
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

import com.github.madoc.runsbt.events.{SBTEvent, SBTEventParser}

import scala.sys.process._

/** A running sbt process whose output can be read as lines or as parsed events. */
trait SBTProcess {
  /** Cancels the process. */
  def cancel(): Unit

  /** Events obtained by passing [[outputLines]] to `SBTEventParser`; computed on first access. */
  lazy val events: Stream[SBTEvent] = SBTEventParser(outputLines)

  /** Exit value of the process. */
  def exitValue: Int

  /** Output of the process, line by line. */
  def outputLines: Stream[String]
}
object SBTProcess {
  /**
   * [[SBTProcess]] that runs `builder` as soon as it is constructed and exposes the
   * process output as a lazily evaluated stream of cleaned-up lines.
   *
   * Output lines are pushed onto a blocking queue by the process logger; an `EOS`
   * marker is enqueued when the process exits or is cancelled.
   *
   * @param builder          the process to run
   * @param workingDirectory only used in `toString`
   * @param command          only used in `toString`
   */
  case class BasedOnProcessBuilder(builder:ProcessBuilder, workingDirectory:File, command:SBTCommand) extends SBTProcess {
    /** Blocks until the process has exited and returns its exit value. */
    def exitValue: Int = process.exitValue()
    /** Destroys the process and enqueues the end marker so that readers terminate. */
    def cancel(): Unit = {process.destroy(); buffer put EOS}
    lazy val outputLines:Stream[String] = nextLogLine()

    private sealed trait Element
    private sealed case class Line(string:String) extends Element
    private object EOS extends Element

    private val buffer:BlockingQueue[Element] = new LinkedBlockingQueue[Element]()
    // The single-function ProcessLogger sends every logged line to the buffer.
    private val process:Process = builder.run(ProcessLogger(str => buffer put Line(str)))

    /** Takes the next element (blocking) and builds the rest of the stream lazily. */
    private def nextLogLine():Stream[String] = buffer.take() match {
      // Put the marker back so that any later take also sees the end of the output.
      case EOS => buffer put EOS; Stream.empty
      case Line(string) => Stream.cons(cleanUp(string), nextLogLine())
    }

    /** Removes escape sequences of the form ESC `[` digits/semicolons plus one final character. */
    private def cleanUp(str:String):String = str.replaceAll("\\e\\[[\\d;]*[^\\d;]", "")

    // Watcher thread: waits for the process to exit, then enqueues the end marker.
    locally {new Thread() {
      override def run(): Unit = {process.exitValue(); buffer put EOS}
    }.start()}

    override def toString:String = s"SBTProcess(directory=$workingDirectory, command=$command, process=$builder)"
  }
}
开发者ID:Madoc,项目名称:run-sbt,代码行数:43,代码来源:SBTProcess.scala


示例9: ObserveBlockingQueue

//设置package包名称以及导入依赖的类
package nl.knaw.dans.experiments.hazelcast

import java.util.concurrent.{BlockingQueue, TimeUnit}

import com.hazelcast.core.HazelcastInstanceNotActiveException
import rx.lang.scala.{Observable, Scheduler}
import rx.lang.scala.schedulers.NewThreadScheduler
import rx.lang.scala.subscriptions.CompositeSubscription

import scala.concurrent.duration.Duration

package object queue {

  /** Adds [[ObserveBlockingQueue.observe]] to any `BlockingQueue`. */
  implicit class ObserveBlockingQueue[T](val queue: BlockingQueue[T]) extends AnyVal {
    /**
     * Exposes the queue as an `Observable` that polls it repeatedly on a worker of `scheduler`.
     *
     * Each round first asks `running`: if it returns `false` the observable completes;
     * otherwise the queue is polled for at most `timeout` and a received element, if any,
     * is emitted. A `HazelcastInstanceNotActiveException` completes the observable; any
     * other throwable is printed and passed to `onError`.
     *
     * @param timeout   maximum time a single poll waits for an element
     * @param scheduler scheduler that provides the polling worker
     * @param running   checked before every poll; `false` completes the observable
     */
    def observe(timeout: Duration, scheduler: Scheduler = NewThreadScheduler())(running: () => Boolean): Observable[T] =
      Observable { subscriber =>
        val worker = scheduler.createWorker
        val subscription = worker.scheduleRec {
          try {
            if (!running()) subscriber.onCompleted()
            else {
              // poll returns null on timeout; Option(...) turns that into None.
              val polled = Option(queue.poll(timeout.toMillis, TimeUnit.MILLISECONDS))
              println(s"received: $polled")
              polled.foreach(subscriber.onNext)
            }
          }
          catch {
            case _: HazelcastInstanceNotActiveException => subscriber.onCompleted()
            case failure: Throwable =>
              println(s"  caught ${failure.getClass.getSimpleName}: ${failure.getMessage}")
              subscriber.onError(failure)
          }
        }

        subscriber.add(CompositeSubscription(subscription, worker))
      }
  }
}
开发者ID:rvanheest-DANS-KNAW,项目名称:Hazelcast-experiments,代码行数:38,代码来源:package.scala


示例10: ParItr

//设置package包名称以及导入依赖的类
package fshgen

import scala.concurrent.{ ExecutionContext, future, Future, Await }
import scala.concurrent.duration.Duration.Inf
import java.util.concurrent.{ BlockingQueue, Executors, ArrayBlockingQueue }

object ParItr {

  /**
   * Maps `f` over `i` in parallel while preserving the order of the elements.
   *
   * A producer future walks the source iterator, starts one future per element and
   * puts it on a bounded queue, followed by a `None` marker once the source is
   * exhausted (or has thrown). The returned iterator takes the futures from the
   * queue in order and awaits each result in `next()`.
   *
   * @param i       source iterator; it is consumed on a thread of `execctx`
   * @param f       function applied to every element, each call in its own future
   * @param execctx execution context running the producer and the per-element futures
   * @return an iterator over the results; `hasNext` and `next()` may block, and
   *         `next()` rethrows the failure of the future it is waiting for
   */
  def map[A, B](i: Iterator[A])(f: A => B)(implicit execctx: ExecutionContext): Iterator[B] = {
    val cpus = Runtime.getRuntime().availableProcessors() + 1
    // Bounded so that the producer cannot run arbitrarily far ahead of the consumer.
    val queue: BlockingQueue[Option[Future[B]]] = new ArrayBlockingQueue(1 + 16 * cpus)

    // Non-blocking fast path; when the queue is full fall back to a blocking put that is
    // announced with `blocking`, so that a pool which supports it can compensate for the
    // parked producer thread instead of starving the futures the consumer is waiting for.
    def enqueue(item: Option[Future[B]]): Unit =
      if (!queue.offer(item)) scala.concurrent.blocking(queue.put(item))

    Future {
      try i.foreach(x => enqueue(Some(Future(f(x)))))
      finally enqueue(None) // poison
    }
    new Iterator[B] {

      // Future taken from the queue but not yet handed out by next().
      private[this] var fopt: Option[Future[B]] = None
      // Becomes false once the poison marker has been taken; the queue is not read after that.
      private[this] var alive = true

      override def next(): B =
        if (!hasNext) Iterator.empty.next()
        else fopt match {
          case Some(pending) =>
            val v = Await.result(pending, Inf)
            fopt = None
            v
          // Not reachable: a true hasNext means a future is buffered in fopt.
          case None => Iterator.empty.next()
        }

      override def hasNext: Boolean = alive && take().isDefined

      /** Returns the buffered future, taking one from the queue (blocking) if none is buffered. */
      private def take(): Option[Future[B]] = {
        if (fopt.isEmpty) {
          fopt = queue.take() match {
            case None => { alive = false; None }
            case some => some
          }
        }
        fopt
      }

    }
  }
}
开发者ID:memo33,项目名称:fshgen,代码行数:41,代码来源:ParItr.scala


示例11: ThreadPool

//设置package包名称以及导入依赖的类
package command

import java.util.concurrent.BlockingQueue

import java.util.concurrent.LinkedBlockingQueue

/**
 * Fixed-size pool of worker threads that run the `Job`s put on a shared queue.
 * All `n` threads are created and started when the pool is constructed.
 *
 * @param n number of worker threads
 */
class ThreadPool(n: Int) {

  private val jobQueue: BlockingQueue[Job] = new LinkedBlockingQueue()

  private val jobThreads = new Array[Thread](n)

  // Starts out false; set by shutdownPool and read by the workers' loops.
  @volatile private var shutdown: Boolean = _

  (0 until n).foreach { idx =>
    jobThreads(idx) = new Worker("Pool Thread " + idx)
    jobThreads(idx).start()
  }

  /**
   * Puts `r` on the job queue. If the calling thread is interrupted while doing
   * so, its interrupt flag is set again and the method returns.
   */
  def addJob(r: Job): Unit =
    try jobQueue.put(r)
    catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
    }

  /**
   * Checks once per second until the job queue is empty, then sets the shutdown
   * flag and interrupts every worker thread. An interrupt received while waiting
   * is printed and the waiting continues.
   */
  def shutdownPool: Unit = {
    while (!jobQueue.isEmpty) {
      try Thread.sleep(1000)
      catch {
        case interrupted: InterruptedException => interrupted.printStackTrace()
      }
    }
    shutdown = true
    jobThreads.foreach(_.interrupt())
  }

  /** Thread that takes jobs from the queue and runs them until the shutdown flag is set. */
  private class Worker(name: String) extends Thread(name) {

    override def run(): Unit =
      while (!shutdown) {
        try {
          val job: Job = jobQueue.take()
          job.run
        } catch {
          // The interrupt only wakes the thread up; the loop condition decides whether to stop.
          case _: InterruptedException => ()
        }
      }
  }
}
开发者ID:BBK-PiJ-2015-67,项目名称:sdp-portfolio,代码行数:53,代码来源:ThreadPool.scala



注:本文中的java.util.concurrent.BlockingQueue类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Sorting类代码示例发布时间:2022-05-23
下一篇:
Scala Outcome类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap