• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ArrayBlockingQueue类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Scala 中 java.util.concurrent.ArrayBlockingQueue 的典型用法代码示例。如果您正苦于以下问题:Scala ArrayBlockingQueue 类的具体用法?Scala ArrayBlockingQueue 怎么用?Scala ArrayBlockingQueue 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



下文一共展示了 ArrayBlockingQueue 类的 6 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。

示例1: XferQClient

//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer

import org.openchai.tcp.util.TcpCommon

import java.util.concurrent.ArrayBlockingQueue

import org.openchai.tcp.util.Logger._
import org.openchai.tcp.xfer.XferConClient._

import reflect.runtime.universe.TypeTag

/**
 * Forwards entries from a blocking queue to the transfer client held by
 * `controllers`.
 *
 * Constructing an instance immediately starts a background thread that
 * repeatedly takes one entry from `queue`, serializes its payload via
 * `TcpCommon.serializeObject`, and writes it with `controllers.client.write`.
 *
 * @param queue       queue the background thread drains
 * @param controllers supplies the transfer configuration and the client used for writes
 * @tparam T          payload type carried by each queued entry
 */
class XferQClient[T: TypeTag](val queue: ArrayBlockingQueue[TypedEntry[T]],
  val controllers: XferControllers) {

  /** Enqueues `t` without blocking; the result is whatever `queue.offer` returns. */
  def offer(t: TypedEntry[T]) = queue.offer(t)

  val qclient = new Thread() {

    // Loop guard for `run`. Nothing in this class ever sets it to true.
    // NOTE(review): not @volatile, so a write from another thread is not
    // guaranteed to become visible to the loop — confirm how cancellation is meant to work.
    var canceled = false

    override def run(): Unit = {
      while (!canceled) {
        // Purely diagnostic: report whether `take` below is about to block.
        Option(queue.peek()) match {
          case None =>
            debug("XferQClient:  QReader is waiting ..")
          case Some(_) =>
            debug("XferQClient:  QReader found an entry immediately")
        }

        // Blocks until an entry is available.
        val entry = queue.take
        debug(s"XferQClient: payload is $entry")

        val writeParams = XferWriteParams(
          entry.tag,
          controllers.xferConf,
          TcpCommon.serializeObject(s"/tmp/qread.${entry.tag}", entry.t)
        )
        val response = controllers.client.write(controllers.xferConf, writeParams)
        info(s"Xferqclient: QReader completed with resp=$response")
      }
    }
  }

  // The reader thread is started as part of construction.
  qclient.start()
}

/**
 * Small sample payload used by the `XferQClient` demo entry point.
 *
 * @param name   label of the sample
 * @param metric integer value attached to the sample
 */
case class Howdy(
  name: String,
  metric: Int
)

/** Demo entry point that wires an `XferQClient` to a small bounded queue. */
object XferQClient {
  import org.openchai.tcp.xfer.XferConCommon._

  /** Payload shape used by the demo: string labels plus a serialized blob. */
  type ComplexTuple = (Array[String], Array[Byte])

  /**
   * Builds a capacity-5 queue, starts a client on it, and prepares sample tuples.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    val nEntries = 20
    val q = new ArrayBlockingQueue[TypedEntry[ComplexTuple]](5)
    val controllers = makeXferControllers(TestControllers)
    // Constructing the client starts its background reader thread.
    val client = new XferQClient[ComplexTuple](q, controllers)

    // NOTE(review): `1 until nEntries` produces nEntries - 1 tuples (1..19),
    // and `entries` is never offered to `client` or `q` — confirm intent.
    val entries = (1 until nEntries).map { i =>
      val labels = Array("a", "b", "c")
      (labels, TcpCommon.serializeObject(s"/tmp/qwrite.$i", Howdy(s"Hi${i}!", i*i)))
    }

  }
}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:56,代码来源:XferQClient.scala


示例2: XferServerIf

//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}

import org.openchai.tcp.rpc._

/**
 * Abstract base for transfer server interfaces; it passes the fixed name
 * "XferServerIf" to the `ServerIf` constructor and adds no members of its own.
 */
abstract class XferServerIf extends ServerIf("XferServerIf")

/**
 * Factory and command-line entry point for the transfer TCP server.
 *
 * Both `apply` overloads build a `TcpServer`, remember it in [[server]], and
 * return it without starting it; starting is left to the caller.
 */
object XferServer {

  /** The server most recently created by either `apply`; unset until the first call. */
  var server: TcpServer = _

  /**
   * Creates a server whose interface is a `NioXferServerIf`.
   *
   * @param tcpParams host and port to serve on; also handed to the server interface
   * @return the newly created (not yet started) server, also stored in [[server]]
   */
  def apply(tcpParams: TcpParams): TcpServer = {
//    if (System.currentTimeMillis > 0) {
//      throw new IllegalStateException("Why in apply(NioServer) ?")
//    }
    server = TcpServer(tcpParams.server, tcpParams.port,
      new NioXferServerIf(tcpParams))
    server
  }

  /**
   * Creates a server whose interface is a `QXferServerIf` built around `q`.
   *
   * @param q         queue handed to the `QXferServerIf`
   * @param tcpParams host and port to serve on; also handed to the server interface
   * @return the newly created (not yet started) server, also stored in [[server]]
   */
  def apply(q: BlockingQueue[TaggedEntry], tcpParams: TcpParams): TcpServer = {
    server = TcpServer(tcpParams.server, tcpParams.port,
      new QXferServerIf(q, tcpParams))
    server
  }

  /**
   * Starts a queue-backed server plus a thread that prints every queued entry.
   *
   * @param args `args(0)` is the host, `args(1)` is the port number
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   *                                  (or, via `toInt`, if the port is not an integer)
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of a bare ArrayIndexOutOfBoundsException.
    require(args.length >= 2, "Usage: XferServer <host> <port>")
    val host = args(0)
    val port = args(1).toInt
//    val server = apply(TcpParams(host, port))
    val q = new ArrayBlockingQueue[TaggedEntry](1000)
    val qreader = new Thread() {
      override def run(): Unit = {
        println("QReader thread started")
        while (true) {
          // Blocks until an entry is available.
          val v = q.take
          println(s"QReader: received $v")
        }
      }
    }
    qreader.start
    // Local `server` shadows the object-level var; `apply` has already stored the same instance there.
    val server = apply(q, TcpParams(host, port))
    server.start
    // Joining the current thread on itself never returns, which parks `main` indefinitely.
    Thread.currentThread.join
  }
}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:48,代码来源:XferServer.scala


示例3: ConnectionParams

//设置package包名称以及导入依赖的类
package repos.jdbc.watcher

import java.util
import java.util.concurrent.ArrayBlockingQueue

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.deserialization._
import com.github.shyiko.mysql.binlog.event.{EventType, TableMapEventData}


/**
 * Settings passed to `createClient` when `RepoWatcher.asSource` builds its client.
 *
 * @param hostname host name to connect to
 * @param port     port to connect to
 * @param username login user
 * @param password login password
 * @param position optional binlog position — presumably where reading should
 *                 start; NOTE(review): confirm against `createClient`
 */
case class ConnectionParams(
  hostname: String,
  port: Int,
  username: String,
  password: String,
  position: Option[BinLogPosition])

/** Exposes repo insert events received by a client as an Akka Streams source. */
object RepoWatcher {

