本文整理汇总了Scala中java.util.concurrent.ConcurrentLinkedQueue类的典型用法代码示例。如果您正苦于以下问题：Scala ConcurrentLinkedQueue类的具体用法？Scala ConcurrentLinkedQueue怎么用？Scala ConcurrentLinkedQueue使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ConcurrentLinkedQueue类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: AccessLogQueue
//设置package包名称以及导入依赖的类
package httpmock
import java.util.concurrent.ConcurrentLinkedQueue
import play.api.mvc._
/**
 * Queue of recorded [[AccessLog]] entries backed by a `ConcurrentLinkedQueue`.
 *
 * @param queue backing queue; a new empty queue is created when the argument is omitted
 */
final case class AccessLogQueue(
  queue: ConcurrentLinkedQueue[AccessLog] = new ConcurrentLinkedQueue[AccessLog]()
) {

  /** Appends an entry built from `request` alone. */
  def add(request: RequestHeader): Unit = {
    val entry = AccessLog(request)
    queue.add(entry)
  }

  /** Appends an entry built from `request` and its optional `body`. */
  def add(request: RequestHeader, body: Option[ArrayByte]): Unit = {
    val entry = AccessLog(request, body)
    queue.add(entry)
  }

  /** Removes and returns the head of the queue, or `None` when the queue is empty. */
  def shift(): Option[AccessLog] = {
    val head = queue.poll()
    Option(head)
  }

  /**
   * Returns the queued entries that satisfy `p`, in iteration order, without removing them.
   *
   * @param p predicate an entry must satisfy to be included
   */
  def filter(p: AccessLog => Boolean): Seq[AccessLog] = {
    val matches = scala.collection.mutable.ArrayBuffer[AccessLog]()
    val cursor = queue.iterator()

    // Walk the Java iterator to exhaustion, keeping only the accepted entries.
    @scala.annotation.tailrec
    def collect(): Unit =
      if (cursor.hasNext) {
        val entry = cursor.next()
        if (p(entry)) matches += entry
        collect()
      }

    collect()
    matches.toSeq
  }
}
开发者ID:xuwei-k,项目名称:httpmock,代码行数:22,代码来源:AccessLogQueue.scala
示例2: MoveMessage
//设置package包名称以及导入依赖的类
package com.catinthedark.shapeshift.network
import java.util.concurrent.ConcurrentLinkedQueue
import com.badlogic.gdx.math.Vector2
import com.catinthedark.lib.Pipe
import com.catinthedark.lib.network.messages.Message
/** Message carrying a position (`x`, `y`), an `angle` and an `idle` flag. */
case class MoveMessage(
  x: Float,
  y: Float,
  angle: Float,
  idle: Boolean
) extends Message

/** Message carrying a position (`x`, `y`), an `angle` and a `scale`. */
case class JumpMessage(
  x: Float,
  y: Float,
  angle: Float,
  scale: Float
) extends Message

/** Message carrying the coordinates a shot was fired from and the name of the shot object. */
case class ShootMessage(
  x: Float,
  y: Float,
  shotObject: String
) extends Message
/**
 * Base trait for a network endpoint that sends move / shoot / jump messages and
 * replays incoming events through pipes.
 *
 * Incoming events are not dispatched directly: the `on*` hooks wrap them in thunks
 * and park them in `bufferIn`; `processIn()` later runs those thunks on whichever
 * thread calls it.
 */
trait NetworkControl extends Runnable {
  var isConnected: Option[Unit] = None
  var isMain: Boolean = false

  val onMovePipe = new Pipe[(Vector2, Float, Boolean)]()
  val onShootPipe = new Pipe[(Vector2, String)]()
  val onJumpPipe = new Pipe[(Vector2, Float, Float)]()
  val onEnemyDisconnected = new Pipe[Unit]()

  /** Sends a [[MoveMessage]] built from `pos`, `angle` and `idle`. */
  def move(pos: Vector2, angle: Float, idle: Boolean): Unit = {
    processOut(new MoveMessage(x = pos.x, y = pos.y, angle = angle, idle = idle))
  }

  /** Sends a [[ShootMessage]] built from `shotFrom` and `objName`. */
  def shoot(shotFrom: Vector2, objName: String): Unit = {
    processOut(new ShootMessage(x = shotFrom.x, y = shotFrom.y, shotObject = objName))
  }

  /** Sends a [[JumpMessage]] built from `pos`, `angle` and `scale`. */
  def jump(pos: Vector2, angle: Float, scale: Float): Unit = {
    processOut(new JumpMessage(x = pos.x, y = pos.y, angle = angle, scale = scale))
  }

  /**
   * Runs every callback currently buffered in `bufferIn`, in FIFO order, on the calling thread.
   *
   * The queue is drained by polling and null-checking instead of `isEmpty` followed by
   * `poll()`: with a second consumer the queue could be emptied between the two calls
   * and the dequeued `null` would then be invoked.
   */
  def processIn(): Unit = {
    var task = bufferIn.poll()
    while (task != null) {
      task()
      task = bufferIn.poll()
    }
  }

  /** Sends `message` to the remote side; supplied by the concrete implementation. */
  def processOut(message: Any): Unit

