本文整理汇总了Scala中scala.collection.immutable包的典型用法代码示例。如果您正苦于以下问题：scala.collection.immutable包的具体用法？scala.collection.immutable怎么用？scala.collection.immutable使用的例子？那么恭喜您，这里精选的代码示例或许可以为您提供帮助。
在下文中一共展示了scala.collection.immutable包的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: HTableStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase.javadsl
import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import scala.collection.immutable
import scala.concurrent.Future
/**
 * Java-facing factories for the HBase table stage: settings construction plus
 * a flow and a sink built on top of `HBaseFlowStage`.
 */
object HTableStage {

  /**
   * Builds an `HTableSettings` from Java argument types.
   *
   * @param conf           Hadoop configuration, handed to the settings unchanged
   * @param tableName      table the settings refer to
   * @param columnFamilies column family names; copied into an immutable Scala sequence
   * @param converter      Java function producing a `Put` for an element
   */
  def table[T](conf: Configuration,
               tableName: TableName,
               columnFamilies: java.util.List[String],
               converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    val families: immutable.Seq[String] = immutable.Seq(columnFamilies.asScala: _*)
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }

  /** Java DSL flow backed by a new `HBaseFlowStage` created from `settings`. */
  def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] = {
    val stage = new HBaseFlowStage[A](settings)
    Flow.fromGraph(stage).asJava
  }

  /**
   * Java DSL sink that sends every element through [[flow]] and then discards it.
   * The materialized value is the `Future[Done]` of `Sink.ignore`.
   */
  def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] = {
    val throughStage = Flow[A].via(flow(config))
    throughStage.toMat(Sink.ignore)(Keep.right).asJava
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:HTableStage.scala
示例2: Directory
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.file.scaladsl
import java.nio.file.{FileVisitOption, Files, Path}
import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}
import scala.collection.immutable
/** Factory for sources that enumerate the entries below a directory. */
object Directory {

