本文整理汇总了Scala中akka.stream.scaladsl.Framing类的典型用法代码示例。如果您正苦于以下问题:Scala Framing类的具体用法?Scala Framing怎么用?Scala Framing使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Framing类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Repl_
//设置package包名称以及导入依赖的类
package akka_in_action.streams.StreamingIO
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Source, Flow, Tcp}
import akka.util.ByteString
/** Interactive TCP client: prints every line received from the server,
  * prompts the user for a reply on stdin and sends that reply back.
  * Entering "q" stops the input and sends a final "BYE" line.
  */
object Repl_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // Outgoing connection blueprint; it is only opened when the graph is run.
  val connection = Tcp().outgoingConnection("127.0.0.1", 8888)

  // Outbound side: forward user input until "q", append "BYE", and
  // terminate every message with a newline so the peer can frame it.
  val replParser =
    Flow[String].takeWhile(_ != "q")
      .concat(Source.single("BYE"))
      .map(elem => ByteString(s"$elem\n"))

  // Inbound side: split the byte stream into newline-delimited frames,
  // print each one, then ask the user for the next message.
  val repl = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true
    ))
    .map(_.utf8String)
    .map(text => println("Server: "+text))
    // Predef.readLine is deprecated since Scala 2.11 and removed in 2.13,
    // so scala.io.StdIn is used instead. StdIn.readLine returns null when
    // stdin reaches end-of-file; treat that as a request to quit rather
    // than sending the literal text "null" to the server.
    .map(_ => Option(scala.io.StdIn.readLine("> ")).getOrElse("q"))
    .via(replParser)

  connection.join(repl).run()
}
开发者ID:rockdragon,项目名称:fourthgala,代码行数:34,代码来源:Repl_.scala
示例2: BaseStationSource
//设置package包名称以及导入依赖的类
package com.darienmt.airplaneadventures.basestation.collector.streams
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Framing, Source, Tcp }
import akka.util.ByteString
import com.darienmt.airplaneadventures.basestation.collector.parsing.MessageParser
import com.darienmt.airplaneadventures.basestation.data.BaseStation.Message
import scala.collection.immutable.IndexedSeq
/** Factory for a stream of parsed BaseStation messages read over TCP. */
object BaseStationSource {

  /** Builds a source that connects to `address:port`, splits the incoming
    * bytes into newline-delimited frames of at most 256 bytes, decodes each
    * frame as UTF-8 and hands it to `MessageParser`.
    *
    * @param address     host to connect to
    * @param port        TCP port to connect to
    * @param actorSystem actor system used to obtain the Tcp extension
    * @return a source of parsed messages
    */
  def apply(address: String, port: Int)(implicit actorSystem: ActorSystem): Source[Message, NotUsed] = {
    // A single empty ByteString is the only data pushed towards the server.
    val outbound = Source(IndexedSeq(ByteString.empty))

    val textLines =
      Tcp().outgoingConnection(address, port)
        .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
        .map(frame => frame.utf8String)

    outbound
      .via(textLines)
      .map(line => MessageParser(line))
  }
}
开发者ID:darienmt,项目名称:airplane-adventures,代码行数:24,代码来源:BaseStationSource.scala
示例3: SerializationDemo
//设置package包名称以及导入依赖的类
package com.martinseeler.dtf
import java.io.File
import java.nio.file.StandardOpenOption._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString
import com.martinseeler.dtf.stages.{DeltaToByteStringStage, TickToDeltaStage}
/** Reads ticks from `ticks_1MM.csv`, runs them through the delta and
  * byte-string stages, and writes the result to `ticks_1MM.dtff`.
  * The actor system is terminated once the write completes.
  */
object SerializationDemo extends App {
  implicit val system = ActorSystem("serialization")
  implicit val mat = ActorMaterializer()