  /** Marks this endpoint as no longer connected. */
  def dispose(): Unit = {
    isConnected = None
  }

  // Callbacks waiting to be executed by processIn().
  protected val bufferIn = new ConcurrentLinkedQueue[() => Unit]()

  /** Buffers a move event for later delivery to `onMovePipe`. */
  protected def onMove(msg: (Vector2, Float, Boolean)): Boolean =
    bufferIn.add(() => onMovePipe(msg))

  /** Buffers a shoot event for later delivery to `onShootPipe`. */
  protected def onShoot(objName: String, shotFrom: Vector2): Boolean =
    bufferIn.add(() => onShootPipe(shotFrom, objName))

  /** Buffers a jump event for later delivery to `onJumpPipe`. */
  protected def onJump(msg: (Vector2, Float, Float)): Boolean =
    bufferIn.add(() => onJumpPipe(msg))

  /** Logs the received game-start payload to standard output. */
  protected def onGameStarted(msg: (String, String)): Unit =
    println(s"Received GameStart package $msg")
}
开发者ID:cat-in-the-dark,项目名称:old48_35_game,代码行数:53,代码来源:NetworkControl.scala
示例3: close
//设置package包名称以及导入依赖的类
package com.twitter.finagle.util
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.{Logger, Level}
/**
 * A write-only channel: elements are handed over with `!` and the channel can be closed.
 *
 * @tparam T type of element the channel accepts (contravariant)
 */
trait Chan[-T] {

  /** Sends `elem` to this channel. */
  def !(elem: T): Unit

  /** Closes this channel. */
  def close(): Unit
}
/**
 * A [[Chan]] that passes each element sent with `!` to `receive`.
 *
 * Elements are queued; the sender that raises the pending counter from zero drains
 * the queue, so other senders only enqueue and return.
 */
trait Proc[-T] extends Chan[T] {
  private[this] val q = new ConcurrentLinkedQueue[T]
  // Number of elements offered whose handling has not finished yet.
  private[this] val nq = new AtomicInteger(0)
  @volatile private[this] var closed = false

  /** Marks the proc closed; elements dequeued afterwards are dropped instead of received. */
  def close(): Unit = {
    closed = true
  }

  /** Enqueues `elem` and, if no drain is in progress, drains the queue on the calling thread. */
  def !(elem: T): Unit = {
    q.offer(elem)
    if (nq.getAndIncrement() == 0) {
      var pending = true
      while (pending) {
        val item = q.poll()
        // Swallow exceptions as these would cause
        // unbounded queue growth.
        if (!closed) {
          try receiver(item)
          catch {
            case exc: Throwable =>
              Logger.getLogger("").log(Level.WARNING, "Exception thrown in proc", exc)
          }
        }
        pending = nq.decrementAndGet() > 0
      }
    }
  }

  /** Handler applied to each element; evaluated once, when the trait is initialised. */
  def receive: T => Unit

  private[this] val receiver = receive
}
object Proc {

  /** Builds a [[Proc]] whose `receive` is `iteratee`. */
  def apply[T](iteratee: T => Unit): Proc[T] =
    new Proc[T] {
      def receive: T => Unit = iteratee
    }

  /** A [[Proc]] that ignores every element it is sent. */
  val nil: Proc[Any] =
    new Proc[Any] {
      def receive: Any => Unit = _ => ()
    }
}
开发者ID:wenkeyang,项目名称:finagle,代码行数:47,代码来源:Chan.scala
示例4: Step1PrepareSignedUsers
//设置package包名称以及导入依赖的类
package bbc.newsletter
import java.io._
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import io.gatling.http.cookie._
/**
 * Gatling simulation that signs virtual users in and collects their cookie jars.
 *
 * Each virtual user loads the landing page, opens the sign-in page, posts the sign-in
 * form, drops its session cookies and stores its cookie jar in `cookieJars`. When the
 * run finishes the collected jars are handed to `CookieJarHelper.dumpCookieJarFeeder`.
 */
