• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala StreamConverters类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.stream.scaladsl.StreamConverters的典型用法代码示例。如果您正苦于以下问题:Scala StreamConverters类的具体用法?Scala StreamConverters怎么用?Scala StreamConverters使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了StreamConverters类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Directory

//设置package包名称以及导入依赖的类
package akka.stream.alpakka.file.scaladsl

import java.nio.file.{FileVisitOption, Files, Path}

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}

import scala.collection.immutable

object Directory {

  /**
   * Builds a `Source` of the paths found by `java.nio.file.Files.walk` under `directory`.
   *
   * `Files.walk` is not called here: a factory is handed to
   * `StreamConverters.fromJavaStream`, which invokes it itself.
   *
   * @param directory        root of the walk; must be an existing directory
   * @param maxDepth         optional depth limit forwarded to `Files.walk`; `None` uses the
   *                         overload without a depth argument
   * @param fileVisitOptions options forwarded to `Files.walk` as varargs
   * @return a source emitting the walked paths
   * @throws IllegalArgumentException if `directory` is not a directory (via `require`)
   */
  def walk(directory: Path,
           maxDepth: Option[Int] = None,
           fileVisitOptions: immutable.Seq[FileVisitOption] = Nil): Source[Path, NotUsed] = {
    require(Files.isDirectory(directory), s"Path must be a directory, $directory isn't")

    // Choose the Files.walk overload once, up front; the chosen thunk is evaluated later.
    val openWalk: () => java.util.stream.Stream[Path] =
      maxDepth.fold(() => Files.walk(directory, fileVisitOptions: _*)) { depth => () =>
        Files.walk(directory, depth, fileVisitOptions: _*)
      }

    StreamConverters.fromJavaStream(openWalk)
  }

}
开发者ID:akka,项目名称:alpakka,代码行数:30,代码来源:Directory.scala


示例2: StreamUtilsSpec

//设置package包名称以及导入依赖的类
package com.bluelabs.akkaaws

import java.security.{DigestInputStream, MessageDigest}

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{FlatSpecLike, Matchers}

import scala.concurrent.Future

/**
 * Verifies `StreamUtils.digest()` against SHA-256 digests computed directly with
 * `java.security.MessageDigest`.
 *
 * NOTE(review): nothing visible in this class shuts down the `ActorSystem` created by the
 * no-arg constructor — confirm whether a shutdown hook exists elsewhere.
 */
class StreamUtilsSpec(_system: ActorSystem) extends TestKit(_system) with FlatSpecLike with Matchers with ScalaFutures {
  def this() = this(ActorSystem("StreamUtilsSpec"))

  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(system).withDebugLogging(true))

  // Futures in this spec get up to 5 seconds, polled every 30 ms.
  implicit val defaultPatience: PatienceConfig =
    PatienceConfig(timeout =  Span(5, Seconds), interval = Span(30, Millis))


  "digest" should "calculate the digest of a short string" in {
    val bytes: Array[Byte] = "abcdefghijklmnopqrstuvwxyz".getBytes()
    val flow: Future[ByteString] = Source.single(ByteString(bytes)).runWith(StreamUtils.digest())

    val testDigest = MessageDigest.getInstance("SHA-256").digest(bytes)
    whenReady(flow) { result =>
      result should contain theSameElementsInOrderAs testDigest
    }
  }

  it should "calculate the digest of a file" in {
    val input = StreamConverters.fromInputStream(() => getClass.getResourceAsStream("/testdata.txt"))
    val flow: Future[ByteString] = input.runWith(StreamUtils.digest())

    // Reference digest: drain the same resource through a DigestInputStream.
    val dis: DigestInputStream =
      new DigestInputStream(getClass.getResourceAsStream("/testdata.txt"), MessageDigest.getInstance("SHA-256"))

    val expectedDigest: Array[Byte] =
      try {
        val buffer = new Array[Byte](1024)
        // Only the side effect on the wrapped MessageDigest matters; the bytes are discarded.
        while (dis.read(buffer) > -1) ()
        dis.getMessageDigest.digest()
      } finally {
        // Always release the classpath resource, even when reading fails.
        dis.close()
      }

    whenReady(flow) { result =>
      result should contain theSameElementsInOrderAs expectedDigest
    }

  }

}
开发者ID:bluelabsio,项目名称:s3-stream,代码行数:56,代码来源:StreamUtilsSpec.scala


示例3: PrometheusService

//设置package包名称以及导入依赖的类
package com.example.akka.http

import java.io.{ OutputStreamWriter, PipedInputStream, PipedOutputStream }

import scala.concurrent.{ ExecutionContext, Future }

import akka.http.scaladsl.model.{ HttpCharsets, HttpEntity, MediaType }
import akka.http.scaladsl.server.{ Directives, Route }
import akka.stream.scaladsl.StreamConverters
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.exporter.common.TextFormat

/** Akka HTTP route that serves the default Prometheus collector registry under `/metrics`. */
object PrometheusService extends Directives {

  /** `text/plain` with a fixed UTF-8 charset and the parameter `version=0.0.4`. */
  lazy val prometheusTextType =
    MediaType.customWithFixedCharset("text", "plain", HttpCharsets.`UTF-8`, params = Map("version" -> "0.0.4"))

  /**
   * Route for `metrics`.
   *
   * For each request a piped stream pair is created: a `Future` on `executionContext`
   * writes the samples into the output end with `TextFormat.write004`, and the response
   * entity is a source reading from the input end.
   *
   * NOTE(review): the outcome of the writer `Future` is not inspected, so a failure while
   * writing is not reported here.
   *
   * @param executionContext runs the writing side of the pipe
   */
  def route(implicit executionContext: ExecutionContext): Route =
    path("metrics") {
      complete {
        val pipeIn = new PipedInputStream
        val writer = new OutputStreamWriter(new PipedOutputStream(pipeIn), HttpCharsets.`UTF-8`.value)
        val body = StreamConverters.fromInputStream(() => pipeIn)

        // Producer side: runs concurrently with the response streaming the reader side.
        Future {
          try {
            TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples())
            writer.flush()
          } finally writer.close()
        }

        HttpEntity(prometheusTextType, body)
      }
    }
}
开发者ID:pjfanning,项目名称:prometheus-akka-sample,代码行数:37,代码来源:PrometheusService.scala


示例4: Tools

//设置package包名称以及导入依赖的类
package org.play.test.helpers

import java.io.InputStream
import java.security.cert.X509Certificate

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.StreamConverters
import play.api.mvc._

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

/** Test helpers for extracting the body of a Play `Result` as a `String`. */
object Tools {

  /** A `RequestHeader` whose every member has a default value, for use in tests. */
  class ReqHeader(
                           override val id: Long = 1L,
                           override val tags: Map[String, String] = Map.empty,
                           override val uri: String = "",
                           override val path: String = "",
                           override val method: String = "GET",
                           override val version: String = "1",
                           override val queryString: Map[String, Seq[String]] = Map.empty,
                           override val headers: Headers = Headers.apply(),
                           override val remoteAddress: String = "",
                           override val secure: Boolean = false,
                           override val clientCertificateChain: Option[Seq[X509Certificate]] = None) extends RequestHeader {
  }

  /**
   * Reads the whole stream into a string using a `Scanner` with the `\A` delimiter.
   *
   * The stream is not closed here; that is left to the caller.
   *
   * @param is stream to consume
   * @return the stream content, or `""` when the scanner finds no token
   */
  def convertStreamToString(is: java.io.InputStream) : String = {
    val s = new java.util.Scanner(is).useDelimiter("\\A")
    if (s.hasNext()) s.next() else ""
  }

  /**
   * Blocks until `result` completes and returns its body as a string.
   *
   * A dedicated `ActorSystem` and materializer are created for the call. The system is
   * terminated and the body stream closed before returning, on failure as well as success,
   * so repeated calls do not accumulate actor systems.
   *
   * @param result   the result whose body is read
   * @param duration used both as the `Await` timeout and as the read timeout of the
   *                 `InputStream` adapter
   * @return the body content
   */
  def getBody(result: Future[Result], duration: FiniteDuration = 5.seconds): String = {
    implicit val system: ActorSystem = ActorSystem("flowtest")
    implicit val mater: ActorMaterializer = ActorMaterializer()

    try {
      val stream: InputStream =
        Await.result(result, duration).body.dataStream.runWith(StreamConverters.asInputStream(duration))
      // The body is fully consumed before the actor system backing the stream is stopped.
      try convertStreamToString(stream) finally stream.close()
    } finally {
      system.terminate()
    }
  }

  /** Adds `.body` to an action: applies it and returns the response body via `getBody`. */
  implicit class BodyExtractor(action: Action[AnyContent]) {
    // NOTE(review): the action is applied to a null request — only valid for actions
    // that never touch the request.
    def body: String = getBody(action.apply(null))
  }
}
开发者ID:haribageski,项目名称:mockserver,代码行数:48,代码来源:Tools.scala


示例5: HomeController

//设置package包名称以及导入依赖的类
package controllers

import java.io.OutputStream
import javax.inject._

