This article collects typical usage examples of the Java class reactor.rx.Streams. If you are wondering how the Streams class is used in practice, the selected examples below may help.
The Streams class belongs to the reactor.rx package. The 20 code examples below are sorted by popularity by default.
Example 1: equals
import reactor.rx.Streams; // import the required package/class
@Override
public boolean equals(Object o) {
    if (o == this)
        return true;
    if (!(o instanceof RedissonListReactive))
        return false;
    Stream<Object> e1 = Streams.wrap((Publisher<Object>) iterator());
    Stream<Object> e2 = Streams.wrap(((RedissonListReactive<Object>) o).iterator());
    // Merge both element streams and count the distinct groups
    Long count = Streams.merge(e1, e2).groupBy(new Function<Object, Object>() {
        @Override
        public Object apply(Object t) {
            return t;
        }
    }).count().next().poll();
    boolean res = count.intValue() == Streams.wrap(size()).next().poll();
    res &= count.intValue() == Streams.wrap(((RedissonListReactive<Object>) o).size()).next().poll();
    return res;
}
Developer: qq1588518, Project: JRediClients, Lines of code: 21, Source: RedissonListReactive.java
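The comparison in Example 1 can be easier to follow with ordinary collections. The sketch below is a hypothetical stand-in (the class and method names are not part of Redisson or Reactor) that assumes `merge(...).groupBy(identity).count()` yields the number of distinct values across both lists, which is then compared with each list's size.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Redisson or Reactor: mirrors the
// merge -> groupBy -> count comparison using plain collections.
final class DistinctCountEquality {

    static <T> boolean sameByDistinctCount(List<T> a, List<T> b) {
        // Counterpart of merge(e1, e2).groupBy(identity).count():
        // the number of distinct values found in either list.
        Set<T> distinct = new HashSet<>(a);
        distinct.addAll(b);
        int groups = distinct.size();
        // Both sizes must match the number of distinct values.
        return groups == a.size() && groups == b.size();
    }

    public static void main(String[] args) {
        System.out.println(sameByDistinctCount(List.of(1, 2, 3), List.of(1, 2, 3)));
        System.out.println(sameByDistinctCount(List.of(1, 2, 3), List.of(3, 2, 1)));
        System.out.println(sameByDistinctCount(List.of(1, 1), List.of(1, 1)));
    }
}
```

Under that assumption the check ignores element order and reports two identical lists containing duplicates as unequal, so it behaves more like a comparison of duplicate-free sets than the usual List.equals contract.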
Example 2: mapFunction
import reactor.rx.Streams; // import the required package/class
private Function<Entry<K, V>, Boolean> mapFunction(final RMapReactive<Object, Object> m) {
    return new Function<Map.Entry<K, V>, Boolean>() {
        @Override
        public Boolean apply(Entry<K, V> e) {
            Object key = e.getKey();
            Object value = e.getValue();
            if (value == null) {
                // A null value only matches if the other map holds the key with a null value
                if (!(Streams.create(m.get(key)).next().poll() == null && Streams.create(m.containsKey(key)).next().poll()))
                    return false;
            } else {
                if (!value.equals(Streams.create(m.get(key)).next().poll()))
                    return false;
            }
            return true;
        }
    };
}
Developer: qq1588518, Project: JRediClients, Lines of code: 18, Source: RedissonMapReactive.java
Example 3: echoJsonStreamDecoding
import reactor.rx.Streams; // import the required package/class
private static void echoJsonStreamDecoding() {
    TcpServer<Person, Person> transport = Netty4TcpServer.<Person, Person>create(
            0,
            new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addFirst(
                            new JsonObjectDecoder(),
                            new JacksonJsonCodec());
                }
            });
    ReactorTcpServer.create(transport)
            .start(connection -> {
                connection.log("input")
                        .observeComplete(v -> LOG.info("Connection input complete"))
                        .capacity(1)
                        .consume(person -> {
                            // Echo the person back with first and last name swapped
                            person = new Person(person.getLastName(), person.getFirstName());
                            Streams.wrap(connection.writeWith(Streams.just(person))).consume();
                        });
                return Streams.never();
            });
}
Developer: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 27, Source: CodecSample.java
Example 4: getHandler
import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<String, String, HttpChannel<String, String>> getHandler() {
    return channel -> {
        channel.headers()
                .entries()
                .forEach(
                        entry1 -> System.out.println(String.format(
                                "header [%s=>%s]", entry1.getKey(),
                                entry1.getValue())));
        String response;
        try {
            response = getWebPage("src/main/java/webapp/ws.html");
        } catch (IOException e) {
            e.printStackTrace();
            response = e.getMessage();
        }
        System.out.println(String.format("%s from thread %s",
                response.toString(), Thread.currentThread()));
        return channel.writeWith(Streams.just(response
                .toString()));
    };
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 22, Source: ReactorWishesWS.java
Example 5: postHandler
import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> postHandler() {
    return channel -> {
        channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(),
                entry.getValue())));
        return channel.writeWith(Streams
                .wrap(channel)
                .take(1)
                .log("received")
                .flatMap(data -> {
                    final StringBuilder response = new StringBuilder().append("hello ").append(new String(data.asBytes()));
                    System.out.println(String.format("%s from thread %s", response.toString(), Thread.currentThread()));
                    return Streams.just(Buffer.wrap(response.toString()));
                }));
    };
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 18, Source: ReactorNetGetPost.java
Example 6: main
import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    CountDownLatch latch = new CountDownLatch(5);
    Dispatcher threadPoolDispatcher = new ThreadPoolExecutorDispatcher(5, 128);
    Consumer<String> consumer = ev -> {
        LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
        latch.countDown();
    };
    Consumer<Throwable> errorConsumer = error ->
            error.printStackTrace();
    // a task is submitted to the thread pool dispatcher
    Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
    stream.dispatchOn(env).consume(ev -> {
        System.out.println(ev);
        threadPoolDispatcher.dispatch(ev, consumer, errorConsumer);
    });
    latch.await(15, TimeUnit.SECONDS); // Wait for task to execute
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 25, Source: ThreaadPoolExecutorDispatcher.java
Example 7: main
import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    CountDownLatch latch = new CountDownLatch(5);
    DispatcherSupplier supplier = Environment.newCachedDispatchers(3, "myPool");
    Consumer<String> consumer = ev -> {
        LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
        latch.countDown();
    };
    Consumer<Throwable> errorConsumer = error ->
            error.printStackTrace();
    // each partition is consumed on a dispatcher taken from the supplier
    Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
    stream.dispatchOn(env).partition().consume(
            groupStream ->
                    groupStream.dispatchOn(supplier.get()).consume(consumer, errorConsumer)
    );
    latch.await(15, TimeUnit.SECONDS); // Wait for task to execute
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 24, Source: DispatcherSupplierPartitioning.java
Example 8: main
import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    Processor<String, String> p = RingBufferProcessor.create("testProcessor", 32);
    Stream<String> s1 = Streams.wrap(p);
    // three independent subscribers on the same processor
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    p.onNext("One");
    p.onNext("Two");
    p.onNext("Three");
    p.onComplete();
    Environment.terminate();
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 19, Source: RingBufferProcessorDemo.java
Example 9: main
import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Environment env = Environment.initialize();
    Processor<String, String> p = RingBufferWorkProcessor.create("testProcessor", 32);
    Stream<String> s1 = Streams.wrap(p);
    // three subscribers sharing the work published to the processor
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev));
    p.onNext("One");
    p.onNext("Two");
    p.onNext("Three");
    p.onNext("Four");
    p.onNext("Five");
    p.onComplete();
    Environment.terminate();
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 21, Source: RingBufferWorkProcessorDemo.java
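Examples 8 and 9 differ only in the processor type. As Reactor 2.x documents them, RingBufferProcessor broadcasts every item to every subscriber, while RingBufferWorkProcessor hands each item to exactly one subscriber. The following is a simplified, single-threaded model of those two delivery modes in plain Java. The class and method names are hypothetical, and the round-robin assignment is an assumption chosen for determinism; a real work processor gives each item to whichever subscriber is free.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of two delivery modes; it does not use Reactor.
final class DeliveryModes {

    private static List<List<String>> emptyInboxes(int consumers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    // Broadcast: every consumer receives every item.
    static List<List<String>> broadcast(List<String> items, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (String item : items) {
            for (List<String> inbox : inboxes) {
                inbox.add(item);
            }
        }
        return inboxes;
    }

    // Work sharing: each item goes to exactly one consumer
    // (round-robin here, purely to keep the sketch deterministic).
    static List<List<String>> share(List<String> items, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < items.size(); i++) {
            inboxes.get(i % consumers).add(items.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> items = List.of("One", "Two", "Three");
        System.out.println(broadcast(items, 3));
        System.out.println(share(items, 3));
    }
}
```

With three consumers and three items, the broadcast model gives each consumer all three items, whereas the sharing model gives each consumer one.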
Example 10: getHandler
import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<String, String, HttpChannel<String, String>> getHandler() {
    return channel -> {
        // channel.headers()
        //         .entries()
        //         .forEach(
        //                 entry1 -> System.out.println(String.format(
        //                         "header [%s=>%s]", entry1.getKey(),
        //                         entry1.getValue())));
        // System.out.println(channel.uri());
        String uri = channel.uri();
        if (uri.equals("/"))
            uri = "/index.html";
        String path = "src/main/webapp" + uri;
        String response;
        try {
            response = getStaticResource(path);
        } catch (IOException e) {
            e.printStackTrace();
            response = e.getMessage();
        }
        return channel.writeWith(Streams.just(response
                .toString()));
    };
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 27, Source: ReactorWishesWS.java
Example 11: process
import reactor.rx.Streams; // import the required package/class
@Override
public Stream<Tuple> process(Stream<String> stream) {
    return stream.flatMap(tweet -> {
                // extract the hashtag texts from the tweet JSON
                JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text");
                return Streams.from(array.toArray(new String[array.size()]));
            })
            .map(w -> reactor.fn.tuple.Tuple.of(w, 1))
            .window(timeWindow, SECONDS)
            .flatMap(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next)
                    .sort((a, b) -> -a.t2.compareTo(b.t2))
                    .take(topN))
            .map(entry -> tuple().of("hashtag", entry.t1, "count", entry.t2));
}
Developer: spring-projects, Project: spring-xd-samples, Lines of code: 17, Source: TopTags.java
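Within a single time window, the pipeline in Example 11 boils down to counting occurrences per hashtag, sorting by count in descending order, and keeping the first topN entries. A minimal sketch of that per-window step with java.util.stream is shown below. The class and method names are hypothetical, and the alphabetical tie-break is an addition that the original does not specify.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical per-window equivalent of reduceByKey + sort + take.
final class TopTagsSketch {

    static List<Map.Entry<String, Long>> topTags(List<String> tags, int topN) {
        // reduceByKey counterpart: sum one occurrence per tag.
        Map<String, Long> counts = tags.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                // Highest count first; ties broken by tag name (an assumption).
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(topN)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tags = List.of("java", "reactor", "java", "rx", "reactor", "java");
        System.out.println(topTags(tags, 2));
    }
}
```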
Example 12: getKeysByPattern
import reactor.rx.Streams; // import the required package/class
@Override
public Publisher<String> getKeysByPattern(final String pattern) {
    List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
    // one key iterator per master/slave entry, merged into a single publisher
    for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
        publishers.add(createKeysIterator(entry, pattern));
    }
    return Streams.merge(publishers);
}
Developer: qq1588518, Project: JRediClients, Lines of code: 9, Source: RedissonKeysReactive.java
Example 13: sync
import reactor.rx.Streams; // import the required package/class
public static <V> V sync(Publisher<V> ob) {
    Promise<V> promise;
    if (Promise.class.isAssignableFrom(ob.getClass())) {
        promise = (Promise<V>) ob;
    } else {
        promise = Streams.wrap(ob).next();
    }
    // block until the first value (or an error) arrives
    V val = promise.poll();
    if (promise.isError()) {
        throw new RuntimeException(promise.reason());
    }
    return val;
}
Developer: qq1588518, Project: JRediClients, Lines of code: 15, Source: BaseReactiveTest.java
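The same block-for-the-first-value idea can be sketched without Reactor, using the JDK's java.util.concurrent.Flow interfaces (Java 9+) and a CompletableFuture. Everything here is an illustrative assumption rather than the original project's code: the class name, the tiny `just` publisher used to make the sketch self-contained, and the five-second timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only counterpart of the sync(...) helper above.
final class BlockingFirst {

    // Minimal single-value publisher, present only to keep the sketch runnable.
    static <V> Flow.Publisher<V> just(V value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean emitted;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (emitted || cancelled || n <= 0) {
                    return;
                }
                emitted = true;
                subscriber.onNext(value);
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    // Requests one item, then blocks until it (or an error) arrives.
    static <V> V sync(Flow.Publisher<V> publisher) {
        CompletableFuture<V> first = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<V>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(V item) {
                first.complete(item);
                subscription.cancel();
            }

            @Override
            public void onError(Throwable t) {
                first.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                first.complete(null); // empty publisher: no value
            }
        });
        try {
            return first.get(5, TimeUnit.SECONDS); // timeout is an arbitrary choice
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sync(just("hello")));
    }
}
```

As in the original helper, failures surface as a RuntimeException wrapping the cause.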
Example 14: createProcessor
import reactor.rx.Streams; // import the required package/class
@PostConstruct
private void createProcessor() {
    processor = RingBufferProcessor.create();
    Stream stream = Streams.wrap(processor);
    // collect ids for one second, then handle each distinct id once
    stream.buffer(1, TimeUnit.SECONDS).consume(new Consumer<List<Long>>() {
        @Override
        public void accept(List<Long> repositoryIds) {
            for (Long repositoryId : Sets.newHashSet(repositoryIds)) {
                setRepositoryStatsOutOfDate(repositoryId);
            }
        }
    });
}
Developer: box, Project: mojito, Lines of code: 14, Source: RepositoryStatisticsUpdatedReactor.java
Example 15: writeMultipleValues
import reactor.rx.Streams; // import the required package/class
@Test
public void writeMultipleValues() throws IOException {
    Promise<ByteBuf> chunk1 = Promises.success(Unpooled.buffer().writeBytes("This is".getBytes()));
    Promise<ByteBuf> chunk2 = Promises.success(Unpooled.buffer().writeBytes(" a test!".getBytes()));
    // concat writes chunk1 first, then chunk2
    reactorServer.start(connection -> connection.writeWith(Streams.concat(chunk1, chunk2)));
    assertEquals("This is a test!", SocketTestUtils.read("localhost", reactorServer.getPort()));
}
Developer: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 8, Source: ReactorTcpServerTests.java
Example 16: echo
import reactor.rx.Streams; // import the required package/class
/**
 * Keep echoing until the client goes away.
 */
private static void echo(TcpServer<ByteBuf, ByteBuf> transport) {
    ReactorTcpServer.create(transport)
            .startAndAwait(connection -> {
                connection.flatMap(inByteBuf -> {
                    String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
                    ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
                    return connection.writeWith(Streams.just(outByteBuf));
                }).consume();
                return Streams.never();
            });
}
Developer: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 15, Source: ReactorTcpServerSample.java
Example 17: echoWithQuitCommand
import reactor.rx.Streams; // import the required package/class
/**
 * Keep echoing until the client sends "quit".
 */
private static void echoWithQuitCommand(TcpServer<ByteBuf, ByteBuf> transport) {
    ReactorTcpServer.create(transport)
            .start(connection -> connection
                    .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
                    .takeWhile(input -> !"quit".equalsIgnoreCase(input.trim()))
                    .filter(input -> !"quit".equalsIgnoreCase(input.trim()))
                    .map(input -> "Hello " + input)
                    .flatMap(text -> connection.writeWith(
                                    Streams.just(Unpooled.buffer().writeBytes(text.getBytes()))
                            )
                    )
                    .after()
            );
}
Developer: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 18, Source: ReactorTcpServerSample.java
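The echo-until-"quit" logic of Example 17 can be tried in isolation with java.util.stream, whose Stream.takeWhile (Java 9+) also stops at the first element that fails the predicate. This is a sketch over an in-memory list with hypothetical names; it does not involve a TCP connection or Reactor.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory version of the echo-until-quit pipeline.
final class EchoUntilQuit {

    static List<String> replies(List<String> inputs) {
        return inputs.stream()
                // Stop at the first "quit" (case-insensitive, surrounding whitespace ignored).
                .takeWhile(input -> !"quit".equalsIgnoreCase(input.trim()))
                .map(input -> "Hello " + input)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(replies(List.of("Ann", "Bob", " QUIT ", "Cid")));
        // prints [Hello Ann, Hello Bob]
    }
}
```

Input after the quit command ("Cid" here) is never echoed, which is the behaviour the Javadoc comment in the example describes.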
Example 18: runLineBasedFrameDecoder
import reactor.rx.Streams; // import the required package/class
private static void runLineBasedFrameDecoder() {
    TcpServer<String, String> transport = Netty4TcpServer.<String, String>create(
            0,
            new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    // a one-byte receive buffer forces lines to arrive in fragments
                    int bufferSize = 1;
                    ChannelConfig config = channel.config();
                    config.setOption(ChannelOption.SO_RCVBUF, bufferSize);
                    config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize));
                    channel.pipeline().addFirst(
                            new LineBasedFrameDecoder(256),
                            new StringDecoder(CharsetUtil.UTF_8),
                            new StringEncoder(CharsetUtil.UTF_8));
                }
            });
    ReactorTcpServer.create(transport).start(connection -> {
        connection.log("input")
                .observeComplete(v -> LOG.info("Connection input complete"))
                .capacity(1)
                .consume(line -> {
                    String response = "Hello " + line + "\n";
                    Streams.wrap(connection.writeWith(Streams.just(response))).consume();
                });
        return Streams.never();
    });
}
Developer: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 30, Source: CodecSample.java
Example 19: TaxiStream
import reactor.rx.Streams; // import the required package/class
public TaxiStream(String fileName) {
    this.fileName = fileName;
    // create a Processor with an internal RingBuffer capacity of 32 slots
    this.tripsProcessor = RingBufferProcessor.create("trips", 32);
    // create a Reactor Stream from this Reactive Streams Processor
    this.trips = Streams.wrap(this.tripsProcessor);
}
Developer: jeryini, Project: TaxiManagementSystem, Lines of code: 10, Source: TaxiStream.java
Example 20: main
import reactor.rx.Streams; // import the required package/class
public static void main(String... args) throws InterruptedException {
    Environment.initialize();
    // pair each number with the word at the same position
    Streams.zip(
                    Streams.range(0, 3),
                    Streams.from(new String[]{"Hello", "from", "Reactor", "Websocket"}),
                    t2 -> t2).throttle(3000)
            .consume(t2 -> System.out.println(t2.getT1() + ": " + t2.getT2()));
    Thread.sleep(500);
}
Developer: iproduct, Project: low-latency-high-throughput, Lines of code: 12, Source: ZipStreamsDemo.java
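Zipping pairs elements by position and ends as soon as the shorter input runs out. The sketch below shows that pairing on plain lists with java.util.stream. The names are hypothetical, and the index count is passed in explicitly rather than derived from Streams.range, whose exact bounds are not asserted here.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical positional zip of an index sequence with a word list.
final class ZipSketch {

    static List<String> zipWithIndex(List<String> words, int indexCount) {
        // The zipped result is only as long as the shorter input.
        int pairs = Math.min(indexCount, words.size());
        return IntStream.range(0, pairs)
                .mapToObj(i -> i + ": " + words.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("Hello", "from", "Reactor", "Websocket");
        System.out.println(zipWithIndex(words, 3));
        // prints [0: Hello, 1: from, 2: Reactor]
    }
}
```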
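Note: The reactor.rx.Streams examples in this article were compiled from source code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. Refer to each project's license before distributing or using the code. Do not reproduce without permission.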