class Step1PrepareSignedUsers extends Simulation {
// Shared HTTP settings: base URL plus the request headers sent by every virtual user.
val httpProtocol = http
.baseURL("https://ssl.stage.bbc.co.uk")
.acceptCharsetHeader("ISO-8859-1,utf-8;q=0.7,*;q=0.7")
.acceptHeader("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8")
.acceptLanguageHeader("fr-FR,fr;q=0.8,en-US;q=0.6,en;q=0.4")
.acceptEncodingHeader("gzip, deflate, sdch")
.userAgentHeader("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36")
// Cookie jars gathered from all virtual users; filled concurrently, read in the `after` hook.
val cookieJars = new ConcurrentLinkedQueue[CookieJar]
import java.util.concurrent.atomic._
// Counter interpolated into the "unique" form value so each sign-in uses a different one.
val value = new AtomicInteger(1)
// Scenario: landing page -> sign-in page -> sign-in POST -> keep the resulting cookies.
val prepareSignedUsers = scenario("prepareSignedUsers")
.exec(http("landingPage").get("/newsletters/thenolanshow"))
.exec(http("signIn").get("/id/signin?ptrt=https%3A%2F%2Fssl.stage.bbc.co.uk%2Fnewsletters%2Fthenolanshow"))
.exec(http("signInPost").post("/id/signin?ptrt=https%3A%2F%2Fssl.stage.bbc.co.uk%2Fnewsletters%2Fthenolanshow")
// NOTE(review): the literal below looks mangled by e-mail obfuscation ("[email protected]");
// restore the intended address pattern from the original project before running.
.formParam("unique", session => s"[email protected]${value.getAndIncrement}.com")
.formParam("password", "loadtest")
.formParam("rememberme", "1")
.formParam("bbcid_submit_button", "Sign in")
.check(substring("Please confirm your age")))
// drop non persistent cookies
.exec(flushSessionCookies)
// save the cookieJar in the global queue
.exec { session =>
CookieHandling.cookieJar(session).foreach(cookieJars.add)
session
}
// Load profile: ramp from 10 to 20 users/sec over 5 minutes, then hold 20 users/sec for 5 minutes.
setUp(prepareSignedUsers.inject(
rampUsersPerSec(10) to(20) during(5 minutes),
constantUsersPerSec(20) during(5 minutes)
).protocols(httpProtocol))
// After the run, pass every collected cookie jar to the feeder dump helper.
after {
CookieJarHelper.dumpCookieJarFeeder(cookieJars)
}
}
开发者ID:yabols,项目名称:bbcGatling,代码行数:56,代码来源:Step1PrepareSignedUsers.scala
示例5: ConfirmListenerAdapter
//设置package包名称以及导入依赖的类
package com.hellosoda.rmq.impl
import com.rabbitmq.client._
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Promise
/**
 * `ConfirmListener` that settles the promises of outstanding publisher confirms.
 *
 * @param confirms queue of confirms still waiting for an ACK or NACK
 */
private[rmq] class ConfirmListenerAdapter(
  val confirms: ConcurrentLinkedQueue[PublisherConfirm]
) extends ConfirmListener {

  /**
   * Removes the confirm(s) addressed by `deliveryTag` from the queue and applies
   * `action` to each one's promise.
   *
   * When `multiple` is false only the first confirm whose `seqNo` equals `deliveryTag`
   * is handled. When `multiple` is true confirms are handled in iteration order until
   * one with a `seqNo` greater than `deliveryTag` is met.
   */
  private def completeWith(deliveryTag: Long, multiple: Boolean)(
    action: Promise[Unit] => Unit
  ): Unit = {
    def settle(confirm: PublisherConfirm): Unit = {
      confirms.remove(confirm)
      action(confirm.promise)
    }

    val cursor = confirms.iterator()
    if (multiple) {
      var withinRange = true
      while (withinRange && cursor.hasNext) {
        val confirm = cursor.next()
        if (confirm.seqNo > deliveryTag) withinRange = false
        else settle(confirm)
      }
    } else {
      var matched = false
      while (!matched && cursor.hasNext) {
        val confirm = cursor.next()
        if (confirm.seqNo == deliveryTag) {
          settle(confirm)
          matched = true
        }
      }
    }
  }

  /** Completes the addressed promise(s) successfully. */
  override def handleAck(deliveryTag: Long, multiple: Boolean): Unit =
    completeWith(deliveryTag, multiple) { promise =>
      promise.success(())
    }

  /** Fails the addressed promise(s) with an `IOException("NACK")`. */
  override def handleNack(deliveryTag: Long, multiple: Boolean): Unit =
    completeWith(deliveryTag, multiple) { promise =>
      promise.failure(new java.io.IOException("NACK"))
    }
}
开发者ID:hellosoda,项目名称:RMQ,代码行数:52,代码来源:ConfirmListenerAdapter.scala
示例6: AsyncMutex
//设置package包名称以及导入依赖的类
package com.hellosoda.rmq.impl
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{
ExecutionContext,
Future,
Promise }
/**
 * Runs submitted computations one after another.
 *
 * Each `acquire` call enqueues a task. The caller that raises `queueSize` from zero
 * starts the task at the head of the queue; every finishing task removes the head and
 * starts the next one if any remain.
 */
private[rmq] class AsyncMutex {
  private val queue = new ConcurrentLinkedQueue[Runnable]()
  private val queueSize = new AtomicInteger(0)

  /**
   * Schedules `f` to run after all previously acquired computations have completed.
   *
   * @param f  computation to run, evaluated inside a `Future` on `ec`
   * @param ec execution context used for the computation and its completion callback
   * @return a future completed with the outcome of `f`
   */
  def acquire[T](f: => T)(implicit ec: ExecutionContext): Future[T] = {
    val outcome = Promise[T]()

    val task = new Runnable {
      def run(): Unit =
        Future(f).onComplete { result =>
          outcome.complete(result)
          // Drop this task from the head, then hand over to the next waiter if there is one.
          queue.poll()
          val remaining = queueSize.decrementAndGet()
          if (remaining > 0)
            queue.peek().run()
        }
    }
    queue.add(task)

    val wasIdle = queueSize.getAndIncrement() == 0
    if (wasIdle) {
      queue.peek().run()
    }
    outcome.future
  }
}
开发者ID:hellosoda,项目名称:RMQ,代码行数:34,代码来源:AsyncMutex.scala
示例7: CustomMessageQueue
//设置package包名称以及导入依赖的类
package com.nossin.ndb.custommailbox
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorRef
import akka.dispatch.{Envelope, MessageQueue}
/**
 * Mailbox queue that only accepts envelopes whose sender is named "MyActor".
 *
 * Every sender gets a reply; envelopes from any other sender are not enqueued.
 */
