本文整理汇总了Scala中akka.stream.scaladsl.StreamConverters类的典型用法代码示例。如果您正苦于以下问题:Scala StreamConverters类的具体用法?Scala StreamConverters怎么用?Scala StreamConverters使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了StreamConverters类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Directory
//设置package包名称以及导入依赖的类
package akka.stream.alpakka.file.scaladsl
import java.nio.file.{FileVisitOption, Files, Path}
import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}
import scala.collection.immutable
object Directory {
def walk(directory: Path,
maxDepth: Option[Int] = None,
fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = {
require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")
val factory = maxDepth match {
case None =>
() =>
Files.walk(directory, fileVisitOptions: _*)
case Some(maxDepth) =>
() =>
Files.walk(directory, maxDepth, fileVisitOptions: _*)
}
StreamConverters.fromJavaStream(factory)
}
}
开发者ID:akka,项目名称:alpakka,代码行数:30,代码来源:Directory.scala
示例2: StreamUtilsSpec
//设置package包名称以及导入依赖的类
package com.bluelabs.akkaaws
import java.security.{DigestInputStream, MessageDigest}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{FlatSpecLike, Matchers}
import scala.concurrent.Future
class StreamUtilsSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLike with Matchers with ScalaFutures {
def this() = this(ActorSystem("StreamUtilsSpec"))
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))
implicit val defaultPatience =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(30, Millis))
"digest" should "calculate the digest of a short string" in {
val bytes: Array[Byte] = "abcdefghijklmnopqrstuvwxyz".getBytes()
val flow: Future[ByteString] = Source.single(ByteString(bytes)).runWith(StreamUtils.digest())
val testDigest = MessageDigest.getInstance("SHA-256").digest(bytes)
whenReady(flow) { result =>
result should contain theSameElementsInOrderAs testDigest
}
}
it should "calculate the digest of a file" in {
val input = StreamConverters.fromInputStream(() => getClass.getResourceAsStream("/testdata.txt"))
val flow: Future[ByteString] = input.runWith(StreamUtils.digest())
val testDigest = MessageDigest.getInstance("SHA-256")
val dis: DigestInputStream = new DigestInputStream(getClass.getResourceAsStream("/testdata.txt"), testDigest)
val buffer = new Array[Byte](1024)
var bytesRead: Int = dis.read(buffer)
while (bytesRead > -1) {
bytesRead = dis.read(buffer)
}
whenReady(flow) { result =>
result should contain theSameElementsInOrderAs dis.getMessageDigest.digest()
}
}
}
开发者ID:bluelabsio,项目名称:s3-stream,代码行数:56,代码来源:StreamUtilsSpec.scala
示例3: PrometheusService
//设置package包名称以及导入依赖的类
package com.example.akka.http
import java.io.{ OutputStreamWriter, PipedInputStream, PipedOutputStream }
import scala.concurrent.{ ExecutionContext, Future }
import akka.http.scaladsl.model.{ HttpCharsets, HttpEntity, MediaType }
import akka.http.scaladsl.server.{ Directives, Route }
import akka.stream.scaladsl.StreamConverters
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.common.TextFormat
object PrometheusService extends Directives {
lazy val prometheusTextType =
MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = Map("version" -> "0.0.4"))
def route(implicit executionContext: ExecutionContext): Route = {
path("metrics") {
complete {
val in = new PipedInputStream
val out = new OutputStreamWriter(new PipedOutputStream(in), HttpCharsets.`UTF-8`.value)
val byteSource = StreamConverters.fromInputStream(() => in)
Future {
try {
TextFormat.write004(out, CollectorRegistry.defaultRegistry.metricFamilySamples())
out.flush()
} finally {
out.close()
}
}
HttpEntity(prometheusTextType, byteSource)
}
}
}
}
开发者ID:pjfanning,项目名称:prometheus-akka-sample,代码行数:37,代码来源:PrometheusService.scala
示例4: Tools
//设置package包名称以及导入依赖的类
package org.play.test.helpers
import java.io.InputStream
import java.security.cert.X509Certificate
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.StreamConverters
import play.api.mvc._
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
object Tools {
class ReqHeader(
override val id: Long = 1L,
override val tags: Map[String, String] = Map.empty,
override val uri: String = "",
override val path: String = "",
override val method: String = "GET",
override val version: String = "1",
override val queryString: Map[String, Seq[String]] = Map.empty,
override val headers: Headers = Headers.apply(),
override val remoteAddress: String = "",
override val secure: Boolean = false,
override val clientCertificateChain: Option[Seq[X509Certificate]] = None) extends RequestHeader {
}
def convertStreamToString(is: java.io.InputStream) : String = {
val s = new java.util.Scanner(is).useDelimiter("\\A")
if (s.hasNext()) s.next() else ""
}
def getBody(result: Future[Result], duration: FiniteDuration = 5.seconds): String = {
implicit val system = ActorSystem("flowtest")
implicit val mater = ActorMaterializer()
val stream = Await.result(result, duration).body.dataStream.runWith(StreamConverters.asInputStream(duration))
convertStreamToString(stream)
}
implicit class BodyExtractor(action: Action[AnyContent]) {
def body: String = getBody(action.apply(null))
}
}
开发者ID:haribageski,项目名称:mockserver,代码行数:48,代码来源:Tools.scala
示例5: HomeController
//设置package包名称以及导入依赖的类
package controllers
import java.io.OutputStream
import javax.inject._
import akka.stream.scaladsl.{Source, StreamConverters}
import play.api._
import play.api.libs.iteratee.Enumerator
import play.api.libs.streams.Streams
import play.api.mvc._
import scala.concurrent.ExecutionContext.Implicits.global
@Singleton
class HomeController @Inject() extends Controller {
def index = Action {
val source = brokenSource
Ok.chunked(source).as("text/plain")
}
private def brokenSource = StreamConverters.asOutputStream().mapMaterializedValue(write)
private def workingSource = {
val enumerator = Enumerator.outputStream(write)
Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))
}
def write(out: OutputStream) {
try {
val bytes = ("*" * 1024).getBytes
out.write(bytes)
} finally {
out.flush()
out.close()
}
}
}
开发者ID:halfninja,项目名称:play-outputstream-bug,代码行数:40,代码来源:HomeController.scala
示例6: Demo5
//设置package包名称以及导入依赖的类
package lew.bing.akka.stream
import java.io.ByteArrayInputStream
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
object Demo5 {
def main(args: Array[String]): Unit = {
//????
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
import scala.concurrent.ExecutionContext.Implicits.global
//??is
val source = StreamConverters.fromInputStream(() => new ByteArrayInputStream("I am better man!".getBytes))
//??os
val sink = StreamConverters.fromOutputStream(() => System.out)
val result = source.runWith(sink)
val result2 =FileIO.fromPath(Paths.get("factorial2.txt")).to(FileIO.toPath(Paths.get("factorial3.txt"))).run()
result.flatMap(f => result2).onComplete(_ => system.terminate())
}
}
开发者ID:liuguobing634,项目名称:akka,代码行数:28,代码来源:Demo5.scala
示例7: BucketFiles
//设置package包名称以及导入依赖的类
package com.malliina.pics
import java.nio.file.Path
import akka.stream.scaladsl.StreamConverters
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.malliina.storage.StorageLong
import scala.collection.JavaConversions._
class BucketFiles(val aws: AmazonS3, val bucket: BucketName) extends PicFiles {
val bucketName: String = bucket.name
override def load(from: Int, until: Int): Seq[Key] = {
val size = until - from
if (size <= 0) Nil
else loadAcc(until, aws.listObjects(bucketName), Nil).drop(from)
}
private def loadAcc(desiredSize: Int, current: ObjectListing, acc: Seq[Key]): Seq[Key] = {
val newAcc = acc ++ current.getObjectSummaries.map(s => Key(s.getKey))
if (!current.isTruncated || newAcc.size >= desiredSize) newAcc take desiredSize
else loadAcc(desiredSize, aws.listNextBatchOfObjects(current), newAcc)
}
override def contains(key: Key): Boolean =
aws.doesObjectExist(bucketName, key.key)
override def get(key: Key): DataStream = {
val obj = aws.getObject(bucketName, key.key)
val meta = obj.getObjectMetadata
DataStream(
StreamConverters.fromInputStream(obj.getObjectContent),
Option(meta.getContentLength).map(_.bytes),
Option(meta.getContentType).map(ContentType.apply)
)
}
override def put(key: Key, file: Path): Unit =
aws.putObject(bucketName, key.key, file.toFile)
override def remove(key: Key): Unit =
aws.deleteObject(bucketName, key.key)
}
object BucketFiles {
def forBucket(bucket: BucketName) = forS3(Regions.EU_WEST_1, bucket)
def forS3(region: Regions, bucket: BucketName): BucketFiles = {
val bucketName = bucket.name
val aws = AmazonS3ClientBuilder.standard().withRegion(region).build()
if (!aws.doesBucketExist(bucketName))
aws.createBucket(bucketName)
new BucketFiles(aws, bucket)
}
}
开发者ID:malliina,项目名称:pics,代码行数:59,代码来源:BucketFiles.scala
示例8: OsmStream
//设置package包名称以及导入依赖的类
package com.github.lzenczuk.akka.course.streams
import java.io.FileInputStream
import java.nio.charset.Charset
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, RunnableGraph, Sink, Source, StreamConverters}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
object OsmStream extends App {
implicit private val system: ActorSystem = ActorSystem("osm-reading-system")
implicit private val materializer: ActorMaterializer = ActorMaterializer()
private val uncompressedOsmFileSource: Source[ByteString, Future[IOResult]] =
StreamConverters.fromInputStream(() => new BZip2MultiStreamCompressorInputStream(new FileInputStream("/home/dev/Documents/osm/dublin/dublin_ireland.osm.bz2")))
private val bytesToStringLines: Flow[ByteString, String, NotUsed] = Framing.delimiter(ByteString("\n"), 1024).map(bs => {
bs.decodeString(Charset.defaultCharset())
})
val groupNodes: Flow[String, String, NotUsed] = Flow[String]
.scan(Idle().asInstanceOf[OsmSegment])((segment:OsmSegment, line:String) => segment.parse(line))
.filter(s => s.isInstanceOf[FullSegment]).map(s => s.content)
private val osmStream: RunnableGraph[Future[IOResult]] =
uncompressedOsmFileSource
.via(bytesToStringLines)
.via(groupNodes)
.to(Sink.foreach(line => {println(s"Line: $line")}))
osmStream.run().andThen{
case Success(ioResult) =>
println(s"Success read: ${ioResult.count/1048576} MB")
case Failure(throwable) => println(s"Error: ${throwable.getMessage}")
}.andThen{
case _ => system.terminate()
}
}
开发者ID:lzenczuk,项目名称:akka-app-one,代码行数:47,代码来源:OsmStream.scala
注:本文中的akka.stream.scaladsl.StreamConverters类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论