  /**
   * Creates a source of paths obtained from `java.nio.file.Files.walk`.
   *
   * @param directory        root of the walk; `require` fails immediately when it is not a directory
   * @param maxDepth         depth limit forwarded to `Files.walk`; with `None` the overload
   *                         without a depth argument is used
   * @param fileVisitOptions options forwarded to `Files.walk`
   */
  def walk(directory: Path,
           maxDepth: Option[Int] = None,
           fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = {
    require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")

    // `Files.walk` sits inside a thunk, so this method itself never starts the walk.
    val openStream: () => java.util.stream.Stream[Path] = maxDepth match {
      case Some(depth) => () => Files.walk(directory, depth, fileVisitOptions: _*)
      case None        => () => Files.walk(directory, fileVisitOptions: _*)
    }

    StreamConverters.fromJavaStream(openStream)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:30,代码来源:Directory.scala
示例3: route
//设置package包名称以及导入依赖的类
package com.ulasakdeniz.hakker
import akka.http.scaladsl.marshalling.{Marshal, ToEntityMarshaller}
import akka.http.scaladsl.model.{HttpHeader, HttpResponse, ResponseEntity, StatusCode}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigFactory
import com.ulasakdeniz.hakker.template.Render
import de.heikoseeberger.akkahttpcirce.CirceSupport._
import io.circe.{Encoder, Json}
import io.circe.syntax._
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/**
 * Base trait for controllers. `apply()` combines a GET route for files under
 * the `js` path prefix with the controller-specific [[route]]; the `send*`
 * helpers complete a request with a status code and optional content.
 */
trait Controller extends Render {
  override lazy val config = ConfigFactory.load()

  /** Alias for akka-http's `StatusCodes` object. */
  val StatusCodes = akka.http.scaladsl.model.StatusCodes

  /** Controller-specific routes, supplied by the implementor. */
  def route: Route

  /** The complete route: frontend files under `js` first, then [[route]]. */
  def apply(): Route = {
    // render frontend files
    val frontendFiles: Route = get {
      pathPrefix("js") {
        renderDir("js")
      }
    }
    frontendFiles ~ route
  }

  /** Completes the request with only a status code. */
  def send(statusCode: StatusCode): Route = complete(statusCode)

  /**
   * Marshals `content` to a response entity and completes the request with it.
   *
   * @param statusCode status of the response
   * @param content    value to marshal
   * @param headers    headers set on the response
   */
  def send[T](statusCode: StatusCode, content: T, headers: immutable.Seq[HttpHeader] = Nil)(
      implicit marshaller: ToEntityMarshaller[T],
      ec: ExecutionContext): Route = {
    val response =
      for (entity <- Marshal(content).to[ResponseEntity](marshaller, ec))
        yield HttpResponse(statusCode, headers = headers).withEntity(entity)
    complete(response)
  }

  /** Encodes `content` with its circe `Encoder` and sends it with `statusCode`. */
  def sendJson[T](statusCode: StatusCode, content: T)(implicit encoder: Encoder[T],
                                                      ec: ExecutionContext): Route = {
    sendJson(statusCode, content.asJson)
  }

  /** Same as the two-argument variant, using `StatusCodes.OK`. */
  def sendJson[T](content: T)(implicit encoder: Encoder[T], ec: ExecutionContext): Route = {
    sendJson(StatusCodes.OK, content)
  }

  /** Sends the compact (`noSpaces`) rendering of `json`, wrapped in an `Option`, via [[send]]. */
  def sendJson(statusCode: StatusCode, json: Json)(implicit ec: ExecutionContext): Route = {
    val rendered = Option(json.noSpaces)
    send(statusCode, rendered)
  }
}
开发者ID:ulasakdeniz,项目名称:hakker,代码行数:58,代码来源:Controller.scala
示例4: RelativeActorPathSpec
//设置package包名称以及导入依赖的类
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.Matchers
import java.net.URLEncoder
import scala.collection.immutable
/** Checks how `RelativeActorPath.unapply` splits relative actor paths into their elements. */
class RelativeActorPathSpec extends WordSpec with Matchers {

  /** Elements extracted from `path`, or an empty sequence when the extractor does not match. */
  def elements(path: String): immutable.Seq[String] = RelativeActorPath.unapply(path).getOrElse(Nil)

  "RelativeActorPath" must {
    "match single name" in {
      elements("foo") should be(List("foo"))
    }
    "match path separated names" in {
      elements("foo/bar/baz") should be(List("foo", "bar", "baz"))
    }
    "match url encoded name" in {
      // A full actor-system address, once URL-encoded, has to come back as one single element.
      val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8")
      elements(name) should be(List(name))
    }
    "match path with uid fragment" in {
      // The `#uid` fragment stays attached to the last element.
      elements("foo/bar/baz#1234") should be(List("foo", "bar", "baz#1234"))
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:28,代码来源:RelativeActorPathSpec.scala
示例5: TestSetup
//设置package包名称以及导入依赖的类
package akka.io
import scala.annotation.tailrec
import scala.collection.immutable
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
import akka.testkit.SocketUtil._
import Tcp._
/**
 * Mix-in for `AkkaSpec`-based tests (enforced by the self-type) that provides
 * a reusable TCP server/client fixture.
 */
trait TcpIntegrationSpecSupport { _: AkkaSpec =>

  /**
   * Fixture holding a bind handler probe and a temporary server address.
   *
   * @param shouldBindServer when `true` (the default) the server is bound during construction
   */
  class TestSetup(shouldBindServer: Boolean = true) {
    val bindHandler = TestProbe()
    val endpoint = temporaryServerAddress()

    if (shouldBindServer) bindServer()

    /** Sends `Bind` for [[endpoint]] to the TCP manager and waits for the matching `Bound`. */
    def bindServer(): Unit = {
      val bindCommander = TestProbe()
      bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
      bindCommander.expectMsg(Bound(endpoint))
    }

    /**
     * Connects a new client to [[endpoint]] and registers a handler probe on both sides.
     *
     * @return `(clientHandler, clientConnection, serverHandler, serverConnection)`
     */
    def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
      val connectCommander = TestProbe()
      connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
      // Back-quoted names match against the existing values instead of binding new ones.
      val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
      val clientHandler = TestProbe()
      connectCommander.sender() ! Register(clientHandler.ref)

      val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected]
      val serverHandler = TestProbe()
      bindHandler.sender() ! Register(serverHandler.ref)

      (clientHandler, connectCommander.sender(), serverHandler, bindHandler.sender())
    }

    /** Keeps expecting `Received` messages on `handler` until `remaining` bytes have arrived. */
    @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
      if (remaining > 0) {
        val recv = handler.expectMsgType[Received]
        expectReceivedData(handler, remaining - recv.data.size)
      }

    /** Socket options passed along with `Bind`; empty by default, override to customise. */
    def bindOptions: immutable.Traversable[SocketOption] = Nil

    /** Socket options passed along with `Connect`; empty by default, override to customise. */
    def connectOptions: immutable.Traversable[SocketOption] = Nil
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:TcpIntegrationSpecSupport.scala
示例6: filterEvents
//设置package包名称以及导入依赖的类
package akka
import language.implicitConversions
import akka.actor.ActorSystem
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.reflect.ClassTag
import scala.collection.immutable
import java.util.concurrent.TimeUnit.MILLISECONDS
package object testkit {

  /**
   * Runs `block` while the given event filters are muted on the system's event
   * stream, then waits for every filter to complete.
   *
   * @param eventFilters filters to mute for the duration of `block`
   * @param block        code to run; evaluated exactly once (by-name)
   * @return the result of `block`
   * @throws AssertionError when at least one filter did not complete within
   *                        `TestEventFilterLeeway` after `block` returned
   */
  def filterEvents[T](eventFilters: Iterable[EventFilter])(block: => T)(implicit system: ActorSystem): T = {
    def now = System.currentTimeMillis

    system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq]))

    try {
      val result = block

      val testKitSettings = TestKitExtension(system)
      // All filters share one deadline; each gets whatever time is left when it is checked.
      val stop = now + testKitSettings.TestEventFilterLeeway.toMillis
      val failed = eventFilters
        .filterNot(_.awaitDone(Duration(stop - now, MILLISECONDS)))
        .map("Timeout (" + testKitSettings.TestEventFilterLeeway + ") waiting for " + _)
      if (failed.nonEmpty)
        throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))

      result
    } finally {
      // Always un-mute, even when `block` or the completion check throws.
      system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq]))
    }
  }

  /** Varargs convenience overload of `filterEvents`. */
  def filterEvents[T](eventFilters: EventFilter*)(block: => T)(implicit system: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)

  /** Runs `block` inside `EventFilter[T]().intercept`. */
  def filterException[T <: Throwable](block: => Unit)(implicit system: ActorSystem, t: ClassTag[T]): Unit = EventFilter[T]() intercept (block)

  /** Adds `dilated` to `FiniteDuration`, scaling it by the configured `TestTimeFactor`. */
  implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
    def dilated(implicit system: ActorSystem): FiniteDuration =
      (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration]
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:43,代码来源:package.scala
示例7: ReflectiveDynamicAccess
//设置package包名称以及导入依赖的类
package akka.actor
import scala.collection.immutable
import java.lang.reflect.InvocationTargetException
import scala.reflect.ClassTag
import scala.util.Try
/**
 * `DynamicAccess` implementation based on Java reflection, resolving classes
 * through the supplied class loader.
 *
 * @param classLoader loader used for every `Class.forName` lookup
 */
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {

  /**
   * Looks up the class named `fqcn` and checks that it is assignable to `T`.
   * The class is loaded without being initialized (`initialize = false`).
   */
  override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
    Try[Class[_ <: T]]({
      val c = Class.forName(fqcn, false, classLoader).asInstanceOf[Class[_ <: T]]
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c)
    })

  /**
   * Instantiates `clazz` through the declared constructor whose parameter types
   * are the first components of `args`, passing the second components as values.
   * The constructor is made accessible first, so non-public constructors work too.
   */
  override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    Try {
      val types = args.map(_._1).toArray
      val values = args.map(_._2).toArray
      val constructor = clazz.getDeclaredConstructor(types: _*)
      constructor.setAccessible(true)
      val obj = constructor.newInstance(values: _*)
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
    } recover {
      // Replace the reflective wrapper with the exception thrown by the constructor itself.
      case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException
    }

  /** Resolves `fqcn` with [[getClassFor]] and instantiates the resulting class. */
  override def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    getClassFor(fqcn) flatMap { c => createInstanceFor(c, args) }

  /**
   * Obtains the value of the static `MODULE$` field of the class named `fqcn`.
   * When `fqcn` does not end in `$`, the name with `$` appended is tried first
   * and the plain name is the fallback.
   */
  override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
    val classTry =
      if (fqcn.endsWith("$")) getClassFor(fqcn)
      else getClassFor(fqcn + "$") recoverWith { case _ => getClassFor(fqcn) }
    classTry flatMap { c =>
      Try {
        val module = c.getDeclaredField("MODULE$")
        module.setAccessible(true)
        val t = implicitly[ClassTag[T]].runtimeClass
        module.get(null) match {
          case null                  => throw new NullPointerException
          case x if !t.isInstance(x) => throw new ClassCastException(fqcn + " is not a subtype of " + t)
          case x: T                  => x
        }
      } recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:DynamicAccess.scala
示例8: TcpIncomingConnection
//设置package包名称以及导入依赖的类
package akka.io
import java.nio.channels.SocketChannel
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
/**
 * Connection actor for a channel handed over as `_channel`.
 *
 * During construction it signs a death pact with `bindHandler` and registers
 * the channel with `registry` using no initial interest ops. Once the
 * `ChannelRegistration` arrives, the connection setup is completed with the
 * given socket options.
 */
private[io] class TcpIncomingConnection(_tcp: TcpExt,
                                        _channel: SocketChannel,
                                        registry: ChannelRegistry,
                                        bindHandler: ActorRef,
                                        options: immutable.Traversable[SocketOption],
                                        readThrottling: Boolean)
    extends TcpConnection(_tcp, _channel, readThrottling) {

  signDeathPact(bindHandler)

  registry.register(channel, initialOps = 0)

  def receive = {
    case registration: ChannelRegistration => completeConnect(registration, bindHandler, options)
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:25,代码来源:TcpIncomingConnection.scala
示例9: UdpSender
//设置package包名称以及导入依赖的类
package akka.io
import java.nio.channels.DatagramChannel
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.io.Inet.SocketOption
import akka.io.Udp._
import akka.actor._
/**
 * Actor owning a non-blocking `DatagramChannel` that is opened, but not bound,
 * at construction time.
 *
 * @param udp             the UDP extension
 * @param channelRegistry registry the channel is registered with
 * @param commander       actor that is told `SimpleSenderReady` once registration completed
 * @param options         socket options applied to the socket right after it is opened
 */
private[io] class UdpSender(val udp: UdpExt,
                            channelRegistry: ChannelRegistry,
                            commander: ActorRef,
                            options: immutable.Traversable[SocketOption])
    extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  val channel = {
    val datagramChannel = DatagramChannel.open
    datagramChannel.configureBlocking(false)
    val socket = datagramChannel.socket

    options foreach { _.beforeDatagramBind(socket) }

    datagramChannel
  }
  channelRegistry.register(channel, initialOps = 0)

  def receive: Receive = {
    case registration: ChannelRegistration =>
      commander ! SimpleSenderReady
      // From here on messages are handled by the behavior that `sendHandlers` returns.
      context.become(sendHandlers(registration))
  }

  /** Closes the channel if it is still open; a non-fatal failure to close is only logged. */
  override def postStop(): Unit = if (channel.isOpen) {
    log.debug("Closing DatagramChannel after being stopped")
    try channel.close()
    catch {
      case NonFatal(e) => log.debug("Error closing DatagramChannel: {}", e)
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:43,代码来源:UdpSender.scala
示例10: Collections
//设置package包名称以及导入依赖的类
package akka.util
import scala.collection.immutable
import scala.annotation.tailrec
/** Small collection helpers that are private to the `akka` package. */
private[akka] object Collections {

  /** An immutable sequence without elements: `length` is 0 and `apply` always throws. */
  case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
    override final def iterator = Iterator.empty
    override final def apply(idx: Int): Nothing = throw new java.lang.IndexOutOfBoundsException(idx.toString)
    override final def length: Int = 0
  }

  /**
   * Immutable iterable over the values of `valuesIterator` for which
   * `isDefinedAt` holds, each transformed by `apply`. Filtering and mapping
   * happen in one pass, while iterating.
   */
  abstract class PartialImmutableValuesIterable[From, To] extends immutable.Iterable[To] {
    /** Whether `from` is to be included. */
    def isDefinedAt(from: From): Boolean

    /** Transformation applied to included values. */
    def apply(from: From): To

    /** Fresh iterator over the underlying values; called once per [[iterator]] call. */
    def valuesIterator: Iterator[From]

    final def iterator: Iterator[To] = {
      val superIterator = valuesIterator
      new Iterator[To] {
        private[this] var _next: To = _
        private[this] var _hasNext = false

        @tailrec override final def hasNext: Boolean =
          if (!_hasNext && superIterator.hasNext) { // If we need and are able to look for the next value
            val potentiallyNext = superIterator.next()
            if (isDefinedAt(potentiallyNext)) {
              _next = apply(potentiallyNext)
              _hasNext = true
              true
            } else hasNext //Attempt to find the next
          } else _hasNext // Return if we found one

        override final def next(): To =
          if (hasNext) {
            val ret = _next
            _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
            _hasNext = false // Mark as consumed (we need to look for the next value)
            ret
          } else throw new java.util.NoSuchElementException("next")
      }
    }

    // Counted by running through a fresh iterator on first access, then cached.
    override lazy val size: Int = iterator.size

    override def foreach[C](f: To => C) = iterator foreach f
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:Collections.scala
示例11: preparePersistentBatch
//设置package包名称以及导入依赖的类
package akka.persistence.journal
import akka.persistence.{ PersistentRepr, Resequenceable }
import akka.actor.Actor
import scala.collection.immutable
/** Helper for journal actors (self-type `Actor`) that prepares batches before they are written. */
private[akka] trait WriteJournalBase {
  this: Actor =>

  /**
   * Keeps only the `PersistentRepr` entries of `rb`, calling `prepareWrite()`
   * on each of them as a side effect of the filtering.
   */
  protected def preparePersistentBatch(rb: immutable.Seq[Resequenceable]): immutable.Seq[PersistentRepr] =
    rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations

  /** `true` for a `PersistentRepr` (after calling `prepareWrite()` on it), `false` for anything else. */
  private def persistentPrepareWrite(r: Resequenceable): Boolean = r match {
    case p: PersistentRepr =>
      p.prepareWrite(); true
    case _ =>
      false
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:21,代码来源:WriteJournalBase.scala
示例12: IterableProducerTest
//设置package包名称以及导入依赖的类
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import scala.collection.immutable
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
/**
 * Runs the Reactive Streams `PublisherVerification` suite against publishers
 * created from immutable iterables via `Flow(...).toProducer`.
 */
class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
    extends PublisherVerification[Int](env, publisherShutdownTimeout)
    with WithActorSystem with TestNGSuiteLike {

  implicit val system = _system

  /** Uses the default timeouts for the given actor system. */
  def this(system: ActorSystem) =
    this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis)

  /** Creates a dedicated actor system named after this class. */
  def this() =
    this(ActorSystem(classOf[IterableProducerTest].getSimpleName, AkkaSpec.testConf))

  val materializer = FlowMaterializer(MaterializerSettings(
    maximumInputBufferSize = 512))(system)

  /**
   * Publisher of the integers `0 until elements`; an `elements` value of 0
   * yields an unbounded publisher counting up from 0.
   */
  def createPublisher(elements: Int): Publisher[Int] = {
    val iterable: immutable.Iterable[Int] = elements match {
      case 0     => new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
      case count => 0 until count
    }
    Flow(iterable).toProducer(materializer).getPublisher
  }

  /** Publisher created from an empty collection. */
  override def createCompletedStatePublisher(): Publisher[Int] =
    Flow[Int](Nil).toProducer(materializer).getPublisher

  override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:42,代码来源:IterableProducerTest.scala
示例13: FlowGroupedSpec
//设置package包名称以及导入依赖的类
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.ScriptedTest
import scala.collection.immutable
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current => random }
/** Scripted tests for the `grouped` operation with a group size of 3. */
class FlowGroupedSpec extends AkkaSpec with ScriptedTest {

  val settings = MaterializerSettings(
    initialInputBufferSize = 2,
    maximumInputBufferSize = 16,
    initialFanOutBufferSize = 1,
    maxFanOutBufferSize = 16)

  "A Grouped" must {

    "group evenly" in {
      // 20 steps, each feeding three random ints and expecting them back as one group.
      def script = Script((1 to 20) map { _ => val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
      (1 to 30) foreach (_ => runScript(script, settings)(_.grouped(3)))
    }

    "group with rest" in {
      // Same as above, followed by a single trailing element expected as a group of one.
      def script = Script(((1 to 20).map { _ => val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
        :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
      (1 to 30) foreach (_ => runScript(script, settings)(_.grouped(3)))
    }

  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:32,代码来源:FlowGroupedSpec.scala
示例14: AccountSerializerRegistry
//设置package包名称以及导入依赖的类
package org.ioreskovic.greatmaterialcontinuum.impl.ent.acc
import com.lightbend.lagom.scaladsl.playjson.{JsonSerializer, JsonSerializerRegistry}
import org.ioreskovic.greatmaterialcontinuum.impl.cmd.acc._
import org.ioreskovic.greatmaterialcontinuum.impl.evt.acc._
import org.ioreskovic.greatmaterialcontinuum.impl.stt.acc.AccountState
import scala.collection.immutable
/**
 * JSON serializer registry listing the serializers for the account state,
 * the account commands and the account events.
 */
object AccountSerializerRegistry extends JsonSerializerRegistry {

  /** Builds the list of registered serializers: state first, then commands, then events. */
  override def serializers: immutable.Seq[JsonSerializer[_]] = {
    val state = JsonSerializer[AccountState]

    val createCmd = JsonSerializer[CreateAccount]
    val deleteCmd = JsonSerializer[DeleteAccount]
    val activateCmd = JsonSerializer[ActivateAccount]
    val deactivateCmd = JsonSerializer[DeactivateAccount]
    val retrieveCmd = JsonSerializer[RetrieveAccount]

    val createdEvt = JsonSerializer[AccountCreated]
    val deletedEvt = JsonSerializer[AccountDeleted]
    val activatedEvt = JsonSerializer[AccountActivated]
    val deactivatedEvt = JsonSerializer[AccountDeactivated]
    val retrievedEvt = JsonSerializer[AccountRetrieved]

    immutable.Seq(
      state,
      createCmd,
      deleteCmd,
      activateCmd,
      deactivateCmd,
      retrieveCmd,
      createdEvt,
      deletedEvt,
      activatedEvt,
      deactivatedEvt,
      retrievedEvt
    )
  }
}
开发者ID:ioreskovic,项目名称:great-material-continuum,代码行数:27,代码来源:AccountSerializerRegistry.scala
示例15: HTableSettings
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import scala.collection.immutable
/**
 * Settings describing an HBase table target.
 *
 * @param conf           Hadoop configuration
 * @param tableName      name of the table
 * @param columnFamilies column family names
 * @param converter      turns an element of type `T` into a `Put`
 */
final case class HTableSettings[T](conf: Configuration,
                                   tableName: TableName,
                                   columnFamilies: immutable.Seq[String],
                                   converter: T => Put)

object HTableSettings {

  /**
   * Java API: builds the settings from a `java.util.List` of column families
   * and a `java.util.function.Function` converter.
   */
  def create[T](conf: Configuration,
                tableName: TableName,
                columnFamilies: java.util.List[String],
                converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    // Copy the Java list so the settings hold an immutable sequence.
    val families: immutable.Seq[String] = immutable.Seq(columnFamilies.asScala: _*)
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:24,代码来源:HBaseSettings.scala
示例16: ServerSideEncryption
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.alpakka.s3.acl.CannedAcl
import scala.collection.immutable
/**
 * Server-side encryption choice, rendered as `x-amz-server-side-encryption*` request headers.
 *
 * @param algorithm value of the `x-amz-server-side-encryption` header
 * @param kmsKeyId  KMS key id; needed for the `aws:kms` algorithm
 * @param context   optional encryption context, only used together with `aws:kms`
 */
sealed abstract class ServerSideEncryption(algorithm: String,
                                           kmsKeyId: Option[String] = None,
                                           context: Option[String] = None) {

  /**
   * Headers for this encryption setting.
   *
   * @throws IllegalArgumentException for any algorithm other than `AES256` or
   *                                  `aws:kms`, and for `aws:kms` without a key id
   */
  def headers: immutable.Seq[HttpHeader] = (algorithm, kmsKeyId, context) match {
    case ("AES256", _, _) =>
      List(RawHeader("x-amz-server-side-encryption", "AES256"))

    case ("aws:kms", Some(keyId), None) =>
      List(
        RawHeader("x-amz-server-side-encryption", "aws:kms"),
        RawHeader("x-amz-server-side-encryption-aws-kms-key-id", keyId)
      )

    case ("aws:kms", Some(keyId), Some(encryptionContext)) =>
      List(
        RawHeader("x-amz-server-side-encryption", "aws:kms"),
        RawHeader("x-amz-server-side-encryption-aws-kms-key-id", keyId),
        RawHeader("x-amz-server-side-encryption-context", encryptionContext)
      )

    case _ =>
      throw new IllegalArgumentException("Unsupported encryption algorithm.")
  }
}

object ServerSideEncryption {

  /** Encryption with the `AES256` algorithm. */
  case object AES256 extends ServerSideEncryption("AES256")

  /** Encryption with the `aws:kms` algorithm, a key id and an optional context. */
  case class KMS(keyId: String, context: Option[String] = None)
      extends ServerSideEncryption("aws:kms", Some(keyId), context)
}
开发者ID:akka,项目名称:alpakka,代码行数:33,代码来源:S3Headers.scala
示例17: format
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv.scaladsl
import java.nio.charset.{Charset, StandardCharsets}
import akka.NotUsed
import akka.stream.alpakka.csv.{javadsl, CsvFormatter}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import scala.collection.immutable
/**
 * Creates a flow that maps every incoming collection of fields through
 * `CsvFormatter.toCsv`. The flow is named `CsvFormatting`.
 *
 * @param delimiter     passed to the `CsvFormatter`
 * @param quoteChar     passed to the `CsvFormatter`
 * @param escapeChar    passed to the `CsvFormatter`
 * @param endOfLine     passed to the `CsvFormatter`
 * @param quotingStyle  passed to the `CsvFormatter`
 * @param charset       passed to the `CsvFormatter`
 * @param byteOrderMark when defined, emitted as a single element ahead of the formatted output
 */
def format[T <: immutable.Iterable[String]](
    delimiter: Char = Comma,
    quoteChar: Char = DoubleQuote,
    escapeChar: Char = Backslash,
    endOfLine: String = "\r\n",
    quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
    charset: Charset = StandardCharsets.UTF_8,
    byteOrderMark: Option[ByteString] = None
): Flow[T, ByteString, NotUsed] = {
  val formatter =
    new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, quotingStyle, charset)
  val formatting = Flow[T].map(formatter.toCsv).named("CsvFormatting")

  byteOrderMark match {
    case Some(bom) => formatting.prepend(Source.single(bom))
    case None      => formatting
  }
}
}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:CsvFormatting.scala
示例18: CsvToMapStage
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv
import java.nio.charset.Charset
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import scala.collection.immutable
/**
 * Stage that turns lines of CSV fields into maps keyed by column name.
 *
 * @param columnNames column names to use; when `None`, the first incoming line
 *                    is decoded with `charset`, kept as the column names and not emitted
 * @param charset     charset used to decode a header line
 */
private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Column names in effect; filled from the first line when none were given.
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        val elem = grab(in)
        headers match {
          case Some(names) =>
            push(out, names.zip(elem).toMap)
          case None =>
            // Header line: remember it and ask for the next line instead of emitting.
            headers = Some(elem.map(_.decodeString(charset)))
            pull(in)
        }
      }

      override def onPull(): Unit = pull(in)
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:41,代码来源:CsvToMapStage.scala
示例19: Collections
//设置package包名称以及导入依赖的类
package akka.util
import scala.collection.immutable
import scala.annotation.tailrec
/** Small collection helpers that are private to the `akka` package. */
private[akka] object Collections {

  /** An immutable sequence without elements: `length` is 0 and `apply` always throws. */
  case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
    override final def iterator = Iterator.empty
    override final def apply(idx: Int): Nothing = throw new java.lang.IndexOutOfBoundsException(idx.toString)
    override final def length: Int = 0
  }

  /**
   * Immutable iterable over the values of `valuesIterator` for which
   * `isDefinedAt` holds, each transformed by `apply`. Filtering and mapping
   * happen in one pass, while iterating.
   */
  abstract class PartialImmutableValuesIterable[From, To] extends immutable.Iterable[To] {
    /** Whether `from` is to be included. */
    def isDefinedAt(from: From): Boolean

    /** Transformation applied to included values. */
    def apply(from: From): To

    /** Fresh iterator over the underlying values; called once per [[iterator]] call. */
    def valuesIterator: Iterator[From]

    final def iterator: Iterator[To] = {
      val superIterator = valuesIterator
      new Iterator[To] {
        private[this] var _next: To = _
        private[this] var _hasNext = false

        @tailrec override final def hasNext: Boolean =
          if (!_hasNext && superIterator.hasNext) { // If we need and are able to look for the next value
            val potentiallyNext = superIterator.next()
            if (isDefinedAt(potentiallyNext)) {
              _next = apply(potentiallyNext)
              _hasNext = true
              true
            } else hasNext //Attempt to find the next
          } else _hasNext // Return if we found one

        override final def next(): To =
          if (hasNext) {
            val ret = _next
            _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
            _hasNext = false // Mark as consumed (we need to look for the next value)
            ret
          } else throw new java.util.NoSuchElementException("next")
      }
    }

    // Counted by running through a fresh iterator on first access, then cached.
    override lazy val size: Int = iterator.size

    override def foreach[C](f: To => C) = iterator foreach f
  }
}
开发者ID:Starofall,项目名称:Chakka,代码行数:50,代码来源:Collections.scala
示例20: AccumulateWhileUnchanged
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import scala.collection.immutable
/**
 * Stage that collects consecutive elements sharing the same extracted property
 * and emits each run as one sequence when the property changes or upstream finishes.
 *
 * @param propertyExtractor computes the property that is compared between neighbouring elements
 */
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
    extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    // Property of the most recently received element; None until the first element arrives.
    private var lastProperty: Option[P] = None
    // Elements of the run currently being collected.
    private val pending = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val element = grab(in)
        val property = propertyExtractor(element)
        val continuesRun = lastProperty.forall(_ == property)

        if (continuesRun) {
          pending += element
          pull(in)
        } else {
          // The property changed: emit the finished run and start a new one with this element.
          val finishedRun = pending.result()
          pending.clear()
          pending += element
          push(out, finishedRun)
        }

        lastProperty = Some(property)
      }

      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // Flush whatever is still buffered before completing.
        val remainder = pending.result()
        if (remainder.nonEmpty) emit(out, remainder)
        completeStage()
      }
    })

    override def postStop(): Unit = pending.clear()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:59,代码来源:AccumulateWhileUnchanged.scala
注：本文中的scala.collection.immutable包示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论