class CustomMessageQueue extends MessageQueue {
  private final val queue = new ConcurrentLinkedQueue[Envelope]()

  // these should be implemented; queue used as example

  /** Replies to the sender of `handle` and enqueues `handle` only when the sender is "MyActor". */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    val knownSender = handle.sender.path.name == "MyActor"
    if (!knownSender) {
      println("in enqueue, unknown actor")
      handle.sender ! "I don't talk to strangers, Ibcan't process your request"
    } else {
      println("in enqueue, actor identified ")
      handle.sender ! "Hey dude, How are you?, I Know your name,processing your request"
      queue.offer(handle)
    }
  }

  /** Removes and returns the head envelope, or `null` when the queue is empty. */
  def dequeue(): Envelope = queue.poll()

  /** Number of envelopes currently queued. */
  def numberOfMessages: Int = queue.size()

  /** True when at least one envelope is queued. */
  def hasMessages: Boolean = !queue.isEmpty

  /** Moves every remaining envelope to `deadLetters`, addressed to `owner`. */
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    while (hasMessages) {
      val leftover = dequeue()
      deadLetters.enqueue(owner, leftover)
    }
  }
}
开发者ID:michelnossin,项目名称:ndb,代码行数:36,代码来源:CustomMessageQueue.scala
示例8: close
//设置package包名称以及导入依赖的类
package com.twitter.finagle.util
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.{Logger, Level}
/**
 * A write-only channel: elements are handed over with `!` and the channel can be closed.
 *
 * @tparam T type of element the channel accepts (contravariant)
 */
trait Chan[-T] {

  /** Sends `elem` to this channel. */
  def !(elem: T): Unit

  /** Closes this channel. */
  def close(): Unit
}
/**
 * A [[Chan]] that passes each element sent with `!` to `receive`.
 *
 * Elements are queued; the sender that raises the pending counter from zero drains
 * the queue, so other senders only enqueue and return.
 */
trait Proc[-T] extends Chan[T] {
  private[this] val q = new ConcurrentLinkedQueue[T]
  // Number of elements offered whose handling has not finished yet.
  private[this] val nq = new AtomicInteger(0)
  @volatile private[this] var closed = false

  /** Marks the proc closed; elements dequeued afterwards are dropped instead of received. */
  def close(): Unit = {
    closed = true
  }

  /** Enqueues `elem` and, if no drain is in progress, drains the queue on the calling thread. */
  def !(elem: T): Unit = {
    q.offer(elem)
    if (nq.getAndIncrement() == 0) {
      var pending = true
      while (pending) {
        val item = q.poll()
        // Swallow exceptions as these would cause
        // unbounded queue growth.
        if (!closed) {
          try receiver(item)
          catch {
            // Typed explicitly: this deliberately catches every Throwable.
            case exc: Throwable =>
              Logger.getLogger("").log(Level.WARNING, "Exception thrown in proc", exc)
          }
        }
        pending = nq.decrementAndGet() > 0
      }
    }
  }

  /** Handler applied to each element; evaluated once, when the trait is initialised. */
  def receive: T => Unit

  private[this] val receiver = receive
}
object Proc {

  /** Builds a [[Proc]] whose `receive` is `iteratee`. */
  def apply[T](iteratee: T => Unit): Proc[T] =
    new Proc[T] {
      def receive: T => Unit = iteratee
    }

  /** A [[Proc]] that ignores every element it is sent. */
  val nil: Proc[Any] =
    new Proc[Any] {
      def receive: Any => Unit = _ => ()
    }
}
开发者ID:deenar,项目名称:fintest,代码行数:47,代码来源:Chan.scala
示例9: ThrottleTest11
//设置package包名称以及导入依赖的类
package tests
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
 * Exercises a simple throttle: requests are refused once more than 10 are in flight.
 *
 * `DesAPI.process` simulates a slow backend (500 ms per call); `apply` is the throttled
 * entry point that counts in-flight calls with `numActives`.
 */
class ThrottleTest11 extends FlatSpec with Matchers {
  case class Request()

  /** Stand-in for a slow downstream API. */
  object DesAPI {
    def process(a: Request): Future[String] = Future {
      Thread.sleep(500)
      println("Done")
      "Done"
    }
  }

  // Number of requests currently being processed.
  val numActives = new AtomicInteger(0)

  // A single shared queue. This used to be a `def`, which built a brand-new empty queue
  // on every access, so nothing added to it could ever be observed again.
  val queue = new ConcurrentLinkedQueue[Request]()