  /**
   * Builds a source of insert events backed by a bounded hand-off queue.
   *
   * A listener registered on the client puts every event into the queue; the
   * returned source takes from that queue one element at a time. The client is
   * connected on a dedicated thread that is started before this method returns,
   * i.e. independently of whether the source is ever run.
   *
   * @param params     connection settings passed to `createClient`
   * @param bufferSize capacity of the hand-off queue
   * @return a source emitting the queued events; closing it disconnects the client
   */
  def asSource(params: ConnectionParams, bufferSize: Int = 1000): Source[RepoInsertEvent, NotUsed] = {
    val binlogClient = createClient(params)
    val pending = new ArrayBlockingQueue[RepoInsertEvent](bufferSize)
    // `put` blocks the caller while the queue is full.
    binlogClient.registerEventListener(new RepoInsertListener(pending.put))

    val connector = new Thread("repo-binlog-event-loop") {
      override def run(): Unit = {
        binlogClient.connect()
      }
    }

    val events: Source[RepoInsertEvent, NotUsed] = Source.unfoldResource(
      // No per-materialization resource is needed; the queue is shared.
      create = () => (),
      read = (_: Unit) => {
        // Waits here until the listener has queued an event.
        Some(pending.take())
      },
      close = (_: Unit) => {
        // Stream finished: drop the connection.
        // NOTE(review): presumably this lets the connector thread's connect() return — confirm.
        binlogClient.disconnect()
      }
    )

    connector.start()

    events
  }
}
开发者ID:trueaccord,项目名称:repos,代码行数:49,代码来源:RepoWatcher.scala


示例4: InputStreamingChannelInitializer

//设置package包名称以及导入依赖的类
package com.bwsw.sj.engine.input.connection.tcp.server

import java.util.concurrent.ArrayBlockingQueue

import com.bwsw.sj.engine.core.input.InputStreamingExecutor
import io.netty.buffer.ByteBuf
import io.netty.channel.socket.SocketChannel
import io.netty.channel.{ChannelHandlerContext, ChannelInitializer}
import io.netty.handler.codec.string.StringEncoder
import io.netty.handler.logging.{LogLevel, LoggingHandler}

import scala.collection.concurrent


/**
 * Installs the handler chain on every newly initialized socket channel.
 *
 * @param executor             passed through to each `InputStreamingServerHandler`
 * @param channelContextQueue  passed through to each `InputStreamingServerHandler`
 * @param bufferForEachContext passed through to each `InputStreamingServerHandler`
 */
class InputStreamingChannelInitializer(executor: InputStreamingExecutor,
                                       channelContextQueue: ArrayBlockingQueue[ChannelHandlerContext],
                                       bufferForEachContext: concurrent.Map[ChannelHandlerContext, ByteBuf])
  extends ChannelInitializer[SocketChannel] {

  /**
   * Appends, in order, a WARN-level logging handler, a string encoder, and the
   * input streaming server handler to the channel's pipeline.
   *
   * @param channel the channel whose pipeline is being populated
   */
  def initChannel(channel: SocketChannel) = {
    // Each addLast returns the pipeline, so the three handlers are appended fluently.
    channel.pipeline()
      .addLast("logger", new LoggingHandler(LogLevel.WARN))
      .addLast("encoder", new StringEncoder())
      .addLast("handler", new InputStreamingServerHandler(executor, channelContextQueue, bufferForEachContext))
  }
}
开发者ID:bwsw,项目名称:sj-platform,代码行数:28,代码来源:InputStreamingChannelInitializer.scala


示例5: InputStreamingServer

//设置package包名称以及导入依赖的类
package com.bwsw.sj.engine.input.connection.tcp.server

import java.util.concurrent.{Callable, ArrayBlockingQueue}

import com.bwsw.sj.engine.core.input.InputStreamingExecutor
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.{LogLevel, LoggingHandler}
import org.slf4j.LoggerFactory

import scala.collection.concurrent



/**
 * Netty-based TCP server packaged as a `Callable`, so it can be submitted to an executor.
 *
 * @param host                 address to bind to
 * @param port                 port to bind to
 * @param executor             handed to the channel initializer
 * @param channelContextQueue  handed to the channel initializer
 * @param bufferForEachContext handed to the channel initializer
 */
class InputStreamingServer(host: String,
                           port: Int,
                           executor: InputStreamingExecutor,
                           channelContextQueue: ArrayBlockingQueue[ChannelHandlerContext],
                           bufferForEachContext: concurrent.Map[ChannelHandlerContext, ByteBuf]) extends Callable[Unit] {

  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Binds the server and blocks until the server channel is closed; both event
   * loop groups are asked to shut down on the way out, whether or not an error occurred.
   */
  override def call() = {
    logger.info(s"Launch input streaming server on: '$host:$port'\n")
    val acceptorGroup: EventLoopGroup = new NioEventLoopGroup()
    val ioGroup = new NioEventLoopGroup()
    try {
      val bootstrap = new ServerBootstrap()
      // The builder methods return the bootstrap itself, so configuring it step by step is equivalent to chaining.
      bootstrap.group(acceptorGroup, ioGroup)
      bootstrap.channel(classOf[NioServerSocketChannel])
      bootstrap.handler(new LoggingHandler(LogLevel.INFO))
      bootstrap.childHandler(new InputStreamingChannelInitializer(executor,  channelContextQueue, bufferForEachContext))

      // Wait for the bind to complete, then wait for the server channel to close.
      val bound = bootstrap.bind(host, port).sync()
      bound.channel().closeFuture().sync()
    } finally {
      ioGroup.shutdownGracefully()
      acceptorGroup.shutdownGracefully()
    }
  }
}
开发者ID:bwsw,项目名称:sj-platform,代码行数:44,代码来源:InputStreamingServer.scala


示例6: Message

//设置package包名称以及导入依赖的类
package com.twitter.diffy.lifter

import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.util.{ExecutorServiceFuturePool, Future, FuturePool}
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}

/**
 * Result produced by a `MapLifter`.
 *
 * @param endpoint optional endpoint name associated with the input
 * @param result   field map built from the input (`FieldMap` is declared elsewhere in the project)
 */
case class Message(endpoint: Option[String], result: FieldMap[Any])

/** Turns a raw byte payload into a [[Message]], asynchronously. */
trait MapLifter {
  /**
   * @param input raw bytes to convert
   * @return a future holding the resulting message
   */
  def apply(input: Array[Byte]): Future[Message]
}

/** Factory for [[MapLifterPool]] instances backed by a bounded thread pool. */
object MapLifterPool {
  // NOTE(review): not referenced by `apply` below, which hard-codes a queue capacity of 10.
  val QueueSizeDefault = 5

  /**
   * Wraps a lifter in a pool that runs it on a dedicated thread pool executor.
   *
   * @param mapLifterFactory by-name expression producing the lifter; it is
   *                         evaluated once, when the pool instance is constructed
   * @return a pool-backed lifter
   */
  def apply(mapLifterFactory: => MapLifter) = {
    val workQueue = new ArrayBlockingQueue[Runnable](10)
    val threadFactory = new NamedPoolThreadFactory("maplifter", makeDaemons = true)
    // AbortPolicy rejects a submission that the pool cannot accept.
    val rejectionPolicy = new ThreadPoolExecutor.AbortPolicy()

    val pool = new ThreadPoolExecutor(
      3,   // core pool size
      10,  // max pool size
      500, // keep alive time
      TimeUnit.MILLISECONDS,
      workQueue,
      threadFactory,
      rejectionPolicy
    )
    // Starts one core thread eagerly.
    pool.prestartCoreThread()

    new MapLifterPool(mapLifterFactory, new ExecutorServiceFuturePool(pool))
  }
}

/**
 * A [[MapLifter]] that runs another lifter inside a future pool.
 *
 * @param underlying lifter that performs the actual conversion
 * @param futurePool pool on which `underlying` is invoked
 */
class MapLifterPool(underlying: MapLifter, futurePool: FuturePool) extends MapLifter {
  /**
   * Invokes `underlying` on `futurePool` and flattens the nested future it yields.
   *
   * @param input raw bytes to convert
   * @return a future holding the resulting message
   */
  override def apply(input: Array[Byte]): Future[Message] = {
    // The pool wraps the lifter's own Future in another Future.
    val nested = futurePool { underlying(input) }
    nested.flatten
  }
}
开发者ID:sachinmanchanda,项目名称:diffy_unicast,代码行数:36,代码来源:MapLifter.scala



注:本文中的java.util.concurrent.ArrayBlockingQueue类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala parse类代码示例发布时间:2022-05-23
下一篇:
Scala SecretKeySpec类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap