• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala immutable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中scala.collection.immutable包的典型用法代码示例。如果您正苦于以下问题：scala.collection.immutable的具体用法是什么？怎么用？有哪些使用的例子？那么恭喜您，这里精选的代码示例或许可以为您提供帮助。



下文一共展示了20个使用scala.collection.immutable包的代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: HTableStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase.javadsl

import akka.stream.alpakka.hbase.HTableSettings
import akka.stream.alpakka.hbase.internal.HBaseFlowStage
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.{Done, NotUsed}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put

import scala.collection.immutable
import scala.concurrent.Future

/**
 * Factory methods that accept Java types and return `akka.stream.javadsl` stages
 * built around [[HBaseFlowStage]].
 */
object HTableStage {

  /**
   * Builds an [[HTableSettings]] from Java collection and function types.
   *
   * @param conf           configuration stored in the settings
   * @param tableName      table name stored in the settings
   * @param columnFamilies column family names, copied into an immutable sequence
   * @param converter      Java function turning an element into a `Put`
   */
  def table[T](conf: Configuration,
               tableName: TableName,
               columnFamilies: java.util.List[String],
               converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    val families: immutable.Seq[String] = columnFamilies.asScala.toList
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings(conf, tableName, families, toPut)
  }

  /**
   * Sink that runs every element through [[flow]] and then ignores the output,
   * materializing the `Future[Done]` of `Sink.ignore`.
   */
  def sink[A](config: HTableSettings[A]): akka.stream.javadsl.Sink[A, Future[Done]] = {
    val writeAndDiscard = Flow[A].via(flow(config)).toMat(Sink.ignore)(Keep.right)
    writeAndDiscard.asJava
  }

  /** Flow backed by a new [[HBaseFlowStage]] created from `settings`. */
  def flow[A](settings: HTableSettings[A]): akka.stream.javadsl.Flow[A, A, NotUsed] = {
    val stage = new HBaseFlowStage[A](settings)
    Flow.fromGraph(stage).asJava
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:HTableStage.scala


示例2: Directory

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.file.scaladsl

import java.nio.file.{FileVisitOption, Files, Path}

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}

import scala.collection.immutable

/** Factory for sources that traverse a directory tree. */
object Directory {