  /**
   * Forwards `req` to `DesAPI.process` unless more than 10 requests are already active,
   * in which case an already-completed "429" future is returned.
   */
  def apply(req: Request): Future[String] = {
    if (numActives.get > 10) {
      // too many concurrent requests - refused
      println("429 Too many requests")
      Future.successful("429 Too many requests")
    } else {
      // good to process
      println(numActives.incrementAndGet())
      val futureString = DesAPI.process(req)
      futureString.onComplete {
        _ => println(numActives.decrementAndGet())
      }
      futureString
    }
  }

  "a" should "b" in {
    // Fire 100 requests, 50 ms apart, and wait for all of them to finish.
    val futures: Seq[Future[String]] = for (t <- 1 to 100) yield {
      val eventualString: Future[String] = { Thread.sleep(50); apply(Request()) }
      eventualString
    }
    Await.result(Future.sequence(futures), 5000.millis)
    // Thread.sleep(5000)
  }
}
开发者ID:ralreiroe,项目名称:antonio,代码行数:60,代码来源:ThrottleTest11.scala
示例10: Job
//设置package包名称以及导入依赖的类
package com.twitter.concurrent
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.util.{Future, Promise, Try}
/**
 * Mixin that runs blocks passed to `serialized` one at a time.
 *
 * Each block is wrapped in a `Job` and queued. The caller that raises the waiter count
 * from zero executes queued jobs until the count drops back to zero.
 */
trait Serialized {

  /** A queued computation together with the promise that receives its result. */
  protected case class Job[T](promise: Promise[T], doItToIt: () => T) {
    /** Runs the computation and updates the promise with its `Try` outcome. */
    def apply(): Unit = {
      val outcome = Try(doItToIt())
      promise.update(outcome)
    }
  }

  private[this] val nwaiters = new AtomicInteger(0)
  protected val serializedQueue: java.util.Queue[Job[_]] = new ConcurrentLinkedQueue[Job[_]]

  /**
   * Queues `f` for serialized execution.
   *
   * @return a future that receives the result of `f`
   */
  protected def serialized[A](f: => A): Future[A] = {
    val result = new Promise[A]
    serializedQueue.add(Job(result, () => f))

    if (nwaiters.getAndIncrement() == 0) {
      var more = true
      while (more) {
        // A failure while dequeuing or running a job must not stop the drain loop.
        Try {
          val job = serializedQueue.remove()
          job()
        }
        more = nwaiters.decrementAndGet() > 0
      }
    }
    result
  }
}
开发者ID:lanshuijuntuan,项目名称:Java.util,代码行数:33,代码来源:Serialized.scala
示例11: IterableBlockingQueue
//设置package包名称以及导入依赖的类
package com.cj.messagestreaming
import java.util.concurrent.ConcurrentLinkedQueue
/**
 * Queue whose iterator waits for elements until the producer calls `done()`.
 *
 * Producers call `add` and finally `done()`. A consumer iterating over the queue sees
 * `hasNext` poll every 300 ms while the queue is empty and `done()` has not been called.
 */
class IterableBlockingQueue[T]
    extends java.lang.Iterable[T] with Queue[T] {

  private val queue: java.util.Queue[T] = new ConcurrentLinkedQueue[T]

  // Written by the producer and polled by the consumer's `hasNext` loop, so it must be
  // volatile: without it the consuming thread is not guaranteed to ever see the update.
  @volatile private var isDone: Boolean = false

  /** Number of elements currently queued. */
  def size: Int = queue.size

  /** Signals that no more elements will be added. */
  def done(): Unit = {
    isDone = true
  }

  /** Appends `o` to the queue. */
  def add(o: T): Unit = {
    queue.add(o)
  }

  /** Returns an iterator that consumes the queue from its head. */
  def iterator: java.util.Iterator[T] = new java.util.Iterator[T] {

    /**
     * Waits until an element is available or `done()` has been called.
     *
     * Returns false only when the queue is empty and done. If the waiting thread is
     * interrupted the wait ends early and the interrupt flag is restored so the caller
     * can observe it.
     */
    override def hasNext: Boolean = {
      try {
        while (queue.isEmpty && !isDone) {
          Thread.sleep(300)
        }
      } catch {
        case _: InterruptedException =>
          // Do not lose the interruption: re-assert the flag for the caller.
          Thread.currentThread().interrupt()
      }
      !(queue.isEmpty && isDone)
    }

    /** Removes and returns the head; throws `NoSuchElementException` when the queue is empty. */
    override def next: T = {
      queue.remove()
    }

    /** Discards the current head of the queue. */
    override def remove(): Unit = {
      queue.remove()
      ()
    }
  }
}
开发者ID:cjdev,项目名称:message-streaming,代码行数:43,代码来源:IterableBlockingQueue.scala
示例12: ProcessorTest
//设置package包名称以及导入依赖的类
package alexsmirnov.stream
import scala.collection.JavaConverters._
import org.scalatest.FlatSpec
import org.scalatest.concurrent.Eventually
import org.scalatest.prop.PropertyChecks
import org.reactivestreams._
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
/** Specification skeleton for the FlatMap processor; the test bodies are still empty. */
class ProcessorTest extends FlatSpec with Eventually with PropertyChecks {

  /**
   * Subscriber that records everything it receives so a test can inspect it.
   *
   * @tparam A element type of the subscribed stream
   */
  class SubscriberProbe[A] extends Subscriber[A] {
    private val _buffer = new ConcurrentLinkedQueue[A]()
    private val _completes = new AtomicInteger
    private val _errors = new ConcurrentLinkedQueue[Throwable]()
    var subscription: Subscription = null

    // implementation
    def onSubscribe(s: Subscription): Unit = {
      subscription = s
    }

    def onNext(a: A): Unit = {
      _buffer.add(a)
    }

    def onComplete(): Unit = {
      _completes.incrementAndGet()
    }

    def onError(e: Throwable): Unit = {
      _errors.add(e)
    }

    // control methods

    /** Elements received so far. */
    def received = _buffer.asScala

    /** Errors received so far. */
    def errors = _errors.asScala

    /** Cancels the current subscription. */
    def cancel: Unit = subscription.cancel()

    /** Requests `n` more elements from the current subscription. */
    def request(n: Long): Unit = subscription.request(n)

    /** Number of completion signals received. */
    def completed: Int = _completes.get
  }

  "FlatMap" should "request first event on request from subscriber" in {
  }

  it should "transform input to collections and send them to subscriber" in {
  }

  it should "pass cancel event back to producer and stop transmission" in {
  }

  it should "forget remaining events on cancel" in {
  }

  it should "start new stream after cancel on first request from subscriber" in {
  }
}
开发者ID:alexsmirnov,项目名称:printrbot-g2-console,代码行数:45,代码来源:ProcessorTest.scala
示例13: Sync
//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow_processes.synchronization
import java.util.concurrent.ConcurrentLinkedQueue
import pl.edu.agh.actions._
import pl.edu.agh.workflow_processes.Pattern
/**
 * Synchronization pattern: owns one queue per input and the actor that consumes them.
 *
 * The number of inputs is `_numOfIns` when that is positive, otherwise the number of
 * names in `_ins` (zero when both are empty).
 */
class Sync[T, R](
  var _name: String,
  var _numOfIns: Int,
  var _numOfOuts: Int,
  var _ins: Seq[String],
  var _outs: Seq[String],
  var _action: IMultipleAction[T, R]
) extends Pattern[T, R] {

  /** Creates an unnamed pattern with no inputs, no outputs and an empty action. */
  def this() =
    this("", 0, 0, Seq.empty[String], Seq.empty[String], EmptyMultipleAction[T, R]())

  // Alternatively, a LinkedBlockingQueue could be used.
  /** One queue per input, created on first access from the field values at that time. */
  lazy val syncPointsQueues: Seq[ConcurrentLinkedQueue[T]] = {
    val insCount =
      if (_numOfIns > 0) _numOfIns
      else if (_ins.nonEmpty) _ins.size
      else 0
    Seq.fill(insCount)(new ConcurrentLinkedQueue[T]())
  }

  override lazy val actor =
    SyncActor(_name, _numOfOuts, _ins, _outs, _action, syncPointsQueues)
}
/**
 * Factory methods for [[Sync]].
 *
 * Inputs and outputs can each be given either as a count or as a list of names, and
 * the action either as a two-argument function or in curried form.
 */
object Sync {

  // --- action given as (Ins[T], Outs) => Unit ---

  def apply[T, R](
    name: String,
    numOfIns: Int,
    numOfOuts: Int,
    action: (Ins[T], Outs) => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, numOfIns, numOfOuts, Seq.empty, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    numOfIns: Int,
    outs: Seq[String],
    action: (Ins[T], Outs) => Unit
  )(implicit d: DummyImplicit): Sync[T, R] =
    new Sync[T, R](name, numOfIns, 0, Seq.empty, outs, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    ins: Seq[String],
    numOfOuts: Int,
    action: (Ins[T], Outs) => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, 0, numOfOuts, ins, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    ins: Seq[String],
    outs: Seq[String],
    action: (Ins[T], Outs) => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, 0, 0, ins, outs, ActionConverter[T, R](action))

  // --- action given in curried form Ins[T] => Outs => Unit ---

  def apply[T, R](
    name: String,
    numOfIns: Int,
    numOfOuts: Int,
    action: Ins[T] => Outs => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, numOfIns, numOfOuts, Seq.empty, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    numOfIns: Int,
    outs: Seq[String],
    action: Ins[T] => Outs => Unit
  )(implicit d: DummyImplicit): Sync[T, R] =
    new Sync[T, R](name, numOfIns, 0, Seq.empty, outs, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    ins: Seq[String],
    numOfOuts: Int,
    action: Ins[T] => Outs => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, 0, numOfOuts, ins, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    ins: Seq[String],
    outs: Seq[String],
    action: Ins[T] => Outs => Unit
  ): Sync[T, R] =
    new Sync[T, R](name, 0, 0, ins, outs, ActionConverter[T, R](action))