  // One Tick per CSV line; columns 0, 1 and 2 are parsed as Long, Double, Double.
  val csvSource = FileIO
    .fromPath(new File("ticks_1MM.csv").toPath)
    .via(Framing.delimiter(ByteString("\n"), 1024))
    .map { line =>
      val fields = line.utf8String.split(',')
      Tick(fields(0).toLong, fields(1).toDouble, fields(2).toDouble)
    }

  // Output file is created if missing and truncated if present.
  val dtffSink = FileIO.toPath(
    new File("ticks_1MM.dtff").toPath,
    options = Set(CREATE, WRITE, TRUNCATE_EXISTING)
  )

  private val writeResult =
    csvSource
      .via(new TickToDeltaStage())
      .via(new DeltaToByteStringStage())
      .runWith(dtffSink)

  // Shut down whether the write succeeded or failed.
  writeResult.onComplete(_ => system.terminate())(scala.concurrent.ExecutionContext.global)
}
开发者ID:MartinSeeler,项目名称:dense-tick-file-format,代码行数:32,代码来源:SerializationDemo.scala
示例4: FileUploadStream
//设置package包名称以及导入依赖的类
package com.shashank.akkahttp.basic.routing
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, Multipart, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import scala.concurrent.Future
/** Demonstrates a streaming file upload route: the uploaded "csv" form field
  * is consumed line by line and the comma-separated integers are summed.
  * The route is then exercised with an in-memory multipart request.
  */
object FileUploadStream extends BaseSpec {

  // Explicit `: Unit =` replaces the deprecated procedure syntax `def main(...) { ... }`.
  def main(args: Array[String]): Unit = {
    val route =
      extractRequestContext { ctx =>
        implicit val materializer = ctx.materializer
        implicit val ec = ctx.executionContext

        fileUpload("csv") {
          case (metadata, byteSource) =>
            val sumF: Future[Int] =
              // sum the numbers as they arrive so that we can
              // accept any size of file
              byteSource.via(Framing.delimiter(ByteString("\n"), 1024))
                .mapConcat(_.utf8String.split(",").toVector)
                .map(_.toInt)
                .runFold(0) { (acc, n) => acc + n }

            onSuccess(sumF) { sum => complete(s"Sum: $sum") }
        }
      }

    // Test the file upload stream with a strict in-memory body part.
    val multipartForm =
      Multipart.FormData(Multipart.FormData.BodyPart.Strict(
        "csv",
        HttpEntity(ContentTypes.`text/plain(UTF-8)`, "2,3,5\n7,11,13,17,23\n29,31,37\n"),
        Map("filename" -> "primes.csv")))

    Post("/", multipartForm) ~> route ~> check {
      status shouldEqual StatusCodes.OK
      responseAs[String] shouldEqual "Sum: 178"
    }

    system.terminate()
  }
}
//File upload direct
//curl --form "csv=@primes.csv" http://<host>:<port>
开发者ID:shashankgowdal,项目名称:introduction-to-akkahttp,代码行数:51,代码来源:FileUploadStream.scala
示例5: MovieNameToIMDBPipeline
//设置package包名称以及导入依赖的类
package com.stacktrace.yo.scrapeline.imdb.pipelines
import java.net.URLEncoder
import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, Flow, Framing, Source}
import akka.stream.{ActorMaterializer, IOResult}
import akka.util.ByteString
import akka.{Done, NotUsed}
import com.stacktrace.yo.scrapeline.imdb.Domain.MovieNameAndDetailUrl
import scala.concurrent.{ExecutionContext, Future}
/** Pipeline that reads movie names from `movie.txt`, turns each name into an
  * IMDB search URL, passes the pairs through `IMDBSearchSubPipe` and prints
  * the resulting name/url pairs.
  */
class MovieNameToIMDBPipeline(implicit val m: ActorMaterializer, implicit val ec: ExecutionContext) {

  /** Source of UTF-8 lines from `movie.txt`, framed on newline (max 256 bytes per line). */
  private def getPipelineSource: Source[String, Future[IOResult]] = {
    val lineDecoder = Framing.delimiter(ByteString("\n"), 256).map(_.utf8String)
    FileIO.fromPath(Paths.get("movie.txt")).via(lineDecoder)
  }

