本文整理汇总了 Java 中 reactor.core.publisher.TopicProcessor 类的典型用法代码示例。如果您正苦于以下问题:Java TopicProcessor 类的具体用法?Java TopicProcessor 怎么用?Java TopicProcessor 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
TopicProcessor类属于reactor.core.publisher包,在下文中一共展示了TopicProcessor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: setupFakeProtocolListener
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Wires the fake protocol pipeline and starts the HTTP server used by the test.
 *
 * <p>A new {@code TopicProcessor} is stored in {@code broadcaster}; its items are
 * grouped into lists of five and forwarded to a {@code WorkQueueProcessor} built with
 * {@code autoCancel(false)}. The server exposes {@code GET /data}, which streams those
 * lists (flushing on each element), switches to an empty stream when no list arrives
 * within two seconds, appends one empty list and encodes everything with a
 * {@code DummyListEncoder}. Server start-up is awaited for at most 30 seconds.
 *
 * @throws Exception declared by the original signature
 */
private void setupFakeProtocolListener() throws Exception {
    broadcaster = TopicProcessor.create();

    final Processor<List<String>, List<String>> processor =
            WorkQueueProcessor.<List<String>>builder()
                              .autoCancel(false)
                              .build();

    // Batch broadcast items five at a time and hand each batch to the work queue.
    Flux.from(broadcaster)
        .buffer(5)
        .subscribe(processor);

    httpServer = HttpServer.create(0)
                           .newRouter(routes -> routes.get("/data", (request, response) ->
                                   response.options(NettyPipeline.SendOptions::flushOnEach)
                                           .send(Flux.from(processor)
                                                     .log("server")
                                                     // Give up on the live stream after 2s of silence.
                                                     .timeout(Duration.ofSeconds(2), Flux.empty())
                                                     // Always finish with one empty list.
                                                     .concatWith(Flux.just(new ArrayList<>()))
                                                     .map(new DummyListEncoder(response.alloc())))))
                           .block(Duration.ofSeconds(30));
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:24,代码来源:ClientServerHttpTests.java
示例2: RobotWSService
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates the service, resolves the local address, runs {@code setup()} and subscribes
 * the given movement subscriber to a newly created {@code TopicProcessor}.
 *
 * <p>Both failure paths are best-effort: a {@code SocketException} while resolving the
 * address and an {@code InterruptedException} during set-up are reported via
 * {@code printStackTrace()} and construction continues. In the interrupted case the
 * thread's interrupt status is restored so that callers can still observe it.
 *
 * @param positions          source of position data, stored in {@code this.positions}
 * @param movementSubscriber subscriber stored in {@code this.movements} and subscribed
 *                           to {@code movementCommands}
 * @throws UnknownHostException declared by the original signature
 */
public RobotWSService(PositionsFlux positions, MovementCommandSubscriber movementSubscriber)
        throws UnknownHostException {
    this.positions = positions;
    this.movements = movementSubscriber;
    try {
        ipAddress = findMyInetAddress();
    } catch (SocketException e1) {
        // Best effort: ipAddress is left unassigned by this constructor on failure.
        e1.printStackTrace();
    }
    try {
        setup();
        movementCommands = TopicProcessor.create();
        movementCommands.subscribe(movementSubscriber);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Catching InterruptedException clears the interrupt flag; set it again so the
        // interruption is not silently lost for the calling code.
        Thread.currentThread().interrupt();
    }
}
开发者ID:iproduct,项目名称:course-social-robotics,代码行数:21,代码来源:RobotWSService.java
示例3: afterPropertiesSet
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates the dispatcher and optionally starts this executor.
 *
 * <p>When {@code isShared()} is true the dispatcher comes from
 * {@code TopicProcessor.share(...)}, otherwise from {@code TopicProcessor.create(...)}.
 * Both receive the configured name and backlog, and {@code WaitStrategy.blocking()} is
 * used when no wait strategy was set. {@code start()} is invoked when
 * {@code isAutoStartup()} is true.
 *
 * @throws Exception declared by the original signature
 */
@Override
public void afterPropertiesSet() throws Exception {
    if (isShared()) {
        this.dispatcher = TopicProcessor.share(
                getName(),
                getBacklog(),
                (waitStrategy == null ? WaitStrategy.blocking() : waitStrategy));
    } else {
        this.dispatcher = TopicProcessor.create(
                getName(),
                getBacklog(),
                (waitStrategy == null ? WaitStrategy.blocking() : waitStrategy));
    }

    if (!isAutoStartup()) {
        return;
    }
    start();
}
开发者ID:reactor,项目名称:reactor-spring,代码行数:20,代码来源:RingBufferAsyncTaskExecutor.java
示例4: sensorOdd
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Returns the stream that odd-numbered events are published to, creating the backing
 * {@code TopicProcessor} (named "odd") on first use.
 *
 * @return the odd-event stream, logged under the "odd" category
 */
public Flux<SensorData> sensorOdd() {
    if (this.sensorOdd == null) {
        // First call: build the processor that odd-numbered events are pushed into.
        this.sensorOdd = TopicProcessor.<SensorData>builder()
                                       .name("odd")
                                       .build();
    }
    return this.sensorOdd.log("odd");
}
开发者ID:reactor,项目名称:reactor-core,代码行数:12,代码来源:CombinationTests.java
示例5: sensorEven
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Returns the stream that even-numbered events are published to, creating the backing
 * {@code TopicProcessor} (named "even") on first use.
 *
 * @return the even-event stream, logged under the "even" category
 */
public Flux<SensorData> sensorEven() {
    if (this.sensorEven == null) {
        // First call: build the processor that even-numbered events are pushed into.
        this.sensorEven = TopicProcessor.<SensorData>builder()
                                        .name("even")
                                        .build();
    }
    return this.sensorEven.log("even");
}
开发者ID:reactor,项目名称:reactor-core,代码行数:11,代码来源:CombinationTests.java
示例6: indexBugTest
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Pushes 20 FizzBuzz-style tokens through a {@code TopicProcessor}, zips each one with a
 * timestamp and checks that the first buffer of 20 zipped strings is delivered within
 * five seconds.
 *
 * <p>Historical note kept from the original: building the processor used to trigger a
 * {@code java.lang.ArrayIndexOutOfBoundsException} unless a break point was set in
 * {@code ZipAction.createSubscriber()}. An {@code EmitterProcessor} was the alternative
 * the original author had tried.
 */
@Test
public void indexBugTest() throws InterruptedException {
    final int numOfItems = 20;

    TopicProcessor<String> ring = TopicProcessor.<String>builder()
                                                .name("test")
                                                .bufferSize(1024)
                                                .build();

    // Pair every token with the current time in milliseconds.
    Flux<String> stream2 = ring.zipWith(
            Mono.fromCallable(System::currentTimeMillis).repeat(),
            (token, millis) -> String.format("%s : %s", token, millis));

    Mono<List<String>> p = stream2.doOnNext(System.out::println)
                                  .buffer(numOfItems)
                                  .next();

    for (int i = 0; i < numOfItems; i++) {
        final boolean byThree = i % 3 == 0;
        final boolean byFive = i % 5 == 0;

        final String token;
        if (byFive && byThree) {
            token = "FizBuz" + i;
        } else if (byThree) {
            token = "Fiz" + i;
        } else if (byFive) {
            token = "Buz" + i;
        } else {
            token = String.valueOf(i);
        }
        ring.onNext(token);
    }

    Assert.assertTrue("Has not returned list", p.block(Duration.ofSeconds(5)) != null);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:32,代码来源:FizzBuzzTests.java
示例7: testDiamond
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Ignored, manually driven test: generates random points, fans them out through a
 * {@code TopicProcessor} named "tee" into an inner-sample and an outer-sample branch,
 * merges both branches and prints a running approximation of pi for the first 10000
 * states. The method then waits on {@code System.in.read()}.
 */
@Test
@Ignore
public void testDiamond() throws InterruptedException, IOException {
    Flux<Point> points =
            Flux.<Double, Random>generate(Random::new, (random, sink) -> {
                    sink.next(random.nextDouble());
                    return random;
                })
                .log("points")
                .buffer(2)
                // Two consecutive random numbers form one point.
                .map(coords -> new Point(coords.get(0), coords.get(1)))
                .subscribeWith(TopicProcessor.<Point>builder()
                                             .name("tee")
                                             .bufferSize(32)
                                             .build());

    Flux<InnerSample> innerSamples =
            points.log("inner-1")
                  .filter(Point::isInner)
                  .map(InnerSample::new)
                  .log("inner-2");

    Flux<OuterSample> outerSamples =
            points.log("outer-1")
                  .filter(point -> !point.isInner())
                  .map(OuterSample::new)
                  .log("outer-2");

    Flux.merge(innerSamples, outerSamples)
        .publishOn(asyncGroup)
        .scan(new SimulationState(0L, 0L), SimulationState::withNextSample)
        .log("result")
        .map(state -> System.out.printf("After %8d samples π is approximated as %.5f",
                state.totalSamples,
                state.pi()))
        .take(10000)
        .subscribe();

    // Keep the JVM alive until something is typed on standard input.
    System.in.read();
}
开发者ID:reactor,项目名称:reactor-core,代码行数:32,代码来源:FluxTests.java
示例8: multiplexUsingProcessors
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Feeds two independent sources of {@code 1, 2, 3} into a "computation" and a
 * "persistence" {@code TopicProcessor}, maps each to strings, zips the two results
 * pairwise into lists and waits until the joined stream completes.
 */
@Test(timeout = TIMEOUT)
public void multiplexUsingProcessors() throws Exception {
    final Flux<Integer> forkStream = Flux.just(1, 2, 3).log("begin-computation");
    final Flux<Integer> forkStream2 = Flux.just(1, 2, 3).log("begin-persistence");

    final TopicProcessor<Integer> computationEmitterProcessor =
            TopicProcessor.<Integer>builder()
                          .name("computation")
                          .bufferSize(BACKLOG)
                          .build();
    final Flux<String> computationStream =
            computationEmitterProcessor.map(value -> Integer.toString(value));

    final TopicProcessor<Integer> persistenceEmitterProcessor =
            TopicProcessor.<Integer>builder()
                          .name("persistence")
                          .bufferSize(BACKLOG)
                          .build();
    final Flux<String> persistenceStream =
            persistenceEmitterProcessor.map(value -> "done " + value);

    // Start feeding both processors before the join subscribes to them.
    forkStream.subscribe(computationEmitterProcessor);
    forkStream2.subscribe(persistenceEmitterProcessor);

    final Semaphore doneSemaphore = new Semaphore(0);

    final Flux<List<String>> joinStream = Flux.zip(
            computationStream.log("log1"),
            persistenceStream.log("log2"),
            (computed, persisted) -> Arrays.asList(computed, persisted));

    // Original author's note: method chaining doesn't compile here, hence the
    // separate joinStream variable.
    joinStream.log("log-final")
              .subscribe(
                      joined -> println("Joined: ", joined),
                      error -> println("Join failed: ", error.getMessage()),
                      () -> {
                          println("Join complete.");
                          doneSemaphore.release();
                      });

    // Block until the completion callback has released the permit.
    doneSemaphore.acquire();
}
开发者ID:reactor,项目名称:reactor-core,代码行数:41,代码来源:FluxTests.java
示例9: createIdentityProcessor
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Builds the identity processor under verification.
 *
 * <p>The upstream side is a {@code WorkQueueProcessor} named "fluxion-raw-fork". Its
 * values are grouped by parity; within each group they are scanned (keeping the latest
 * value), negated, filtered to non-positive values, negated back, buffered (up to 1024
 * items or 50 ms), flattened again and zipped with a fixed three-element string stream,
 * keeping only the numeric side. The result is published through a
 * {@code TopicProcessor} named "fluxion-raw-join". Both counters are reset to zero on
 * every call.
 *
 * @param bufferSize buffer size used for both processors
 * @return a processor wrapping the upstream processor and the joined downstream
 */
@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");

    FluxProcessor<Long, Long> upstream =
            WorkQueueProcessor.<Long>builder()
                              .name("fluxion-raw-fork")
                              .bufferSize(bufferSize)
                              .build();

    cumulated.set(0);
    cumulatedJoin.set(0);

    // Zip combinator that discards the string side.
    BiFunction<Long, String, Long> keepLeft = (value, ignored) -> value;

    Flux<Long> downstream =
            upstream.groupBy(value -> value % 2 == 0)
                    .flatMap(group -> group.scan((previous, current) -> current)
                                           .map(value -> -value)
                                           .filter(value -> value <= 0)
                                           .map(value -> -value)
                                           .bufferTimeout(1024, Duration.ofMillis(50))
                                           .flatMap(Flux::fromIterable)
                                           .doOnNext(value -> cumulated.getAndIncrement())
                                           .flatMap(value -> Flux.zip(Flux.just(value),
                                                   otherStream,
                                                   keepLeft)))
                    .doOnNext(value -> cumulatedJoin.getAndIncrement())
                    .subscribeWith(TopicProcessor.<Long>builder()
                                                 .name("fluxion-raw-join")
                                                 .bufferSize(bufferSize)
                                                 .build())
                    .doOnError(Throwable::printStackTrace);

    return FluxProcessor.wrap(upstream, downstream);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:28,代码来源:FluxWithProcessorVerification.java
示例10: GroupedTickSubscriber
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a subscriber holding the given collaborators and logs its creation.
 *
 * @param tickerManager stored in {@code this.tickerManager}
 * @param indicators    processor stored in {@code this.indicators}
 * @param strategies    processor stored in {@code this.strategies}
 */
public GroupedTickSubscriber(
        TickerManager tickerManager,
        TopicProcessor<IndicatorJson> indicators,
        TopicProcessor<StrategyJson> strategies) {
    this.strategies = strategies;
    this.indicators = indicators;
    this.tickerManager = tickerManager;
    // Logged last so that toString() sees the assigned fields.
    logger.info("created GroupedTickSubscriber:{}", this.toString());
}
开发者ID:the-james-burton,项目名称:the-turbine,代码行数:10,代码来源:GroupedTickSubscriber.java
示例11: ReactorTickSubscriber
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a named subscriber holding the given collaborators.
 *
 * @param name          passed to the superclass constructor
 * @param tickerManager stored in {@code this.tickerManager}
 * @param indicators    processor stored in {@code this.indicators}
 * @param strategies    processor stored in {@code this.strategies}
 */
public ReactorTickSubscriber(
        String name,
        TickerManager tickerManager,
        TopicProcessor<IndicatorJson> indicators,
        TopicProcessor<StrategyJson> strategies) {
    super(name);
    this.strategies = strategies;
    this.indicators = indicators;
    this.tickerManager = tickerManager;
}
开发者ID:the-james-burton,项目名称:the-turbine,代码行数:11,代码来源:ReactorTickSubscriber.java
示例12: RingBufferApplicationEventPublisher
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a publisher backed by a shared {@code TopicProcessor} named
 * "ringBufferAppEventPublisher" and starts it immediately when requested.
 *
 * @param backlog     backlog size handed to {@code TopicProcessor.share}
 * @param autoStartup whether {@code start()} is called from this constructor
 */
public RingBufferApplicationEventPublisher(int backlog, boolean autoStartup) {
    this.autoStartup = autoStartup;
    this.processor = TopicProcessor.share("ringBufferAppEventPublisher", backlog);

    if (!autoStartup) {
        return;
    }
    start();
}
开发者ID:reactor,项目名称:reactor-spring,代码行数:10,代码来源:RingBufferApplicationEventPublisher.java
示例13: ReactorSubscribableChannel
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Create a {@literal ReactorSubscribableChannel} with a {@code ProducerType.SINGLE} if {@code
 * singleThreadedProducer} is {@code true}, otherwise use {@code ProducerType.MULTI}.
 *
 * <p>The bean name is initialised to {@code <SimpleClassName>@<identityHex>}. With a
 * single-threaded producer the processor comes from {@code TopicProcessor.create()},
 * otherwise from {@code TopicProcessor.share()}.
 *
 * @param singleThreadedProducer whether to create a single-threaded producer or not
 */
public ReactorSubscribableChannel(boolean singleThreadedProducer) {
    // The pattern must be "%s@%s": the previous text was an e-mail-obfuscation artifact
    // and is not a valid format string ("%[" is an unknown conversion).
    this.beanName = String.format("%s@%s", getClass().getSimpleName(), ObjectUtils.getIdentityHexString(this));
    if (singleThreadedProducer) {
        this.processor = TopicProcessor.create();
    } else {
        this.processor = TopicProcessor.share();
    }
}
开发者ID:reactor,项目名称:reactor-spring,代码行数:15,代码来源:ReactorSubscribableChannel.java
示例14: topicProcessorProxy
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a {@code ReactorProcProxy} around a newly created {@code TopicProcessor},
 * passing the {@code PASS} constant as the second constructor argument.
 * NOTE(review): PASS presumably selects a pass-through (non-wrapping) mode - confirm
 * against ReactorProcProxy.
 *
 * @return the new proxy
 */
public static ReactorProcProxy topicProcessorProxy() {
return new ReactorProcProxy(TopicProcessor.create(), PASS);
}
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java
示例15: serializedTopicProcessorProxy
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a {@code ReactorProcProxy} around the result of calling {@code serialize()}
 * on a newly created {@code TopicProcessor}, passing the {@code PASS} constant as the
 * second constructor argument.
 * NOTE(review): PASS presumably selects a pass-through (non-wrapping) mode - confirm
 * against ReactorProcProxy.
 *
 * @return the new proxy
 */
public static ReactorProcProxy serializedTopicProcessorProxy() {
return new ReactorProcProxy(TopicProcessor.create().serialize(), PASS);
}
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java
示例16: safeTopicProcessorProxy
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a {@code ReactorProcProxy} around a newly created {@code TopicProcessor},
 * passing the {@code WRAP} constant as the second constructor argument.
 * NOTE(review): the "safe" prefix suggests WRAP enables some protective wrapping -
 * confirm against ReactorProcProxy.
 *
 * @return the new proxy
 */
public static ReactorProcProxy safeTopicProcessorProxy() {
return new ReactorProcProxy(TopicProcessor.create(), WRAP);
}
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java
示例17: safeSerializedTopicProcessorProxy
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates a {@code ReactorProcProxy} around the result of calling {@code serialize()}
 * on a newly created {@code TopicProcessor}, passing the {@code WRAP} constant as the
 * second constructor argument.
 * NOTE(review): the "safe" prefix suggests WRAP enables some protective wrapping -
 * confirm against ReactorProcProxy.
 *
 * @return the new proxy
 */
public static ReactorProcProxy safeSerializedTopicProcessorProxy() {
return new ReactorProcProxy(TopicProcessor.create().serialize(), WRAP);
}
开发者ID:apptik,项目名称:RHub,代码行数:4,代码来源:ReactorProxies.java
示例18: setupFakeProtocolListener
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Builds the buffer pipeline and starts the HTTP server used by the test.
 *
 * <p>{@code processor} (a {@code TopicProcessor}) and {@code workProcessor} (a
 * {@code WorkQueueProcessor}) are both built with {@code autoCancel(false)}. Buffers
 * from {@code processor} are split into windows of {@code windowBatch}; each window is
 * taken for two seconds and reduced into a single buffer, and the reduced buffers are
 * fed to {@code workProcessor}. The HTTP handler answers with fixed headers and streams
 * the reduced buffers, followed by one trailing buffer. Several counters are incremented
 * along the way so the test can observe how far elements travelled. Server start-up is
 * awaited for at most 30 seconds.
 *
 * @throws Exception declared by the original signature
 */
private void setupFakeProtocolListener() throws Exception {
processor = TopicProcessor.<ByteBuf>builder().autoCancel(false).build();
workProcessor = WorkQueueProcessor.<ByteBuf>builder().autoCancel(false).build();
// Window the incoming buffers, count each window, then collapse every window
// (taken for 2 seconds) into one buffer by writing its parts into a fresh buffer.
Flux<ByteBuf> bufferStream = Flux.from(processor)
.window(windowBatch)
.doOnNext(d -> windows.getAndIncrement())
.flatMap(s -> s.take(Duration.ofSeconds(2))
.reduceWith(Unpooled::buffer,
ByteBuf::writeBytes))
// Counts buffers that made it through the reduce step.
.doOnNext(d -> postReduce.getAndIncrement())
//.log("log", LogOperator.REQUEST)
.subscribeWith(workProcessor);
httpServer = HttpServer.create(opts -> opts.port(port))
.newHandler((request, response) -> {
response.chunkedTransfer(false);
return response.addHeader("Content-type", "text/plain")
.addHeader("Expires", "0")
.addHeader("X-GPFDIST-VERSION",
"Spring XD")
.addHeader("X-GP-PROTO", "1")
.addHeader("Cache-Control", "no-cache")
.addHeader("Connection", "close")
.options(NettyPipeline.SendOptions::flushOnEach)
// Counters below record elements seen before take, after take,
// after the timeout stage and after the trailing element is appended.
.send(bufferStream.doOnNext(d -> integer.getAndIncrement())
.take(takeCount)
.doOnNext(d -> integerPostTake.getAndIncrement())
// After 2 seconds without an element, switch to an empty stream.
.timeout(Duration.ofSeconds(2), Flux.empty())
.doOnNext(d -> integerPostTimeout.getAndIncrement())
// Trailing element: an empty buffer when dummy is set, otherwise the
// bytes of "END"; its completion decrements integerPostConcat once.
.concatWith(Flux.just(
dummy ?
Unpooled.copiedBuffer(
new byte[0]) :
Unpooled.copiedBuffer(
"END".getBytes()))
.doOnComplete(
integerPostConcat::decrementAndGet))//END
.doOnNext(d -> integerPostConcat.getAndIncrement())
// Pass buffers through unchanged when dummy is set, otherwise apply GpdistCodec.
.map(dummy ?
Function.identity() :
new GpdistCodec(
response
.alloc())));
})
.block(Duration.ofSeconds(30));
}
开发者ID:reactor,项目名称:reactor-netty,代码行数:50,代码来源:SmokeTests.java
示例19: konamiCode
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Detects the Konami code in a stream of key codes.
 *
 * <p>Leading keys other than UP are skipped, then a sliding window of ten keys (step
 * one) is compared against the sequence UP, UP, DOWN, DOWN, LEFT, RIGHT, LEFT, RIGHT,
 * B, A. Twelve keys are emitted, with one extra UP in front and a trailing C, so twelve
 * windows are produced and only the second one is expected to match.
 */
@Test
public void konamiCode() throws InterruptedException {
    final TopicProcessor<Integer> keyboardStream = TopicProcessor.create();

    final int[] konamiSequence = {
            KeyEvent.VK_UP, KeyEvent.VK_UP,
            KeyEvent.VK_DOWN, KeyEvent.VK_DOWN,
            KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT,
            KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT,
            KeyEvent.VK_B, KeyEvent.VK_A
    };

    Mono<List<Boolean>> konamis = keyboardStream
            .skipWhile(key -> KeyEvent.VK_UP != key)
            .buffer(10, 1)
            .map(keys -> {
                // Shorter (trailing) windows can never match.
                if (keys.size() != 10) {
                    return false;
                }
                for (int i = 0; i < konamiSequence.length; i++) {
                    if (keys.get(i) != konamiSequence[i]) {
                        return false;
                    }
                }
                return true;
            })
            .collectList();

    final int[] typedKeys = {
            KeyEvent.VK_UP, KeyEvent.VK_UP, KeyEvent.VK_UP,
            KeyEvent.VK_DOWN, KeyEvent.VK_DOWN,
            KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT,
            KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT,
            KeyEvent.VK_B, KeyEvent.VK_A, KeyEvent.VK_C
    };
    for (int key : typedKeys) {
        keyboardStream.onNext(key);
    }
    keyboardStream.onComplete();

    List<Boolean> res = konamis.block();

    Assert.assertTrue(res.size() == 12);
    // Only the window starting at the second UP matches the full sequence.
    for (int index = 0; index < 12; index++) {
        if (index == 1) {
            Assert.assertTrue(res.get(index));
        } else {
            Assert.assertFalse(res.get(index));
        }
    }
}
开发者ID:reactor,项目名称:reactor-core,代码行数:51,代码来源:FluxTests.java
示例20: setupPipeline
import reactor.core.publisher.TopicProcessor; //导入依赖的package包/类
/**
 * Creates the two processors used by the test, both with {@code autoCancel(false)}, and
 * subscribes the work-queue processor to the topic processor.
 */
private void setupPipeline() {
    processor = TopicProcessor.<String>builder()
                              .autoCancel(false)
                              .build();

    workProcessor = WorkQueueProcessor.<String>builder()
                                      .autoCancel(false)
                                      .build();

    // Everything published to the topic processor is forwarded to the work queue.
    processor.subscribe(workProcessor);
}
开发者ID:reactor,项目名称:reactor-core,代码行数:6,代码来源:ConsistentProcessorTests.java
注:本文中的 reactor.core.publisher.TopicProcessor 类示例整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论