  /** Creates a [[Sync]] through its no-argument constructor. */
  def apply[T, R]: Sync[T, R] = new Sync[T, R]
}
开发者ID:kkrzys,项目名称:akkaflow,代码行数:49,代码来源:Sync.scala
示例14: SyncActor
//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow_processes.synchronization
import java.util.concurrent.ConcurrentLinkedQueue
import pl.edu.agh.workflow_processes.{PatternActor, PatternOuts}
import scala.util.control.Breaks._
import akka.actor.{ActorLogging, Props}
import pl.edu.agh.actions.{IMultipleAction, Ins, Outs}
import pl.edu.agh.messages._
/**
 * Actor that waits until every input queue holds an element, then takes one element
 * from each queue and runs `multipleAction` on them.
 *
 * Inputs are keyed by the names in `ins` when given, otherwise by "in0", "in1", ...
 */
class SyncActor[T, R](
  numOfOuts: Int,
  ins: Seq[String],
  outs: Seq[String],
  var multipleAction: IMultipleAction[T, R],
  syncPoints: Seq[ConcurrentLinkedQueue[T]]
) extends PatternActor(numOfOuts, outs, multipleAction)
    with PatternOuts[R]
    with ActorLogging {

  def receive = {
    // Store the datum in the queue of the input it arrived on, then re-check readiness.
    case SyncDataMessage(data: T, uId) =>
      syncPoints(uId).offer(data)
      self ! GetResult

    case GetResult =>
      // Ready only when no queue is empty; the scan stops at the first empty one.
      val canExecuteAction = syncPoints.forall(queue => queue.peek() != null)
      if (canExecuteAction) {
        val keyOf: Int => String =
          if (ins.nonEmpty) idx => ins(idx)
          else idx => "in" + idx
        val sync = syncPoints.indices.foldLeft(Map.empty[String, T]) { (collected, idx) =>
          collected + (keyOf(idx) -> syncPoints(idx).poll())
        }
        multipleAction.execute(Ins(sync))(Outs(_outs))
      }

    case ChangeAction(act: IMultipleAction[T, R]) =>
      multipleAction = act

    case Get =>
      sender ! this
  }
}
/** Factory for [[SyncActor]] instances and their `Props`. */
object SyncActor {
  import pl.edu.agh.utils.ActorUtils.system

  /** Starts a [[SyncActor]] called `name` on the shared actor system. */
  def apply[T, R](
    name: String,
    numOfOuts: Int,
    ins: Seq[String],
    outs: Seq[String],
    action: IMultipleAction[T, R],
    syncPoints: Seq[ConcurrentLinkedQueue[T]]
  ) = {
    val actorProps = SyncActor.props(numOfOuts, ins, outs, action, syncPoints)
    system.actorOf(actorProps, name)
  }

  /** Builds the `Props` that construct a [[SyncActor]] with the given arguments. */
  def props[T, R](
    numOfOuts: Int,
    ins: Seq[String],
    outs: Seq[String],
    action: IMultipleAction[T, R],
    syncPoints: Seq[ConcurrentLinkedQueue[T]]
  ) =
    Props(classOf[SyncActor[T, R]], numOfOuts, ins, outs, action, syncPoints)
}
开发者ID:kkrzys,项目名称:akkaflow,代码行数:60,代码来源:SyncActor.scala
示例15: DiscActor
//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow_processes.discriminator
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{ActorLogging, Props}
import pl.edu.agh.actions.{IMultipleAction, Ins, Outs}
import pl.edu.agh.messages._
import pl.edu.agh.workflow_processes.{PatternActor, PatternOuts}
/**
 * Actor that fires `multipleAction` once `n` of its inputs hold an element.
 *
 * `synchronizedInputs` remembers which input indices have been counted so far; when
 * it reaches size `n`, one element is taken from each of those queues, the set is
 * reset and the action runs. Inputs are keyed by the names in `ins` when given,
 * otherwise by "in" followed by the index.
 */
class DiscActor[T, R](
  numOfOuts: Int,
  ins: Seq[String],
  n: Int,
  outs: Seq[String],
  var multipleAction: IMultipleAction[T, R],
  syncPoints: Seq[ConcurrentLinkedQueue[T]]
) extends PatternActor(numOfOuts, outs, multipleAction)
    with PatternOuts[R]
    with ActorLogging {

  var synchronizedInputs = Set.empty[Int]

  def receive = {
    // Store the datum in the queue of the input it arrived on, then re-check readiness.
    case SyncDataMessage(data: T, uId) =>
      syncPoints(uId).offer(data)
      self ! GetResult

    case GetResult =>
      // Count non-empty inputs, never exceeding n of them.
      syncPoints.indices.foreach { idx =>
        val head = syncPoints(idx).peek()
        if (head != null && synchronizedInputs.size < n) {
          synchronizedInputs += idx
        }
      }
      if (synchronizedInputs.size == n) {
        val keyOf: Int => String =
          if (ins.nonEmpty) idx => ins(idx)
          else idx => "in" + idx
        val sync = synchronizedInputs.foldLeft(Map.empty[String, T]) { (collected, idx) =>
          collected + (keyOf(idx) -> syncPoints(idx).poll())
        }
        synchronizedInputs = Set.empty[Int]
        multipleAction.execute(Ins(sync))(Outs(_outs))
      }

    case ChangeAction(act: IMultipleAction[T, R]) =>
      multipleAction = act

    case Get =>
      sender ! this
  }
}
/** Factory for [[DiscActor]] instances and their `Props`. */
object DiscActor {
  import pl.edu.agh.utils.ActorUtils.system

  /** Starts a [[DiscActor]] called `name` on the shared actor system. */
  def apply[T, R](
    name: String,
    numOfOuts: Int,
    ins: Seq[String],
    n: Int,
    outs: Seq[String],
    action: IMultipleAction[T, R],
    syncPoints: Seq[ConcurrentLinkedQueue[T]]
  ) = {
    val actorProps = DiscActor.props(numOfOuts, ins, n, outs, action, syncPoints)
    system.actorOf(actorProps, name)
  }

  /** Builds the `Props` that construct a [[DiscActor]] with the given arguments. */
  def props[T, R](
    numOfOuts: Int,
    ins: Seq[String],
    n: Int,
    outs: Seq[String],
    action: IMultipleAction[T, R],
    syncPoints: Seq[ConcurrentLinkedQueue[T]]
  ) =
    Props(classOf[DiscActor[T, R]], numOfOuts, ins, n, outs, action, syncPoints)
}
开发者ID:kkrzys,项目名称:akkaflow,代码行数:59,代码来源:DiscActor.scala
示例16: Disc
//设置package包名称以及导入依赖的类
package pl.edu.agh.workflow_processes.discriminator
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorRef
import pl.edu.agh.actions._
import pl.edu.agh.workflow_processes.Pattern
/**
 * Discriminator pattern: owns one queue per input and the [[DiscActor]] consuming them.
 *
 * The number of inputs is `_m` when that is positive, otherwise the number of names
 * in `_ins` (zero when both are empty). `_n` is passed on to the actor.
 */
class Disc[T, R](
  var _name: String,
  var _n: Int,
  var _m: Int,
  var _numOfOuts: Int,
  var _ins: Seq[String],
  var _outs: Seq[String],
  var _action: IMultipleAction[T, R]
) extends Pattern[T, R] {

  /** Creates an unnamed pattern with no inputs, no outputs and an empty action. */
  def this() =
    this("", 0, 0, 0, Seq.empty[String], Seq.empty[String], EmptyMultipleAction[T, R]())

  /** One queue per input, created on first access from the field values at that time. */
  lazy val syncPointsQueues: Seq[ConcurrentLinkedQueue[T]] = {
    val insCount =
      if (_m > 0) _m
      else if (_ins.nonEmpty) _ins.size
      else 0
    Seq.fill(insCount)(new ConcurrentLinkedQueue[T]())
  }

  override lazy val actor: ActorRef =
    DiscActor(_name, _numOfOuts, _ins, _n, _outs, _action, syncPointsQueues)
}
/**
 * Factory methods for [[Disc]].
 *
 * Inputs can be given as a count `m` or as a list of names, outputs as a count or a
 * list of names, and the action either as a two-argument function or in curried form.
 */
object Disc {

  // --- action given as (Ins[T], Outs) => Unit ---

  def apply[T, R](
    name: String,
    n: Int,
    m: Int,
    numOfOuts: Int,
    action: (Ins[T], Outs) => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, m, numOfOuts, Seq.empty, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    m: Int,
    outs: Seq[String],
    action: (Ins[T], Outs) => Unit
  )(implicit d: DummyImplicit): Disc[T, R] =
    new Disc[T, R](name, n, m, 0, Seq.empty, outs, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    ins: Seq[String],
    numOfOuts: Int,
    action: (Ins[T], Outs) => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, 0, numOfOuts, ins, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    ins: Seq[String],
    outs: Seq[String],
    action: (Ins[T], Outs) => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, 0, 0, ins, outs, ActionConverter[T, R](action))

  // --- action given in curried form Ins[T] => Outs => Unit ---

  def apply[T, R](
    name: String,
    n: Int,
    m: Int,
    numOfOuts: Int,
    action: Ins[T] => Outs => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, m, numOfOuts, Seq.empty, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    m: Int,
    outs: Seq[String],
    action: Ins[T] => Outs => Unit
  )(implicit d: DummyImplicit): Disc[T, R] =
    new Disc[T, R](name, n, m, 0, Seq.empty, outs, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    ins: Seq[String],
    numOfOuts: Int,
    action: Ins[T] => Outs => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, 0, numOfOuts, ins, Seq.empty, ActionConverter[T, R](action))

  def apply[T, R](
    name: String,
    n: Int,
    ins: Seq[String],
    outs: Seq[String],
    action: Ins[T] => Outs => Unit
  ): Disc[T, R] =
    new Disc[T, R](name, n, 0, 0, ins, outs, ActionConverter[T, R](action))

  /** Creates a [[Disc]] through its no-argument constructor. */
  def apply[T, R]: Disc[T, R] = new Disc[T, R]
}
开发者ID:kkrzys,项目名称:akkaflow,代码行数:49,代码来源:Disc.scala
注：本文中的java.util.concurrent.ConcurrentLinkedQueue类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论