  /**
   * Creates a source of the paths produced by `java.nio.file.Files.walk` for `directory`.
   *
   * The directory check happens when this method is called; the walk itself is
   * created by the factory handed to `StreamConverters.fromJavaStream`.
   *
   * @param directory        root of the walk; must be a directory
   * @param maxDepth         optional depth limit forwarded to `Files.walk`; when `None`
   *                         the overload without a depth argument is used
   * @param fileVisitOptions options forwarded to `Files.walk`
   * @throws IllegalArgumentException if `directory` is not a directory
   */
  def walk(directory: Path,
           maxDepth: Option[Int] = None,
           fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = {
    require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")

    // Pick the Files.walk overload once, but defer the actual call to the factory.
    val openWalk: () => java.util.stream.Stream[Path] = maxDepth match {
      case Some(depth) => () => Files.walk(directory, depth, fileVisitOptions: _*)
      case None        => () => Files.walk(directory, fileVisitOptions: _*)
    }

    StreamConverters.fromJavaStream(openWalk)
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:30,代码来源:Directory.scala


示例3: route

//设置package包名称以及导入依赖的类
package com.ulasakdeniz.hakker

import akka.http.scaladsl.marshalling.{Marshal, ToEntityMarshaller}
import akka.http.scaladsl.model.{HttpHeader, HttpResponse, ResponseEntity, StatusCode}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import com.typesafe.config.ConfigFactory
import com.ulasakdeniz.hakker.template.Render
import de.heikoseeberger.akkahttpcirce.CirceSupport._
import io.circe.{Encoder, Json}
import io.circe.syntax._

import scala.collection.immutable
import scala.concurrent.ExecutionContext

/**
 * Base trait for controllers: combines a route serving the `js` directory with the
 * controller's own [[route]] and offers helpers for completing requests.
 */
trait Controller extends Render {

  override lazy val config = ConfigFactory.load()
  val StatusCodes          = akka.http.scaladsl.model.StatusCodes

  /** Routes contributed by the concrete controller. */
  def route: Route

  /** The GET-only `js` directory route, followed by [[route]]. */
  def apply(): Route = {
    // render frontend files
    val frontendFiles: Route = get {
      pathPrefix("js") {
        renderDir("js")
      }
    }
    frontendFiles ~ route
  }

  /** Completes the request with only a status code. */
  def send(statusCode: StatusCode): Route = complete(statusCode)

  /**
   * Marshals `content` to a response entity and completes the request with a response
   * carrying `statusCode`, `headers` and that entity.
   */
  def send[T](statusCode: StatusCode, content: T, headers: immutable.Seq[HttpHeader] = Nil)(
      implicit marshaller: ToEntityMarshaller[T],
      ec: ExecutionContext): Route = {
    val entityFuture   = Marshal(content).to[ResponseEntity](marshaller, ec)
    val responseFuture = entityFuture.map { entity =>
      HttpResponse(statusCode, headers = headers).withEntity(entity)
    }
    complete(responseFuture)
  }

  /** Encodes `content` as JSON and sends it with `statusCode`. */
  def sendJson[T](statusCode: StatusCode, content: T)(implicit encoder: Encoder[T],
                                                      ec: ExecutionContext): Route =
    sendJson(statusCode, content.asJson)

  /** Encodes `content` as JSON and sends it with `StatusCodes.OK`. */
  def sendJson[T](content: T)(implicit encoder: Encoder[T], ec: ExecutionContext): Route =
    sendJson(StatusCodes.OK, content)

  /** Sends the compact (`noSpaces`) rendering of `json`, wrapped in an `Option`, via [[send]]. */
  def sendJson(statusCode: StatusCode, json: Json)(implicit ec: ExecutionContext): Route = {
    val body = Option(json.noSpaces)
    send(statusCode, body)
  }
}
开发者ID:ulasakdeniz,项目名称:hakker,代码行数:58,代码来源:Controller.scala


示例4: RelativeActorPathSpec

//设置package包名称以及导入依赖的类
package akka.actor

import org.scalatest.WordSpec
import org.scalatest.Matchers
import java.net.URLEncoder
import scala.collection.immutable

/** Checks which path elements `RelativeActorPath.unapply` extracts from relative path strings. */
class RelativeActorPathSpec extends WordSpec with Matchers {

  /** Elements extracted from `path`, or `Nil` when `RelativeActorPath.unapply` yields nothing. */
  def elements(path: String): immutable.Seq[String] = RelativeActorPath.unapply(path).getOrElse(Nil)

  "RelativeActorPath" must {
    "match single name" in {
      elements("foo") should be(List("foo"))
    }
    "match path separated names" in {
      elements("foo/bar/baz") should be(List("foo", "bar", "baz"))
    }
    "match url encoded name" in {
      // Address-like name; the test expects its URL-encoded form to come back as one element.
      val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8")
      elements(name) should be(List(name))
    }
    "match path with uid fragment" in {
      elements("foo/bar/baz#1234") should be(List("foo", "bar", "baz#1234"))
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:28,代码来源:RelativeActorPathSpec.scala


示例5: TestSetup

//设置package包名称以及导入依赖的类
package akka.io

import scala.annotation.tailrec
import scala.collection.immutable
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.actor.ActorRef
import akka.io.Inet.SocketOption
import akka.testkit.SocketUtil._
import Tcp._

/**
 * Test support that sets up a TCP server/client pair through `IO(Tcp)`.
 * The self-type requires it to be mixed into an `AkkaSpec`.
 */
trait TcpIntegrationSpecSupport { _: AkkaSpec =>

  /**
   * Fixture holding a bind-handler probe and a temporary server address.
   *
   * @param shouldBindServer when `true` (the default) [[bindServer]] is called during construction
   */
  class TestSetup(shouldBindServer: Boolean = true) {
    val bindHandler = TestProbe()
    val endpoint = temporaryServerAddress()

    if (shouldBindServer) bindServer()

    /** Sends `Bind` for [[endpoint]] with [[bindOptions]] and expects the matching `Bound`. */
    def bindServer(): Unit = {
      val bindCommander = TestProbe()
      bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
      bindCommander.expectMsg(Bound(endpoint))
    }

    /**
     * Connects a new client to [[endpoint]] with [[connectOptions]] and registers a
     * handler probe on each side.
     *
     * @return `(clientHandler, sender seen by the connect probe, serverHandler, sender seen by the bind probe)`
     */
    def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
      val connectCommander = TestProbe()
      connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
      // The pattern asserts that the remote address of the client side is `endpoint`.
      val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
      val clientHandler = TestProbe()
      connectCommander.sender() ! Register(clientHandler.ref)

      // The server side must see the mirrored address pair.
      val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected]
      val serverHandler = TestProbe()
      bindHandler.sender() ! Register(serverHandler.ref)

      (clientHandler, connectCommander.sender(), serverHandler, bindHandler.sender())
    }

    /** Keeps expecting `Received` on `handler` until `remaining` bytes have been accounted for. */
    @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
      if (remaining > 0) {
        val recv = handler.expectMsgType[Received]
        expectReceivedData(handler, remaining - recv.data.size)
      }

    /** Socket options passed to `Bind`; empty by default, override to customise. */
    def bindOptions: immutable.Traversable[SocketOption] = Nil

    /** Socket options passed to `Connect`; empty by default, override to customise. */
    def connectOptions: immutable.Traversable[SocketOption] = Nil
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:TcpIntegrationSpecSupport.scala


示例6: filterEvents

//设置package包名称以及导入依赖的类
package akka

import language.implicitConversions

import akka.actor.ActorSystem
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.reflect.ClassTag
import scala.collection.immutable
import java.util.concurrent.TimeUnit.MILLISECONDS

package object testkit {

  /**
   * Runs `block` while `eventFilters` are muted on the system's event stream.
   *
   * After the block returns, each filter is awaited via `awaitDone` within the remaining
   * part of the configured `TestEventFilterLeeway`. The filters are un-muted in all cases.
   *
   * @return the result of `block`
   * @throws AssertionError if at least one filter did not complete in time
   */
  def filterEvents[T](eventFilters: Iterable[EventFilter])(block: => T)(implicit system: ActorSystem): T = {
    def now = System.currentTimeMillis

    system.eventStream.publish(TestEvent.Mute(eventFilters.to[immutable.Seq]))

    try {
      val result = block

      val testKitSettings = TestKitExtension(system)
      // Shared deadline: `now` is re-read for every filter, so each one only gets the time left.
      val stop = now + testKitSettings.TestEventFilterLeeway.toMillis
      val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitSettings.TestEventFilterLeeway + ") waiting for " + _)
      if (failed.nonEmpty)
        throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))

      result
    } finally {
      system.eventStream.publish(TestEvent.UnMute(eventFilters.to[immutable.Seq]))
    }
  }

  /** Varargs variant that delegates to the `Iterable` overload of `filterEvents`. */
  def filterEvents[T](eventFilters: EventFilter*)(block: => T)(implicit system: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)

  /** Runs `block` inside `EventFilter[T]().intercept`. */
  def filterException[T <: Throwable](block: => Unit)(implicit system: ActorSystem, t: ClassTag[T]): Unit = EventFilter[T]() intercept (block)

  /** Adds `dilated` to `FiniteDuration`. */
  implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
    /** The duration multiplied by `TestKitExtension(system).TestTimeFactor`. */
    def dilated(implicit system: ActorSystem): FiniteDuration =
      (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration]
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:43,代码来源:package.scala


示例7: ReflectiveDynamicAccess

//设置package包名称以及导入依赖的类
package akka.actor

import scala.collection.immutable
import java.lang.reflect.InvocationTargetException
import scala.reflect.ClassTag
import scala.util.Try


/**
 * [[DynamicAccess]] implementation that loads classes, creates instances and looks up
 * Scala objects through Java reflection on the given class loader.
 *
 * @param classLoader loader passed to `Class.forName`
 */
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {

  /**
   * Loads `fqcn` without initializing it and checks that it is assignable to `T`.
   * A class of the wrong type yields a failed `Try` holding a `ClassCastException`.
   */
  override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
    Try[Class[_ <: T]]({
      val c = Class.forName(fqcn, false, classLoader).asInstanceOf[Class[_ <: T]]
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c)
    })

  /**
   * Instantiates `clazz` through the declared constructor whose parameter types are the
   * first components of `args`, passing the second components as values.
   * An `InvocationTargetException` with a non-null target is replaced by that target.
   */
  override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    Try {
      val types = args.map(_._1).toArray
      val values = args.map(_._2).toArray
      val constructor = clazz.getDeclaredConstructor(types: _*)
      constructor.setAccessible(true)
      val obj = constructor.newInstance(values: _*)
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
    } recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }

  /** Loads the class named `fqcn` and then instantiates it with `args`. */
  override def createInstanceFor[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    getClassFor(fqcn) flatMap { c => createInstanceFor(c, args) }

  /**
   * Reads the static `MODULE$` field of the class for `fqcn`.
   * When `fqcn` does not end in `$`, the name with `$` appended is tried first and the
   * plain name is the fallback.
   */
  override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = {
    val classTry =
      if (fqcn.endsWith("$")) getClassFor(fqcn)
      else getClassFor(fqcn + "$") recoverWith { case _ => getClassFor(fqcn) }
    classTry flatMap { c =>
      Try {
        val module = c.getDeclaredField("MODULE$")
        module.setAccessible(true)
        val t = implicitly[ClassTag[T]].runtimeClass
        module.get(null) match {
          case null                  => throw new NullPointerException
          case x if !t.isInstance(x) => throw new ClassCastException(fqcn + " is not a subtype of " + t)
          case x: T                  => x
        }
      } recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:DynamicAccess.scala


示例8: TcpIncomingConnection

//设置package包名称以及导入依赖的类
package akka.io

import java.nio.channels.SocketChannel
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.Inet.SocketOption


/**
 * [[TcpConnection]] for a channel handed in by the caller.
 *
 * On construction it calls `signDeathPact(bindHandler)` and registers the channel with
 * `registry` using `initialOps = 0`. Once the `ChannelRegistration` arrives, the
 * connection is completed with `bindHandler` and `options`.
 */
private[io] class TcpIncomingConnection(_tcp: TcpExt,
                                        _channel: SocketChannel,
                                        registry: ChannelRegistry,
                                        bindHandler: ActorRef,
                                        options: immutable.Traversable[SocketOption],
                                        readThrottling: Boolean)
  extends TcpConnection(_tcp, _channel, readThrottling) {

  signDeathPact(bindHandler)

  registry.register(channel, initialOps = 0)

  def receive = {
    case registration: ChannelRegistration => completeConnect(registration, bindHandler, options)
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:25,代码来源:TcpIncomingConnection.scala


示例9: UdpSender

//设置package包名称以及导入依赖的类
package akka.io

import java.nio.channels.DatagramChannel
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.io.Inet.SocketOption
import akka.io.Udp._
import akka.actor._


/**
 * Actor owning a non-blocking `DatagramChannel`.
 *
 * The channel is opened on construction, every option's `beforeDatagramBind` is applied
 * to its socket, and it is registered with `channelRegistry`. After the registration
 * arrives, `commander` is told `SimpleSenderReady` and the actor switches to
 * `sendHandlers`.
 */
private[io] class UdpSender(val udp: UdpExt,
                            channelRegistry: ChannelRegistry,
                            commander: ActorRef,
                            options: immutable.Traversable[SocketOption])
  extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  val channel = {
    val datagramChannel = DatagramChannel.open
    datagramChannel.configureBlocking(false)
    val socket = datagramChannel.socket

    options foreach { _.beforeDatagramBind(socket) }

    datagramChannel
  }
  channelRegistry.register(channel, initialOps = 0)

  def receive: Receive = {
    case registration: ChannelRegistration =>
      commander ! SimpleSenderReady
      context.become(sendHandlers(registration))
  }

  /** Closes the channel if it is still open; a non-fatal failure to close is only logged. */
  override def postStop(): Unit = if (channel.isOpen) {
    log.debug("Closing DatagramChannel after being stopped")
    try channel.close()
    catch {
      case NonFatal(e) => log.debug("Error closing DatagramChannel: {}", e)
    }
  }
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:43,代码来源:UdpSender.scala


示例10: Collections

//设置package包名称以及导入依赖的类
package akka.util

import scala.collection.immutable
import scala.annotation.tailrec


/** Small collection helpers. */
private[akka] object Collections {

  /** An immutable sequence of length 0; `apply` always throws `IndexOutOfBoundsException`. */
  case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
    override final def iterator = Iterator.empty
    override final def apply(idx: Int): Nothing = throw new java.lang.IndexOutOfBoundsException(idx.toString)
    override final def length: Int = 0
  }

  /**
   * Iterable over the elements of [[valuesIterator]] for which [[isDefinedAt]] holds,
   * each transformed by [[apply]].
   */
  abstract class PartialImmutableValuesIterable[From, To] extends immutable.Iterable[To] {
    /** Whether `from` should be part of the result. */
    def isDefinedAt(from: From): Boolean
    /** Transformation applied to accepted elements. */
    def apply(from: From): To
    /** Source of the raw elements; called once per call to [[iterator]]. */
    def valuesIterator: Iterator[From]
    final def iterator: Iterator[To] = {
      val superIterator = valuesIterator
      new Iterator[To] {
        private[this] var _next: To = _
        private[this] var _hasNext = false

        @tailrec override final def hasNext: Boolean =
          if (!_hasNext && superIterator.hasNext) { // If we need and are able to look for the next value
            val potentiallyNext = superIterator.next()
            if (isDefinedAt(potentiallyNext)) {
              _next = apply(potentiallyNext)
              _hasNext = true
              true
            } else hasNext //Attempt to find the next
          } else _hasNext // Return if we found one

        override final def next(): To =
          if (hasNext) {
            val ret = _next
            _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
            _hasNext = false // Mark as consumed (we need to look for the next value)
            ret
          } else throw new java.util.NoSuchElementException("next")
      }
    }

    /** Computed on first access by walking a fresh iterator. */
    override lazy val size: Int = iterator.size
    override def foreach[C](f: To => C) = iterator foreach f
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:50,代码来源:Collections.scala


示例11: preparePersistentBatch

//设置package包名称以及导入依赖的类
package akka.persistence.journal

import akka.persistence.{ PersistentRepr, Resequenceable }
import akka.actor.Actor
import scala.collection.immutable

/** Shared helper for write journals; the self-type requires it to be mixed into an `Actor`. */
private[akka] trait WriteJournalBase {
  this: Actor =>

  /**
   * Keeps only the `PersistentRepr` elements of `rb`, calling `prepareWrite()` on each
   * of them as a side effect of the filtering.
   */
  protected def preparePersistentBatch(rb: immutable.Seq[Resequenceable]): immutable.Seq[PersistentRepr] =
    rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations

  /** `true` for a `PersistentRepr` (after calling its `prepareWrite()`), `false` otherwise. */
  private def persistentPrepareWrite(r: Resequenceable): Boolean = r match {
    case p: PersistentRepr =>
      p.prepareWrite(); true
    case _ =>
      false
  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:21,代码来源:WriteJournalBase.scala


示例12: IterableProducerTest

//设置package包名称以及导入依赖的类
package akka.stream

import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import scala.collection.immutable
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec

/**
 * `PublisherVerification` for publishers created from an immutable `Iterable[Int]`
 * through `Flow(...).toProducer`.
 */
class IterableProducerTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
  extends PublisherVerification[Int](env, publisherShutdownTimeout)
  with WithActorSystem with TestNGSuiteLike {

  implicit val system = _system

  /** Uses the timeouts provided by `Timeouts` for the given system. */
  def this(system: ActorSystem) =
    this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system)), Timeouts.publisherShutdownTimeoutMillis)

  /** Creates an actor system named after this class with `AkkaSpec.testConf`. */
  def this() =
    this(ActorSystem(classOf[IterableProducerTest].getSimpleName, AkkaSpec.testConf))

  val materializer = FlowMaterializer(MaterializerSettings(maximumInputBufferSize = 512))(system)

  /**
   * Publisher of `0 until elements`; for `elements == 0` the source is the unbounded
   * iterator `Iterator from 0` instead.
   */
  def createPublisher(elements: Int): Publisher[Int] = {
    val source: immutable.Iterable[Int] = elements match {
      case 0     => new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
      case count => 0 until count
    }
    Flow(source).toProducer(materializer).getPublisher
  }

  /** Publisher built from an empty collection. */
  override def createCompletedStatePublisher(): Publisher[Int] =
    Flow[Int](Nil).toProducer(materializer).getPublisher

  override def createErrorStatePublisher(): Publisher[Int] = null // ignore error-state tests
}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:42,代码来源:IterableProducerTest.scala


示例13: FlowGroupedSpec

//设置package包名称以及导入依赖的类
package akka.stream

import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.ScriptedTest
import scala.collection.immutable
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current => random }

/** Scripted tests for `grouped(3)`, each run 30 times with freshly generated random input. */
class FlowGroupedSpec extends AkkaSpec with ScriptedTest {

  val settings = MaterializerSettings(
    initialInputBufferSize = 2,
    maximumInputBufferSize = 16,
    initialFanOutBufferSize = 1,
    maxFanOutBufferSize = 16)

  "A Grouped" must {

    "group evenly" in {
      // 20 steps, each feeding three random ints and expecting them back as one group.
      def script = Script((1 to 20) map { _ => val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
      (1 to 30) foreach (_ => runScript(script, settings)(_.grouped(3)))
    }

    "group with rest" in {
      // Same as above, plus a final single element expected as a group of one.
      def script = Script(((1 to 20).map { _ => val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
        :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
      (1 to 30) foreach (_ => runScript(script, settings)(_.grouped(3)))
    }

  }

}
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:32,代码来源:FlowGroupedSpec.scala


示例14: AccountSerializerRegistry

//设置package包名称以及导入依赖的类
package org.ioreskovic.greatmaterialcontinuum.impl.ent.acc

import com.lightbend.lagom.scaladsl.playjson.{JsonSerializer, JsonSerializerRegistry}
import org.ioreskovic.greatmaterialcontinuum.impl.cmd.acc._
import org.ioreskovic.greatmaterialcontinuum.impl.evt.acc._
import org.ioreskovic.greatmaterialcontinuum.impl.stt.acc.AccountState

import scala.collection.immutable

/** Registry listing the JSON serializers for the account state, commands and events. */
object AccountSerializerRegistry extends JsonSerializerRegistry {

  /** State serializer first, then the command serializers, then the event serializers. */
  override def serializers: immutable.Seq[JsonSerializer[_]] = {
    val state: List[JsonSerializer[_]] = List(JsonSerializer[AccountState])

    val commands: List[JsonSerializer[_]] = List(
      JsonSerializer[CreateAccount],
      JsonSerializer[DeleteAccount],
      JsonSerializer[ActivateAccount],
      JsonSerializer[DeactivateAccount],
      JsonSerializer[RetrieveAccount]
    )

    val events: List[JsonSerializer[_]] = List(
      JsonSerializer[AccountCreated],
      JsonSerializer[AccountDeleted],
      JsonSerializer[AccountActivated],
      JsonSerializer[AccountDeactivated],
      JsonSerializer[AccountRetrieved]
    )

    state ::: commands ::: events
  }
}
开发者ID:ioreskovic,项目名称:great-material-continuum,代码行数:27,代码来源:AccountSerializerRegistry.scala


示例15: HTableSettings

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.hbase

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put

import scala.collection.immutable

/**
 * Settings describing an HBase table and how elements are written to it.
 *
 * @param conf           Hadoop configuration
 * @param tableName      name of the table
 * @param columnFamilies column family names
 * @param converter      turns an element into a `Put`
 */
final case class HTableSettings[T](conf: Configuration,
                                   tableName: TableName,
                                   columnFamilies: immutable.Seq[String],
                                   converter: T => Put)

object HTableSettings {

  /**
   * Factory taking Java collection and function types: the list is copied into an
   * immutable sequence and the Java function is adapted to a Scala function.
   */
  def create[T](conf: Configuration,
                tableName: TableName,
                columnFamilies: java.util.List[String],
                converter: java.util.function.Function[T, Put]): HTableSettings[T] = {
    import scala.collection.JavaConverters._
    import scala.compat.java8.FunctionConverters._

    val families: immutable.Seq[String] = columnFamilies.asScala.toList
    val toPut: T => Put = asScalaFromFunction(converter)
    HTableSettings[T](conf, tableName, families, toPut)
  }
}
开发者ID:akka,项目名称:alpakka,代码行数:24,代码来源:HBaseSettings.scala


示例16: ServerSideEncryption

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.s3.impl

import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.alpakka.s3.acl.CannedAcl

import scala.collection.immutable


/**
 * Server-side encryption choice together with the request headers it translates to.
 *
 * @param algorithm value of the `x-amz-server-side-encryption` header
 * @param kmsKeyId  key id, used for the `aws:kms` algorithm
 * @param context   optional encryption context, used for the `aws:kms` algorithm
 */
sealed abstract class ServerSideEncryption(algorithm: String,
                                           kmsKeyId: Option[String] = None,
                                           context: Option[String] = None) {

  /**
   * Headers for this encryption setting.
   *
   * @throws IllegalArgumentException for any algorithm other than `AES256`, and for
   *                                  `aws:kms` without a key id
   */
  def headers: immutable.Seq[HttpHeader] = algorithm match {
    case "AES256" =>
      List(RawHeader("x-amz-server-side-encryption", "AES256"))

    case "aws:kms" if kmsKeyId.isDefined =>
      val kmsHeaders = List(
        RawHeader("x-amz-server-side-encryption", "aws:kms"),
        RawHeader("x-amz-server-side-encryption-aws-kms-key-id", kmsKeyId.get)
      )
      // The context header is appended only when a context was supplied.
      val contextHeader = context.map(RawHeader("x-amz-server-side-encryption-context", _)).toList
      kmsHeaders ::: contextHeader

    case _ =>
      throw new IllegalArgumentException("Unsupported encryption algorithm.")
  }
}

object ServerSideEncryption {
  /** The `AES256` algorithm, without key id or context. */
  case object AES256 extends ServerSideEncryption("AES256")

  /** The `aws:kms` algorithm with the given key id and optional context. */
  case class KMS(keyId: String, context: Option[String] = None)
      extends ServerSideEncryption("aws:kms", Some(keyId), context)
}
开发者ID:akka,项目名称:alpakka,代码行数:33,代码来源:S3Headers.scala


示例17: format

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv.scaladsl

import java.nio.charset.{Charset, StandardCharsets}

import akka.NotUsed
import akka.stream.alpakka.csv.{javadsl, CsvFormatter}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

import scala.collection.immutable


  /**
   * Flow turning each incoming collection of fields into a `ByteString` via
   * `CsvFormatter.toCsv`.
   *
   * When `byteOrderMark` is defined, it is emitted once ahead of the formatted output.
   * All other parameters are handed to the `CsvFormatter` constructor unchanged.
   */
  def format[T <: immutable.Iterable[String]](
      delimiter: Char = Comma,
      quoteChar: Char = DoubleQuote,
      escapeChar: Char = Backslash,
      endOfLine: String = "\r\n",
      quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
      charset: Charset = StandardCharsets.UTF_8,
      byteOrderMark: Option[ByteString] = None
  ): Flow[T, ByteString, NotUsed] = {
    val formatter =
      new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, quotingStyle, charset)
    val formatting = Flow[T].map(formatter.toCsv).named("CsvFormatting")

    byteOrderMark match {
      case Some(bom) => formatting.prepend(Source.single(bom))
      case None      => formatting
    }
  }
} 
开发者ID:akka,项目名称:alpakka,代码行数:32,代码来源:CsvFormatting.scala


示例18: CsvToMapStage

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.csv

import java.nio.charset.Charset

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.collection.immutable


/**
 * Stage that pairs every incoming row with column names and emits the result as a map.
 *
 * @param columnNames column names to use; when `None`, the first row is decoded with
 *                    `charset` and used as the names instead of being emitted
 * @param charset     charset for decoding a header row
 */
private[csv] class CsvToMapStage(columnNames: Option[immutable.Seq[String]], charset: Charset)
    extends GraphStage[FlowShape[immutable.Seq[ByteString], Map[String, ByteString]]] {

  override protected def initialAttributes: Attributes = Attributes.name("CsvToMap")

  private val in = Inlet[immutable.Seq[ByteString]]("CsvToMap.in")
  private val out = Outlet[Map[String, ByteString]]("CsvToMap.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Starts as the configured names; filled from the first row when none were given.
      private var headers = columnNames

      setHandlers(in, out, this)

      override def onPull(): Unit = pull(in)

      override def onPush(): Unit = {
        val row = grab(in)
        headers match {
          case Some(names) =>
            push(out, names.zip(row).toMap)
          case None =>
            // The row is consumed as the header, so ask upstream for the next one.
            headers = Some(row.map(_.decodeString(charset)))
            pull(in)
        }
      }
    }
}
开发者ID:akka,项目名称:alpakka,代码行数:41,代码来源:CsvToMapStage.scala


示例19: Collections

//设置package包名称以及导入依赖的类
package akka.util

import scala.collection.immutable
import scala.annotation.tailrec


/** Small collection helpers. */
private[akka] object Collections {

  /** An immutable sequence of length 0; `apply` always throws `IndexOutOfBoundsException`. */
  case object EmptyImmutableSeq extends immutable.Seq[Nothing] {
    override final def iterator = Iterator.empty
    override final def apply(idx: Int): Nothing = throw new java.lang.IndexOutOfBoundsException(idx.toString)
    override final def length: Int = 0
  }

  /**
   * Iterable over the elements of [[valuesIterator]] for which [[isDefinedAt]] holds,
   * each transformed by [[apply]].
   */
  abstract class PartialImmutableValuesIterable[From, To] extends immutable.Iterable[To] {
    /** Whether `from` should be part of the result. */
    def isDefinedAt(from: From): Boolean
    /** Transformation applied to accepted elements. */
    def apply(from: From): To
    /** Source of the raw elements; called once per call to [[iterator]]. */
    def valuesIterator: Iterator[From]
    final def iterator: Iterator[To] = {
      val superIterator = valuesIterator
      new Iterator[To] {
        private[this] var _next: To = _
        private[this] var _hasNext = false

        @tailrec override final def hasNext: Boolean =
          if (!_hasNext && superIterator.hasNext) { // If we need and are able to look for the next value
            val potentiallyNext = superIterator.next()
            if (isDefinedAt(potentiallyNext)) {
              _next = apply(potentiallyNext)
              _hasNext = true
              true
            } else hasNext //Attempt to find the next
          } else _hasNext // Return if we found one

        override final def next(): To =
          if (hasNext) {
            val ret = _next
            _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
            _hasNext = false // Mark as consumed (we need to look for the next value)
            ret
          } else throw new java.util.NoSuchElementException("next")
      }
    }

    /** Computed on first access by walking a fresh iterator. */
    override lazy val size: Int = iterator.size
    override def foreach[C](f: To => C) = iterator foreach f
  }

}
开发者ID:Starofall,项目名称:Chakka,代码行数:50,代码来源:Collections.scala


示例20: AccumulateWhileUnchanged

//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

import scala.collection.immutable


/**
 * Stage that collects consecutive elements for which `propertyExtractor` yields equal
 * values and emits each run as one sequence.
 *
 * @param propertyExtractor computes the property whose change ends the current run
 */
final class AccumulateWhileUnchanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileUnchanged.in")
  val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

  override def shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    // Property of the most recently received element; None until the first element arrives.
    private var currentState: Option[P] = None
    // Elements of the run that is currently being collected.
    private val buffer = Vector.newBuilder[E]

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPull(): Unit = {
        pull(in)
      }

      override def onPush(): Unit = {
        val element = grab(in)
        val property = propertyExtractor(element)

        currentState match {
          case Some(previous) if previous != property =>
            // The property changed: emit the finished run and start a new one with this element.
            val finishedRun = buffer.result()
            buffer.clear()
            buffer += element
            push(out, finishedRun)
          case _ =>
            // First element, or same property as before: keep collecting.
            buffer += element
            pull(in)
        }

        currentState = Some(property)
      }

      override def onUpstreamFinish(): Unit = {
        // Flush whatever is still buffered before completing.
        val remaining = buffer.result()
        if (remaining.nonEmpty) {
          emit(out, remaining)
        }
        completeStage()
      }
    })

    override def postStop(): Unit = {
      buffer.clear()
    }
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:59,代码来源:AccumulateWhileUnchanged.scala



注:本文中的scala.collection.immutable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala WordSpecLike类代码示例发布时间:2022-05-23
下一篇:
Scala Logger类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap