本文整理汇总了Scala中akka.stream.testkit.scaladsl.TestSink类的典型用法代码示例。如果您正苦于以下问题：Scala TestSink类的具体用法？Scala TestSink怎么用？Scala TestSink使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TestSink类的16个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: AccumulateWhileUnchangedTest
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.hpi.esb.datavalidator.data.SimpleRecord
import org.scalatest.FunSuite
import scala.collection.immutable
/**
 * Tests for the `AccumulateWhileUnchanged` graph stage, using the start of the
 * fixed-size window that a record's timestamp falls into as the accumulation key.
 */
class AccumulateWhileUnchangedTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  val windowSize = 1000
  val numberOfElements = 3000

  /** Subtracts from `timestamp` its remainder modulo `windowSize`. */
  def windowStart(timestamp: Long): Long = {
    val offsetIntoWindow = timestamp % windowSize
    timestamp - offsetIntoWindow
  }

  test("accumulation of two windows on element stream") {
    // Timestamps 1, 11, ... map to window start 0; timestamps 1000, 1010, ... map to 1000.
    val firstWindow = immutable.Seq.range(1, 999, 10).map(ts => SimpleRecord(1)(ts))
    val secondWindow = immutable.Seq.range(1000, 1999, 10).map(ts => SimpleRecord(1)(ts))
    val records = firstWindow ++ secondWindow

    // Only the subscriber probe is needed, so keep just the sink's materialized value.
    val probe = Source(records)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .toMat(TestSink.probe[Seq[SimpleRecord]])(Keep.right)
      .run()

    // The stage is expected to emit one sequence per window, then complete.
    probe
      .request(numberOfElements)
      .expectNext(firstWindow, secondWindow)
      .expectComplete()
  }

  test("accumulation on empty stream") {
    val noRecords = List[SimpleRecord]()

    val probe = Source(noRecords)
      .via(new AccumulateWhileUnchanged(r => windowStart(r.timestamp)))
      .toMat(TestSink.probe[Seq[SimpleRecord]])(Keep.right)
      .run()

    // With no input the stream must complete without emitting anything.
    probe
      .request(numberOfElements)
      .expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:57,代码来源:AccumulateWhileUnchangedTest.scala
示例2: IgnoreLastElementsTest
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.validation.graphstage
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.FunSuite
import scala.collection.immutable
/**
 * Tests for the `IgnoreLastElements` graph stage: the expectations below require
 * the trailing `ignoreCount` elements of the stream to be withheld.
 */
class IgnoreLastElementsTest extends FunSuite {

  implicit val system = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

  test("happy list") {
    val start = 0
    val end = 10
    val ignoreCount = 2
    val values = immutable.Seq.range(start, end)
    val expected = values.dropRight(ignoreCount)

    // Only the subscriber probe is needed, so keep just the sink's materialized value.
    val probe = Source(values)
      .via(new IgnoreLastElements[Int](ignoreCount))
      .toMat(TestSink.probe[Int])(Keep.right)
      .run()

    // Demand as many elements as were fed in; only the non-ignored prefix may arrive.
    probe.request(end)
    for (value <- expected) probe.expectNext(value)
    probe.expectComplete()
  }

  test("empty list") {
    val probe = Source(List[Int]())
      .via(new IgnoreLastElements[Int](ignoreCount = 2))
      .toMat(TestSink.probe[Int])(Keep.right)
      .run()

    // An empty source must complete without emitting anything.
    probe
      .request(1)
      .expectComplete()
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:50,代码来源:IgnoreLastElementsTest.scala
示例3: GeminiMarketsTest
//设置package包名称以及导入依赖的类
package io.allquantor.scemini.client.gemini
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.testkit.scaladsl.TestSink
import io.allquantor.scemini.adt.gemini.GeminiEvents.{ChangeEvent, GeminiEvent}
import io.allquantor.scemini.adt.gemini.GeminiConstants.{GeminiEventReasons, GeminiEventTypes, CurrencyPairs}
import io.allquantor.scemini.adt.gemini.GeminiConstants.CurrencyPairs.CurrencyPair
import io.allquantor.scemini.client.ExchangePlatformClient
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
/**
 * Checks that the Gemini client built by `ExchangePlatformClient.asGeminiClient`
 * exposes a source whose first element decodes to a `GeminiEvent` carrying a
 * currency pair.
 *
 * NOTE(review): judging by the test name this talks to the Gemini sandbox over
 * the network; confirm before running it in an offline CI environment.
 */
class GeminiMarketsTest extends FlatSpec with Matchers with BeforeAndAfterAll {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  // Explicit `(): Unit =` replaces the procedure syntax (`def afterAll { ... }`),
  // which is deprecated in Scala 2.13 and no longer accepted by Scala 3.
  override def afterAll(): Unit = {
    system.terminate()
  }

  "Gemini MarketClient" should "retrieve gemini sandbox market stream " in {
    val currencyPairs = Seq(CurrencyPairs.btcusd, CurrencyPairs.ethbtc)
    val client = ExchangePlatformClient.asGeminiClient(currencyPairs)
    val source = client.source

    // Each stream element is either a circe decoding error or a decoded event.
    type ResultType = Either[io.circe.Error, GeminiEvent]
    val testSink = TestSink.probe[ResultType](system)

    source
      .runWith(testSink)
      .ensureSubscription()
      .request(1)
      .expectNextChainingPF {
        // A Left (decoding error) does not match and therefore fails the expectation.
        case Right(e: GeminiEvent) => e.currencyPair.get shouldBe an[CurrencyPair]
      }
  }
}
开发者ID:allquantor,项目名称:scemini,代码行数:43,代码来源:GeminiMarketsTest.scala
示例4: RepeatNTimesSpec
//设置package包名称以及导入依赖的类
package scaladays.akka.stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
/**
 * Tests for the `RepeatNTimes` flow: the expectations below require every
 * incoming element to be emitted the given number of times, in order.
 */
class RepeatNTimesSpec extends WordSpec with Matchers with BeforeAndAfterAll
  with ScalaFutures {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  /**
   * Shuts the actor system down once all tests have run. `BeforeAndAfterAll` was
   * mixed in but never used, so the system (and its threads) was left running.
   */
  override protected def afterAll(): Unit = {
    import scala.concurrent.Await
    import scala.concurrent.duration._

    try {
      // Bounded wait so a stuck shutdown cannot hang the whole test run.
      Await.result(system.terminate(), 10.seconds)
      ()
    } finally super.afterAll()
  }

  "RepeatNTimesSpec" must {

    "repeat 3 times" in {
      val xs = Source.single("x")
        .via(RepeatNTimes(3))
        .runWith(Sink.seq).futureValue

      xs should === (List("x", "x", "x"))
    }

    "repeat 3 things 3 times" in {
      val xs = Source.fromIterator(() => List("a", "b", "c").iterator)
        .via(RepeatNTimes(3))
        .runWith(Sink.seq).futureValue

      xs should === (List("a", "a", "a", "b", "b", "b", "c", "c", "c"))
    }

    "show TestSink" in {
      // Same check as above, but pulling elements one at a time through a probe.
      val probe: Probe[String] =
        Source.fromIterator(() => List("a").iterator)
          .via(RepeatNTimes(3))
          .runWith(TestSink.probe)

      probe.requestNext("a")
      probe.requestNext("a")
      probe.requestNext("a")
      probe.expectComplete()
    }
  }
}
开发者ID:ktoso,项目名称:scaladays-berlin-akka-streams,代码行数:48,代码来源:RepeatNTimesSpec.scala
示例5: HomeControllerSpec
//设置package包名称以及导入依赖的类
package controllers
import actors.TestKitSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestProbe
import org.scalatest.MustMatchers
import play.api.libs.json.{JsValue, Json}
import scala.concurrent.ExecutionContext
/**
 * Verifies that the flow returned by `HomeController.createWebSocketFlow` delivers
 * a message pushed through the supplied publisher to the downstream subscriber.
 */
class HomeControllerSpec extends TestKitSpec with MustMatchers {

  "createWebSocketFlow" should {

    "create a websocket flow and send a message through" in {
      implicit val materializer = ActorMaterializer()(system)
      implicit val ec: ExecutionContext = system.dispatcher

      // Stand-ins for the actors the controller and the flow collaborate with.
      val stocksActor = TestProbe("stocksActor")
      val userParentActor = TestProbe("userParentActor")
      val userActor = TestProbe("userActor")

      // http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-testkit.html
      val publisher = akka.stream.testkit.TestPublisher.probe[JsValue]()

      // Build the controller and obtain the flow under test from it.
      val controller = new HomeController(stocksActor.ref, userParentActor.ref)
      val flowUnderTest = controller.createWebSocketFlow(publisher, userActor.ref)

      // Wrap the flow between a test source and a test sink; only the subscriber
      // side is driven here, so the source probe is discarded.
      val (_, subscriber) = TestSource.probe[JsValue]
        .via(flowUnderTest)
        .toMat(TestSink.probe[JsValue])(Keep.both)
        .run()

      val message = Json.obj("herp" -> "derp")

      // A message sent via the publisher must come out of the flow unchanged.
      subscriber.request(n = 1)
      publisher.sendNext(message)
      subscriber.expectNext(message)
    }
  }
}
开发者ID:ChrisCooper,项目名称:customer-feedback-screen,代码行数:52,代码来源:HomeControllerSpec.scala
示例6: GitHubSpec
//设置package包名称以及导入依赖的类
package jp.co.dzl.example.akka.api.service
import akka.actor.ActorSystem
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{ HttpMethods, HttpRequest, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.testkit.scaladsl.TestSink
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, FlatSpec, Matchers }
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
 * Tests for `GitHubImpl`: header handling in `from` and delegation to the
 * HTTP client in `send`.
 */
class GitHubSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll with MockFactory {

  implicit val system = ActorSystem("github-spec")
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // Block until the actor system has fully terminated after the last test.
  override protected def afterAll: Unit = {
    Await.result(system.terminate(), Duration.Inf)
  }

  "#from" should "merge original headers to github request" in {
    val github = new GitHubImpl("127.0.0.1", 8000, 5, mock[HttpClient])

    val original = HttpRequest(HttpMethods.GET, "/")
      .addHeader(RawHeader("host", "dummy"))
      .addHeader(RawHeader("timeout-access", "dummy"))

    val forwarded = Source.single(HttpRequest(HttpMethods.GET, "/v1/github/users/xxxxxx"))
      .via(github.from(original))
      .runWith(TestSink.probe[HttpRequest])
      .request(1)
      .expectNext()

    // Headers of the outgoing request whose lower-cased name equals `name`.
    def headersNamed(name: String) = forwarded.headers.filter(_.lowercaseName() == name)

    headersNamed("host") shouldBe empty
    headersNamed("timeout-access") shouldBe empty
    headersNamed("x-forwarded-host") shouldNot be(empty)
  }

  "#send" should "connect using http client" in {
    val cannedResponse = HttpResponse()

    // The mocked client answers every request with the canned response.
    val httpClient = mock[HttpClient]
    (httpClient.connectionHttps _).expects(*, *, *).returning(Flow[HttpRequest].map(_ => cannedResponse))

    val github = new GitHubImpl("127.0.0.1", 8000, 5, httpClient)

    val received = Source.single(HttpRequest(HttpMethods.GET, "/"))
      .via(github.send)
      .runWith(TestSink.probe[HttpResponse])
      .request(1)
      .expectNext()

    received shouldBe cannedResponse
  }
}
开发者ID:dazzle-lab,项目名称:akka-api-gateway-example,代码行数:57,代码来源:GitHubSpec.scala
示例7: HTTPPollingActorSpec
//设置package包名称以及导入依赖的类
package polling
import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ResponseEntity
import akka.stream.scaladsl.{Flow, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.testkit.TestKit
import org.json4s.JsonAST.JValue
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike}
/**
 * TestKit-based spec scaffold that offers a helper for wiring an exchange flow
 * between a test source and a test sink.
 *
 * NOTE(review): no zero-argument constructor is visible in this excerpt; confirm
 * how the suite is instantiated by the test runner.
 */
class HTTPPollingActorSpec(_system: ActorSystem)
  extends TestKit(_system)
  with FlatSpecLike
  with BeforeAndAfterAll {

  // Stop the actor system once every test in the suite has run.
  override def afterAll: Unit =
    TestKit.shutdownActorSystem(system)

  /**
   * Builds (without running) a graph that feeds `flow` from a `TestSource` probe
   * and drains it into a `TestSink` probe, keeping both materialized probes.
   */
  def testExchangeFlowPubSub(flow: Flow[(Instant, ResponseEntity), (String, JValue), NotUsed]) = {
    val upstream = TestSource.probe[(Instant, ResponseEntity)]
    val downstream = TestSink.probe[(String, JValue)]
    upstream.via(flow).toMat(downstream)(Keep.both)
  }
}
开发者ID:blbradley,项目名称:kafka-cryptocoin,代码行数:27,代码来源:HTTPPollingActorSpec.scala
示例8: TwitterStreamSpec
//设置package包名称以及导入依赖的类
package ch.becompany.social.twitter
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import ch.becompany.social.Status
import com.typesafe.scalalogging.LazyLogging
import org.scalatest.{FlatSpec, Inspectors, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Try}
/**
 * Tests for `TwitterStream`: every element pulled from the stream is expected to
 * be a timestamp paired with a successfully parsed `Status`.
 *
 * NOTE(review): these presumably hit the live Twitter API; confirm credentials
 * and network access are available where the suite runs.
 */
class TwitterStreamSpec extends FlatSpec with Matchers with Inspectors with LazyLogging {

  import ch.becompany.util.StreamLogging._

  implicit val system = ActorSystem("twitter-stream-spec")
  implicit val materializer = ActorMaterializer()

  "Twitter stream" should "stream tweets by keyword" in {
    val tweets = TwitterStream("track" -> "happy")
      .stream
      .via(logElements)
      .runWith(TestSink.probe[(Instant, Try[Status])])
      .request(20)
      .expectNextN(20)

    forAll(tweets) { tweet =>
      tweet should matchPattern { case (_, Success(Status(_, _))) => }
    }
  }

  it should "stream tweets by user ID" in {
    val tweets = TwitterStream("follow" -> "20536157")
      .stream
      .via(logElements)
      .runWith(TestSink.probe[(Instant, Try[Status])])
      .request(2)
      .expectNextN(2)

    forAll(tweets) { tweet =>
      tweet should matchPattern { case (_, Success(Status(_, _))) => }
    }
  }

  it should "retry when an error occurs" in {
    // NOTE(review): the second constructor argument ("notfound") is presumably what
    // provokes the error that the stream must recover from; confirm against TwitterStream.
    val tweets = new TwitterStream(Map("follow" -> "20536157"), "notfound")
      .stream
      .via(logElements)
      .runWith(TestSink.probe[(Instant, Try[Status])])
      .request(2)
      .expectNextN(2)

    forAll(tweets) { tweet =>
      tweet should matchPattern { case (_, Success(Status(_, _))) => }
    }
  }
}
开发者ID:becompany,项目名称:akka-social-stream,代码行数:59,代码来源:TwitterStreamSpec.scala
示例9: GithubFeedSpec
//设置package包名称以及导入依赖的类
package ch.becompany.social.github
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import ch.becompany.social.Status
import com.typesafe.scalalogging.LazyLogging
import org.scalatest.FlatSpec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Try}
/**
 * Checks that `GithubFeed` produces at least one element that is a timestamp
 * paired with a successfully parsed `Status`.
 */
class GithubFeedSpec extends FlatSpec with LazyLogging {

  implicit val system = ActorSystem("github-feed-spec")
  implicit val materializer = ActorMaterializer()

  "Github event feed" should "stream GitHub events" in {
    val feed = new GithubFeed("becompany")

    feed.stream().
      // Log every element as it passes, leaving it unchanged.
      map(t => { logger.info(t.toString); t }).
      runWith(TestSink.probe[(Instant, Try[Status])]).
      request(30).
      expectNextChainingPF {
        // The original used "$$date", which escapes the dollar sign and logs the
        // literal text "$date" instead of the bound timestamp.
        case (date, Success(test)) => logger.info(s"$date $test")
      }
  }
}
开发者ID:becompany,项目名称:akka-social-stream,代码行数:33,代码来源:GithubFeedSpec.scala
示例10: AlphaApplication
//设置package包名称以及导入依赖的类
package com.lightbend.lagom.scaladsl.testkit
import akka.persistence.query.Offset
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraPersistenceComponents
import com.lightbend.lagom.scaladsl.playjson.EmptyJsonSerializerRegistry
import com.lightbend.lagom.scaladsl.server.{ LagomApplication, LagomApplicationContext, LagomServer, LocalServiceLocator }
import com.lightbend.lagom.scaladsl.testkit.services.{ AlphaEvent, AlphaService }
import org.scalatest.{ AsyncWordSpec, Matchers }
import play.api.libs.ws.ahc.AhcWSComponents
/**
 * Lagom application used by the topic-publishing test. It serves
 * [[AlphaServiceImpl]] and needs no JSON serializers of its own.
 */
abstract class AlphaApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
  with CassandraPersistenceComponents
  with TestTopicComponents
  with AhcWSComponents {

  // Nothing is persisted as JSON by this application, so the registry is empty.
  override lazy val jsonSerializerRegistry = EmptyJsonSerializerRegistry

  override lazy val lagomServer = serverFor[AlphaService](new AlphaServiceImpl())
}
/**
 * `AlphaService` implementation whose topic publishes the even numbers from
 * 1 to 10 as `AlphaEvent`s, each tagged with the sequence offset `message / 2`.
 */
class AlphaServiceImpl extends AlphaService {

  override def messages: Topic[AlphaEvent] =
    // The requested starting offset is ignored: the same events are produced every time.
    TopicProducer.singleStreamWithOffset { _ =>
      val evenEvents = for (n <- 1 to 10 if n % 2 == 0) yield AlphaEvent(n)
      Source(evenEvents).map { event =>
        (event, Offset.sequence(event.message / 2))
      }
    }
}
/**
 * Starts [[AlphaApplication]] in a test server and checks that the first event
 * delivered on the alpha topic is `AlphaEvent(2)`.
 */
class TopicPublishingSpec extends AsyncWordSpec with Matchers {

  "The AlphaService" should {

    "publish events on alpha topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
      new AlphaApplication(ctx) with LocalServiceLocator
    } { server =>
      implicit val system = server.actorSystem
      implicit val mat = server.materializer

      val client: AlphaService = server.serviceClient.implement[AlphaService]
      val events = client.messages.subscribe.atMostOnceSource

      // Pull exactly one element from the subscription and compare it.
      val firstEvent = events
        .runWith(TestSink.probe[AlphaEvent])
        .request(1)
        .expectNext()

      firstEvent should ===(AlphaEvent(2))
    }
  }
}
开发者ID:lagom,项目名称:lagom,代码行数:55,代码来源:TopicPublishingSpec.scala
示例11: FileReaderSpec
//设置package包名称以及导入依赖的类
package ch.becompany.akka.io.file
import java.nio.charset.StandardCharsets
import java.nio.file._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}
import scala.collection.JavaConverters._
/**
 * Checks that `FileReader.readContinuously` keeps emitting lines that are
 * appended to a file after the stream has started.
 */
class FileReaderSpec extends FlatSpec with Matchers with ScalaFutures {

  implicit val system = ActorSystem("akka-file-io")
  implicit val materializer = ActorMaterializer()

  "File reader" should "continuously read a file" in {
    val f = Files.createTempFile(null, ".log")
    // The original never removed the temp file; have the JVM delete it on exit.
    // Deleting it eagerly is avoided because the continuous reader may still hold it.
    f.toFile.deleteOnExit()

    /** Appends `lines` to the temp file from a background thread after a short delay. */
    def writeAsync(lines: String*): Unit = {
      new Thread() {
        override def run(): Unit = {
          Thread.sleep(10)
          Files.write(f, lines.asJava, StandardCharsets.UTF_8, StandardOpenOption.APPEND)
          println(s"Wrote $lines")
        }
      }.start()
    }

    val src = FileReader.readContinuously(f.toString, false, Some("UTF-8"))
    val probe = src.runWith(TestSink.probe[String])

    // Give the reader time to start before the first lines are appended.
    Thread.sleep(500)

    writeAsync("foo", "bar")
    probe.
      request(1).
      expectNext("foo")

    // "bar" was already written but not yet demanded; it arrives together with "baz".
    writeAsync("baz")
    probe.
      request(2).
      expectNext("bar", "baz")
  }
}
开发者ID:becompany,项目名称:akka-file-io,代码行数:48,代码来源:FileReaderSpec.scala
示例12: UsersSpec
//设置package包名称以及导入依赖的类
package mm4s.api
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.{HttpRequest, RequestEntity}
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import mm4s.api.UserModels.{CreateUser, LoginByUsername}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
/**
 * Checks that the `Users` API sources emit a single POST request aimed at the
 * expected path and carrying the marshalled payload.
 */
class UsersSpec extends TestKit(ActorSystem("UsersSpec"))
  with WordSpecLike with Matchers with ScalaFutures {

  implicit val mat = ActorMaterializer()

  "api calls" should {
    "have proper paths" when {

      "create" in {
        import UserProtocols._

        val payload = CreateUser("", "", "", "")
        val entity = Marshal(payload).to[RequestEntity].futureValue
        val expectedRequest =
          HttpRequest(uri = uripath("/users/create"), method = POST, entity = entity)

        Users.create(payload)
          .runWith(TestSink.probe[HttpRequest])
          .request(1)
          .expectNext(expectedRequest)
          .expectComplete()
      }

      "login" in {
        import UserProtocols._

        val payload = LoginByUsername("", "", "")
        val entity = Marshal(payload).to[RequestEntity].futureValue
        val expectedRequest =
          HttpRequest(uri = uripath("/users/login"), method = POST, entity = entity)

        Users.login(payload)
          .runWith(TestSink.probe[HttpRequest])
          .request(1)
          .expectNext(expectedRequest)
          .expectComplete()
      }
    }
  }
}
开发者ID:jw3,项目名称:mm4s,代码行数:52,代码来源:UsersSpec.scala
示例13: StreamsSpec
//设置package包名称以及导入依赖的类
package mm4s.api
import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpMethods, HttpRequest}
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.scalatest.{Matchers, WordSpecLike}
import scala.concurrent.Future
/**
 * Checks the request factories in `Streams`: each must emit exactly one
 * `HttpRequest` for the given path and then complete.
 */
class StreamsSpec extends TestKit(ActorSystem("StreamsSpec"))
  with WordSpecLike with Matchers {

  implicit val mat = ActorMaterializer()

  "request factory" should {
    // A random five-character path segment, shared by all tests in this group.
    val path = s"/${UUID.randomUUID.toString.take(5)}"
    val expected = HttpRequest(uri = uripath(path))

    "provide empty GET" when {
      val expectedEmpty = expected.withEntity(HttpEntity.Empty)

      "GET factory called" in {
        val probe = Streams.get(path).runWith(TestSink.probe[HttpRequest])

        probe
          .request(1)
          .expectNext(expectedEmpty)
          .expectComplete()
      }

      "request defaults to GET" in {
        // The request is passed through untouched by the transformation.
        val probe = Streams.request(path)(Future.successful)
          .runWith(TestSink.probe[HttpRequest])

        probe
          .request(1)
          .expectNext(expectedEmpty)
          .expectComplete()
      }
    }

    "provide modified request" when {
      "request set to POST" in {
        val expectedPost = expected.withMethod(HttpMethods.POST)

        val probe = Streams.request(path)(r => Future.successful(r.withMethod(HttpMethods.POST)))
          .runWith(TestSink.probe[HttpRequest])

        probe
          .request(1)
          .expectNext(expectedPost)
          .expectComplete()
      }
    }
  }
}
开发者ID:jw3,项目名称:mm4s,代码行数:55,代码来源:StreamsSpec.scala
示例14: TeamsSpec
//设置package包名称以及导入依赖的类
package mm4s.api
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model.headers.Cookie
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, RequestEntity}
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import mm4s.api.TeamModels.CreateTeam
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}
/**
 * Checks that the `Teams` API sources emit a single request with the expected
 * path, method, and entity or headers.
 */
class TeamsSpec extends TestKit(ActorSystem("TeamsSpec"))
  with WordSpecLike with Matchers with ScalaFutures {

  implicit val mat = ActorMaterializer()

  "api calls" should {
    "have proper paths" when {

      "create" in {
        import TeamProtocols._

        val payload = CreateTeam("", "", "")
        val entity = Marshal(payload).to[RequestEntity].futureValue
        val expectedRequest =
          HttpRequest(uri = uripath("/teams/create"), method = POST, entity = entity)

        Teams.create(payload)
          .runWith(TestSink.probe[HttpRequest])
          .request(1)
          .expectNext(expectedRequest)
          .expectComplete()
      }

      "all" in {
        // Listing teams is a GET that carries the auth header built from the token.
        val expectedRequest =
          HttpRequest(uri = uripath("/teams/all"), method = GET, headers = List(auth("token")))

        Teams.list("token")
          .runWith(TestSink.probe[HttpRequest])
          .request(1)
          .expectNext(expectedRequest)
          .expectComplete()
      }
    }
  }
}
开发者ID:jw3,项目名称:mm4s,代码行数:48,代码来源:TeamsSpec.scala
示例15: AccumulateSpecAutoFusingOn
//设置package包名称以及导入依赖的类
package akka.stream.contrib
import akka.stream.scaladsl.{ Keep, Source }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
// Runs AccumulateSpec with autoFusing = true. The early-initializer block defines
// the value before the trait body is initialized.
// NOTE(review): autoFusing is presumably consumed by BaseStreamSpec, which is not visible here.
class AccumulateSpecAutoFusingOn extends { val autoFusing = true } with AccumulateSpec
// Runs AccumulateSpec with autoFusing = false. The early-initializer block defines
// the value before the trait body is initialized.
// NOTE(review): autoFusing is presumably consumed by BaseStreamSpec, which is not visible here.
class AccumulateSpecAutoFusingOff extends { val autoFusing = false } with AccumulateSpec
/**
 * Shared tests for the `Accumulate` flow. With zero `0` and `_ + _` the flow is
 * expected to emit the running sum after each pushed element.
 */
trait AccumulateSpec extends BaseStreamSpec {

  "Accumulate" should {

    // Test name corrected: the original read "vaules".
    "emit folded values starting with the result of applying the given function to the given zero and the first pushed element" in {
      val (source, sink) = TestSource.probe[Int]
        .via(Accumulate(0)(_ + _))
        .toMat(TestSink.probe)(Keep.both)
        .run()

      sink.request(99)
      source.sendNext(1)
      source.sendNext(2)
      source.sendNext(3)
      // Running sums: 0 + 1, 1 + 2, 3 + 3.
      sink.expectNext(1, 3, 6)
      source.sendComplete()
      sink.expectComplete()
    }

    "not emit any value for an empty source" in {
      Source(Vector.empty[Int])
        .via(Accumulate(0)(_ + _))
        .runWith(TestSink.probe)
        .request(99)
        .expectComplete()
    }

    "fail on upstream failure" in {
      val (source, sink) = TestSource.probe[Int]
        .via(Accumulate(0)(_ + _))
        .toMat(TestSink.probe)(Keep.both)
        .run()

      sink.request(99)
      source.sendError(new Exception)
      sink.expectError()
    }
  }
}
开发者ID:akka,项目名称:akka-stream-contrib,代码行数:45,代码来源:AccumulateSpec.scala
示例16: TimeWindowSpecAutoFusingOn
//设置package包名称以及导入依赖的类
package akka.stream.contrib
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.TestDuration
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
// Runs TimeWindowSpec with autoFusing = true. The early-initializer block defines
// the value before the trait body is initialized.
// NOTE(review): autoFusing is presumably consumed by BaseStreamSpec, which is not visible here.
class TimeWindowSpecAutoFusingOn extends { val autoFusing = true } with TimeWindowSpec
/**
 * Shared tests for the `TimeWindow` flow, configured here to sum the integers
 * that arrive within one (dilated) 100 ms window.
 */
trait TimeWindowSpec extends BaseStreamSpec with ScalaFutures {

  private val timeWindow = 100.milliseconds

  "TimeWindow flow" should {

    "aggregate data for predefined amount of time" in {
      val summingWindow = TimeWindow(timeWindow.dilated, eager = false)(identity[Int])(_ + _)

      val (pub, sub) = TestSource.probe[Int]
        .via(summingWindow)
        .toMat(TestSink.probe)(Keep.both)
        .run()

      sub.request(2)

      // Five ones pushed in quick succession are expected to collapse into one sum,
      // delivered within twice the window length.
      (1 to 5).foreach(_ => pub.sendNext(1))
      sub.expectNext(timeWindow * 2, 5)

      (1 to 5).foreach(_ => pub.sendNext(1))
      sub.expectNext(timeWindow * 2, 5)
    }

    "emit the first seed if eager" in {
      val summingWindow = TimeWindow(timeWindow.dilated, eager = true)(identity[Int])(_ + _)

      val (pub, sub) = TestSource.probe[Int]
        .via(summingWindow)
        .toMat(TestSink.probe)(Keep.both)
        .run()

      sub.request(2)

      // In eager mode the very first element is expected straight away ...
      pub.sendNext(1)
      sub.expectNext(1)

      // ... while the following elements are summed as usual.
      (1 to 5).foreach(_ => pub.sendNext(1))
      sub.expectNext(5)
    }
  }
}
开发者ID:akka,项目名称:akka-stream-contrib,代码行数:62,代码来源:TimeWindowSpec.scala
注：本文中的akka.stream.testkit.scaladsl.TestSink类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论