
Java Source Class Code Examples

This article collects typical usage examples of akka.stream.javadsl.Source in Java. If you are wondering how the Java Source class is used in practice, the selected examples below may help.



The Source class belongs to the akka.stream.javadsl package. The 20 code examples below are sorted by popularity by default.

Example 1: sendEmail

import akka.stream.javadsl.Source; // import the required package/class
@Override
public CompletableFuture<String> sendEmail(String to, String subject, String content) {
    return ws.url(String.format("https://api.mailgun.net/v3/%s/messages", mailgunDomain))
            .setAuth("api", mailgunKey, WSAuthScheme.BASIC)
            .post(Source.from(Arrays.asList(
                    new DataPart("from", mailgunFrom),
                    new DataPart("to", to),
                    new DataPart("subject", subject),
                    new DataPart("html", content))))
            .handleAsync((response, throwable) -> {
                if(throwable != null) {
                    log.error("sendEmail: Exception", throwable);
                } else if(response.getStatus() != 200) {
                    log.error("sendEmail: Non-200 response, status={}, body={}", response.getStatus(), response.getBody());
                } else {
                    // Success is not an error, so log it at info level
                    log.info("sendEmail: OK, status={}, body={}", response.getStatus(), response.getBody());
                    return response.asJson().get("id").textValue();
                }
                return null;
            }).toCompletableFuture();
}
 
Developer ID: bekce, Project: oauthly, Lines of code: 22, Source: MailgunService.java


Example 2: main

import akka.stream.javadsl.Source; // import the required package/class
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("KafkaProducerSystem");

    final Materializer materializer = ActorMaterializer.create(system);

    final ProducerSettings<byte[], String> producerSettings =
            ProducerSettings
                    .create(system, new ByteArraySerializer(), new StringSerializer())
                    .withBootstrapServers("localhost:9092");

    CompletionStage<Done> done =
            Source.range(1, 100)
                    .map(n -> n.toString())
                    .map(elem ->
                            new ProducerRecord<byte[], String>(
                                    "topic1-ts",
                                    0,
                                    // Kafka record timestamps are milliseconds since the epoch
                                    Instant.now().toEpochMilli(),
                                    null,
                                    elem))
                    .runWith(Producer.plainSink(producerSettings), materializer);

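    done.whenComplete((d, ex) -> {
        // Report the outcome and shut the actor system down so the JVM can exit
        System.out.println(ex == null ? "sent" : "failed: " + ex);
        system.terminate();
    });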
}
 
Developer ID: jeqo, Project: talk-kafka-messaging-logs, Lines of code: 25, Source: KafkaProducer.java


Example 3: getLiveChirps

import akka.stream.javadsl.Source; // import the required package/class
@Override
public ServiceCall<LiveChirpsRequest, Source<Chirp, ?>> getLiveChirps() {
    return req -> chirps.getRecentChirps(req.userIds).thenApply(recentChirps -> {
        List<Source<Chirp, ?>> sources = new ArrayList<>();
        for (String userId : req.userIds) {
            sources.add(topic.subscriber(userId));
        }
        HashSet<String> users = new HashSet<>(req.userIds);
        Source<Chirp, ?> publishedChirps = Source.from(sources).flatMapMerge(sources.size(), s -> s)
                .filter(c -> users.contains(c.userId));

        // We currently ignore the fact that it is possible to get duplicate chirps
        // from the recent and the topic. That can be solved with a de-duplication stage.
        return Source.from(recentChirps).concat(publishedChirps);
    });
}
 
Developer ID: lagom, Project: lagom-java-chirper-example, Lines of code: 17, Source: ChirpServiceImpl.java
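
The comment in Example 3 mentions that chirps can appear both in the recent list and in the live topic, and that a de-duplication stage would solve this. The following is a minimal sketch of that idea using plain Java collections rather than an Akka Streams stage. The `Chirp` class and its `id` field are hypothetical stand-ins, since the project's real class is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ChirpDedupSketch {

    // Hypothetical stand-in for the project's Chirp class; the id field is an assumption.
    static final class Chirp {
        final String id;
        final String userId;
        final String message;

        Chirp(String id, String userId, String message) {
            this.id = id;
            this.userId = userId;
            this.message = message;
        }
    }

    // Concatenates recent and live chirps, keeping only the first occurrence of each id.
    static List<Chirp> dedup(List<Chirp> recent, List<Chirp> live) {
        Set<String> seen = new HashSet<>();
        List<Chirp> out = new ArrayList<>();
        for (List<Chirp> part : Arrays.asList(recent, live)) {
            for (Chirp chirp : part) {
                // Set.add returns false when the id was already present
                if (seen.add(chirp.id)) {
                    out.add(chirp);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Chirp> recent = Arrays.asList(
                new Chirp("c1", "usr3", "hi 1"),
                new Chirp("c2", "usr4", "hi 2"));
        List<Chirp> live = Arrays.asList(
                new Chirp("c2", "usr4", "hi 2"),
                new Chirp("c3", "usr4", "hi 3"));
        for (Chirp chirp : dedup(recent, live)) {
            System.out.println(chirp.id + " " + chirp.message);
        }
    }
}
```

In a real stream the same check would probably live in a stateful stage, and the set of seen ids would need to be bounded, because a live feed never ends.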


Example 4: shouldIncludeSomeOldChirpsInLiveFeed

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldIncludeSomeOldChirpsInLiveFeed() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);

    Chirp chirp1 = new Chirp("usr3", "hi 1");
    chirpService.addChirp("usr3").invoke(chirp1).toCompletableFuture().get(3, SECONDS);

    Chirp chirp2 = new Chirp("usr4", "hi 2");
    chirpService.addChirp("usr4").invoke(chirp2).toCompletableFuture().get(3, SECONDS);

    LiveChirpsRequest request = new LiveChirpsRequest(TreePVector.<String>empty().plus("usr3").plus("usr4"));

    eventually(FiniteDuration.create(10, SECONDS), () -> {
        Source<Chirp, ?> chirps = chirpService.getLiveChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
        Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
        probe.request(10);
        probe.expectNextUnordered(chirp1, chirp2);

        Chirp chirp3 = new Chirp("usr4", "hi 3");
        chirpService.addChirp("usr4").invoke(chirp3).toCompletableFuture().get(3, SECONDS);
        probe.expectNext(chirp3);

        probe.cancel();
    });
}
 
Developer ID: lagom, Project: lagom-java-chirper-example, Lines of code: 26, Source: ChirpServiceTest.java


Example 5: shouldRetrieveOldChirps

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldRetrieveOldChirps() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);

    Chirp chirp1 = new Chirp("usr5", "msg 1");
    chirpService.addChirp("usr5").invoke(chirp1).toCompletableFuture().get(3, SECONDS);

    Chirp chirp2 = new Chirp("usr6", "msg 2");
    chirpService.addChirp("usr6").invoke(chirp2).toCompletableFuture().get(3, SECONDS);

    HistoricalChirpsRequest request = new HistoricalChirpsRequest(Instant.now().minusSeconds(20),
            TreePVector.<String>empty().plus("usr5").plus("usr6"));

    eventually(FiniteDuration.create(10, SECONDS), () -> {
        Source<Chirp, ?> chirps = chirpService.getHistoricalChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
        Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
        probe.request(10);
        probe.expectNextUnordered(chirp1, chirp2);
        probe.expectComplete();
    });
}
 
Developer ID: lagom, Project: lagom-java-chirper-example, Lines of code: 22, Source: ChirpServiceTest.java


Example 6: testReactiveStream

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void testReactiveStream() throws InterruptedException, TimeoutException {
    final ActorSystem system =
            ActorSystem.create("MySystem");

    final Materializer mat =
            ActorMaterializer.create(system);

    Source<Integer, NotUsed> source = Source.range(1, 5);

    Flow<Integer, Integer, NotUsed> flow = Flow
            .fromFunction(x -> x + 1);

    Source<Integer, NotUsed> source2 = source.via(flow);

    Sink<Integer, CompletionStage<Integer>> fold =
            // The first lambda argument is the running total, the second is the next element
            Sink.<Integer, Integer>fold(0, (total, next) -> total + next);

    CompletionStage<Integer> integerCompletionStage = source2.runWith(fold, mat);
    integerCompletionStage.thenAccept(System.out::println);
    Thread.sleep(3000);
    Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
 
Developer ID: dhinojosa, Project: intro_to_reactive, Lines of code: 24, Source: ReactiveStreamsTest.java


Example 7: testReactiveStreamRefined

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void testReactiveStreamRefined() throws InterruptedException, TimeoutException {
    final ActorSystem system =
            ActorSystem.create("MySystem");

    final Materializer mat =
            ActorMaterializer.create(system);

    Source<Integer, NotUsed> source = Source.range(1, 5).map(x -> x + 1)
            // The first lambda argument is the running total, the second is the next element
            .fold(0, (total, next) -> total + next);

    CompletionStage<Integer> integerCompletionStage =
            source.runWith(Sink.head(), mat);

    integerCompletionStage.thenAccept(System.out::println);
    Thread.sleep(3000);
    Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
 
Developer ID: dhinojosa, Project: intro_to_reactive, Lines of code: 19, Source: ReactiveStreamsTest.java
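
Examples 6 and 7 both add 1 to each element of the range 1 to 5 and then fold the results into a sum. As a sanity check on the value they should print, the same arithmetic can be sketched with `java.util.stream.IntStream`. This is plain Java, not Akka Streams, and it assumes the range is inclusive at both ends, as `Source.range` is documented to be.

```java
import java.util.stream.IntStream;

class FoldCheckSketch {

    // Adds 1 to every value in [from, to] and folds the results into a sum.
    static int incrementAndSum(int from, int to) {
        return IntStream.rangeClosed(from, to)
                .map(x -> x + 1)
                .reduce(0, (total, next) -> total + next);
    }

    public static void main(String[] args) {
        // 2 + 3 + 4 + 5 + 6
        System.out.println(incrementAndSum(1, 5));
    }
}
```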


Example 8: getHistoricalChirps

import akka.stream.javadsl.Source; // import the required package/class
@Override
public ServiceCall<HistoricalChirpsRequest, Source<Chirp, ?>> getHistoricalChirps() {
  return req -> {
    List<Source<Chirp, ?>> sources = new ArrayList<>();
    for (String uid : req.getUserIds()) {
        Source<Chirp, NotUsed> select = db
          .select("SELECT * FROM chirp WHERE userId = ? AND timestamp >= ? ORDER BY timestamp ASC", uid,
              req.getFromTime().toEpochMilli())
          .map(this::mapChirp);
      sources.add(select);
    }
      // Chirps from one user are ordered by timestamp, but chirps from different
      // users are not ordered. That can be improved by implementing a smarter
      // merge that takes the timestamps into account.
    Source<Chirp, ?> result = Source.from(sources).flatMapMerge(sources.size(), s -> s);
    return CompletableFuture.completedFuture(result);
  };
}
 
Developer ID: negokaz, Project: lagom-hands-on-development, Lines of code: 19, Source: ChirpServiceImpl.java
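
The comment in Example 8 points out that each user's chirps are ordered by timestamp, but the merged result is not, and suggests a smarter merge that takes timestamps into account. The following is a minimal sketch of such a merge over in-memory lists, using a priority queue keyed on the next timestamp of each input. It is plain Java rather than an Akka Streams operator, and the `Chirp` class with its `timestamp` field is a hypothetical stand-in for the project's class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class TimestampMergeSketch {

    // Hypothetical stand-in for the project's Chirp class.
    static final class Chirp {
        final String userId;
        final long timestamp;

        Chirp(String userId, long timestamp) {
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    // One input list: its current head element plus the elements not yet consumed.
    private static final class Cursor {
        Chirp head;
        final Iterator<Chirp> rest;

        Cursor(Iterator<Chirp> rest) {
            this.rest = rest;
            this.head = rest.next();
        }
    }

    // Merges lists that are each sorted by timestamp into one list sorted by timestamp.
    static List<Chirp> mergeByTimestamp(List<List<Chirp>> perUser) {
        PriorityQueue<Cursor> queue =
                new PriorityQueue<>(Comparator.comparingLong((Cursor c) -> c.head.timestamp));
        for (List<Chirp> chirps : perUser) {
            if (!chirps.isEmpty()) {
                queue.add(new Cursor(chirps.iterator()));
            }
        }
        List<Chirp> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Take the input whose head is oldest, emit it, then re-queue the input
            Cursor cursor = queue.poll();
            merged.add(cursor.head);
            if (cursor.rest.hasNext()) {
                cursor.head = cursor.rest.next();
                queue.add(cursor);
            }
        }
        return merged;
    }
}
```

Akka Streams also provides a `mergeSorted` operator for combining two already-sorted sources with a comparator, which may be a more direct fit inside the stream itself.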


Example 9: shouldIncludeSomeOldChirpsInLiveFeed

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldIncludeSomeOldChirpsInLiveFeed() throws Exception {
  ChirpService chirpService = server.client(ChirpService.class);

  Chirp chirp1 = Chirp.of("usr3", "hi 1");
  chirpService.addChirp("usr3").invoke(chirp1).toCompletableFuture().get(3, SECONDS);

  Chirp chirp2 = Chirp.of("usr4", "hi 2");
  chirpService.addChirp("usr4").invoke(chirp2).toCompletableFuture().get(3, SECONDS);

  LiveChirpsRequest request = LiveChirpsRequest.of(TreePVector.<String>empty().plus("usr3").plus("usr4"));
  Source<Chirp, ?> chirps = chirpService.getLiveChirps("user3").invoke(request).toCompletableFuture().get(3, SECONDS);
  Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
  probe.request(10);
  probe.expectNextUnordered(chirp1, chirp2);

  Chirp chirp3 = Chirp.of("usr4", "hi 3");
  chirpService.addChirp("usr4").invoke(chirp3).toCompletableFuture().get(3, SECONDS);
  probe.expectNext(chirp3);

  probe.cancel();
}
 
Developer ID: negokaz, Project: lagom-hands-on-development, Lines of code: 23, Source: ChirpServiceTest.java


Example 10: shouldRetrieveOldChirps

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldRetrieveOldChirps() throws Exception {
  ChirpService chirpService = server.client(ChirpService.class);

  Chirp chirp1 = Chirp.of("usr5", "msg 1");
  chirpService.addChirp("usr5").invoke(chirp1).toCompletableFuture().get(3, SECONDS);

  Chirp chirp2 = Chirp.of("usr6", "msg 2");
  chirpService.addChirp("usr6").invoke(chirp2).toCompletableFuture().get(3, SECONDS);

  HistoricalChirpsRequest request = HistoricalChirpsRequest.of(Instant.now().minusSeconds(20),
      TreePVector.<String>empty().plus("usr5").plus("usr6"));
  Source<Chirp, ?> chirps = chirpService.getHistoricalChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
  Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
  probe.request(10);
  probe.expectNextUnordered(chirp1, chirp2);
  probe.expectComplete();
}
 
Developer ID: negokaz, Project: lagom-hands-on-development, Lines of code: 19, Source: ChirpServiceTest.java


Example 11: importAll

import akka.stream.javadsl.Source; // import the required package/class
public CompletableFuture<Done> importAll() {
    
    AtomicLong batch = new AtomicLong(0);
    long start = System.currentTimeMillis();
    
    final Materializer materializer = ActorMaterializer.create(ActorSystem.create("actor-system", ConfigFactory.load()));
    Source<Page, NotUsed> channelSource = Source.fromIterator(() -> reader);
    
    Flow<Page, List<Page>, NotUsed> processing = Flow.of(Page.class)
                                                     .filter(x -> x != null && x.getNamespace() == 0)
                                                     .buffer(cores * 100, OverflowStrategy.backpressure())
                                                     .mapAsyncUnordered(cores, x -> cleaner.clean(x))
                                                     .groupedWithin(BATCH_SIZE,
                                                                    FiniteDuration.apply(1, TimeUnit.SECONDS))
                                                     .mapAsyncUnordered(cores, x -> datastore.save(x));
    
    final CompletionStage<Done> promise = channelSource.via(processing).runForeach(x -> {
        
        System.out.printf("Inserted: %d, Time: %d sec\n",
                          batch.incrementAndGet() * BATCH_SIZE,
                          (System.currentTimeMillis() - start) / 1000);
    }, materializer);
    
    return promise.toCompletableFuture();
    
}
 
Developer ID: crtomirmajer, Project: wiki2mongo, Lines of code: 27, Source: Wiki2MongoImporter.java


Example 12: main

import akka.stream.javadsl.Source; // import the required package/class
public static void main(String[] args) throws IOException {
  final ActorSystem system = ActorSystem.create("Sys");
  final ActorMaterializer materializer = ActorMaterializer.create(system);

  final String text =
    "Lorem Ipsum is simply dummy text of the printing and typesetting industry. " +
    "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, " +
    "when an unknown printer took a galley of type and scrambled it to make a type " +
    "specimen book.";

  Source.from(Arrays.asList(text.split("\\s"))).
    // transform
    map(e -> e.toUpperCase()).
    // print to console
    runForeach(System.out::println, materializer).
    handle((done, failure) -> {
      system.terminate();
      return NotUsed.getInstance();
    });
}
 
Developer ID: typesafehub, Project: activator-akka-stream-java8, Lines of code: 21, Source: BasicTransformation.java


Example 13: client

import akka.stream.javadsl.Source; // import the required package/class
public static void client(ActorSystem system, InetSocketAddress serverAddress) {
  final ActorMaterializer materializer = ActorMaterializer.create(system);

  final List<ByteString> testInput = new ArrayList<>();
  for (char c = 'a'; c <= 'z'; c++) {
    testInput.add(ByteString.fromString(String.valueOf(c)));
  }

  Source<ByteString, NotUsed> responseStream =
    Source.from(testInput).via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()));

  CompletionStage<ByteString> result = responseStream.runFold(
    ByteString.empty(), (acc, in) -> acc.concat(in), materializer);

  result.handle((success, failure) -> {
    if (failure != null) {
      System.err.println("Failure: " + failure.getMessage());
    } else {
      System.out.println("Result: " + success.utf8String());
    }
    System.out.println("Shutting down client");
    // terminate() replaces the deprecated shutdown()
    system.terminate();
    return NotUsed.getInstance();
  });
}
 
Developer ID: typesafehub, Project: activator-akka-stream-java8, Lines of code: 26, Source: TcpEcho.java


Example 14: stream

import akka.stream.javadsl.Source; // import the required package/class
public Result stream() {
    Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
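                // ActorRef.noSender() (from akka.actor.ActorRef) makes the absent sender explicit
                sourceActor.tell(ByteString.fromString("kiki"), ActorRef.noSender());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                sourceActor.tell(ByteString.fromString("foo"), ActorRef.noSender());
                sourceActor.tell(ByteString.fromString("bar"), ActorRef.noSender());
                // Status.Success tells the actor-backed source to complete the stream
                sourceActor.tell(new Status.Success(NotUsed.getInstance()), ActorRef.noSender());
                new CreateTraceEntry().traceEntryMarker();
                return null;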
            });
    return ok().chunked(source);
}
 
Developer ID: glowroot, Project: glowroot, Lines of code: 18, Source: StreamController.java


Example 15: streamForTag

import akka.stream.javadsl.Source; // import the required package/class
private Source<Pair<BasketEvent, Offset>, ?> streamForTag(AggregateEventTag<PEBasketEvent> tag, Offset offset) {
    return registry
            .eventStream(tag, offset)
            .filter(eventOffset ->
                    eventOffset.first() instanceof ItemAdded ||
                            eventOffset.first() instanceof ItemDeleted ||
                            eventOffset.first() instanceof CheckedOut
            )
            .map(this::toTopic);
}
 
Developer ID: ignasi35, Project: lagom-java-workshop, Lines of code: 11, Source: BasketServiceImpl.java


Example 16: run1

import akka.stream.javadsl.Source; // import the required package/class
private void run1() {
    final ActorSystem actorSystem = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    final Source<Integer, NotUsed> source = Source.range(0, 10);
    final Flow<Integer, String, NotUsed> flow = Flow.fromFunction((Integer i) -> i.toString());
    final Sink<String, CompletionStage<Done>> sink = Sink.foreach(s -> System.out.println("Example1 - Number: " + s));
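    // toMat(..., Keep.right()) keeps the sink's CompletionStage so completion can be observed
    final RunnableGraph<CompletionStage<Done>> runnable = source.via(flow).toMat(sink, Keep.right());
    // Terminate only after the stream has finished; terminating right after run() can cut it short
    runnable.run(materializer).whenComplete((done, failure) -> actorSystem.terminate());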
}
 
Developer ID: henrikengstrom, Project: ujug2017, Lines of code: 11, Source: RSExample1.java


Example 17: run2

import akka.stream.javadsl.Source; // import the required package/class
private void run2() {
    final ActorSystem actorSystem = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    Source.range(0, 10)
            .map(Object::toString)
            .runForeach(s -> System.out.println("Example 2 - Number: " + s), materializer)
            // Terminate only after the stream has completed
            .whenComplete((done, failure) -> actorSystem.terminate());
}
 
Developer ID: henrikengstrom, Project: ujug2017, Lines of code: 9, Source: RSExample1.java


Example 18: helloStream

import akka.stream.javadsl.Source; // import the required package/class
@Test
public void helloStream() throws Exception {
    // Important to concat our source with a maybe, this ensures the connection doesn't get closed once we've
    // finished feeding our elements in, and then also to take 3 from the response stream, this ensures our
    // connection does get closed once we've received the 3 elements.
    Source<String, ?> response = await(streamService.stream().invoke(
            Source.from(Arrays.asList("a", "b", "c"))
                    .concat(Source.maybe())));
    List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
    assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
}
 
Developer ID: MarioAriasC, Project: lagomkotlin, Lines of code: 12, Source: StreamIT.java


Example 19: hide

import akka.stream.javadsl.Source; // import the required package/class
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
    return (Publisher) Source
            .fromPublisher(processor)
            .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 8, Source: AkkaProcProxy.java


Example 20: filter

import akka.stream.javadsl.Source; // import the required package/class
@Override
@SuppressWarnings("unchecked")
protected <T> Publisher<T> filter(Processor processor, final Class<T> filterClass) {
    Source src = Source.fromPublisher(processor)
            .filter(o -> filterClass.isAssignableFrom(o.getClass()));
    return (Publisher<T>) src.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 8, Source: AkkaProcProxy.java



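Note: The akka.stream.javadsl.Source examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.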

