This article collects typical usage examples of the Java class reactor.core.publisher.ReplayProcessor. If you are wondering how ReplayProcessor is used in practice, the selected examples below may help.
The ReplayProcessor class belongs to the reactor.core.publisher package. The 15 code examples below are sorted by popularity by default.
Example 1: echo
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.web.reactive.socket.WebSocketMessage;
// `client`, `logger` and `getUrl` are members of the enclosing test class (not shown).
@Test
public void echo() throws Exception {
    int count = 100;
    Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
    // History of `count` items, so every echoed message is still available to the assertion below.
    ReplayProcessor<Object> output = ReplayProcessor.create(count);
    client.execute(getUrl("/echo"),
            session -> {
                logger.debug("Starting to send messages");
                return session
                        .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(session::textMessage))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .doOnNext(s -> logger.debug("inbound " + s))
                        .then()
                        .doOnSuccessOrError((aVoid, ex) ->
                                logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
            })
            .block(Duration.ofMillis(5000));
    assertEquals(input.collectList().block(Duration.ofMillis(5000)),
            output.collectList().block(Duration.ofMillis(5000)));
}
Developer ID: spring-cloud, Project: spring-cloud-gateway, Lines of code: 24, Source file: WebSocketIntegrationTests.java
Example 2: main
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
// `log` is a logger field of the enclosing class (not shown).
public static void main(String[] args) throws URISyntaxException {
    WebSocketClient client = new ReactorNettyWebSocketClient();
    // client.execute(new URI("ws://localhost:8080/echo"), (WebSocketSession session) -> {
    //     session.send().log().;
    // });
    int count = 100;
    Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
    ReplayProcessor<Object> output = ReplayProcessor.create(count);
    client.execute(new URI("ws://localhost:8080/echo"),
            session -> {
                log.debug("Starting to send messages");
                return session
                        .send(input.doOnNext(s -> log.debug("outbound " + s)).map(session::textMessage))
                        .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
                        .subscribeWith(output)
                        .doOnNext(s -> log.debug("inbound " + s))
                        .then()
                        // Note: this two-argument callback appears to target an early Reactor 3 API;
                        // in later releases Mono.doOnTerminate takes a Runnable instead.
                        .doOnTerminate((aVoid, ex) ->
                                log.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
            })
            .block(Duration.ofMillis(5000));
    // assertEquals(input.collectList().block(Duration.ofMillis(5000)),
    //         output.collectList().block(Duration.ofMillis(5000)));
    // client.execute(new URI("ws://localhost:8080/echo")), session -> {
    //     session.
    // }
    // ).blockMillis(5000);
}
Developer ID: hantsy, Project: spring-reactive-sample, Lines of code: 33, Source file: WebSocketDemoClient.java
Example 3: analyticsTest
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.time.Duration;
import java.util.Comparator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
// `asyncGroup` is a Scheduler field of the enclosing test class (not shown).
@Test
public void analyticsTest() throws Exception {
    ReplayProcessor<Integer> source = ReplayProcessor.create();
    long avgTime = 50L;
    Mono<Long> result = source
            .log("delay")
            .publishOn(asyncGroup)
            .delayElements(Duration.ofMillis(avgTime))
            .elapsed()
            .skip(1)
            .groupBy(w -> w.getT1())
            .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
            .log("elapsed")
            .collectSortedList(Comparator.comparing(Tuple2::getT1))
            .flatMapMany(Flux::fromIterable)
            .reduce(-1L, (acc, next) -> acc > 0L ? ((next.getT1() + acc) / 2) : next.getT1())
            .log("reduced-elapsed")
            .cache();
    source.subscribe();
    for (int j = 0; j < 10; j++) {
        source.onNext(1);
    }
    source.onComplete();
    Assert.assertTrue(result.block(Duration.ofSeconds(5)) >= avgTime * 0.6);
}
Developer ID: reactor, Project: reactor-core, Lines of code: 32, Source file: FluxTests.java
Example 4: combineWithOneElement
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
// `asyncGroup` is a Scheduler field of the enclosing test class (not shown).
@Test
public void combineWithOneElement() throws InterruptedException, TimeoutException {
    AtomicReference<Object> ref = new AtomicReference<>(null);
    Phaser phaser = new Phaser(2);
    Flux<Object> s1 = ReplayProcessor.cacheLastOrDefault(new Object())
            .publishOn(asyncGroup);
    Flux<Object> s2 = ReplayProcessor.cacheLastOrDefault(new Object())
            .publishOn(asyncGroup);
    // The following works:
    //List<Flux<Object>> list = Arrays.asList(s1);
    // The following fails:
    List<Flux<Object>> list = Arrays.asList(s1, s2);
    Flux.combineLatest(list, t -> t)
            .log()
            .doOnNext(obj -> {
                ref.set(obj);
                phaser.arrive();
            })
            .subscribe();
    phaser.awaitAdvanceInterruptibly(phaser.arrive(), 1, TimeUnit.SECONDS);
    Assert.assertNotNull(ref.get());
}
Developer ID: reactor, Project: reactor-core, Lines of code: 28, Source file: FluxTests.java
Example 5: whenProcessorIsStreamed
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@Test
public void whenProcessorIsStreamed() {
    // "When a processor is streamed"
    // given: "a source composable and an async downstream"
    ReplayProcessor<Integer> source = ReplayProcessor.create();
    Scheduler scheduler = Schedulers.newParallel("test", 2);
    try {
        Mono<List<Integer>> res = source.subscribeOn(scheduler)
                .delaySubscription(Duration.ofMillis(1L))
                .log("streamed")
                .map(it -> it * 2)
                .buffer()
                .publishNext();
        res.subscribe();
        // when: "the source accepts a value"
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onComplete();
        // then: "the res is passed on"
        assertThat(res.block()).containsExactly(2, 4, 6, 8);
    }
    finally {
        scheduler.dispose();
    }
}
Developer ID: reactor, Project: reactor-core, Lines of code: 32, Source file: FluxSpecTests.java
Example 6: behaviorProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
// History size 1: a late subscriber receives only the most recent item.
public static ReactorProcProxy behaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1), PASS);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 7: serializedBehaviorProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy serializedBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1).serialize(), PASS);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 8: replayProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
// No-argument create(): unbounded history, so a late subscriber receives every earlier item.
public static ReactorProcProxy replayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(), PASS);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
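Examples 6 and 8 differ only in the argument passed to ReplayProcessor.create: a history size of 1 in the "behavior" proxy, and no argument (unbounded history) in the "replay" proxy. The sketch below is a JDK-only model of that difference, written purely for illustration. The class ReplaySketch and its methods are hypothetical names, not part of Reactor, and the model ignores completion, errors, backpressure and thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical JDK-only model of a replay buffer; NOT Reactor's implementation. */
final class ReplaySketch<T> {
    private final int historySize;                       // maximum number of retained items
    private final Deque<T> history = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ReplaySketch(int historySize) {
        this.historySize = historySize;
    }

    /** Replays the retained items to the new subscriber, then registers it for live items. */
    void subscribe(Consumer<T> subscriber) {
        history.forEach(subscriber);
        subscribers.add(subscriber);
    }

    /** Retains the item, evicts the oldest one beyond the bound, and delivers to current subscribers. */
    void onNext(T item) {
        history.addLast(item);
        if (history.size() > historySize) {
            history.removeFirst();
        }
        subscribers.forEach(s -> s.accept(item));
    }

    /** Demo helper: emits 1, 2, 3, subscribes late, emits 4, and returns what the subscriber saw. */
    static List<Integer> lateSubscriber(int historySize) {
        ReplaySketch<Integer> sketch = new ReplaySketch<>(historySize);
        sketch.onNext(1);
        sketch.onNext(2);
        sketch.onNext(3);
        List<Integer> seen = new ArrayList<>();
        sketch.subscribe(seen::add);
        sketch.onNext(4);
        return seen;
    }
}
```

Under this model, ReplaySketch.lateSubscriber(1) returns [3, 4] (the last retained item plus the live one), while ReplaySketch.lateSubscriber(Integer.MAX_VALUE), standing in for an unbounded history, returns [1, 2, 3, 4].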
Example 9: serializedReplayProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy serializedReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create().serialize(), PASS);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 10: safeBehaviorProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy safeBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1), WRAP);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 11: safeSerializedBehaviorProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy safeSerializedBehaviorProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(1).serialize(), WRAP);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 12: safeReplayProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy safeReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create(), WRAP);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 13: safeSerializedReplayProcessorProxy
import reactor.core.publisher.ReplayProcessor; // import the required package/class
public static ReactorProcProxy safeSerializedReplayProcessorProxy() {
    return new ReactorProcProxy(ReplayProcessor.create().serialize(), WRAP);
}
Developer ID: apptik, Project: RHub, Lines of code: 4, Source file: ReactorProxies.java
Example 14: duplexEcho
import reactor.core.publisher.ReplayProcessor; // import the required package/class
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
// `httpServer` is a field of the enclosing test class; HttpServer and HttpClient come from reactor-netty (imports not shown).
@Test
public void duplexEcho() throws Exception {
    int c = 10;
    CountDownLatch clientLatch = new CountDownLatch(c);
    CountDownLatch serverLatch = new CountDownLatch(c);
    // serialize() makes onNext safe to call from several threads.
    FluxProcessor<String, String> server =
            ReplayProcessor.<String>create().serialize();
    FluxProcessor<String, String> client =
            ReplayProcessor.<String>create().serialize();
    server.log("server")
            .subscribe(v -> serverLatch.countDown());
    client.log("client")
            .subscribe(v -> clientLatch.countDown());
    httpServer = HttpServer.create(0)
            .newHandler((in, out) -> out.sendWebsocket((i, o) -> o.sendString(
                    i.receive()
                            .asString()
                            .take(c)
                            .subscribeWith(server))))
            .block(Duration.ofSeconds(30));
    Flux.interval(Duration.ofMillis(200))
            .map(Object::toString)
            .subscribe(client::onNext);
    HttpClient.create(httpServer.address()
                    .getPort())
            .ws("/test")
            .flatMap(in -> in.receiveWebsocket((i, o) -> o.options(opt -> opt.flushOnEach())
                    .sendString(i.receive()
                            .asString()
                            .subscribeWith(client))))
            .subscribe();
    Assert.assertTrue(serverLatch.await(10, TimeUnit.SECONDS));
    Assert.assertTrue(clientLatch.await(10, TimeUnit.SECONDS));
}
Developer ID: reactor, Project: reactor-netty, Lines of code: 43, Source file: WebsocketTest.java
Example 15: anotherBefore
import reactor.core.publisher.ReplayProcessor; // import the required package/class
// `ts`, `emitter1` and `emitter2` are fields of the enclosing test class (not shown).
@Before
public void anotherBefore() {
    ts = AssertSubscriber.create();
    emitter1 = ReplayProcessor.create();
    emitter2 = ReplayProcessor.create();
}
Developer ID: reactor, Project: reactor-core, Lines of code: 7, Source file: CombinationTests.java
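Note: The reactor.core.publisher.ReplayProcessor examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the license of each project. Do not reproduce without permission.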