• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java ReplayProcessor类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中reactor.core.publisher.ReplayProcessor的典型用法代码示例。如果您正苦于以下问题:Java ReplayProcessor类的具体用法?Java ReplayProcessor怎么用?Java ReplayProcessor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ReplayProcessor类属于reactor.core.publisher包,在下文中一共展示了ReplayProcessor类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: echo

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Test
public void echo() throws Exception {
	int count = 100;
	Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
	ReplayProcessor<Object> output = ReplayProcessor.create(count);

	client.execute(getUrl("/echo"),
			session -> {
				logger.debug("Starting to send messages");
				return session
						.send(input.doOnNext(s -> logger.debug("outbound " + s)).map(session::textMessage))
						.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
						.subscribeWith(output)
						.doOnNext(s -> logger.debug("inbound " + s))
						.then()
						.doOnSuccessOrError((aVoid, ex) ->
								logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
			})
			.block(Duration.ofMillis(5000));

	assertEquals(input.collectList().block(Duration.ofMillis(5000)),
			output.collectList().block(Duration.ofMillis(5000)));
}
 
开发者ID:spring-cloud,项目名称:spring-cloud-gateway,代码行数:24,代码来源:WebSocketIntegrationTests.java


示例2: main

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static final void main(String[] args) throws URISyntaxException {

        WebSocketClient client = new ReactorNettyWebSocketClient();
//        client.execute(new URI("ws://localhost:8080/echo"), (WebSocketSession session) -> {
//            session.send().log().;
//        });
        
        int count = 100;
		Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
		ReplayProcessor<Object> output = ReplayProcessor.create(count);

		client.execute(new URI("ws://localhost:8080/echo"),
				session -> {
					log.debug("Starting to send messages");
					return session
							.send(input.doOnNext(s -> log.debug("outbound " + s)).map(session::textMessage))
							.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
							.subscribeWith(output)
							.doOnNext(s -> log.debug("inbound " + s))
							.then()
							.doOnTerminate((aVoid, ex) ->
									log.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
				})
				.block(Duration.ofMillis(5000));

//		assertEquals(input.collectList().block(Duration.ofMillis(5000)),
//				output.collectList().block(Duration.ofMillis(5000)));
//        client.execute(new URI("ws://localhost:8080/echo")), session -> {
//            session.
//        }
//        ).blockMillis(5000);
    }
 
开发者ID:hantsy,项目名称:spring-reactive-sample,代码行数:33,代码来源:WebSocketDemoClient.java


示例3: analyticsTest

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Test
public void analyticsTest() throws Exception {
	ReplayProcessor<Integer> source = ReplayProcessor.create();

	long avgTime = 50l;

	Mono<Long> result = source
			.log("delay")
			.publishOn(asyncGroup)
	                          .delayElements(Duration.ofMillis(avgTime))

	                          .elapsed()
	                          .skip(1)
	                          .groupBy(w -> w.getT1())
							  .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
	                          .log("elapsed")
	                          .collectSortedList(Comparator.comparing(Tuple2::getT1))
	                          .flatMapMany(Flux::fromIterable)
	                          .reduce(-1L, (acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) : next.getT1())
	                          .log("reduced-elapsed")
	                          .cache();

	source.subscribe();

	for (int j = 0; j < 10; j++) {
		source.onNext(1);
	}
	source.onComplete();

	Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:32,代码来源:FluxTests.java


示例4: combineWithOneElement

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Test
public void combineWithOneElement() throws InterruptedException, TimeoutException {
	AtomicReference<Object> ref = new AtomicReference<>(null);

	Phaser phaser = new Phaser(2);

	Flux<Object> s1 = ReplayProcessor.cacheLastOrDefault(new Object())
	                                 .publishOn(asyncGroup);
	Flux<Object> s2 = ReplayProcessor.cacheLastOrDefault(new Object())
	                                .publishOn(asyncGroup);

	// The following works:
	//List<Flux<Object>> list = Arrays.collectList(s1);
	// The following fails:
	List<Flux<Object>> list = Arrays.asList(s1, s2);

	Flux.combineLatest(list, t -> t)
	       .log()
	       .doOnNext(obj -> {
		       ref.set(obj);
		       phaser.arrive();
	       })
	       .subscribe();

	phaser.awaitAdvanceInterruptibly(phaser.arrive(), 1, TimeUnit.SECONDS);
	Assert.assertNotNull(ref.get());
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:28,代码来源:FluxTests.java


示例5: whenProcessorIsStreamed

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Test
	public void whenProcessorIsStreamed() {
//		"When a processor is streamed"
//		given: "a source composable and a async downstream"
		ReplayProcessor<Integer> source = ReplayProcessor.create();
		Scheduler scheduler = Schedulers.newParallel("test", 2);

		try {
			Mono<List<Integer>> res = source.subscribeOn(scheduler)
			                                .delaySubscription(Duration.ofMillis(1L))
			                                .log("streamed")
			                                .map(it -> it * 2)
			                                .buffer()
			                                .publishNext();

			res.subscribe();

//		when: "the source accepts a value"
			source.onNext(1);
			source.onNext(2);
			source.onNext(3);
			source.onNext(4);
			source.onComplete();

//		then: "the res is passed on"
			assertThat(res.block()).containsExactly(2, 4, 6, 8);
		}
		finally {
			scheduler.dispose();
		}
	}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:32,代码来源:FluxSpecTests.java


示例6: behaviorProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy behaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1), PASS);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例7: serializedBehaviorProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy serializedBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1).serialize(), PASS);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例8: replayProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy replayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(), PASS);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例9: serializedReplayProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy serializedReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create().serialize(), PASS);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例10: safeBehaviorProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy safeBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1), WRAP);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例11: safeSerializedBehaviorProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy safeSerializedBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1).serialize(), WRAP);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例12: safeReplayProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy safeReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(), WRAP);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例13: safeSerializedReplayProcessorProxy

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
public static ReactorProcProxy safeSerializedReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create().serialize(), WRAP);
}
 
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java


