This article collects typical usage examples of the Java class reactor.fn.Consumer. If you are wondering how the Consumer class is used in practice, or are looking for working examples of it, the curated code samples here may help.
The Consumer class belongs to the reactor.fn package. 18 code examples of the class are shown below, sorted by popularity by default.
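For orientation, the sketch below shows the shape the examples rely on: a callback interface with a single `accept` method, implemented either as an anonymous class or as a lambda. `SimpleConsumer` and `ConsumerShapeDemo` are hypothetical stand-ins defined here so the snippet compiles without Reactor on the classpath; they are not the library's own classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a single-method callback such as reactor.fn.Consumer.
interface SimpleConsumer<T> {
    void accept(T value);
}

class ConsumerShapeDemo {

    // Passes every item to the callback, in list order.
    static <T> void forEach(List<T> items, SimpleConsumer<T> consumer) {
        for (T item : items) {
            consumer.accept(item);
        }
    }

    // Anonymous-class style, as used by the pre-lambda examples.
    static List<String> withAnonymousClass(List<String> items) {
        final List<String> seen = new ArrayList<>();
        forEach(items, new SimpleConsumer<String>() {
            @Override
            public void accept(String value) {
                seen.add(value.toUpperCase());
            }
        });
        return seen;
    }

    // Lambda style; possible because the interface has exactly one abstract method.
    static List<String> withLambda(List<String> items) {
        List<String> seen = new ArrayList<>();
        forEach(items, value -> seen.add(value.toUpperCase()));
        return seen;
    }
}
```

Both helper methods produce the same result; the lambda form is only shorter syntax for the same callback.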
Example 1: apply
import reactor.fn.Consumer; // import the required package/class
@Override
public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
    Promise<Void> closePromise = Promises.prepare();
    this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise));
    channelStream
        .finallyDo(new Consumer<Signal<Message<P>>>() {
            @Override
            public void accept(Signal<Message<P>> signal) {
                if (signal.isOnError()) {
                    connectionHandler.handleFailure(signal.getThrowable());
                }
                else if (signal.isOnComplete()) {
                    connectionHandler.afterConnectionClosed();
                }
            }
        })
        .consume(new Consumer<Message<P>>() {
            @Override
            public void accept(Message<P> message) {
                connectionHandler.handleMessage(message);
            }
        });
    return closePromise;
}
Developer: langtianya, Project: spring4-understanding, Lines of code: 26, Source: Reactor2TcpClient.java
Example 2: main
import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) {
    //Initialize context and get default dispatcher
    Environment.initializeIfEmpty();
    //RingBufferDispatcher with 8192 slots by default
    Dispatcher dispatcher = Environment.sharedDispatcher();
    //Create a callback
    Consumer<Integer> c = data ->
        System.out.format("some data arrived: %s\n", data);
    //Create an error callback
    Consumer<Throwable> errorHandler = ex -> ex.printStackTrace();
    //Dispatch data asynchronously
    dispatcher.dispatch(1234, c, errorHandler);
    dispatcher.dispatch(5678, c, errorHandler);
    Environment.terminate();
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 21, Source: Example1.java
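To make the three-argument `dispatch(data, consumer, errorConsumer)` call above easier to reason about, here is a minimal sketch of what such a call could do: run the data callback on another thread and route any exception to the error callback. `SketchDispatcher` and `SketchDispatcherDemo` are hypothetical names, the sketch uses `java.util.function.Consumer` and a plain single-thread executor instead of Reactor's types, and it makes no claim about how Reactor's ring-buffer dispatcher is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical single-threaded dispatcher; not Reactor's implementation.
class SketchDispatcher {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Runs eventConsumer on the worker thread; failures go to errorConsumer.
    <E> void dispatch(E event, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
        worker.execute(() -> {
            try {
                eventConsumer.accept(event);
            } catch (Throwable t) {
                if (errorConsumer != null) {
                    errorConsumer.accept(t);
                }
            }
        });
    }

    // Stops accepting work and waits for already queued tasks to finish.
    boolean shutdownAndAwait(long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }
}

class SketchDispatcherDemo {
    // Dispatches one valid and one failing event and returns what the callbacks recorded.
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        SketchDispatcher dispatcher = new SketchDispatcher();
        Consumer<Integer> c = data -> {
            if (data < 0) {
                throw new IllegalArgumentException("negative: " + data);
            }
            log.add("data:" + data);
        };
        Consumer<Throwable> errorHandler = ex -> log.add("error:" + ex.getMessage());
        dispatcher.dispatch(1234, c, errorHandler);
        dispatcher.dispatch(-1, c, errorHandler);
        try {
            dispatcher.shutdownAndAwait(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }
}
```

Waiting for the worker to drain before returning is a deliberate choice in this sketch: without it, the caller could read the log before the asynchronous callbacks have run.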
Example 3: main
import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    CountDownLatch latch = new CountDownLatch(5);
    Dispatcher threadPoolDispatcher = new ThreadPoolExecutorDispatcher(5, 128);
    Consumer<String> consumer = ev -> {
        LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
        latch.countDown();
    };
    Consumer<Throwable> errorConsumer = error ->
        error.printStackTrace();
    // each element is submitted as a task to the thread pool dispatcher
    Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
    stream.dispatchOn(env).consume(ev -> {
        System.out.println(ev);
        threadPoolDispatcher.dispatch(ev, consumer, errorConsumer);
    });
    latch.await(15, TimeUnit.SECONDS); // Wait for task to execute
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 25, Source: ThreaadPoolExecutorDispatcher.java
Example 4: main
import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    CountDownLatch latch = new CountDownLatch(5);
    DispatcherSupplier supplier = Environment.newCachedDispatchers(3, "myPool");
    Consumer<String> consumer = ev -> {
        LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
        latch.countDown();
    };
    Consumer<Throwable> errorConsumer = error ->
        error.printStackTrace();
    // the stream is partitioned and each partition is consumed on a dispatcher from the supplier
    Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
    stream.dispatchOn(env).partition().consume(
        groupStream ->
            groupStream.dispatchOn(supplier.get()).consume(consumer, errorConsumer)
    );
    latch.await(15, TimeUnit.SECONDS); // Wait for task to execute
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 24, Source: DispatcherSupplierPartitioning.java
Example 5: init
import reactor.fn.Consumer; // import the required package/class
@Override
protected void init(VaadinRequest vaadinRequest) {
    final VerticalLayout layout = new VerticalLayout();
    layout.setMargin(true);
    layout.setSpacing(true);
    layout.addComponent(labelTime);
    setContent(layout);
    eventBus.on(Selectors.$(TOPIC), new Consumer<reactor.bus.Event<LocalTime>>() {
        @Override
        public void accept(reactor.bus.Event<LocalTime> event) {
            access(() -> labelTime.setValue(event.getData().format(dateTimeFormatter)));
        }
    });
}
Developer: jangalinski, Project: springbootcamp, Lines of code: 17, Source: ApplicationUI.java
Example 6: tryDispatch
import reactor.fn.Consumer; // import the required package/class
public final <E> void tryDispatch(E event, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
    Assert.isTrue(alive(), "This Dispatcher has been shut down.");
    boolean isInContext = inContext();
    Task task;
    if (isInContext) {
        task = allocateRecursiveTask();
    } else {
        try {
            task = tryAllocateTask();
        } catch (InsufficientCapacityException e) {
            // keep the original exception as the cause so the failure can be diagnosed
            throw new IllegalStateException("Error in task allocation", e);
        }
    }
    task.setData(event).setErrorConsumer(errorConsumer).setEventConsumer(eventConsumer);
    if (!isInContext) {
        execute(task);
    }
}
Developer: muoncore, Project: muon-java, Lines of code: 21, Source: RingBufferLocalDispatcher.java
Example 7: dispatch
import reactor.fn.Consumer; // import the required package/class
public final <E> void dispatch(E event, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
    Assert.isTrue(alive(), "This Dispatcher has been shut down.");
    Assert.isTrue(eventConsumer != null, "The signal consumer has not been passed.");
    boolean isInContext = inContext();
    Task task;
    if (isInContext) {
        task = allocateRecursiveTask();
    } else {
        task = allocateTask();
    }
    task.setData(event).setErrorConsumer(errorConsumer).setEventConsumer(eventConsumer);
    if (!isInContext) {
        execute(task);
    }
}
Developer: muoncore, Project: muon-java, Lines of code: 19, Source: RingBufferLocalDispatcher.java
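The tryDispatch and dispatch methods above differ mainly in how a task slot is obtained: one variant may fail with an exception, the other does not declare one. The sketch below illustrates that general distinction between a fail-fast and a blocking enqueue on a bounded buffer. `BoundedTaskBuffer` is a hypothetical class built on `ArrayBlockingQueue`; it assumes the fail-fast variant rejects work when the buffer is full (as the caught `InsufficientCapacityException` suggests) and is not a model of the ring buffer's internals.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded task buffer; a stand-in for a fixed-capacity ring buffer.
class BoundedTaskBuffer {
    private final BlockingQueue<Runnable> slots;

    BoundedTaskBuffer(int capacity) {
        this.slots = new ArrayBlockingQueue<>(capacity);
    }

    // Fail-fast variant: throws instead of waiting when no slot is free.
    void tryEnqueue(Runnable task) {
        if (!slots.offer(task)) {
            throw new IllegalStateException("No free slot for task");
        }
    }

    // Blocking variant: waits until a slot becomes free.
    void enqueue(Runnable task) throws InterruptedException {
        slots.put(task);
    }

    // Runs and removes every queued task on the calling thread; returns how many ran.
    int drain() {
        int ran = 0;
        Runnable task;
        while ((task = slots.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }
}
```

A fail-fast enqueue suits callers that can shed load or retry later, while the blocking form applies back-pressure to the producer.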
Example 8: createProcessor
import reactor.fn.Consumer; // import the required package/class
@PostConstruct
private void createProcessor() {
    processor = RingBufferProcessor.create();
    Stream stream = Streams.wrap(processor);
    stream.buffer(1, TimeUnit.SECONDS).consume(new Consumer<List<Long>>() {
        @Override
        public void accept(List<Long> repositoryIds) {
            for (Long repositoryId : Sets.newHashSet(repositoryIds)) {
                setRepositoryStatsOutOfDate(repositoryId);
            }
        }
    });
}
Developer: box, Project: mojito, Lines of code: 14, Source: RepositoryStatisticsUpdatedReactor.java
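The consumer above receives the ids collected during one buffering window and handles each distinct id once. That deduplication step can be sketched in isolation as follows. `BatchDeduplicator` and `handleDistinct` are hypothetical names, and the sketch swaps Guava's `Sets.newHashSet` for the JDK's `LinkedHashSet`, which additionally keeps first-seen order.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper that collapses duplicate ids within one batch.
class BatchDeduplicator {

    // Invokes handler once per distinct id, in first-seen order.
    // Returns the number of handler invocations.
    static int handleDistinct(List<Long> batch, Consumer<Long> handler) {
        int calls = 0;
        for (Long id : new LinkedHashSet<>(batch)) {
            handler.accept(id);
            calls++;
        }
        return calls;
    }
}
```

Collapsing duplicates per window means a burst of updates to the same id triggers the downstream work only once for that window.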
Example 9: execute
import reactor.fn.Consumer; // import the required package/class
public void execute(final Runnable command) {
    dispatch(null, new Consumer<Object>() {
        @Override
        public void accept(Object ev) {
            command.run();
        }
    }, null);
}
Developer: muoncore, Project: muon-java, Lines of code: 9, Source: RingBufferLocalDispatcher.java
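The execute method above adapts a `Runnable` to the consumer-based dispatch API by wrapping it in a consumer that ignores its argument. A minimal sketch of that adapter on its own, using `java.util.function.Consumer` as a stand-in and the hypothetical names `RunnableAdapter` and `asConsumer`:

```java
import java.util.function.Consumer;

// Hypothetical adapter; mirrors the wrapping idea, not the library's code.
class RunnableAdapter {

    // Wraps a Runnable as a Consumer that ignores whatever value it is given.
    static Consumer<Object> asConsumer(Runnable command) {
        return ignored -> command.run();
    }
}
```

Because the argument is ignored, the adapted consumer can be invoked with `null`, which matches how the method above dispatches a `null` event.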
Example 10: testInitialize
import reactor.fn.Consumer; // import the required package/class
@Test
public void testInitialize() {
    List<FlowConfiguration<?>> flowConfigs = new ArrayList<>();
    flowConfigs.add(new StackSyncFlowConfig());
    flowConfigs.add(new StackTerminationFlowConfig());
    given(this.flowConfigs.stream()).willReturn(flowConfigs.stream());
    underTest.init();
    verify(reactor, times(1)).on(any(Selector.class), any(Consumer.class));
}
Developer: hortonworks, Project: cloudbreak, Lines of code: 10, Source: Flow2InitializerTest.java
Example 11: tags
import reactor.fn.Consumer; // import the required package/class
@Test
public void tags() throws IOException {
    final Broadcaster<Object> broadcaster = SerializedBroadcaster.create();
    Processor processor = new TopTags(1,10);
    Stream<?> outputStream = processor.process(broadcaster);
    outputStream.consume(new Consumer<Object>() {
        @Override
        public void accept(Object o) {
            System.out.println("processed : " + o);
        }
        //TODO - expect
        // processed : {"id":"55786760-7472-065d-8e62-eb83260948a4","timestamp":1422399628134,"hashtag":"AndroidGames","count":1}
        // processed : {"id":"bd99050f-abfa-a239-c09a-f2fe721daafb","timestamp":1422399628182,"hashtag":"Android","count":1}
        // processed : {"id":"10ce993c-fd57-322d-efa1-16f810918187","timestamp":1422399628184,"hashtag":"GameInsight","count":1}
    });
    ClassPathResource resource = new ClassPathResource("tweets.json");
    Scanner scanner = new Scanner(resource.getInputStream());
    while (scanner.hasNext()) {
        String tweet = scanner.nextLine();
        broadcaster.onNext(tweet);
        //simulateLatency();
    }
    //System.in.read();
}
Developer: spring-projects, Project: spring-xd-samples, Lines of code: 31, Source: TopTagsTupleTest.java
Example 12: decoder
import reactor.fn.Consumer; // import the required package/class
@Override
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
    return new DecodingFunction(this.stompDecoder, messageConsumer);
}
Developer: langtianya, Project: spring4-understanding, Lines of code: 5, Source: Reactor2StompCodec.java
Example 13: DecodingFunction
import reactor.fn.Consumer; // import the required package/class
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
    this.decoder = decoder;
    this.messageConsumer = next;
}
Developer: langtianya, Project: spring4-understanding, Lines of code: 5, Source: Reactor2StompCodec.java
Example 14: register
import reactor.fn.Consumer; // import the required package/class
public <E extends ApplicationEvent> void register(final Selector channel, final Consumer<Event<E>> eventListener) {
    notNull(eventListener);
    notNull(channel);
    eventBus.on(channel, eventListener);
}
Developer: Omegapoint, Project: facepalm, Lines of code: 6, Source: ReactorEventService.java
Example 15: decoder
import reactor.fn.Consumer; // import the required package/class
@Override
public Function<Buffer, Buffer> decoder(Consumer<Buffer> next) {
    return null;
}
Developer: spring-cloud, Project: spring-cloud-stream-app-starters, Lines of code: 5, Source: GpfdistCodec.java
Example 16: setEventConsumer
import reactor.fn.Consumer; // import the required package/class
public Task setEventConsumer(Consumer<?> eventConsumer) {
    this.eventConsumer = eventConsumer;
    return this;
}
Developer: muoncore, Project: muon-java, Lines of code: 5, Source: RingBufferLocalDispatcher.java
Example 17: setErrorConsumer
import reactor.fn.Consumer; // import the required package/class
public Task setErrorConsumer(Consumer<Throwable> errorConsumer) {
    this.errorConsumer = errorConsumer;
    return this;
}
Developer: muoncore, Project: muon-java, Lines of code: 5, Source: RingBufferLocalDispatcher.java
Example 18: RingBufferLocalDispatcher
import reactor.fn.Consumer; // import the required package/class
/**
 * Creates a new {@literal RingBufferDispatcher} with the given {@code name}.
 * It will use a {@link RingBuffer} with {@code bufferSize} slots,
 * configured with a producer type of {@link ProducerType#MULTI MULTI} and a
 * {@link BlockingWaitStrategy blocking wait}. The given
 * {@code uncaughtExceptionHandler} will catch anything not handled e.g. by the
 * owning {@code reactor.bus.EventBus} or {@code reactor.rx.Stream}.
 *
 * @param name
 *            The name of the dispatcher
 * @param bufferSize
 *            The size to configure the ring buffer with
 * @param uncaughtExceptionHandler
 *            The last resort exception handler
 */
public RingBufferLocalDispatcher(String name, int bufferSize, final Consumer<Throwable> uncaughtExceptionHandler) {
    this(name, bufferSize, uncaughtExceptionHandler, ProducerType.MULTI, new BlockingWaitStrategy());
}
Developer: muoncore, Project: muon-java, Lines of code: 20, Source: RingBufferLocalDispatcher.java
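Note: The reactor.fn.Consumer class examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.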