  /** Maps each movie name to its search URL, up to 100 lookups in flight, unordered. */
  private def getDetailUrlFlow: Flow[String, MovieNameAndDetailUrl, NotUsed] =
    Flow[String].mapAsyncUnordered(100)(mapPipeToImdbSearch)

  /** Builds the IMDB search URL for one movie name (URL-encoded as UTF-8). */
  private def mapPipeToImdbSearch(in: String): Future[MovieNameAndDetailUrl] = Future {
    val query: String = URLEncoder.encode(in, "UTF-8")
    val searchUrl = "http://www.imdb.com/find?ref_=nv_sr_fn&q=" + query + "&s=tt"
    MovieNameAndDetailUrl(in, searchUrl)
  }

  /** Runs the whole pipeline and prints every result as `name:url`. */
  def getOutput: Future[Done] = {
    val results =
      getPipelineSource
        .via(getDetailUrlFlow)
        .via(new IMDBSearchSubPipe().getSubFlow)
    results.runForeach(movie => println(movie.name + ":" + movie.url))
  }
}
开发者ID:StackTraceYo,项目名称:scrapeline,代码行数:43,代码来源:MovieNameToIMDBPipeline.scala
示例6: materializer
//设置package包名称以及导入依赖的类
package reactivehub.akka.stream.apns.manager
import akka.actor.ActorRef
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.http.scaladsl.model.Multipart
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink}
import akka.util.{ByteString, Timeout}
import reactivehub.akka.stream.apns.manager.DeviceService._
import scala.concurrent.ExecutionContext
import spray.json.DefaultJsonProtocol
/** HTTP routes for the device manager.
  *
  *  - `POST /jobs` sends `PingDevices` to the service and replies 202 Accepted.
  *  - `POST /devices` accepts either a single `NewDevice` entity or a multipart
  *    form whose `application/octet-stream` parts contain one device per line.
  */
trait RestApi extends SprayJsonSupport
    with DefaultJsonProtocol
    with DeviceJsonFormats {

  implicit def materializer: ActorMaterializer
  implicit def dispatcher: ExecutionContext
  implicit def timeout: Timeout

  /** Builds the route tree; every request is forwarded to `service`. */
  def route(service: ActorRef): Route = {
    def jobsRoute: Route =
      path("jobs") {
        post {
          service ! PingDevices
          complete(Accepted)
        }
      }

    def devicesRoute: Route =
      path("devices") {
        post {
          // Single device supplied as the request entity.
          val singleDevice = entity(as[NewDevice]) { device =>
            val created = service.ask(CreateDevices(List(device)))
            complete(created.map(_ => Created))
          }

          // Bulk upload: binary parts are read line by line (max 200 bytes per
          // line) and sent to the service in batches of 10, one batch at a time.
          val bulkUpload = entity(as[Multipart.FormData]) { formData =>
            val uploaded = formData.parts
              .filter(part => part.entity.contentType == `application/octet-stream`)
              .mapAsync(1) { part =>
                part.entity.dataBytes
                  .via(Framing.delimiter(ByteString('\n'), 200))
                  .map(line => NewDevice(line.utf8String))
                  .grouped(10)
                  .mapAsync(1)(batch => service.ask(CreateDevices(batch.toList)))
                  .runWith(Sink.ignore)
              }
              .runWith(Sink.ignore)
            complete(uploaded.map(_ => Created))
          }

          singleDevice ~ bulkUpload
        }
      }

    jobsRoute ~ devicesRoute
  }
}
开发者ID:reactive-hub,项目名称:reactive-apns-example,代码行数:53,代码来源:RestApi.scala
示例7: UploadingFileServer
//设置package包名称以及导入依赖的类
package com.packt.chapter9
import akka.http.scaladsl.model.Multipart
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.HttpApp
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/** HTTP server exposing two ways of counting the lines of an uploaded file:
  *
  *  - `/regularupload` loads every multipart part into memory (2 second
  *    timeout per part) and counts the lines of the concatenated text.
  *  - `/streamupload` counts newline-delimited frames while the bytes stream in.
  */
object UploadingFileServer extends HttpApp {
  val route =
    extractRequestContext { ctx =>
      implicit val materializer = ctx.materializer
      implicit val ec = ctx.executionContext

      path("regularupload") {
        entity(as[Multipart.FormData]) { formData =>
          val partsFuture = formData.parts.mapAsync(1) { b =>
            b.toStrict(2.seconds).map(_.entity.data.utf8String)
          }.runFold(List.empty[String])(_ :+ _)

          onSuccess(partsFuture) { allParts =>
            complete {
              val wholeFile = allParts.mkString
              s"Regular upload: submitted file has ${wholeFile.split("\n").size} lines"
            }
          }
        }
      } ~
      path("streamupload") {
        entity(as[Multipart.FormData]) { formData =>
          // runFold(0) is used instead of runReduce: runReduce fails with
          // NoSuchElementException when a part contains no frames (empty file)
          // or when the form has no parts, whereas a fold simply yields 0.
          val partsFuture = formData.parts.mapAsync(1) { b =>
            b.entity.dataBytes
              .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
              .runFold(0)((count, _) => count + 1)
          }.runFold(0)(_ + _)

          onSuccess(partsFuture) { lines =>
            complete {
              // Previously reported "Regular upload", copied from the route above.
              s"Stream upload: submitted file has $lines lines"
            }
          }
        }
      }
    }
}
/** Entry point: starts `UploadingFileServer` on all interfaces, port 8088,
  * with server settings taken from the default loaded configuration.
  */
object UploadingFileServerApplication extends App {
  private val serverSettings = ServerSettings(ConfigFactory.load)
  UploadingFileServer.startServer("0.0.0.0", 8088, serverSettings)
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:55,代码来源:UploadingFileServer.scala
示例8: StreamTcpClient
//设置package包名称以及导入依赖的类
package socket.client
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Flow, Framing, Source, Tcp}
import akka.util.ByteString
/** TCP client that prints every newline-delimited message received from the
  * server and answers each one with "ping".
  */
object StreamTcpClient extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  val connection = Tcp().outgoingConnection("127.0.0.1", 8080)

  // Outbound encoder: stop at "q", append a final "BYE", newline-terminate each message.
  val parse = Flow[String]
    .takeWhile(message => message != "q")
    .concat(Source.single("BYE"))
    .map(elem => ByteString(s"$elem\n"))

  // Inbound handler: frame on newline (max 256 bytes), log the text, reply "ping".
  val flow = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map { frame =>
      println("Server: " + frame.utf8String)
      "ping"
    }
    .via(parse)

  connection.join(flow).run()
}
开发者ID:focusj,项目名称:address-service,代码行数:28,代码来源:StreamTcpClient.scala
示例9: LogEntityMarshaller
//设置package包名称以及导入依赖的类
package aia.stream
import akka.NotUsed
import akka.stream.scaladsl.Framing
import akka.stream.scaladsl.JsonFraming
import akka.http.scaladsl.model.HttpCharsets._
import akka.http.scaladsl.model.MediaTypes._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import spray.json._
import akka.http.scaladsl.marshalling.Marshaller
import akka.http.scaladsl.marshalling.ToEntityMarshaller
/** Builds an entity marshaller for a byte stream of log events that can answer
  * either as `application/json` (bytes passed through unchanged) or as
  * `text/plain(UTF-8)` (bytes converted by `LogJson.jsonToLogFlow`).
  */
object LogEntityMarshaller extends EventMarshalling {

  type LEM = ToEntityMarshaller[Source[ByteString, _]]

  /** Creates the combined marshaller.
    *
    * @param maxJsonObject passed on to `LogJson.jsonToLogFlow` for the text variant
    */
  def create(maxJsonObject: Int): LEM = {
    val jsonType = ContentTypes.`application/json`
    val textType = ContentTypes.`text/plain(UTF-8)`

    val asJson = Marshaller.withFixedContentType(jsonType) {
      (events: Source[ByteString, _]) => HttpEntity(jsonType, events)
    }

    val asText = Marshaller.withFixedContentType(textType) {
      (events: Source[ByteString, _]) => HttpEntity(textType, toText(events, maxJsonObject))
    }

    Marshaller.oneOf(asJson, asText)
  }

  /** Runs the source through `LogJson.jsonToLogFlow(maxJsonObject)`. */
  def toText(src: Source[ByteString, _],
             maxJsonObject: Int): Source[ByteString, _] =
    src.via(LogJson.jsonToLogFlow(maxJsonObject))
}
开发者ID:gilbutITbook,项目名称:006877,代码行数:46,代码来源:LogEntityMarshaller.scala
示例10: TransactionProcessor
//设置package包名称以及导入依赖的类
package frdomain.ch7
package streams
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl.{Tcp, Source, Sink, Framing}
import akka.util.ByteString
import scala.concurrent.duration._
/** TCP receiver that parses comma-separated transaction lines from every
  * incoming connection and feeds them to a `Summarizer` actor, which is also
  * sent `LogSummaryBalance` every 5 seconds.
  *
  * @param host   interface to bind to
  * @param port   port to bind to
  * @param system actor system used for the server, the actor and the scheduler
  */
class TransactionProcessor(host: String, port: Int)(implicit val system: ActorSystem) extends Logging {

  /** Binds the server socket, wires each connection to the summarizer and
    * starts the periodic summary message.
    */
  def run(): Unit = {
    implicit val mat = ActorMaterializer()

    val summarizer = system.actorOf(Props[Summarizer])

    logger.info(s"Receiver: binding to $host:$port")

    Tcp().bind(host, port).runForeach { conn =>
      // Lines are delimited by the platform line separator, max 4000 bytes each.
      val lineFraming = Framing.delimiter(
        ByteString(System.lineSeparator),
        maximumFrameLength = 4000,
        allowTruncation = true
      )

      val receiveSink =
        conn.flow
          .via(lineFraming)
          .map(frame => frame.utf8String.split(","))
          .mapConcat(fields => Transaction(fields).toList)
          .to(Sink.fromSubscriber(ActorSubscriber[Transaction](summarizer)))

      // Source.maybe is the outbound side of the connection.
      receiveSink.runWith(Source.maybe)
    }

    import system.dispatcher
    system.scheduler.schedule(0.seconds, 5.seconds, summarizer, LogSummaryBalance)
  }
}
/** Entry point: runs a `TransactionProcessor` bound to 127.0.0.1:9982. */
object TransactionProcessor extends App {
  implicit val system = ActorSystem("processor")

  private val processor = new TransactionProcessor("127.0.0.1", 9982)
  processor.run()
}
开发者ID:ricardo8504,项目名称:frdomain-master,代码行数:43,代码来源:TransactionProcessor.scala
示例11: FileURLsStoreSpec
//设置package包名称以及导入依赖的类
package io.scalac.newspaper.crawler.urls
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import scala.collection.immutable
import scala.concurrent.Future
/** Spec for `FileURLsStore`: writes two URLs to a file, removes one through
  * the store and checks that both the store and the file contain only the other.
  */
class FileURLsStoreSpec extends TestKit(ActorSystem("test-system"))
    with WordSpecLike
    with BeforeAndAfterAll
    with Matchers
    with ScalaFutures {

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  "FileURLsStore" should {
    "be able to delete url from file" in {
      val path = Paths.get("src/test/resources/urls")
      val google = "http://google.pl"
      val scalac = "http://blog.scalac.io"
      val urls = immutable.Seq(google, scalac)

      // Reads the file back as newline-delimited UTF-8 lines.
      def readLinesFromFile =
        FileIO.fromPath(path)
          .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
          .map(_.utf8String)
          .runWith(Sink.seq)

      val resultF = for {
        // Seed the file with one URL per line.
        _ <- Source(urls).map(url => ByteString(url + "\n")).runWith(FileIO.toPath(path))
        cut = new FileURLsStore(path)
        _ <- cut.removeURL(scalac)
        urlsFromStore <- cut.getURLs.runWith(Sink.seq)
        urlsFromFile <- readLinesFromFile
      } yield (urlsFromFile, urlsFromStore)

      val (urlsFromFile, urlsFromStore) = resultF.futureValue
      urlsFromFile shouldEqual urlsFromStore
      urlsFromFile shouldEqual Vector(google)
    }
  }
}
开发者ID:ScalaConsultants,项目名称:newspaper,代码行数:50,代码来源:FileURLsStoreSpec.scala
示例12: BlueprintExample
//设置package包名称以及导入依赖的类
package code
import java.nio.file.FileSystems
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, RunnableGraph}
import akka.util.ByteString
import scala.concurrent.Future
/** Reads `current_inventory.csv` from the classpath, keeps the rows whose third
  * column parses to 0, and writes those rows to `no_inventory.csv`.
  */
object BlueprintExample extends AkkaStreamsApp {
  val file = getClass.getClassLoader.getResource("current_inventory.csv")
  val inPath = FileSystems.getDefault.getPath(file.getPath)
  val outPath = FileSystems.getDefault.getPath("no_inventory.csv")

  val fileSource = FileIO.fromPath(inPath)
  val fileSink = FileIO.toPath(outPath)

  // Drops the first line, then splits every remaining line on commas.
  val csvHandler: Flow[String, List[String], NotUsed] =
    Flow[String]
      .drop(1)
      .map(line => line.split(",").toList)
      .log("csvHandler")

  // Full blueprint from input file to output file; nothing runs until `run()`.
  val lowInventoryFlow: RunnableGraph[Future[IOResult]] =
    fileSource
      .via(Framing.delimiter(ByteString("\n"), Integer.MAX_VALUE))
      .map(bytes => bytes.utf8String)
      .log("Before CSV Handler")
      .via(csvHandler)
      .filter(columns => columns(2).toInt == 0)
      .log("After filter")
      .map(columns => ByteString(columns.mkString(",") + "\n"))
      .toMat(fileSink)(Keep.right)

  override def akkaStreamsExample: Future[_] = lowInventoryFlow.run()

  runExample
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:42,代码来源:BlueprintExample.scala
示例13: responseToDocs
//设置package包名称以及导入依赖的类
package http_handling
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import models.docs.Doc
import scala.concurrent.Future
/** Turns a batch of HTTP requests into `Doc`s by sending them through the
  * connection flow for `host` and decoding the delimited response body.
  */
trait RequestToResponse extends Protocols with Flows {

  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  /** Runs `responseAsStream` and collects every document into a sequence. */
  def responseToDocs(httpReq: Seq[HttpRequest], host: String): Future[Seq[Doc]] =
    responseAsStream(httpReq, host).runWith(Sink.seq)

  /** Streams the documents contained in the responses to `httpReq`.
    *
    * Each request is paired with the constant 1 before entering the connection
    * flow. The response bytes are split on `delimiter` (max 80000 bytes per
    * frame), the first frame is dropped, and the remaining frames are decoded,
    * have the delimiter re-added and are deserialized.
    */
  def responseAsStream(httpReq: Seq[HttpRequest], host: String): Source[Doc, Any] = {
    val taggedRequests = httpReq.toList.map(request => (request, 1))

    Source(taggedRequests)
      .via(connecFlow(host))
      .via(dropBrokenRequests)
      .via(checkHttpStatus)
      .via(Framing.delimiter(ByteString(delimiter), 80000, allowTruncation = true))
      .drop(1)
      .map(frame => frame.utf8String)
      .via(readdDelimiter)
      .via(deserialize)
      .mapConcat(parsed => parsed.response.docs)
  }
}
开发者ID:yannick-cw,项目名称:nyt4s,代码行数:34,代码来源:RequestToResponse.scala
示例14: FileTailerPublisherTest
//设置package包名称以及导入依赖的类
package teleporter.integration.component.file
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.LockSupport
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.appender.RollingFileAppender
import org.apache.logging.log4j.scala.Logging
import org.scalatest.FunSuite
import scala.collection.JavaConverters._
import scala.concurrent.duration._
/** Manual test for `FileTailer`: tails the log file of the configured log4j
  * `RollingFileAppender` and prints each line, while a ticking source keeps
  * writing new log entries. The test parks the thread and never returns on
  * its own.
  */
class FileTailerPublisherTest extends FunSuite with Logging {
  implicit val system = ActorSystem()
  implicit val mater = ActorMaterializer()

  test("tail publish") {
    val loggerContext = LogManager.getContext(true).asInstanceOf[LoggerContext]

    // The lambda/case arrows in this block had been mangled to `?` by a bad
    // character encoding of `⇒`; they are restored as `=>` so the code compiles.
    val path = loggerContext.getConfiguration.getAppenders.asScala.collect {
      case (_, v: RollingFileAppender) => Paths.get(v.getFileName)
    }.head
    println(path.toFile.getCanonicalPath)

    // Print every line appended to the log file (max 5 KiB per line).
    FileTailer.source(path)
      .via(Framing.delimiter(ByteString(System.lineSeparator()), 1024 * 5))
      .runForeach(bs => println(bs.utf8String))

    // Write an incrementing counter to the log every 2 seconds.
    val counter = new AtomicLong()
    Source.tick(2.seconds, 2.seconds, "ttttet").map(x => logger.info(counter.incrementAndGet().toString)).to(Sink.ignore).run()

    logger.info("====================")
    // Keep the test thread alive so the streams keep running.
    LockSupport.park()
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:39,代码来源:FileTailerPublisherTest.scala
示例15: GraphMaker
//设置package包名称以及导入依赖的类
package gatling.protocol.protoclient
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, Framing, GraphDSL, RunnableGraph, Sink, Source, Tcp}
import akka.stream.{ClosedShape, FlowShape}
import akka.util.ByteString
/** Helpers for building the chat client's TCP graph. */
object GraphMaker {

  import GraphDSL.Implicits._

  implicit val sys = ActorSystem("mySystem")

  /** Renders `raw` as a Scala string literal (quoted, with escapes visible). */
  def escape(raw: String): String = {
    import scala.reflect.runtime.universe._
    Literal(Constant(raw)).toString
  }

  /** Splits a byte stream into frames delimited by "!" (max 1000 bytes per frame). */
  def bsframer: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].via(
      Framing.delimiter(ByteString("!"), maximumFrameLength = 1000, allowTruncation = true)
    )

  /** Pass-through flow that prints the escaped text of every element. */
  def printer: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map { bytes =>
      println(escape(bytes.utf8String))
      bytes
    }

  /** Closed graph: messages published by `MessageSender` are suffixed with "!",
    * sent over a TCP connection to `ip:port`, and the connection's output is
    * delivered to `MessageReceiver`.
    */
  def tcpFlow(ip: String, port: Int, router: ActorRef) =
    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      val outgoing = builder.add(Source.actorPublisher[String](MessageSender.props(router)))
      val connection = builder.add(Tcp().outgoingConnection(ip, port))
      val incoming = builder.add(Sink.actorSubscriber(MessageReceiver.props(router)))
      val encode = builder.add(Flow[String].map(message => ByteString(message + "!")))

      // Disabled alternatives kept from the original:
      //val framer: FlowShape[ByteString, ByteString] = b.add(bsframer)
      //val separator = b.add(Flow[String].map(x => ByteString(x + "?")))

      outgoing ~> encode ~> connection ~> incoming
      ClosedShape
    })
}
开发者ID:tg44,项目名称:akka-streams-simple-chat,代码行数:41,代码来源:GraphMaker.scala
注:本文中的akka.stream.scaladsl.Framing类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论