import akka.stream.scaladsl.{Source, StreamConverters}
import play.api._
import play.api.libs.iteratee.Enumerator
import play.api.libs.streams.Streams
import play.api.mvc._

import scala.concurrent.ExecutionContext.Implicits.global

/**
 * Controller that streams 1024 `*` characters as a chunked `text/plain` response.
 *
 * Two ways of adapting the `OutputStream`-based `write` to a `Source` are defined;
 * `index` uses `brokenSource`.
 */
@Singleton
class HomeController @Inject() extends Controller {

  /** Responds with the chunked output of `brokenSource`. */
  def index = Action {
    Ok.chunked(brokenSource).as("text/plain")
  }

  // `write` is applied to the materialized OutputStream inside mapMaterializedValue.
  // NOTE(review): presumably that is why this variant is named "broken" — confirm
  // against the bug report this project reproduces.
  private def brokenSource =
    StreamConverters.asOutputStream().mapMaterializedValue(stream => write(stream))

  // Iteratee-based alternative: an output-stream Enumerator bridged to a Source.
  private def workingSource =
    Source.fromPublisher(Streams.enumeratorToPublisher(Enumerator.outputStream(write)))

  /**
   * Writes 1024 `*` bytes to `out`, then flushes and closes it, even if the write fails.
   *
   * @param out destination stream; closed by this method
   */
  def write(out: OutputStream): Unit =
    try {
      val payload = ("*" * 1024).getBytes
      out.write(payload)
    } finally {
      out.flush()
      out.close()
    }

}
开发者ID:halfninja,项目名称:play-outputstream-bug,代码行数:40,代码来源:HomeController.scala


示例6: Demo5

//设置package包名称以及导入依赖的类
package lew.bing.akka.stream

import java.io.ByteArrayInputStream
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, StreamConverters}


/** Demo of `StreamConverters` and `FileIO`: prints a string to stdout and copies a file. */
object Demo5 {

  /**
   * Runs two streams and terminates the actor system once both have completed.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Actor system and materializer that run the streams.
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()
    import scala.concurrent.ExecutionContext.Implicits.global

    // Source backed by an in-memory InputStream.
    val greeting = StreamConverters.fromInputStream(() => new ByteArrayInputStream("I am better man!".getBytes))
    // Sink backed by standard output.
    val stdout = StreamConverters.fromOutputStream(() => System.out)
    val printed = greeting.runWith(stdout)

    // File-to-file copy; `to` keeps the materialized value of the reading side.
    val fileCopy = FileIO
      .fromPath(Paths.get("factorial2.txt"))
      .to(FileIO.toPath(Paths.get("factorial3.txt")))
      .run()

    // Shut down after both futures are done, whether they succeeded or failed.
    printed.flatMap(_ => fileCopy).onComplete { _ => system.terminate() }
  }

}
开发者ID:liuguobing634,项目名称:akka,代码行数:28,代码来源:Demo5.scala


示例7: BucketFiles

//设置package包名称以及导入依赖的类
package com.malliina.pics

import java.nio.file.Path

import akka.stream.scaladsl.StreamConverters
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.malliina.storage.StorageLong

import scala.collection.JavaConversions._

/**
 * `PicFiles` implementation that stores and reads objects in one S3 bucket.
 *
 * @param aws    S3 client used for every operation
 * @param bucket bucket this instance operates on
 */
class BucketFiles(val aws: AmazonS3, val bucket: BucketName) extends PicFiles {
  val bucketName: String = bucket.name

  /**
   * Lists keys in the index window `[from, until)` of the bucket listing.
   *
   * @return the keys in that window, or `Nil` when `until - from` is not positive
   */
  override def load(from: Int, until: Int): Seq[Key] =
    if (until - from > 0) loadAcc(until, aws.listObjects(bucketName), Nil).drop(from)
    else Nil

  /**
   * Accumulates keys page by page until the listing is no longer truncated or at least
   * `desiredSize` keys have been collected; returns at most `desiredSize` keys.
   */
  private def loadAcc(desiredSize: Int, current: ObjectListing, acc: Seq[Key]): Seq[Key] = {
    val collected = acc ++ current.getObjectSummaries.map(summary => Key(summary.getKey))
    val finished = !current.isTruncated || collected.size >= desiredSize
    if (finished) collected.take(desiredSize)
    else loadAcc(desiredSize, aws.listNextBatchOfObjects(current), collected)
  }

  /** Whether an object with this key exists in the bucket. */
  override def contains(key: Key): Boolean =
    aws.doesObjectExist(bucketName, key.key)