示例14: duplexEcho

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Test
public void duplexEcho() throws Exception {

	int c = 10;
	CountDownLatch clientLatch = new CountDownLatch(c);
	CountDownLatch serverLatch = new CountDownLatch(c);

	FluxProcessor<String, String> server =
			ReplayProcessor.<String>create().serialize();
	FluxProcessor<String, String> client =
			ReplayProcessor.<String>create().serialize();

	server.log("server")
	      .subscribe(v -> serverLatch.countDown());
	client.log("client")
	      .subscribe(v -> clientLatch.countDown());

	httpServer = HttpServer.create(0)
	                       .newHandler((in, out) -> out.sendWebsocket((i, o) -> o.sendString(
			                       i.receive()
			                        .asString()
			                        .take(c)
			                        .subscribeWith(server))))
	                       .block(Duration.ofSeconds(30));

	Flux.interval(Duration.ofMillis(200))
	    .map(Object::toString)
	    .subscribe(client::onNext);

	HttpClient.create(httpServer.address()
	                            .getPort())
	          .ws("/test")
	          .flatMap(in -> in.receiveWebsocket((i, o) -> o.options(opt -> opt.flushOnEach())
	                                                     .sendString(i.receive()
	                                                                  .asString()
	                                                                  .subscribeWith(
			                                                                  client))))
	          .subscribe();

	Assert.assertTrue(serverLatch.await(10, TimeUnit.SECONDS));
	Assert.assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
 
开发者ID:reactor,项目名称:reactor-netty,代码行数:43,代码来源:WebsocketTest.java


示例15: anotherBefore

import reactor.core.publisher.ReplayProcessor; //导入依赖的package包/类
@Before
public void anotherBefore() {
	ts = AssertSubscriber.create();
	emitter1 = ReplayProcessor.create();
	emitter2 = ReplayProcessor.create();
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:7,代码来源:CombinationTests.java



注:本文中的reactor.core.publisher.ReplayProcessor类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java IPlatformRunnable类代码示例发布时间:2022-05-23
下一篇:
Java InternalGeoHashGrid类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap