本文整理汇总了 Scala 中 java.util.concurrent.ArrayBlockingQueue 类的典型用法代码示例。如果您正想了解以下问题：Scala 中 ArrayBlockingQueue 类的具体用法是什么？ArrayBlockingQueue 怎么用？有哪些 ArrayBlockingQueue 的使用示例？那么这里精选的代码示例或许可以为您提供帮助。
下文共展示了 ArrayBlockingQueue 类的 6 个代码示例，这些示例默认按受欢迎程度排序。您可以为喜欢或觉得有用的代码点赞，您的评价将有助于我们的系统推荐出更好的 Scala 代码示例。
示例1: XferQClient
//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer
import org.openchai.tcp.util.TcpCommon
import java.util.concurrent.ArrayBlockingQueue
import org.openchai.tcp.util.Logger._
import org.openchai.tcp.xfer.XferConClient._
import reflect.runtime.universe.TypeTag
/**
 * Drains a queue of `TypedEntry` values on a background thread and forwards each entry
 * through the write client held by `controllers`.
 *
 * The reader thread is created and started as part of constructing this object.
 *
 * @param queue       queue the reader thread takes entries from
 * @param controllers supplies the `xferConf` and `client` used for every write
 * @tparam T payload type carried by each queued entry
 */
class XferQClient[T: TypeTag](val queue: ArrayBlockingQueue[TypedEntry[T]],
    val controllers: XferControllers) {

  /** Non-blocking enqueue: delegates to `queue.offer` and returns its result. */
  def offer(t: TypedEntry[T]) = queue.offer(t)

  val qclient = new Thread() {
    // The flag is read by this reader thread on every iteration. It must be volatile so
    // that a write performed by another thread is guaranteed to become visible here.
    // Note that setting it only takes effect once the current blocking take() returns.
    @volatile var canceled = false

    override def run(): Unit = {
      while (!canceled) {
        // peek() only selects which debug message to print; the actual waiting is done
        // by take() below.
        if (Option(queue.peek()).isEmpty) {
          debug("XferQClient: QReader is waiting ..")
        } else {
          debug("XferQClient: QReader found an entry immediately")
        }
        val payload = queue.take
        debug(s"XferQClient: payload is $payload")
        val wparams = XferWriteParams(payload.tag, controllers.xferConf,
          TcpCommon.serializeObject(s"/tmp/qread.${payload.tag}", payload.t))
        val wres = controllers.client.write(controllers.xferConf, wparams)
        info(s"Xferqclient: QReader completed with resp=$wres")
      }
    }
  }
  qclient.start()
}
/**
 * Small sample value; the demo `main` in this file builds instances of it and passes
 * them to `TcpCommon.serializeObject`.
 *
 * @param name   text label of the sample
 * @param metric integer value attached to the sample
 */
case class Howdy(
    name: String,
    metric: Int
)
/** Demo entry point that wires an `XferQClient` to a small bounded queue. */
object XferQClient {
  import org.openchai.tcp.xfer.XferConCommon._

  /** Payload shape used by the demo: a string array paired with a byte array. */
  type ComplexTuple = (Array[String], Array[Byte])

  /**
   * Creates the queue, the controllers and the client, then builds the sample payloads.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    val nEntries = 20
    val q = new ArrayBlockingQueue[TypedEntry[ComplexTuple]](5)
    val controllers = makeXferControllers(TestControllers)
    // Constructing the client also starts its queue-reader thread.
    val client = new XferQClient[ComplexTuple](q, controllers)
    // Inclusive upper bound so that exactly `nEntries` payloads are built; the former
    // `until` bound produced one fewer than the name promises.
    val entries = for (i <- 1 to nEntries) yield {
      (Array("a", "b", "c"), TcpCommon.serializeObject(s"/tmp/qwrite.$i", Howdy(s"Hi${i}!", i * i)))
    }
    // NOTE(review): `entries` is never offered to `client` in the code visible here —
    // confirm whether the enqueue step was lost when this snippet was extracted.
  }
}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:56,代码来源:XferQClient.scala
示例2: XferServerIf
//设置package包名称以及导入依赖的类
package org.openchai.tcp.xfer
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
import org.openchai.tcp.rpc._
/** Abstract base for transfer server interfaces; passes the fixed name "XferServerIf" to the `ServerIf` constructor. */
abstract class XferServerIf extends ServerIf("XferServerIf")
/** Factory and command-line entry point for the transfer `TcpServer`. */
object XferServer {

  /** The most recently created server; reassigned by every `apply` call. */
  var server: TcpServer = _

  /**
   * Creates a server backed by a `NioXferServerIf` and remembers it in [[server]].
   *
   * @param tcpParams supplies the server address, the port, and the handler configuration
   * @return the newly created (not yet started) server
   */
  def apply(tcpParams: TcpParams): TcpServer = {
    server = TcpServer(tcpParams.server, tcpParams.port,
      new NioXferServerIf(tcpParams))
    server
  }

  /**
   * Creates a server backed by a `QXferServerIf` built from `q`, and remembers it in
   * [[server]].
   *
   * @param q         queue handed to the `QXferServerIf`
   * @param tcpParams supplies the server address, the port, and the handler configuration
   * @return the newly created (not yet started) server
   */
  def apply(q: BlockingQueue[TaggedEntry], tcpParams: TcpParams): TcpServer = {
    server = TcpServer(tcpParams.server, tcpParams.port,
      new QXferServerIf(q, tcpParams))
    server
  }

  /**
   * Starts a queue-backed server plus a thread that prints every entry taken from the queue.
   *
   * @param args `args(0)` is the host, `args(1)` is the port
   * @throws IllegalArgumentException if fewer than two arguments are given or the port is
   *                                  not an integer
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 2, "Usage: XferServer <host> <port>")
    val host = args(0)
    val port = scala.util.Try(args(1).toInt).getOrElse(
      throw new IllegalArgumentException(s"Invalid port '${args(1)}': expected an integer"))

    val q = new ArrayBlockingQueue[TaggedEntry](1000)
    val qreader = new Thread() {
      override def run(): Unit = {
        println("QReader thread started")
        while (true) {
          val v = q.take
          println(s"QReader: received $v")
        }
      }
    }
    qreader.start()

    val tcpServer = apply(q, TcpParams(host, port))
    tcpServer.start
    // Joining the current thread on itself never returns: this parks main indefinitely.
    Thread.currentThread.join()
  }
}
开发者ID:OpenChaiSpark,项目名称:OCspark,代码行数:48,代码来源:XferServer.scala
示例3: ConnectionParams
//设置package包名称以及导入依赖的类
package repos.jdbc.watcher
import java.util
import java.util.concurrent.ArrayBlockingQueue
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.deserialization._
import com.github.shyiko.mysql.binlog.event.{EventType, TableMapEventData}
/**
 * Connection settings that `RepoWatcher.asSource` hands to `createClient`.
 *
 * @param hostname host name to connect to
 * @param port     port to connect to
 * @param username login user name
 * @param password login password
 * @param position optional binlog position — presumably where reading should start;
 *                 confirm against `createClient`
 */
case class ConnectionParams(
    hostname: String,
    port: Int,
    username: String,
    password: String,
    position: Option[BinLogPosition]
)
object RepoWatcher {

  /**
   * Exposes insert events from a binlog client as an Akka Streams source.
   *
   * A listener puts every event it receives into a bounded queue, and the returned source
   * takes elements from that queue. The client's `connect()` call runs on a dedicated
   * thread named "repo-binlog-event-loop", which is started before this method returns.
   *
   * @param params     connection settings passed to `createClient`
   * @param bufferSize capacity of the queue between the listener and the source
   * @return a source that emits the queued events
   */
  def asSource(params: ConnectionParams, bufferSize: Int = 1000): Source[RepoInsertEvent, NotUsed] = {
    val binlogClient = createClient(params)
    val pending = new ArrayBlockingQueue[RepoInsertEvent](bufferSize)
    binlogClient.registerEventListener(new RepoInsertListener(pending.put))

    val events = Source.unfoldResource[RepoInsertEvent, Unit](
      // There is no per-stream resource to open; the queue above is shared.
      () => (),
      // take() blocks until the listener has queued an event.
      _ => Some(pending.take()),
      // Disconnect the binlog client once the stream is closed.
      _ => binlogClient.disconnect()
    )

    val eventLoop = new Thread("repo-binlog-event-loop") {
      override def run(): Unit = binlogClient.connect()
    }
    eventLoop.start()

    events
  }
}
开发者ID:trueaccord,项目名称:repos,代码行数:49,代码来源:RepoWatcher.scala
示例4: InputStreamingChannelInitializer
//设置package包名称以及导入依赖的类
package com.bwsw.sj.engine.input.connection.tcp.server
import java.util.concurrent.ArrayBlockingQueue
import com.bwsw.sj.engine.core.input.InputStreamingExecutor
import io.netty.buffer.ByteBuf
import io.netty.channel.socket.SocketChannel
import io.netty.channel.{ChannelHandlerContext, ChannelInitializer}
import io.netty.handler.codec.string.StringEncoder
import io.netty.handler.logging.{LogLevel, LoggingHandler}
import scala.collection.concurrent
/**
 * Channel initializer that installs the logging, encoding and input-handling stages on
 * every new socket channel.
 *
 * @param executor             passed through to each `InputStreamingServerHandler`
 * @param channelContextQueue  passed through to each `InputStreamingServerHandler`
 * @param bufferForEachContext passed through to each `InputStreamingServerHandler`
 */
class InputStreamingChannelInitializer(
    executor: InputStreamingExecutor,
    channelContextQueue: ArrayBlockingQueue[ChannelHandlerContext],
    bufferForEachContext: concurrent.Map[ChannelHandlerContext, ByteBuf])
  extends ChannelInitializer[SocketChannel] {

  /**
   * Appends the "logger", "encoder" and "handler" stages, in that order, to the channel's
   * pipeline.
   *
   * @param channel the channel whose pipeline is being populated
   */
  def initChannel(channel: SocketChannel): Unit = {
    channel.pipeline()
      .addLast("logger", new LoggingHandler(LogLevel.WARN))
      .addLast("encoder", new StringEncoder())
      .addLast("handler",
        new InputStreamingServerHandler(executor, channelContextQueue, bufferForEachContext))
  }
}
开发者ID:bwsw,项目名称:sj-platform,代码行数:28,代码来源:InputStreamingChannelInitializer.scala
示例5: InputStreamingServer
//设置package包名称以及导入依赖的类
package com.bwsw.sj.engine.input.connection.tcp.server
import java.util.concurrent.{Callable, ArrayBlockingQueue}
import com.bwsw.sj.engine.core.input.InputStreamingExecutor
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.ByteBuf
import io.netty.channel._
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.{LogLevel, LoggingHandler}
import org.slf4j.LoggerFactory
import scala.collection.concurrent
/**
 * Callable that runs a Netty server on `host:port` until its server channel closes.
 *
 * @param host                 address to bind to
 * @param port                 port to bind to
 * @param executor             passed through to the channel initializer
 * @param channelContextQueue  passed through to the channel initializer
 * @param bufferForEachContext passed through to the channel initializer
 */
class InputStreamingServer(
    host: String,
    port: Int,
    executor: InputStreamingExecutor,
    channelContextQueue: ArrayBlockingQueue[ChannelHandlerContext],
    bufferForEachContext: concurrent.Map[ChannelHandlerContext, ByteBuf])
  extends Callable[Unit] {

  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Binds the server, blocks until the bound channel is closed, and finally shuts down
   * both event loop groups (also when binding or waiting fails).
   */
  override def call(): Unit = {
    logger.info(s"Launch input streaming server on: '$host:$port'\n")
    val acceptors: EventLoopGroup = new NioEventLoopGroup()
    val workers = new NioEventLoopGroup()
    try {
      val childInitializer =
        new InputStreamingChannelInitializer(executor, channelContextQueue, bufferForEachContext)
      val bootstrap = new ServerBootstrap()
      bootstrap
        .group(acceptors, workers)
        .channel(classOf[NioServerSocketChannel])
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(childInitializer)

      // First wait for the bind to complete, then wait for the bound channel to close.
      val bound = bootstrap.bind(host, port).sync()
      bound.channel().closeFuture().sync()
    } finally {
      workers.shutdownGracefully()
      acceptors.shutdownGracefully()
    }
  }
}
开发者ID:bwsw,项目名称:sj-platform,代码行数:44,代码来源:InputStreamingServer.scala
示例6: Message
//设置package包名称以及导入依赖的类
package com.twitter.diffy.lifter
import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.util.{ExecutorServiceFuturePool, Future, FuturePool}
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
/**
 * Result produced by a `MapLifter`.
 *
 * @param endpoint optional endpoint name — presumably the endpoint the input was
 *                 addressed to; confirm against the lifter implementations
 * @param result   field map produced from the input
 */
case class Message(
    endpoint: Option[String],
    result: FieldMap[Any]
)
/** Turns a raw byte payload into a [[Message]], asynchronously. */
trait MapLifter {

  /**
   * @param input raw bytes to lift
   * @return a future holding the resulting message
   */
  def apply(input: Array[Byte]): Future[Message]
}
/** Factory for a [[MapLifterPool]] backed by a bounded thread pool. */
object MapLifterPool {

  // NOTE(review): not referenced anywhere in the visible code; the work queue created in
  // `apply` has a capacity of 10, not this value — confirm which one is intended.
  val QueueSizeDefault = 5

  /**
   * Wraps a lifter in a pool that runs it on a dedicated executor of daemon threads named
   * "maplifter". One core thread is started up front, and submissions are rejected with
   * `AbortPolicy` when the executor cannot accept them.
   *
   * @param mapLifterFactory by-name expression producing the lifter; evaluated once here
   * @return the pool-backed lifter
   */
  def apply(mapLifterFactory: => MapLifter): MapLifterPool = {
    val corePoolSize = 3
    val maxPoolSize = 10
    val keepAliveMillis = 500L

    val workQueue = new ArrayBlockingQueue[Runnable](10)
    val threadFactory = new NamedPoolThreadFactory("maplifter", makeDaemons = true)
    val rejectionPolicy = new ThreadPoolExecutor.AbortPolicy()

    val executor = new ThreadPoolExecutor(
      corePoolSize,
      maxPoolSize,
      keepAliveMillis,
      TimeUnit.MILLISECONDS,
      workQueue,
      threadFactory,
      rejectionPolicy
    )
    executor.prestartCoreThread()

    // Evaluate the by-name factory before building the future pool, as the original
    // argument order did.
    val lifter = mapLifterFactory
    new MapLifterPool(lifter, new ExecutorServiceFuturePool(executor))
  }
}
/**
 * A [[MapLifter]] that runs another lifter inside a future pool.
 *
 * @param underlying lifter that performs the actual work
 * @param futurePool pool on which `underlying` is invoked
 */
class MapLifterPool(underlying: MapLifter, futurePool: FuturePool) extends MapLifter {

  /**
   * Invokes `underlying` on the pool and flattens the nested future it yields.
   *
   * @param input raw bytes to lift
   * @return the future message produced by `underlying`
   */
  override def apply(input: Array[Byte]): Future[Message] = {
    val nested: Future[Future[Message]] = futurePool(underlying(input))
    nested.flatten
  }
}
开发者ID:sachinmanchanda,项目名称:diffy_unicast,代码行数:36,代码来源:MapLifter.scala
注：本文中的 java.util.concurrent.ArrayBlockingQueue 类示例整理自 GitHub、MSDocs 等源码及文档管理平台，相关代码片段筛选自各位开发者贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论