
Java Consumer Class Code Examples


This article collects typical usage examples of reactor.fn.Consumer in Java. If you are wondering how the Java Consumer class is used in practice, or are looking for working examples of it, the selected code examples here may help.



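Consumer belongs to the reactor.fn package. The 18 code examples below are sorted by popularity by default. As the examples show, a Consumer is supplied through its single accept method, written either as an anonymous class or as a lambda.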
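To make the shape of the type concrete, here is a minimal sketch that does not depend on Reactor. The nested Consumer interface is a hypothetical stand-in, assumed to mirror the single accept method of reactor.fn.Consumer; ConsumerSketch and forEach are illustrative names, not library APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConsumerSketch {

    // Hypothetical stand-in for reactor.fn.Consumer (assumption: one void accept method).
    interface Consumer<T> {
        void accept(T value);
    }

    // Illustrative helper: passes every item to the consumer, in order.
    static <T> void forEach(List<T> items, Consumer<T> consumer) {
        for (T item : items) {
            consumer.accept(item);
        }
    }

    public static void main(String[] args) {
        final List<String> seen = new ArrayList<>();

        // Anonymous-class style, as used in Example 1.
        Consumer<String> anonymous = new Consumer<String>() {
            @Override
            public void accept(String value) {
                seen.add("anon:" + value);
            }
        };

        // Lambda style, as used in Example 2.
        Consumer<String> lambda = value -> seen.add("lambda:" + value);

        forEach(Arrays.asList("a", "b"), anonymous);
        forEach(Arrays.asList("c"), lambda);

        System.out.println(seen);
    }
}
```

Both styles produce the same kind of object; the lambda form is only available on Java 8 and later.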

Example 1: apply

import reactor.fn.Consumer; // import the required package/class
@Override
public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream) {
	Promise<Void> closePromise = Promises.prepare();
	this.connectionHandler.afterConnected(new Reactor2TcpConnection<P>(channelStream, closePromise));
	channelStream
			.finallyDo(new Consumer<Signal<Message<P>>>() {
				@Override
				public void accept(Signal<Message<P>> signal) {
					if (signal.isOnError()) {
						connectionHandler.handleFailure(signal.getThrowable());
					}
					else if (signal.isOnComplete()) {
						connectionHandler.afterConnectionClosed();
					}
				}
			})
			.consume(new Consumer<Message<P>>() {
				@Override
				public void accept(Message<P> message) {
					connectionHandler.handleMessage(message);
				}
			});

	return closePromise;
}
 
Developer ID: langtianya, Project: spring4-understanding, Lines of code: 26, Source file: Reactor2TcpClient.java


Example 2: main

import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) {
	//Initialize context and get default dispatcher
	Environment.initializeIfEmpty();

	//RingBufferDispatcher with 8192 slots by default
	Dispatcher dispatcher = Environment.sharedDispatcher();

	//Create a callback
	Consumer<Integer> c = data ->
	        System.out.format("some data arrived: %s\n", data);

	//Create an error callback
	Consumer<Throwable> errorHandler = ex -> ex.printStackTrace();

	//Dispatch data asynchronously
	dispatcher.dispatch(1234, c, errorHandler);
	dispatcher.dispatch(5678, c, errorHandler);

	Environment.terminate();
}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 21, Source file: Example1.java
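The dispatch(data, consumer, errorConsumer) call in Example 2 pairs a data callback with an error callback. The sketch below shows that pairing with a plain single-thread executor. It is not Reactor's implementation: SimpleDispatcher, shutdownAndAwait, and the nested Consumer interface are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {

    // Hypothetical stand-in for reactor.fn.Consumer.
    interface Consumer<T> {
        void accept(T value);
    }

    // Hypothetical, simplified dispatcher backed by one worker thread.
    static final class SimpleDispatcher {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        // Runs eventConsumer asynchronously; anything it throws goes to errorConsumer.
        <E> void dispatch(E data, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
            executor.execute(() -> {
                try {
                    eventConsumer.accept(data);
                } catch (Throwable t) {
                    if (errorConsumer != null) {
                        errorConsumer.accept(t);
                    }
                }
            });
        }

        // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
        boolean shutdownAndAwait(long timeout, TimeUnit unit) {
            executor.shutdown();
            try {
                return executor.awaitTermination(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        SimpleDispatcher dispatcher = new SimpleDispatcher();

        Consumer<Integer> c = data -> {
            if (data < 0) {
                throw new IllegalArgumentException("negative: " + data);
            }
            log.add("data:" + data);
        };
        Consumer<Throwable> errorHandler = ex -> log.add("error:" + ex.getMessage());

        dispatcher.dispatch(1234, c, errorHandler);
        dispatcher.dispatch(-1, c, errorHandler);
        dispatcher.shutdownAndAwait(5, TimeUnit.SECONDS);

        System.out.println(log);
    }
}
```

Because the sketch uses a single worker thread, tasks run in submission order; a pool with several threads would not give that guarantee.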


Example 3: main

import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();
	CountDownLatch latch = new CountDownLatch(5);

	Dispatcher threadPoolDispatcher = new ThreadPoolExecutorDispatcher(5, 128);

	Consumer<String> consumer = ev -> {
	    LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
	    latch.countDown();
	};

	Consumer<Throwable> errorConsumer = error ->
	    error.printStackTrace();

	// a task is submitted to the thread pool dispatcher
	Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
	stream.dispatchOn(env).consume(ev -> {
				System.out.println(ev);
				threadPoolDispatcher.dispatch(ev, consumer, errorConsumer);
			});

	latch.await(15, TimeUnit.SECONDS); // Wait for task to execute

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 25, Source file: ThreaadPoolExecutorDispatcher.java
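Example 3 counts down a CountDownLatch inside the consumer so that main can wait for the asynchronous work. A Reactor-free sketch of that idea follows, using a fixed thread pool from java.util.concurrent. LatchSketch, consumeAll, and the nested Consumer interface are hypothetical names; the latch here is sized to the number of items, which is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Hypothetical stand-in for reactor.fn.Consumer.
    interface Consumer<T> {
        void accept(T value);
    }

    // Runs the consumer once per item on a fixed pool and waits for all calls.
    // Returns true if every call finished before the timeout.
    static <T> boolean consumeAll(List<T> items, Consumer<T> consumer,
                                  int threads, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(items.size());
        try {
            for (T item : items) {
                pool.execute(() -> {
                    try {
                        consumer.accept(item);
                    } finally {
                        latch.countDown(); // count down even if the consumer throws
                    }
                });
            }
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Queue<String> seen = new ConcurrentLinkedQueue<>();
        Consumer<String> consumer = ev ->
                seen.add("Hello " + ev + " from " + Thread.currentThread().getName());

        boolean finished = consumeAll(Arrays.asList("One", "Two", "Three"), consumer, 2, 5);
        System.out.println("finished=" + finished + ", handled=" + seen.size());
    }
}
```

Counting down in a finally block keeps the waiting thread from stalling until the timeout when a consumer call fails.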


Example 4: main

import reactor.fn.Consumer; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();
	CountDownLatch latch = new CountDownLatch(5);

	DispatcherSupplier supplier = Environment.newCachedDispatchers(3, "myPool");
	
	Consumer<String> consumer = ev -> {
	    LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
	    latch.countDown();
	};

	Consumer<Throwable> errorConsumer = error ->
	    error.printStackTrace();

	// a task is submitted to the thread pool dispatcher
	Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
	stream.dispatchOn(env).partition().consume(
		groupStream -> 
			groupStream.dispatchOn(supplier.get()).consume(consumer, errorConsumer)
	);
	latch.await(15, TimeUnit.SECONDS); // Wait for task to execute

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 24, Source file: DispatcherSupplierPartitioning.java


Example 5: init

import reactor.fn.Consumer; // import the required package/class
@Override
protected void init(VaadinRequest vaadinRequest) {

  final VerticalLayout layout = new VerticalLayout();
  layout.setMargin(true);
  layout.setSpacing(true);
  layout.addComponent(labelTime);
  setContent(layout);

  eventBus.on(Selectors.$(TOPIC), new Consumer<reactor.bus.Event<LocalTime>>() {
    @Override
    public void accept(reactor.bus.Event<LocalTime> event) {
      access(() -> labelTime.setValue(event.getData().format(dateTimeFormatter)));
    }
  });
}
 
Developer ID: jangalinski, Project: springbootcamp, Lines of code: 17, Source file: ApplicationUI.java


Example 6: tryDispatch

import reactor.fn.Consumer; // import the required package/class
public final <E> void tryDispatch(E event, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {
	Assert.isTrue(alive(), "This Dispatcher has been shut down.");
	boolean isInContext = inContext();
	Task task;
	if (isInContext) {
		task = allocateRecursiveTask();
	} else {
		try {
			task = tryAllocateTask();
		} catch (InsufficientCapacityException e) {
			throw new IllegalStateException("Error in task allocation", e);
		}
	}

	task.setData(event).setErrorConsumer(errorConsumer).setEventConsumer(eventConsumer);

	if (!isInContext) {
		execute(task);
	}
}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 21, Source file: RingBufferLocalDispatcher.java


Example 7: dispatch

import reactor.fn.Consumer; // import the required package/class
public final <E> void dispatch(E event, Consumer<E> eventConsumer, Consumer<Throwable> errorConsumer) {

	Assert.isTrue(alive(), "This Dispatcher has been shut down.");
	Assert.isTrue(eventConsumer != null, "The signal consumer has not been passed.");
	boolean isInContext = inContext();
	Task task;
	if (isInContext) {
		task = allocateRecursiveTask();
	} else {
		task = allocateTask();
	}

	task.setData(event).setErrorConsumer(errorConsumer).setEventConsumer(eventConsumer);

	if (!isInContext) {
		execute(task);
	}
}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 19, Source file: RingBufferLocalDispatcher.java


Example 8: createProcessor

import reactor.fn.Consumer; // import the required package/class
@PostConstruct
private void createProcessor() {
    processor = RingBufferProcessor.create();
    Stream stream = Streams.wrap(processor);
    stream.buffer(1, TimeUnit.SECONDS).consume(new Consumer<List<Long>>() {
        @Override
        public void accept(List<Long> repositoryIds) {
            for (Long repositoryId : Sets.newHashSet(repositoryIds)) {
                setRepositoryStatsOutOfDate(repositoryId);
            }
        }
    });
}
 
Developer ID: box, Project: mojito, Lines of code: 14, Source file: RepositoryStatisticsUpdatedReactor.java


Example 9: execute

import reactor.fn.Consumer; // import the required package/class
public void execute(final Runnable command) {
	dispatch(null, new Consumer<Object>() {
		@Override
		public void accept(Object ev) {
			command.run();
		}
	}, null);
}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 9, Source file: RingBufferLocalDispatcher.java
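Example 9 adapts a Runnable to a Consumer by ignoring the event argument. The same adapter can be sketched in isolation as below. RunnableAdapterSketch, asConsumer, and the nested Consumer interface are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RunnableAdapterSketch {

    // Hypothetical stand-in for reactor.fn.Consumer.
    interface Consumer<T> {
        void accept(T value);
    }

    // Wraps a Runnable in a Consumer that discards whatever value it receives.
    static Consumer<Object> asConsumer(final Runnable command) {
        return new Consumer<Object>() {
            @Override
            public void accept(Object ignored) {
                command.run();
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        Consumer<Object> consumer = asConsumer(() -> runs.incrementAndGet());

        consumer.accept(null);       // the argument is irrelevant
        consumer.accept("anything"); // the Runnable runs again

        System.out.println("runs=" + runs.get());
    }
}
```

This is why Example 9 can pass null as the event: the wrapped command never reads it.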


Example 10: testInitialize

import reactor.fn.Consumer; // import the required package/class
@Test
public void testInitialize() {
    List<FlowConfiguration<?>> flowConfigs = new ArrayList<>();
    flowConfigs.add(new StackSyncFlowConfig());
    flowConfigs.add(new StackTerminationFlowConfig());
    given(this.flowConfigs.stream()).willReturn(flowConfigs.stream());
    underTest.init();
    verify(reactor, times(1)).on(any(Selector.class), any(Consumer.class));
}
 
Developer ID: hortonworks, Project: cloudbreak, Lines of code: 10, Source file: Flow2InitializerTest.java


Example 11: tags

import reactor.fn.Consumer; // import the required package/class
@Test
    public void tags() throws IOException {

        final Broadcaster<Object> broadcaster = SerializedBroadcaster.create();

        Processor processor = new TopTags(1,10);
        Stream<?> outputStream = processor.process(broadcaster);


        outputStream.consume(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                System.out.println("processed : " + o);
            }
            //TODO - expect
//            processed : {"id":"55786760-7472-065d-8e62-eb83260948a4","timestamp":1422399628134,"hashtag":"AndroidGames","count":1}
//            processed : {"id":"bd99050f-abfa-a239-c09a-f2fe721daafb","timestamp":1422399628182,"hashtag":"Android","count":1}
//            processed : {"id":"10ce993c-fd57-322d-efa1-16f810918187","timestamp":1422399628184,"hashtag":"GameInsight","count":1}
        });

        ClassPathResource resource = new ClassPathResource("tweets.json");
        Scanner scanner = new Scanner(resource.getInputStream());
        while (scanner.hasNext()) {
            String tweet = scanner.nextLine();
            broadcaster.onNext(tweet);
            //simulateLatency();
        }
        //System.in.read();

    }
 
Developer ID: spring-projects, Project: spring-xd-samples, Lines of code: 31, Source file: TopTagsTupleTest.java


Example 12: decoder

import reactor.fn.Consumer; // import the required package/class
@Override
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) {
	return new DecodingFunction(this.stompDecoder, messageConsumer);
}
 
Developer ID: langtianya, Project: spring4-understanding, Lines of code: 5, Source file: Reactor2StompCodec.java


Example 13: DecodingFunction

import reactor.fn.Consumer; // import the required package/class
public DecodingFunction(StompDecoder decoder, Consumer<Message<byte[]>> next) {
	this.decoder = decoder;
	this.messageConsumer = next;
}
 
Developer ID: langtianya, Project: spring4-understanding, Lines of code: 5, Source file: Reactor2StompCodec.java


Example 14: register

import reactor.fn.Consumer; // import the required package/class
public <E extends ApplicationEvent> void register(final Selector channel, final Consumer<Event<E>> eventListener) {
    notNull(eventListener);
    notNull(channel);
    eventBus.on(channel, eventListener);
}
 
Developer ID: Omegapoint, Project: facepalm, Lines of code: 6, Source file: ReactorEventService.java


Example 15: decoder

import reactor.fn.Consumer; // import the required package/class
@Override
public Function<Buffer, Buffer> decoder(Consumer<Buffer> next) {
	return null;
}
 
Developer ID: spring-cloud, Project: spring-cloud-stream-app-starters, Lines of code: 5, Source file: GpfdistCodec.java


Example 16: setEventConsumer

import reactor.fn.Consumer; // import the required package/class
public Task setEventConsumer(Consumer<?> eventConsumer) {
	this.eventConsumer = eventConsumer;
	return this;
}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 5, Source file: RingBufferLocalDispatcher.java


Example 17: setErrorConsumer

import reactor.fn.Consumer; // import the required package/class
public Task setErrorConsumer(Consumer<Throwable> errorConsumer) {
	this.errorConsumer = errorConsumer;
	return this;
}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 5, Source file: RingBufferLocalDispatcher.java


Example 18: RingBufferLocalDispatcher

import reactor.fn.Consumer; // import the required package/class
/**
 * Creates a new {@literal RingBufferDispatcher} with the given {@code name}
 * . It will use a {@link RingBuffer} with {@code bufferSize} slots,
 * configured with a producer type of {@link ProducerType#MULTI MULTI} and a
 * {@link BlockingWaitStrategy blocking wait}. A given
 * {@code uncaughtExceptionHandler} will catch anything not handled e.g. by the
 * owning {@code reactor.bus.EventBus} or {@code reactor.rx.Stream}.
 *
 * @param name
 *            The name of the dispatcher
 * @param bufferSize
 *            The size to configure the ring buffer with
 * @param uncaughtExceptionHandler
 *            The last resort exception handler
 */
public RingBufferLocalDispatcher(String name, int bufferSize, final Consumer<Throwable> uncaughtExceptionHandler) {
	this(name, bufferSize, uncaughtExceptionHandler, ProducerType.MULTI, new BlockingWaitStrategy());

}
 
Developer ID: muoncore, Project: muon-java, Lines of code: 20, Source file: RingBufferLocalDispatcher.java



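Note: The reactor.fn.Consumer examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. Consult each project's License before distributing or using the code. Do not reproduce this article without permission.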