  /**
   * Fetches the object and wraps its content stream, content length and content type
   * (when the metadata value is non-null) in a `DataStream`.
   */
  override def get(key: Key): DataStream = {
    val s3Object = aws.getObject(bucketName, key.key)
    val metadata = s3Object.getObjectMetadata
    val content = StreamConverters.fromInputStream(s3Object.getObjectContent)
    val length = Option(metadata.getContentLength).map(_.bytes)
    val contentType = Option(metadata.getContentType).map(ContentType.apply)
    DataStream(content, length, contentType)
  }

  /** Uploads `file` under `key`. */
  override def put(key: Key, file: Path): Unit =
    aws.putObject(bucketName, key.key, file.toFile)

  /** Deletes the object stored under `key`. */
  override def remove(key: Key): Unit =
    aws.deleteObject(bucketName, key.key)
}

/** Factory methods for `BucketFiles`. */
object BucketFiles {
  /** Same as `forS3` with the region fixed to `Regions.EU_WEST_1`. */
  def forBucket(bucket: BucketName): BucketFiles = forS3(Regions.EU_WEST_1, bucket)

  /**
   * Builds an S3 client for `region`, creates the bucket if `doesBucketExist` reports it
   * missing, and returns a `BucketFiles` bound to it.
   */
  def forS3(region: Regions, bucket: BucketName): BucketFiles = {
    val name = bucket.name
    val client = AmazonS3ClientBuilder.standard().withRegion(region).build()
    val exists = client.doesBucketExist(name)
    if (!exists) {
      client.createBucket(name)
    }
    new BucketFiles(client, bucket)
  }
}
开发者ID:malliina,项目名称:pics,代码行数:59,代码来源:BucketFiles.scala


示例8: OsmStream

//设置package包名称以及导入依赖的类
package com.github.lzenczuk.akka.course.streams

import java.io.FileInputStream
import java.nio.charset.Charset

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, RunnableGraph, Sink, Source, StreamConverters}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}



/**
 * Reads a bzip2-compressed OSM file, splits it into lines, groups the lines into segments
 * through the `OsmSegment` parser, and prints the content of every `FullSegment`.
 *
 * NOTE(review): the input path is hard-coded to a developer-local file.
 */
object OsmStream extends App {
  // Local import: `Keep` is only needed to combine materialized values below.
  import akka.stream.scaladsl.Keep

  implicit private val system: ActorSystem = ActorSystem("osm-reading-system")
  implicit private val materializer: ActorMaterializer = ActorMaterializer()

  // Decompressed bytes of the OSM file; the stream is opened when the graph is run.
  private val uncompressedOsmFileSource: Source[ByteString, Future[IOResult]] =
    StreamConverters.fromInputStream(() => new BZip2MultiStreamCompressorInputStream(new FileInputStream("/home/dev/Documents/osm/dublin/dublin_ireland.osm.bz2")))

  // Splits on '\n' (frames longer than 1024 bytes fail the stream) and decodes each frame.
  // OSM XML is UTF-8, so decode explicitly instead of using the platform default charset.
  private val bytesToStringLines: Flow[ByteString, String, NotUsed] =
    Framing.delimiter(ByteString("\n"), 1024).map(_.utf8String)

  /** Folds lines through the segment parser and emits the content of each `FullSegment`. */
  val groupNodes: Flow[String, String, NotUsed] = Flow[String]
    .scan(Idle(): OsmSegment)((segment, line) => segment.parse(line))
    .collect { case full: FullSegment => full.content }

  // Keep both materialized values: the source's IOResult and the sink's completion.
  // The source's future alone can complete while elements are still in flight downstream.
  private val osmStream =
    uncompressedOsmFileSource
      .via(bytesToStringLines)
      .via(groupNodes)
      .toMat(Sink.foreach[String](line => println(s"Line: $line")))(Keep.both)

  private val (readResult, sinkDone) = osmStream.run()

  // Report and shut down only after the sink has processed everything; a downstream
  // failure (e.g. an oversized frame) now reaches the Failure branch.
  sinkDone
    .flatMap(_ => readResult)
    .andThen {
      case Success(ioResult) =>
        println(s"Success read: ${ioResult.count/1048576} MB")
      case Failure(throwable) => println(s"Error: ${throwable.getMessage}")
    }.andThen {
      case _ => system.terminate()
    }
}
开发者ID:lzenczuk,项目名称:akka-app-one,代码行数:47,代码来源:OsmStream.scala



注:本文中的akka.stream.scaladsl.StreamConverters类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Invalid类代码示例发布时间:2022-05-23
下一篇:
Scala JsoupBrowser类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap